Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 92 additions & 20 deletions src/backends/codex/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type JsonRecord = Record<string, unknown>;
*/
type CodexTurnAccumulator = {
textSummary: string[];
toolNames: string[];
usage: UsageSummary | null;
};

Expand Down Expand Up @@ -213,10 +212,11 @@ function persistTurnLlmCall(context: CodexLineContext): void {
}
}

// Tools/text detail now stream as their own per-item rows (persistItemRow);
// the turn.completed row carries the turn's cost/usage + a short text summary.
const turnPayload = JSON.stringify({
turn: context.llmCallCount,
text: acc.textSummary.join(' ').slice(0, 500) || undefined,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cost row keeps text: acc.textSummary.join(' ')…, and parseCodexPayload turns that into a textPreview, so the agent's message now renders twice in the LLM-calls list — once as its own per-item text row, and again here on the usage row. Dropping text from this payload (leaving turn/usage/delta/reasoning) would make the cost row a pure usage row and better match the claude-code-style per-item layout the PR is aiming for. Non-blocking.

tools: acc.toolNames.length > 0 ? acc.toolNames : undefined,
usage: usage ?? undefined,
delta: delta ?? undefined,
// Reasoning breakdown preserved for observability; it is already counted
Expand All @@ -237,7 +237,53 @@ function persistTurnLlmCall(context: CodexLineContext): void {
});

// Reset the accumulator for the next turn
context.currentTurn = { textSummary: [], toolNames: [], usage: null };
context.currentTurn = { textSummary: [], usage: null };
}

/**
 * Lower-cased Codex tool names paired with the Claude-Code tool name they are
 * rewritten to. A Map is used (rather than a plain object) so arbitrary tool
 * names can never collide with inherited object properties.
 */
const CODEX_TOOL_ALIASES: ReadonlyMap<string, string> = new Map<string, string>([
  ['bash', 'Bash'],
  ['shell', 'Bash'],
  ['read_file', 'Read'],
  ['read', 'Read'],
  ['write_file', 'Write'],
  ['write', 'Write'],
  ['apply_patch', 'Edit'],
  ['edit_file', 'Edit'],
  ['edit', 'Edit'],
]);

/**
 * Translate a Codex tool name into the Claude-Code tool vocabulary so the shared
 * `summarizeInput` / `getToolStyle` helpers can render the argument and colour.
 *
 * @param name - Tool name as reported by Codex; matched case-insensitively.
 * @param input - Tool arguments; passed through unchanged.
 * @returns The Claude-Code name when an alias is known, otherwise `name`
 *   exactly as given, together with the same `input` reference.
 */
function normalizeCodexTool(
  name: string,
  input?: Record<string, unknown>,
): { name: string; input?: Record<string, unknown> } {
  const canonicalName = CODEX_TOOL_ALIASES.get(name.toLowerCase()) ?? name;
  return { name: canonicalName, input };
}

/**
 * Write one realtime detail row for a finished Codex item (a text message or a
 * tool call). The block is wrapped in a single-element array and serialised, so
 * the stored response has the Claude-Code content-block-array shape that the
 * shared response parser renders. No token fields are passed here: usage is
 * recorded separately by persistTurnLlmCall on the turn.completed row.
 *
 * @param context - Per-run line context; its `llmCallCount` is advanced by one.
 * @param block - Content block to store as the row's only element.
 */
function persistItemRow(context: CodexLineContext, block: Record<string, unknown>): void {
  // Advance the run's call counter first; the new value is this row's number.
  const callNumber = context.llmCallCount + 1;
  context.llmCallCount = callNumber;

  const { runId } = context.input;
  const { model } = context;
  const response = JSON.stringify([block]);

  logLlmCall({ runId, callNumber, model, response, engineLabel: 'Codex' });
}

/**
Expand All @@ -263,7 +309,7 @@ async function handleStructuralEvent(
}
if (eventType === 'turn.started' || eventType === 'thread.started') {
// Reset turn accumulator at the start of each new turn
context.currentTurn = { textSummary: [], toolNames: [], usage: null };
context.currentTurn = { textSummary: [], usage: null };
return true;
}
if (eventType === 'item.started') {
Expand All @@ -275,32 +321,58 @@ async function handleStructuralEvent(
return false;
}

/**
 * Log every text part and append a truncated copy (first 200 characters) to the
 * current turn's summary buffer. A realtime text row — one `text` content block
 * holding the concatenated parts — is persisted only when the event marks a
 * completed item and there is at least one part.
 *
 * @param context - Per-run line context holding the turn accumulator.
 * @param textParts - Text fragments extracted from the current event.
 * @param isItemCompleted - Whether the event was an `item.completed`.
 */
