From a50270e7cbea4aa5fbf2c82ea81fc970d7a217aa Mon Sep 17 00:00:00 2001 From: Advait Johari Date: Thu, 18 Jun 2026 13:33:57 -0500 Subject: [PATCH] fix: collapse work once final answer streams --- .../Layers/ProjectionPipeline.ts | 1 + .../Layers/ProviderRuntimeIngestion.ts | 76 ++++++++++++++++++- apps/server/src/orchestration/decider.ts | 1 + apps/server/src/orchestration/projector.ts | 2 + .../src/provider/Layers/CodexAdapter.ts | 10 +++ .../chat/MessagesTimeline.logic.test.ts | 69 +++++++++++++++++ .../components/chat/MessagesTimeline.logic.ts | 23 +++++- apps/web/src/store.ts | 3 + apps/web/src/types.ts | 1 + packages/contracts/src/orchestration.ts | 6 ++ packages/contracts/src/providerRuntime.ts | 2 + 11 files changed, 191 insertions(+), 3 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index f12df850941..3b6a9a0d39c 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -840,6 +840,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti turnId: event.payload.turnId, role: event.payload.role, text: nextText, + ...(event.payload.phase !== undefined ? { phase: event.payload.phase } : {}), ...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}), isStreaming: event.payload.streaming, createdAt: previousMessage?.createdAt ?? event.payload.createdAt, diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3e5978f4846..c1c8c3a3d08 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -5,9 +5,11 @@ import { MessageId, type OrchestrationEvent, type OrchestrationMessage, + type OrchestrationMessagePhase, type OrchestrationProposedPlanId, CheckpointRef, isToolLifecycleItemType, + RuntimeItemId, ThreadId, type ThreadTokenUsageSnapshot, TurnId, @@ -53,6 +55,10 @@ const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000; const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120); const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000; const BUFFERED_PROPOSED_PLAN_BY_ID_TTL = Duration.minutes(120); +const ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000; +const ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_TTL = Duration.minutes(120); +const ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_CACHE_CAPACITY = 20_000; +const ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_TTL = Duration.minutes(120); const MAX_BUFFERED_ASSISTANT_CHARS = 24_000; const STRICT_PROVIDER_LIFECYCLE_GUARD = process.env.T3CODE_STRICT_PROVIDER_LIFECYCLE_GUARD !== "0"; @@ -202,6 +208,11 @@ function assistantSegmentMessageId(baseKey: string, segmentIndex: number): Messa segmentIndex === 0 ? `assistant:${baseKey}` : `assistant:${baseKey}:segment:${segmentIndex}`, ); } + +function runtimeItemKey(threadId: ThreadId, itemId: RuntimeItemId): string { + return `${threadId}:${itemId}`; +} + function buildContextWindowActivityPayload( event: ProviderRuntimeEvent, ): ThreadTokenUsageSnapshot | undefined { @@ -666,6 +677,18 @@ const make = Effect.gen(function* () { lookup: () => Effect.succeed({ text: "", createdAt: "" }), }); + const assistantMessagePhaseByMessageId = yield* Cache.make({ + capacity: ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_CACHE_CAPACITY, + timeToLive: ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_TTL, + lookup: () => Effect.die(new Error("assistant message phase should be read through getOption")), + }); + + const assistantMessagePhaseByItemKey = yield* Cache.make({ + capacity: ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_CACHE_CAPACITY, + timeToLive: ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_TTL, + lookup: () => Effect.die(new Error("assistant item phase should be read through getOption")), + }); + const resolveThreadDetail = Effect.fn("resolveThreadDetail")(function* (threadId: ThreadId) { return yield* projectionSnapshotQuery .getThreadDetailById(threadId) @@ -855,8 +878,29 @@ const make = Effect.gen(function* () { const clearBufferedProposedPlan = (planId: string) => Cache.invalidate(bufferedProposedPlanById, planId); + const rememberAssistantMessagePhase = (messageId: MessageId, phase: OrchestrationMessagePhase) => + Cache.set(assistantMessagePhaseByMessageId, messageId, phase); + + const getAssistantMessagePhase = (messageId: MessageId) => + Cache.getOption(assistantMessagePhaseByMessageId, messageId).pipe( + Effect.map(Option.getOrUndefined), + ); + + const clearAssistantMessagePhase = (messageId: MessageId) => + Cache.invalidate(assistantMessagePhaseByMessageId, messageId); + + const getAssistantMessagePhaseForRuntimeEvent = (event: ProviderRuntimeEvent) => + event.itemId + ? Cache.getOption( + assistantMessagePhaseByItemKey, + runtimeItemKey(event.threadId, event.itemId), + ).pipe(Effect.map(Option.getOrUndefined)) + : Effect.succeed(undefined as OrchestrationMessagePhase | undefined); + const clearAssistantMessageState = (messageId: MessageId) => - clearBufferedAssistantText(messageId); + clearBufferedAssistantText(messageId).pipe( + Effect.andThen(clearAssistantMessagePhase(messageId)), + ); const flushBufferedAssistantMessage = (input: { event: ProviderRuntimeEvent; @@ -871,6 +915,7 @@ const make = Effect.gen(function* () { if (!hasRenderableAssistantText(bufferedText)) { return false; } + const phase = yield* getAssistantMessagePhase(input.messageId); yield* orchestrationEngine.dispatch({ type: "thread.message.assistant.delta", @@ -879,6 +924,7 @@ const make = Effect.gen(function* () { messageId: input.messageId, delta: bufferedText, ...(input.turnId ? { turnId: input.turnId } : {}), + ...(phase !== undefined ? { phase } : {}), createdAt: input.createdAt, }); return true; @@ -937,6 +983,7 @@ const make = Effect.gen(function* () { ? input.fallbackText! : ""; const hasRenderableText = hasRenderableAssistantText(text); + const phase = yield* getAssistantMessagePhase(input.messageId); if (hasRenderableText) { yield* orchestrationEngine.dispatch({ @@ -946,6 +993,7 @@ const make = Effect.gen(function* () { messageId: input.messageId, delta: text, ...(input.turnId ? { turnId: input.turnId } : {}), + ...(phase !== undefined ? { phase } : {}), createdAt: input.createdAt, }); } @@ -1358,6 +1406,19 @@ const make = Effect.gen(function* () { } } + if ( + event.type === "item.started" && + event.payload.itemType === "assistant_message" && + event.payload.messagePhase !== undefined && + event.itemId !== undefined + ) { + yield* Cache.set( + assistantMessagePhaseByItemKey, + runtimeItemKey(event.threadId, event.itemId), + event.payload.messagePhase, + ); + } + const assistantDelta = event.type === "content.delta" && event.payload.streamKind === "assistant_text" ? event.payload.delta @@ -1372,6 +1433,10 @@ const make = Effect.gen(function* () { event, ...(turnId ? { turnId } : {}), }); + const assistantMessagePhase = yield* getAssistantMessagePhaseForRuntimeEvent(event); + if (assistantMessagePhase !== undefined) { + yield* rememberAssistantMessagePhase(assistantMessageId, assistantMessagePhase); + } if (turnId) { yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); } @@ -1390,6 +1455,7 @@ const make = Effect.gen(function* () { messageId: assistantMessageId, delta: spillChunk, ...(turnId ? { turnId } : {}), + ...(assistantMessagePhase !== undefined ? { phase: assistantMessagePhase } : {}), createdAt: now, }); } @@ -1401,6 +1467,7 @@ const make = Effect.gen(function* () { messageId: assistantMessageId, delta: assistantDelta, ...(turnId ? { turnId } : {}), + ...(assistantMessagePhase !== undefined ? { phase: assistantMessagePhase } : {}), createdAt: now, }); } @@ -1463,6 +1530,7 @@ const make = Effect.gen(function* () { `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, ), fallbackText: event.payload.detail, + messagePhase: event.payload.messagePhase, } : undefined; const proposedPlanCompletion = @@ -1487,6 +1555,12 @@ const make = Effect.gen(function* () { activeAssistantMessageId, () => assistantCompletion.messageId, ); + if (assistantCompletion.messagePhase !== undefined) { + yield* rememberAssistantMessagePhase( + assistantMessageId, + assistantCompletion.messagePhase, + ); + } const existingAssistantMessage = findMessageById(messages, assistantMessageId); const shouldApplyFallbackCompletionText = !existingAssistantMessage || existingAssistantMessage.text.length === 0; diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 0d4af771ca8..d741a1cd8bc 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -620,6 +620,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" role: "assistant", text: command.delta, turnId: command.turnId ?? null, + ...(command.phase !== undefined ? { phase: command.phase } : {}), streaming: true, createdAt: command.createdAt, updatedAt: command.createdAt, diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index fc6ab8f6fcf..96fc8a26ab7 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -401,6 +401,7 @@ export function projectEvent( text: payload.text, ...(payload.attachments !== undefined ? { attachments: payload.attachments } : {}), turnId: payload.turnId, + ...(payload.phase !== undefined ? { phase: payload.phase } : {}), streaming: payload.streaming, createdAt: payload.createdAt, updatedAt: payload.updatedAt, @@ -423,6 +424,7 @@ export function projectEvent( streaming: message.streaming, updatedAt: message.updatedAt, turnId: message.turnId, + ...(message.phase !== undefined ? { phase: message.phase } : {}), ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 270126e934b..67fd03bd405 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -284,6 +284,13 @@ function itemDetail(item: CodexLifecycleItem): string | undefined { return undefined; } +function itemMessagePhase(item: CodexLifecycleItem): "commentary" | "final_answer" | undefined { + if (!("phase" in item)) { + return undefined; + } + return item.phase === "commentary" || item.phase === "final_answer" ? item.phase : undefined; +} + function toRequestTypeFromMethod(method: string): CanonicalRequestType { switch (method) { case "item/commandExecution/requestApproval": @@ -481,6 +488,9 @@ function mapItemLifecycle( ...(status ? { status } : {}), ...(itemTitle(itemType, item) ? { title: itemTitle(itemType, item) } : {}), ...(detail ? { detail } : {}), + ...(itemType === "assistant_message" && itemMessagePhase(item) + ? { messagePhase: itemMessagePhase(item) } + : {}), ...(event.payload !== undefined ? { data: event.payload } : {}), }, }; diff --git a/apps/web/src/components/chat/MessagesTimeline.logic.test.ts b/apps/web/src/components/chat/MessagesTimeline.logic.test.ts index 032f8635698..8e92e18b180 100644 --- a/apps/web/src/components/chat/MessagesTimeline.logic.test.ts +++ b/apps/web/src/components/chat/MessagesTimeline.logic.test.ts @@ -729,6 +729,75 @@ describe("deriveMessagesTimelineRows", () => { ]); }); + it("folds active-turn work once the final answer starts streaming", () => { + const rows = deriveMessagesTimelineRows({ + timelineEntries: [ + { + id: "assistant-commentary-entry", + kind: "message", + createdAt: "2026-01-01T00:00:05Z", + message: { + id: "assistant-commentary" as never, + role: "assistant", + text: "Checking the repo.", + turnId: "turn-1" as never, + phase: "commentary", + createdAt: "2026-01-01T00:00:05Z", + completedAt: "2026-01-01T00:00:06Z", + streaming: false, + }, + }, + { + id: "work-entry-1", + kind: "work", + createdAt: "2026-01-01T00:00:08Z", + entry: { + id: "work-1", + createdAt: "2026-01-01T00:00:08Z", + turnId: "turn-1" as never, + label: "Ran command", + tone: "tool" as const, + }, + }, + { + id: "assistant-final-entry", + kind: "message", + createdAt: "2026-01-01T00:00:20Z", + message: { + id: "assistant-final" as never, + role: "assistant", + text: "The answer is streaming", + turnId: "turn-1" as never, + phase: "final_answer", + createdAt: "2026-01-01T00:00:20Z", + streaming: true, + }, + }, + ], + latestTurn: { + turnId: "turn-1" as never, + state: "running", + startedAt: "2026-01-01T00:00:00Z", + completedAt: null, + }, + isWorking: true, + activeTurnStartedAt: "2026-01-01T00:00:00Z", + turnDiffSummaryByAssistantMessageId: new Map(), + revertTurnCountByUserMessageId: new Map(), + }); + + expect(rows.map((row) => row.id)).toEqual([ + "turn-fold:turn-1", + "assistant-final-entry", + "working-indicator-row", + ]); + const foldRow = rows.find( + (row): row is Extract<(typeof rows)[number], { kind: "turn-fold" }> => + row.kind === "turn-fold", + ); + expect(foldRow?.expanded).toBe(false); + }); + it("only shows assistant metadata on the terminal assistant message", () => { const rows = deriveMessagesTimelineRows({ timelineEntries: [ diff --git a/apps/web/src/components/chat/MessagesTimeline.logic.ts b/apps/web/src/components/chat/MessagesTimeline.logic.ts index 416b37e4f51..794c665f2ad 100644 --- a/apps/web/src/components/chat/MessagesTimeline.logic.ts +++ b/apps/web/src/components/chat/MessagesTimeline.logic.ts @@ -139,6 +139,21 @@ function deriveTerminalAssistantMessageIds(timelineEntries: ReadonlyArray) { + const turnIds = new Set(); + for (const timelineEntry of timelineEntries) { + if ( + timelineEntry.kind === "message" && + timelineEntry.message.role === "assistant" && + timelineEntry.message.turnId && + timelineEntry.message.phase === "final_answer" + ) { + turnIds.add(timelineEntry.message.turnId); + } + } + return turnIds; +} + interface TurnFold { turnId: TurnId; anchorEntryId: string; @@ -170,6 +185,7 @@ function deriveUnsettledTurnId(latestTurn: TimelineLatestTurn | null): TurnId | function deriveTurnFolds(input: { timelineEntries: ReadonlyArray; terminalAssistantMessageIds: ReadonlySet; + finalAnswerStartedTurnIds: ReadonlySet; latestTurn: TimelineLatestTurn | null; unsettledTurnId: TurnId | null; }): ReadonlyMap { @@ -229,10 +245,11 @@ function deriveTurnFolds(input: { const foldsByAnchorEntryId = new Map(); for (const [turnId, group] of groupsByTurnId) { - if (turnId === input.unsettledTurnId) { + const finalAnswerStarted = input.finalAnswerStartedTurnIds.has(turnId); + if (turnId === input.unsettledTurnId && !finalAnswerStarted) { continue; } - if (group.hasStreamingMessage) { + if (group.hasStreamingMessage && !finalAnswerStarted) { continue; } const hiddenEntryIds = new Set(); @@ -303,10 +320,12 @@ export function deriveMessagesTimelineRows(input: { input.timelineEntries.flatMap((entry) => (entry.kind === "message" ? [entry.message] : [])), ); const terminalAssistantMessageIds = deriveTerminalAssistantMessageIds(input.timelineEntries); + const finalAnswerStartedTurnIds = deriveFinalAnswerStartedTurnIds(input.timelineEntries); const unsettledTurnId = deriveUnsettledTurnId(input.latestTurn ?? null); const foldsByAnchorEntryId = deriveTurnFolds({ timelineEntries: input.timelineEntries, terminalAssistantMessageIds, + finalAnswerStartedTurnIds, latestTurn: input.latestTurn ?? null, unsettledTurnId, }); diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index 9a9b05f92f2..008499b39e0 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -179,6 +179,7 @@ function mapMessage(environmentId: EnvironmentId, message: OrchestrationMessage) role: message.role, text: message.text, turnId: message.turnId, + ...(message.phase !== undefined ? { phase: message.phase } : {}), createdAt: message.createdAt, streaming: message.streaming, ...(message.streaming ? {} : { completedAt: message.updatedAt }), @@ -1393,6 +1394,7 @@ function applyEnvironmentOrchestrationEvent( ? { attachments: event.payload.attachments } : {}), turnId: event.payload.turnId, + ...(event.payload.phase !== undefined ? { phase: event.payload.phase } : {}), streaming: event.payload.streaming, createdAt: event.payload.createdAt, updatedAt: event.payload.updatedAt, @@ -1411,6 +1413,7 @@ function applyEnvironmentOrchestrationEvent( : entry.text, streaming: message.streaming, ...(message.turnId !== undefined ? { turnId: message.turnId } : {}), + ...(message.phase !== undefined ? { phase: message.phase } : {}), ...(message.streaming ? entry.completedAt !== undefined ? { completedAt: entry.completedAt } diff --git a/apps/web/src/types.ts b/apps/web/src/types.ts index d508e3c6010..d1c66a9f0b3 100644 --- a/apps/web/src/types.ts +++ b/apps/web/src/types.ts @@ -50,6 +50,7 @@ export interface ChatMessage { text: string; attachments?: ChatAttachment[]; turnId?: TurnId | null; + phase?: "commentary" | "final_answer"; createdAt: string; completedAt?: string | undefined; streaming: boolean; diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 46d51da371f..59909f1ad9e 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -224,12 +224,16 @@ export type OrchestrationProject = typeof OrchestrationProject.Type; export const OrchestrationMessageRole = Schema.Literals(["user", "assistant", "system"]); export type OrchestrationMessageRole = typeof OrchestrationMessageRole.Type; +export const OrchestrationMessagePhase = Schema.Literals(["commentary", "final_answer"]); +export type OrchestrationMessagePhase = typeof OrchestrationMessagePhase.Type; + export const OrchestrationMessage = Schema.Struct({ id: MessageId, role: OrchestrationMessageRole, text: Schema.String, attachments: Schema.optional(Schema.Array(ChatAttachment)), turnId: Schema.NullOr(TurnId), + phase: Schema.optional(OrchestrationMessagePhase), streaming: Schema.Boolean, createdAt: IsoDateTime, updatedAt: IsoDateTime, @@ -712,6 +716,7 @@ const ThreadMessageAssistantDeltaCommand = Schema.Struct({ messageId: MessageId, delta: Schema.String, turnId: Schema.optional(TurnId), + phase: Schema.optional(OrchestrationMessagePhase), createdAt: IsoDateTime, }); @@ -896,6 +901,7 @@ export const ThreadMessageSentPayload = Schema.Struct({ text: Schema.String, attachments: Schema.optional(Schema.Array(ChatAttachment)), turnId: Schema.NullOr(TurnId), + phase: Schema.optional(OrchestrationMessagePhase), streaming: Schema.Boolean, createdAt: IsoDateTime, updatedAt: IsoDateTime, diff --git a/packages/contracts/src/providerRuntime.ts b/packages/contracts/src/providerRuntime.ts index eb2563eff00..2bd662b9633 100644 --- a/packages/contracts/src/providerRuntime.ts +++ b/packages/contracts/src/providerRuntime.ts @@ -17,6 +17,7 @@ import { ProviderInstanceId, ProviderDriverKind } from "./providerInstance.ts"; const TrimmedNonEmptyStringSchema = TrimmedNonEmptyString; const UnknownRecordSchema = Schema.Record(Schema.String, Schema.Unknown); +const RuntimeMessagePhase = Schema.Literals(["commentary", "final_answer"]); const RuntimeEventRawSource = Schema.Union([ Schema.Literal("codex.app-server.notification"), @@ -406,6 +407,7 @@ export const ItemLifecyclePayload = Schema.Struct({ status: Schema.optional(RuntimeItemStatus), title: Schema.optional(TrimmedNonEmptyStringSchema), detail: Schema.optional(TrimmedNonEmptyStringSchema), + messagePhase: Schema.optional(RuntimeMessagePhase), data: Schema.optional(Schema.Unknown), }); export type ItemLifecyclePayload = typeof ItemLifecyclePayload.Type;