OpenAI Agents
Reasoning Events
Recipe for streaming chain-of-thought reasoning from OpenAI Agents SDK backend.
This recipe shows how to stream and display reasoning/thinking content from models like o1 and o3 when using the OpenAI Agents SDK on your backend.
Overview
Reasoning models generate internal "thinking" before producing final responses. Streaming this reasoning to users provides transparency and improves trust.
Backend Implementation (FastAPI)
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from agents import Agent, Runner, RawResponsesStreamEvent
from openai.types.responses import ResponseTextDeltaEvent
import json
router = APIRouter()
@router.post("/api/chat/")
async def chat(request: Request):
agent = Agent(
name="Assistant",
instructions="You are a helpful assistant.",
model="o1",
)
runner = Runner.run_streamed(
agent,
input=request.message,
max_turns=30
)
async def event_generator():
reasoning_buffer = ""
async for event in runner.stream_events():
if event.type == "raw_response_event":
data = event.data
# Handle text deltas
if isinstance(data, ResponseTextDeltaEvent) and data.delta:
text_data = {"type": "text_delta", "delta": data.delta}
yield f"data: {json.dumps(text_data)}\n\n"
# Handle reasoning deltas
elif hasattr(data, "type") and "reasoning" in str(data.type).lower():
text = getattr(data, 'text', '') or getattr(data, 'delta', '')
if text and isinstance(text, str):
if text.startswith(reasoning_buffer) and len(text) > len(reasoning_buffer):
new_text = text[len(reasoning_buffer):]
reasoning_data = {"type": "reasoning", "text": new_text}
yield f"data: {json.dumps(reasoning_data)}\n\n"
reasoning_buffer = text
elif text not in reasoning_buffer:
reasoning_data = {"type": "reasoning", "text": text}
yield f"data: {json.dumps(reasoning_data)}\n\n"
reasoning_buffer = text
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)

Frontend Implementation
// Frontend: consume the SSE stream and fold custom events into message parts.
import { useStreamChat, type ContentPart } from "@deltakit/react";
// Discriminated union mirroring the event payloads the backend emits.
type CustomEvent =
| { type: "text_delta"; delta: string }
| { type: "reasoning"; text: string };
function Chat() {
const { messages, isLoading } = useStreamChat<ContentPart, CustomEvent>({
api: "/api/chat",
onEvent: (event, { appendText, setMessages }) => {
switch (event.type) {
case "text_delta":
// Final-answer text: append to the current assistant message.
appendText(event.delta);
break;
case "reasoning":
setMessages((prev) => {
const last = prev[prev.length - 1];
// Only touch the state when the latest message is the assistant's.
if (!last || last.role !== "assistant") return prev;
const parts = [...last.parts];
const lastPart = parts[parts.length - 1];
if (lastPart?.type === "reasoning") {
// Accumulate into the existing reasoning part instead of
// creating a new part per chunk.
parts[parts.length - 1] = {
...lastPart,
text: lastPart.text + event.text,
};
} else {
// First reasoning chunk: start a new reasoning part.
parts.push({ type: "reasoning", text: event.text });
}
// Replace the last message immutably so React re-renders.
return [...prev.slice(0, -1), { ...last, parts }];
});
break;
}
},
});
// ... render UI
}

Rendering Reasoning
// Renders one reasoning part: a "Thinking" label, a pulse dot while tokens
// are still arriving, and the accumulated reasoning text.
function ReasoningBlock({ text, isStreaming }: { text: string; isStreaming: boolean }) {
  const pulse = isStreaming ? (
    <span className="w-1.5 h-1.5 bg-neutral-500 rounded-full animate-pulse" />
  ) : null;
  return (
    <div className="rounded-lg border border-neutral-700 bg-neutral-800/50 p-4 my-2">
      <div className="flex items-center gap-2 mb-2">
        <span className="text-xs font-medium text-neutral-500 uppercase tracking-wider">
          Thinking
        </span>
        {pulse}
      </div>
      <p className="text-sm text-neutral-400 italic whitespace-pre-wrap">{text}</p>
    </div>
  );
}

Best Practices
- Always accumulate: Treat reasoning like text_delta — accumulate chunks, don't create new parts
- Deduplicate backend: Prevent overlapping chunks using a buffer
- Show streaming state: Add pulsing indicator while reasoning is active
Related
- Custom Event Handling - General pattern for custom SSE events
- fromOpenAiAgents - Loading persisted reasoning from history