WebSocket Chat

Recipe for streaming AI responses over WebSocket with resumable runs using Agno agents.

This recipe streams AI responses from an Agno agent backend to the client using the websocket transport. WebSocket suits cases where you want a persistent, bidirectional connection: the client sends messages and receives events over a single socket.

Problem

SSE is request-bound: each message starts a new HTTP request. This recipe is for cases where you need to:

  • Maintain a persistent connection for multiple message exchanges
  • Support reconnecting to an in-progress run after navigation
  • Replay buffered events when resuming a run
  • Control the wire format (serialization/parsing)

Solution Overview

Use the websocket transport in useStreamChat. The flow is:

1. Client opens WebSocket to the server
2. Client sends a JSON frame with { message: "..." }
3. Server streams back JSON event frames over the same socket
4. On reconnect, client sends { runId: "...", lastEventId: N } to resume
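
The two inbound frame shapes in the steps above can be sketched as a small classifier. This is only an illustration of the protocol, not part of DeltaKit or Agno; classify_frame is a hypothetical helper name, and the error text mirrors the one used by the endpoint in this recipe.

```python
from typing import Any


def classify_frame(payload: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Classify an inbound client frame as 'resume', 'start', or 'invalid'."""
    run_id = payload.get("runId")
    message = payload.get("message")

    if run_id:
        # Resume frames carry the run id and the last event the client has seen.
        # -1 means "nothing seen yet", so the server replays from the start.
        return "resume", {
            "runId": run_id,
            "lastEventId": payload.get("lastEventId", -1),
        }
    if message:
        # A frame with only a message starts a new run.
        return "start", {"message": message}
    return "invalid", {"type": "error", "message": "Expected message or runId"}


print(classify_frame({"message": "Hello"}))
print(classify_frame({"runId": "abc", "lastEventId": 4}))
print(classify_frame({}))
```

A runId takes precedence over a message, which matches the order of the checks in the endpoint shown in this recipe.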

Backend Implementation (FastAPI + Agno)

WebSocket Endpoint

import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter(prefix="/api/chat-agno-websocket")

@dataclass
class JobEvent:
    event_id: int
    payload: dict[str, Any]

@dataclass
class SocketJob:
    job_id: str
    message: str
    status: str = "pending"
    done: bool = False
    events: list[JobEvent] = field(default_factory=list)
    condition: asyncio.Condition = field(default_factory=asyncio.Condition)
    task: asyncio.Task[None] | None = None

jobs: dict[str, SocketJob] = {}

@router.websocket("/ws")
async def chat_websocket(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            payload = await websocket.receive_json()
            message = payload.get("message")
            run_id = payload.get("runId")
            last_event_id = payload.get("lastEventId", -1)

            if run_id:
                # Resume an existing run
                job = jobs.get(run_id)
                if not job:
                    await websocket.send_json(
                        {"type": "error", "message": "Run not found"}
                    )
                    continue
            elif message:
                # Start a new run
                run_id = str(uuid.uuid4())
                job = SocketJob(job_id=run_id, message=message)
                jobs[run_id] = job
                job.events.append(JobEvent(
                    event_id=0,
                    payload={"type": "run_started", "runId": run_id},
                ))
                job.task = asyncio.create_task(run_agent(job))
            else:
                await websocket.send_json(
                    {"type": "error", "message": "Expected message or runId"}
                )
                continue

            # Stream events to the socket
            await stream_to_socket(websocket, job, last_event_id)
    except WebSocketDisconnect:
        return

Streaming Events to Socket

async def stream_to_socket(
    websocket: WebSocket, job: SocketJob, last_event_id: int = -1
) -> None:
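    # Event ids double as list indices. Clamp at 0 so an out-of-range negative
    # lastEventId replays from the first event instead of slicing from the end.
    next_index = max(last_event_id + 1, 0)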

    while True:
        async with job.condition:
            while next_index >= len(job.events) and not job.done:
                await job.condition.wait()
            pending = job.events[next_index:]
            is_done = job.done and next_index >= len(job.events)

        for item in pending:
            await websocket.send_json(item.payload)
            next_index = item.event_id + 1

        if is_done:
            break
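
The replay arithmetic can be checked in isolation. The sketch below assumes, as this recipe does, that each event id equals the event's index in the job's buffer; events_to_replay is a hypothetical helper used only for illustration.

```python
def events_to_replay(event_ids: list[int], last_event_id: int = -1) -> list[int]:
    """Return the event ids a server would send for a given lastEventId."""
    # The first unseen event sits at last_event_id + 1. Clamping at 0 keeps a
    # malformed negative id from turning into a slice from the end of the list.
    next_index = max(last_event_id + 1, 0)
    return event_ids[next_index:]


buffered = [0, 1, 2, 3]
print(events_to_replay(buffered, 1))   # client has seen events 0 and 1
print(events_to_replay(buffered))      # fresh client, replay everything
print(events_to_replay(buffered, 3))   # client is fully caught up
```

A client that has processed n buffered events sends lastEventId = n - 1, so a client that has seen nothing sends -1 and receives the whole buffer.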

Active Run Endpoint

For reconnection on page load, expose the current active run so the client can pick up where it left off:

@router.get("/active-run")
async def get_active_run():
    active = [j for j in jobs.values() if not j.done]
    if not active:
        return {"run_id": None}

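    # SocketJob has no created_at field; dicts preserve insertion order,
    # so the last active entry is the most recently started run.
    job = active[-1]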
    return {
        "run_id": job.job_id,
        "message": job.message,
        "events": [e.payload for e in job.events],
    }

Cancel a Job

@router.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: str):
    job = jobs.get(job_id)
    if job and job.task and not job.done:
        job.task.cancel()
    return {"status": "ok"}
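
To see what task.cancel() does to a running coroutine, here is a minimal standalone sketch using only asyncio. The worker and main functions are hypothetical stand-ins for the agent task and the cancel endpoint; the long sleep stands in for a streaming agent run.

```python
import asyncio


async def worker(state: dict[str, str]) -> None:
    state["status"] = "running"
    try:
        await asyncio.sleep(3600)  # stands in for a long agent run
        state["status"] = "completed"
    except asyncio.CancelledError:
        # task.cancel() raises CancelledError inside the task at its
        # current await point, which gives it a chance to clean up.
        state["status"] = "cancelled"


async def main() -> dict[str, str]:
    state = {"status": "pending"}
    task = asyncio.create_task(worker(state))
    await asyncio.sleep(0)  # let the worker start and reach its first await
    task.cancel()           # what the cancel endpoint does with job.task
    await task              # the worker handled the cancellation, so this returns
    return state


print(asyncio.run(main()))  # {'status': 'cancelled'}
```

Because the worker catches the exception and returns normally, awaiting the task does not raise. This is the same pattern that lets a run record a "cancelled" status and emit a final event before it exits.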

Run the Agent

The run_agent function handles asyncio.CancelledError so that calling stop() on the client gracefully terminates the server-side task:

from agno.agent import Agent
from agno.models.openrouter import OpenRouter

async def run_agent(job: SocketJob) -> None:
    agent = Agent(model=OpenRouter(id="moonshotai/kimi-k2.5"))
    job.status = "running"

    try:
        async for event in agent.arun(
            job.message, stream=True, stream_events=True,
        ):
            if not hasattr(event, "event"):
                continue

            if event.event == "RunContent":
                if hasattr(event, "content") and event.content:
                    await _publish(job, {
                        "type": "text_delta",
                        "delta": event.content,
                    })
            elif event.event == "ToolCallStarted":
                await _publish(job, {
                    "type": "tool_call",
                    "tool_name": event.tool.tool_name,
                    "argument": json.dumps(event.tool.arguments),
                    "call_id": getattr(event.tool, "call_id", None),
                })
            elif event.event == "ToolCallCompleted":
                await _publish(job, {
                    "type": "tool_result",
                    "call_id": getattr(event.tool, "call_id", None),
                    "output": str(event.tool.output or event.tool.result),
                })

        job.status = "completed"
        job.done = True
        await _publish(job, {"type": "done"})
    except asyncio.CancelledError:
        job.status = "cancelled"
        job.done = True
        await _publish(job, {"type": "done"})
    except Exception as exc:
        job.status = "failed"
        job.done = True
        await _publish(job, {"type": "error", "message": str(exc)})
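
The _publish helper is called above but not shown in this recipe. A minimal sketch, assuming it only needs to append the event with the next sequential id and wake any sockets waiting on the job's condition, could look like the following. The dataclasses here are trimmed copies of the ones defined earlier so the example runs on its own, and demo is a hypothetical driver.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class JobEvent:
    event_id: int
    payload: dict[str, Any]


@dataclass
class SocketJob:
    # Trimmed to the two fields _publish touches.
    events: list[JobEvent] = field(default_factory=list)
    condition: asyncio.Condition = field(default_factory=asyncio.Condition)


async def _publish(job: SocketJob, payload: dict[str, Any]) -> None:
    """Append an event to the job buffer and wake waiting sockets (assumed behavior)."""
    async with job.condition:
        # Using the current buffer length keeps event_id equal to the list
        # index, which is what the resume logic relies on.
        job.events.append(JobEvent(event_id=len(job.events), payload=payload))
        job.condition.notify_all()


async def demo() -> list[int]:
    job = SocketJob()
    await _publish(job, {"type": "run_started", "runId": "demo"})
    await _publish(job, {"type": "text_delta", "delta": "Hi"})
    return [event.event_id for event in job.events]


print(asyncio.run(demo()))  # [0, 1]
```

Appending and notifying under the same condition lock means a socket that wakes up sees the new event as soon as it reacquires the lock.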

Frontend Implementation

Basic Setup

import { useStreamChat, type ContentPart } from "@deltakit/react";

const API_URL = "http://localhost:8000/api/chat-agno-websocket/";
const WS_URL = "ws://localhost:8000/api/chat-agno-websocket/ws";

function Chat() {
  const { messages, isLoading, sendMessage, stop } = useStreamChat<ContentPart>({
    transport: "websocket",
    transportOptions: {
      websocket: {
        url: WS_URL,
        cancelUrl: (id) => `${API_URL}jobs/${id}/cancel`,
      },
    },
    onEvent: (event, { appendText }) => {
      if (event.type === "text_delta") {
        appendText(event.delta);
      }
    },
  });

  return (/* render messages */);
}

Resumable WebSocket Runs

To resume a run after page navigation, persist the run id and provide a resume payload builder:

import { useState, useEffect, useRef } from "react";
import { useStreamChat, fromAgnoAgents, type ContentPart, type Message } from "@deltakit/react";

const API_URL = "http://localhost:8000/api/chat-agno-websocket/";
const WS_URL = "ws://localhost:8000/api/chat-agno-websocket/ws";
const STORAGE_KEY = "chat-ws-run-id";

function readPersistedRunId(): string | null {
  return window.sessionStorage.getItem(STORAGE_KEY);
}

function writePersistedRunId(runId: string | null) {
  if (runId) {
    window.sessionStorage.setItem(STORAGE_KEY, runId);
  } else {
    window.sessionStorage.removeItem(STORAGE_KEY);
  }
}

Fetching Active Run and Building Initial State

When the page loads, fetch the active run from the server and replay its buffered events to build the initial message state:

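async function fetchActiveRun() {
  const res = await fetch(`${API_URL}active-run`);
  // Surface HTTP failures instead of parsing an error body as a run
  if (!res.ok) throw new Error(`active-run request failed: ${res.status}`);
  return await res.json();
}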

function buildBufferedMessages(
  history: Message<ContentPart>[],
  activeRun: { run_id?: string; message?: string; events?: Array<Record<string, unknown>> },
): Message<ContentPart>[] {
  if (!activeRun.run_id || !activeRun.message) return history;

  const messages = [...history];
  const userMsg: Message<ContentPart> = {
    id: `buffered-user-${activeRun.run_id}`,
    role: "user",
    parts: [{ type: "text", text: activeRun.message }],
  };
  const assistantMsg: Message<ContentPart> = {
    id: `buffered-assistant-${activeRun.run_id}`,
    role: "assistant",
    parts: [],
  };

  messages.push(userMsg, assistantMsg);

  // Replay buffered events into the assistant message
  for (const event of activeRun.events ?? []) {
    if (event.type === "text_delta" && typeof event.delta === "string") {
      const lastPart = assistantMsg.parts[assistantMsg.parts.length - 1];
      if (lastPart?.type === "text") {
        lastPart.text += event.delta;
      } else {
        assistantMsg.parts.push({ type: "text", text: event.delta });
      }
    }
    // Handle other event types (tool_call, tool_result, reasoning) similarly
  }

  return messages;
}

Complete Chat Component

type CustomEvent =
  | { type: "run_started"; runId: string }
  | { type: "text_delta"; delta: string }
  | { type: "tool_call"; tool_name: string; argument: string; call_id?: string }
  | { type: "tool_result"; call_id: string | null; output: string }
  | { type: "reasoning"; text: string }
  | { type: "done" }
  | { type: "error"; message: string };

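type ActiveRun = {
  run_id?: string | null;
  message?: string;
  events?: Array<Record<string, unknown>>;
};

function ChatWebSocket({
  initialMessages,
  activeRun,
}: {
  initialMessages: Message<ContentPart>[];
  activeRun: ActiveRun;
}) {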
  const loaderRunId = activeRun.run_id ?? null;
  const loaderLastEventId = (activeRun.events?.length ?? 0) - 1;

  const [activeRunId, setActiveRunId] = useState<string | null>(
    () => readPersistedRunId() ?? loaderRunId,
  );

  const { messages, isLoading, sendMessage, stop, runId } =
    useStreamChat<ContentPart, CustomEvent>({
      initialMessages,
      transport: "websocket",
      transportOptions: {
        websocket: {
          cancelUrl: (id) => `${API_URL}jobs/${id}/cancel`,
          url: WS_URL,
          runId: activeRunId,
          getResumeKey: () => readPersistedRunId(),
          onRunIdChange: (nextRunId) => {
            writePersistedRunId(nextRunId);
            setActiveRunId(nextRunId);
          },
          resolveRunId: (event) =>
            event.type === "run_started" ? event.runId : null,
          buildResumePayload: (id) => ({
            runId: id,
            lastEventId: loaderLastEventId,
          }),
        },
      },
      onEvent: (event, { appendText, appendPart, setMessages }) => {
        if (event.type === "run_started") return;

        if (event.type === "text_delta") {
          appendText(event.delta);
        } else if (event.type === "tool_call") {
          appendPart({
            type: "tool_call",
            tool_name: event.tool_name,
            argument: event.argument,
            callId: event.call_id,
          });
        } else if (event.type === "tool_result") {
          setMessages((prev) => {
            const last = prev[prev.length - 1];
            if (!last || last.role !== "assistant") return prev;

            const updatedParts = [...last.parts];
            for (let i = updatedParts.length - 1; i >= 0; i--) {
              const p = updatedParts[i];
              if (p.type === "tool_call" && p.callId === event.call_id) {
                updatedParts[i] = { ...p, result: event.output };
                break;
              }
            }

            return [...prev.slice(0, -1), { ...last, parts: updatedParts }];
          });
        }
      },
    });

  return (/* render messages */);
}

Transport Options Reference

Option              Type                                          Description
url                 string | (runId: string | null) => string     WebSocket URL
protocols           string | string[]                             WebSocket subprotocols
body                Record<string, unknown>                       Extra fields in outbound frames
runId               string | null                                 Persisted run id to resume on mount
getResumeKey        () => string | null                           Read a persisted run id
onRunIdChange       (id: string | null) => void                   Called when the run id changes
resolveRunId        (event: TEvent) => string | null              Extract run id from an inbound event
buildResumePayload  (runId: string) => Record<string, unknown>    Payload sent when reconnecting
parseMessage        (data: unknown) => TEvent | TEvent[] | null   Custom frame parser (defaults to JSON.parse)
serializeMessage    (payload: Record<string, unknown>) => string  Custom frame serializer (defaults to JSON.stringify)
runIdKey            string                                        Key name for the run id in outbound payloads (default: "runId")
cancelUrl           string | (runId: string) => string            HTTP endpoint to cancel a running job (POST). Called when stop() is invoked.

When to Use WebSocket

  • Bidirectional communication: the server can push events without client polling
  • Multiple messages per connection: no new HTTP request per message
  • Resumable runs: the client can reconnect with a run id and replay from lastEventId
  • Low latency: no per-event HTTP request overhead once the socket is open

For simpler request-response streaming, use the default "sse" transport. For background processing with separate start/events endpoints, use "background-sse".
