parseSSEStream
Async generator that parses Server-Sent Events from a ReadableStream.
parseSSEStream converts a raw ReadableStream<Uint8Array> (from fetch) into an async generator of parsed JSON events.
import { parseSSEStream } from "@deltakit/core";Signature
async function* parseSSEStream(
stream: ReadableStream<Uint8Array>,
signal?: AbortSignal,
): AsyncGenerator<SSEEvent>| Parameter | Type | Description |
|---|---|---|
stream | ReadableStream<Uint8Array> | The response.body from a fetch() call |
signal | AbortSignal? | Optional signal to cancel stream parsing |
Usage
const response = await fetch("/api/chat", {
method: "POST",
body: JSON.stringify({ message: "Hello" }),
});
if (response.body) {
for await (const event of parseSSEStream(response.body)) {
switch (event.type) {
case "text_delta":
process.stdout.write(event.delta);
break;
case "tool_call":
console.log("Tool call:", event.tool_name);
break;
}
}
}Expected Stream Format
The parser expects SSE data lines containing JSON:
data: {"type":"text_delta","delta":"Hello"}
data: {"type":"text_delta","delta":" world"}
data: [DONE]Each data: line must contain valid JSON (except [DONE]). Lines are separated by double newlines (\n\n).
Behavior
- Buffering. Handles partial chunks split across multiple
read()calls. - Malformed JSON. Skips invalid JSON lines silently (does not throw).
- Non-data lines. Ignores
event:,id:,retry:, and comment lines. [DONE]. Recognized as a termination signal. The generator returns.- Abort. If
signalis aborted, the generator stops on the next iteration.
Cancellation
const controller = new AbortController();
const response = await fetch("/api/chat", {
method: "POST",
body: JSON.stringify({ message: "Hello" }),
signal: controller.signal,
});
if (response.body) {
for await (const event of parseSSEStream(response.body, controller.signal)) {
console.log(event);
if (shouldStop) {
controller.abort();
break;
}
}
}Error Handling
try {
for await (const event of parseSSEStream(response.body)) {
// process event
}
} catch (err) {
if (err instanceof DOMException && err.name === "AbortError") {
// Stream was cancelled - expected
} else {
// Actual stream error
console.error("Stream error:", err);
}
}Notes
- If using React, the
useStreamChathook from@deltakit/reactcallsparseSSEStreaminternally, so you don't need to use it directly. parseSSEStreamis useful when building non-React integrations or when you need direct control over the stream.- The reader lock is always released in the
finallyblock, even on errors or cancellation.