diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 64955590235..7a25891a1da 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -448,6 +448,51 @@ describe("ProviderRuntimeIngestion", () => { expect(thread.session?.lastError).toBeNull(); }); + it("clears active turn when provider session becomes ready", async () => { + const harness = await createHarness(); + const now = "2026-01-01T00:00:00.000Z"; + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-session-ready"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-session-ready"), + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-session-ready", + 10_000, + ); + + harness.emit({ + type: "session.state.changed", + eventId: asEventId("evt-session-state-ready-with-active-turn"), + provider: ProviderDriverKind.make("codex"), + threadId: asThreadId("thread-1"), + createdAt: "2026-01-01T00:00:01.000Z", + payload: { + state: "ready", + }, + }); + + const thread = await waitForThread( + harness.readModel, + (entry) => + entry.session?.status === "ready" && + entry.session?.activeTurnId === null && + entry.session?.lastError === null, + 10_000, + ); + expect(thread.session?.status).toBe("ready"); + expect(thread.session?.activeTurnId).toBeNull(); + expect(thread.session?.lastError).toBeNull(); + }); + it("does not clear active turn when session/thread started arrives mid-turn", async () => { const harness = await createHarness(); const now = "2026-01-01T00:00:00.000Z"; @@ -466,6 +511,7 @@ describe("ProviderRuntimeIngestion", () => { (thread) => thread.session?.status === "running" && thread.session?.activeTurnId === "turn-midturn-lifecycle", + 10_000, ); harness.emit({ @@ -502,6 +548,7 @@ describe("ProviderRuntimeIngestion", () => { await waitForThread( harness.readModel, (thread) => thread.session?.status === "ready" && thread.session?.activeTurnId === null, + 10_000, ); }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3e5978f4846..c8949d796c4 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -245,6 +245,12 @@ function orchestrationSessionStatusFromRuntimeState( } } +function sessionStatusAllowsActiveTurn( + status: ReturnType, +): boolean { + return status === "starting" || status === "running"; +} + function requestKindFromCanonicalRequestType( requestType: string | undefined, ): "command" | "file-read" | "file-change" | undefined { @@ -1281,12 +1287,6 @@ const make = Effect.gen(function* () { event.type === "turn.started" || event.type === "turn.completed" ) { - const nextActiveTurnId = - event.type === "turn.started" - ? (eventTurnId ?? null) - : event.type === "turn.completed" || event.type === "session.exited" - ? null - : activeTurnId; const status = (() => { switch (event.type) { case "session.state.changed": @@ -1306,6 +1306,14 @@ const make = Effect.gen(function* () { return activeTurnId !== null ? "running" : "ready"; } })(); + const nextActiveTurnId = + event.type === "turn.started" + ? (eventTurnId ?? null) + : event.type === "turn.completed" || event.type === "session.exited" + ? null + : event.type === "session.state.changed" && !sessionStatusAllowsActiveTurn(status) + ? null + : activeTurnId; const lastError = event.type === "session.state.changed" && event.payload.state === "error" ? (event.payload.reason ?? thread.session?.lastError ?? "Provider session error")