function handleCodexText(
  context: CodexLineContext,
  textParts: string[],
  isItemCompleted: boolean,
): void {
  const { textSummary } = context.currentTurn;
  for (const part of textParts) {
    logText(context, part);
    textSummary.push(part.slice(0, 200));
  }

  // Streaming deltas and empty events never produce a persisted row.
  if (!isItemCompleted || textParts.length === 0) return;

  const fullText = textParts.join('');
  persistItemRow(context, { type: 'text', text: fullText });
}

/**
 * Emit a DEBUG log entry for the tool call and forward it to the progress
 * reporter using the name and input exactly as Codex reported them. When the
 * item has completed, additionally persist a realtime `tool_use` row whose name
 * has been mapped to the Claude-Code vocabulary and whose input is kept whole.
 *
 * @param context - Per-run line context (log writer, progress reporter, counters).
 * @param toolCall - Tool name and optional arguments parsed from the event.
 * @param isItemCompleted - Whether the event was an `item.completed`.
 */
function handleCodexToolCall(
  context: CodexLineContext,
  toolCall: { name: string; input?: Record<string, unknown> },
  isItemCompleted: boolean,
): void {
  const { name, input } = toolCall;

  // Call through the owning objects so any `this` binding is preserved.
  context.input.logWriter('DEBUG', 'Codex tool call', { name, input });
  context.input.progressReporter.onToolCall(name, input);

  if (!isItemCompleted) return;

  const mapped = normalizeCodexTool(name, input);
  persistItemRow(context, { type: 'tool_use', name: mapped.name, input: mapped.input });
}

async function handleParsedLine(context: CodexLineContext, parsed: JsonRecord): Promise<void> {
const eventType = typeof parsed.type === 'string' ? parsed.type : '';

if (await handleStructuralEvent(context, parsed, eventType)) return;

const { textParts, toolCall, usage, error } = parseCodexEvent(parsed);
const isItemCompleted = eventType === 'item.completed';

if (textParts.length > 0 || toolCall) {
await trackIteration(context);
}

for (const text of textParts) {
logText(context, text);
// Accumulate text into the turn buffer for compact per-call payload
context.currentTurn.textSummary.push(text.slice(0, 200));
}

if (toolCall) {
context.input.logWriter('DEBUG', 'Codex tool call', {
name: toolCall.name,
input: toolCall.input,
});
context.input.progressReporter.onToolCall(toolCall.name, toolCall.input);
// Track tool name in turn buffer for the compact payload
context.currentTurn.toolNames.push(toolCall.name);
}
handleCodexText(context, textParts, isItemCompleted);
if (toolCall) handleCodexToolCall(context, toolCall, isItemCompleted);

if (usage) {
context.input.logWriter('DEBUG', 'Codex usage', { usage });
Expand Down Expand Up @@ -719,7 +791,7 @@ export class CodexEngine extends NativeToolEngine {
llmCallCount,
cost,
finalError,
currentTurn: { textSummary: [], toolNames: [], usage: null },
currentTurn: { textSummary: [], usage: null },
cumulativeUsage: {
inputTokens: 0,
outputTokens: 0,
Expand Down
95 changes: 73 additions & 22 deletions tests/unit/backends/codex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -959,8 +959,9 @@ describe('CodexEngine', () => {
expect(input.progressReporter.onToolCall).toHaveBeenCalledWith('bash', {
command: 'cascade-tools session finish --comment done',
});
// Exactly ONE storeLlmCall row per completed turn
expect(mockStoreLlmCall).toHaveBeenCalledTimes(1);
// Two realtime per-item rows (text + tool) + one turn.completed cost row.
expect(mockStoreLlmCall).toHaveBeenCalledTimes(3);
// The cost row carries the turn usage.
expect(mockStoreLlmCall).toHaveBeenCalledWith(
expect.objectContaining({ inputTokens: 100, outputTokens: 50 }),
);
Expand Down Expand Up @@ -1041,17 +1042,26 @@ describe('CodexEngine', () => {
const result = await engine.execute(input);

expect(result.success).toBe(true);
// Exactly two rows — one per completed turn
expect(mockStoreLlmCall).toHaveBeenCalledTimes(2);
// Codex emits CUMULATIVE session usage; rows must store per-turn DELTAS.
// Two realtime text rows (one per agent_message) interleaved with two
// turn.completed cost rows = 4 rows total.
expect(mockStoreLlmCall).toHaveBeenCalledTimes(4);
// Row 1 = 'First.' text row — a content-block array, no tokens.
const firstTextRow = mockStoreLlmCall.mock.calls[0][0] as {
response: string;
inputTokens?: number;
};
expect(firstTextRow.inputTokens).toBeUndefined();
expect(JSON.parse(firstTextRow.response)).toEqual([{ type: 'text', text: 'First.' }]);
// Codex emits CUMULATIVE session usage; the cost rows store per-turn DELTAS.
// Feeding cumulative {50,20} then {80,30} → deltas {50,20} and {30,10}.
// Row 2 = turn-1 cost row; row 4 = turn-2 cost row.
expect(mockStoreLlmCall).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ callNumber: 1, inputTokens: 50, outputTokens: 20 }),
2,
expect.objectContaining({ callNumber: 2, inputTokens: 50, outputTokens: 20 }),
);
expect(mockStoreLlmCall).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ callNumber: 2, inputTokens: 30, outputTokens: 10 }),
4,
expect.objectContaining({ callNumber: 4, inputTokens: 30, outputTokens: 10 }),
);
});

