From e68de72c881bf8f6559c34dddfbc05e35794f3f9 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 15:35:05 +0200 Subject: [PATCH 01/11] feat(agents-server-ui): stream model reasoning into the UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While the model is "thinking" (Anthropic extended thinking, DeepSeek-R1 reasoning_content, Moonshot K2, OpenAI Responses summaries) the agent response now shows the reasoning text faded above the answer, with the existing `Thinking` shimmer heading + elapsed-time ticker. Once the reasoning settles, it collapses to `▸ Thought for 12s` — click to expand. Multiple reasoning rows per run render independently in order (one per LLM step in tool-using turns). End-to-end plumbing: - Schema: `reasoning` row gains `run_id`, `encrypted` (Anthropic redacted blocks must round-trip back to the model), and `summary_title` (extracted at write time). New `reasoningDeltas` collection mirrors `textDeltas` for streamed content. - Bridge: `OutboundBridge` gains `onReasoningStart` / `onReasoningDelta` / `onReasoningEnd`, parallel to text. - Adapter: `pi-adapter.ts` routes `thinking_start` / `thinking_delta` / `thinking_end` from pi-ai. Parses a `**Title**\n\n` heading once at write time (OpenAI Responses; no-op for others). - Timeline: live `reasoning: Collection<EntityTimelineReasoningItem>` on `EntityTimelineRunRow`, content built via delta-join. - UI: new `<ReasoningSection>` renders above items in `AgentResponseLive`. Streamdown body, click-to-expand on settle, redacted-block placeholder for opaque Anthropic payloads. 
--- .changeset/reasoning-content.md | 42 ++++ packages/agents-runtime/src/entity-schema.ts | 41 ++++ .../agents-runtime/src/entity-timeline.ts | 44 ++++ .../agents-runtime/src/outbound-bridge.ts | 81 ++++++- packages/agents-runtime/src/pi-adapter.ts | 80 +++++++ .../test/outbound-bridge.test.ts | 2 +- .../agents-runtime/test/pi-adapter.test.ts | 12 +- .../src/components/AgentResponse.tsx | 28 +++ .../components/ReasoningSection.module.css | 88 ++++++++ .../src/components/ReasoningSection.tsx | 198 ++++++++++++++++++ 10 files changed, 608 insertions(+), 8 deletions(-) create mode 100644 .changeset/reasoning-content.md create mode 100644 packages/agents-server-ui/src/components/ReasoningSection.module.css create mode 100644 packages/agents-server-ui/src/components/ReasoningSection.tsx diff --git a/.changeset/reasoning-content.md b/.changeset/reasoning-content.md new file mode 100644 index 0000000000..a6978dbe54 --- /dev/null +++ b/.changeset/reasoning-content.md @@ -0,0 +1,42 @@ +--- +'@electric-ax/agents-server-ui': minor +'@electric-ax/agents-runtime': minor +'@electric-ax/agents-desktop': patch +--- + +Stream model reasoning / extended-thinking content into the UI. While +the model is "thinking" (Anthropic extended thinking, DeepSeek-R1 +reasoning, Moonshot K2, OpenAI Responses summaries) the agent response +now shows the live reasoning text faded above the answer, with the +existing `Thinking` shimmer heading and an elapsed-time ticker. Once +the reasoning settles it collapses to `▸ Thought for 12s` — click to +expand. Multiple reasoning rows per run are rendered independently in +order, so tool-using turns show each step's reasoning separately. + +Implementation: + +- **Schema** — `reasoning` row gains `run_id`, `encrypted` (Anthropic + redacted-thinking opaque payload, must round-trip back to the model + verbatim), and `summary_title` (extracted at write time for + providers that emit a bolded heading). 
New `reasoningDeltas` + collection mirrors `textDeltas` for streamed content. +- **Bridge** — `OutboundBridge` gains `onReasoningStart` / + `onReasoningDelta` / `onReasoningEnd`, parallel to the text path. +- **Adapter** — `pi-adapter.ts` routes pi-ai's `thinking_start` / + `thinking_delta` / `thinking_end` events to the bridge, parses the + `**Title**\n\n` heading (OpenAI Responses only) once at + `thinking_end` so the UI doesn't re-parse on every render. +- **Timeline** — `EntityTimelineRunRow` gains a live + `reasoning: Collection<EntityTimelineReasoningItem>` with content + built from a delta-join, mirroring `EntityTimelineTextItem`. +- **UI** — New `<ReasoningSection>` component renders above the + answer in `AgentResponseLive`. Live shows faded markdown via + `Streamdown` with `ThinkingIndicator` heading + summary title + + elapsed-time ticker. Settled collapses to `Thought for Ns` with + click-to-expand. Redacted Anthropic blocks render a single muted + line — content is opaque, but the encrypted payload is still + persisted server-side so the model gets it back next turn. + +Providers without reasoning emit nothing → no reasoning section +rendered. Historical responses recorded before this PR have no +reasoning rows → no closure cue, same as today. diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 2610cdb4ff..87e179caec 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -148,7 +148,24 @@ type ToolCallValue = { } type ReasoningValue = { key?: string + run_id?: string status: `streaming` | `completed` + // Anthropic emits "redacted thinking" content blocks the client can't + // display but MUST round-trip back to the model on the next turn or + // the conversation errors. Persist verbatim, render nothing. + encrypted?: string + // OpenAI's Responses API surfaces reasoning with a bolded title line + // (`**Inspecting PR workflow**\n\n`). 
We split it out at write + // time so the UI can drive a separate heading without re-parsing on + // every render. Empty / absent for providers that don't emit titles + // (Anthropic, DeepSeek-R1, Moonshot K2). + summary_title?: string +} +type ReasoningDeltaValue = { + key?: string + reasoning_id: string + run_id: string + delta: string } type ErrorEventValue = { key?: string @@ -483,7 +500,20 @@ function createReasoningSchema(): Schema { return z.object({ key: z.string().optional(), ...timelineOrderField, + run_id: z.string().optional(), status: z.enum([`streaming`, `completed`]), + encrypted: z.string().optional(), + summary_title: z.string().optional(), + }) +} + +function createReasoningDeltaSchema(): Schema { + return z.object({ + key: z.string().optional(), + ...timelineOrderField, + reasoning_id: z.string(), + run_id: z.string(), + delta: z.string(), }) } @@ -768,6 +798,7 @@ export type Text = SequencedPersistedRow export type TextDelta = SequencedPersistedRow export type ToolCall = SequencedPersistedRow export type Reasoning = SequencedPersistedRow +export type ReasoningDelta = SequencedPersistedRow export type ErrorEvent = SequencedPersistedRow export type MessageReceived = SequencedPersistedRow export type WakeEntry = SequencedPersistedRow @@ -856,6 +887,7 @@ export const ENTITY_COLLECTIONS = { textDeltas: `textDeltas`, toolCalls: `toolCalls`, reasoning: `reasoning`, + reasoningDeltas: `reasoningDeltas`, errors: `errors`, inbox: `inbox`, wakes: `wakes`, @@ -879,6 +911,8 @@ export const BUILT_IN_EVENT_SCHEMAS = { tool_call: createToolCallSchema() as unknown as BuiltInEntitySchema, reasoning: createReasoningSchema() as unknown as BuiltInEntitySchema, + reasoning_delta: + createReasoningDeltaSchema() as unknown as BuiltInEntitySchema, error: createErrorEventSchema() as unknown as BuiltInEntitySchema, inbox: createMessageReceivedSchema() as unknown as BuiltInEntitySchema, @@ -912,6 +946,7 @@ type EntityCollectionsDefinition = { textDeltas: 
CollectionDefinition toolCalls: CollectionDefinition reasoning: CollectionDefinition + reasoningDeltas: CollectionDefinition errors: CollectionDefinition inbox: CollectionDefinition wakes: CollectionDefinition @@ -963,6 +998,12 @@ export const builtInCollections: EntityCollectionsDefinition = { type: `reasoning`, primaryKey: `key`, }, + reasoningDeltas: { + schema: + BUILT_IN_EVENT_SCHEMAS.reasoning_delta as StandardSchemaV1, + type: `reasoning_delta`, + primaryKey: `key`, + }, errors: { schema: BUILT_IN_EVENT_SCHEMAS.error as StandardSchemaV1, type: `error`, diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 5acc65286b..3bf03c1737 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -220,6 +220,22 @@ export type EntityTimelineRunItem = toolCall: EntityTimelineToolCallItem } +export interface EntityTimelineReasoningItem { + key: string + run_id?: string + order: TimelineOrder + status: `streaming` | `completed` + // Concatenated content from all `reasoning_delta` rows for this row, + // built live by the query (mirrors `EntityTimelineTextItem.content`). + content: string + // Optional bolded title parsed at write time — only OpenAI Responses + // emits these; null for Anthropic / DeepSeek / Moonshot. + summary_title?: string + // Anthropic redacted-thinking opaque payload. Persist verbatim so we + // can echo it back on the next turn; the UI shows a placeholder. 
+ encrypted?: string +} + export interface EntityTimelineStepItem { key: string run_id?: string @@ -243,6 +259,7 @@ export interface EntityTimelineRunRow { status: `started` | `completed` | `failed` finish_reason?: string items: Collection + reasoning: Collection steps: Collection errors: Collection } @@ -1347,6 +1364,33 @@ function buildEntityTimelineQuery( }), toolCall: item.toolCall, })), + reasoning: q + .from({ reasoning: db.collections.reasoning }) + .where(({ reasoning }) => eq(reasoning.run_id, run.key)) + .orderBy(({ reasoning }) => coalesce(reasoning._timeline_order, `~`)) + .orderBy(({ reasoning }) => reasoning.key) + .select(({ reasoning }) => ({ + key: reasoning.key, + run_id: reasoning.run_id, + order: coalesce(reasoning._timeline_order, `~`), + status: reasoning.status, + // Same delta-join pattern as `items.text.textContent` above — + // we concatenate every `reasoning_delta` row scoped to this + // reasoning row's key in `_timeline_order` then `key` order. + // Live: re-runs as each delta arrives; settled: stable. 
+ content: concat( + toArray( + q + .from({ chunk: db.collections.reasoningDeltas }) + .where(({ chunk }) => eq(chunk.reasoning_id, reasoning.key)) + .orderBy(({ chunk }) => coalesce(chunk._timeline_order, `~`)) + .orderBy(({ chunk }) => chunk.key) + .select(({ chunk }) => chunk.delta) + ) + ), + summary_title: reasoning.summary_title, + encrypted: reasoning.encrypted, + })), steps: q .from({ step: db.collections.steps }) .where(({ step }) => eq(step.run_id, run.key)) diff --git a/packages/agents-runtime/src/outbound-bridge.ts b/packages/agents-runtime/src/outbound-bridge.ts index 2c81851df1..c01c6ef1ef 100644 --- a/packages/agents-runtime/src/outbound-bridge.ts +++ b/packages/agents-runtime/src/outbound-bridge.ts @@ -7,6 +7,7 @@ interface IdCounters { step: number msg: number tc: number + reasoning: number deltaSeqs: Map } @@ -15,6 +16,7 @@ export interface OutboundIdSeed { step: number msg: number tc: number + reasoning: number cacheKey?: string } @@ -42,12 +44,13 @@ function scanCounters(events: Array): IdCounters { step: 0, msg: 0, tc: 0, + reasoning: 0, deltaSeqs: new Map(), } for (const ev of events) { if (!ev.key) continue - const match = ev.key.match(/^(run|step|msg|tc)-(\d+)/) + const match = ev.key.match(/^(run|step|msg|tc|reasoning)-(\d+)/) if (!match) continue const prefix = match[1] as keyof Omit const nextId = parseInt(match[2]!, 10) + 1 @@ -64,6 +67,7 @@ export async function loadOutboundIdSeed( const steps = db.collections.steps.toArray const texts = db.collections.texts.toArray const toolCalls = db.collections.toolCalls.toArray + const reasoning = db.collections.reasoning.toArray const runsCollectionId = db.collections.runs.id const dbSeed = { @@ -83,6 +87,10 @@ export async function loadOutboundIdSeed( toolCalls.map((toolCall) => toolCall.key), `tc` ), + reasoning: nextCounterFromKeys( + reasoning.map((r) => r.key), + `reasoning` + ), } const cachedSeed = outboundIdSeedCache.get(runsCollectionId) const seed: OutboundIdSeed = { @@ -90,6 +98,7 @@ 
export async function loadOutboundIdSeed( step: Math.max(dbSeed.step, cachedSeed?.step ?? 0), msg: Math.max(dbSeed.msg, cachedSeed?.msg ?? 0), tc: Math.max(dbSeed.tc, cachedSeed?.tc ?? 0), + reasoning: Math.max(dbSeed.reasoning, cachedSeed?.reasoning ?? 0), cacheKey: runsCollectionId, } outboundIdSeedCache.set(runsCollectionId, seed) @@ -110,6 +119,23 @@ export interface OutboundBridge { onTextStart: () => void onTextDelta: (delta: string) => void onTextEnd: () => void + // Reasoning / extended-thinking stream. Mirrors the text path: + // start opens a row, delta(s) append to a paired `reasoningDeltas` + // collection, end closes the row. + // + // `opts.encrypted` on end handles Anthropic's `redacted_thinking` + // content blocks — opaque payloads the client can't display but + // must round-trip back to the model verbatim on the next turn or + // the conversation errors. Persist as-is, render nothing. + // + // `opts.summaryTitle` (currently OpenAI Responses only — emitted + // as a bolded first line `**Inspecting PR workflow**\n\n`) + // is extracted at write time so the UI can drive a separate + // heading without re-parsing on every render. Skip for providers + // that don't emit titles (Anthropic, DeepSeek-R1, Moonshot K2). 
+ onReasoningStart: () => void + onReasoningDelta: (delta: string) => void + onReasoningEnd: (opts?: { encrypted?: string; summaryTitle?: string }) => void onToolCallStart(toolCallId: string, name: string, args: unknown): void onToolCallStart(name: string, args: unknown): void onToolCallEnd( @@ -143,6 +169,7 @@ export function createOutboundBridge( step: counters.step, msg: counters.msg, tc: counters.tc, + reasoning: counters.reasoning, cacheKey, }) } @@ -152,6 +179,8 @@ export function createOutboundBridge( let currentStepNumber = 0 let currentMsgKey: string | null = null let currentTextRunKey: string | null = null + let currentReasoningKey: string | null = null + let currentReasoningRunKey: string | null = null const toolCallsById = new Map< string, { key: string; runKey: string; args: unknown } @@ -277,6 +306,56 @@ export function createOutboundBridge( ) }, + onReasoningStart() { + const runKey = requireActiveRun(`onReasoningStart`) + currentReasoningKey = `reasoning-${counters.reasoning++}` + persistSeed() + currentReasoningRunKey = runKey + counters.deltaSeqs.set(currentReasoningKey, 0) + writeEvent( + entityStateSchema.reasoning.insert({ + key: currentReasoningKey, + value: { status: `streaming`, run_id: runKey } as never, + }) as ChangeEvent + ) + }, + + onReasoningDelta(delta: string) { + if (!currentReasoningKey) return + const runKey = requireActiveRun(`onReasoningDelta`) + const seq = counters.deltaSeqs.get(currentReasoningKey) ?? 
0 + counters.deltaSeqs.set(currentReasoningKey, seq + 1) + writeEvent( + entityStateSchema.reasoningDeltas.insert({ + key: `${currentReasoningKey}:${seq}`, + value: { + reasoning_id: currentReasoningKey, + run_id: runKey, + delta, + } as never, + }) as ChangeEvent + ) + }, + + onReasoningEnd(opts?: { encrypted?: string; summaryTitle?: string }) { + if (!currentReasoningKey) return + writeEvent( + entityStateSchema.reasoning.update({ + key: currentReasoningKey, + value: { + status: `completed`, + run_id: currentReasoningRunKey, + ...(opts?.encrypted !== undefined && { encrypted: opts.encrypted }), + ...(opts?.summaryTitle !== undefined && { + summary_title: opts.summaryTitle, + }), + } as never, + }) as ChangeEvent + ) + currentReasoningKey = null + currentReasoningRunKey = null + }, + onToolCallStart( toolCallIdOrName: string, nameOrArgs: string | unknown, diff --git a/packages/agents-runtime/src/pi-adapter.ts b/packages/agents-runtime/src/pi-adapter.ts index 71c4d0f99d..553cee1c48 100644 --- a/packages/agents-runtime/src/pi-adapter.ts +++ b/packages/agents-runtime/src/pi-adapter.ts @@ -28,6 +28,33 @@ import type { } from '@mariozechner/pi-ai' import type { LLMContentBlock, LLMMessage, LLMMessageContent } from './types' +/** + * Split a streamed reasoning blob into `{ title, body }`. + * + * OpenAI's Responses API surfaces reasoning summaries with a bolded + * first line — `**Inspecting PR workflow**\n\n` — which we want + * to drive a separate heading in the UI rather than render inline. + * Anthropic / DeepSeek-R1 / Moonshot K2 don't emit titles; for them + * the regex doesn't match and `title` stays `null`. + * + * Match is anchored to the start, requires a blank-line terminator + * (so partial titles mid-stream don't get prematurely promoted), and + * forbids `*` or newline inside the title (so we don't accidentally + * eat bolded emphasis later in the text). 
+ */ +function parseReasoningSummary(text: string): { + title: string | null + body: string +} { + const content = text.trim() + const match = content.match(/^\*\*([^*\n]+)\*\*(?:\r?\n\r?\n|$)/) + if (!match) return { title: null, body: content } + return { + title: match[1]!.trim(), + body: content.slice(match[0].length).trimEnd(), + } +} + // ============================================================================ // Options // ============================================================================ @@ -221,6 +248,8 @@ export function createPiAgentAdapter( let disposed = false let stepStartTime = 0 let textStarted = false + let reasoningStarted = false + let reasoningAccum = `` let abortedRun = false const model = resolvePiModel({ @@ -274,6 +303,8 @@ export function createPiAgentAdapter( case `message_start`: { stepStartTime = Date.now() textStarted = false + reasoningStarted = false + reasoningAccum = `` bridge.onStepStart({ modelProvider: model.provider, modelId: model.id, @@ -293,6 +324,42 @@ export function createPiAgentAdapter( } bridge.onTextDelta(assistantEvent.delta ?? ``) textDeltaCount++ + } else if (assistantEvent?.type === `thinking_start`) { + // Open a reasoning row even if no delta arrives — some + // providers emit an empty thinking block (e.g. when + // reasoning is gated to a level the model didn't use). + // We close it on `thinking_end` regardless. + if (!reasoningStarted) { + reasoningStarted = true + reasoningAccum = `` + bridge.onReasoningStart() + } + } else if (assistantEvent?.type === `thinking_delta`) { + // Defensive: providers occasionally emit the first + // delta without a matching `thinking_start`. Open the + // row lazily so we don't drop the chunk. + if (!reasoningStarted) { + reasoningStarted = true + reasoningAccum = `` + bridge.onReasoningStart() + } + const delta = assistantEvent.delta ?? 
`` + reasoningAccum += delta + bridge.onReasoningDelta(delta) + } else if (assistantEvent?.type === `thinking_end`) { + if (reasoningStarted) { + // Parse a bolded `**Title**\n\n` prefix once, here, + // so the UI can drive a heading without re-parsing on + // every render. Only OpenAI's Responses API emits + // these today (Anthropic / DeepSeek don't); the + // helper returns no title for un-titled streams. + const { title } = parseReasoningSummary(reasoningAccum) + bridge.onReasoningEnd( + title !== null ? { summaryTitle: title } : undefined + ) + reasoningStarted = false + reasoningAccum = `` + } } else { runtimeLog.debug( logPrefix, @@ -339,6 +406,19 @@ export function createPiAgentAdapter( bridge.onTextEnd() textStarted = false } + if (reasoningStarted) { + // Provider closed the message without an explicit + // `thinking_end` (rare, but seen on aborts / errors). + // Close the open reasoning row with whatever title we + // can salvage from the accumulator so it doesn't sit + // forever in `streaming` state. + const { title } = parseReasoningSummary(reasoningAccum) + bridge.onReasoningEnd( + title !== null ? 
{ summaryTitle: title } : undefined + ) + reasoningStarted = false + reasoningAccum = `` + } const usage = msg?.usage const hasToolCalls = msg?.content?.some( diff --git a/packages/agents-runtime/test/outbound-bridge.test.ts b/packages/agents-runtime/test/outbound-bridge.test.ts index 0b8094b0ca..10cda0a242 100644 --- a/packages/agents-runtime/test/outbound-bridge.test.ts +++ b/packages/agents-runtime/test/outbound-bridge.test.ts @@ -202,7 +202,7 @@ describe(`createOutboundBridge`, () => { it(`uses a preloaded ID seed for later reruns`, () => { const writes: Array = [] const bridge = createOutboundBridge( - { run: 2, step: 4, msg: 3, tc: 5 }, + { run: 2, step: 4, msg: 3, tc: 5, reasoning: 0 }, (event) => { writes.push(event) } diff --git a/packages/agents-runtime/test/pi-adapter.test.ts b/packages/agents-runtime/test/pi-adapter.test.ts index 3c33b6cb71..e6f4e80638 100644 --- a/packages/agents-runtime/test/pi-adapter.test.ts +++ b/packages/agents-runtime/test/pi-adapter.test.ts @@ -44,7 +44,7 @@ describe(`createPiAgentAdapter`, () => { entityUrl: `test/entity-1`, epoch: 1, messages: [], - outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0, reasoning: 0 }, writeEvent: (_event: ChangeEvent) => {}, } @@ -113,7 +113,7 @@ describe(`createPiAgentAdapter`, () => { entityUrl: `test/entity-1`, epoch: 1, messages: [], - outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0, reasoning: 0 }, writeEvent: (_event: ChangeEvent) => {}, }) const controller = new AbortController() @@ -141,7 +141,7 @@ describe(`createPiAgentAdapter`, () => { entityUrl: `test/entity-1`, epoch: 1, messages: [], - outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0, reasoning: 0 }, writeEvent: (_event: ChangeEvent) => {}, }) const controller = new AbortController() @@ -205,7 +205,7 @@ describe(`createPiAgentAdapter`, () => { entityUrl: `test/entity-1`, 
epoch: 1, messages: [], - outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0, reasoning: 0 }, writeEvent: (event: ChangeEvent) => { events.push(event) }, @@ -252,7 +252,7 @@ describe(`createPiAgentAdapter`, () => { entityUrl: `test/entity-1`, epoch: 1, messages: [], - outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0, reasoning: 0 }, writeEvent: (_event: ChangeEvent) => {}, } @@ -271,7 +271,7 @@ describe(`createPiAgentAdapter`, () => { entityUrl: `test/entity-1`, epoch: 1, messages: [], - outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0, reasoning: 0 }, writeEvent: (_event: ChangeEvent) => {}, } diff --git a/packages/agents-server-ui/src/components/AgentResponse.tsx b/packages/agents-server-ui/src/components/AgentResponse.tsx index 722f56fef8..f90dc89ba0 100644 --- a/packages/agents-server-ui/src/components/AgentResponse.tsx +++ b/packages/agents-server-ui/src/components/AgentResponse.tsx @@ -26,6 +26,7 @@ import { ToolCallView } from './ToolCallView' import { TimeText } from './TimeText' import { ThinkingIndicator } from './ThinkingIndicator' import { ElapsedTime } from './ElapsedTime' +import { ReasoningSection, type ReasoningEntry } from './ReasoningSection' import { formatElapsedDuration, toMillis } from '../lib/formatTime' import styles from './AgentResponse.module.css' import type { @@ -400,6 +401,25 @@ export const AgentResponseLive = memo(function AgentResponseLive({ (q) => (run.errors ? q.from({ error: run.errors }) : undefined), [run.errors] ) + // Subscribe to the run's reasoning rows so the section ticks as + // each `reasoning_delta` arrives. Empty array for runs without + // any reasoning content (most non-extended-thinking models). + const { data: reasoningRows = [] } = useLiveQuery( + (q) => (run.reasoning ? 
q.from({ reasoning: run.reasoning }) : undefined), + [run.reasoning] + ) + const reasoningEntries = useMemo>( + () => + (reasoningRows as Array) + .slice() + // The live query already orders by `_timeline_order` then key, + // but TanStack's projection isn't guaranteed stable across + // re-mounts — sort by `key` here as a cheap deterministic + // tiebreaker so the section doesn't visibly reflow between + // renders if two rows share an order. + .sort((a, b) => a.key.localeCompare(b.key)), + [reasoningRows] + ) const sortedItems = useMemo( () => [...items].sort(compareLiveRunItems), [items] @@ -477,6 +497,14 @@ export const AgentResponseLive = memo(function AgentResponseLive({ return ( + {/* Reasoning sits above the answer because providers stream it + first — the model "thinks" then "writes". Collapses on + settle so old turns don't drown out the actual response. */} + {sortedItems.map((item, i) => { if (item.text) { return ( diff --git a/packages/agents-server-ui/src/components/ReasoningSection.module.css b/packages/agents-server-ui/src/components/ReasoningSection.module.css new file mode 100644 index 0000000000..cdbcd76fde --- /dev/null +++ b/packages/agents-server-ui/src/components/ReasoningSection.module.css @@ -0,0 +1,88 @@ +/* Reasoning sits above the agent's visible answer. We want it to read + * as secondary content — never compete with the response — but stay + * legible enough that a curious user can skim it. + * + * Visual hierarchy: + * live → faded markdown body, animated "Thinking" heading + * settled → single muted line, click-to-expand + * redacted → single muted line, no expand + * + * Top/bottom padding matches the agent-response root so the layout + * doesn't shift when the reasoning section disappears post-collapse. 
*/ + +.root { + margin-inline: auto; + width: max(0px, calc(100% - 24px)); +} + +.live { + border-left: 2px solid var(--ds-border-2); + padding-left: 10px; +} + +.header { + padding-bottom: 4px; +} + +.separator { + color: var(--ds-text-4); + opacity: 0.7; +} + +.title { + color: var(--ds-text-3); +} + +/* The reasoning body is rendered with `Streamdown` but at reduced + * weight so it reads as supporting material. `opacity` (rather than + * a different `color`) keeps inline code / links / emphasis tinted + * proportionally instead of forcing every span flat-muted. */ +.body { + opacity: 0.7; + font-size: 0.95em; +} + +/* Settled collapsed row. Click target spans the whole header. */ +.settled { + margin-block: 4px; +} + +.toggle { + /* Reset native button styling — visually it's just a muted line. */ + background: none; + border: none; + padding: 2px 0; + cursor: pointer; + text-align: left; + color: inherit; + font: inherit; +} + +.toggle:hover { + opacity: 1; +} + +.chevron { + display: inline-block; + width: 0.8em; + text-align: center; + color: var(--ds-text-4); + opacity: 0.7; + /* Tabular-style alignment so the chevron doesn't shift the trailing + * label between collapsed and expanded states (▸ and ▾ render at + * slightly different glyph widths in most fonts). 
*/ + font-variant-numeric: tabular-nums; +} + +.expandedBody { + border-left: 2px solid var(--ds-border-2); + padding-left: 10px; + margin-top: 4px; + opacity: 0.7; + font-size: 0.95em; +} + +.redacted { + padding: 4px 0; + opacity: 0.6; +} diff --git a/packages/agents-server-ui/src/components/ReasoningSection.tsx b/packages/agents-server-ui/src/components/ReasoningSection.tsx new file mode 100644 index 0000000000..d2cfd15072 --- /dev/null +++ b/packages/agents-server-ui/src/components/ReasoningSection.tsx @@ -0,0 +1,198 @@ +import { useEffect, useRef, useState } from 'react' +import { Streamdown } from 'streamdown' +import { + streamdownComponents, + streamdownControls, + streamdownPlugins, +} from '../lib/streamdownConfig' +import { Stack, Text } from '../ui' +import { ThinkingIndicator } from './ThinkingIndicator' +import { ElapsedTime } from './ElapsedTime' +import { formatElapsedDuration, toMillis } from '../lib/formatTime' +import styles from './ReasoningSection.module.css' + +/** + * One reasoning row's worth of UI state — what the live query gives us + * for each row in `run.reasoning`. Mirrors `EntityTimelineReasoningItem` + * but pulled into a local type so the component file doesn't import + * from agents-runtime/client (keeps this file dep-light for the desktop + * + mobile embeds). + */ +export type ReasoningEntry = { + key: string + content: string + status: `streaming` | `completed` + summary_title?: string + encrypted?: string +} + +/** + * Renders the model's extended-thinking / reasoning content above the + * agent's visible response. Visual treatment intentionally mirrors + * Claude Code + OpenCode: + * + * - **While streaming**: faded markdown body with the `ThinkingIndicator` + * shimmer + the parsed `summary_title` (if any) as the heading. The + * elapsed-time ticker rides alongside so the user sees the model is + * actively chewing on the problem. 
+ * - **Once settled**: collapses to a single-line `▸ Thought for 12s` + * row that the user can click to expand. Collapsed-by-default is the + * established pattern (OpenCode defaults to `hide` — reasoning is + * noise unless you're debugging). + * - **Anthropic redacted blocks** (`encrypted` set, no `content`): the + * provider has hidden the content behind a safety filter. We can't + * show anything meaningful, so render a single-line affordance and + * move on. The encrypted payload is still persisted server-side so + * the model gets it back on the next turn. + * + * Multiple reasoning rows per run are possible — typically one per LLM + * step in a tool-using turn — so we render each independently with its + * own collapse state, in order. + */ +export function ReasoningSection({ + entries, + isStreaming, + timestamp, +}: { + entries: Array + isStreaming: boolean + timestamp?: number | null +}): React.ReactElement | null { + if (entries.length === 0) return null + return ( + + {entries.map((entry) => ( + + ))} + + ) +} + +function ReasoningEntryView({ + entry, + isStreaming, + timestamp, +}: { + entry: ReasoningEntry + isStreaming: boolean + timestamp?: number | null +}): React.ReactElement { + const isLive = isStreaming && entry.status === `streaming` + const [expanded, setExpanded] = useState(false) + + // Snapshot the elapsed duration at the moment streaming flips to + // `completed`, the same `sawStreamingRef` trick used for "done in + // Xs" on `AgentResponse`. For reasoning rows that were already + // settled on first mount (page reload, scrollback into older + // turns) we don't have a real end timestamp, so the closure stays + // a bare "Thought" without a duration — better than printing a + // wildly-wrong number from `now() - userMessageTime`. 
+ const sawStreamingRef = useRef(isLive) + if (isLive) sawStreamingRef.current = true + const [finalDurationMs, setFinalDurationMs] = useState(null) + useEffect(() => { + if ( + entry.status === `completed` && + sawStreamingRef.current && + timestamp != null && + finalDurationMs == null + ) { + setFinalDurationMs(Math.max(0, Date.now() - toMillis(timestamp))) + } + }, [entry.status, timestamp, finalDurationMs]) + + // Redacted thinking — opaque payload, nothing to render. + if (entry.encrypted && entry.content.trim().length === 0) { + return ( +
+ + ⊘ Reasoning redacted by provider safety filters + +
+ ) + } + + if (isLive) { + return ( +
+ + + {entry.summary_title && ( + <> + + · + + + {entry.summary_title} + + + )} + {timestamp != null && ( + <> + + · + + + + )} + +
+ + {entry.content} + +
+
+ ) + } + + // Settled. + const closureLabel = + finalDurationMs != null + ? `Thought for ${formatElapsedDuration(finalDurationMs)}` + : `Thought` + + return ( +
+ + {expanded && ( +
+ + {entry.content} + +
+ )} +
+ ) +} From a2d56c3393e8f02c5fae236418bee7176773e5a4 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 16:16:11 +0200 Subject: [PATCH 02/11] feat(agents): enable Anthropic extended thinking via reasoningEffort MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously `withProviderPayloadDefaults` short-circuited for any provider other than OpenAI / OpenAI-Codex, so picking Claude with a `reasoningEffort` higher than `auto` produced no effect — no `thinking` parameter was added to the request, so Anthropic ran in standard mode and the model emitted no `thinking_delta` events. The inbound reasoning plumbing that landed in the same PR was correct but unreachable from Anthropic without this. Now: when the chosen model is Anthropic-capable for reasoning AND `reasoningEffort` is explicit (minimal/low/medium/high), inject thinking: { type: "enabled", budget_tokens: <budget> } into the payload. Budgets follow Anthropic's docs (≥ 1024 floor): minimal=1024, low=2048, medium=8192, high=24576. `auto` stays opted out of thinking so default sessions don't silently incur the extra reasoning tokens. --- .changeset/reasoning-content.md | 1 + packages/agents/src/model-catalog.ts | 112 ++++++++++++++++++++------- 2 files changed, 84 insertions(+), 29 deletions(-) diff --git a/.changeset/reasoning-content.md b/.changeset/reasoning-content.md index a6978dbe54..609eb119c4 100644 --- a/.changeset/reasoning-content.md +++ b/.changeset/reasoning-content.md @@ -1,6 +1,7 @@ --- '@electric-ax/agents-server-ui': minor '@electric-ax/agents-runtime': minor +'@electric-ax/agents': patch '@electric-ax/agents-desktop': patch --- diff --git a/packages/agents/src/model-catalog.ts b/packages/agents/src/model-catalog.ts index fcfc0889aa..082282e77f 100644 --- a/packages/agents/src/model-catalog.ts +++ b/packages/agents/src/model-catalog.ts @@ -213,42 +213,96 @@ function filterChoicesByEnabledModels( return filtered.length > 0 ? 
filtered : choices } +/** + * Anthropic-specific budget mapping for `reasoningEffort`. + * + * Anthropic's `thinking.budget_tokens` is a hard cap on tokens spent + * inside the thinking block before the model must commit to its + * answer. Docs require ≥ 1024; we scale from there. Numbers tuned so + * `medium` is the spot most "show your work" requests land, and + * `high` covers tougher reasoning without uncapped spend. + * + * Keep in sync with provider doc updates — Anthropic has shifted the + * minimum once already (older models capped lower). + */ +const ANTHROPIC_THINKING_BUDGET_BY_EFFORT: Record< + ExplicitReasoningEffort, + number +> = { + minimal: 1024, + low: 2048, + medium: 8192, + high: 24576, +} + function withProviderPayloadDefaults( config: PersistedModelConfig & { getApiKey?: AgentConfig[`getApiKey`] }, choice: BuiltinModelChoice, reasoningEffort: ExplicitReasoningEffort | null ): BuiltinAgentModelConfig { - if ( - (choice.provider !== `openai` && choice.provider !== `openai-codex`) || - !choice.reasoning - ) - return config - - const defaultEffort = choice.provider === `openai-codex` ? `low` : `minimal` - const effort = - reasoningEffort === `minimal` && choice.provider === `openai-codex` - ? `low` - : (reasoningEffort ?? defaultEffort) + if (!choice.reasoning) return config + + if (choice.provider === `openai` || choice.provider === `openai-codex`) { + const defaultEffort = choice.provider === `openai-codex` ? `low` : `minimal` + const effort = + reasoningEffort === `minimal` && choice.provider === `openai-codex` + ? `low` + : (reasoningEffort ?? defaultEffort) + + return { + ...config, + onPayload: (payload) => { + if (typeof payload !== `object` || payload === null) return undefined + const body = payload as Record + const existingReasoning = + typeof body.reasoning === `object` && body.reasoning !== null + ? 
(body.reasoning as Record) + : {} + + return { + ...body, + reasoning: { + ...existingReasoning, + effort, + }, + } + }, + } + } - return { - ...config, - onPayload: (payload) => { - if (typeof payload !== `object` || payload === null) return undefined - const body = payload as Record - const existingReasoning = - typeof body.reasoning === `object` && body.reasoning !== null - ? (body.reasoning as Record) - : {} - - return { - ...body, - reasoning: { - ...existingReasoning, - effort, - }, - } - }, + if (choice.provider === `anthropic`) { + // Anthropic extended thinking is opt-in per request — only enable + // when the user explicitly picks an effort level. `auto` leaves + // the standard (no-thinking) code path untouched so default + // sessions don't silently incur the extra reasoning tokens. + if (reasoningEffort === null) return config + const budgetTokens = ANTHROPIC_THINKING_BUDGET_BY_EFFORT[reasoningEffort] + + return { + ...config, + onPayload: (payload) => { + if (typeof payload !== `object` || payload === null) return undefined + const body = payload as Record + // Pass through any existing `thinking` so a caller-supplied + // payload (test fixtures, future overrides) can still set + // `type: "disabled"` explicitly without us clobbering it. + const existingThinking = + typeof body.thinking === `object` && body.thinking !== null + ? 
(body.thinking as Record) + : {} + return { + ...body, + thinking: { + type: `enabled`, + budget_tokens: budgetTokens, + ...existingThinking, + }, + } + }, + } } + + return config } function parseReasoningEffort(value: unknown): ExplicitReasoningEffort | null { From 74d3e77eac11dd1927291cc4e9ce3029103c8a14 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 8 Jun 2026 14:53:13 +0200 Subject: [PATCH 03/11] fix(reasoning-content): make text + thinking actually stream into the UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three latent bugs in the reasoning-content branch that together made extended thinking and the assistant's answer text fail to render: 1. **Alias collision in the timeline live query** — `entity-timeline.ts` had two correlated sub-queries (one for `items.text.content`, one for `reasoning.content`) both using `chunk` as the `from({...})` alias. TanStack DB silently mis-bound the correlation when both were active in the same run projection, so `items.text.content` came back as an empty string even though the deltas were present in `db.collections.textDeltas`. Reasoning won the binding; the answer didn't render at all. Fix: rename the inner alias to `textChunk`, and hoist the union row's text fields to top-level scalars (`text_key`, `text_run_id`, …) so the correlation references a top-level field instead of a nested `item.text.key` (also a source of empty joins). 2. **Anthropic thinking always-on instead of opt-in** — `withProviderPayloadDefaults` short-circuited for Anthropic when `reasoningEffort` was `auto`, so no `thinking` parameter ever reached the API. The OpenAI branch already defaulted `auto` to `minimal`; Anthropic now does the same (1024-token budget). `low` / `medium` / `high` scale the budget exactly as before. 3. **Anthropic `thinking` merge order** — pi-ai writes `thinking: { type: "disabled" }` into the request body by default. 
Our `onPayload` was merging `existingThinking` _last_, so the default `type: "disabled"` clobbered our `type: "enabled"` and the API rejected `budget_tokens` with `thinking.disabled.budget_tokens: Extra inputs are not permitted`. Spread `existingThinking` first now, then `type` + `budget_tokens`. Tests: - `entity-timeline.test.ts` — regression test exercises `createEntityTimelineQuery` end-to-end with text and reasoning rows in the same run; fails on the alias collision, passes with the rename + flat-field projection. - `model-catalog.test.ts` — adds Anthropic-side coverage that mirrors the existing OpenAI tests: always-on minimal budget on `auto`, scaled budget on explicit effort, and `type: disabled` override for pre-existing `thinking` in the payload. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/reasoning-content.md | 8 +- .../agents-runtime/src/entity-timeline.ts | 58 ++++---- .../test/entity-timeline.test.ts | 135 ++++++++++++++++++ packages/agents/src/model-catalog.ts | 20 +-- packages/agents/test/model-catalog.test.ts | 82 +++++++++++ 5 files changed, 268 insertions(+), 35 deletions(-) diff --git a/.changeset/reasoning-content.md b/.changeset/reasoning-content.md index 609eb119c4..4753fa1cbd 100644 --- a/.changeset/reasoning-content.md +++ b/.changeset/reasoning-content.md @@ -40,4 +40,10 @@ Implementation: Providers without reasoning emit nothing → no reasoning section rendered. Historical responses recorded before this PR have no -reasoning rows → no closure cue, same as today. +closure cue, same as today. + +Anthropic extended thinking is now always-on for reasoning-capable +models: `reasoningEffort: auto` maps to the minimal budget +(1024 tokens), matching the OpenAI branch where `auto` already +defaulted to `minimal`. Explicit `low`/`medium`/`high` scale the +budget as before. 
diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 3bf03c1737..cde6d003df 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -1300,6 +1300,11 @@ function buildEntityTimelineQuery( new_state: signal.new_state, })) + // Hoist text fields to top-level scalars on the union row. The + // text-delta join below correlates on `item.text_key`, and TanStack + // DB's correlated sub-queries only resolve **top-level** fields of + // the outer row — referencing a nested field (`item.text.key`) + // returned an empty join, even though the deltas were present. const runItemsSource = q .unionAll({ text: db.collections.texts, @@ -1308,22 +1313,10 @@ function buildEntityTimelineQuery( .select(({ text, toolCall }) => ({ order: coalesce(text._timeline_order, toolCall._timeline_order, `~`), run_id: coalesce(text.run_id, toolCall.run_id, ``), - text: caseWhen(text.key, { - key: text.key, - run_id: text.run_id, - order: coalesce(text._timeline_order, `~`), - status: text.status, - }), - textContent: concat( - toArray( - q - .from({ chunk: db.collections.textDeltas }) - .where(({ chunk }) => eq(chunk.text_id, text.key)) - .orderBy(({ chunk }) => coalesce(chunk._timeline_order, `~`)) - .orderBy(({ chunk }) => chunk.key) - .select(({ chunk }) => chunk.delta) - ) - ), + text_key: text.key, + text_run_id: text.run_id, + text_order: coalesce(text._timeline_order, `~`), + text_status: text.status, toolCall: caseWhen(toolCall.key, { key: toolCall.key, run_id: toolCall.run_id, @@ -1348,19 +1341,36 @@ function buildEntityTimelineQuery( .orderBy(({ item }) => item.order) .orderBy(({ item }) => coalesce( - caseWhen(item.text.key, `text`), + caseWhen(item.text_key, `text`), caseWhen(item.toolCall.key, `toolCall`), `` ) ) - .orderBy(({ item }) => coalesce(item.text.key, item.toolCall.key, ``)) + .orderBy(({ item }) => coalesce(item.text_key, item.toolCall.key, ``)) 
.select(({ item }) => ({ - text: caseWhen(item.text.key, { - key: item.text.key, - run_id: item.text.run_id, - order: item.text.order, - status: item.text.status, - content: item.textContent, + text: caseWhen(item.text_key, { + key: item.text_key, + run_id: item.text_run_id, + order: item.text_order, + status: item.text_status, + // Concatenated delta content. The alias here MUST NOT collide + // with any other `from({...})` alias in this query graph — when + // we previously used the obvious `chunk` alias, this join + // silently returned empty strings (a `chunk` alias is also + // used inside the reasoning content sub-query below, and the + // collision broke correlated row binding). + content: concat( + toArray( + q + .from({ textChunk: db.collections.textDeltas }) + .where(({ textChunk }) => eq(textChunk.text_id, item.text_key)) + .orderBy(({ textChunk }) => + coalesce(textChunk._timeline_order, `~`) + ) + .orderBy(({ textChunk }) => textChunk.key) + .select(({ textChunk }) => textChunk.delta) + ) + ), }), toolCall: item.toolCall, })), diff --git a/packages/agents-runtime/test/entity-timeline.test.ts b/packages/agents-runtime/test/entity-timeline.test.ts index 4c2923cd82..fb9c97e890 100644 --- a/packages/agents-runtime/test/entity-timeline.test.ts +++ b/packages/agents-runtime/test/entity-timeline.test.ts @@ -7,6 +7,7 @@ import { buildEntityTimelineData, compareTimelineOrders, createEntityIncludesQuery, + createEntityTimelineQuery, getEntityState, normalizeEntityTimelineData, } from '../src/entity-timeline' @@ -2257,5 +2258,139 @@ describe(`entity includes query`, () => { expect(liveEntity?.type).toBeUndefined() expect(liveEntity?.status).toBeUndefined() }) + + function createTimelineCollections() { + let nextOffset = 1 + let nextSeq = 1 + const takeOffset = () => offset(nextOffset++) + const takeSeq = () => nextSeq++ + const runs = createSyncCollection(`tl-runs`, takeOffset) + const texts = createSyncCollection(`tl-texts`, takeOffset) + const textDeltas = 
createSyncCollection(`tl-textDeltas`, takeOffset) + const toolCalls = createSyncCollection(`tl-toolCalls`, takeOffset) + const steps = createSyncCollection(`tl-steps`, takeOffset) + const errors = createSyncCollection(`tl-errors`, takeOffset) + const inbox = createSyncCollection(`tl-inbox`, takeOffset) + const wakes = createSyncCollection(`tl-wakes`, takeOffset) + const signals = createSyncCollection(`tl-signals`, takeOffset) + const contextInserted = createSyncCollection( + `tl-context-inserted`, + takeOffset + ) + const contextRemoved = createSyncCollection( + `tl-context-removed`, + takeOffset + ) + const manifests = createSyncCollection(`tl-manifests`, takeOffset) + const childStatus = createSyncCollection(`tl-child-status`, takeOffset) + const reasoning = createSyncCollection(`tl-reasoning`, takeOffset) + const reasoningDeltas = createSyncCollection( + `tl-reasoningDeltas`, + takeOffset + ) + return { + collections: { + runs: runs.collection, + texts: texts.collection, + textDeltas: textDeltas.collection, + toolCalls: toolCalls.collection, + steps: steps.collection, + errors: errors.collection, + inbox: inbox.collection, + wakes: wakes.collection, + signals: signals.collection, + contextInserted: contextInserted.collection, + contextRemoved: contextRemoved.collection, + manifests: manifests.collection, + childStatus: childStatus.collection, + reasoning: reasoning.collection, + reasoningDeltas: reasoningDeltas.collection, + }, + sync: { + runs: withSeqInjection(runs, takeSeq), + texts: withSeqInjection(texts, takeSeq), + textDeltas: withSeqInjection(textDeltas, takeSeq), + toolCalls: withSeqInjection(toolCalls, takeSeq), + steps: withSeqInjection(steps, takeSeq), + errors: withSeqInjection(errors, takeSeq), + inbox: withSeqInjection(inbox, takeSeq), + wakes: withSeqInjection(wakes, takeSeq), + signals: withSeqInjection(signals, takeSeq), + contextInserted: withSeqInjection(contextInserted, takeSeq), + contextRemoved: withSeqInjection(contextRemoved, takeSeq), + 
manifests: withSeqInjection(manifests, takeSeq), + childStatus: withSeqInjection(childStatus, takeSeq), + reasoning: withSeqInjection(reasoning, takeSeq), + reasoningDeltas: withSeqInjection(reasoningDeltas, takeSeq), + }, + } + } + + function getRows(liveQuery: any): Array { + return Array.from(liveQuery.entries()).map(([, v]: any) => v) + } + + it(`live items.text.content streams in even alongside reasoning (alias-collision regression)`, async () => { + // Regression: the text-content correlated sub-query inside + // `items.select(...)` and the reasoning-content sub-query both + // used `chunk` as their `from({...})` alias. The collision broke + // the items text-content join silently — `content` came back as + // an empty string even though the deltas were in the local DB. + // The fix is to use distinct aliases (`textChunk` vs `chunk`). + const { collections, sync } = createTimelineCollections() + const liveQuery = createLiveQueryCollection({ + query: createEntityTimelineQuery({ collections } as any), + startSync: true, + }) + await liveQuery.preload() + + sync.runs.insert({ key: `run-0`, status: `started` }) + sync.texts.insert({ + key: `msg-0`, + run_id: `run-0`, + status: `streaming`, + }) + sync.textDeltas.insert({ + key: `msg-0:0`, + text_id: `msg-0`, + run_id: `run-0`, + delta: `Hello`, + }) + sync.textDeltas.insert({ + key: `msg-0:1`, + text_id: `msg-0`, + run_id: `run-0`, + delta: ` world`, + }) + // Insert a reasoning row alongside the text row so the items + // text-content sub-query and the reasoning sub-query are both + // active in the same live projection — that's the configuration + // that surfaced the collision. 
+ sync.reasoning.insert({ + key: `reasoning-0`, + run_id: `run-0`, + status: `streaming`, + }) + sync.texts.update({ + key: `msg-0`, + run_id: `run-0`, + status: `completed`, + }) + sync.runs.update({ + key: `run-0`, + status: `completed`, + finish_reason: `stop`, + }) + await new Promise((r) => setTimeout(r, 50)) + + const rows = getRows(liveQuery) + const runRow = rows.find((r) => r.run?.key === `run-0`) + expect(runRow).toBeTruthy() + const items = Array.from(runRow.run.items.toArray) as Array + expect(items).toHaveLength(1) + const item = items[0] + expect(item.text?.key).toBe(`msg-0`) + expect(item.text?.content).toBe(`Hello world`) + }) }) }) diff --git a/packages/agents/src/model-catalog.ts b/packages/agents/src/model-catalog.ts index 082282e77f..8783ce0ae1 100644 --- a/packages/agents/src/model-catalog.ts +++ b/packages/agents/src/model-catalog.ts @@ -271,21 +271,21 @@ function withProviderPayloadDefaults( } if (choice.provider === `anthropic`) { - // Anthropic extended thinking is opt-in per request — only enable - // when the user explicitly picks an effort level. `auto` leaves - // the standard (no-thinking) code path untouched so default - // sessions don't silently incur the extra reasoning tokens. - if (reasoningEffort === null) return config - const budgetTokens = ANTHROPIC_THINKING_BUDGET_BY_EFFORT[reasoningEffort] + // `auto` maps to the minimal budget so extended thinking is always + // on for reasoning-capable Anthropic models, matching the OpenAI + // branch above (where `auto` falls through to a `minimal` default). + const effectiveEffort = reasoningEffort ?? 
`minimal` + const budgetTokens = ANTHROPIC_THINKING_BUDGET_BY_EFFORT[effectiveEffort] return { ...config, onPayload: (payload) => { if (typeof payload !== `object` || payload === null) return undefined const body = payload as Record - // Pass through any existing `thinking` so a caller-supplied - // payload (test fixtures, future overrides) can still set - // `type: "disabled"` explicitly without us clobbering it. + // pi-ai writes `thinking: { type: "disabled" }` into the payload + // by default. Merge our enabled-thinking values last so they win + // — otherwise the API rejects `budget_tokens` for a disabled + // `thinking` block. const existingThinking = typeof body.thinking === `object` && body.thinking !== null ? (body.thinking as Record) @@ -293,9 +293,9 @@ function withProviderPayloadDefaults( return { ...body, thinking: { + ...existingThinking, type: `enabled`, budget_tokens: budgetTokens, - ...existingThinking, }, } }, diff --git a/packages/agents/test/model-catalog.test.ts b/packages/agents/test/model-catalog.test.ts index bdd5f7c436..3c6eb3ffb4 100644 --- a/packages/agents/test/model-catalog.test.ts +++ b/packages/agents/test/model-catalog.test.ts @@ -117,6 +117,88 @@ describe(`model catalog`, () => { }) }) + it(`enables Anthropic extended thinking with a minimal budget when reasoningEffort is auto`, async () => { + process.env.ANTHROPIC_API_KEY = `test-anthropic-key` + vi.stubGlobal( + `fetch`, + vi.fn(async (url: string) => { + if (String(url).includes(`api.anthropic.com`)) { + return { + ok: true, + status: 200, + json: async () => ({ data: [{ id: `claude-sonnet-4-6` }] }), + } + } + return { ok: false, status: 401, json: async () => ({}) } + }) + ) + + const catalog = await createBuiltinModelCatalog() + const config = resolveBuiltinModelConfig(catalog!, { + model: `anthropic:claude-sonnet-4-6`, + }) + + expect(config.onPayload).toBeTypeOf(`function`) + expect(config.onPayload!({}, {} as any)).toEqual({ + thinking: { type: `enabled`, budget_tokens: 
1024 }, + }) + }) + + it(`overrides a pre-existing thinking.type=disabled in the Anthropic payload`, async () => { + process.env.ANTHROPIC_API_KEY = `test-anthropic-key` + vi.stubGlobal( + `fetch`, + vi.fn(async (url: string) => { + if (String(url).includes(`api.anthropic.com`)) { + return { + ok: true, + status: 200, + json: async () => ({ data: [{ id: `claude-sonnet-4-6` }] }), + } + } + return { ok: false, status: 401, json: async () => ({}) } + }) + ) + + const catalog = await createBuiltinModelCatalog() + const config = resolveBuiltinModelConfig(catalog!, { + model: `anthropic:claude-sonnet-4-6`, + }) + + expect( + config.onPayload!({ thinking: { type: `disabled` } }, {} as any) + ).toEqual({ + thinking: { type: `enabled`, budget_tokens: 1024 }, + }) + }) + + it(`scales Anthropic thinking budget with explicit reasoningEffort`, async () => { + process.env.ANTHROPIC_API_KEY = `test-anthropic-key` + vi.stubGlobal( + `fetch`, + vi.fn(async (url: string) => { + if (String(url).includes(`api.anthropic.com`)) { + return { + ok: true, + status: 200, + json: async () => ({ data: [{ id: `claude-sonnet-4-6` }] }), + } + } + return { ok: false, status: 401, json: async () => ({}) } + }) + ) + + const catalog = await createBuiltinModelCatalog() + const config = resolveBuiltinModelConfig(catalog!, { + model: `anthropic:claude-sonnet-4-6`, + reasoningEffort: `high`, + }) + + expect(config.onPayload!({}, {} as any)).toEqual({ + thinking: { type: `enabled`, budget_tokens: 24576 }, + }) + }) + it(`does not expose providers whose keys are rejected`, async () => { vi.stubGlobal( `fetch`, From 012765669eb8c9f8ac4fab4519cbf67cdcc5f8d4 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 8 Jun 2026 15:27:56 +0200 Subject: [PATCH 04/11] fix(reasoning-content): assemble reasoning content client-side from deltas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reasoning sub-collection's `content` field — projected via `concat(toArray())` 
— went stale in the running app after the row's status flipped to `completed`, surfacing `content: null` in the live query even though the deltas were still present in the local DB. The expand-thought-block view rendered an empty body until the user navigated away and back (forcing a fresh live-query subscription), at which point the join evaluated cleanly. Unit tests for the same projection pattern all pass — the bug only reproduces in the running app, against an established live-query graph with overlapping text/reasoning subscriptions. The sub-query itself is correct (data is there after a fresh subscription), but something about the long-lived subscription state makes the correlated row binding stale. Sidestep the unreliable projection entirely: - **Timeline query** — drop the `content` field from `EntityTimelineReasoningItem`. Expose `run.reasoningDeltas` as a parallel sub-collection (mirroring `run.reasoning`), surfacing the raw deltas keyed by `reasoning_id`. - **UI** — `AgentResponseLive` subscribes to both `run.reasoning` and `run.reasoningDeltas`, builds a `Map` from the deltas client-side, and merges it onto the reasoning rows before handing them to ``. Reactive on every delta arrival, no stale state. - **State lift** — `expanded` for the collapsed "Thought for Ns" toggle moves from `ReasoningEntryView` (per-entry) up to `ReasoningSection` (keyed by `entry.key`), so the user's choice survives any spurious unmount of the entry view (virtualizer measurement passes, brief entries-empty states, etc.). Tests: - New regressions in `entity-timeline.test.ts` exercise the deltas sub-collection with the same shape as the failing production scenario: reasoning + text together, multi-step run-row updates, status transitions. Follow-up: investigate why the original correlated sub-query goes stale only against long-lived live-query graphs (passes in tests). 
The `content` projection has been removed from the query and replaced with an explanatory comment, so it can be restored once the underlying TanStack DB issue is fixed. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/agents-runtime/src/client.ts | 1 + .../agents-runtime/src/entity-timeline.ts | 44 ++-- .../test/entity-timeline.test.ts | 221 ++++++++++++++++++ .../src/components/AgentResponse.tsx | 47 +++- .../src/components/ReasoningSection.tsx | 27 ++- 5 files changed, 311 insertions(+), 29 deletions(-) diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index 05c5a27b7c..8101b8e226 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -65,6 +65,7 @@ export type { EntityTimelineRunItem, EntityTimelineSection, EntityTimelineState, + EntityTimelineReasoningDeltaItem, EntityTimelineTextChunk, EntityTimelineTextItem, EntityTimelineToolCallItem, diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index cde6d003df..8489a30ff1 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -225,9 +225,6 @@ export interface EntityTimelineReasoningItem { run_id?: string order: TimelineOrder status: `streaming` | `completed` - // Concatenated content from all `reasoning_delta` rows for this row, - // built live by the query (mirrors `EntityTimelineTextItem.content`). - content: string // Optional bolded title parsed at write time — only OpenAI Responses // emits these; null for Anthropic / DeepSeek / Moonshot. 
summary_title?: string @@ -236,6 +233,13 @@ export interface EntityTimelineReasoningItem { encrypted?: string } +export interface EntityTimelineReasoningDeltaItem { + key: string + reasoning_id: string + delta: string + order: TimelineOrder +} + export interface EntityTimelineStepItem { key: string run_id?: string @@ -260,6 +264,7 @@ export interface EntityTimelineRunRow { finish_reason?: string items: Collection reasoning: Collection + reasoningDeltas: Collection steps: Collection errors: Collection } @@ -1384,23 +1389,28 @@ function buildEntityTimelineQuery( run_id: reasoning.run_id, order: coalesce(reasoning._timeline_order, `~`), status: reasoning.status, - // Same delta-join pattern as `items.text.textContent` above — - // we concatenate every `reasoning_delta` row scoped to this - // reasoning row's key in `_timeline_order` then `key` order. - // Live: re-runs as each delta arrives; settled: stable. - content: concat( - toArray( - q - .from({ chunk: db.collections.reasoningDeltas }) - .where(({ chunk }) => eq(chunk.reasoning_id, reasoning.key)) - .orderBy(({ chunk }) => coalesce(chunk._timeline_order, `~`)) - .orderBy(({ chunk }) => chunk.key) - .select(({ chunk }) => chunk.delta) - ) - ), + // `content` intentionally left undefined here — the previous + // `concat(toArray(...))` correlated sub-query went stale + // (returning `null` even though deltas were present) after the + // row's status flipped to `completed`. The UI assembles + // content client-side from `run.reasoningDeltas` below, which + // is a plain non-correlated query and stays reactive. 
summary_title: reasoning.summary_title, encrypted: reasoning.encrypted, })), + reasoningDeltas: q + .from({ reasoningDelta: db.collections.reasoningDeltas }) + .where(({ reasoningDelta }) => eq(reasoningDelta.run_id, run.key)) + .orderBy(({ reasoningDelta }) => + coalesce(reasoningDelta._timeline_order, `~`) + ) + .orderBy(({ reasoningDelta }) => reasoningDelta.key) + .select(({ reasoningDelta }) => ({ + key: reasoningDelta.key, + reasoning_id: reasoningDelta.reasoning_id, + delta: reasoningDelta.delta, + order: coalesce(reasoningDelta._timeline_order, `~`), + })), steps: q .from({ step: db.collections.steps }) .where(({ step }) => eq(step.run_id, run.key)) diff --git a/packages/agents-runtime/test/entity-timeline.test.ts b/packages/agents-runtime/test/entity-timeline.test.ts index fb9c97e890..7911c301a1 100644 --- a/packages/agents-runtime/test/entity-timeline.test.ts +++ b/packages/agents-runtime/test/entity-timeline.test.ts @@ -2392,5 +2392,226 @@ describe(`entity includes query`, () => { expect(item.text?.key).toBe(`msg-0`) expect(item.text?.content).toBe(`Hello world`) }) + + it(`reasoning content survives multiple run-row updates in sequence`, async () => { + // Even closer to production: the run row gets updated MULTIPLE + // times (each delta + status flip), which may invalidate the + // child sub-collection between evaluations. 
+ const { collections, sync } = createTimelineCollections() + const liveQuery = createLiveQueryCollection({ + query: createEntityTimelineQuery({ collections } as any), + startSync: true, + }) + await liveQuery.preload() + + sync.runs.insert({ key: `run-0`, status: `started` }) + sync.reasoning.insert({ + key: `reasoning-0`, + run_id: `run-0`, + status: `streaming`, + }) + sync.reasoningDeltas.insert({ + key: `reasoning-0:0`, + reasoning_id: `reasoning-0`, + run_id: `run-0`, + delta: `A`, + }) + sync.reasoningDeltas.insert({ + key: `reasoning-0:1`, + reasoning_id: `reasoning-0`, + run_id: `run-0`, + delta: `B`, + }) + sync.reasoning.update({ + key: `reasoning-0`, + run_id: `run-0`, + status: `completed`, + }) + // Then several text rows / deltas (each triggers run updates + // through derived projections). + sync.texts.insert({ + key: `msg-0`, + run_id: `run-0`, + status: `streaming`, + }) + for (let i = 0; i < 5; i++) { + sync.textDeltas.insert({ + key: `msg-0:${i}`, + text_id: `msg-0`, + run_id: `run-0`, + delta: `t${i}`, + }) + } + sync.texts.update({ + key: `msg-0`, + run_id: `run-0`, + status: `completed`, + }) + // Finally the run row update — the moment the bug surfaces. + sync.runs.update({ + key: `run-0`, + status: `completed`, + finish_reason: `stop`, + }) + await new Promise((r) => setTimeout(r, 100)) + + const rows = getRows(liveQuery) + const runRow = rows.find((r) => r.run?.key === `run-0`) + expect(runRow).toBeTruthy() + const reasoning = Array.from(runRow.run.reasoning.toArray) as Array + expect(reasoning).toHaveLength(1) + const deltas = Array.from(runRow.run.reasoningDeltas.toArray) as Array<{ + reasoning_id: string + delta: string + }> + const content = deltas + .filter((d) => d.reasoning_id === `reasoning-0`) + .map((d) => d.delta) + .join(``) + expect(content).toBe(`AB`) + }) + + it(`reasoning content populates even when text deltas are also present`, async () => { + // Production scenario: a run has BOTH text deltas and reasoning + // deltas. 
The reasoning sub-query was returning `content: null` + // in the running app even though the deltas were in the local DB. + const { collections, sync } = createTimelineCollections() + const liveQuery = createLiveQueryCollection({ + query: createEntityTimelineQuery({ collections } as any), + startSync: true, + }) + await liveQuery.preload() + + sync.runs.insert({ key: `run-0`, status: `started` }) + sync.reasoning.insert({ + key: `reasoning-0`, + run_id: `run-0`, + status: `streaming`, + }) + sync.reasoningDeltas.insert({ + key: `reasoning-0:0`, + reasoning_id: `reasoning-0`, + run_id: `run-0`, + delta: `Thinking part 1. `, + }) + sync.reasoningDeltas.insert({ + key: `reasoning-0:1`, + reasoning_id: `reasoning-0`, + run_id: `run-0`, + delta: `Thinking part 2.`, + }) + sync.reasoning.update({ + key: `reasoning-0`, + run_id: `run-0`, + status: `completed`, + }) + sync.texts.insert({ + key: `msg-0`, + run_id: `run-0`, + status: `streaming`, + }) + sync.textDeltas.insert({ + key: `msg-0:0`, + text_id: `msg-0`, + run_id: `run-0`, + delta: `Answer part 1. `, + }) + sync.textDeltas.insert({ + key: `msg-0:1`, + text_id: `msg-0`, + run_id: `run-0`, + delta: `Answer part 2.`, + }) + sync.texts.update({ + key: `msg-0`, + run_id: `run-0`, + status: `completed`, + }) + sync.runs.update({ + key: `run-0`, + status: `completed`, + finish_reason: `stop`, + }) + await new Promise((r) => setTimeout(r, 50)) + + const rows = getRows(liveQuery) + const runRow = rows.find((r) => r.run?.key === `run-0`) + expect(runRow).toBeTruthy() + const reasoning = Array.from(runRow.run.reasoning.toArray) as Array + expect(reasoning).toHaveLength(1) + const reasoningDeltas = Array.from( + runRow.run.reasoningDeltas.toArray + ) as Array<{ reasoning_id: string; delta: string }> + const reasoningContent = reasoningDeltas + .filter((d) => d.reasoning_id === `reasoning-0`) + .map((d) => d.delta) + .join(``) + expect(reasoningContent).toBe(`Thinking part 1. 
Thinking part 2.`)
+      const items = Array.from(runRow.run.items.toArray) as Array<any>
+      expect(items).toHaveLength(1)
+      expect(items[0].text?.content).toBe(`Answer part 1. Answer part 2.`)
+    })
+
+    it(`reasoning content remains populated after status flips to completed`, async () => {
+      // Reproduces the bug where the reasoning row's `content` field
+      // came back as `undefined` (not even `""`) once the row's status
+      // transitioned to `completed`, even though the deltas were still
+      // present in the local DB. This made the "Thought for Ns"
+      // expanded view render an empty body.
+      const { collections, sync } = createTimelineCollections()
+      const liveQuery = createLiveQueryCollection({
+        query: createEntityTimelineQuery({ collections } as any),
+        startSync: true,
+      })
+      await liveQuery.preload()
+
+      sync.runs.insert({ key: `run-0`, status: `started` })
+      sync.reasoning.insert({
+        key: `reasoning-0`,
+        run_id: `run-0`,
+        status: `streaming`,
+      })
+      sync.reasoningDeltas.insert({
+        key: `reasoning-0:0`,
+        reasoning_id: `reasoning-0`,
+        run_id: `run-0`,
+        delta: `First thinking step. `,
+      })
+      sync.reasoningDeltas.insert({
+        key: `reasoning-0:1`,
+        reasoning_id: `reasoning-0`,
+        run_id: `run-0`,
+        delta: `Second thinking step.`,
+      })
+      // Now flip the row to completed — this is the transition that
+      // caused content to vanish in the running app.
+      sync.reasoning.update({
+        key: `reasoning-0`,
+        run_id: `run-0`,
+        status: `completed`,
+      })
+      sync.runs.update({
+        key: `run-0`,
+        status: `completed`,
+        finish_reason: `stop`,
+      })
+      await new Promise((r) => setTimeout(r, 50))
+
+      const rows = getRows(liveQuery)
+      const runRow = rows.find((r) => r.run?.key === `run-0`)
+      expect(runRow).toBeTruthy()
+      const reasoning = Array.from(runRow.run.reasoning.toArray) as Array<any>
+      expect(reasoning).toHaveLength(1)
+      expect(reasoning[0].key).toBe(`reasoning-0`)
+      expect(reasoning[0].status).toBe(`completed`)
+      const reasoningDeltas = Array.from(
+        runRow.run.reasoningDeltas.toArray
+      ) as Array<{ reasoning_id: string; delta: string }>
+      const content = reasoningDeltas
+        .filter((d) => d.reasoning_id === `reasoning-0`)
+        .map((d) => d.delta)
+        .join(``)
+      expect(content).toBe(`First thinking step. Second thinking step.`)
+    })
   })
 })
diff --git a/packages/agents-server-ui/src/components/AgentResponse.tsx b/packages/agents-server-ui/src/components/AgentResponse.tsx
index f90dc89ba0..7aa268b448 100644
--- a/packages/agents-server-ui/src/components/AgentResponse.tsx
+++ b/packages/agents-server-ui/src/components/AgentResponse.tsx
@@ -401,25 +401,54 @@ export const AgentResponseLive = memo(function AgentResponseLive({
     (q) => (run.errors ? q.from({ error: run.errors }) : undefined),
     [run.errors]
   )
-  // Subscribe to the run's reasoning rows so the section ticks as
-  // each `reasoning_delta` arrives. Empty array for runs without
-  // any reasoning content (most non-extended-thinking models).
+  // Subscribe to the run's reasoning rows + deltas. We assemble
+  // `content` client-side from the deltas rather than reading it
+  // off the projected `reasoning.content`, because the correlated
+  // sub-query that produced that field went stale (returning `null`)
+  // after the row's status flipped to `completed`. Client-side
+  // concat is reliable and effectively free at this scale.
   const { data: reasoningRows = [] } = useLiveQuery(
     (q) => (run.reasoning ? q.from({ reasoning: run.reasoning }) : undefined),
     [run.reasoning]
   )
-  const reasoningEntries = useMemo<Array<ReasoningEntry>>(
-    () =>
-      (reasoningRows as Array<ReasoningEntry>)
+  const { data: reasoningDeltaRows = [] } = useLiveQuery(
+    (q) =>
+      run.reasoningDeltas ? q.from({ delta: run.reasoningDeltas }) : undefined,
+    [run.reasoningDeltas]
+  )
+  const reasoningEntries = useMemo<Array<ReasoningEntry>>(() => {
+    const contentByReasoningId = new Map<string, string>()
+    for (const delta of reasoningDeltaRows as Array<{
+      reasoning_id: string
+      delta: string
+    }>) {
+      contentByReasoningId.set(
+        delta.reasoning_id,
+        (contentByReasoningId.get(delta.reasoning_id) ?? ``) + delta.delta
+      )
+    }
+    return (
+      (
+        reasoningRows as Array<
+          Omit<ReasoningEntry, `content`> & { order?: unknown }
+        >
+      )
         .slice()
         // The live query already orders by `_timeline_order` then key,
         // but TanStack's projection isn't guaranteed stable across
         // re-mounts — sort by `key` here as a cheap deterministic
         // tiebreaker so the section doesn't visibly reflow between
         // renders if two rows share an order.
-        .sort((a, b) => a.key.localeCompare(b.key)),
-    [reasoningRows]
-  )
+        .sort((a, b) => a.key.localeCompare(b.key))
+        .map((row) => ({
+          key: row.key,
+          status: row.status,
+          summary_title: row.summary_title,
+          encrypted: row.encrypted,
+          content: contentByReasoningId.get(row.key) ?? ``,
+        }))
+    )
+  }, [reasoningRows, reasoningDeltaRows])
   const sortedItems = useMemo(
     () => [...items].sort(compareLiveRunItems),
     [items]
diff --git a/packages/agents-server-ui/src/components/ReasoningSection.tsx b/packages/agents-server-ui/src/components/ReasoningSection.tsx
index d2cfd15072..cd277e31a2 100644
--- a/packages/agents-server-ui/src/components/ReasoningSection.tsx
+++ b/packages/agents-server-ui/src/components/ReasoningSection.tsx
@@ -1,4 +1,4 @@
-import { useEffect, useRef, useState } from 'react'
+import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
 import { Streamdown } from 'streamdown'
 import {
   streamdownComponents,
@@ -58,6 +58,18 @@ export function ReasoningSection({
   isStreaming: boolean
   timestamp?: number | null
 }): React.ReactElement | null {
+  // Owned here rather than inside `ReasoningEntryView` so the user's
+  // expand/collapse choice survives the entry view being unmounted and
+  // remounted — e.g. when the reasoning row briefly disappears from
+  // the live query while another part of the run updates, or when a
+  // virtualizer measurement pass replaces the subtree.
+  const [expandedByKey, setExpandedByKey] = useState<Record<string, boolean>>(
+    {}
+  )
+  const toggleExpanded = useCallback((key: string) => {
+    setExpandedByKey((prev) => ({ ...prev, [key]: !prev[key] }))
+  }, [])
+
   if (entries.length === 0) return null
 
   return (
@@ -67,6 +79,8 @@ export function ReasoningSection({
           entry={entry}
           isStreaming={isStreaming}
           timestamp={timestamp}
+          expanded={Boolean(expandedByKey[entry.key])}
+          onToggle={toggleExpanded}
         />
       ))}
     </div>
@@ -77,13 +91,20 @@ function ReasoningEntryView({
   entry,
   isStreaming,
   timestamp,
+  expanded,
+  onToggle,
 }: {
   entry: ReasoningEntry
   isStreaming: boolean
   timestamp?: number | null
+  expanded: boolean
+  onToggle: (key: string) => void
 }): React.ReactElement {
   const isLive = isStreaming && entry.status === `streaming`
-  const [expanded, setExpanded] = useState(false)
+  const handleToggle = useMemo(
+    () => () => onToggle(entry.key),
+    [entry.key, onToggle]
+  )
 
   // Snapshot the elapsed duration at the moment streaming flips to
   // `completed`, the same `sawStreamingRef` trick used for "done in
@@ -167,7 +188,7 @@ function ReasoningEntryView({