diff --git a/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts b/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts index 18e6166c1cd..13e1b9d5581 100644 --- a/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts +++ b/apps/server/src/provider/Layers/ProviderSessionReaper.test.ts @@ -18,6 +18,10 @@ import * as Stream from "effect/Stream"; import { afterEach, describe, expect, it, vi } from "vite-plus/test"; import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; +import { + OrchestrationEngineService, + type OrchestrationEngineShape, +} from "../../orchestration/Services/OrchestrationEngine.ts"; import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; import { ProviderSessionRuntimeRepositoryLive } from "../../persistence/Layers/ProviderSessionRuntime.ts"; import { ProviderSessionRuntimeRepository } from "../../persistence/Services/ProviderSessionRuntime.ts"; @@ -118,7 +122,7 @@ function makeReadModel( describe("ProviderSessionReaper", () => { let runtime: ManagedRuntime.ManagedRuntime< - ProviderSessionReaper | ProviderSessionRuntimeRepository, + ProviderSessionReaper | ProviderSessionRuntimeRepository | OrchestrationEngineService, unknown > | null = null; let scope: Scope.Closeable | null = null; @@ -141,6 +145,9 @@ describe("ProviderSessionReaper", () => { }) => ReturnType; }) { const stoppedThreadIds = new Set(); + const dispatch = vi.fn(() => + Effect.succeed({ sequence: 1 }), + ); const stopSession = vi.fn( (request) => (input.stopSessionImplementation @@ -189,6 +196,13 @@ describe("ProviderSessionReaper", () => { Layer.provideMerge(providerSessionDirectoryLayer), Layer.provideMerge(runtimeRepositoryLayer), Layer.provideMerge(Layer.succeed(ProviderService, providerService)), + Layer.provideMerge( + Layer.succeed(OrchestrationEngineService, { + dispatch, + readEvents: () => Stream.empty, + streamDomainEvents: Stream.empty, + }), + ), Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { getCommandReadModel: () => Effect.die("unused"), @@ -216,7 +230,7 @@ describe("ProviderSessionReaper", () => { ); runtime = ManagedRuntime.make(layer); - return { stopSession, stoppedThreadIds }; + return { dispatch, stopSession, stoppedThreadIds }; } it("reaps stale persisted sessions without active turns", async () => { @@ -404,10 +418,71 @@ describe("ProviderSessionReaper", () => { await Effect.runPromise(drainFibers); expect(harness.stopSession).not.toHaveBeenCalled(); + expect(harness.dispatch).not.toHaveBeenCalled(); const remaining = await runtime!.runPromise(repository.getByThreadId({ threadId })); expect(Option.isSome(remaining)).toBe(true); }); + it("reconciles stopped persisted sessions that still project as running", async () => { + const threadId = ThreadId.make("thread-reaper-stopped-projection-running"); + const turnId = TurnId.make("turn-reaper-stopped-projection-running"); + const now = "2026-01-01T00:00:00.000Z"; + const harness = await createHarness({ + readModel: makeReadModel([ + { + id: threadId, + session: { + threadId, + status: "running", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: turnId, + lastError: null, + updatedAt: now, + }, + }, + ]), + }); + const repository = await runtime!.runPromise(Effect.service(ProviderSessionRuntimeRepository)); + + await runtime!.runPromise( + repository.upsert({ + threadId, + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + adapterKey: "codex", + runtimeMode: "full-access", + status: "stopped", + lastSeenAt: "2026-04-14T00:00:00.000Z", + resumeCursor: { + opaque: "resume-stopped-projection-running", + }, + runtimePayload: null, + }), + ); + + const reaper = await runtime!.runPromise(Effect.service(ProviderSessionReaper)); + scope = await Effect.runPromise(Scope.make("sequential")); + await Effect.runPromise(reaper.start().pipe(Scope.provide(scope))); + + await waitFor(() => harness.dispatch.mock.calls.length === 1); + + expect(harness.stopSession).not.toHaveBeenCalled(); + expect(harness.dispatch.mock.calls[0]?.[0]).toMatchObject({ + type: "thread.session.set", + threadId, + session: { + threadId, + status: "stopped", + providerName: "codex", + providerInstanceId: ProviderInstanceId.make("codex"), + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + }, + }); + }); + it("continues reaping other sessions when one stop attempt fails", async () => { const failedThreadId = ThreadId.make("thread-reaper-stop-failure"); const reapedThreadId = ThreadId.make("thread-reaper-stop-success"); diff --git a/apps/server/src/provider/Layers/ProviderSessionReaper.ts b/apps/server/src/provider/Layers/ProviderSessionReaper.ts index ca396b40596..326938d77d1 100644 --- a/apps/server/src/provider/Layers/ProviderSessionReaper.ts +++ b/apps/server/src/provider/Layers/ProviderSessionReaper.ts @@ -1,10 +1,13 @@ import * as Clock from "effect/Clock"; +import { CommandId } from "@t3tools/contracts"; +import * as Crypto from "effect/Crypto"; import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Schedule from "effect/Schedule"; +import { OrchestrationEngineService } from "../../orchestration/Services/OrchestrationEngine.ts"; import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts"; import { ProviderSessionDirectory } from "../Services/ProviderSessionDirectory.ts"; import { @@ -25,7 +28,9 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) = Effect.gen(function* () { const providerService = yield* ProviderService; const directory = yield* ProviderSessionDirectory; + const orchestrationEngine = yield* OrchestrationEngineService; const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; + const crypto = yield* Crypto.Crypto; const inactivityThresholdMs = Math.max( 1, @@ -36,10 +41,43 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) = const sweep = Effect.gen(function* () { const bindings = yield* directory.listBindings(); const now = yield* Clock.currentTimeMillis; + const nowIso = new Date(now).toISOString(); let reapedCount = 0; for (const binding of bindings) { + const thread = yield* projectionSnapshotQuery + .getThreadShellById(binding.threadId) + .pipe(Effect.map(Option.getOrUndefined)); + if (binding.status === "stopped") { + if (thread?.session?.status === "running" || thread?.session?.status === "starting") { + const commandUuid = yield* crypto.randomUUIDv4; + yield* orchestrationEngine.dispatch({ + type: "thread.session.set", + commandId: CommandId.make( + `provider-session-reaper:stopped-reconcile:${binding.threadId}:${commandUuid}`, + ), + threadId: binding.threadId, + session: { + threadId: binding.threadId, + status: "stopped", + providerName: binding.provider, + ...(binding.providerInstanceId !== undefined + ? { providerInstanceId: binding.providerInstanceId } + : {}), + runtimeMode: thread.session.runtimeMode, + activeTurnId: null, + lastError: thread.session.lastError, + updatedAt: nowIso, + }, + createdAt: nowIso, + }); + yield* Effect.logInfo("provider.session.reaper.reconciled-stopped-projection", { + threadId: binding.threadId, + provider: binding.provider, + projectionStatus: thread.session.status, + }); + } continue; } @@ -58,9 +96,6 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) = continue; } - const thread = yield* projectionSnapshotQuery - .getThreadShellById(binding.threadId) - .pipe(Effect.map(Option.getOrUndefined)); if (thread?.session?.activeTurnId != null) { yield* Effect.logDebug("provider.session.reaper.skipped-active-turn", { threadId: binding.threadId,