DeltaKit

parseSSEStream

Async generator that parses Server-Sent Events from a ReadableStream.

parseSSEStream converts a raw ReadableStream<Uint8Array> (from fetch) into an async generator of parsed JSON events.

import { parseSSEStream } from "@deltakit/core";

Signature

async function* parseSSEStream(
  stream: ReadableStream<Uint8Array>,
  signal?: AbortSignal,
): AsyncGenerator<SSEEvent>

Parameter   Type                         Description
stream      ReadableStream<Uint8Array>   The response.body from a fetch() call
signal      AbortSignal?                 Optional signal to cancel stream parsing

Usage

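const response = await fetch("/api/chat", {
  method: "POST",
  // Declare the JSON body so the server parses it correctly
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Hello" }),
});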

if (response.body) {
  for await (const event of parseSSEStream(response.body)) {
    switch (event.type) {
      case "text_delta":
        process.stdout.write(event.delta);
        break;
      case "tool_call":
        console.log("Tool call:", event.tool_name);
        break;
    }
  }
}

Expected Stream Format

The parser expects SSE data lines containing JSON:

data: {"type":"text_delta","delta":"Hello"}

data: {"type":"text_delta","delta":" world"}

data: [DONE]

Each data: line must contain valid JSON, except for the [DONE] sentinel. Events are separated by a blank line (\n\n).
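To make the format concrete, here is a minimal sketch of how a server might produce it. The names StreamEvent, formatSSEEvent, and DONE_FRAME are hypothetical and are not part of @deltakit/core; the sketch only assumes the wire format described above.

```typescript
// Hypothetical helper for illustration; not part of @deltakit/core.
type StreamEvent = { type: string; [key: string]: unknown };

// Serialize one event as a single SSE data line followed by a blank line.
function formatSSEEvent(event: StreamEvent): string {
  // JSON.stringify escapes newlines inside strings, so the payload stays on one line.
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Sentinel frame that marks the end of the stream.
const DONE_FRAME = "data: [DONE]\n\n";

// The three frames from the example above, concatenated as they would appear on the wire.
const frames = [
  formatSSEEvent({ type: "text_delta", delta: "Hello" }),
  formatSSEEvent({ type: "text_delta", delta: " world" }),
  DONE_FRAME,
].join("");
```

Keeping each payload on one data: line avoids multi-line data fields, which the format described here does not cover.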

Behavior

  • Buffering. Handles partial chunks split across multiple read() calls.
  • Malformed JSON. Skips invalid JSON lines silently (does not throw).
  • Non-data lines. Ignores event:, id:, retry:, and comment lines.
  • [DONE]. Recognized as a termination signal. The generator returns.
  • Abort. If signal is aborted, the generator stops on the next iteration.
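The buffering, skipping, and termination rules above can be sketched roughly as follows. This is an illustration only, not DeltaKit's actual implementation: SSELineBuffer is a hypothetical name, and the sketch assumes chunks have already been decoded to strings (for example with TextDecoder and { stream: true }).

```typescript
// Illustrative sketch only; the real parser in @deltakit/core may differ.
class SSELineBuffer {
  private buffer = "";
  private finished = false;

  // Feed one decoded chunk; returns the JSON events completed by this chunk.
  push(chunk: string): unknown[] {
    const events: unknown[] = [];
    if (this.finished) return events;

    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    // The last element is an incomplete line (or ""); keep it for the next chunk.
    this.buffer = lines.pop() ?? "";

    for (const line of lines) {
      // Ignore blank separators, comments, and event:/id:/retry: fields.
      if (!line.startsWith("data:")) continue;
      const payload = line.slice("data:".length).trim();
      if (payload === "[DONE]") {
        this.finished = true; // termination signal: stop parsing
        break;
      }
      try {
        events.push(JSON.parse(payload));
      } catch {
        // Malformed JSON is skipped rather than thrown.
      }
    }
    return events;
  }

  // True once a [DONE] line has been seen.
  get done(): boolean {
    return this.finished;
  }
}

const buf = new SSELineBuffer();
// An event split across two chunks is only emitted once its line is complete.
const first = buf.push('data: {"type":"text_del');
const second = buf.push('ta","delta":"Hi"}\n\n: ping\ndata: not json\n\n');
const third = buf.push("data: [DONE]\n\n");
```

Here first and third are empty, second contains the single text_delta event, and buf.done is true after the last call.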

Cancellation

const controller = new AbortController();

const response = await fetch("/api/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Hello" }),
  signal: controller.signal, // aborting also cancels the underlying request
});

if (response.body) {
  for await (const event of parseSSEStream(response.body, controller.signal)) {
    console.log(event);

    // shouldStop is a placeholder for your own stop condition
    if (shouldStop) {
      controller.abort();
      break;
    }
  }
}

Error Handling

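try {
  // response comes from a fetch() call as in Usage; a null body has nothing to parse
  if (!response.body) throw new Error("Response has no body");
  for await (const event of parseSSEStream(response.body)) {
    // process event
  }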
} catch (err) {
  if (err instanceof DOMException && err.name === "AbortError") {
    // Stream was cancelled - expected
  } else {
    // Actual stream error
    console.error("Stream error:", err);
  }
}

Notes

  • If you are using React, the useStreamChat hook from @deltakit/react calls parseSSEStream internally, so you don't need to call parseSSEStream yourself.
  • parseSSEStream is useful when building non-React integrations or when you need direct control over the stream.
  • The reader lock is always released in the finally block, even on errors or cancellation.
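For readers writing their own stream consumers, the release-in-finally pattern mentioned in the last note generally looks like the sketch below. It uses only the standard Streams API; readChunks is a hypothetical name and this is not DeltaKit's source code.

```typescript
// Hypothetical helper illustrating the pattern; not DeltaKit's source code.
async function* readChunks(
  stream: ReadableStream<Uint8Array>,
): AsyncGenerator<Uint8Array> {
  const reader = stream.getReader(); // locks the stream to this reader
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Runs on normal completion, on thrown errors, and when the consumer
    // leaves the for await loop early with break or return.
    reader.releaseLock();
  }
}
```

Because the lock is released in finally, stream.locked is false again after the loop exits, so the caller can cancel the stream or hand it to another reader.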
