DeltaKit
Agno Agents

Background Task Streaming

Recipe for streaming AI responses from a background task using the background-sse transport with Agno agents.

This recipe shows how to stream AI responses from a background task with the background-sse transport and Agno agents on the backend. Unlike direct SSE, where the response is tied to the HTTP request, background tasks run independently: users can navigate away and come back later to resume the stream.

Problem

With direct SSE, the stream is tied to the HTTP connection. If the user navigates away or refreshes, the stream is lost. You need to:

  • Run AI generation in a background task
  • Stream events to the client via a separate SSE connection
  • Support resuming the stream after page navigation
  • Persist the run id so the client can reconnect

Solution Overview

Use the background-sse transport in useStreamChat. The flow is:

1. Client POSTs to startApi → receives a run id
2. Client connects to eventsApi with the run id → receives SSE events
3. If disconnected, client reconnects to eventsApi with the same run id
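The events endpoint speaks plain SSE: each job event becomes an id: line plus a data: line carrying a JSON payload, terminated by a blank line. A minimal, framework-free sketch of that framing and the matching client-side parse (helper names are illustrative, not DeltaKit APIs):

```python
import json
from typing import Any

def format_sse(event_id: int, payload: dict[str, Any]) -> str:
    """Frame one job event as an SSE message: id line, data line, blank line."""
    return f"id: {event_id}\ndata: {json.dumps(payload)}\n\n"

def parse_sse(stream: str) -> list[tuple[int, dict[str, Any]]]:
    """Parse a concatenation of SSE messages back into (id, payload) pairs."""
    events: list[tuple[int, dict[str, Any]]] = []
    for block in stream.split("\n\n"):
        if not block.strip():
            continue  # trailing empty segment after the final blank line
        event_id, payload = -1, {}
        for line in block.split("\n"):
            if line.startswith("id: "):
                event_id = int(line[len("id: "):])
            elif line.startswith("data: "):
                payload = json.loads(line[len("data: "):])
        events.append((event_id, payload))
    return events

wire = format_sse(0, {"type": "text_delta", "delta": "Hel"}) + format_sse(1, {"type": "done"})
assert parse_sse(wire) == [
    (0, {"type": "text_delta", "delta": "Hel"}),
    (1, {"type": "done"}),
]
```

The id line is what makes step 3 possible: the client remembers the last id it received and hands it back on reconnect, so the server can replay only what was missed.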

Backend Implementation (FastAPI + Agno)

The backend needs three endpoints: start a job, stream events, and check status.

Job Storage

import asyncio
import json
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

router = APIRouter(prefix="/api/chat-agno-background-task")

@dataclass
class JobEvent:
    event_id: int
    payload: dict[str, Any]

@dataclass
class BackgroundJob:
    job_id: str
    message: str
    status: str = "pending"
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    finished_at: datetime | None = None
    done: bool = False
    events: list[JobEvent] = field(default_factory=list)
    condition: asyncio.Condition = field(default_factory=asyncio.Condition)
    task: asyncio.Task[None] | None = None

jobs: dict[str, BackgroundJob] = {}
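Because each stream keeps its own cursor into a shared, append-only events list, several consumers (for example, multiple browser tabs) can read the same job concurrently; the Condition's notify_all wakes all waiting streams whenever the producer appends. A stripped-down sketch of that handshake (standalone demo, not DeltaKit code):

```python
import asyncio

async def demo() -> list[str]:
    events: list[str] = []
    done = False
    cond = asyncio.Condition()

    async def producer() -> None:
        nonlocal done
        for chunk in ("Hel", "lo"):
            async with cond:
                events.append(chunk)
                cond.notify_all()  # wake every waiting reader
            await asyncio.sleep(0)  # yield control so readers can run
        async with cond:
            done = True
            cond.notify_all()

    async def reader() -> str:
        # Each reader tracks its own position in the shared buffer.
        text, next_index = "", 0
        while True:
            async with cond:
                while next_index >= len(events) and not done:
                    await cond.wait()  # sleep until the producer notifies
                pending = events[next_index:]
                finished = done and next_index >= len(events)
            text += "".join(pending)
            next_index += len(pending)
            if finished:
                return text

    _, a, b = await asyncio.gather(producer(), reader(), reader())
    return [a, b]

assert asyncio.run(demo()) == ["Hello", "Hello"]
```

Both readers see the full stream regardless of when they attach, which is the same property the jobs dict and per-job Condition give the SSE endpoints below.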

Start a Job

class ChatRequest(BaseModel):
    message: str

class JobCreatedResponse(BaseModel):
    job_id: str
    status: str

@router.post("/jobs", response_model=JobCreatedResponse)
async def create_job(request: ChatRequest):
    job_id = str(uuid.uuid4())
    job = BackgroundJob(job_id=job_id, message=request.message)
    jobs[job_id] = job
    job.task = asyncio.create_task(run_agent(job))
    return JobCreatedResponse(job_id=job_id, status=job.status)

The transport looks for job_id or runId in the response by default.

Cancel a Job

@router.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: str):
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.task and not job.done:
        job.task.cancel()
    return {"status": "ok"}

Stream Events

@router.get("/jobs/{job_id}/events")
async def stream_job_events(job_id: str, last_event_id: int = -1):
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    async def event_generator():
        next_index = last_event_id + 1

        while True:
            async with job.condition:
                while next_index >= len(job.events) and not job.done:
                    await job.condition.wait()
                pending = job.events[next_index:]
                is_done = job.done and next_index >= len(job.events)

            for item in pending:
                yield f"id: {item.event_id}\n"
                yield f"data: {json.dumps(item.payload)}\n\n"
                next_index = item.event_id + 1

            if is_done:
                break

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
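Resumption reduces to a slice of the buffer: the client sends the last event id it saw (this endpoint accepts it as a query parameter, defaulting to -1 for a fresh connection) and the server replays everything after it. The core computation, isolated:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class JobEvent:
    event_id: int
    payload: dict[str, Any]

def pending_after(events: list[JobEvent], last_event_id: int) -> list[JobEvent]:
    """Events the client has not seen yet; -1 means 'from the beginning'.

    Valid because _publish assigns event_id = len(events), so an event's
    id always equals its index in the buffer.
    """
    return events[last_event_id + 1:]

buffer = [JobEvent(i, {"type": "text_delta", "delta": c}) for i, c in enumerate("abc")]
assert [e.event_id for e in pending_after(buffer, -1)] == [0, 1, 2]  # fresh connection
assert [e.event_id for e in pending_after(buffer, 1)] == [2]         # resume mid-stream
assert pending_after(buffer, 2) == []                                # fully caught up
```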

Run the Agent

async def _publish(job: BackgroundJob, payload: dict[str, Any]) -> None:
    async with job.condition:
        job.events.append(JobEvent(event_id=len(job.events), payload=payload))
        job.condition.notify_all()

from agno.agent import Agent
from agno.models.openrouter import OpenRouter

async def run_agent(job: BackgroundJob) -> None:
    agent = Agent(model=OpenRouter(id="moonshotai/kimi-k2.5"))
    job.status = "running"

    try:
        async for event in agent.arun(
            job.message, stream=True, stream_events=True,
        ):
            if not hasattr(event, "event"):
                continue

            if event.event == "RunContent":
                if hasattr(event, "content") and event.content:
                    await _publish(job, {
                        "type": "text_delta",
                        "delta": event.content,
                    })
            elif event.event == "ToolCallStarted":
                await _publish(job, {
                    "type": "tool_call",
                    "tool_name": event.tool.tool_name,
                    "argument": json.dumps(event.tool.arguments),
                    "call_id": getattr(event.tool, "call_id", None),
                })
            elif event.event == "ToolCallCompleted":
                await _publish(job, {
                    "type": "tool_result",
                    "call_id": getattr(event.tool, "call_id", None),
                    "output": str(getattr(event.tool, "output", None) or event.tool.result),
                })

        job.status = "completed"
        job.done = True
        await _publish(job, {"type": "done"})
    except asyncio.CancelledError:
        job.status = "cancelled"
        job.done = True
        await _publish(job, {"type": "done"})
    except Exception as exc:
        job.status = "failed"
        job.done = True
        await _publish(job, {"type": "error", "message": str(exc)})
    finally:
        job.finished_at = datetime.now(UTC)
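You can exercise the whole publish/stream loop without Agno or HTTP by swapping the agent for a fake coroutine: create a job, let a stand-in "agent" push deltas through the publish helper, and drain the buffer the same way event_generator does. A self-contained sketch (minimal re-definitions for the demo, not the module above):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class JobEvent:
    event_id: int
    payload: dict[str, Any]

@dataclass
class BackgroundJob:
    events: list[JobEvent] = field(default_factory=list)
    done: bool = False
    condition: asyncio.Condition = field(default_factory=asyncio.Condition)

async def _publish(job: BackgroundJob, payload: dict[str, Any]) -> None:
    async with job.condition:
        job.events.append(JobEvent(len(job.events), payload))
        job.condition.notify_all()

async def fake_agent(job: BackgroundJob) -> None:
    # Stand-in for run_agent: stream three deltas, then finish.
    for delta in ("Hel", "lo", "!"):
        await _publish(job, {"type": "text_delta", "delta": delta})
    job.done = True
    await _publish(job, {"type": "done"})

async def drain(job: BackgroundJob) -> str:
    # Same wait/replay loop as event_generator, minus the SSE framing.
    text, next_index = "", 0
    while True:
        async with job.condition:
            while next_index >= len(job.events) and not job.done:
                await job.condition.wait()
            pending = job.events[next_index:]
            finished = job.done and next_index >= len(job.events)
        for item in pending:
            if item.payload["type"] == "text_delta":
                text += item.payload["delta"]
            next_index = item.event_id + 1
        if finished:
            return text

async def main() -> str:
    job = BackgroundJob()
    task = asyncio.create_task(fake_agent(job))
    text = await drain(job)
    await task
    return text

assert asyncio.run(main()) == "Hello!"
```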

Frontend Implementation

Basic Setup

import { useStreamChat, type ContentPart, fromAgnoAgents } from "@deltakit/react";

const API_URL = "http://localhost:8000/api/chat-agno-background-task/";

function Chat() {
  const { messages, isLoading, sendMessage, stop } = useStreamChat<ContentPart>({
    transport: "background-sse",
    transportOptions: {
      backgroundSSE: {
        startApi: `${API_URL}jobs`,
        eventsApi: (id) => `${API_URL}jobs/${id}/events`,
        cancelApi: (id) => `${API_URL}jobs/${id}/cancel`,
      },
    },
    onEvent: (event, { appendText }) => {
      if (event.type === "text_delta") {
        appendText(event.delta);
      }
    },
  });

  return (/* render messages */);
}

Resumable Streams

To survive page navigation, persist the run id in session storage:

import { useState } from "react";

const STORAGE_KEY = "chat-background-run-id";

function readPersistedRunId(): string | null {
  return window.sessionStorage.getItem(STORAGE_KEY);
}

function writePersistedRunId(runId: string | null) {
  if (runId) {
    window.sessionStorage.setItem(STORAGE_KEY, runId);
  } else {
    window.sessionStorage.removeItem(STORAGE_KEY);
  }
}

function Chat() {
  const [activeRunId, setActiveRunId] = useState<string | null>(
    () => readPersistedRunId(),
  );

  const { messages, isLoading, sendMessage, stop, runId } = useStreamChat({
    transport: "background-sse",
    transportOptions: {
      backgroundSSE: {
        startApi: `${API_URL}jobs`,
        eventsApi: (id) => `${API_URL}jobs/${id}/events`,
        cancelApi: (id) => `${API_URL}jobs/${id}/cancel`,
        statusApi: (id) => `${API_URL}jobs/${id}`,
        runId: activeRunId,
        getResumeKey: () => readPersistedRunId(),
        onRunIdChange: (nextRunId) => {
          writePersistedRunId(nextRunId);
          setActiveRunId(nextRunId);
        },
      },
    },
    onEvent: (event, { appendText }) => {
      if (event.type === "text_delta") {
        appendText(event.delta);
      }
    },
  });

  return (/* render messages */);
}

When the component mounts and finds a persisted run id, it automatically reconnects to the SSE events endpoint and resumes streaming.

With Tool Calls and Reasoning

type CustomEvent =
  | { type: "text_delta"; delta: string }
  | { type: "tool_call"; tool_name: string; argument: string; call_id?: string }
  | { type: "tool_result"; call_id: string | null; output: string }
  | { type: "reasoning"; text: string }
  | { type: "done" }
  | { type: "error"; message: string };

const { messages, isLoading, sendMessage, stop } = useStreamChat<ContentPart, CustomEvent>({
  transport: "background-sse",
  transportOptions: {
    backgroundSSE: {
      startApi: `${API_URL}jobs`,
      eventsApi: (id) => `${API_URL}jobs/${id}/events`,
      cancelApi: (id) => `${API_URL}jobs/${id}/cancel`,
      runId: activeRunId,
      getResumeKey: () => readPersistedRunId(),
      onRunIdChange: (nextRunId) => {
        writePersistedRunId(nextRunId);
        setActiveRunId(nextRunId);
      },
    },
  },
  onEvent: (event, { appendText, appendPart, setMessages }) => {
    if (event.type === "text_delta") {
      appendText(event.delta);
    } else if (event.type === "tool_call") {
      appendPart({
        type: "tool_call",
        tool_name: event.tool_name,
        argument: event.argument,
        callId: event.call_id,
      });
    } else if (event.type === "tool_result") {
      setMessages((prev) => {
        const last = prev[prev.length - 1];
        if (!last || last.role !== "assistant") return prev;

        const updatedParts = [...last.parts];
        for (let i = updatedParts.length - 1; i >= 0; i--) {
          const p = updatedParts[i];
          if (p.type === "tool_call" && p.callId === event.call_id) {
            updatedParts[i] = { ...p, result: event.output };
            break;
          }
        }

        return [...prev.slice(0, -1), { ...last, parts: updatedParts }];
      });
    }
  },
});

Transport Options Reference

  • startApi (string): endpoint that starts a background job and returns a run id
  • eventsApi (string | (id: string) => string): SSE endpoint for a given run id
  • statusApi (string | (id: string) => string): optional status polling endpoint
  • runId (string | null): persisted run id to resume on mount
  • getResumeKey (() => string | null): read a persisted run id
  • onRunIdChange ((id: string | null) => void): called when the run id changes
  • resolveRunId ((response: unknown) => string): extract the run id from the start response (defaults to runId or job_id)
  • startHeaders (Record<string, string>): extra headers for the start request
  • startBody (Record<string, unknown>): extra body fields for the start request
  • eventHeaders (Record<string, string>): extra headers for the SSE connection
  • cancelApi (string | (id: string) => string): endpoint to cancel a running job (POST); called when stop() is invoked

When to Use Background SSE

  • Long-running tasks — AI processing that may take 30+ seconds
  • Resumable streams — Users should be able to navigate away and come back
  • Job status tracking — You need to query job status independently of the stream
  • Multi-tab support — Multiple tabs can connect to the same job's event stream

For simpler use cases where the stream is tied to the request, use the default "sse" transport.
