WebSocket Chat
Recipe for streaming AI responses over WebSocket with resumable runs using Agno agents.
This recipe shows how to stream AI responses over a WebSocket connection using the websocket transport, with Agno agents on the backend. WebSocket is ideal when you want a persistent, bidirectional connection — the client can send messages and receive events over a single socket.
Problem
SSE is request-bound: each message starts a new HTTP request. You need to:
- Maintain a persistent connection for multiple message exchanges
- Support reconnecting to an in-progress run after navigation
- Replay buffered events when resuming a run
- Control the wire format (serialization/parsing)
Solution Overview
Use the websocket transport in useStreamChat. The flow is:
1. Client opens WebSocket to the server
2. Client sends a JSON frame with { message: "..." }
3. Server streams back JSON event frames over the same socket
4. On reconnect, client sends { runId: "...", lastEventId: N } to resume

Backend Implementation (FastAPI + Agno)
WebSocket Endpoint
import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
# Router that groups every endpoint of this recipe under one URL prefix.
router = APIRouter(prefix="/api/chat-agno-websocket")
@dataclass
class JobEvent:
    """One buffered event of a run, kept so it can be replayed on resume."""

    # Position of the event in its job's buffer; the streaming loop uses
    # ``event_id + 1`` as the index of the next event to send.
    event_id: int
    # JSON-serializable frame that is sent to the client unchanged.
    payload: dict[str, Any]
@dataclass
class SocketJob:
    """State of a single agent run, including its replayable event buffer."""

    # Identifier handed to the client as ``runId``.
    job_id: str
    # User message that started the run.
    message: str
    # Lifecycle label: "pending", "running", "completed", "cancelled" or "failed".
    status: str = "pending"
    # True once no further events will be produced.
    done: bool = False
    # Every event produced so far, in order, so reconnecting clients can replay.
    events: list[JobEvent] = field(default_factory=list)
    # Streaming loops wait on this condition for new events or completion.
    condition: asyncio.Condition = field(default_factory=asyncio.Condition)
    # Background task running the agent; None until the run is scheduled.
    task: asyncio.Task[None] | None = None
    # Creation timestamp from a monotonic clock. The active-run endpoint reads
    # this attribute to pick the most recent unfinished job; without it that
    # lookup raises AttributeError.
    created_at: float = field(default_factory=time.monotonic)
