Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/agents-runtime/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ export type {
EntityTimelineRunItem,
EntityTimelineSection,
EntityTimelineState,
EntityTimelineReasoningDeltaItem,
EntityTimelineTextChunk,
EntityTimelineTextItem,
EntityTimelineToolCallItem,
Expand Down
100 changes: 62 additions & 38 deletions packages/agents-runtime/src/entity-timeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ export interface EntityTimelineReasoningItem {
run_id?: string
order: TimelineOrder
status: `streaming` | `completed`
// The concatenated `reasoning_delta` content lives under
// `body.content` rather than at the top level — the wrapper is
// what forces TanStack DB to materialize the include before the
// row reaches `useLiveQuery`. See the comments in
// `buildEntityTimelineQuery`.
body?: { content: string }
// Optional bolded title parsed at write time — only OpenAI Responses
// emits these; null for Anthropic / DeepSeek / Moonshot.
summary_title?: string
Expand All @@ -233,13 +238,6 @@ export interface EntityTimelineReasoningItem {
encrypted?: string
}

/**
 * One reasoning delta row as exposed through a run's `reasoningDeltas`
 * collection.
 */
export interface EntityTimelineReasoningDeltaItem {
// Key of the delta row itself.
key: string
// Key of the reasoning row this delta belongs to (matched against
// `reasoning.key` when deltas are grouped per reasoning row).
reasoning_id: string
// Text fragment; deltas sharing a `reasoning_id` are joined in order to
// form that reasoning row's content.
delta: string
// Timeline sort position of this delta.
order: TimelineOrder
}

export interface EntityTimelineStepItem {
key: string
run_id?: string
Expand All @@ -264,7 +262,6 @@ export interface EntityTimelineRunRow {
finish_reason?: string
items: Collection<EntityTimelineRunItem>
reasoning: Collection<EntityTimelineReasoningItem>
reasoningDeltas: Collection<EntityTimelineReasoningDeltaItem>
steps: Collection<EntityTimelineStepItem>
errors: Collection<EntityTimelineErrorItem>
}
Expand Down Expand Up @@ -1351,6 +1348,43 @@ function buildEntityTimelineQuery(
}),
}))

// Mirror `runItemsSource`'s shape for reasoning rows: the
// `concat(toArray(...))` include is *defined* on this top-level
// source, then the `reasoning:` consumer inside `runSource.select`
// below dereferences it as `content: r.reasoningContent` (nested
// under `body`). The two-layer source/consumer split is
// load-bearing: when a sub-collection has an include co-defined in
// the same select, reading it through `useLiveQuery` returns the
// row with `content: null` plus a deferred
// `Symbol(includesRouting)` marker. Naming the include field in a
// downstream `.select` is what forces materialization — exactly
// how `items.text.content` pulls `item.textContent` out of
// `runItemsSource`. The alias is `reasoningChunk` to avoid
// colliding with `textChunk` used above.
// Top-level reasoning source: one row per `reasoning` record, carrying
// the row's own fields plus `reasoningContent`, the joined text of its
// deltas.
const runReasoningSource = q
.from({ reasoning: db.collections.reasoning })
.select(({ reasoning }) => ({
key: reasoning.key,
run_id: reasoning.run_id,
// NOTE(review): `~` looks like the fallback sort value for rows that
// have no `_timeline_order` — confirm against `coalesce` semantics.
order: coalesce(reasoning._timeline_order, `~`),
status: reasoning.status,
summary_title: reasoning.summary_title,
encrypted: reasoning.encrypted,
// Correlated include: collect this row's deltas and join them into a
// single value.
reasoningContent: concat(
toArray(
q
.from({ reasoningChunk: db.collections.reasoningDeltas })
// Only deltas whose `reasoning_id` points at this reasoning row.
.where(({ reasoningChunk }) =>
eq(reasoningChunk.reasoning_id, reasoning.key)
)
// Order by timeline position first, then by key as a tie-breaker.
.orderBy(({ reasoningChunk }) =>
coalesce(reasoningChunk._timeline_order, `~`)
)
.orderBy(({ reasoningChunk }) => reasoningChunk.key)
// Project just the text fragment so the array holds plain strings.
.select(({ reasoningChunk }) => reasoningChunk.delta)
)
),
}))

