Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 77 additions & 2 deletions apps/server/src/provider/Layers/ProviderSessionReaper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import * as Stream from "effect/Stream";
import { afterEach, describe, expect, it, vi } from "vite-plus/test";

import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts";
import {
OrchestrationEngineService,
type OrchestrationEngineShape,
} from "../../orchestration/Services/OrchestrationEngine.ts";
import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts";
import { ProviderSessionRuntimeRepositoryLive } from "../../persistence/Layers/ProviderSessionRuntime.ts";
import { ProviderSessionRuntimeRepository } from "../../persistence/Services/ProviderSessionRuntime.ts";
Expand Down Expand Up @@ -118,7 +122,7 @@ function makeReadModel(

describe("ProviderSessionReaper", () => {
let runtime: ManagedRuntime.ManagedRuntime<
ProviderSessionReaper | ProviderSessionRuntimeRepository,
ProviderSessionReaper | ProviderSessionRuntimeRepository | OrchestrationEngineService,
unknown
> | null = null;
let scope: Scope.Closeable | null = null;
Expand All @@ -141,6 +145,9 @@ describe("ProviderSessionReaper", () => {
}) => ReturnType<ProviderServiceShape["stopSession"]>;
}) {
const stoppedThreadIds = new Set<ThreadId>();
const dispatch = vi.fn<OrchestrationEngineShape["dispatch"]>(() =>
Effect.succeed({ sequence: 1 }),
);
const stopSession = vi.fn<ProviderServiceShape["stopSession"]>(
(request) =>
(input.stopSessionImplementation
Expand Down Expand Up @@ -189,6 +196,13 @@ describe("ProviderSessionReaper", () => {
Layer.provideMerge(providerSessionDirectoryLayer),
Layer.provideMerge(runtimeRepositoryLayer),
Layer.provideMerge(Layer.succeed(ProviderService, providerService)),
Layer.provideMerge(
Layer.succeed(OrchestrationEngineService, {
dispatch,
readEvents: () => Stream.empty,
streamDomainEvents: Stream.empty,
}),
),
Layer.provideMerge(
Layer.succeed(ProjectionSnapshotQuery, {
getCommandReadModel: () => Effect.die("unused"),
Expand Down Expand Up @@ -216,7 +230,7 @@ describe("ProviderSessionReaper", () => {
);

runtime = ManagedRuntime.make(layer);
return { stopSession, stoppedThreadIds };
return { dispatch, stopSession, stoppedThreadIds };
}

it("reaps stale persisted sessions without active turns", async () => {
Expand Down Expand Up @@ -404,10 +418,71 @@ describe("ProviderSessionReaper", () => {
await Effect.runPromise(drainFibers);

expect(harness.stopSession).not.toHaveBeenCalled();
expect(harness.dispatch).not.toHaveBeenCalled();
const remaining = await runtime!.runPromise(repository.getByThreadId({ threadId }));
expect(Option.isSome(remaining)).toBe(true);
});

it("reconciles stopped persisted sessions that still project as running", async () => {
const threadId = ThreadId.make("thread-reaper-stopped-projection-running");
const turnId = TurnId.make("turn-reaper-stopped-projection-running");
const now = "2026-01-01T00:00:00.000Z";
const harness = await createHarness({
readModel: makeReadModel([
{
id: threadId,
session: {
threadId,
status: "running",
providerName: "codex",
runtimeMode: "full-access",
activeTurnId: turnId,
lastError: null,
updatedAt: now,
},
},
]),
});
const repository = await runtime!.runPromise(Effect.service(ProviderSessionRuntimeRepository));

await runtime!.runPromise(
repository.upsert({
threadId,
providerName: "codex",
providerInstanceId: ProviderInstanceId.make("codex"),
adapterKey: "codex",
runtimeMode: "full-access",
status: "stopped",
lastSeenAt: "2026-04-14T00:00:00.000Z",
resumeCursor: {
opaque: "resume-stopped-projection-running",
},
runtimePayload: null,
}),
);

const reaper = await runtime!.runPromise(Effect.service(ProviderSessionReaper));
scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(reaper.start().pipe(Scope.provide(scope)));

await waitFor(() => harness.dispatch.mock.calls.length === 1);

expect(harness.stopSession).not.toHaveBeenCalled();
expect(harness.dispatch.mock.calls[0]?.[0]).toMatchObject({
type: "thread.session.set",
threadId,
session: {
threadId,
status: "stopped",
providerName: "codex",
providerInstanceId: ProviderInstanceId.make("codex"),
runtimeMode: "full-access",
activeTurnId: null,
lastError: null,
},
});
});

it("continues reaping other sessions when one stop attempt fails", async () => {
const failedThreadId = ThreadId.make("thread-reaper-stop-failure");
const reapedThreadId = ThreadId.make("thread-reaper-stop-success");
Expand Down
41 changes: 38 additions & 3 deletions apps/server/src/provider/Layers/ProviderSessionReaper.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import * as Clock from "effect/Clock";
import { CommandId } from "@t3tools/contracts";
import * as Crypto from "effect/Crypto";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Schedule from "effect/Schedule";

import { OrchestrationEngineService } from "../../orchestration/Services/OrchestrationEngine.ts";
import { ProjectionSnapshotQuery } from "../../orchestration/Services/ProjectionSnapshotQuery.ts";
import { ProviderSessionDirectory } from "../Services/ProviderSessionDirectory.ts";
import {
Expand All @@ -25,7 +28,9 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) =
Effect.gen(function* () {
const providerService = yield* ProviderService;
const directory = yield* ProviderSessionDirectory;
const orchestrationEngine = yield* OrchestrationEngineService;
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;
const crypto = yield* Crypto.Crypto;

const inactivityThresholdMs = Math.max(
1,
Expand All @@ -36,10 +41,43 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) =
const sweep = Effect.gen(function* () {
const bindings = yield* directory.listBindings();
const now = yield* Clock.currentTimeMillis;
const nowIso = new Date(now).toISOString();
let reapedCount = 0;

for (const binding of bindings) {
const thread = yield* projectionSnapshotQuery
.getThreadShellById(binding.threadId)
.pipe(Effect.map(Option.getOrUndefined));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shell query before inactivity gate

Medium Severity

Each sweep now calls getThreadShellById at the start of every binding iteration, before the stopped branch and before the inactivity threshold check. Previously, non-stopped bindings below the threshold never hit projection, and stopped bindings skipped the query entirely.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 4ba2676. Configure here.


if (binding.status === "stopped") {
if (thread?.session?.status === "running" || thread?.session?.status === "starting") {
const commandUuid = yield* crypto.randomUUIDv4;
yield* orchestrationEngine.dispatch({
type: "thread.session.set",
commandId: CommandId.make(
`provider-session-reaper:stopped-reconcile:${binding.threadId}:${commandUuid}`,
),
threadId: binding.threadId,
session: {
threadId: binding.threadId,
status: "stopped",
providerName: binding.provider,
...(binding.providerInstanceId !== undefined
? { providerInstanceId: binding.providerInstanceId }
: {}),
runtimeMode: thread.session.runtimeMode,
activeTurnId: null,
lastError: thread.session.lastError,
updatedAt: nowIso,
},
createdAt: nowIso,
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reconcile dispatch aborts sweep

Medium Severity

When a stopped binding triggers orchestrationEngine.dispatch for projection reconcile, a failed dispatch ends the whole sweep iteration. Other bindings in the same pass are skipped until the next interval, unlike stopSession, which logs and continues per binding.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 4ba2676. Configure here.

yield* Effect.logInfo("provider.session.reaper.reconciled-stopped-projection", {
threadId: binding.threadId,
provider: binding.provider,
projectionStatus: thread.session.status,
});
}
continue;
}

Expand All @@ -58,9 +96,6 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) =
continue;
}

const thread = yield* projectionSnapshotQuery
.getThreadShellById(binding.threadId)
.pipe(Effect.map(Option.getOrUndefined));
if (thread?.session?.activeTurnId != null) {
yield* Effect.logDebug("provider.session.reaper.skipped-active-turn", {
threadId: binding.threadId,
Expand Down
Loading