Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
turnId: event.payload.turnId,
role: event.payload.role,
text: nextText,
...(event.payload.phase !== undefined ? { phase: event.payload.phase } : {}),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Phase not persisted in projection

Medium Severity

The PR writes phase on thread.message-sent projection upserts, but the projection message schema, SQL row mapping, and snapshot hydration never define or read phase. After a reload or snapshot-based thread fetch, assistant messages lose final_answer, so deriveFinalAnswerStartedTurnIds cannot early-collapse work during streaming.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit a50270e. Configure here.

...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
isStreaming: event.payload.streaming,
createdAt: previousMessage?.createdAt ?? event.payload.createdAt,
Expand Down
76 changes: 75 additions & 1 deletion apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import {
MessageId,
type OrchestrationEvent,
type OrchestrationMessage,
type OrchestrationMessagePhase,
type OrchestrationProposedPlanId,
CheckpointRef,
isToolLifecycleItemType,
RuntimeItemId,
ThreadId,
type ThreadTokenUsageSnapshot,
TurnId,
Expand Down Expand Up @@ -53,6 +55,10 @@ const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000;
const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120);
const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000;
const BUFFERED_PROPOSED_PLAN_BY_ID_TTL = Duration.minutes(120);
const ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000;
const ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_TTL = Duration.minutes(120);
const ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_CACHE_CAPACITY = 20_000;
const ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_TTL = Duration.minutes(120);
const MAX_BUFFERED_ASSISTANT_CHARS = 24_000;
const STRICT_PROVIDER_LIFECYCLE_GUARD = process.env.T3CODE_STRICT_PROVIDER_LIFECYCLE_GUARD !== "0";

Expand Down Expand Up @@ -202,6 +208,11 @@ function assistantSegmentMessageId(baseKey: string, segmentIndex: number): Messa
segmentIndex === 0 ? `assistant:${baseKey}` : `assistant:${baseKey}:segment:${segmentIndex}`,
);
}

function runtimeItemKey(threadId: ThreadId, itemId: RuntimeItemId): string {
return `${threadId}:${itemId}`;
}

