diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index 8101b8e226..05c5a27b7c 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -65,7 +65,6 @@ export type { EntityTimelineRunItem, EntityTimelineSection, EntityTimelineState, - EntityTimelineReasoningDeltaItem, EntityTimelineTextChunk, EntityTimelineTextItem, EntityTimelineToolCallItem, diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 493718b9fd..72cf14a878 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -225,6 +225,11 @@ export interface EntityTimelineReasoningItem { run_id?: string order: TimelineOrder status: `streaming` | `completed` + // The concatenated `reasoning_delta` content lives under + // `body.content` rather than top-level — the wrapper is what + // forces TanStack DB to materialize the include before the row + // reaches `useLiveQuery`. See the timeline-query comment. + body?: { content: string } // Optional bolded title parsed at write time — only OpenAI Responses // emits these; null for Anthropic / DeepSeek / Moonshot. summary_title?: string @@ -233,13 +238,6 @@ export interface EntityTimelineReasoningItem { encrypted?: string } -export interface EntityTimelineReasoningDeltaItem { - key: string - reasoning_id: string - delta: string - order: TimelineOrder -} - export interface EntityTimelineStepItem { key: string run_id?: string @@ -264,7 +262,6 @@ export interface EntityTimelineRunRow { finish_reason?: string items: Collection reasoning: Collection - reasoningDeltas: Collection steps: Collection errors: Collection } @@ -1351,6 +1348,43 @@ function buildEntityTimelineQuery( }), })) + // Mirror `runItemsSource`'s shape for reasoning rows: the + // `concat(toArray(...))` include is *defined* on this top-level + // source, then the `reasoning:` consumer inside `runSource.select` + // below dereferences it into `content: r.reasoningContent`. The + // two-layer source/consumer split is load-bearing: `useLiveQuery` + // reads of a sub-collection that has an include co-defined in the + // same select return the row with `content: null` + a deferred + // `Symbol(includesRouting)` marker. Naming the include field in a + // downstream `.select` is what forces materialization — exactly + // how `items.text.content` pulls `item.textContent` out of + // `runItemsSource`. Alias is `reasoningChunk` to avoid colliding + // with `textChunk` used above. + const runReasoningSource = q + .from({ reasoning: db.collections.reasoning }) + .select(({ reasoning }) => ({ + key: reasoning.key, + run_id: reasoning.run_id, + order: coalesce(reasoning._timeline_order, `~`), + status: reasoning.status, + summary_title: reasoning.summary_title, + encrypted: reasoning.encrypted, + reasoningContent: concat( + toArray( + q + .from({ reasoningChunk: db.collections.reasoningDeltas }) + .where(({ reasoningChunk }) => + eq(reasoningChunk.reasoning_id, reasoning.key) + ) + .orderBy(({ reasoningChunk }) => + coalesce(reasoningChunk._timeline_order, `~`) + ) + .orderBy(({ reasoningChunk }) => reasoningChunk.key) + .select(({ reasoningChunk }) => reasoningChunk.delta) + ) + ), + })) + const runSource = q.from({ run: db.collections.runs }).select(({ run }) => ({ key: run.key, order: coalesce(run._timeline_order, `~`), @@ -1379,36 +1413,26 @@ function buildEntityTimelineQuery( toolCall: item.toolCall, })), reasoning: q - .from({ reasoning: db.collections.reasoning }) - .where(({ reasoning }) => eq(reasoning.run_id, run.key)) - .orderBy(({ reasoning }) => coalesce(reasoning._timeline_order, `~`)) - .orderBy(({ reasoning }) => reasoning.key) - .select(({ reasoning }) => ({ - key: reasoning.key, - run_id: reasoning.run_id, - order: coalesce(reasoning._timeline_order, `~`), - status: reasoning.status, - // `content` intentionally left undefined here — the previous - // `concat(toArray(...))` correlated sub-query went stale - // (returning `null` even though deltas were present) after the - // row's status flipped to `completed`. The UI assembles - // content client-side from `run.reasoningDeltas` below, which - // is a plain non-correlated query and stays reactive. - summary_title: reasoning.summary_title, - encrypted: reasoning.encrypted, - })), - reasoningDeltas: q - .from({ reasoningDelta: db.collections.reasoningDeltas }) - .where(({ reasoningDelta }) => eq(reasoningDelta.run_id, run.key)) - .orderBy(({ reasoningDelta }) => - coalesce(reasoningDelta._timeline_order, `~`) - ) - .orderBy(({ reasoningDelta }) => reasoningDelta.key) - .select(({ reasoningDelta }) => ({ - key: reasoningDelta.key, - reasoning_id: reasoningDelta.reasoning_id, - delta: reasoningDelta.delta, - order: coalesce(reasoningDelta._timeline_order, `~`), + .from({ r: runReasoningSource }) + .where(({ r }) => eq(r.run_id, run.key)) + .orderBy(({ r }) => r.order) + .orderBy(({ r }) => r.key) + .select(({ r }) => ({ + key: r.key, + run_id: r.run_id, + order: r.order, + status: r.status, + // Wrap the include reference inside a `caseWhen` object body + // — the same construct items uses to materialize + // `item.textContent` into `text.content`. Bare top-level + // references leave the include deferred until UI reads it + // through `useLiveQuery`, which never gets through. UI reads + // `entry.body?.content` instead of `entry.content`. + body: caseWhen(r.key, { + content: r.reasoningContent, + }), + summary_title: r.summary_title, + encrypted: r.encrypted, })), steps: q .from({ step: db.collections.steps }) diff --git a/packages/agents-runtime/test/entity-timeline.test.ts b/packages/agents-runtime/test/entity-timeline.test.ts index 30974ca733..2cbeebbe0b 100644 --- a/packages/agents-runtime/test/entity-timeline.test.ts +++ b/packages/agents-runtime/test/entity-timeline.test.ts @@ -2393,26 +2393,6 @@ describe(`entity includes query`, () => { expect(item.text?.content).toBe(`Hello world`) }) - it(`legacy reasoning sub-query: returns content even after status flip + post-status updates (currently skipped — sub-query goes stale; see #TODO)`, async () => { - // This test reproduces a staleness symptom we saw in the running - // app where the reasoning sub-collection's `content` field - // (built via `concat(toArray())`) returned - // `null` after the row's status flipped to `completed`. The - // current production code doesn't read that field anymore — the - // UI assembles content from `run.reasoningDeltas` instead — but - // this test is left in place as a placeholder for when we - // investigate / fix the underlying TanStack DB correlated - // sub-query behavior. - // - // Skipped by default until the projection is restored. - // - // To debug: remove the `.skip` and add a `content` field back to - // the reasoning sub-collection select in - // `entity-timeline.ts:buildEntityTimelineQuery`, then iterate - // with very small change-sets between assertions until you find - // the trigger. - }) - it(`reasoning content survives multiple run-row updates in sequence`, async () => { // Even closer to production: the run row gets updated MULTIPLE // times (each delta + status flip), which may invalidate the @@ -2480,15 +2460,7 @@ describe(`entity includes query`, () => { expect(runRow).toBeTruthy() const reasoning = Array.from(runRow.run.reasoning.toArray) as Array expect(reasoning).toHaveLength(1) - const deltas = Array.from(runRow.run.reasoningDeltas.toArray) as Array<{ - reasoning_id: string - delta: string - }> - const content = deltas - .filter((d) => d.reasoning_id === `reasoning-0`) - .map((d) => d.delta) - .join(``) - expect(content).toBe(`AB`) + expect(reasoning[0].body?.content).toBe(`AB`) }) it(`reasoning content populates even when text deltas are also present`, async () => { @@ -2559,14 +2531,9 @@ describe(`entity includes query`, () => { expect(runRow).toBeTruthy() const reasoning = Array.from(runRow.run.reasoning.toArray) as Array expect(reasoning).toHaveLength(1) - const reasoningDeltas = Array.from( - runRow.run.reasoningDeltas.toArray - ) as Array<{ reasoning_id: string; delta: string }> - const reasoningContent = reasoningDeltas - .filter((d) => d.reasoning_id === `reasoning-0`) - .map((d) => d.delta) - .join(``) - expect(reasoningContent).toBe(`Thinking part 1. Thinking part 2.`) + expect(reasoning[0].body?.content).toBe( + `Thinking part 1. Thinking part 2.` + ) const items = Array.from(runRow.run.items.toArray) as Array expect(items).toHaveLength(1) expect(items[0].text?.content).toBe(`Answer part 1. Answer part 2.`) @@ -2624,14 +2591,9 @@ describe(`entity includes query`, () => { expect(reasoning).toHaveLength(1) expect(reasoning[0].key).toBe(`reasoning-0`) expect(reasoning[0].status).toBe(`completed`) - const reasoningDeltas = Array.from( - runRow.run.reasoningDeltas.toArray - ) as Array<{ reasoning_id: string; delta: string }> - const content = reasoningDeltas - .filter((d) => d.reasoning_id === `reasoning-0`) - .map((d) => d.delta) - .join(``) - expect(content).toBe(`First thinking step. Second thinking step.`) + expect(reasoning[0].body?.content).toBe( + `First thinking step. Second thinking step.` + ) }) }) }) diff --git a/packages/agents-server-ui/src/components/AgentResponse.tsx b/packages/agents-server-ui/src/components/AgentResponse.tsx index 7aa268b448..65481428fd 100644 --- a/packages/agents-server-ui/src/components/AgentResponse.tsx +++ b/packages/agents-server-ui/src/components/AgentResponse.tsx @@ -401,37 +401,24 @@ export const AgentResponseLive = memo(function AgentResponseLive({ (q) => (run.errors ? q.from({ error: run.errors }) : undefined), [run.errors] ) - // Subscribe to the run's reasoning rows + deltas. We assemble - // `content` client-side from the deltas rather than reading it - // off the projected `reasoning.content`, because the correlated - // sub-query that produced that field went stale (returning `null`) - // after the row's status flipped to `completed`. Client-side - // concat is reliable and effectively free at this scale. + // Subscribe to the run's reasoning rows so the section ticks as + // each `reasoning_delta` arrives. Empty array for runs without + // any reasoning content (most non-extended-thinking models). const { data: reasoningRows = [] } = useLiveQuery( (q) => (run.reasoning ? q.from({ reasoning: run.reasoning }) : undefined), [run.reasoning] ) - const { data: reasoningDeltaRows = [] } = useLiveQuery( - (q) => - run.reasoningDeltas ? q.from({ delta: run.reasoningDeltas }) : undefined, - [run.reasoningDeltas] - ) - const reasoningEntries = useMemo>(() => { - const contentByReasoningId = new Map() - for (const delta of reasoningDeltaRows as Array<{ - reasoning_id: string - delta: string - }>) { - contentByReasoningId.set( - delta.reasoning_id, - (contentByReasoningId.get(delta.reasoning_id) ?? ``) + delta.delta - ) - } - return ( + const reasoningEntries = useMemo>( + () => ( - reasoningRows as Array< - Omit & { order?: unknown } - > + reasoningRows as Array<{ + key: string + status: `streaming` | `completed` + body?: { content?: string } + summary_title?: string + encrypted?: string + order?: unknown + }> ) .slice() // The live query already orders by `_timeline_order` then key, @@ -445,10 +432,13 @@ export const AgentResponseLive = memo(function AgentResponseLive({ status: row.status, summary_title: row.summary_title, encrypted: row.encrypted, - content: contentByReasoningId.get(row.key) ?? ``, - })) - ) - }, [reasoningRows, reasoningDeltaRows]) + // The projection in `entity-timeline.ts` wraps content under + // `body` (inside a caseWhen) to force include materialization. + // See the comment there. + content: row.body?.content ?? ``, + })), + [reasoningRows] + ) const sortedItems = useMemo( () => [...items].sort(compareLiveRunItems), [items]