# In-memory registry of runs keyed by job id. Nothing in this recipe removes
# entries, so the registry lives (and grows) for the lifetime of the process.
jobs: dict[str, SocketJob] = {}
@router.websocket("/ws")
async def chat_websocket(websocket: WebSocket):
await websocket.accept()
try:
while True:
payload = await websocket.receive_json()
message = payload.get("message")
run_id = payload.get("runId")
last_event_id = payload.get("lastEventId", -1)
if run_id:
# Resume an existing run
job = jobs.get(run_id)
if not job:
await websocket.send_json(
{"type": "error", "message": "Run not found"}
)
continue
elif message:
# Start a new run
run_id = str(uuid.uuid4())
job = SocketJob(job_id=run_id, message=message)
jobs[run_id] = job
job.events.append(JobEvent(
event_id=0,
payload={"type": "run_started", "runId": run_id},
))
job.task = asyncio.create_task(run_agent(job))
else:
await websocket.send_json(
{"type": "error", "message": "Expected message or runId"}
)
continue
# Stream events to the socket
await stream_to_socket(websocket, job, last_event_id)
except WebSocketDisconnect:
returnStreaming Events to Socket
async def stream_to_socket(
websocket: WebSocket, job: SocketJob, last_event_id: int = -1
) -> None:
next_index = last_event_id + 1
while True:
async with job.condition:
while next_index >= len(job.events) and not job.done:
await job.condition.wait()
pending = job.events[next_index:]
is_done = job.done and next_index >= len(job.events)
for item in pending:
await websocket.send_json(item.payload)
next_index = item.event_id + 1
if is_done:
breakActive Run Endpoint
For reconnection on page load, expose the current active run so the client can pick up where it left off:
@router.get("/active-run")
async def get_active_run():
active = [j for j in jobs.values() if not j.done]
if not active:
return {"run_id": None}
job = max(active, key=lambda j: j.created_at)
return {
"run_id": job.job_id,
"message": job.message,
"events": [e.payload for e in job.events],
}Cancel a Job
@router.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: str):
job = jobs.get(job_id)
if job and job.task and not job.done:
job.task.cancel()
return {"status": "ok"}Run the Agent
The run_agent function handles asyncio.CancelledError so that calling stop() on the client gracefully terminates the server-side task:
async def run_agent(job: SocketJob) -> None:
agent = Agent(model=OpenRouter(id="moonshotai/kimi-k2.5"))
job.status = "running"
try:
async for event in agent.arun(
job.message, stream=True, stream_events=True,
):
if not hasattr(event, "event"):
continue
if event.event == "RunContent":
if hasattr(event, "content") and event.content:
await _publish(job, {
"type": "text_delta",
"delta": event.content,
})
elif event.event == "ToolCallStarted":
await _publish(job, {
"type": "tool_call",
"tool_name": event.tool.tool_name,
"argument": json.dumps(event.tool.arguments),
"call_id": getattr(event.tool, "call_id", None),
})
elif event.event == "ToolCallCompleted":
await _publish(job, {
"type": "tool_result",
"call_id": getattr(event.tool, "call_id", None),
"output": str(event.tool.output or event.tool.result),
})
job.status = "completed"
job.done = True
await _publish(job, {"type": "done"})
except asyncio.CancelledError:
job.status = "cancelled"
job.done = True
await _publish(job, {"type": "done"})
except Exception as exc:
job.status = "failed"
job.done = True
await _publish(job, {"type": "error", "message": str(exc)})Frontend Implementation
Basic Setup
import { useStreamChat, type ContentPart } from "@deltakit/react";
// Base HTTP URL of the chat router; the trailing slash lets paths be appended directly.
const API_URL = "http://localhost:8000/api/chat-agno-websocket/";
// WebSocket endpoint of the same router.
const WS_URL = "ws://localhost:8000/api/chat-agno-websocket/ws";
/** Minimal chat component: streams text deltas over the websocket transport. */
function Chat() {
  const chat = useStreamChat<ContentPart>({
    transport: "websocket",
    transportOptions: {
      websocket: {
        url: WS_URL,
        // POST target used to cancel the server-side job.
        cancelUrl: (id) => `${API_URL}jobs/${id}/cancel`,
      },
    },
    onEvent: (event, helpers) => {
      // Only text deltas are rendered in this basic setup.
      if (event.type !== "text_delta") return;
      helpers.appendText(event.delta);
    },
  });
  const { messages, isLoading, sendMessage, stop } = chat;

  return (/* render messages */);
}

Resumable WebSocket Runs
To resume a run after page navigation, persist the run id and provide a resume payload builder:
import { useState, useEffect, useRef } from "react";
import { useStreamChat, fromAgnoAgents, type ContentPart, type Message } from "@deltakit/react";
// Base HTTP URL of the chat router; the trailing slash lets paths be appended directly.
const API_URL = "http://localhost:8000/api/chat-agno-websocket/";
// WebSocket endpoint of the same router.
const WS_URL = "ws://localhost:8000/api/chat-agno-websocket/ws";
// sessionStorage key under which the current run id is persisted.
const STORAGE_KEY = "chat-ws-run-id";
/**
 * Read the persisted run id from sessionStorage.
 *
 * Returns null when nothing is stored, when there is no `window` (for
 * example during server rendering), or when storage access throws.
 */
function readPersistedRunId(): string | null {
  if (typeof window === "undefined") return null;
  try {
    return window.sessionStorage.getItem(STORAGE_KEY);
  } catch {
    // Storage can be blocked by browser settings; treat it as "nothing persisted".
    return null;
  }
}
/**
 * Persist the run id in sessionStorage, or clear it when `runId` is null.
 *
 * Does nothing when there is no `window` (for example during server
 * rendering) or when storage access throws.
 */
