Support non-text (object/JSON) streaming #18
paschaldev wants to merge 1 commit into get-convex:main
Conversation
📝 Walkthrough

The PR extends the streaming system to support structured data (JSON objects) alongside text by introducing an optional `listItems` flag.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Frontend as React Hook
    participant Client as Streaming Client
    participant Backend as Convex Query
    participant Stream as Stream Handler
    Frontend->>Frontend: Call useStream({listItems: true})
    Frontend->>Client: Call stream() with listItems flag
    Client->>Backend: Query getStreamBody(streamId, listItems=true)
    Backend->>Stream: Handler processes stream
    Stream->>Stream: Accumulate chunks into textList
    Backend-->>Client: Return {text, textList, status}
    Client->>Client: Parse textList as JSON objects
    Client->>Frontend: Emit chunk updates
    Frontend->>Frontend: Update streamBodyAsList state
    Frontend-->>Frontend: Return {text, textList}
```
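The final client-side step in the diagram ("Parse textList as JSON objects") can be sketched as a small helper. The function name `parseStreamList` is illustrative, not part of the component's API; it assumes each list entry is one serialized JSON value and skips entries that are still incomplete mid-stream:

```typescript
// Illustrative helper (not part of the component's API): turn the
// textList returned with listItems: true into parsed objects.
// Chunks that are not yet complete JSON are skipped rather than thrown.
function parseStreamList(textList: string[]): unknown[] {
  const parsed: unknown[] = [];
  for (const item of textList) {
    try {
      parsed.push(JSON.parse(item));
    } catch {
      // Partial chunk still streaming in; ignore until it completes.
    }
  }
  return parsed;
}

// Example: two complete JSON chunks and one partial one.
const items = parseStreamList(['{"step":1}', '{"step":2}', '{"ste']);
console.log(items.length); // 2
```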
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~15 minutes
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/react/index.ts (2)
69-80: ⚠️ Potential issue | 🔴 Critical

Fix streaming chunk boundary handling for reliable JSON parsing.

`streamBodyAsList` is populated directly from HTTP transport chunks (`reader.read()`), which don't align with JSON object boundaries. Line 175's `streamBodyAsList.map(item => JSON.parse(item))` will fail when JSON objects are split across chunks, multiple objects appear in one chunk, or a trailing empty chunk is sent. This makes `parsedList` unreliable. Implement an explicit wire format for item framing (e.g., newline-delimited JSON or length-prefixed messages), or buffer and parse based on logical boundaries rather than transport boundaries.

Also applies to lines 170-178.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/react/index.ts` around lines 69 - 80, The current approach appends raw HTTP chunks into streamBodyAsList and later JSON.parse()s each chunk (e.g., in parsedList), which breaks when objects span chunks or multiple objects exist per chunk; modify the streaming handling in startStreaming's chunk callback (and related uses of streamBodyAsList and streamBody) to implement explicit framing: either emit newline-delimited JSON (NDJSON) from the wire and buffer/split on '\n' before parsing, or maintain a parseBuffer string inside the callback that concatenates chunks, repeatedly extracts complete framed messages (e.g., delimited by '\n' or length-prefix), JSON.parse()s each complete message and pushes the parsed object into parsedList (only push complete objects), and keep any leftover partial chunk in parseBuffer for the next callback; update usages of streamBodyAsList/parsedList to rely on these framed/parsed messages rather than raw transport chunks.
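The buffered-framing approach this comment describes can be sketched as follows. This is a minimal NDJSON illustration under the comment's assumptions, not the component's actual code; the class name `NdjsonBuffer` is invented for the example:

```typescript
// Minimal NDJSON framing sketch: concatenate transport chunks and only
// JSON.parse complete, newline-terminated messages; keep any trailing
// partial message buffered for the next chunk. Illustrative only.
class NdjsonBuffer {
  private buffer = "";

  // Feed one raw transport chunk; returns every object it completed.
  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const parts = this.buffer.split("\n");
    this.buffer = parts.pop() ?? ""; // last piece may be incomplete
    return parts.filter((p) => p.length > 0).map((p) => JSON.parse(p));
  }
}

const buf = new NdjsonBuffer();
console.log(buf.push('{"a":1}\n{"b"')); // first object complete, second buffered
console.log(buf.push(':2}\n'));         // second object completed by this chunk
```

The same pattern works for length-prefixed framing; only the "extract complete messages" step changes.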
28-33: ⚠️ Potential issue | 🟠 Major

Forward `listItems` to the persistence query.

The hook accepts `opts.listItems` but never passes it to `getPersistentBody`. Line 66 calls the query with only `{ streamId }`, causing reloads, non-driving clients, and error fallbacks to ignore the `listItems` parameter. The backend and client wrapper are already set up to support this (see `src/component/lib.ts` and `src/client/index.ts`); the React hook just needs to propagate it.

The function signature must also be updated to reflect the optional parameter:
🧩 Suggested wiring
```diff
 export function useStream(
   getPersistentBody: FunctionReference<
     "query",
     "public",
-    { streamId: string },
+    { streamId: string; listItems?: boolean },
     StreamBody
   >,
@@
   const persistentBody = useQuery(
     getPersistentBody,
-    usePersistence && streamId ? { streamId } : "skip",
+    usePersistence && streamId
+      ? { streamId, listItems: opts?.listItems }
+      : "skip",
   );
```

Also applies to: 42-43, 64-67
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/react/index.ts` around lines 28 - 33, The hook currently calls getPersistentBody with only { streamId } and never forwards opts.listItems; update the getPersistentBody FunctionReference signature and every call site in this file to accept and pass an optional listItems parameter (e.g., { streamId, listItems: opts?.listItems }) so the persistence query receives the option; specifically modify the signature around getPersistentBody and the calls at/near lines referencing getPersistentBody (and any uses inside the hook that invoke the query) to include the optional listItems, preserving existing behavior when opts.listItems is undefined.
🧹 Nitpick comments (1)
example/convex/messages.ts (1)
35-39: Add a `returns` validator to `getHistory`.

This new `internalQuery` now has an explicit args contract, but its return shape is still unchecked. Adding `returns` keeps this internal API enforced as the history payload evolves.

🧩 Suggested validator

```diff
 export const getHistory = internalQuery({
   args: {
     listItems: v.optional(v.boolean()),
   },
+  returns: v.array(
+    v.object({
+      role: v.union(v.literal("user"), v.literal("assistant")),
+      content: v.string(),
+    }),
+  ),
   handler: async (ctx, { listItems }) => {
```

Based on learnings: ALWAYS include argument and return validators for all Convex functions, including `query`, `internalQuery`, `mutation`, `internalMutation`, `action`, and `internalAction`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@example/convex/messages.ts` around lines 35 - 39, getHistory's internalQuery defines args but lacks a returns validator; update the internalQuery call for getHistory to include a returns: validator that exactly matches the object/array shape the handler returns (use Convex's v.* validators). Locate the getHistory internalQuery and add a returns entry that mirrors the handler's return value (for example an array validator if handler returns a list or an object validator for a payload), ensuring the validator uses the same v namespace used for args.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/client/index.ts`:
- Around line 167-171: The stream finalization branch only calls addChunk when
pending is truthy, leaving streams stuck as "streaming" if the last
chunkAppender already flushed pending; remove the conditional and always
finalize the stream by invoking addChunk(ctx, streamId, pending, true) (or call
the dedicated finalize helper) so the stream status transitions to final even
when pending is empty—ensure addChunk (or the finalize helper) tolerates an
empty pending payload.
---
Outside diff comments:
In `@src/react/index.ts`:
- Around line 69-80: The current approach appends raw HTTP chunks into
streamBodyAsList and later JSON.parse()s each chunk (e.g., in parsedList), which
breaks when objects span chunks or multiple objects exist per chunk; modify the
streaming handling in startStreaming's chunk callback (and related uses of
streamBodyAsList and streamBody) to implement explicit framing: either emit
newline-delimited JSON (NDJSON) from the wire and buffer/split on '\n' before
parsing, or maintain a parseBuffer string inside the callback that concatenates
chunks, repeatedly extracts complete framed messages (e.g., delimited by '\n' or
length-prefix), JSON.parse()s each complete message and pushes the parsed object
into parsedList (only push complete objects), and keep any leftover partial
chunk in parseBuffer for the next callback; update usages of
streamBodyAsList/parsedList to rely on these framed/parsed messages rather than
raw transport chunks.
- Around line 28-33: The hook currently calls getPersistentBody with only {
streamId } and never forwards opts.listItems; update the getPersistentBody
FunctionReference signature and every call site in this file to accept and pass
an optional listItems parameter (e.g., { streamId, listItems: opts?.listItems })
so the persistence query receives the option; specifically modify the signature
around getPersistentBody and the calls at/near lines referencing
getPersistentBody (and any uses inside the hook that invoke the query) to
include the optional listItems, preserving existing behavior when opts.listItems
is undefined.
---
Nitpick comments:
In `@example/convex/messages.ts`:
- Around line 35-39: getHistory's internalQuery defines args but lacks a returns
validator; update the internalQuery call for getHistory to include a returns:
validator that exactly matches the object/array shape the handler returns (use
Convex's v.* validators). Locate the getHistory internalQuery and add a returns
entry that mirrors the handler's return value (for example an array validator if
handler returns a list or an object validator for a payload), ensuring the
validator uses the same v namespace used for args.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f3b20ec2-3c44-4d8e-9805-7b62884de5c1
📒 Files selected for processing (6)
- README.md
- example/convex/messages.ts
- example/convex/streaming.ts
- src/client/index.ts
- src/component/lib.ts
- src/react/index.ts
```typescript
// Do not add an empty chunk to the database.
if (pending) {
  // Success? Flush any last updates
  await this.addChunk(ctx, streamId, pending, true);
}
```
Always finalize the stream when pending is empty.
If the last chunkAppender call already flushed pending—which is the normal path for chunkAppender(..., true) and any chunk ending in punctuation—this block is skipped and the stream stays "streaming" forever. That leaves subscribers with the wrong status and breaks the new structured-streaming flow.
🧩 Proposed fix

```diff
-    // Do not add an empty chunk to the database.
-    if (pending) {
-      // Success? Flush any last updates
-      await this.addChunk(ctx, streamId, pending, true);
-    }
+    if (pending) {
+      // Success? Flush any last updates.
+      await this.addChunk(ctx, streamId, pending, true);
+    } else {
+      await this.setStreamStatus(ctx, streamId, "done");
+    }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
if (pending) {
  // Success? Flush any last updates.
  await this.addChunk(ctx, streamId, pending, true);
} else {
  await this.setStreamStatus(ctx, streamId, "done");
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/client/index.ts` around lines 167 - 171, The stream finalization branch
only calls addChunk when pending is truthy, leaving streams stuck as "streaming"
if the last chunkAppender already flushed pending; remove the conditional and
always finalize the stream by invoking addChunk(ctx, streamId, pending, true)
(or call the dedicated finalize helper) so the stream status transitions to
final even when pending is empty—ensure addChunk (or the finalize helper)
tolerates an empty pending payload.
Have you tried using the Agent component, in particular using its streaming
decoupled from the rest of its AI SDK functionality?
https://docs.convex.dev/agents/streaming#advanced-streaming-deltas-asynchronously-without-using-an-agent
An example doing similar things to persistent-text-streaming:
get-convex/agent#232
On Tue, Mar 17, 2026 at 6:57 AM Paschal Ezeugwu wrote:
Support non-text (object/JSON) streaming

Why this is needed
The component was built for *text streaming* (e.g. AI chat), where
persistence is batched on *sentence boundaries* (., !, ?) to avoid
writing every token to the database. That works well for natural language,
but many streams are *structured data*—JSON or object payloads—not
sentences. For example:
- *Mastra <https://mastra.ai>* and other agent frameworks stream *object
responses* (e.g. tool calls, structured steps, events). There are no
sentence delimiters, so the previous logic would often buffer until the end
or persist at wrong boundaries, leading to incomplete or incorrect
persisted chunks.
- Any API that streams *JSON lines* (NDJSON), *SSE with JSON payloads*,
or *incremental object updates* has the same issue: each chunk is a
complete unit and should be persisted as soon as it’s written, not when a
delimiter appears in the string.
This PR adds first-class support for that use case so the same component
works for both text and object/JSON streaming.
Benefits
- *Correct persistence for object/JSON streams*: Callers can mark a
chunk as “complete object/JSON” so it’s written to the DB immediately
instead of waiting for sentence delimiters. Essential for Mastra and
similar frameworks that stream structured responses.
- *Structured consumption*: Queries can return chunks as a *list* (
textList) via listItems: true, so clients can treat the stream as an
array of items (e.g. parse each element as JSON) instead of one
concatenated string.
- *One component for both modes*: Text streaming keeps the existing
sentence-boundary batching; object/JSON streaming opts in per chunk via the
chunk appender’s second argument. No separate pipeline required.
Changes
- *Chunk appender*: chunkAppender(text, isObjectOrJson?) — when the
second argument is true, the chunk is persisted immediately (no
delimiter check). Use this for each complete JSON/object chunk (e.g. each
Mastra event or tool-call payload).
- *List items*: getStreamBody(ctx, streamId, listItems?) — when
listItems is true, the returned body includes textList (array of
chunks). The React hook and StreamBody type already expose textList;
queries can now request it so clients can iterate or parse by item.
- *README*: New “Non-text (object/JSON) streaming” section with
backend and frontend code samples (streaming JSON lines, using textList,
and parsing on the client). Also fixes the getChatBody example to use
args.listItems.
Example: Mastra
With Mastra <https://mastra.ai>, agents often stream structured objects
(e.g. tool calls, steps). You can persist and replay them with this
component by:
1. In your HTTP action, for each Mastra streamed object (or JSON
line), call
await chunkAppender(JSON.stringify(chunk), true)
so each object is stored as soon as it’s emitted.
2. Expose a query that calls getStreamBody(ctx, streamId, true) (or
pass listItems: true in args) and returns the result so the client
gets textList.
3. On the client, use useStream with that query and map textList to
parsed objects (e.g. textList.map(line => JSON.parse(line))) for
rendering or replay.
This PR makes that flow supported and documented, so Mastra (and other
object-streaming frameworks) can use persistent-text-streaming without
relying on sentence-boundary heuristics.
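The two persistence modes described above can be sketched as a toy appender. The sentence-boundary regex, the `makeAppender` factory, and the flush callback are assumptions for illustration only, not the component's exact implementation:

```typescript
// Toy model of the chunk appender's two modes (illustrative only):
// - text mode: buffer until a sentence delimiter (., !, ?) appears
// - object/JSON mode (second arg true): flush every chunk immediately
function makeAppender(flush: (text: string) => void) {
  let pending = "";
  return (text: string, isObjectOrJson = false) => {
    pending += text;
    if (isObjectOrJson || /[.!?]/.test(pending)) {
      flush(pending);
      pending = "";
    }
  };
}

const writes: string[] = [];
const append = makeAppender((t) => writes.push(t));

append("Hello ");                  // buffered: no sentence delimiter yet
append("world.");                  // flushed: sentence boundary reached
append('{"tool":"search"}', true); // flushed immediately: JSON mode
console.log(writes); // ['Hello world.', '{"tool":"search"}']
```

This is why a JSON payload with no punctuation never gets stuck in the buffer: the second argument bypasses the delimiter check entirely.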
------------------------------
By submitting this pull request, I confirm that you can use, modify, copy,
and redistribute this contribution, under the terms of your choice.
------------------------------
You can view, comment on, or merge this pull request online at #18.

Commit Summary
- 801ad8d: return text as a chunk list

File Changes (6 files)
- M README.md (53)
- M example/convex/messages.ts (7)
- M example/convex/streaming.ts (7)
- M src/client/index.ts (27)
- M src/component/lib.ts (7)
- M src/react/index.ts (7)

Patch Links:
- https://github.com/get-convex/persistent-text-streaming/pull/18.patch
- https://github.com/get-convex/persistent-text-streaming/pull/18.diff