diff --git a/packages/fetch/src/stream.test.ts b/packages/fetch/src/stream.test.ts index 084002ac54a..d7d996dea29 100644 --- a/packages/fetch/src/stream.test.ts +++ b/packages/fetch/src/stream.test.ts @@ -1,6 +1,6 @@ import { Readable } from "stream"; import { describe, expect, it, test } from "vitest"; -import { parseDataLine, streamSse } from "./stream.js"; +import { parseDataLine, streamJSON, streamSse } from "./stream.js"; function createMockResponse(sseLines: string[]): Response { // Create a Readable stream that emits the SSE lines @@ -21,6 +21,23 @@ function createMockResponse(sseLines: string[]): Response { } as unknown as Response; } +function createRawMockResponse(chunks: string[]): Response { + const stream = new Readable({ + read() { + for (const chunk of chunks) { + this.push(chunk); + } + this.push(null); + }, + }) as any; + + return { + status: 200, + body: stream, + text: async () => "", + } as unknown as Response; +} + describe("streamSse", () => { it("yields parsed SSE data objects that ends with `data:[DONE]`", async () => { const sseLines = [ @@ -63,6 +80,19 @@ describe("streamSse", () => { }); }); +describe("streamJSON", () => { + it("yields the final JSON object even without a trailing newline", async () => { + const response = createRawMockResponse(['{"foo":"bar"}\n', '{"baz":42}']); + + const results = []; + for await (const data of streamJSON(response)) { + results.push(data); + } + + expect(results).toEqual([{ foo: "bar" }, { baz: 42 }]); + }); +}); + describe("parseDataLine", () => { test("parseDataLine should parse valid JSON data with 'data: ' prefix", () => { const line = 'data: {"message":"hello","status":"ok"}'; diff --git a/packages/fetch/src/stream.ts b/packages/fetch/src/stream.ts index f73d61dbafc..6c12146c924 100644 --- a/packages/fetch/src/stream.ts +++ b/packages/fetch/src/stream.ts @@ -166,4 +166,13 @@ export async function* streamJSON(response: Response): AsyncGenerator { buffer = buffer.slice(position + 1); } } + + if (buffer.length > 0) { + try { + const data = JSON.parse(buffer); + yield data; + } catch (e) { + throw new Error(`Malformed JSON sent from server: ${buffer}`); + } + } }