function writePersistedRunId(runId: string | null) {
  if (typeof window === "undefined") return;
  try {
    if (runId) {
      window.sessionStorage.setItem(STORAGE_KEY, runId);
    } else {
      window.sessionStorage.removeItem(STORAGE_KEY);
    }
  } catch {
    // Persistence is best-effort; a blocked or full storage must not break the chat.
  }
}

Fetching Active Run and Building Initial State
When the page loads, fetch the active run from the server and replay its buffered events to build the initial message state:
/**
 * Fetch the current active run from the server.
 *
 * Resolves with the parsed JSON body of the active-run endpoint.
 * Rejects when the request fails or the server answers with a non-2xx status.
 */
async function fetchActiveRun() {
  const res = await fetch(`${API_URL}active-run`);
  if (!res.ok) {
    // Surface server errors instead of parsing an error body as a run.
    throw new Error(`Failed to fetch active run: ${res.status}`);
  }
  return res.json();
}
/**
 * Build the initial message list for a page load with an in-progress run.
 *
 * Returns `history` untouched when the active run has no id or no message.
 * Otherwise returns a copy of `history` followed by the run's user message
 * and an assistant message rebuilt from the buffered `text_delta` events.
 */
function buildBufferedMessages(
  history: Message<ContentPart>[],
  activeRun: { run_id?: string; message?: string; events?: Array<Record<string, unknown>> },
): Message<ContentPart>[] {
  const runId = activeRun.run_id;
  const prompt = activeRun.message;
  if (!runId || !prompt) return history;

  const userMsg: Message<ContentPart> = {
    id: `buffered-user-${runId}`,
    role: "user",
    parts: [{ type: "text", text: prompt }],
  };
  const assistantMsg: Message<ContentPart> = {
    id: `buffered-assistant-${runId}`,
    role: "assistant",
    parts: [],
  };

  // Replay buffered events into the assistant message
  const { parts } = assistantMsg;
  (activeRun.events ?? []).forEach((event) => {
    // Handle other event types (tool_call, tool_result, reasoning) similarly
    if (event.type !== "text_delta" || typeof event.delta !== "string") return;
    const tail = parts[parts.length - 1];
    if (tail?.type === "text") {
      // Consecutive deltas extend the same text part.
      tail.text += event.delta;
    } else {
      parts.push({ type: "text", text: event.delta });
    }
  });

  return [...history, userMsg, assistantMsg];
}

Complete Chat Component
// Union of every frame the chat component handles from the socket.
// NOTE(review): the backend sample sends call_id as null when the tool has no
// call id; tool_call types it as an optional string only — confirm whether
// null should be allowed there as it is for tool_result.
type CustomEvent =
  // First frame of a run; carries the id used for resuming.
  | { type: "run_started"; runId: string }
  // Incremental assistant text.
  | { type: "text_delta"; delta: string }
  // A tool invocation; argument is a JSON string.
  | { type: "tool_call"; tool_name: string; argument: string; call_id?: string }
  // Output of a tool invocation, matched to its call by call_id.
  | { type: "tool_result"; call_id: string | null; output: string }
  | { type: "reasoning"; text: string }
  // Terminal frames.
  | { type: "done" }
  | { type: "error"; message: string };
/**
 * Chat component with resumable WebSocket runs.
 *
 * `initialMessages` seeds the message list and `activeRun` is the active-run
 * payload fetched on page load. The run id to resume is taken from
 * sessionStorage first and from `activeRun` otherwise.
 */
