From f17c57920343699cf0360036807bcdb5d14cd7f8 Mon Sep 17 00:00:00 2001 From: BinBandit Date: Wed, 11 Mar 2026 14:45:14 +1100 Subject: [PATCH] fix: isolate assistant projection identities per thread --- .../orchestration/Layers/CheckpointReactor.ts | 24 +- .../Layers/ProviderRuntimeIngestion.test.ts | 228 ++++++++++++++++-- .../Layers/ProviderRuntimeIngestion.ts | 13 +- .../assistantMessageIdentity.test.ts | 124 ++++++++++ .../orchestration/assistantMessageIdentity.ts | 114 +++++++++ .../Layers/ProjectionThreadMessages.test.ts | 43 ++++ .../Layers/ProjectionThreadMessages.ts | 6 +- apps/server/src/persistence/Migrations.ts | 2 + ...ProjectionThreadMessagesThreadScopedKey.ts | 64 +++++ .../Services/ProjectionThreadMessages.ts | 2 +- apps/server/src/wsServer.test.ts | 5 +- 11 files changed, 581 insertions(+), 44 deletions(-) create mode 100644 apps/server/src/orchestration/assistantMessageIdentity.test.ts create mode 100644 apps/server/src/orchestration/assistantMessageIdentity.ts create mode 100644 apps/server/src/persistence/Migrations/014_ProjectionThreadMessagesThreadScopedKey.ts diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index ab38c1033..b3910344f 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -2,6 +2,7 @@ import { CommandId, EventId, MessageId, + type OrchestrationThread, type ProjectId, ThreadId, TurnId, @@ -19,6 +20,7 @@ import { import { clearWorkspaceIndexCache } from "../../workspaceEntries.ts"; import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; +import { resolveCheckpointAssistantMessageId } from "../assistantMessageIdentity.ts"; import { CheckpointReactor, type CheckpointReactorShape } from "../Services/CheckpointReactor.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts"; @@ -190,13 +192,7 @@ const make = Effect.gen(function* () { const captureAndDispatchCheckpoint = Effect.fnUntraced(function* (input: { readonly threadId: ThreadId; readonly turnId: TurnId; - readonly thread: { - readonly messages: ReadonlyArray<{ - readonly id: MessageId; - readonly role: string; - readonly turnId: TurnId | null; - }>; - }; + readonly thread: Pick; readonly cwd: string; readonly turnCount: number; readonly status: "ready" | "missing" | "error"; @@ -262,12 +258,14 @@ const make = Effect.gen(function* () { ), ); - const assistantMessageId = - input.assistantMessageId ?? - input.thread.messages - .toReversed() - .find((entry) => entry.role === "assistant" && entry.turnId === input.turnId)?.id ?? - MessageId.makeUnsafe(`assistant:${input.turnId}`); + const assistantMessageId = resolveCheckpointAssistantMessageId({ + thread: input.thread, + threadId: input.threadId, + turnId: input.turnId, + ...(input.assistantMessageId !== undefined + ? { assistantMessageId: input.assistantMessageId } + : {}), + }); yield* orchestrationEngine.dispatch({ type: "thread.turn.diff.complete", diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index b6b48c7ed..ab262ff1d 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -41,6 +41,8 @@ const asEventId = (value: string): EventId => EventId.makeUnsafe(value); const asMessageId = (value: string): MessageId => MessageId.makeUnsafe(value); const asThreadId = (value: string): ThreadId => ThreadId.makeUnsafe(value); const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value); +const assistantMessageId = (threadId: string, scope: string, value: string): MessageId => + asMessageId(`assistant:${threadId}:${scope}:${value}`); type LegacyProviderRuntimeEvent = { readonly type: string; @@ -556,11 +558,12 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.engine, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-1" && !message.streaming, + message.id === assistantMessageId("thread-1", "item", "item-1") && !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-1", + (entry: ProviderRuntimeTestMessage) => + entry.id === assistantMessageId("thread-1", "item", "item-1"), ); expect(message?.text).toBe("hello world"); expect(message?.streaming).toBe(false); @@ -588,11 +591,13 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.engine, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-no-delta" && !message.streaming, + message.id === assistantMessageId("thread-1", "item", "item-no-delta") && + !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-no-delta", + (entry: ProviderRuntimeTestMessage) => + entry.id === assistantMessageId("thread-1", "item", "item-no-delta"), ); expect(message?.text).toBe("assistant-only final text"); expect(message?.streaming).toBe(false); @@ -733,7 +738,8 @@ describe("ProviderRuntimeIngestion", () => { ); expect( midThread?.messages.some( - (message: ProviderRuntimeTestMessage) => message.id === "assistant:item-buffered", + (message: ProviderRuntimeTestMessage) => + message.id === assistantMessageId("thread-1", "item", "item-buffered"), ), ).toBe(false); @@ -754,11 +760,13 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.engine, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffered" && !message.streaming, + message.id === assistantMessageId("thread-1", "item", "item-buffered") && + !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffered", + (entry: ProviderRuntimeTestMessage) => + entry.id === assistantMessageId("thread-1", "item", "item-buffered"), ); expect(message?.text).toBe("buffer me"); expect(message?.streaming).toBe(false); @@ -819,13 +827,14 @@ describe("ProviderRuntimeIngestion", () => { const liveThread = await waitForThread(harness.engine, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-mode" && + message.id === assistantMessageId("thread-1", "item", "item-streaming-mode") && message.streaming && message.text === "hello live", ), ); const liveMessage = liveThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", + (entry: ProviderRuntimeTestMessage) => + entry.id === assistantMessageId("thread-1", "item", "item-streaming-mode"), ); expect(liveMessage?.streaming).toBe(true); @@ -847,11 +856,13 @@ describe("ProviderRuntimeIngestion", () => { const finalThread = await waitForThread(harness.engine, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-streaming-mode" && !message.streaming, + message.id === assistantMessageId("thread-1", "item", "item-streaming-mode") && + !message.streaming, ), ); const finalMessage = finalThread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-streaming-mode", + (entry: ProviderRuntimeTestMessage) => + entry.id === assistantMessageId("thread-1", "item", "item-streaming-mode"), ); expect(finalMessage?.text).toBe("hello live"); expect(finalMessage?.streaming).toBe(false); @@ -907,11 +918,13 @@ describe("ProviderRuntimeIngestion", () => { const thread = await waitForThread(harness.engine, (entry) => entry.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-buffer-spill" && !message.streaming, + message.id === assistantMessageId("thread-1", "item", "item-buffer-spill") && + !message.streaming, ), ); const message = thread.messages.find( - (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-buffer-spill", + (entry: ProviderRuntimeTestMessage) => + entry.id === assistantMessageId("thread-1", "item", "item-buffer-spill"), ); expect(message?.text.length).toBe(oversizedText.length); expect(message?.text).toBe(oversizedText); @@ -983,7 +996,8 @@ describe("ProviderRuntimeIngestion", () => { thread.session?.activeTurnId === null && thread.messages.some( (message: ProviderRuntimeTestMessage) => - message.id === "assistant:item-complete-dedup" && !message.streaming, + message.id === assistantMessageId("thread-1", "item", "item-complete-dedup") && + !message.streaming, ), ); @@ -997,7 +1011,7 @@ describe("ProviderRuntimeIngestion", () => { return false; } return ( - event.payload.messageId === "assistant:item-complete-dedup" && + event.payload.messageId === assistantMessageId("thread-1", "item", "item-complete-dedup") && event.payload.streaming === false ); }); @@ -1323,10 +1337,192 @@ describe("ProviderRuntimeIngestion", () => { (entry: ProviderRuntimeTestCheckpoint) => entry.turnId === "turn-p1", ); expect(checkpoint?.status).toBe("missing"); - expect(checkpoint?.assistantMessageId).toBe("assistant:item-p1-assistant"); + expect(checkpoint?.assistantMessageId).toBe( + assistantMessageId("thread-1", "item", "item-p1-assistant"), + ); expect(checkpoint?.checkpointRef).toBe("provider-diff:evt-turn-diff-updated"); }); + it("keeps assistant message identities isolated across threads when item IDs are reused", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-create-2"), + threadId: asThreadId("thread-2"), + projectId: asProjectId("project-1"), + title: "Thread 2", + model: "gpt-5-codex", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt: now, + }), + ); + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.makeUnsafe("cmd-session-seed-2"), + threadId: asThreadId("thread-2"), + session: { + threadId: asThreadId("thread-2"), + status: "ready", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + updatedAt: now, + lastError: null, + }, + createdAt: now, + }), + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-shared-item-thread-1"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-shared-1"), + itemId: asItemId("item-shared"), + payload: { + streamKind: "assistant_text", + delta: "first thread text", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-shared-item-thread-2"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-2"), + turnId: asTurnId("turn-shared-2"), + itemId: asItemId("item-shared"), + payload: { + streamKind: "assistant_text", + delta: "second thread text", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-shared-item-thread-1-complete"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-shared-1"), + itemId: asItemId("item-shared"), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-shared-item-thread-2-complete"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-2"), + turnId: asTurnId("turn-shared-2"), + itemId: asItemId("item-shared"), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + await harness.drain(); + + const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const firstThread = readModel.threads.find((thread) => thread.id === asThreadId("thread-1")); + const secondThread = readModel.threads.find((thread) => thread.id === asThreadId("thread-2")); + + expect(firstThread?.messages.find((message) => message.role === "assistant")).toMatchObject({ + id: assistantMessageId("thread-1", "item", "item-shared"), + text: "first thread text", + streaming: false, + }); + expect(secondThread?.messages.find((message) => message.role === "assistant")).toMatchObject({ + id: assistantMessageId("thread-2", "item", "item-shared"), + text: "second thread text", + streaming: false, + }); + }); + + it("reuses legacy persisted assistant IDs when runtime resumes after upgrade", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + const legacyAssistantMessageId = asMessageId("assistant:item-legacy"); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.message.assistant.delta", + commandId: CommandId.makeUnsafe("cmd-legacy-assistant-delta"), + threadId: asThreadId("thread-1"), + messageId: legacyAssistantMessageId, + delta: "hello", + turnId: asTurnId("turn-legacy"), + createdAt: now, + }), + ); + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.message.assistant.complete", + commandId: CommandId.makeUnsafe("cmd-legacy-assistant-complete"), + threadId: asThreadId("thread-1"), + messageId: legacyAssistantMessageId, + turnId: asTurnId("turn-legacy"), + createdAt: now, + }), + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-legacy-assistant-delta"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-legacy"), + itemId: asItemId("item-legacy"), + payload: { + streamKind: "assistant_text", + delta: " world", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-legacy-assistant-complete"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-legacy"), + itemId: asItemId("item-legacy"), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === legacyAssistantMessageId && message.text === "hello world", + ), + ); + + const legacyMessages = thread.messages.filter( + (message: ProviderRuntimeTestMessage) => message.id === legacyAssistantMessageId, + ); + expect(legacyMessages).toHaveLength(1); + expect( + thread.messages.some( + (message) => message.id === assistantMessageId("thread-1", "item", "item-legacy"), + ), + ).toBe(false); + }); + it("projects Codex task lifecycle chunks into thread activities", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 0dd10dcb7..79188847f 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -16,6 +16,7 @@ import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { isGitRepository } from "../../git/isRepo.ts"; +import { resolveRuntimeAssistantMessageId } from "../assistantMessageIdentity.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { ProviderRuntimeIngestionService, @@ -891,9 +892,7 @@ const make = Effect.gen(function* () { event.type === "turn.proposed.delta" ? event.payload.delta : undefined; if (assistantDelta && assistantDelta.length > 0) { - const assistantMessageId = MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ); + const assistantMessageId = resolveRuntimeAssistantMessageId(thread, event); const turnId = toTurnId(event.turnId); if (turnId) { yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); @@ -934,9 +933,7 @@ const make = Effect.gen(function* () { const assistantCompletion = event.type === "item.completed" && event.payload.itemType === "assistant_message" ? { - messageId: MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ), + messageId: resolveRuntimeAssistantMessageId(thread, event), fallbackText: event.payload.detail, } : undefined; @@ -1071,9 +1068,7 @@ const make = Effect.gen(function* () { if (thread.checkpoints.some((c) => c.turnId === turnId)) { // Already tracked; no-op. } else { - const assistantMessageId = MessageId.makeUnsafe( - `assistant:${event.itemId ?? event.turnId ?? event.eventId}`, - ); + const assistantMessageId = resolveRuntimeAssistantMessageId(thread, event); const maxTurnCount = thread.checkpoints.reduce( (max, c) => Math.max(max, c.checkpointTurnCount), 0, diff --git a/apps/server/src/orchestration/assistantMessageIdentity.test.ts b/apps/server/src/orchestration/assistantMessageIdentity.test.ts new file mode 100644 index 000000000..1c46ef620 --- /dev/null +++ b/apps/server/src/orchestration/assistantMessageIdentity.test.ts @@ -0,0 +1,124 @@ +import { + DEFAULT_PROVIDER_INTERACTION_MODE, + CheckpointRef, + MessageId, + ProjectId, + ThreadId, + TurnId, + type OrchestrationThread, + type ProviderRuntimeEvent, +} from "@t3tools/contracts"; +import { describe, expect, it } from "vitest"; + +import { + assistantMessageIdFromRuntimeEvent, + resolveCheckpointAssistantMessageId, + resolveRuntimeAssistantMessageId, +} from "./assistantMessageIdentity.ts"; + +const asMessageId = (value: string): MessageId => MessageId.makeUnsafe(value); +const asProjectId = (value: string): ProjectId => ProjectId.makeUnsafe(value); +const asThreadId = (value: string): ThreadId => ThreadId.makeUnsafe(value); +const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value); + +function makeThread(overrides: Partial = {}): OrchestrationThread { + return { + id: asThreadId("thread-1"), + projectId: asProjectId("project-1"), + title: "Thread", + model: "gpt-5-codex", + runtimeMode: "approval-required", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + branch: null, + worktreePath: null, + latestTurn: null, + createdAt: "2026-03-11T00:00:00.000Z", + updatedAt: "2026-03-11T00:00:00.000Z", + deletedAt: null, + messages: [], + proposedPlans: [], + activities: [], + checkpoints: [], + session: null, + ...overrides, + }; +} + +function makeRuntimeEvent(overrides: Partial = {}): ProviderRuntimeEvent { + return { + type: "item.completed", + eventId: "evt-1", + provider: "codex", + createdAt: "2026-03-11T00:00:01.000Z", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-1"), + itemId: "item-1", + payload: { + itemType: "assistant_message", + status: "completed", + }, + ...overrides, + } as ProviderRuntimeEvent; +} + +describe("assistantMessageIdentity", () => { + it("uses thread-scoped assistant IDs for new runtime messages", () => { + const event = makeRuntimeEvent(); + + expect(assistantMessageIdFromRuntimeEvent(event, asThreadId("thread-1"))).toBe( + asMessageId("assistant:thread-1:item:item-1"), + ); + }); + + it("reuses legacy persisted assistant message IDs during runtime resume", () => { + const legacyMessageId = asMessageId("assistant:item-1"); + const thread = makeThread({ + messages: [ + { + id: legacyMessageId, + role: "assistant", + text: "hello", + turnId: asTurnId("turn-1"), + streaming: false, + createdAt: "2026-03-11T00:00:00.000Z", + updatedAt: "2026-03-11T00:00:00.000Z", + }, + ], + }); + + expect(resolveRuntimeAssistantMessageId(thread, makeRuntimeEvent())).toBe(legacyMessageId); + }); + + it("reuses legacy checkpoint-linked assistant IDs before falling back", () => { + const legacyMessageId = asMessageId("assistant:turn-1"); + const thread = makeThread({ + checkpoints: [ + { + turnId: asTurnId("turn-1"), + checkpointTurnCount: 1, + checkpointRef: CheckpointRef.makeUnsafe("refs/t3/checkpoints/thread-1/turn/1"), + status: "ready", + files: [], + assistantMessageId: legacyMessageId, + completedAt: "2026-03-11T00:00:00.000Z", + }, + ], + }); + + expect( + resolveCheckpointAssistantMessageId({ + thread, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-1"), + }), + ).toBe(legacyMessageId); + + expect( + resolveCheckpointAssistantMessageId({ + thread: makeThread(), + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-2"), + }), + ).toBe(asMessageId("assistant:thread-1:turn:turn-2")); + }); +}); diff --git a/apps/server/src/orchestration/assistantMessageIdentity.ts b/apps/server/src/orchestration/assistantMessageIdentity.ts new file mode 100644 index 000000000..a36f51730 --- /dev/null +++ b/apps/server/src/orchestration/assistantMessageIdentity.ts @@ -0,0 +1,114 @@ +import { + MessageId, + type OrchestrationThread, + ThreadId, + TurnId, + type ProviderRuntimeEvent, +} from "@t3tools/contracts"; + +export type AssistantMessageIdentityThread = Pick< + OrchestrationThread, + "id" | "messages" | "latestTurn" | "checkpoints" +>; + +function toTurnId(value: TurnId | string | undefined): TurnId | undefined { + return value === undefined ? undefined : TurnId.makeUnsafe(String(value)); +} + +function uniqueMessageIds(ids: ReadonlyArray): Array { + return Array.from(new Set(ids)); +} + +function legacyAssistantMessageIdsFromRuntimeEvent(event: ProviderRuntimeEvent): Array { + const ids: Array = []; + if (event.itemId) { + ids.push(MessageId.makeUnsafe(`assistant:${event.itemId}`)); + } + const turnId = toTurnId(event.turnId); + if (turnId) { + ids.push(MessageId.makeUnsafe(`assistant:${turnId}`)); + } + ids.push(MessageId.makeUnsafe(`assistant:${event.eventId}`)); + return uniqueMessageIds(ids); +} + +function assistantMessageIdFromTurn(threadId: ThreadId, turnId: TurnId): MessageId { + return MessageId.makeUnsafe(`assistant:${threadId}:turn:${turnId}`); +} + +function persistedAssistantMessageIdForTurn( + thread: Pick, + turnId: TurnId, +): MessageId | undefined { + const messageId = thread.messages + .toReversed() + .find((entry) => entry.role === "assistant" && entry.turnId === turnId)?.id; + if (messageId) { + return messageId; + } + + const latestTurnMessageId = + thread.latestTurn?.turnId === turnId ? thread.latestTurn.assistantMessageId : null; + if (latestTurnMessageId) { + return latestTurnMessageId; + } + + return ( + thread.checkpoints.toReversed().find((entry) => entry.turnId === turnId)?.assistantMessageId ?? + undefined + ); +} + +export function assistantMessageIdFromRuntimeEvent( + event: ProviderRuntimeEvent, + threadId: ThreadId, +): MessageId { + const turnId = toTurnId(event.turnId); + if (event.itemId) { + return MessageId.makeUnsafe(`assistant:${threadId}:item:${event.itemId}`); + } + if (turnId) { + return assistantMessageIdFromTurn(threadId, turnId); + } + return MessageId.makeUnsafe(`assistant:${threadId}:event:${event.eventId}`); +} + +export function resolveRuntimeAssistantMessageId( + thread: AssistantMessageIdentityThread, + event: ProviderRuntimeEvent, +): MessageId { + const turnId = toTurnId(event.turnId); + if (turnId) { + const persistedMessageId = persistedAssistantMessageIdForTurn(thread, turnId); + if (persistedMessageId) { + return persistedMessageId; + } + } + + const nextMessageId = assistantMessageIdFromRuntimeEvent(event, thread.id); + const matchingExistingMessageId = thread.messages + .toReversed() + .find( + (entry) => + entry.role === "assistant" && + uniqueMessageIds([ + nextMessageId, + ...legacyAssistantMessageIdsFromRuntimeEvent(event), + ]).includes(entry.id), + )?.id; + + return matchingExistingMessageId ?? nextMessageId; +} + +export function resolveCheckpointAssistantMessageId(input: { + thread: Pick; + threadId: ThreadId; + turnId: TurnId; + assistantMessageId?: MessageId | null; +}): MessageId { + return ( + input.assistantMessageId ?? + persistedAssistantMessageIdForTurn(input.thread, input.turnId) ?? + assistantMessageIdFromTurn(input.threadId, input.turnId) + ); +} diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts index b761387d4..6510dc21d 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts @@ -103,4 +103,47 @@ layer("ProjectionThreadMessageRepository", (it) => { assert.deepEqual(rows[0]?.attachments, []); }), ); + + it.effect("keeps same message IDs isolated per thread", () => + Effect.gen(function* () { + const repository = yield* ProjectionThreadMessageRepository; + const firstThreadId = ThreadId.makeUnsafe("thread-shared-message-a"); + const secondThreadId = ThreadId.makeUnsafe("thread-shared-message-b"); + const messageId = MessageId.makeUnsafe("message-shared-across-threads"); + + yield* repository.upsert({ + messageId, + threadId: firstThreadId, + turnId: null, + role: "assistant", + text: "first thread text", + isStreaming: false, + createdAt: "2026-03-11T00:00:00.000Z", + updatedAt: "2026-03-11T00:00:00.000Z", + }); + + yield* repository.upsert({ + messageId, + threadId: secondThreadId, + turnId: null, + role: "assistant", + text: "second thread text", + isStreaming: false, + createdAt: "2026-03-11T00:00:01.000Z", + updatedAt: "2026-03-11T00:00:01.000Z", + }); + + const firstRows = yield* repository.listByThreadId({ threadId: firstThreadId }); + const secondRows = yield* repository.listByThreadId({ threadId: secondThreadId }); + + assert.deepEqual( + firstRows.map((row) => row.text), + ["first thread text"], + ); + assert.deepEqual( + secondRows.map((row) => row.text), + ["second thread text"], + ); + }), + ); }); diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts index 6f0b25ddf..f573b30f5 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts @@ -50,16 +50,16 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { ( SELECT attachments_json FROM projection_thread_messages - WHERE message_id = ${row.messageId} + WHERE thread_id = ${row.threadId} + AND message_id = ${row.messageId} ) ), ${row.isStreaming ? 1 : 0}, ${row.createdAt}, ${row.updatedAt} ) - ON CONFLICT (message_id) + ON CONFLICT (thread_id, message_id) DO UPDATE SET - thread_id = excluded.thread_id, turn_id = excluded.turn_id, role = excluded.role, text = excluded.text, diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index 7deb890dd..f57db9902 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -25,6 +25,7 @@ import Migration0010 from "./Migrations/010_ProjectionThreadsRuntimeMode.ts"; import Migration0011 from "./Migrations/011_OrchestrationThreadCreatedRuntimeMode.ts"; import Migration0012 from "./Migrations/012_ProjectionThreadsInteractionMode.ts"; import Migration0013 from "./Migrations/013_ProjectionThreadProposedPlans.ts"; +import Migration0014 from "./Migrations/014_ProjectionThreadMessagesThreadScopedKey.ts"; import { Effect } from "effect"; /** @@ -51,6 +52,7 @@ const loader = Migrator.fromRecord({ "11_OrchestrationThreadCreatedRuntimeMode": Migration0011, "12_ProjectionThreadsInteractionMode": Migration0012, "13_ProjectionThreadProposedPlans": Migration0013, + "14_ProjectionThreadMessagesThreadScopedKey": Migration0014, }); /** diff --git a/apps/server/src/persistence/Migrations/014_ProjectionThreadMessagesThreadScopedKey.ts b/apps/server/src/persistence/Migrations/014_ProjectionThreadMessagesThreadScopedKey.ts new file mode 100644 index 000000000..7aa69381a --- /dev/null +++ b/apps/server/src/persistence/Migrations/014_ProjectionThreadMessagesThreadScopedKey.ts @@ -0,0 +1,64 @@ +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as Effect from "effect/Effect"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + DROP INDEX IF EXISTS idx_projection_thread_messages_thread_created + `; + + yield* sql` + ALTER TABLE projection_thread_messages + RENAME TO projection_thread_messages_legacy + `; + + yield* sql` + CREATE TABLE projection_thread_messages ( + thread_id TEXT NOT NULL, + message_id TEXT NOT NULL, + turn_id TEXT, + role TEXT NOT NULL, + text TEXT NOT NULL, + is_streaming INTEGER NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + attachments_json TEXT, + PRIMARY KEY (thread_id, message_id) + ) + `; + + yield* sql` + INSERT INTO projection_thread_messages ( + thread_id, + message_id, + turn_id, + role, + text, + is_streaming, + created_at, + updated_at, + attachments_json + ) + SELECT + thread_id, + message_id, + turn_id, + role, + text, + is_streaming, + created_at, + updated_at, + attachments_json + FROM projection_thread_messages_legacy + `; + + yield* sql` + DROP TABLE projection_thread_messages_legacy + `; + + yield* sql` + CREATE INDEX idx_projection_thread_messages_thread_created + ON projection_thread_messages(thread_id, created_at) + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts index 00b1d399c..4aaf3524a 100644 --- a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts @@ -49,7 +49,7 @@ export interface ProjectionThreadMessageRepositoryShape { /** * Insert or replace a projected thread message row. * - * Upserts by `messageId`. + * Upserts by `(threadId, messageId)`. */ readonly upsert: ( message: ProjectionThreadMessage, diff --git a/apps/server/src/wsServer.test.ts b/apps/server/src/wsServer.test.ts index f12792a31..34f7298fa 100644 --- a/apps/server/src/wsServer.test.ts +++ b/apps/server/src/wsServer.test.ts @@ -1308,7 +1308,8 @@ describe("WebSocket Server", () => { const domainPush = await waitForPush(ws, ORCHESTRATION_WS_CHANNELS.domainEvent, (push) => { const event = push.data as { type?: string; payload?: { messageId?: string; text?: string } }; return ( - event.type === "thread.message-sent" && event.payload?.messageId === "assistant:item-1" + event.type === "thread.message-sent" && + event.payload?.messageId === "assistant:thread-1:item:item-1" ); }); @@ -1317,7 +1318,7 @@ describe("WebSocket Server", () => { payload: { messageId: string; text: string }; }; expect(domainEvent.type).toBe("thread.message-sent"); - expect(domainEvent.payload.messageId).toBe("assistant:item-1"); + expect(domainEvent.payload.messageId).toBe("assistant:thread-1:item:item-1"); expect(domainEvent.payload.text).toBe("hello from runtime"); });