Agno Agents

# Reasoning Events

Recipe for streaming chain-of-thought reasoning from an Agno agent backend.

This recipe shows how to stream and display reasoning/thinking content from Agno agents that expose their chain-of-thought process.
## Overview
Agno agents can generate internal "thinking" before producing final responses (especially with reasoning-focused models like Kimi). Streaming this reasoning to users provides transparency and improves trust.
## Backend Implementation (FastAPI + Agno)
import json

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

from agno.agent import Agent
from agno.models.openrouter import OpenRouter
router = APIRouter()
@router.post("/api/chat-agno/")
async def chat(request: Request):
agent = Agent(
model=OpenRouter(id="moonshotai/kimi-k2.5"),
)
async def event_generator():
reasoning_buffer = ""
async for event in agent.arun(
request.message,
stream=True,
stream_events=True,
):
if not hasattr(event, "event"):
continue
event_type = event.event
if event_type == "RunContent":
if hasattr(event, "reasoning_content") and event.reasoning_content:
reasoning_text = str(event.reasoning_content)
if reasoning_text.startswith(reasoning_buffer) and len(
reasoning_text
) > len(reasoning_buffer):
new_text = reasoning_text[len(reasoning_buffer):]
if new_text:
yield f'data: {json.dumps({"type": "reasoning", "text": new_text})}\n\n'
reasoning_buffer = reasoning_text
elif reasoning_text not in reasoning_buffer:
yield f'data: {json.dumps({"type": "reasoning", "text": reasoning_text})}\n\n'
reasoning_buffer = reasoning_text
elif hasattr(event, "content") and event.content:
yield f'data: {json.dumps({"type": "text_delta", "delta": event.content})}\n\n'
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
```

## Key Points
- Use `stream_events=True` to get granular events including reasoning
- Agno events use CamelCase: `RunContent`, not `run_content`
- Reasoning is in `event.reasoning_content` during the reasoning phase
- Regular content is in `event.content` during the response phase
## Frontend Implementation
import { useStreamChat, type ContentPart } from "@deltakit/react";
// Discriminated union of the custom SSE event payloads emitted by the backend.
type CustomEvent =
  | { type: "text_delta"; delta: string }
  | { type: "reasoning"; text: string };
function Chat() {
  // Generic params: ContentPart is the message part model, CustomEvent the
  // union of backend SSE payloads handled in onEvent below.
  const { messages, isLoading } = useStreamChat<ContentPart, CustomEvent>({
    api: "/api/chat-agno",
    onEvent: (event, { appendText, setMessages }) => {
      switch (event.type) {
        case "text_delta":
          // Final-response tokens: append to the current assistant text part.
          appendText(event.delta);
          break;
        case "reasoning":
          // Reasoning tokens: accumulate into the trailing "reasoning" part
          // of the last assistant message rather than creating one part per chunk.
          setMessages((prev) => {
            const last = prev[prev.length - 1];
            // Only attach reasoning to an in-progress assistant message.
            if (!last || last.role !== "assistant") return prev;
            const parts = [...last.parts];
            const lastPart = parts[parts.length - 1];
            if (lastPart?.type === "reasoning") {
              // Extend the existing reasoning part immutably.
              parts[parts.length - 1] = {
                ...lastPart,
                text: lastPart.text + event.text,
              };
            } else {
              // First reasoning chunk: start a new part.
              parts.push({ type: "reasoning", text: event.text });
            }
            return [...prev.slice(0, -1), { ...last, parts }];
          });
          break;
      }
    },
  });
  // ... render UI
}
```

## Rendering Reasoning
// Renders one chain-of-thought block; shows a pulsing dot while tokens stream in.
function ReasoningBlock({ text, isStreaming }: { text: string; isStreaming: boolean }) {
  // Indicator is only mounted while reasoning is still arriving.
  const streamingDot = isStreaming ? (
    <span className="w-1.5 h-1.5 bg-neutral-500 rounded-full animate-pulse" />
  ) : null;

  return (
    <div className="rounded-lg border border-neutral-700 bg-neutral-800/50 p-4 my-2">
      <div className="flex items-center gap-2 mb-2">
        <span className="text-xs font-medium text-neutral-500 uppercase tracking-wider">
          Thinking
        </span>
        {streamingDot}
      </div>
      <p className="text-sm text-neutral-400 italic whitespace-pre-wrap">{text}</p>
    </div>
  );
}
```

## Best Practices
- **Always accumulate**: Treat reasoning like `text_delta` — accumulate chunks, don't create new parts
- **Deduplicate on the backend**: Prevent overlapping chunks using a buffer
- Show streaming state: Add pulsing indicator while reasoning is active
## Related

- Custom Event Handling — general pattern for custom SSE events
- `fromAgno` Agents — loading persisted reasoning from history