function ChatWebSocket({
  initialMessages,
  activeRun,
}: {
  initialMessages: Message<ContentPart>[];
  activeRun: {
    run_id?: string | null;
    message?: string;
    events?: Array<Record<string, unknown>>;
  };
}) {
  const loaderRunId = activeRun.run_id ?? null;
  // Index of the last event already replayed into initialMessages.
  const loaderLastEventId = (activeRun.events?.length ?? 0) - 1;

  const [activeRunId, setActiveRunId] = useState<string | null>(
    () => readPersistedRunId() ?? loaderRunId,
  );

  const { messages, isLoading, sendMessage, stop } =
    useStreamChat<ContentPart, CustomEvent>({
      initialMessages,
      transport: "websocket",
      transportOptions: {
        websocket: {
          cancelUrl: (id) => `${API_URL}jobs/${id}/cancel`,
          url: WS_URL,
          runId: activeRunId,
          getResumeKey: () => readPersistedRunId(),
          onRunIdChange: (nextRunId) => {
            writePersistedRunId(nextRunId);
            setActiveRunId(nextRunId);
          },
          resolveRunId: (event) =>
            event.type === "run_started" ? event.runId : null,
          buildResumePayload: (id) => ({
            runId: id,
            // The loader's event count only describes the loader's run. For
            // any other run (e.g. one restored from sessionStorage) nothing
            // has been replayed yet, so ask for the full buffer.
            lastEventId: id === loaderRunId ? loaderLastEventId : -1,
          }),
        },
      },
      onEvent: (event, { appendText, appendPart, setMessages }) => {
        if (event.type === "run_started") return;
        if (event.type === "text_delta") {
          appendText(event.delta);
        } else if (event.type === "tool_call") {
          appendPart({
            type: "tool_call",
            tool_name: event.tool_name,
            argument: event.argument,
            callId: event.call_id,
          });
        } else if (event.type === "tool_result") {
          // Attach the result to the most recent matching tool call of the
          // last assistant message.
          setMessages((prev) => {
            const last = prev[prev.length - 1];
            if (!last || last.role !== "assistant") return prev;
            const updatedParts = [...last.parts];
            for (let i = updatedParts.length - 1; i >= 0; i--) {
              const p = updatedParts[i];
              if (p.type === "tool_call" && p.callId === event.call_id) {
                updatedParts[i] = { ...p, result: event.output };
                break;
              }
            }
            return [...prev.slice(0, -1), { ...last, parts: updatedParts }];
          });
        }
      },
    });

  return (/* render messages */);
}

Transport Options Reference
| Option | Type | Description |
|---|---|---|
| `url` | `string \| (runId: string \| null) => string` | WebSocket URL |
| `protocols` | `string \| string[]` | WebSocket subprotocols |
| `body` | `Record<string, unknown>` | Extra fields in outbound frames |
| `runId` | `string \| null` | Persisted run id to resume on mount |
| `getResumeKey` | `() => string \| null` | Read a persisted run id |
| `onRunIdChange` | `(id: string \| null) => void` | Called when the run id changes |
| `resolveRunId` | `(event: TEvent) => string \| null` | Extract run id from an inbound event |
| `buildResumePayload` | `(runId: string) => Record<string, unknown>` | Payload sent when reconnecting |
| `parseMessage` | `(data: unknown) => TEvent \| TEvent[] \| null` | Custom frame parser (defaults to `JSON.parse`) |
| `serializeMessage` | `(payload: Record<string, unknown>) => string` | Custom frame serializer (defaults to `JSON.stringify`) |
| `runIdKey` | `string` | Key name for the run id in outbound payloads (default: `"runId"`) |
| `cancelUrl` | `string \| (runId: string) => string` | HTTP endpoint to cancel a running job (POST). Called when `stop()` is invoked. |
When to Use WebSocket
- Bidirectional communication — Server can push events without client polling
- Multiple messages per connection — No need to establish a new HTTP request per message
- Resumable runs — Client can reconnect with a run id and replay from lastEventId
- Low latency — No HTTP overhead per event
For simpler request-response streaming, use the default "sse" transport. For background processing with separate start/events endpoints, use "background-sse".
Related
- useStreamChat — Hook API reference
- Background Task Streaming — Alternative transport for long-running tasks
- Handle Tool Results — Tool call/result matching patterns