From 94d599884c5c745db55f7cf05e7b50cbcd91a182 Mon Sep 17 00:00:00 2001 From: Zbigniew Sobiecki Date: Tue, 23 Jun 2026 17:18:01 +0200 Subject: [PATCH] fix(codex): stream per-item LLM-call rows with full tool detail MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex collapsed an entire run into one agent_run_llm_calls row written at turn.completed, with tools stored as bare name strings (input dropped) — so the dashboard showed one end-of-run row of empty "bash" badges: not realtime, no command detail. Codex reports token usage only once (cumulative) at turn.completed, so per-row token attribution is not possible. Persist a Claude-Code-style content-block row per item.completed as it streams (text, or tool_use with full input; tool names normalized to the Claude vocab so the shared parser renders the command/args); these per-item rows carry no tokens. Keep one cost/usage row per turn.completed so run-total cost stays accurate: its token accounting (per-turn deltas of Codex's cumulative usage) is unchanged, but its payload no longer lists tool names, and because per-item rows share the same call counter, its callNumber and `turn` value now count rows rather than turns. New codex rows are content-block arrays and render via the existing parseClaudeCodeBlocks path; parseCodexPayload stays as a fallback for old rows. Also poll the run-detail LLM-calls list (and run status) while the run is active so the streamed rows appear live. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/backends/codex/index.ts | 112 ++++++++++++++---- tests/unit/backends/codex.test.ts | 95 +++++++++++---- tests/unit/utils/llmResponseParser.test.ts | 23 ++++ .../components/llm-calls/llm-call-list.tsx | 10 +- web/src/routes/runs/$runId.tsx | 10 +- 5 files changed, 204 insertions(+), 46 deletions(-) diff --git a/src/backends/codex/index.ts b/src/backends/codex/index.ts index f5ff61770..bce968c14 100644 --- a/src/backends/codex/index.ts +++ b/src/backends/codex/index.ts @@ -50,7 +50,6 @@ type JsonRecord = Record; */ type CodexTurnAccumulator = { textSummary: string[]; - toolNames: string[]; usage: UsageSummary | null; }; @@ -213,10 +212,11 @@ function persistTurnLlmCall(context: CodexLineContext): void { } } + // Tools/text detail now stream as their own per-item rows (persistItemRow); + // the turn.completed row carries the turn's cost/usage + a short text summary. const turnPayload = JSON.stringify({ turn: context.llmCallCount, text: acc.textSummary.join(' ').slice(0, 500) || undefined, - tools: acc.toolNames.length > 0 ? acc.toolNames : undefined, usage: usage ?? undefined, delta: delta ?? undefined, // Reasoning breakdown preserved for observability; it is already counted @@ -237,7 +237,53 @@ function persistTurnLlmCall(context: CodexLineContext): void { }); // Reset the accumulator for the next turn - context.currentTurn = { textSummary: [], toolNames: [], usage: null }; + context.currentTurn = { textSummary: [], usage: null }; +} + +/** + * Map a Codex tool name/input onto the Claude-Code tool vocabulary so the shared + * `summarizeInput` / `getToolStyle` render the argument and colour. Codex's + * command_execution surfaces as lowercase `bash`; function_call names vary. 
+ */ +function normalizeCodexTool( + name: string, + input?: Record, +): { name: string; input?: Record } { + switch (name.toLowerCase()) { + case 'bash': + case 'shell': + return { name: 'Bash', input }; + case 'read_file': + case 'read': + return { name: 'Read', input }; + case 'write_file': + case 'write': + return { name: 'Write', input }; + case 'apply_patch': + case 'edit_file': + case 'edit': + return { name: 'Edit', input }; + default: + return { name, input }; + } +} + +/** + * Persist one realtime detail row for a completed Codex item (a text message or a + * tool call), stored as a Claude-Code-style content-block array so the shared + * response parser renders it identically (tool command/args shown). These rows + * carry NO tokens — Codex reports usage only once (cumulative) on turn.completed, + * which persistTurnLlmCall records as the single cost-bearing row. + */ +function persistItemRow(context: CodexLineContext, block: Record): void { + context.llmCallCount += 1; + logLlmCall({ + runId: context.input.runId, + callNumber: context.llmCallCount, + model: context.model, + response: JSON.stringify([block]), + engineLabel: 'Codex', + }); } /** @@ -263,7 +309,7 @@ async function handleStructuralEvent( } if (eventType === 'turn.started' || eventType === 'thread.started') { // Reset turn accumulator at the start of each new turn - context.currentTurn = { textSummary: [], toolNames: [], usage: null }; + context.currentTurn = { textSummary: [], usage: null }; return true; } if (eventType === 'item.started') { @@ -275,32 +321,58 @@ async function handleStructuralEvent( return false; } +/** + * Log + accumulate text, persisting a realtime text row only when a model ITEM + * completes (not for streaming deltas) as a content-block array. 
+ */ +function handleCodexText( + context: CodexLineContext, + textParts: string[], + isItemCompleted: boolean, +): void { + for (const text of textParts) { + logText(context, text); + context.currentTurn.textSummary.push(text.slice(0, 200)); + } + if (isItemCompleted && textParts.length > 0) { + persistItemRow(context, { type: 'text', text: textParts.join('') }); + } +} + +/** + * Report a tool call to progress, persisting a realtime tool row (with full + * input, normalized to the Claude-Code vocab) only when the item completes. + */ +function handleCodexToolCall( + context: CodexLineContext, + toolCall: { name: string; input?: Record }, + isItemCompleted: boolean, +): void { + context.input.logWriter('DEBUG', 'Codex tool call', { + name: toolCall.name, + input: toolCall.input, + }); + context.input.progressReporter.onToolCall(toolCall.name, toolCall.input); + if (isItemCompleted) { + const normalized = normalizeCodexTool(toolCall.name, toolCall.input); + persistItemRow(context, { type: 'tool_use', name: normalized.name, input: normalized.input }); + } +} + async function handleParsedLine(context: CodexLineContext, parsed: JsonRecord): Promise { const eventType = typeof parsed.type === 'string' ? 
parsed.type : ''; if (await handleStructuralEvent(context, parsed, eventType)) return; const { textParts, toolCall, usage, error } = parseCodexEvent(parsed); + const isItemCompleted = eventType === 'item.completed'; if (textParts.length > 0 || toolCall) { await trackIteration(context); } - for (const text of textParts) { - logText(context, text); - // Accumulate text into the turn buffer for compact per-call payload - context.currentTurn.textSummary.push(text.slice(0, 200)); - } - - if (toolCall) { - context.input.logWriter('DEBUG', 'Codex tool call', { - name: toolCall.name, - input: toolCall.input, - }); - context.input.progressReporter.onToolCall(toolCall.name, toolCall.input); - // Track tool name in turn buffer for the compact payload - context.currentTurn.toolNames.push(toolCall.name); - } + handleCodexText(context, textParts, isItemCompleted); + if (toolCall) handleCodexToolCall(context, toolCall, isItemCompleted); if (usage) { context.input.logWriter('DEBUG', 'Codex usage', { usage }); @@ -719,7 +791,7 @@ export class CodexEngine extends NativeToolEngine { llmCallCount, cost, finalError, - currentTurn: { textSummary: [], toolNames: [], usage: null }, + currentTurn: { textSummary: [], usage: null }, cumulativeUsage: { inputTokens: 0, outputTokens: 0, diff --git a/tests/unit/backends/codex.test.ts b/tests/unit/backends/codex.test.ts index 2fe30bebc..3080bbee9 100644 --- a/tests/unit/backends/codex.test.ts +++ b/tests/unit/backends/codex.test.ts @@ -959,8 +959,9 @@ describe('CodexEngine', () => { expect(input.progressReporter.onToolCall).toHaveBeenCalledWith('bash', { command: 'cascade-tools session finish --comment done', }); - // Exactly ONE storeLlmCall row per completed turn - expect(mockStoreLlmCall).toHaveBeenCalledTimes(1); + // Two realtime per-item rows (text + tool) + one turn.completed cost row. + expect(mockStoreLlmCall).toHaveBeenCalledTimes(3); + // The cost row carries the turn usage. 
expect(mockStoreLlmCall).toHaveBeenCalledWith( expect.objectContaining({ inputTokens: 100, outputTokens: 50 }), ); @@ -1041,17 +1042,26 @@ describe('CodexEngine', () => { const result = await engine.execute(input); expect(result.success).toBe(true); - // Exactly two rows — one per completed turn - expect(mockStoreLlmCall).toHaveBeenCalledTimes(2); - // Codex emits CUMULATIVE session usage; rows must store per-turn DELTAS. + // Two realtime text rows (one per agent_message) interleaved with two + // turn.completed cost rows = 4 rows total. + expect(mockStoreLlmCall).toHaveBeenCalledTimes(4); + // Row 1 = 'First.' text row — a content-block array, no tokens. + const firstTextRow = mockStoreLlmCall.mock.calls[0][0] as { + response: string; + inputTokens?: number; + }; + expect(firstTextRow.inputTokens).toBeUndefined(); + expect(JSON.parse(firstTextRow.response)).toEqual([{ type: 'text', text: 'First.' }]); + // Codex emits CUMULATIVE session usage; the cost rows store per-turn DELTAS. // Feeding cumulative {50,20} then {80,30} → deltas {50,20} and {30,10}. + // Row 2 = turn-1 cost row; row 4 = turn-2 cost row. 
expect(mockStoreLlmCall).toHaveBeenNthCalledWith( - 1, - expect.objectContaining({ callNumber: 1, inputTokens: 50, outputTokens: 20 }), + 2, + expect.objectContaining({ callNumber: 2, inputTokens: 50, outputTokens: 20 }), ); expect(mockStoreLlmCall).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ callNumber: 2, inputTokens: 30, outputTokens: 10 }), + 4, + expect.objectContaining({ callNumber: 4, inputTokens: 30, outputTokens: 10 }), ); }); @@ -1088,7 +1098,7 @@ describe('CodexEngine', () => { ); }); - it('stores a compact turn-scoped payload with text summary and tool names', async () => { + it('streams per-item rows (text + tool with input) and a compact turn cost row', async () => { mockSpawn.mockImplementation((_cmd: string, args: string[]) => { const outputPath = args[args.indexOf('-o') + 1]; return createMockChild({ @@ -1115,18 +1125,59 @@ describe('CodexEngine', () => { const input = makeInput({ repoDir: workspaceDir, runId: 'run-payload-shape' }); await engine.execute(input); - expect(mockStoreLlmCall).toHaveBeenCalledTimes(1); - const [{ response }] = mockStoreLlmCall.mock.calls[0] as [{ response: string }][]; - const payload = JSON.parse(response) as Record; - // Payload must be a compact object, NOT a raw JSONL line dump - expect(payload).toMatchObject({ - turn: 1, - tools: ['bash'], - usage: { inputTokens: 30, outputTokens: 10 }, - }); - expect(typeof payload.text).toBe('string'); - // Payload must be reasonably sized (< 2 KB) — not a multi-KB raw event dump - expect(response.length).toBeLessThan(2000); + // 1 text row + 1 tool row + 1 turn.completed cost row. + expect(mockStoreLlmCall).toHaveBeenCalledTimes(3); + const calls = mockStoreLlmCall.mock.calls as Array< + [{ response: string; inputTokens?: number }] + >; + // Row 1: the agent message as a content-block array (renders via the shared parser). + expect(JSON.parse(calls[0][0].response)).toEqual([ + { type: 'text', text: 'I will run a command.' 
}, + ]); + // Row 2: the tool call keeps its full input, normalized to the Claude tool vocab. + expect(JSON.parse(calls[1][0].response)).toEqual([ + { type: 'tool_use', name: 'Bash', input: { command: 'ls' } }, + ]); + expect(calls[1][0].inputTokens).toBeUndefined(); + // Row 3: the compact turn cost row — carries usage/delta, no tool-name dump. + const costPayload = JSON.parse(calls[2][0].response) as Record; + expect(costPayload).toMatchObject({ turn: 3, usage: { inputTokens: 30, outputTokens: 10 } }); + expect(costPayload.tools).toBeUndefined(); + expect(calls[2][0].response.length).toBeLessThan(2000); + }); + + it('normalizes function_call names and persists only on item.completed (not deltas)', async () => { + mockSpawn.mockImplementation((_cmd: string, args: string[]) => { + const outputPath = args[args.indexOf('-o') + 1]; + return createMockChild({ + stdoutLines: [ + JSON.stringify({ type: 'turn.started' }), + // A streaming text delta must NOT persist a row (only completed items do). + JSON.stringify({ type: 'item.delta', delta: { type: 'text_delta', text: 'thinking…' } }), + // A completed function_call read_file → normalized to Read, input preserved. + JSON.stringify({ + type: 'item.completed', + item: { + type: 'function_call', + name: 'read_file', + arguments: '{"file_path":"src/a.ts"}', + }, + }), + JSON.stringify({ type: 'turn.completed', usage: { input_tokens: 5, output_tokens: 2 } }), + ], + onBeforeClose: () => writeFileSync(outputPath, 'done', 'utf-8'), + }); + }); + + const engine = new CodexEngine(); + await engine.execute(makeInput({ repoDir: workspaceDir, runId: 'run-normalize' })); + + // The delta did not persist; one tool row + one cost row = 2. 
+ expect(mockStoreLlmCall).toHaveBeenCalledTimes(2); + const toolResponse = (mockStoreLlmCall.mock.calls[0][0] as { response: string }).response; + expect(JSON.parse(toolResponse)).toEqual([ + { type: 'tool_use', name: 'Read', input: { file_path: 'src/a.ts' } }, + ]); }); it('does not call storeLlmCall when no turn.completed event fires (no response events only)', async () => { diff --git a/tests/unit/utils/llmResponseParser.test.ts b/tests/unit/utils/llmResponseParser.test.ts index 2cf05db89..1e4cfec3a 100644 --- a/tests/unit/utils/llmResponseParser.test.ts +++ b/tests/unit/utils/llmResponseParser.test.ts @@ -152,6 +152,29 @@ describe.concurrent('parseLlmResponse', () => { }); }); + describe('Codex realtime per-item rows (content-block array)', () => { + // Codex now streams one content-block-array row per item.completed, so its + // tool calls carry full input and render via the shared Claude-Code path — + // no more empty-badge inputSummary. + it('renders a codex tool row with the command (normalized to Bash)', () => { + const response = JSON.stringify([ + { type: 'tool_use', name: 'Bash', input: { command: 'git status' } }, + ]); + const result = parseLlmResponse(response); + expect(result.blocks).toEqual([ + { kind: 'tool_use', name: 'Bash', inputSummary: 'git status' }, + ]); + expect(result.toolNames).toEqual(['Bash']); + }); + + it('renders a codex text row', () => { + const response = JSON.stringify([{ type: 'text', text: 'Reviewing the PR.' }]); + const result = parseLlmResponse(response); + expect(result.blocks).toEqual([{ kind: 'text', text: 'Reviewing the PR.' 
}]); + expect(result.textPreview).toBe('Reviewing the PR.'); + }); + }); + describe('LLMist format (gadget markup)', () => { const gadget = (name: string, args: Record) => { const argLines = Object.entries(args) diff --git a/web/src/components/llm-calls/llm-call-list.tsx b/web/src/components/llm-calls/llm-call-list.tsx index 9b1dd773e..092db2d32 100644 --- a/web/src/components/llm-calls/llm-call-list.tsx +++ b/web/src/components/llm-calls/llm-call-list.tsx @@ -8,6 +8,8 @@ import { LlmCallDetail } from './llm-call-detail.js'; interface LlmCallListProps { runId: string; + /** When the run is still active, poll so newly-persisted calls stream in live. */ + isRunning?: boolean; } type ToolCall = { name: string; inputSummary: string }; @@ -133,10 +135,14 @@ function CallRow({ runId, call, delta, isExpanded, onToggle }: CallRowProps) { ); } -export function LlmCallList({ runId }: LlmCallListProps) { +export function LlmCallList({ runId, isRunning }: LlmCallListProps) { const [expandedCall, setExpandedCall] = useState(null); - const callsQuery = useQuery(trpc.runs.listLlmCalls.queryOptions({ runId })); + const callsQuery = useQuery({ + ...trpc.runs.listLlmCalls.queryOptions({ runId }), + // While the run is active, poll so per-item rows appear in realtime. + refetchInterval: isRunning ? 3000 : false, + }); if (callsQuery.isLoading) { return
Loading LLM calls...
; diff --git a/web/src/routes/runs/$runId.tsx b/web/src/routes/runs/$runId.tsx index b3790aaac..631b7eaf9 100644 --- a/web/src/routes/runs/$runId.tsx +++ b/web/src/routes/runs/$runId.tsx @@ -19,7 +19,11 @@ function RunDetailPage() { const { runId } = runDetailRoute.useParams(); const [activeTab, setActiveTab] = useState('overview'); - const runQuery = useQuery(trpc.runs.getById.queryOptions({ id: runId })); + const runQuery = useQuery({ + ...trpc.runs.getById.queryOptions({ id: runId }), + // Poll while the run is active so status + the live-updating tabs refresh. + refetchInterval: (query) => (query.state.data?.status === 'running' ? 5000 : false), + }); if (runQuery.isLoading) { return
Loading run...
; @@ -96,7 +100,9 @@ function RunDetailPage() { {activeTab === 'overview' && } {activeTab === 'logs' && } - {activeTab === 'llm-calls' && } + {activeTab === 'llm-calls' && ( + + )} {activeTab === 'debug' && } );