Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions apps/server/src/orchestration/Layers/CheckpointReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
CommandId,
EventId,
MessageId,
type OrchestrationThread,
type ProjectId,
ThreadId,
TurnId,
Expand All @@ -19,6 +20,7 @@ import {
import { clearWorkspaceIndexCache } from "../../workspaceEntries.ts";
import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts";
import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { resolveCheckpointAssistantMessageId } from "../assistantMessageIdentity.ts";
import { CheckpointReactor, type CheckpointReactorShape } from "../Services/CheckpointReactor.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts";
Expand Down Expand Up @@ -190,13 +192,7 @@ const make = Effect.gen(function* () {
const captureAndDispatchCheckpoint = Effect.fnUntraced(function* (input: {
readonly threadId: ThreadId;
readonly turnId: TurnId;
readonly thread: {
readonly messages: ReadonlyArray<{
readonly id: MessageId;
readonly role: string;
readonly turnId: TurnId | null;
}>;
};
readonly thread: Pick<OrchestrationThread, "messages" | "latestTurn" | "checkpoints">;
readonly cwd: string;
readonly turnCount: number;
readonly status: "ready" | "missing" | "error";
Expand Down Expand Up @@ -262,12 +258,14 @@ const make = Effect.gen(function* () {
),
);

const assistantMessageId =
input.assistantMessageId ??
input.thread.messages
.toReversed()
.find((entry) => entry.role === "assistant" && entry.turnId === input.turnId)?.id ??
MessageId.makeUnsafe(`assistant:${input.turnId}`);
const assistantMessageId = resolveCheckpointAssistantMessageId({
thread: input.thread,
threadId: input.threadId,
turnId: input.turnId,
...(input.assistantMessageId !== undefined
? { assistantMessageId: input.assistantMessageId }
: {}),
});

yield* orchestrationEngine.dispatch({
type: "thread.turn.diff.complete",
Expand Down
228 changes: 212 additions & 16 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const asEventId = (value: string): EventId => EventId.makeUnsafe(value);
const asMessageId = (value: string): MessageId => MessageId.makeUnsafe(value);
const asThreadId = (value: string): ThreadId => ThreadId.makeUnsafe(value);
const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value);
const assistantMessageId = (threadId: string, scope: string, value: string): MessageId =>
asMessageId(`assistant:${threadId}:${scope}:${value}`);

type LegacyProviderRuntimeEvent = {
readonly type: string;
Expand Down Expand Up @@ -556,11 +558,12 @@ describe("ProviderRuntimeIngestion", () => {
const thread = await waitForThread(harness.engine, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-1" && !message.streaming,
message.id === assistantMessageId("thread-1", "item", "item-1") && !message.streaming,
),
);
const message = thread.messages.find(
(entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-1",
(entry: ProviderRuntimeTestMessage) =>
entry.id === assistantMessageId("thread-1", "item", "item-1"),
);
expect(message?.text).toBe("hello world");
expect(message?.streaming).toBe(false);
Expand Down Expand Up @@ -588,11 +591,13 @@ describe("ProviderRuntimeIngestion", () => {
const thread = await waitForThread(harness.engine, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-no-delta" && !message.streaming,
message.id === assistantMessageId("thread-1", "item", "item-no-delta") &&
!message.streaming,
),
);
const message = thread.messages.find(
(entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-no-delta",
(entry: ProviderRuntimeTestMessage) =>
entry.id === assistantMessageId("thread-1", "item", "item-no-delta"),
);
expect(message?.text).toBe("assistant-only final text");
expect(message?.streaming).toBe(false);
Expand Down Expand Up @@ -733,7 +738,8 @@ describe("ProviderRuntimeIngestion", () => {
);
expect(
midThread?.messages.some(
(message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered",
(message: ProviderRuntimeTestMessage) =>
message.id === assistantMessageId("thread-1", "item", "item-buffered"),
),
).toBe(false);

Expand All @@ -754,11 +760,13 @@ describe("ProviderRuntimeIngestion", () => {
const thread = await waitForThread(harness.engine, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-buffered" && !message.streaming,
message.id === assistantMessageId("thread-1", "item", "item-buffered") &&
!message.streaming,
),
);
const message = thread.messages.find(
(entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered",
(entry: ProviderRuntimeTestMessage) =>
entry.id === assistantMessageId("thread-1", "item", "item-buffered"),
);
expect(message?.text).toBe("buffer me");
expect(message?.streaming).toBe(false);
Expand Down Expand Up @@ -819,13 +827,14 @@ describe("ProviderRuntimeIngestion", () => {
const liveThread = await waitForThread(harness.engine, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-streaming-mode" &&
message.id === assistantMessageId("thread-1", "item", "item-streaming-mode") &&
message.streaming &&
message.text === "hello live",
),
);
const liveMessage = liveThread.messages.find(
(entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode",
(entry: ProviderRuntimeTestMessage) =>
entry.id === assistantMessageId("thread-1", "item", "item-streaming-mode"),
);
expect(liveMessage?.streaming).toBe(true);

Expand All @@ -847,11 +856,13 @@ describe("ProviderRuntimeIngestion", () => {
const finalThread = await waitForThread(harness.engine, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-streaming-mode" && !message.streaming,
message.id === assistantMessageId("thread-1", "item", "item-streaming-mode") &&
!message.streaming,
),
);
const finalMessage = finalThread.messages.find(
(entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode",
(entry: ProviderRuntimeTestMessage) =>
entry.id === assistantMessageId("thread-1", "item", "item-streaming-mode"),
);
expect(finalMessage?.text).toBe("hello live");
expect(finalMessage?.streaming).toBe(false);
Expand Down Expand Up @@ -907,11 +918,13 @@ describe("ProviderRuntimeIngestion", () => {
const thread = await waitForThread(harness.engine, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-buffer-spill" && !message.streaming,
message.id === assistantMessageId("thread-1", "item", "item-buffer-spill") &&
!message.streaming,
),
);
const message = thread.messages.find(
(entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffer-spill",
(entry: ProviderRuntimeTestMessage) =>
entry.id === assistantMessageId("thread-1", "item", "item-buffer-spill"),
);
expect(message?.text.length).toBe(oversizedText.length);
expect(message?.text).toBe(oversizedText);
Expand Down Expand Up @@ -983,7 +996,8 @@ describe("ProviderRuntimeIngestion", () => {
thread.session?.activeTurnId === null &&
thread.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === "assistant:item-complete-dedup" && !message.streaming,
message.id === assistantMessageId("thread-1", "item", "item-complete-dedup") &&
!message.streaming,
),
);

Expand All @@ -997,7 +1011,7 @@ describe("ProviderRuntimeIngestion", () => {
return false;
}
return (
event.payload.messageId === "assistant:item-complete-dedup" &&
event.payload.messageId === assistantMessageId("thread-1", "item", "item-complete-dedup") &&
event.payload.streaming === false
);
});
Expand Down Expand Up @@ -1323,10 +1337,192 @@ describe("ProviderRuntimeIngestion", () => {
(entry: ProviderRuntimeTestCheckpoint) => entry.turnId === "turn-p1",
);
expect(checkpoint?.status).toBe("missing");
expect(checkpoint?.assistantMessageId).toBe("assistant:item-p1-assistant");
expect(checkpoint?.assistantMessageId).toBe(
assistantMessageId("thread-1", "item", "item-p1-assistant"),
);
expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated");
});

it("keeps assistant message identities isolated across threads when item IDs are reused", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.create",
commandId: CommandId.makeUnsafe("cmd-thread-create-2"),
threadId: asThreadId("thread-2"),
projectId: asProjectId("project-1"),
title: "Thread 2",
model: "gpt-5-codex",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt: now,
}),
);
await Effect.runPromise(
harness.engine.dispatch({
type: "thread.session.set",
commandId: CommandId.makeUnsafe("cmd-session-seed-2"),
threadId: asThreadId("thread-2"),
session: {
threadId: asThreadId("thread-2"),
status: "ready",
providerName: "codex",
runtimeMode: "approval-required",
activeTurnId: null,
updatedAt: now,
lastError: null,
},
createdAt: now,
}),
);

harness.emit({
type: "content.delta",
eventId: asEventId("evt-shared-item-thread-1"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-shared-1"),
itemId: asItemId("item-shared"),
payload: {
streamKind: "assistant_text",
delta: "first thread text",
},
});
harness.emit({
type: "content.delta",
eventId: asEventId("evt-shared-item-thread-2"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-2"),
turnId: asTurnId("turn-shared-2"),
itemId: asItemId("item-shared"),
payload: {
streamKind: "assistant_text",
delta: "second thread text",
},
});
harness.emit({
type: "item.completed",
eventId: asEventId("evt-shared-item-thread-1-complete"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-shared-1"),
itemId: asItemId("item-shared"),
payload: {
itemType: "assistant_message",
status: "completed",
},
});
harness.emit({
type: "item.completed",
eventId: asEventId("evt-shared-item-thread-2-complete"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-2"),
turnId: asTurnId("turn-shared-2"),
itemId: asItemId("item-shared"),
payload: {
itemType: "assistant_message",
status: "completed",
},
});

await harness.drain();

const readModel = await Effect.runPromise(harness.engine.getReadModel());
const firstThread = readModel.threads.find((thread) => thread.id === asThreadId("thread-1"));
const secondThread = readModel.threads.find((thread) => thread.id === asThreadId("thread-2"));

expect(firstThread?.messages.find((message) => message.role === "assistant")).toMatchObject({
id: assistantMessageId("thread-1", "item", "item-shared"),
text: "first thread text",
streaming: false,
});
expect(secondThread?.messages.find((message) => message.role === "assistant")).toMatchObject({
id: assistantMessageId("thread-2", "item", "item-shared"),
text: "second thread text",
streaming: false,
});
});

it("reuses legacy persisted assistant IDs when runtime resumes after upgrade", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
const legacyAssistantMessageId = asMessageId("assistant:item-legacy");

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.message.assistant.delta",
commandId: CommandId.makeUnsafe("cmd-legacy-assistant-delta"),
threadId: asThreadId("thread-1"),
messageId: legacyAssistantMessageId,
delta: "hello",
turnId: asTurnId("turn-legacy"),
createdAt: now,
}),
);
await Effect.runPromise(
harness.engine.dispatch({
type: "thread.message.assistant.complete",
commandId: CommandId.makeUnsafe("cmd-legacy-assistant-complete"),
threadId: asThreadId("thread-1"),
messageId: legacyAssistantMessageId,
turnId: asTurnId("turn-legacy"),
createdAt: now,
}),
);

harness.emit({
type: "content.delta",
eventId: asEventId("evt-legacy-assistant-delta"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-legacy"),
itemId: asItemId("item-legacy"),
payload: {
streamKind: "assistant_text",
delta: " world",
},
});
harness.emit({
type: "item.completed",
eventId: asEventId("evt-legacy-assistant-complete"),
provider: "codex",
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-legacy"),
itemId: asItemId("item-legacy"),
payload: {
itemType: "assistant_message",
status: "completed",
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === legacyAssistantMessageId && message.text === "hello world",
),
);

const legacyMessages = thread.messages.filter(
(message: ProviderRuntimeTestMessage) => message.id === legacyAssistantMessageId,
);
expect(legacyMessages).toHaveLength(1);
expect(
thread.messages.some(
(message) => message.id === assistantMessageId("thread-1", "item", "item-legacy"),
),
).toBe(false);
});

it("projects Codex task lifecycle chunks into thread activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
Loading
Loading