Support non-text (object/JSON) streaming#18

Open
paschaldev wants to merge 1 commit into get-convex:main from paschaldev:feature/pe-chunk-streaming

Conversation


@paschaldev paschaldev commented Mar 17, 2026

Why this is needed

The component was built for text streaming (e.g. AI chat), where persistence is batched on sentence boundaries (., !, ?) to avoid writing every token to the database. That works well for natural language, but many streams are structured data—JSON or object payloads—not sentences. For example:

  • Mastra and other agent frameworks stream object responses (e.g. tool calls, structured steps, events). There are no sentence delimiters, so the previous logic would often buffer until the end or persist at wrong boundaries, leading to incomplete or incorrect persisted chunks.
  • Any API that streams JSON lines (NDJSON), SSE with JSON payloads, or incremental object updates has the same issue: each chunk is a complete unit and should be persisted as soon as it’s written, not when a delimiter appears in the string.

This PR adds first-class support for that use case so the same component works for both text and object/JSON streaming.

Benefits

  • Correct persistence for object/JSON streams: Callers can mark a chunk as “complete object/JSON” so it’s written to the DB immediately instead of waiting for sentence delimiters. Essential for Mastra and similar frameworks that stream structured responses.
  • Structured consumption: Queries can return chunks as a list (textList) via listItems: true, so clients can treat the stream as an array of items (e.g. parse each element as JSON) instead of one concatenated string.
  • One component for both modes: Text streaming keeps the existing sentence-boundary batching; object/JSON streaming opts in per chunk via the chunk appender’s second argument. No separate pipeline required.
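The structured-consumption mode above can be sketched on the client side. The `StreamBody` shape here is a stand-in for illustration; the component's actual type may carry additional fields:

```typescript
// A minimal client-side sketch of consuming textList. Each entry is one
// complete JSON chunk, so each parses independently.
type StreamBody = { text: string; textList?: string[]; status: string };

function parseItems<T>(body: StreamBody): T[] {
  return (body.textList ?? [])
    .filter((line) => line.trim().length > 0) // skip empty entries defensively
    .map((line) => JSON.parse(line) as T);
}

const body: StreamBody = {
  text: '{"step":1}{"step":2}',
  textList: ['{"step":1}', '{"step":2}'],
  status: "done",
};
console.log(parseItems<{ step: number }>(body).map((o) => o.step).join(",")); // "1,2"
```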

Changes

  • Chunk appender: chunkAppender(text, isObjectOrJson?) — when the second argument is true, the chunk is persisted immediately (no delimiter check). Use this for each complete JSON/object chunk (e.g. each Mastra event or tool-call payload).
  • List items: getStreamBody(ctx, streamId, listItems?) — when listItems is true, the returned body includes textList (array of chunks). The React hook and StreamBody type already expose textList; queries can now request it so clients can iterate or parse by item.
  • README: New “Non-text (object/JSON) streaming” section with backend and frontend code samples (streaming JSON lines, using textList, and parsing on the client). Also fixes the getChatBody example to use args.listItems.
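The two persistence modes in the changes above can be modeled with a small decision function. The delimiter set and names below are illustrative assumptions, not the component's actual internals:

```typescript
// Illustrative model of the two persistence modes: object/JSON chunks
// flush immediately, while plain text batches on sentence boundaries.
const SENTENCE_DELIMITERS = [".", "!", "?"]; // assumed delimiter set

function shouldPersistNow(pending: string, isObjectOrJson = false): boolean {
  // A complete JSON/object unit is written as soon as it arrives.
  if (isObjectOrJson) return true;
  // Plain text waits for a sentence boundary before writing.
  return SENTENCE_DELIMITERS.some((d) => pending.includes(d));
}

console.log(shouldPersistNow("Hello wor")); // false: no boundary yet
console.log(shouldPersistNow("Hello world.")); // true: sentence complete
console.log(shouldPersistNow('{"partial":', true)); // true: JSON mode always flushes
```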

Example: Mastra

With Mastra, agents often stream structured objects (e.g. tool calls, steps). You can persist and replay them with this component by:

  1. In your HTTP action, for each Mastra streamed object (or JSON line), call
    await chunkAppender(JSON.stringify(chunk), true)
    so each object is stored as soon as it’s emitted.
  2. Expose a query that calls getStreamBody(ctx, streamId, true) (or pass listItems: true in args) and returns the result so the client gets textList.
  3. On the client, use useStream with that query and map textList to parsed objects (e.g. textList.map(line => JSON.parse(line))) for rendering or replay.
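Step 1 above can be simulated with a stand-in appender; the real `chunkAppender` comes from the component's HTTP streaming helper, and the Mastra chunks here are illustrative:

```typescript
// Simulated persistence loop: each structured chunk is stringified and
// handed to the appender with the second argument set to true.
type ChunkAppender = (text: string, isObjectOrJson?: boolean) => Promise<void>;

async function streamObjects(
  chunks: unknown[],
  chunkAppender: ChunkAppender,
): Promise<void> {
  for (const chunk of chunks) {
    // true marks the chunk as a complete JSON unit: persist immediately.
    await chunkAppender(JSON.stringify(chunk), true);
  }
}

// Stand-in appender that records what would be written to the database.
const persisted: string[] = [];
const mockAppender: ChunkAppender = async (text) => {
  persisted.push(text);
};

const demo = streamObjects(
  [{ type: "tool-call", name: "search" }, { type: "step", n: 1 }],
  mockAppender,
);
demo.then(() => console.log(persisted.length)); // 2 complete chunks persisted
```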

This PR makes that flow supported and documented, so Mastra (and other object-streaming frameworks) can use persistent-text-streaming without relying on sentence-boundary heuristics.


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Summary by CodeRabbit

  • New Features

    • Added support for streaming and retrieving structured data (JSON objects) alongside text content.
    • Introduced optional flag to return streamed content as an organized list of items for easier programmatic access.
  • Documentation

    • Expanded documentation with code examples demonstrating JSON streaming workflows and structured data handling patterns.


coderabbitai bot commented Mar 17, 2026

📝 Walkthrough

The PR extends the streaming system to support structured data (JSON objects) alongside text by introducing an optional listItems boolean flag throughout the query and handler layers, adding a textList field to accumulate streamed chunks as a list, and updating documentation with concrete JSON streaming examples.

Changes

Cohort / File(s) Summary
Documentation
README.md
Added comprehensive guide for JSON streaming workflows, including backend streaming via JSON lines, persistence strategy with listItems flag, and frontend parsing examples for structured data.
Backend Query Handlers
example/convex/messages.ts, example/convex/streaming.ts, src/component/lib.ts
Threaded optional listItems boolean parameter through query args and handlers; getStreamText now populates textList array when flag is true and stream is not pending.
Client Streaming Logic
src/client/index.ts
Extended StreamBody with textList field, updated ChunkAppender to accept optional isObjectOrJson flag for conditional persistence logic, modified StreamWriter type to include streamId parameter, and refined delimiter-based flushing to skip empty final chunks.
React Integration
src/react/index.ts
Added optional listItems parameter to useStream options, introduced streamBodyAsList state to accumulate streamed chunks, and exposed textList in returned StreamBody object with updated memoization dependencies.

Sequence Diagram

sequenceDiagram
    participant Frontend as React Hook
    participant Client as Streaming Client
    participant Backend as Convex Query
    participant Stream as Stream Handler
    
    Frontend->>Frontend: Call useStream({listItems: true})
    Frontend->>Client: Call stream() with listItems flag
    Client->>Backend: Query getStreamBody(streamId, listItems=true)
    Backend->>Stream: Handler processes stream
    Stream->>Stream: Accumulate chunks into textList
    Backend-->>Client: Return {text, textList, status}
    Client->>Client: Parse textList as JSON objects
    Client->>Frontend: Emit chunk updates
    Frontend->>Frontend: Update streamBodyAsList state
    Frontend-->>Frontend: Return {text, textList}

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~15 minutes

Poem

🐰 A rabbit hops through streams so bright,
With listItems flagged—JSON takes flight!
Chunks accumulate in lists, not text alone,
Structured data's now made known!
Hop, hop—your data flows just right! 🚀

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Title check: ✅ Passed. The title "Support non-text (object/JSON) streaming" directly and specifically describes the main feature addition across all changed files.
  • Docstring Coverage: ✅ Passed. Coverage is 100.00%, above the required 80.00% threshold.
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.


@paschaldev changed the title from "return text as a chunk list" to "Support non-text (object/JSON) streaming" on Mar 17, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/react/index.ts (2)

69-80: ⚠️ Potential issue | 🔴 Critical

Fix streaming chunk boundary handling for reliable JSON parsing.

streamBodyAsList is populated directly from HTTP transport chunks (reader.read()), which don't align with JSON object boundaries. Line 175's streamBodyAsList.map(item => JSON.parse(item)) will fail when JSON objects are split across chunks, multiple objects appear in one chunk, or a trailing empty chunk is sent. This makes parsedList unreliable.

Implement an explicit wire format for item framing (e.g., newline-delimited JSON or length-prefixed messages), or buffer and parse based on logical boundaries rather than transport boundaries.

Also applies to lines 170-178.
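The buffering this comment recommends can be sketched as follows, assuming the producer emits newline-delimited JSON (NDJSON); accumulate transport chunks and only parse complete lines, holding any partial line for the next read:

```typescript
// NDJSON framing: transport chunk boundaries need not align with object
// boundaries, so keep a buffer and only parse lines that are complete.
function makeNdjsonParser<T>(): (chunk: string) => T[] {
  let buffer = "";
  return (chunk) => {
    buffer += chunk;
    const lines = buffer.split("\n");
    // The last element is either "" (chunk ended on a newline) or a
    // partial line; either way, hold it back until more data arrives.
    buffer = lines.pop() ?? "";
    return lines.filter((l) => l.trim() !== "").map((l) => JSON.parse(l) as T);
  };
}

const push = makeNdjsonParser<{ id: number }>();
// The second object arrives split across two transport chunks.
const items = [...push('{"id":1}\n{"id"'), ...push(':2}\n')];
console.log(items.map((o) => o.id).join(",")); // "1,2"
```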

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/react/index.ts` around lines 69 - 80, The current approach appends raw
HTTP chunks into streamBodyAsList and later JSON.parse()s each chunk (e.g., in
parsedList), which breaks when objects span chunks or multiple objects exist per
chunk; modify the streaming handling in startStreaming's chunk callback (and
related uses of streamBodyAsList and streamBody) to implement explicit framing:
either emit newline-delimited JSON (NDJSON) from the wire and buffer/split on
'\n' before parsing, or maintain a parseBuffer string inside the callback that
concatenates chunks, repeatedly extracts complete framed messages (e.g.,
delimited by '\n' or length-prefix), JSON.parse()s each complete message and
pushes the parsed object into parsedList (only push complete objects), and keep
any leftover partial chunk in parseBuffer for the next callback; update usages
of streamBodyAsList/parsedList to rely on these framed/parsed messages rather
than raw transport chunks.

28-33: ⚠️ Potential issue | 🟠 Major

Forward listItems to the persistence query.

The hook accepts opts.listItems but never passes it to getPersistentBody. Line 66 calls the query with only { streamId }, causing reloads, non-driving clients, and error fallbacks to ignore the listItems parameter. The backend and client wrapper are already set up to support this (see src/component/lib.ts and src/client/index.ts); the React hook just needs to propagate it.

The function signature must also be updated to reflect the optional parameter:

🧩 Suggested wiring
 export function useStream(
   getPersistentBody: FunctionReference<
     "query",
     "public",
-    { streamId: string },
+    { streamId: string; listItems?: boolean },
     StreamBody
   >,
@@
   const persistentBody = useQuery(
     getPersistentBody,
-    usePersistence && streamId ? { streamId } : "skip",
+    usePersistence && streamId
+      ? { streamId, listItems: opts?.listItems }
+      : "skip",
   );

Also applies to: 42-43, 64-67

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/react/index.ts` around lines 28 - 33, The hook currently calls
getPersistentBody with only { streamId } and never forwards opts.listItems;
update the getPersistentBody FunctionReference signature and every call site in
this file to accept and pass an optional listItems parameter (e.g., { streamId,
listItems: opts?.listItems }) so the persistence query receives the option;
specifically modify the signature around getPersistentBody and the calls at/near
lines referencing getPersistentBody (and any uses inside the hook that invoke
the query) to include the optional listItems, preserving existing behavior when
opts.listItems is undefined.
🧹 Nitpick comments (1)
example/convex/messages.ts (1)

35-39: Add a returns validator to getHistory.

This new internalQuery now has an explicit args contract, but its return shape is still unchecked. Adding returns keeps this internal API enforced as the history payload evolves.

🧩 Suggested validator
 export const getHistory = internalQuery({
   args: {
     listItems: v.optional(v.boolean()),
   },
+  returns: v.array(
+    v.object({
+      role: v.union(v.literal("user"), v.literal("assistant")),
+      content: v.string(),
+    }),
+  ),
   handler: async (ctx, { listItems}) => {

Based on learnings: ALWAYS include argument and return validators for all Convex functions, including query, internalQuery, mutation, internalMutation, action, and internalAction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@example/convex/messages.ts` around lines 35 - 39, getHistory's internalQuery
defines args but lacks a returns validator; update the internalQuery call for
getHistory to include a returns: validator that exactly matches the object/array
shape the handler returns (use Convex's v.* validators). Locate the getHistory
internalQuery and add a returns entry that mirrors the handler's return value
(for example an array validator if handler returns a list or an object validator
for a payload), ensuring the validator uses the same v namespace used for args.
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f3b20ec2-3c44-4d8e-9805-7b62884de5c1

📥 Commits

Reviewing files that changed from the base of the PR and between 374195a and 801ad8d.

📒 Files selected for processing (6)
  • README.md
  • example/convex/messages.ts
  • example/convex/streaming.ts
  • src/client/index.ts
  • src/component/lib.ts
  • src/react/index.ts

Comment on lines +167 to +171
// Do not add an empty chunk to the database.
if (pending) {
  // Success? Flush any last updates
  await this.addChunk(ctx, streamId, pending, true);
}

⚠️ Potential issue | 🔴 Critical

Always finalize the stream when pending is empty.

If the last chunkAppender call already flushed pending—which is the normal path for chunkAppender(..., true) and any chunk ending in punctuation—this block is skipped and the stream stays "streaming" forever. That leaves subscribers with the wrong status and breaks the new structured-streaming flow.

🧩 Proposed fix
-      // Do not add an empty chunk to the database.
-      if (pending) {
-        // Success? Flush any last updates
-        await this.addChunk(ctx, streamId, pending, true);
-      }
+      if (pending) {
+        // Success? Flush any last updates.
+        await this.addChunk(ctx, streamId, pending, true);
+      } else {
+        await this.setStreamStatus(ctx, streamId, "done");
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/client/index.ts` around lines 167 - 171, The stream finalization branch
only calls addChunk when pending is truthy, leaving streams stuck as "streaming"
if the last chunkAppender already flushed pending; remove the conditional and
always finalize the stream by invoking addChunk(ctx, streamId, pending, true)
(or call the dedicated finalize helper) so the stream status transitions to
final even when pending is empty—ensure addChunk (or the finalize helper)
tolerates an empty pending payload.

@ianmacartney (Member) commented Mar 18, 2026 via email