const runSource = q.from({ run: db.collections.runs }).select(({ run }) => ({
key: run.key,
order: coalesce(run._timeline_order, `~`),
Expand Down Expand Up @@ -1379,36 +1413,26 @@ function buildEntityTimelineQuery(
toolCall: item.toolCall,
})),
reasoning: q
.from({ reasoning: db.collections.reasoning })
.where(({ reasoning }) => eq(reasoning.run_id, run.key))
.orderBy(({ reasoning }) => coalesce(reasoning._timeline_order, `~`))
.orderBy(({ reasoning }) => reasoning.key)
.select(({ reasoning }) => ({
key: reasoning.key,
run_id: reasoning.run_id,
order: coalesce(reasoning._timeline_order, `~`),
status: reasoning.status,
// `content` intentionally left undefined here — the previous
// `concat(toArray(...))` correlated sub-query went stale
// (returning `null` even though deltas were present) after the
// row's status flipped to `completed`. The UI assembles
// content client-side from `run.reasoningDeltas` below, which
// is a plain non-correlated query and stays reactive.
summary_title: reasoning.summary_title,
encrypted: reasoning.encrypted,
})),
reasoningDeltas: q
.from({ reasoningDelta: db.collections.reasoningDeltas })
.where(({ reasoningDelta }) => eq(reasoningDelta.run_id, run.key))
.orderBy(({ reasoningDelta }) =>
coalesce(reasoningDelta._timeline_order, `~`)
)
.orderBy(({ reasoningDelta }) => reasoningDelta.key)
.select(({ reasoningDelta }) => ({
key: reasoningDelta.key,
reasoning_id: reasoningDelta.reasoning_id,
delta: reasoningDelta.delta,
order: coalesce(reasoningDelta._timeline_order, `~`),
.from({ r: runReasoningSource })
.where(({ r }) => eq(r.run_id, run.key))
.orderBy(({ r }) => r.order)
.orderBy(({ r }) => r.key)
.select(({ r }) => ({
key: r.key,
run_id: r.run_id,
order: r.order,
status: r.status,
// Wrap the include reference inside a `caseWhen` object body
// — the same construct `items` uses to materialize
// `item.textContent` into `text.content`. A bare top-level
// reference leaves the include deferred, and a read through
// `useLiveQuery` never resolves it. The UI therefore reads
// `entry.body?.content` instead of `entry.content`.
body: caseWhen(r.key, {
content: r.reasoningContent,
}),
summary_title: r.summary_title,
encrypted: r.encrypted,
})),
steps: q
.from({ step: db.collections.steps })
Expand Down
52 changes: 7 additions & 45 deletions packages/agents-runtime/test/entity-timeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2393,26 +2393,6 @@ describe(`entity includes query`, () => {
expect(item.text?.content).toBe(`Hello world`)
})

// Placeholder for a known staleness symptom in the reasoning
// sub-query. Registered with `.skip` so the runner reports it as
// skipped, matching the title and the notes below, instead of
// recording a vacuous pass from an empty body.
it.skip(`legacy reasoning sub-query: returns content even after status flip + post-status updates (currently skipped — sub-query goes stale; see #TODO)`, async () => {
// This test reproduces a staleness symptom we saw in the running
// app where the reasoning sub-collection's `content` field
// (built via `concat(toArray(<correlated delta-join>))`) returned
// `null` after the row's status flipped to `completed`. The
// current production code doesn't read that field anymore — the
// UI assembles content from `run.reasoningDeltas` instead — but
// this test is left in place as a placeholder for when we
// investigate / fix the underlying TanStack DB correlated
// sub-query behavior.
//
// Skipped by default until the projection is restored.
//
// To debug: remove the `.skip` and add a `content` field back to
// the reasoning sub-collection select in
// `entity-timeline.ts:buildEntityTimelineQuery`, then iterate
// with very small change-sets between assertions until you find
// the trigger.
})

it(`reasoning content survives multiple run-row updates in sequence`, async () => {
// Even closer to production: the run row gets updated MULTIPLE
// times (each delta + status flip), which may invalidate the
Expand Down Expand Up @@ -2480,15 +2460,7 @@ describe(`entity includes query`, () => {
expect(runRow).toBeTruthy()
const reasoning = Array.from(runRow.run.reasoning.toArray) as Array<any>
expect(reasoning).toHaveLength(1)
const deltas = Array.from(runRow.run.reasoningDeltas.toArray) as Array<{
reasoning_id: string
delta: string
}>
const content = deltas
.filter((d) => d.reasoning_id === `reasoning-0`)
.map((d) => d.delta)
.join(``)
expect(content).toBe(`AB`)
expect(reasoning[0].body?.content).toBe(`AB`)
})

it(`reasoning content populates even when text deltas are also present`, async () => {
Expand Down Expand Up @@ -2559,14 +2531,9 @@ describe(`entity includes query`, () => {
expect(runRow).toBeTruthy()
const reasoning = Array.from(runRow.run.reasoning.toArray) as Array<any>
expect(reasoning).toHaveLength(1)
const reasoningDeltas = Array.from(
runRow.run.reasoningDeltas.toArray
) as Array<{ reasoning_id: string; delta: string }>
const reasoningContent = reasoningDeltas
.filter((d) => d.reasoning_id === `reasoning-0`)
.map((d) => d.delta)
.join(``)
expect(reasoningContent).toBe(`Thinking part 1. Thinking part 2.`)
expect(reasoning[0].body?.content).toBe(
`Thinking part 1. Thinking part 2.`
)
const items = Array.from(runRow.run.items.toArray) as Array<any>
expect(items).toHaveLength(1)
expect(items[0].text?.content).toBe(`Answer part 1. Answer part 2.`)
Expand Down Expand Up @@ -2624,14 +2591,9 @@ describe(`entity includes query`, () => {
expect(reasoning).toHaveLength(1)
expect(reasoning[0].key).toBe(`reasoning-0`)
expect(reasoning[0].status).toBe(`completed`)
const reasoningDeltas = Array.from(
runRow.run.reasoningDeltas.toArray
) as Array<{ reasoning_id: string; delta: string }>
const content = reasoningDeltas
.filter((d) => d.reasoning_id === `reasoning-0`)
.map((d) => d.delta)
.join(``)
expect(content).toBe(`First thinking step. Second thinking step.`)
expect(reasoning[0].body?.content).toBe(
`First thinking step. Second thinking step.`
)
})
})
})
50 changes: 20 additions & 30 deletions packages/agents-server-ui/src/components/AgentResponse.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -401,37 +401,24 @@ export const AgentResponseLive = memo(function AgentResponseLive({
(q) => (run.errors ? q.from({ error: run.errors }) : undefined),
[run.errors]
)
// Subscribe to the run's reasoning rows + deltas. We assemble
// `content` client-side from the deltas rather than reading it
// off the projected `reasoning.content`, because the correlated
// sub-query that produced that field went stale (returning `null`)
// after the row's status flipped to `completed`. Client-side
// concat is reliable and effectively free at this scale.
// Subscribe to the run's reasoning rows so the section ticks as
// each `reasoning_delta` arrives. Empty array for runs without
// any reasoning content (most non-extended-thinking models).
const { data: reasoningRows = [] } = useLiveQuery(
(q) => (run.reasoning ? q.from({ reasoning: run.reasoning }) : undefined),
[run.reasoning]
)
const { data: reasoningDeltaRows = [] } = useLiveQuery(
(q) =>
run.reasoningDeltas ? q.from({ delta: run.reasoningDeltas }) : undefined,
[run.reasoningDeltas]
)
const reasoningEntries = useMemo<Array<ReasoningEntry>>(() => {
const contentByReasoningId = new Map<string, string>()
for (const delta of reasoningDeltaRows as Array<{
reasoning_id: string
delta: string
}>) {
contentByReasoningId.set(
delta.reasoning_id,
(contentByReasoningId.get(delta.reasoning_id) ?? ``) + delta.delta
)
}
return (
const reasoningEntries = useMemo<Array<ReasoningEntry>>(
() =>
(
reasoningRows as Array<
Omit<ReasoningEntry, `content`> & { order?: unknown }
>
reasoningRows as Array<{
key: string
status: `streaming` | `completed`
body?: { content?: string }
summary_title?: string
encrypted?: string
order?: unknown
}>
)
.slice()
// The live query already orders by `_timeline_order` then key,
Expand All @@ -445,10 +432,13 @@ export const AgentResponseLive = memo(function AgentResponseLive({
status: row.status,
summary_title: row.summary_title,
encrypted: row.encrypted,
content: contentByReasoningId.get(row.key) ?? ``,
}))
)
}, [reasoningRows, reasoningDeltaRows])
// The projection in `entity-timeline.ts` wraps content under
// `body` (inside a caseWhen) to force include materialization.
// See the comment there.
content: row.body?.content ?? ``,
})),
[reasoningRows]
)
const sortedItems = useMemo(
() => [...items].sort(compareLiveRunItems),
[items]
Expand Down
Loading