function buildContextWindowActivityPayload(
event: ProviderRuntimeEvent,
): ThreadTokenUsageSnapshot | undefined {
Expand Down Expand Up @@ -666,6 +677,18 @@ const make = Effect.gen(function* () {
lookup: () => Effect.succeed({ text: "", createdAt: "" }),
});

const assistantMessagePhaseByMessageId = yield* Cache.make<MessageId, OrchestrationMessagePhase>({
capacity: ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_CACHE_CAPACITY,
timeToLive: ASSISTANT_MESSAGE_PHASE_BY_MESSAGE_ID_TTL,
lookup: () => Effect.die(new Error("assistant message phase should be read through getOption")),
});

const assistantMessagePhaseByItemKey = yield* Cache.make<string, OrchestrationMessagePhase>({
capacity: ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_CACHE_CAPACITY,
timeToLive: ASSISTANT_MESSAGE_PHASE_BY_ITEM_KEY_TTL,
lookup: () => Effect.die(new Error("assistant item phase should be read through getOption")),
});

const resolveThreadDetail = Effect.fn("resolveThreadDetail")(function* (threadId: ThreadId) {
return yield* projectionSnapshotQuery
.getThreadDetailById(threadId)
Expand Down Expand Up @@ -855,8 +878,29 @@ const make = Effect.gen(function* () {
const clearBufferedProposedPlan = (planId: string) =>
Cache.invalidate(bufferedProposedPlanById, planId);

const rememberAssistantMessagePhase = (messageId: MessageId, phase: OrchestrationMessagePhase) =>
Cache.set(assistantMessagePhaseByMessageId, messageId, phase);

const getAssistantMessagePhase = (messageId: MessageId) =>
Cache.getOption(assistantMessagePhaseByMessageId, messageId).pipe(
Effect.map(Option.getOrUndefined),
);

const clearAssistantMessagePhase = (messageId: MessageId) =>
Cache.invalidate(assistantMessagePhaseByMessageId, messageId);

const getAssistantMessagePhaseForRuntimeEvent = (event: ProviderRuntimeEvent) =>
event.itemId
? Cache.getOption(
assistantMessagePhaseByItemKey,
runtimeItemKey(event.threadId, event.itemId),
).pipe(Effect.map(Option.getOrUndefined))
: Effect.succeed(undefined as OrchestrationMessagePhase | undefined);

const clearAssistantMessageState = (messageId: MessageId) =>
clearBufferedAssistantText(messageId);
clearBufferedAssistantText(messageId).pipe(
Effect.andThen(clearAssistantMessagePhase(messageId)),
);

const flushBufferedAssistantMessage = (input: {
event: ProviderRuntimeEvent;
Expand All @@ -871,6 +915,7 @@ const make = Effect.gen(function* () {
if (!hasRenderableAssistantText(bufferedText)) {
return false;
}
const phase = yield* getAssistantMessagePhase(input.messageId);

yield* orchestrationEngine.dispatch({
type: "thread.message.assistant.delta",
Expand All @@ -879,6 +924,7 @@ const make = Effect.gen(function* () {
messageId: input.messageId,
delta: bufferedText,
...(input.turnId ? { turnId: input.turnId } : {}),
...(phase !== undefined ? { phase } : {}),
createdAt: input.createdAt,
});
return true;
Expand Down Expand Up @@ -937,6 +983,7 @@ const make = Effect.gen(function* () {
? input.fallbackText!
: "";
const hasRenderableText = hasRenderableAssistantText(text);
const phase = yield* getAssistantMessagePhase(input.messageId);

if (hasRenderableText) {
yield* orchestrationEngine.dispatch({
Expand All @@ -946,6 +993,7 @@ const make = Effect.gen(function* () {
messageId: input.messageId,
delta: text,
...(input.turnId ? { turnId: input.turnId } : {}),
...(phase !== undefined ? { phase } : {}),
createdAt: input.createdAt,
});
}
Expand Down Expand Up @@ -1358,6 +1406,19 @@ const make = Effect.gen(function* () {
}
}

if (
event.type === "item.started" &&
event.payload.itemType === "assistant_message" &&
event.payload.messagePhase !== undefined &&
event.itemId !== undefined
) {
yield* Cache.set(
assistantMessagePhaseByItemKey,
runtimeItemKey(event.threadId, event.itemId),
event.payload.messagePhase,
);
}

const assistantDelta =
event.type === "content.delta" && event.payload.streamKind === "assistant_text"
? event.payload.delta
Expand All @@ -1372,6 +1433,10 @@ const make = Effect.gen(function* () {
event,
...(turnId ? { turnId } : {}),
});
const assistantMessagePhase = yield* getAssistantMessagePhaseForRuntimeEvent(event);
if (assistantMessagePhase !== undefined) {
yield* rememberAssistantMessagePhase(assistantMessageId, assistantMessagePhase);
}
if (turnId) {
yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId);
}
Expand All @@ -1390,6 +1455,7 @@ const make = Effect.gen(function* () {
messageId: assistantMessageId,
delta: spillChunk,
...(turnId ? { turnId } : {}),
...(assistantMessagePhase !== undefined ? { phase: assistantMessagePhase } : {}),
createdAt: now,
});
}
Expand All @@ -1401,6 +1467,7 @@ const make = Effect.gen(function* () {
messageId: assistantMessageId,
delta: assistantDelta,
...(turnId ? { turnId } : {}),
...(assistantMessagePhase !== undefined ? { phase: assistantMessagePhase } : {}),
createdAt: now,
});
}
Expand Down Expand Up @@ -1463,6 +1530,7 @@ const make = Effect.gen(function* () {
`assistant:${event.itemId ?? event.turnId ?? event.eventId}`,
),
fallbackText: event.payload.detail,
messagePhase: event.payload.messagePhase,
}
: undefined;
const proposedPlanCompletion =
Expand All @@ -1487,6 +1555,12 @@ const make = Effect.gen(function* () {
activeAssistantMessageId,
() => assistantCompletion.messageId,
);
if (assistantCompletion.messagePhase !== undefined) {
yield* rememberAssistantMessagePhase(
assistantMessageId,
assistantCompletion.messagePhase,
);
}
const existingAssistantMessage = findMessageById(messages, assistantMessageId);
const shouldApplyFallbackCompletionText =
!existingAssistantMessage || existingAssistantMessage.text.length === 0;
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/orchestration/decider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand"
role: "assistant",
text: command.delta,
turnId: command.turnId ?? null,
...(command.phase !== undefined ? { phase: command.phase } : {}),
streaming: true,
createdAt: command.createdAt,
updatedAt: command.createdAt,
Expand Down
2 changes: 2 additions & 0 deletions apps/server/src/orchestration/projector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ export function projectEvent(
text: payload.text,
...(payload.attachments !== undefined ? { attachments: payload.attachments } : {}),
turnId: payload.turnId,
...(payload.phase !== undefined ? { phase: payload.phase } : {}),
streaming: payload.streaming,
createdAt: payload.createdAt,
updatedAt: payload.updatedAt,
Expand All @@ -423,6 +424,7 @@ export function projectEvent(
streaming: message.streaming,
updatedAt: message.updatedAt,
turnId: message.turnId,
...(message.phase !== undefined ? { phase: message.phase } : {}),
...(message.attachments !== undefined
? { attachments: message.attachments }
: {}),
Expand Down
10 changes: 10 additions & 0 deletions apps/server/src/provider/Layers/CodexAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ function itemDetail(item: CodexLifecycleItem): string | undefined {
return undefined;
}

function itemMessagePhase(item: CodexLifecycleItem): "commentary" | "final_answer" | undefined {
if (!("phase" in item)) {
return undefined;
}
return item.phase === "commentary" || item.phase === "final_answer" ? item.phase : undefined;
}

function toRequestTypeFromMethod(method: string): CanonicalRequestType {
switch (method) {
case "item/commandExecution/requestApproval":
Expand Down Expand Up @@ -481,6 +488,9 @@ function mapItemLifecycle(
...(status ? { status } : {}),
...(itemTitle(itemType, item) ? { title: itemTitle(itemType, item) } : {}),
...(detail ? { detail } : {}),
...(itemType === "assistant_message" && itemMessagePhase(item)
? { messagePhase: itemMessagePhase(item) }
: {}),
...(event.payload !== undefined ? { data: event.payload } : {}),
},
};
Expand Down
69 changes: 69 additions & 0 deletions apps/web/src/components/chat/MessagesTimeline.logic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,75 @@ describe("deriveMessagesTimelineRows", () => {
]);
});

it("folds active-turn work once the final answer starts streaming", () => {
const rows = deriveMessagesTimelineRows({
timelineEntries: [
{
id: "assistant-commentary-entry",
kind: "message",
createdAt: "2026-01-01T00:00:05Z",
message: {
id: "assistant-commentary" as never,
role: "assistant",
text: "Checking the repo.",
turnId: "turn-1" as never,
phase: "commentary",
createdAt: "2026-01-01T00:00:05Z",
completedAt: "2026-01-01T00:00:06Z",
streaming: false,
},
},
{
id: "work-entry-1",
kind: "work",
createdAt: "2026-01-01T00:00:08Z",
entry: {
id: "work-1",
createdAt: "2026-01-01T00:00:08Z",
turnId: "turn-1" as never,
label: "Ran command",
tone: "tool" as const,
},
},
{
id: "assistant-final-entry",
kind: "message",
createdAt: "2026-01-01T00:00:20Z",
message: {
id: "assistant-final" as never,
role: "assistant",
text: "The answer is streaming",
turnId: "turn-1" as never,
phase: "final_answer",
createdAt: "2026-01-01T00:00:20Z",
streaming: true,
},
},
],
latestTurn: {
turnId: "turn-1" as never,
state: "running",
startedAt: "2026-01-01T00:00:00Z",
completedAt: null,
},
isWorking: true,
activeTurnStartedAt: "2026-01-01T00:00:00Z",
turnDiffSummaryByAssistantMessageId: new Map(),
revertTurnCountByUserMessageId: new Map(),
});

expect(rows.map((row) => row.id)).toEqual([
"turn-fold:turn-1",
"assistant-final-entry",
"working-indicator-row",
]);
const foldRow = rows.find(
(row): row is Extract<(typeof rows)[number], { kind: "turn-fold" }> =>
row.kind === "turn-fold",
);
expect(foldRow?.expanded).toBe(false);
});

it("only shows assistant metadata on the terminal assistant message", () => {
const rows = deriveMessagesTimelineRows({
timelineEntries: [
Expand Down
23 changes: 21 additions & 2 deletions apps/web/src/components/chat/MessagesTimeline.logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,21 @@ function deriveTerminalAssistantMessageIds(timelineEntries: ReadonlyArray<Timeli
return new Set(lastAssistantMessageIdByResponseKey.values());
}

function deriveFinalAnswerStartedTurnIds(timelineEntries: ReadonlyArray<TimelineEntry>) {
const turnIds = new Set<TurnId>();
for (const timelineEntry of timelineEntries) {
if (
timelineEntry.kind === "message" &&
timelineEntry.message.role === "assistant" &&
timelineEntry.message.turnId &&
timelineEntry.message.phase === "final_answer"
) {
turnIds.add(timelineEntry.message.turnId);
}
}
return turnIds;
}

interface TurnFold {
turnId: TurnId;
anchorEntryId: string;
Expand Down Expand Up @@ -170,6 +185,7 @@ function deriveUnsettledTurnId(latestTurn: TimelineLatestTurn | null): TurnId |
function deriveTurnFolds(input: {
timelineEntries: ReadonlyArray<TimelineEntry>;
terminalAssistantMessageIds: ReadonlySet<string>;
finalAnswerStartedTurnIds: ReadonlySet<TurnId>;
latestTurn: TimelineLatestTurn | null;
unsettledTurnId: TurnId | null;
}): ReadonlyMap<string, TurnFold> {
Expand Down Expand Up @@ -229,10 +245,11 @@ function deriveTurnFolds(input: {

const foldsByAnchorEntryId = new Map<string, TurnFold>();
for (const [turnId, group] of groupsByTurnId) {
if (turnId === input.unsettledTurnId) {
const finalAnswerStarted = input.finalAnswerStartedTurnIds.has(turnId);
if (turnId === input.unsettledTurnId && !finalAnswerStarted) {
continue;
}
if (group.hasStreamingMessage) {
if (group.hasStreamingMessage && !finalAnswerStarted) {
continue;
}
const hiddenEntryIds = new Set<string>();
Expand Down Expand Up @@ -303,10 +320,12 @@ export function deriveMessagesTimelineRows(input: {
input.timelineEntries.flatMap((entry) => (entry.kind === "message" ? [entry.message] : [])),
);
const terminalAssistantMessageIds = deriveTerminalAssistantMessageIds(input.timelineEntries);
const finalAnswerStartedTurnIds = deriveFinalAnswerStartedTurnIds(input.timelineEntries);
const unsettledTurnId = deriveUnsettledTurnId(input.latestTurn ?? null);
const foldsByAnchorEntryId = deriveTurnFolds({
timelineEntries: input.timelineEntries,
terminalAssistantMessageIds,
finalAnswerStartedTurnIds,
latestTurn: input.latestTurn ?? null,
unsettledTurnId,
});
Expand Down
3 changes: 3 additions & 0 deletions apps/web/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ function mapMessage(environmentId: EnvironmentId, message: OrchestrationMessage)
role: message.role,
text: message.text,
turnId: message.turnId,
...(message.phase !== undefined ? { phase: message.phase } : {}),
createdAt: message.createdAt,
streaming: message.streaming,
...(message.streaming ? {} : { completedAt: message.updatedAt }),
Expand Down Expand Up @@ -1393,6 +1394,7 @@ function applyEnvironmentOrchestrationEvent(
? { attachments: event.payload.attachments }
: {}),
turnId: event.payload.turnId,
...(event.payload.phase !== undefined ? { phase: event.payload.phase } : {}),
streaming: event.payload.streaming,
createdAt: event.payload.createdAt,
updatedAt: event.payload.updatedAt,
Expand All @@ -1411,6 +1413,7 @@ function applyEnvironmentOrchestrationEvent(
: entry.text,
streaming: message.streaming,
...(message.turnId !== undefined ? { turnId: message.turnId } : {}),
...(message.phase !== undefined ? { phase: message.phase } : {}),
...(message.streaming
? entry.completedAt !== undefined
? { completedAt: entry.completedAt }
Expand Down
Loading
Loading