Expand Down Expand Up @@ -1088,7 +1098,7 @@ describe('CodexEngine', () => {
);
});

it('stores a compact turn-scoped payload with text summary and tool names', async () => {
it('streams per-item rows (text + tool with input) and a compact turn cost row', async () => {
mockSpawn.mockImplementation((_cmd: string, args: string[]) => {
const outputPath = args[args.indexOf('-o') + 1];
return createMockChild({
Expand All @@ -1115,18 +1125,59 @@ describe('CodexEngine', () => {
const input = makeInput({ repoDir: workspaceDir, runId: 'run-payload-shape' });
await engine.execute(input);

expect(mockStoreLlmCall).toHaveBeenCalledTimes(1);
const [{ response }] = mockStoreLlmCall.mock.calls[0] as [{ response: string }][];
const payload = JSON.parse(response) as Record<string, unknown>;
// Payload must be a compact object, NOT a raw JSONL line dump
expect(payload).toMatchObject({
turn: 1,
tools: ['bash'],
usage: { inputTokens: 30, outputTokens: 10 },
});
expect(typeof payload.text).toBe('string');
// Payload must be reasonably sized (< 2 KB) — not a multi-KB raw event dump
expect(response.length).toBeLessThan(2000);
// 1 text row + 1 tool row + 1 turn.completed cost row.
expect(mockStoreLlmCall).toHaveBeenCalledTimes(3);
const calls = mockStoreLlmCall.mock.calls as Array<
[{ response: string; inputTokens?: number }]
>;
// Row 1: the agent message as a content-block array (renders via the shared parser).
expect(JSON.parse(calls[0][0].response)).toEqual([
{ type: 'text', text: 'I will run a command.' },
]);
// Row 2: the tool call keeps its full input, normalized to the Claude tool vocab.
expect(JSON.parse(calls[1][0].response)).toEqual([
{ type: 'tool_use', name: 'Bash', input: { command: 'ls' } },
]);
expect(calls[1][0].inputTokens).toBeUndefined();
// Row 3: the compact turn cost row — carries usage/delta, no tool-name dump.
const costPayload = JSON.parse(calls[2][0].response) as Record<string, unknown>;
expect(costPayload).toMatchObject({ turn: 3, usage: { inputTokens: 30, outputTokens: 10 } });
expect(costPayload.tools).toBeUndefined();
expect(calls[2][0].response.length).toBeLessThan(2000);
});

it('normalizes function_call names and persists only on item.completed (not deltas)', async () => {
  // Event stream fed to the engine, in emission order.
  const events = [
    { type: 'turn.started' },
    // Streaming text delta: must NOT persist a row (only completed items do).
    { type: 'item.delta', delta: { type: 'text_delta', text: 'thinking…' } },
    // Completed function_call read_file: expected to be stored as Read with its input.
    {
      type: 'item.completed',
      item: {
        type: 'function_call',
        name: 'read_file',
        arguments: '{"file_path":"src/a.ts"}',
      },
    },
    { type: 'turn.completed', usage: { input_tokens: 5, output_tokens: 2 } },
  ];

  mockSpawn.mockImplementation((_cmd: string, args: string[]) => {
    const outputPath = args[args.indexOf('-o') + 1];
    return createMockChild({
      stdoutLines: events.map((event) => JSON.stringify(event)),
      onBeforeClose: () => writeFileSync(outputPath, 'done', 'utf-8'),
    });
  });

  const engine = new CodexEngine();
  const input = makeInput({ repoDir: workspaceDir, runId: 'run-normalize' });
  await engine.execute(input);

  // Two rows in total: the tool row and the turn cost row; the delta added none.
  expect(mockStoreLlmCall).toHaveBeenCalledTimes(2);
  const firstRow = mockStoreLlmCall.mock.calls[0][0] as { response: string };
  expect(JSON.parse(firstRow.response)).toEqual([
    { type: 'tool_use', name: 'Read', input: { file_path: 'src/a.ts' } },
  ]);
});

it('does not call storeLlmCall when no turn.completed event fires (no response events only)', async () => {
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/utils/llmResponseParser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,29 @@ describe.concurrent('parseLlmResponse', () => {
});
});

describe('Codex realtime per-item rows (content-block array)', () => {
  // Codex rows are stored as one content-block array per completed item, so a
  // tool call keeps its full input and goes through the same rendering path as
  // Claude-Code rows instead of yielding an empty inputSummary badge.
  it('renders a codex tool row with the command (normalized to Bash)', () => {
    const toolBlock = { type: 'tool_use', name: 'Bash', input: { command: 'git status' } };

    const { blocks, toolNames } = parseLlmResponse(JSON.stringify([toolBlock]));

    expect(blocks).toEqual([{ kind: 'tool_use', name: 'Bash', inputSummary: 'git status' }]);
    expect(toolNames).toEqual(['Bash']);
  });

  it('renders a codex text row', () => {
    const textBlock = { type: 'text', text: 'Reviewing the PR.' };

    const { blocks, textPreview } = parseLlmResponse(JSON.stringify([textBlock]));

    expect(blocks).toEqual([{ kind: 'text', text: 'Reviewing the PR.' }]);
    expect(textPreview).toBe('Reviewing the PR.');
  });
});

describe('LLMist format (gadget markup)', () => {
const gadget = (name: string, args: Record<string, string>) => {
const argLines = Object.entries(args)
Expand Down
10 changes: 8 additions & 2 deletions web/src/components/llm-calls/llm-call-list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { LlmCallDetail } from './llm-call-detail.js';

/** Props accepted by the LlmCallList component. */
interface LlmCallListProps {
/** Id of the run whose LLM calls are queried and listed. */
runId: string;
/** When the run is still active, poll so newly-persisted calls stream in live. */
isRunning?: boolean;
}

type ToolCall = { name: string; inputSummary: string };
Expand Down Expand Up @@ -133,10 +135,14 @@ function CallRow({ runId, call, delta, isExpanded, onToggle }: CallRowProps) {
);
}

export function LlmCallList({ runId }: LlmCallListProps) {
export function LlmCallList({ runId, isRunning }: LlmCallListProps) {
const [expandedCall, setExpandedCall] = useState<number | null>(null);

const callsQuery = useQuery(trpc.runs.listLlmCalls.queryOptions({ runId }));
const callsQuery = useQuery({
...trpc.runs.listLlmCalls.queryOptions({ runId }),
// While the run is active, poll so per-item rows appear in realtime.
refetchInterval: isRunning ? 3000 : false,
});

if (callsQuery.isLoading) {
return <div className="py-8 text-center text-muted-foreground">Loading LLM calls...</div>;
Expand Down
10 changes: 8 additions & 2 deletions web/src/routes/runs/$runId.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ function RunDetailPage() {
const { runId } = runDetailRoute.useParams();
const [activeTab, setActiveTab] = useState<Tab>('overview');

const runQuery = useQuery(trpc.runs.getById.queryOptions({ id: runId }));
const runQuery = useQuery({
...trpc.runs.getById.queryOptions({ id: runId }),
// Poll while the run is active so status + the live-updating tabs refresh.
refetchInterval: (query) => (query.state.data?.status === 'running' ? 5000 : false),
});

if (runQuery.isLoading) {
return <div className="py-8 text-center text-muted-foreground">Loading run...</div>;
Expand Down Expand Up @@ -96,7 +100,9 @@ function RunDetailPage() {

{activeTab === 'overview' && <RunSummaryCard run={run} />}
{activeTab === 'logs' && <LogViewer runId={runId} />}
{activeTab === 'llm-calls' && <LlmCallList runId={runId} />}
{activeTab === 'llm-calls' && (
<LlmCallList runId={runId} isRunning={run.status === 'running'} />
)}
{activeTab === 'debug' && <DebugAnalysis runId={runId} />}
</div>
);
Expand Down
Loading