From a3673fd9392a45ad2a16c00032a2885b44ca7f48 Mon Sep 17 00:00:00 2001 From: aaight Date: Mon, 11 May 2026 11:59:12 +0200 Subject: [PATCH 1/4] refactor(router): isolate worker snapshot docker operations (#1333) Co-authored-by: Cascade Bot --- src/router/container-manager.ts | 117 ++----------- src/router/dangling-image-cleanup.ts | 2 +- src/router/dispatch-error-classifier.ts | 2 +- src/router/snapshot-manager.ts | 2 +- src/router/worker-manager.ts | 2 +- src/router/worker-snapshots.ts | 110 ++++++++++++ .../unit/router/snapshot-integration.test.ts | 4 +- tests/unit/router/worker-snapshots.test.ts | 160 ++++++++++++++++++ 8 files changed, 292 insertions(+), 107 deletions(-) create mode 100644 src/router/worker-snapshots.ts create mode 100644 tests/unit/router/worker-snapshots.test.ts diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts index 9bdf5784a..e341fdf7a 100644 --- a/src/router/container-manager.ts +++ b/src/router/container-manager.ts @@ -9,6 +9,7 @@ * - worker-env.ts — Job data parsing + env building * - orphan-cleanup.ts — Periodic orphan container cleanup * - snapshot-manager.ts — Snapshot metadata registry + * - worker-snapshots.ts — Docker snapshot commit/remove mechanics */ import type { Job } from 'bullmq'; @@ -21,7 +22,7 @@ import { routerConfig } from './config.js'; import { ROUTER_INSTANCE_ID } from './instance-id.js'; import { stopOrphanCleanup } from './orphan-cleanup.js'; import type { CascadeJob } from './queue.js'; -import { invalidateSnapshot, registerSnapshot } from './snapshot-manager.js'; +import { invalidateSnapshot } from './snapshot-manager.js'; import { clearAllWorkItemLocks } from './work-item-lock.js'; import { buildWorkerEnvWithProjectId, @@ -29,6 +30,11 @@ import { extractProjectIdFromJob, extractWorkItemId, } from './worker-env.js'; +import { + commitWorkerSnapshot, + isImageNotFoundError, + removeWorkerContainerBestEffort, +} from './worker-snapshots.js'; import { buildWorkerContainerName, 
resolveSpawnSettings } from './worker-spawn-settings.js'; import { killWorker } from './worker-timeouts.js'; @@ -54,6 +60,12 @@ export { buildWorkerEnv, extractProjectIdFromJob, } from './worker-env.js'; +export { + buildWorkerSnapshotImageName, + commitWorkerSnapshot, + isImageNotFoundError, + removeWorkerContainerBestEffort, +} from './worker-snapshots.js'; export { buildWorkerContainerName, ROUTER_KILL_BUFFER_MS, @@ -63,85 +75,6 @@ export { killWorker } from './worker-timeouts.js'; const docker = new Docker(); -/** - * Build a stable Docker image name for a snapshot. - * Uses a sanitised project+workItem key so it's valid as a Docker image tag. - */ -function buildSnapshotImageName(projectId: string, workItemId: string): string { - // Sanitise: lowercase, replace non-alphanumeric with '-', collapse runs - const sanitise = (s: string) => - s - .toLowerCase() - .replace(/[^a-z0-9]/g, '-') - .replace(/-+/g, '-') - .replace(/^-|-$/g, ''); - return `cascade-snapshot-${sanitise(projectId)}-${sanitise(workItemId)}:latest`; -} - -/** - * Commit a container to a snapshot image and register the metadata. - * On failure the error is logged and swallowed — snapshot failure must not - * break the normal post-run flow. - */ -async function inspectImageSizeBestEffort(imageName: string): Promise { - try { - const image = docker.getImage(imageName); - if (!image) return undefined; - const info = (await image.inspect()) as { Size?: number } | undefined; - return info?.Size; - } catch { - return undefined; - } -} - -async function commitContainerToSnapshot( - containerId: string, - projectId: string, - workItemId: string, -): Promise { - const imageName = buildSnapshotImageName(projectId, workItemId); - try { - const container = docker.getContainer(containerId); - await container.commit({ repo: imageName.split(':')[0], tag: 'latest' }); - // Populate the image size on the registered metadata so max-size - // eviction actually fires. 
Inspecting is best-effort — without size, - // the entry still gets TTL/max-count eviction. - const imageSize = await inspectImageSizeBestEffort(imageName); - registerSnapshot(projectId, workItemId, imageName, imageSize); - logger.info('[WorkerManager] Committed container to snapshot image:', { - containerId: containerId.slice(0, 12), - imageName, - projectId, - workItemId, - imageSizeBytes: imageSize, - }); - } catch (err) { - logger.warn('[WorkerManager] Failed to commit container to snapshot (non-fatal):', { - containerId: containerId.slice(0, 12), - imageName, - error: String(err), - }); - captureException(err, { - tags: { source: 'snapshot_commit' }, - extra: { containerId, imageName, projectId, workItemId }, - level: 'warning', - }); - } -} - -/** - * Remove a container (used after manual snapshot commit to clean up). - * Swallows errors — the container may already be removed. - */ -async function removeContainer(containerId: string): Promise { - try { - const container = docker.getContainer(containerId); - await container.remove({ force: true }); - } catch { - // Container may already be removed — not an error - } -} - /** * Inspect a just-exited container and pull the diagnostic facts that explain * its exit. `OOMKilled` and `State.Error` are only available before AutoRemove @@ -245,7 +178,7 @@ async function onWorkerExit(opts: { if (snapshotEnabled) { if (result.StatusCode === 0 && projectId && workItemId) { - await commitContainerToSnapshot(container.id, projectId, workItemId); + await commitWorkerSnapshot(container.id, projectId, workItemId); } else if (result.StatusCode !== 0) { logger.info('[WorkerManager] Skipping snapshot commit after non-zero exit:', { jobId, @@ -253,30 +186,12 @@ async function onWorkerExit(opts: { }); } // Always remove manually since AutoRemove is disabled for snapshot runs. 
- await removeContainer(container.id); + await removeWorkerContainerBestEffort(container.id); } cleanupWorker(jobId, result.StatusCode, { oomKilled, exitReason }); } -/** - * Returns true when a Docker error indicates the requested image does not exist. - * Uses the HTTP statusCode from dockerode's error objects as the primary signal, - * with a substring check on the message as a secondary guard. - * - * Exported for the dispatch-error classifier (spec 015/2) so it can - * recognise this terminal class and skip BullMQ retries for it. - */ -export function isImageNotFoundError(err: unknown): boolean { - return ( - err != null && - typeof err === 'object' && - 'statusCode' in err && - (err as { statusCode: unknown }).statusCode === 404 && - String(err).toLowerCase().includes('no such image') - ); -} - interface ContainerLaunchConfig { workerImage: string; snapshotEnabled: boolean; @@ -385,7 +300,7 @@ async function createAndMonitorContainer( }); // Ensure container is cleaned up even on wait error (snapshot runs only) if (snapshotEnabled) { - removeContainer(container.id).catch(() => {}); + removeWorkerContainerBestEffort(container.id).catch(() => {}); } cleanupWorker(jobId); }); diff --git a/src/router/dangling-image-cleanup.ts b/src/router/dangling-image-cleanup.ts index 8f5464003..d070b30f8 100644 --- a/src/router/dangling-image-cleanup.ts +++ b/src/router/dangling-image-cleanup.ts @@ -1,7 +1,7 @@ /** * Periodic dangling-image cleanup for cascade-managed Docker images. 
* - * Closes the leak class where `commitContainerToSnapshot` re-tags the + * Closes the leak class where `commitWorkerSnapshot` re-tags the * `cascade-snapshot--:latest` slot for repeated runs of the * same work item: the previous image's digest becomes dangling (untagged) * and is dropped from the in-memory snapshot registry, so the registry-driven diff --git a/src/router/dispatch-error-classifier.ts b/src/router/dispatch-error-classifier.ts index ceff8aaae..021bf5656 100644 --- a/src/router/dispatch-error-classifier.ts +++ b/src/router/dispatch-error-classifier.ts @@ -21,7 +21,7 @@ * - SLOT_WAIT_TIMEOUT from the slot-waiter primitive */ -import { isImageNotFoundError } from './container-manager.js'; +import { isImageNotFoundError } from './worker-snapshots.js'; export type DispatchErrorKind = 'transient' | 'terminal'; diff --git a/src/router/snapshot-manager.ts b/src/router/snapshot-manager.ts index 6669229f5..fa0827c65 100644 --- a/src/router/snapshot-manager.ts +++ b/src/router/snapshot-manager.ts @@ -7,7 +7,7 @@ * base worker image, reducing setup time. * * This module is pure state management — no Docker API usage. - * Docker commit operations are triggered from container-manager.ts. + * Docker commit operations are owned by worker-snapshots.ts. * * Eviction strategy: * - TTL eviction: snapshots older than ttlMs are removed on access (eager) diff --git a/src/router/worker-manager.ts b/src/router/worker-manager.ts index 385115936..f9c097bd3 100644 --- a/src/router/worker-manager.ts +++ b/src/router/worker-manager.ts @@ -103,7 +103,7 @@ export function startWorkerProcessor(): void { startSnapshotCleanup(); // Start periodic dangling-image cleanup. Closes the leak class where - // `commitContainerToSnapshot` re-tags `cascade-snapshot-*:latest` and + // `commitWorkerSnapshot` re-tags `cascade-snapshot-*:latest` and // orphans the prior digest outside the snapshot registry. See // dangling-image-cleanup.ts for the safety scope. 
startDanglingImageCleanup(); diff --git a/src/router/worker-snapshots.ts b/src/router/worker-snapshots.ts new file mode 100644 index 000000000..d3e2ad84a --- /dev/null +++ b/src/router/worker-snapshots.ts @@ -0,0 +1,110 @@ +/** + * Docker mechanics for CASCADE worker snapshots. + * + * Snapshot registry policy lives in snapshot-manager.ts; this module owns the + * Docker operations needed to name, commit, inspect, and remove worker + * containers/images during the post-exit lifecycle. + */ + +import Docker from 'dockerode'; +import { captureException } from '../sentry.js'; +import { logger } from '../utils/logging.js'; +import { registerSnapshot } from './snapshot-manager.js'; + +const docker = new Docker(); + +/** + * Build a stable Docker image name for a snapshot. + * Uses a sanitised project+workItem key so it's valid as a Docker image tag. + */ +export function buildWorkerSnapshotImageName(projectId: string, workItemId: string): string { + // Sanitise: lowercase, replace non-alphanumeric with '-', collapse runs. + const sanitise = (s: string) => + s + .toLowerCase() + .replace(/[^a-z0-9]/g, '-') + .replace(/-+/g, '-') + .replace(/^-|-$/g, ''); + return `cascade-snapshot-${sanitise(projectId)}-${sanitise(workItemId)}:latest`; +} + +/** + * Inspect a snapshot image size without making snapshot registration depend on + * Docker's image-inspect path. Missing size only affects max-size eviction; TTL + * and max-count eviction still apply. + */ +async function inspectImageSizeBestEffort(imageName: string): Promise { + try { + const image = docker.getImage(imageName); + if (!image) return undefined; + const info = (await image.inspect()) as { Size?: number } | undefined; + return info?.Size; + } catch { + return undefined; + } +} + +/** + * Commit a worker container to a snapshot image and register the resulting + * metadata. Snapshot failures are intentionally non-fatal to the worker run. 
+ */ +export async function commitWorkerSnapshot( + containerId: string, + projectId: string, + workItemId: string, +): Promise { + const imageName = buildWorkerSnapshotImageName(projectId, workItemId); + try { + const container = docker.getContainer(containerId); + await container.commit({ repo: imageName.split(':')[0], tag: 'latest' }); + const imageSize = await inspectImageSizeBestEffort(imageName); + registerSnapshot(projectId, workItemId, imageName, imageSize); + logger.info('[WorkerManager] Committed container to snapshot image:', { + containerId: containerId.slice(0, 12), + imageName, + projectId, + workItemId, + imageSizeBytes: imageSize, + }); + } catch (err) { + logger.warn('[WorkerManager] Failed to commit container to snapshot (non-fatal):', { + containerId: containerId.slice(0, 12), + imageName, + error: String(err), + }); + captureException(err, { + tags: { source: 'snapshot_commit' }, + extra: { containerId, imageName, projectId, workItemId }, + level: 'warning', + }); + } +} + +/** + * Remove a worker container after a snapshot-enabled run. Snapshot containers + * use AutoRemove=false so they remain available for diagnostics and commit. + * Removal is best-effort because the container may already be gone. + */ +export async function removeWorkerContainerBestEffort(containerId: string): Promise { + try { + const container = docker.getContainer(containerId); + await container.remove({ force: true }); + } catch { + // Container may already be removed — not an error. + } +} + +/** + * Returns true when a Docker error indicates the requested image does not exist. + * Uses dockerode's HTTP statusCode as the primary signal, with a substring check + * on the message as a secondary guard. 
+ */ +export function isImageNotFoundError(err: unknown): boolean { + return ( + err != null && + typeof err === 'object' && + 'statusCode' in err && + (err as { statusCode: unknown }).statusCode === 404 && + String(err).toLowerCase().includes('no such image') + ); +} diff --git a/tests/unit/router/snapshot-integration.test.ts b/tests/unit/router/snapshot-integration.test.ts index 767ab7596..fcffc6708 100644 --- a/tests/unit/router/snapshot-integration.test.ts +++ b/tests/unit/router/snapshot-integration.test.ts @@ -24,7 +24,7 @@ const { } = vi.hoisted(() => ({ mockDockerCreateContainer: vi.fn(), mockDockerGetContainer: vi.fn(), - // commitContainerToSnapshot inspects the freshly committed image to + // commitWorkerSnapshot inspects the freshly committed image to // populate imageSizeBytes; default to a fixed size so registerSnapshot // receives a deterministic 4th argument. mockDockerGetImage: vi.fn().mockReturnValue({ @@ -180,7 +180,7 @@ function setupMockContainer(exitCode = 0) { // --------------------------------------------------------------------------- // File-wide setup — vi.restoreAllMocks() in per-describe afterEach hooks wipes // mockReturnValue on hoisted mocks. Re-arm the docker getImage mock here so -// commitContainerToSnapshot's image-size lookup always resolves to a known +// commitWorkerSnapshot's image-size lookup always resolves to a known // value across every describe. 
// --------------------------------------------------------------------------- diff --git a/tests/unit/router/worker-snapshots.test.ts b/tests/unit/router/worker-snapshots.test.ts new file mode 100644 index 000000000..ac500a578 --- /dev/null +++ b/tests/unit/router/worker-snapshots.test.ts @@ -0,0 +1,160 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const { + mockCaptureException, + mockContainerCommit, + mockContainerRemove, + mockDockerGetContainer, + mockDockerGetImage, + mockImageInspect, + mockLoggerWarn, + mockRegisterSnapshot, +} = vi.hoisted(() => ({ + mockCaptureException: vi.fn(), + mockContainerCommit: vi.fn(), + mockContainerRemove: vi.fn(), + mockDockerGetContainer: vi.fn(), + mockDockerGetImage: vi.fn(), + mockImageInspect: vi.fn(), + mockLoggerWarn: vi.fn(), + mockRegisterSnapshot: vi.fn(), +})); + +vi.mock('dockerode', () => ({ + default: vi.fn().mockImplementation(() => ({ + getContainer: mockDockerGetContainer, + getImage: mockDockerGetImage, + })), +})); + +vi.mock('../../../src/sentry.js', () => ({ + captureException: (...args: unknown[]) => mockCaptureException(...args), +})); + +vi.mock('../../../src/utils/logging.js', () => ({ + logger: { + info: vi.fn(), + warn: (...args: unknown[]) => mockLoggerWarn(...args), + }, +})); + +vi.mock('../../../src/router/snapshot-manager.js', () => ({ + registerSnapshot: (...args: unknown[]) => mockRegisterSnapshot(...args), +})); + +import { + buildWorkerSnapshotImageName, + commitWorkerSnapshot, + isImageNotFoundError, + removeWorkerContainerBestEffort, +} from '../../../src/router/worker-snapshots.js'; + +describe('worker-snapshots', () => { + beforeEach(() => { + vi.clearAllMocks(); + mockContainerCommit.mockResolvedValue(undefined); + mockContainerRemove.mockResolvedValue(undefined); + mockImageInspect.mockResolvedValue({ Size: 1_234_567_890 }); + mockDockerGetContainer.mockReturnValue({ + commit: mockContainerCommit, + remove: mockContainerRemove, + }); + 
mockDockerGetImage.mockReturnValue({ + inspect: mockImageInspect, + }); + }); + + it('preserves the existing snapshot image-name sanitization format', () => { + expect(buildWorkerSnapshotImageName('Proj Snap', 'MNG_652/Worker Snapshot!')).toBe( + 'cascade-snapshot-proj-snap-mng-652-worker-snapshot:latest', + ); + expect(buildWorkerSnapshotImageName('--LLMIST--', 'MNG---95')).toBe( + 'cascade-snapshot-llmist-mng-95:latest', + ); + }); + + it('commits the worker container, inspects image size, and registers metadata', async () => { + await commitWorkerSnapshot('container-snap-abc123', 'proj-snap', 'card-snap'); + + expect(mockDockerGetContainer).toHaveBeenCalledWith('container-snap-abc123'); + expect(mockContainerCommit).toHaveBeenCalledWith({ + repo: 'cascade-snapshot-proj-snap-card-snap', + tag: 'latest', + }); + expect(mockDockerGetImage).toHaveBeenCalledWith('cascade-snapshot-proj-snap-card-snap:latest'); + expect(mockRegisterSnapshot).toHaveBeenCalledWith( + 'proj-snap', + 'card-snap', + 'cascade-snapshot-proj-snap-card-snap:latest', + 1_234_567_890, + ); + }); + + it('still registers snapshot metadata when image-size inspection fails', async () => { + mockImageInspect.mockRejectedValueOnce(new Error('inspect failed')); + + await commitWorkerSnapshot('container-snap-abc123', 'proj-snap', 'card-snap'); + + expect(mockRegisterSnapshot).toHaveBeenCalledWith( + 'proj-snap', + 'card-snap', + 'cascade-snapshot-proj-snap-card-snap:latest', + undefined, + ); + }); + + it('swallows commit errors and reports them as non-fatal snapshot failures', async () => { + const err = new Error('commit failed'); + mockContainerCommit.mockRejectedValueOnce(err); + + await expect( + commitWorkerSnapshot('container-snap-abc123', 'proj-snap', 'card-snap'), + ).resolves.toBeUndefined(); + + expect(mockRegisterSnapshot).not.toHaveBeenCalled(); + expect(mockLoggerWarn).toHaveBeenCalledWith( + '[WorkerManager] Failed to commit container to snapshot (non-fatal):', + expect.objectContaining({ 
+ containerId: 'container-sn', + imageName: 'cascade-snapshot-proj-snap-card-snap:latest', + error: 'Error: commit failed', + }), + ); + expect(mockCaptureException).toHaveBeenCalledWith( + err, + expect.objectContaining({ + tags: { source: 'snapshot_commit' }, + level: 'warning', + }), + ); + }); + + it('removes worker containers best-effort', async () => { + await removeWorkerContainerBestEffort('container-snap-abc123'); + + expect(mockDockerGetContainer).toHaveBeenCalledWith('container-snap-abc123'); + expect(mockContainerRemove).toHaveBeenCalledWith({ force: true }); + }); + + it('swallows remove errors', async () => { + mockContainerRemove.mockRejectedValueOnce(new Error('already gone')); + + await expect(removeWorkerContainerBestEffort('container-snap-abc123')).resolves.toBeUndefined(); + }); + + it('identifies docker image-not-found errors only for 404 no-such-image responses', () => { + expect( + isImageNotFoundError( + Object.assign(new Error('(HTTP code 404) no such container - No such image: x'), { + statusCode: 404, + }), + ), + ).toBe(true); + expect( + isImageNotFoundError(Object.assign(new Error('No such image: x'), { statusCode: 500 })), + ).toBe(false); + expect(isImageNotFoundError(Object.assign(new Error('not found'), { statusCode: 404 }))).toBe( + false, + ); + }); +}); From 4802c360e3e338f93a4672349e0d0387a6dc150a Mon Sep 17 00:00:00 2001 From: aaight Date: Mon, 11 May 2026 12:14:37 +0200 Subject: [PATCH 2/4] refactor(router): isolate worker exit handling (#1335) Co-authored-by: Cascade Bot --- src/router/container-manager.ts | 126 +--------- src/router/worker-exit-handler.ts | 153 ++++++++++++ tests/unit/router/worker-exit-handler.test.ts | 232 ++++++++++++++++++ 3 files changed, 393 insertions(+), 118 deletions(-) create mode 100644 src/router/worker-exit-handler.ts create mode 100644 tests/unit/router/worker-exit-handler.test.ts diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts index e341fdf7a..55f97f4e2 100644 
--- a/src/router/container-manager.ts +++ b/src/router/container-manager.ts @@ -30,6 +30,7 @@ import { extractProjectIdFromJob, extractWorkItemId, } from './worker-env.js'; +import { handleWorkerExit } from './worker-exit-handler.js'; import { commitWorkerSnapshot, isImageNotFoundError, @@ -60,6 +61,7 @@ export { buildWorkerEnv, extractProjectIdFromJob, } from './worker-env.js'; +export { handleWorkerExit, inspectExitedContainer } from './worker-exit-handler.js'; export { buildWorkerSnapshotImageName, commitWorkerSnapshot, @@ -75,123 +77,6 @@ export { killWorker } from './worker-timeouts.js'; const docker = new Docker(); -/** - * Inspect a just-exited container and pull the diagnostic facts that explain - * its exit. `OOMKilled` and `State.Error` are only available before AutoRemove - * (or our manual `removeContainer`) reaps the container, so this MUST run - * before either path. Returns nullable fields when inspection fails — never - * throws. - * - * @internal Exported for unit testing only — call `onWorkerExit` from app code. - */ -export async function inspectExitedContainer( - container: ReturnType, - jobId: string, -): Promise<{ oomKilled?: boolean; exitReason?: string; durationMs?: number }> { - // Wrap in try/catch — `inspect()` can fail post-exit if the daemon socket - // drops, the container is reaped between `wait()` and here, or the API - // times out. Diagnostics are best-effort; falling back to undefined is - // safer than failing the whole post-exit pipeline. - let inspectResult: Awaited> | null = null; - try { - inspectResult = await container.inspect(); - } catch (err) { - logger.warn('[WorkerManager] container.inspect() after wait failed:', { - jobId, - error: String(err), - }); - } - const state = inspectResult?.State; - const oomKilled = state?.OOMKilled; - // Empty `State.Error` (the common case for clean exits) → undefined so the - // run-record reason string omits the `reason="…"` segment entirely. - const exitReason = state?.Error?.length ? 
state.Error : undefined; - const startedAtIso = state?.StartedAt; - const finishedAtIso = state?.FinishedAt; - // Docker can report sentinel timestamps (e.g. `0001-01-01T00:00:00Z` for a - // container that never fully started) that parse to NaN — drop those so - // downstream logs/Sentry don't ship `durationMs: NaN`. - const rawDurationMs = - startedAtIso && finishedAtIso - ? new Date(finishedAtIso).getTime() - new Date(startedAtIso).getTime() - : undefined; - const durationMs = - rawDurationMs !== undefined && Number.isFinite(rawDurationMs) && rawDurationMs >= 0 - ? rawDurationMs - : undefined; - return { oomKilled, exitReason, durationMs }; -} - -/** - * Tail-log the worker's stdout/stderr for at-a-glance debugging. Full - * per-worker logs are also indexed in Loki via promtail's per-container label - * (`{container="/cascade-worker-${jobId}"}`); this 50-line tail is a - * convenience and is not load-bearing. - */ -async function logWorkerTail(container: ReturnType): Promise { - try { - const logs = await container.logs({ stdout: true, stderr: true, follow: false }); - const logText = logs.toString('utf-8'); - if (!logText.trim()) return; - const lines = logText.trim().split('\n'); - const tail = lines.slice(-50).join('\n'); - logger.info( - `[WorkerManager] Worker logs (last ${Math.min(lines.length, 50)} of ${lines.length} lines):\n${tail}`, - ); - } catch { - // Container may already be removed — expected with AutoRemove - } -} - -/** - * Post-exit handler for a worker container: pulls Docker diagnostics, tail-logs - * stdout, decides snapshot commit vs. skip, and runs cleanup. Extracted from - * the `wait()` callback to keep its cyclomatic complexity within the lint budget. 
- */ -async function onWorkerExit(opts: { - container: ReturnType; - result: { StatusCode: number }; - jobId: string; - jobType: string; - snapshotEnabled: boolean; - projectId: string | null; - workItemId: string | undefined; -}): Promise { - const { container, result, jobId, jobType, snapshotEnabled, projectId, workItemId } = opts; - - const { oomKilled, exitReason, durationMs } = await inspectExitedContainer(container, jobId); - await logWorkerTail(container); - - if (result.StatusCode !== 0) { - captureException(new Error(`Worker exited with status ${result.StatusCode}`), { - tags: { source: 'worker_exit', jobType }, - extra: { jobId, statusCode: result.StatusCode, oomKilled, exitReason, durationMs }, - }); - } - logger.info('[WorkerManager] Worker exited:', { - jobId, - statusCode: result.StatusCode, - oomKilled: oomKilled ?? null, - exitReason: exitReason ?? null, - durationMs: durationMs ?? null, - }); - - if (snapshotEnabled) { - if (result.StatusCode === 0 && projectId && workItemId) { - await commitWorkerSnapshot(container.id, projectId, workItemId); - } else if (result.StatusCode !== 0) { - logger.info('[WorkerManager] Skipping snapshot commit after non-zero exit:', { - jobId, - statusCode: result.StatusCode, - }); - } - // Always remove manually since AutoRemove is disabled for snapshot runs. 
- await removeWorkerContainerBestEffort(container.id); - } - - cleanupWorker(jobId, result.StatusCode, { oomKilled, exitReason }); -} - interface ContainerLaunchConfig { workerImage: string; snapshotEnabled: boolean; @@ -282,7 +167,7 @@ async function createAndMonitorContainer( container .wait() .then(async (result) => { - await onWorkerExit({ + await handleWorkerExit({ container, result, jobId, @@ -290,6 +175,11 @@ async function createAndMonitorContainer( snapshotEnabled, projectId, workItemId, + dependencies: { + commitWorkerSnapshot, + removeWorkerContainerBestEffort, + cleanupWorker, + }, }); }) .catch((err) => { diff --git a/src/router/worker-exit-handler.ts b/src/router/worker-exit-handler.ts new file mode 100644 index 000000000..803bac4f2 --- /dev/null +++ b/src/router/worker-exit-handler.ts @@ -0,0 +1,153 @@ +/** + * Post-exit handling for CASCADE worker containers. + * + * This module owns the load-bearing ordering after Docker `wait()` resolves: + * inspect the still-present container, tail-log stdout/stderr, capture non-zero + * exits, handle snapshot commit/removal, then release worker tracking state. 
+ */ + +import type Docker from 'dockerode'; +import { captureException } from '../sentry.js'; +import { logger } from '../utils/logging.js'; +import type { ExitDetails } from './active-workers.js'; + +type WorkerContainer = Docker.Container; + +export interface WorkerExitSnapshotDependencies { + commitWorkerSnapshot: ( + containerId: string, + projectId: string, + workItemId: string, + ) => Promise; + removeWorkerContainerBestEffort: (containerId: string) => Promise; +} + +export interface WorkerExitDependencies extends WorkerExitSnapshotDependencies { + cleanupWorker: (jobId: string, exitCode?: number, details?: ExitDetails) => void; +} + +export interface HandleWorkerExitOptions { + container: WorkerContainer; + result: { StatusCode: number }; + jobId: string; + jobType: string; + snapshotEnabled: boolean; + projectId: string | null; + workItemId: string | undefined; + dependencies: WorkerExitDependencies; +} + +/** + * Inspect a just-exited container and pull the diagnostic facts that explain + * its exit. `OOMKilled` and `State.Error` are only available before AutoRemove + * (or our manual `removeContainer`) reaps the container, so this MUST run + * before either path. Returns nullable fields when inspection fails - never + * throws. + */ +export async function inspectExitedContainer( + container: WorkerContainer, + jobId: string, +): Promise<{ oomKilled?: boolean; exitReason?: string; durationMs?: number }> { + // Wrap in try/catch - `inspect()` can fail post-exit if the daemon socket + // drops, the container is reaped between `wait()` and here, or the API + // times out. Diagnostics are best-effort; falling back to undefined is + // safer than failing the whole post-exit pipeline. 
+ let inspectResult: Awaited> | null = null; + try { + inspectResult = await container.inspect(); + } catch (err) { + logger.warn('[WorkerManager] container.inspect() after wait failed:', { + jobId, + error: String(err), + }); + } + const state = inspectResult?.State; + const oomKilled = state?.OOMKilled; + // Empty `State.Error` (the common case for clean exits) -> undefined so the + // run-record reason string omits the `reason="..."` segment entirely. + const exitReason = state?.Error?.length ? state.Error : undefined; + const startedAtIso = state?.StartedAt; + const finishedAtIso = state?.FinishedAt; + // Docker can report sentinel timestamps (e.g. `0001-01-01T00:00:00Z` for a + // container that never fully started) that parse to NaN - drop those so + // downstream logs/Sentry don't ship `durationMs: NaN`. + const rawDurationMs = + startedAtIso && finishedAtIso + ? new Date(finishedAtIso).getTime() - new Date(startedAtIso).getTime() + : undefined; + const durationMs = + rawDurationMs !== undefined && Number.isFinite(rawDurationMs) && rawDurationMs >= 0 + ? rawDurationMs + : undefined; + return { oomKilled, exitReason, durationMs }; +} + +/** + * Tail-log the worker's stdout/stderr for at-a-glance debugging. Full + * per-worker logs are also indexed in Loki via promtail's per-container label + * (`{container="/cascade-worker-${jobId}"}`); this 50-line tail is a + * convenience and is not load-bearing. + */ +async function logWorkerTail(container: WorkerContainer): Promise { + try { + const logs = await container.logs({ stdout: true, stderr: true, follow: false }); + const logText = logs.toString('utf-8'); + if (!logText.trim()) return; + const lines = logText.trim().split('\n'); + const tail = lines.slice(-50).join('\n'); + logger.info( + `[WorkerManager] Worker logs (last ${Math.min(lines.length, 50)} of ${lines.length} lines):\n${tail}`, + ); + } catch { + // Container may already be removed - expected with AutoRemove. 
+ } +} + +/** + * Handle a worker container after Docker reports exit. Keep inspection before + * snapshot/manual removal so OOMKilled, State.Error, and duration facts survive. + */ +export async function handleWorkerExit(opts: HandleWorkerExitOptions): Promise { + const { + container, + result, + jobId, + jobType, + snapshotEnabled, + projectId, + workItemId, + dependencies, + } = opts; + + const { oomKilled, exitReason, durationMs } = await inspectExitedContainer(container, jobId); + await logWorkerTail(container); + + if (result.StatusCode !== 0) { + captureException(new Error(`Worker exited with status ${result.StatusCode}`), { + tags: { source: 'worker_exit', jobType }, + extra: { jobId, statusCode: result.StatusCode, oomKilled, exitReason, durationMs }, + }); + } + logger.info('[WorkerManager] Worker exited:', { + jobId, + statusCode: result.StatusCode, + oomKilled: oomKilled ?? null, + exitReason: exitReason ?? null, + durationMs: durationMs ?? null, + }); + + if (snapshotEnabled) { + if (result.StatusCode === 0 && projectId && workItemId) { + await dependencies.commitWorkerSnapshot(container.id, projectId, workItemId); + } else if (result.StatusCode !== 0) { + logger.info('[WorkerManager] Skipping snapshot commit after non-zero exit:', { + jobId, + statusCode: result.StatusCode, + }); + } + // Always remove manually since AutoRemove is disabled for snapshot runs. 
+ await dependencies.removeWorkerContainerBestEffort(container.id); + } + + dependencies.cleanupWorker(jobId, result.StatusCode, { oomKilled, exitReason }); +} diff --git a/tests/unit/router/worker-exit-handler.test.ts b/tests/unit/router/worker-exit-handler.test.ts new file mode 100644 index 000000000..2b3482fb0 --- /dev/null +++ b/tests/unit/router/worker-exit-handler.test.ts @@ -0,0 +1,232 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const { mockCaptureException, mockLoggerInfo, mockLoggerWarn } = vi.hoisted(() => ({ + mockCaptureException: vi.fn(), + mockLoggerInfo: vi.fn(), + mockLoggerWarn: vi.fn(), +})); + +vi.mock('../../../src/sentry.js', () => ({ + captureException: (...args: unknown[]) => mockCaptureException(...args), +})); + +vi.mock('../../../src/utils/logging.js', () => ({ + logger: { + info: (...args: unknown[]) => mockLoggerInfo(...args), + warn: (...args: unknown[]) => mockLoggerWarn(...args), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +import { + handleWorkerExit, + inspectExitedContainer, +} from '../../../src/router/worker-exit-handler.js'; + +function makeContainer(options: { + id?: string; + state?: Record | null; + inspectRejects?: boolean; + logs?: string; + logsReject?: boolean; + events?: string[]; +}) { + const events = options.events; + return { + id: options.id ?? 'container-exit-123', + inspect: options.inspectRejects + ? vi.fn().mockImplementation(async () => { + events?.push('inspect'); + throw new Error('socket hang up'); + }) + : vi.fn().mockImplementation(async () => { + events?.push('inspect'); + return options.state === null ? null : { State: options.state ?? {} }; + }), + logs: options.logsReject + ? vi.fn().mockImplementation(async () => { + events?.push('logs'); + throw new Error('logs unavailable'); + }) + : vi.fn().mockImplementation(async () => { + events?.push('logs'); + return Buffer.from(options.logs ?? 
''); + }), + }; +} + +function makeDependencies(events: string[] = []) { + return { + commitWorkerSnapshot: vi.fn().mockImplementation(async () => { + events.push('commit'); + }), + removeWorkerContainerBestEffort: vi.fn().mockImplementation(async () => { + events.push('remove'); + }), + cleanupWorker: vi.fn().mockImplementation(() => { + events.push('cleanup'); + }), + }; +} + +describe('inspectExitedContainer', () => { + beforeEach(() => { + mockLoggerInfo.mockReset(); + mockLoggerWarn.mockReset(); + mockCaptureException.mockReset(); + }); + + it('extracts OOM, State.Error, and duration before cleanup can remove the container', async () => { + const container = makeContainer({ + state: { + OOMKilled: true, + Error: 'OCI runtime error: exec failed', + StartedAt: '2026-04-25T08:00:00.000Z', + FinishedAt: '2026-04-25T08:00:30.000Z', + }, + }); + + await expect(inspectExitedContainer(container as never, 'job-oom')).resolves.toEqual({ + oomKilled: true, + exitReason: 'OCI runtime error: exec failed', + durationMs: 30_000, + }); + }); + + it('returns undefined facts and logs a warning when inspection fails', async () => { + const container = makeContainer({ inspectRejects: true }); + + await expect(inspectExitedContainer(container as never, 'job-inspect-fail')).resolves.toEqual({ + oomKilled: undefined, + exitReason: undefined, + durationMs: undefined, + }); + expect(mockLoggerWarn).toHaveBeenCalledWith( + '[WorkerManager] container.inspect() after wait failed:', + expect.objectContaining({ jobId: 'job-inspect-fail' }), + ); + }); +}); + +describe('handleWorkerExit', () => { + beforeEach(() => { + mockLoggerInfo.mockReset(); + mockLoggerWarn.mockReset(); + mockCaptureException.mockReset(); + }); + + it('preserves snapshot-enabled successful cleanup ordering', async () => { + const events: string[] = []; + const container = makeContainer({ + events, + state: { + OOMKilled: false, + Error: '', + StartedAt: '2026-04-25T08:00:00.000Z', + FinishedAt: 
'2026-04-25T08:00:30.000Z', + }, + logs: 'worker output', + }); + const dependencies = makeDependencies(events); + + await handleWorkerExit({ + container: container as never, + result: { StatusCode: 0 }, + jobId: 'job-success', + jobType: 'trello', + snapshotEnabled: true, + projectId: 'proj-1', + workItemId: 'card-1', + dependencies, + }); + + expect(events).toEqual(['inspect', 'logs', 'commit', 'remove', 'cleanup']); + expect(dependencies.commitWorkerSnapshot).toHaveBeenCalledWith( + 'container-exit-123', + 'proj-1', + 'card-1', + ); + expect(dependencies.cleanupWorker).toHaveBeenCalledWith('job-success', 0, { + oomKilled: false, + exitReason: undefined, + }); + }); + + it('captures non-zero exits with worker_exit tags and Docker diagnostic facts', async () => { + const events: string[] = []; + mockCaptureException.mockImplementation(() => { + events.push('capture'); + }); + const container = makeContainer({ + events, + state: { + OOMKilled: true, + Error: 'Out of memory', + StartedAt: '2026-04-25T08:00:00.000Z', + FinishedAt: '2026-04-25T08:00:05.000Z', + }, + }); + const dependencies = makeDependencies(events); + + await handleWorkerExit({ + container: container as never, + result: { StatusCode: 137 }, + jobId: 'job-crashed', + jobType: 'github', + snapshotEnabled: true, + projectId: 'proj-1', + workItemId: 'card-1', + dependencies, + }); + + expect(events).toEqual(['inspect', 'logs', 'capture', 'remove', 'cleanup']); + expect(dependencies.commitWorkerSnapshot).not.toHaveBeenCalled(); + expect(mockCaptureException).toHaveBeenCalledWith(expect.any(Error), { + tags: { source: 'worker_exit', jobType: 'github' }, + extra: { + jobId: 'job-crashed', + statusCode: 137, + oomKilled: true, + exitReason: 'Out of memory', + durationMs: 5000, + }, + }); + expect(dependencies.cleanupWorker).toHaveBeenCalledWith('job-crashed', 137, { + oomKilled: true, + exitReason: 'Out of memory', + }); + }); + + it('tail-log failures are best-effort and do not block cleanup', async () => 
{ + const events: string[] = []; + const container = makeContainer({ + events, + logsReject: true, + state: { + OOMKilled: false, + Error: '', + StartedAt: '2026-04-25T08:00:00.000Z', + FinishedAt: '2026-04-25T08:00:01.000Z', + }, + }); + const dependencies = makeDependencies(events); + + await handleWorkerExit({ + container: container as never, + result: { StatusCode: 0 }, + jobId: 'job-log-fail', + jobType: 'linear', + snapshotEnabled: false, + projectId: 'proj-1', + workItemId: 'item-1', + dependencies, + }); + + expect(events).toEqual(['inspect', 'logs', 'cleanup']); + expect(dependencies.cleanupWorker).toHaveBeenCalledWith('job-log-fail', 0, { + oomKilled: false, + exitReason: undefined, + }); + }); +}); From 7e4c5f1cb92737ec77ca5ed60937dc1e97a70516 Mon Sep 17 00:00:00 2001 From: aaight Date: Mon, 11 May 2026 12:30:43 +0200 Subject: [PATCH 3/4] refactor(router): extract worker container launcher (#1336) Co-authored-by: Cascade Bot --- docs/architecture/01-services.md | 6 +- docs/architecture/10-resilience.md | 9 + src/router/container-manager.ts | 166 ++------- src/router/worker-container-launcher.ts | 177 ++++++++++ .../router/worker-container-launcher.test.ts | 334 ++++++++++++++++++ 5 files changed, 560 insertions(+), 132 deletions(-) create mode 100644 src/router/worker-container-launcher.ts create mode 100644 tests/unit/router/worker-container-launcher.test.ts diff --git a/docs/architecture/01-services.md b/docs/architecture/01-services.md index 07305aca8..a95eb1522 100644 --- a/docs/architecture/01-services.md +++ b/docs/architecture/01-services.md @@ -73,7 +73,11 @@ Module-load phase (runs at import time, before `startRouter()`): | `webhook-processor.ts` | Generic 12-step pipeline (see [02-webhook-pipeline](./02-webhook-pipeline.md)) | | `platform-adapter.ts` | `RouterPlatformAdapter` interface | | `adapters/` | Per-provider adapter implementations | -| `worker-manager.ts` | Spawns/monitors Docker worker containers | +| `worker-manager.ts` | BullMQ 
worker processor orchestration, capacity-slot waiting, and dispatch retry classification | +| `container-manager.ts` | Compatibility facade that assembles worker metadata/env and coordinates Docker worker spawn fallback | +| `worker-container-launcher.ts` | Docker worker create/start/wait wiring, active-worker registration, and router timeout timer setup | +| `worker-spawn-settings.ts` | Docker-free worker image, snapshot-reuse, timeout, and container-name resolution | +| `worker-exit-handler.ts` / `worker-timeouts.ts` | Post-exit cleanup/diagnostics and router-side timeout cancellation internals | | `queue.ts` | BullMQ `addJob()`, queue stats | | `action-dedup.ts` | In-memory deduplication of webhook deliveries | | `work-item-lock.ts` | Prevents concurrent agents on the same work item | diff --git a/docs/architecture/10-resilience.md b/docs/architecture/10-resilience.md index 44824bec9..a2d5020e7 100644 --- a/docs/architecture/10-resilience.md +++ b/docs/architecture/10-resilience.md @@ -183,6 +183,15 @@ Periodic scan for Docker containers that outlived their expected lifetime (watch When a worker container exits non-zero, the router inspects it before Docker AutoRemove can reap it and writes a grep-stable error reason: `Worker crashed with exit code N · OOMKilled= · reason=""`. `OOMKilled=true` is the definitive cgroup OOM signal; exit 137 without that marker means something else sent the signal. +## Worker Lifecycle Internals + +`src/router/worker-manager.ts` owns the BullMQ processor lifecycle: capacity-slot waiting, retry classification, and handoff to worker spawning. Docker worker lifecycle details sit behind `src/router/container-manager.ts`, which remains the compatibility facade for callers while delegating focused work to helpers: + +- `worker-spawn-settings.ts` resolves the effective image, snapshot reuse, timeout, and Docker-safe container name. 
+- `worker-container-launcher.ts` creates/starts the Docker container, registers active-worker state, sets the router timeout timer, and monitors `wait()`. +- `worker-exit-handler.ts` owns post-exit inspection, log tailing, snapshot commit/removal, and cleanup ordering. +- `worker-timeouts.ts` owns router-side timeout cancellation, timed-out run marking, and timeout notifications. + ## Snapshot Management `src/router/snapshot-manager.ts`, `src/router/snapshot-cleanup.ts` diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts index 55f97f4e2..056a76a9b 100644 --- a/src/router/container-manager.ts +++ b/src/router/container-manager.ts @@ -1,43 +1,40 @@ /** * Docker container lifecycle management for CASCADE worker processes. * - * Handles spawning and killing of worker containers. + * Handles spawn orchestration for worker containers. * Each BullMQ job gets its own isolated Docker container. * * State management, env building, and orphan cleanup are in dedicated modules: - * - active-workers.ts — ActiveWorker state tracking - * - worker-env.ts — Job data parsing + env building - * - orphan-cleanup.ts — Periodic orphan container cleanup - * - snapshot-manager.ts — Snapshot metadata registry - * - worker-snapshots.ts — Docker snapshot commit/remove mechanics + * - active-workers.ts — ActiveWorker state tracking + * - worker-env.ts — Job data parsing + env building + * - orphan-cleanup.ts — Periodic orphan container cleanup + * - snapshot-manager.ts — Snapshot metadata registry + * - worker-snapshots.ts — Docker snapshot commit/remove mechanics + * - worker-container-launcher.ts — Docker create/start/wait wiring */ import type { Job } from 'bullmq'; -import Docker from 'dockerode'; import { captureException } from '../sentry.js'; import { logger } from '../utils/logging.js'; -import { activeWorkers, cleanupWorker } from './active-workers.js'; +import { activeWorkers } from './active-workers.js'; import { clearAllAgentTypeLocks } from 
'./agent-type-lock.js'; import { routerConfig } from './config.js'; -import { ROUTER_INSTANCE_ID } from './instance-id.js'; import { stopOrphanCleanup } from './orphan-cleanup.js'; import type { CascadeJob } from './queue.js'; import { invalidateSnapshot } from './snapshot-manager.js'; import { clearAllWorkItemLocks } from './work-item-lock.js'; +import { + launchWorkerContainer, + type WorkerContainerLaunchConfig, +} from './worker-container-launcher.js'; import { buildWorkerEnvWithProjectId, extractAgentType, extractProjectIdFromJob, extractWorkItemId, } from './worker-env.js'; -import { handleWorkerExit } from './worker-exit-handler.js'; -import { - commitWorkerSnapshot, - isImageNotFoundError, - removeWorkerContainerBestEffort, -} from './worker-snapshots.js'; +import { isImageNotFoundError } from './worker-snapshots.js'; import { buildWorkerContainerName, resolveSpawnSettings } from './worker-spawn-settings.js'; -import { killWorker } from './worker-timeouts.js'; // Re-export from sub-modules so existing callers importing from container-manager.ts // continue to work without changes. @@ -57,6 +54,12 @@ export { invalidateSnapshot, registerSnapshot, } from './snapshot-manager.js'; +export { + launchWorkerContainer, + type WorkerContainerLaunchConfig, + type WorkerContainerLaunchContext, + type WorkerContainerLauncherDependencies, +} from './worker-container-launcher.js'; export { buildWorkerEnv, extractProjectIdFromJob, @@ -75,125 +78,26 @@ export { } from './worker-spawn-settings.js'; export { killWorker } from './worker-timeouts.js'; -const docker = new Docker(); - -interface ContainerLaunchConfig { - workerImage: string; - snapshotEnabled: boolean; - containerTimeoutMs: number; - workerEnv: string[]; -} - -/** - * Create, start, and set up async exit-monitoring for a single worker container. - * Extracted from spawnWorker so snapshot fallback can retry with a different image. 
- * Returns immediately after the container starts — exit monitoring runs in the background. - */ -async function createAndMonitorContainer( +async function launchConfiguredWorkerContainer( job: Job, jobId: string, containerName: string, projectId: string | null, workItemId: string | undefined, agentType: string | undefined, - config: ContainerLaunchConfig, + config: WorkerContainerLaunchConfig, ): Promise { - const { workerImage, snapshotEnabled, containerTimeoutMs, workerEnv } = config; - const container = await docker.createContainer({ - Image: workerImage, - name: containerName, - Env: workerEnv, - HostConfig: { - Memory: routerConfig.workerMemoryMb * 1024 * 1024, - MemorySwap: routerConfig.workerMemoryMb * 1024 * 1024, // No swap - NetworkMode: routerConfig.dockerNetwork, - // Disable AutoRemove for snapshot-enabled runs so the container remains - // available for docker commit after a successful exit. - AutoRemove: !snapshotEnabled, - }, - Labels: { - 'cascade.job.id': jobId, - 'cascade.job.type': job.data.type, - 'cascade.managed': 'true', - // Pinning the spawning router's instance id stops sibling - // cascade-router instances on the same host from claiming - // each other's containers as orphans — see `instance-id.ts` - // and `orphan-cleanup.ts:scanAndCleanupOrphans`. - 'cascade.router.instance': ROUTER_INSTANCE_ID, - 'cascade.project.id': projectId ?? '', - 'cascade.agent.type': agentType ?? '', - 'cascade.snapshot.enabled': snapshotEnabled ? 
'true' : 'false', - }, - }); - - await container.start(); - - // Set up timeout — fires at watchdogTimeoutMs + 2min (router backup kill) - const startedAt = new Date(); - const timeoutHandle = setTimeout(() => { - const durationMs = Date.now() - startedAt.getTime(); - logger.warn('[WorkerManager] Worker timeout, killing:', { + await launchWorkerContainer( + { + job, jobId, - durationMs, - }); - captureException(new Error(`Worker timeout after ${durationMs}ms`), { - tags: { source: 'worker_timeout', jobType: job.data.type }, - extra: { jobId, durationMs }, - level: 'warning', - }); - killWorker(jobId).catch((err) => { - logger.error('[WorkerManager] Failed to kill timed-out worker:', err); - }); - }, containerTimeoutMs); - - // Track the worker - activeWorkers.set(jobId, { - containerId: container.id, - jobId, - startedAt, - timeoutHandle, - job: job.data, - projectId: projectId ?? undefined, - workItemId, - agentType, - }); - - logger.info('[WorkerManager] Worker started:', { - jobId, - containerId: container.id.slice(0, 12), - }); - - // Monitor container exit - container - .wait() - .then(async (result) => { - await handleWorkerExit({ - container, - result, - jobId, - jobType: job.data.type, - snapshotEnabled, - projectId, - workItemId, - dependencies: { - commitWorkerSnapshot, - removeWorkerContainerBestEffort, - cleanupWorker, - }, - }); - }) - .catch((err) => { - logger.error('[WorkerManager] Error waiting for container:', err); - captureException(err, { - tags: { source: 'worker_wait', jobType: job.data.type }, - extra: { jobId }, - }); - // Ensure container is cleaned up even on wait error (snapshot runs only) - if (snapshotEnabled) { - removeWorkerContainerBestEffort(container.id).catch(() => {}); - } - cleanupWorker(jobId); - }); + containerName, + projectId, + workItemId, + agentType, + }, + config, + ); } /** @@ -247,7 +151,7 @@ export async function spawnWorker(job: Job): Promise { workerImage, }); - const launchConfig: ContainerLaunchConfig = { + const 
launchConfig: WorkerContainerLaunchConfig = { workerImage, snapshotEnabled, containerTimeoutMs, @@ -255,7 +159,7 @@ export async function spawnWorker(job: Job): Promise { }; try { - await createAndMonitorContainer( + await launchConfiguredWorkerContainer( job, jobId, containerName, @@ -274,14 +178,14 @@ export async function spawnWorker(job: Job): Promise { ); invalidateSnapshot(projectId, workItemId); const fallbackEnv = await buildWorkerEnvWithProjectId(job, projectId, false, snapshotEnabled); - const fallbackConfig: ContainerLaunchConfig = { + const fallbackConfig: WorkerContainerLaunchConfig = { workerImage: routerConfig.workerImage, snapshotEnabled, containerTimeoutMs, workerEnv: fallbackEnv, }; try { - await createAndMonitorContainer( + await launchConfiguredWorkerContainer( job, jobId, containerName, diff --git a/src/router/worker-container-launcher.ts b/src/router/worker-container-launcher.ts new file mode 100644 index 000000000..e38ac73ed --- /dev/null +++ b/src/router/worker-container-launcher.ts @@ -0,0 +1,177 @@ +/** + * Docker create/start/wait wiring for CASCADE worker containers. + * + * container-manager.ts owns orchestration decisions; this module owns the + * Docker launch shape, active-worker registration, router timeout timer, and + * async wait handling for one worker container. 
+ */ + +import type { Job } from 'bullmq'; +import Docker from 'dockerode'; +import { captureException as captureExceptionDefault } from '../sentry.js'; +import { logger } from '../utils/logging.js'; +import { activeWorkers, cleanupWorker } from './active-workers.js'; +import { routerConfig } from './config.js'; +import { ROUTER_INSTANCE_ID } from './instance-id.js'; +import type { CascadeJob } from './queue.js'; +import { handleWorkerExit } from './worker-exit-handler.js'; +import { commitWorkerSnapshot, removeWorkerContainerBestEffort } from './worker-snapshots.js'; +import { killWorker } from './worker-timeouts.js'; + +type WorkerContainer = Docker.Container; + +const docker = new Docker(); + +export interface WorkerContainerLaunchConfig { + workerImage: string; + snapshotEnabled: boolean; + containerTimeoutMs: number; + workerEnv: string[]; +} + +export interface WorkerContainerLaunchContext { + job: Job; + jobId: string; + containerName: string; + projectId: string | null; + workItemId: string | undefined; + agentType: string | undefined; +} + +export interface WorkerContainerLauncherDependencies { + createContainer: Docker['createContainer']; + killWorker: (jobId: string) => Promise; + handleWorkerExit: typeof handleWorkerExit; + commitWorkerSnapshot: typeof commitWorkerSnapshot; + removeWorkerContainerBestEffort: typeof removeWorkerContainerBestEffort; + cleanupWorker: typeof cleanupWorker; + captureException: typeof captureExceptionDefault; +} + +const defaultDependencies: WorkerContainerLauncherDependencies = { + createContainer: docker.createContainer.bind(docker), + killWorker, + handleWorkerExit, + commitWorkerSnapshot, + removeWorkerContainerBestEffort, + cleanupWorker, + captureException: captureExceptionDefault, +}; + +/** + * Create, start, and set up async exit monitoring for a single worker + * container. Returns immediately after Docker start succeeds; wait handling + * continues in the background. 
+ */ +export async function launchWorkerContainer( + context: WorkerContainerLaunchContext, + config: WorkerContainerLaunchConfig, + dependencies: WorkerContainerLauncherDependencies = defaultDependencies, +): Promise { + const { job, jobId, containerName, projectId, workItemId, agentType } = context; + const { workerImage, snapshotEnabled, containerTimeoutMs, workerEnv } = config; + + const container = (await dependencies.createContainer({ + Image: workerImage, + name: containerName, + Env: workerEnv, + HostConfig: { + Memory: routerConfig.workerMemoryMb * 1024 * 1024, + MemorySwap: routerConfig.workerMemoryMb * 1024 * 1024, // No swap + NetworkMode: routerConfig.dockerNetwork, + // Disable AutoRemove for snapshot-enabled runs so the container remains + // available for docker commit after a successful exit. + AutoRemove: !snapshotEnabled, + }, + Labels: { + 'cascade.job.id': jobId, + 'cascade.job.type': job.data.type, + 'cascade.managed': 'true', + // Pinning the spawning router's instance id stops sibling + // cascade-router instances on the same host from claiming each other's + // containers as orphans. See `instance-id.ts` and orphan cleanup. + 'cascade.router.instance': ROUTER_INSTANCE_ID, + 'cascade.project.id': projectId ?? '', + 'cascade.agent.type': agentType ?? '', + 'cascade.snapshot.enabled': snapshotEnabled ? 
'true' : 'false', + }, + })) as WorkerContainer; + + await container.start(); + + const startedAt = new Date(); + const timeoutHandle = setTimeout(() => { + const durationMs = Date.now() - startedAt.getTime(); + logger.warn('[WorkerManager] Worker timeout, killing:', { + jobId, + durationMs, + }); + dependencies.captureException(new Error(`Worker timeout after ${durationMs}ms`), { + tags: { source: 'worker_timeout', jobType: job.data.type }, + extra: { jobId, durationMs }, + level: 'warning', + }); + dependencies.killWorker(jobId).catch((err) => { + logger.error('[WorkerManager] Failed to kill timed-out worker:', err); + }); + }, containerTimeoutMs); + + activeWorkers.set(jobId, { + containerId: container.id, + jobId, + startedAt, + timeoutHandle, + job: job.data, + projectId: projectId ?? undefined, + workItemId, + agentType, + }); + + logger.info('[WorkerManager] Worker started:', { + jobId, + containerId: container.id.slice(0, 12), + }); + + monitorContainerExit(container, context, config, dependencies); +} + +function monitorContainerExit( + container: WorkerContainer, + context: WorkerContainerLaunchContext, + config: WorkerContainerLaunchConfig, + dependencies: WorkerContainerLauncherDependencies, +): void { + const { job, jobId, projectId, workItemId } = context; + const { snapshotEnabled } = config; + + container + .wait() + .then(async (result) => { + await dependencies.handleWorkerExit({ + container, + result, + jobId, + jobType: job.data.type, + snapshotEnabled, + projectId, + workItemId, + dependencies: { + commitWorkerSnapshot: dependencies.commitWorkerSnapshot, + removeWorkerContainerBestEffort: dependencies.removeWorkerContainerBestEffort, + cleanupWorker: dependencies.cleanupWorker, + }, + }); + }) + .catch((err) => { + logger.error('[WorkerManager] Error waiting for container:', err); + dependencies.captureException(err, { + tags: { source: 'worker_wait', jobType: job.data.type }, + extra: { jobId }, + }); + // Ensure container is cleaned up even 
on wait error (snapshot runs only). + if (snapshotEnabled) { + dependencies.removeWorkerContainerBestEffort(container.id).catch(() => {}); + } + dependencies.cleanupWorker(jobId); + }); +} diff --git a/tests/unit/router/worker-container-launcher.test.ts b/tests/unit/router/worker-container-launcher.test.ts new file mode 100644 index 000000000..21d60c1a2 --- /dev/null +++ b/tests/unit/router/worker-container-launcher.test.ts @@ -0,0 +1,334 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +const { mockActiveWorkers, mockDockerCreateContainer, mockLogger } = vi.hoisted(() => ({ + mockActiveWorkers: new Map(), + mockDockerCreateContainer: vi.fn(), + mockLogger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +vi.mock('dockerode', () => ({ + default: vi.fn().mockImplementation(() => ({ + createContainer: mockDockerCreateContainer, + })), +})); + +vi.mock('../../../src/router/config.js', () => ({ + routerConfig: { + workerImage: 'base-worker:latest', + workerMemoryMb: 768, + workerTimeoutMs: 30 * 60 * 1000, + dockerNetwork: 'cascade-test-network', + }, +})); + +vi.mock('../../../src/router/active-workers.js', () => ({ + activeWorkers: mockActiveWorkers, + cleanupWorker: vi.fn(), +})); + +vi.mock('../../../src/router/worker-timeouts.js', () => ({ + killWorker: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('../../../src/router/worker-exit-handler.js', () => ({ + handleWorkerExit: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('../../../src/router/worker-snapshots.js', () => ({ + commitWorkerSnapshot: vi.fn().mockResolvedValue(undefined), + removeWorkerContainerBestEffort: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('../../../src/sentry.js', () => ({ + captureException: vi.fn(), +})); + +vi.mock('../../../src/utils/logging.js', () => ({ + logger: mockLogger, +})); + +import type { CascadeJob } from '../../../src/router/queue.js'; +import { + launchWorkerContainer, + type 
WorkerContainerLauncherDependencies, +} from '../../../src/router/worker-container-launcher.js'; + +function makeJob(overrides: Partial<{ id: string; data: CascadeJob }> = {}) { + return { + id: overrides.id ?? 'job-launcher-1', + data: + overrides.data ?? + ({ + type: 'trello', + projectId: 'proj-1', + workItemId: 'card-1', + agentType: 'implementation', + } as CascadeJob), + }; +} + +function makeContainer(overrides: Partial<{ wait: () => Promise<{ StatusCode: number }> }> = {}) { + return { + id: 'container-launcher-abc123', + start: vi.fn().mockResolvedValue(undefined), + wait: vi.fn(overrides.wait ?? (() => new Promise(() => {}))), + }; +} + +function makeDependencies( + overrides: Partial = {}, +): WorkerContainerLauncherDependencies { + return { + createContainer: vi.fn(), + killWorker: vi.fn().mockResolvedValue(undefined), + handleWorkerExit: vi.fn().mockResolvedValue(undefined), + commitWorkerSnapshot: vi.fn().mockResolvedValue(undefined), + removeWorkerContainerBestEffort: vi.fn().mockResolvedValue(undefined), + cleanupWorker: vi.fn(), + captureException: vi.fn(), + ...overrides, + }; +} + +function clearTrackedTimeouts() { + for (const worker of mockActiveWorkers.values()) { + const timeoutHandle = (worker as { timeoutHandle?: NodeJS.Timeout }).timeoutHandle; + if (timeoutHandle) clearTimeout(timeoutHandle); + } + mockActiveWorkers.clear(); +} + +describe('worker-container-launcher', () => { + beforeEach(() => { + vi.useRealTimers(); + mockDockerCreateContainer.mockReset(); + mockLogger.info.mockClear(); + mockLogger.warn.mockClear(); + mockLogger.error.mockClear(); + clearTrackedTimeouts(); + }); + + afterEach(() => { + vi.useRealTimers(); + clearTrackedTimeouts(); + }); + + it('preserves the Docker create/start config and cascade labels', async () => { + const container = makeContainer(); + const dependencies = makeDependencies({ + createContainer: vi.fn().mockResolvedValue(container), + }); + + await launchWorkerContainer( + { + job: makeJob() as 
never, + jobId: 'job-launcher-1', + containerName: 'cascade-worker-job-launcher-1', + projectId: 'proj-1', + workItemId: 'card-1', + agentType: 'implementation', + }, + { + workerImage: 'snapshot-worker:latest', + snapshotEnabled: true, + containerTimeoutMs: 5000, + workerEnv: ['JOB_ID=job-launcher-1', 'JOB_TYPE=trello'], + }, + dependencies, + ); + + expect(dependencies.createContainer).toHaveBeenCalledWith({ + Image: 'snapshot-worker:latest', + name: 'cascade-worker-job-launcher-1', + Env: ['JOB_ID=job-launcher-1', 'JOB_TYPE=trello'], + HostConfig: { + Memory: 768 * 1024 * 1024, + MemorySwap: 768 * 1024 * 1024, + NetworkMode: 'cascade-test-network', + AutoRemove: false, + }, + Labels: expect.objectContaining({ + 'cascade.job.id': 'job-launcher-1', + 'cascade.job.type': 'trello', + 'cascade.managed': 'true', + 'cascade.router.instance': expect.any(String), + 'cascade.project.id': 'proj-1', + 'cascade.agent.type': 'implementation', + 'cascade.snapshot.enabled': 'true', + }), + }); + expect(container.start).toHaveBeenCalled(); + expect(mockActiveWorkers.get('job-launcher-1')).toEqual( + expect.objectContaining({ + containerId: 'container-launcher-abc123', + jobId: 'job-launcher-1', + projectId: 'proj-1', + workItemId: 'card-1', + agentType: 'implementation', + }), + ); + }); + + it('uses AutoRemove=true and blank optional labels when snapshots and metadata are absent', async () => { + const container = makeContainer(); + const dependencies = makeDependencies({ + createContainer: vi.fn().mockResolvedValue(container), + }); + + await launchWorkerContainer( + { + job: makeJob({ data: { type: 'github' } as CascadeJob }) as never, + jobId: 'job-no-project', + containerName: 'cascade-worker-job-no-project', + projectId: null, + workItemId: undefined, + agentType: undefined, + }, + { + workerImage: 'base-worker:latest', + snapshotEnabled: false, + containerTimeoutMs: 5000, + workerEnv: [], + }, + dependencies, + ); + + 
expect(dependencies.createContainer).toHaveBeenCalledWith( + expect.objectContaining({ + HostConfig: expect.objectContaining({ AutoRemove: true }), + Labels: expect.objectContaining({ + 'cascade.project.id': '', + 'cascade.agent.type': '', + 'cascade.snapshot.enabled': 'false', + }), + }), + ); + }); + + it('fires the router timeout path through killWorker and captures warning context', async () => { + vi.useFakeTimers(); + const container = makeContainer(); + const dependencies = makeDependencies({ + createContainer: vi.fn().mockResolvedValue(container), + }); + + await launchWorkerContainer( + { + job: makeJob() as never, + jobId: 'job-timeout', + containerName: 'cascade-worker-job-timeout', + projectId: 'proj-1', + workItemId: 'card-1', + agentType: 'implementation', + }, + { + workerImage: 'base-worker:latest', + snapshotEnabled: false, + containerTimeoutMs: 1000, + workerEnv: [], + }, + dependencies, + ); + + await vi.advanceTimersByTimeAsync(1000); + + expect(mockLogger.warn).toHaveBeenCalledWith( + '[WorkerManager] Worker timeout, killing:', + expect.objectContaining({ jobId: 'job-timeout' }), + ); + expect(dependencies.captureException).toHaveBeenCalledWith(expect.any(Error), { + tags: { source: 'worker_timeout', jobType: 'trello' }, + extra: expect.objectContaining({ jobId: 'job-timeout' }), + level: 'warning', + }); + expect(dependencies.killWorker).toHaveBeenCalledWith('job-timeout'); + }); + + it('delegates successful waits to worker-exit-handler with snapshot dependencies', async () => { + const container = makeContainer({ wait: () => Promise.resolve({ StatusCode: 0 }) }); + const dependencies = makeDependencies({ + createContainer: vi.fn().mockResolvedValue(container), + }); + + await launchWorkerContainer( + { + job: makeJob() as never, + jobId: 'job-exit', + containerName: 'cascade-worker-job-exit', + projectId: 'proj-1', + workItemId: 'card-1', + agentType: 'implementation', + }, + { + workerImage: 'base-worker:latest', + snapshotEnabled: true, + 
containerTimeoutMs: 5000, + workerEnv: [], + }, + dependencies, + ); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(dependencies.handleWorkerExit).toHaveBeenCalledWith( + expect.objectContaining({ + container, + result: { StatusCode: 0 }, + jobId: 'job-exit', + jobType: 'trello', + snapshotEnabled: true, + projectId: 'proj-1', + workItemId: 'card-1', + dependencies: { + commitWorkerSnapshot: dependencies.commitWorkerSnapshot, + removeWorkerContainerBestEffort: dependencies.removeWorkerContainerBestEffort, + cleanupWorker: dependencies.cleanupWorker, + }, + }), + ); + }); + + it('captures wait errors, removes snapshot containers, and clears tracking', async () => { + const waitError = new Error('wait failed'); + const container = makeContainer({ wait: () => Promise.reject(waitError) }); + const dependencies = makeDependencies({ + createContainer: vi.fn().mockResolvedValue(container), + }); + + await launchWorkerContainer( + { + job: makeJob() as never, + jobId: 'job-wait-error', + containerName: 'cascade-worker-job-wait-error', + projectId: 'proj-1', + workItemId: 'card-1', + agentType: 'implementation', + }, + { + workerImage: 'base-worker:latest', + snapshotEnabled: true, + containerTimeoutMs: 5000, + workerEnv: [], + }, + dependencies, + ); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(mockLogger.error).toHaveBeenCalledWith( + '[WorkerManager] Error waiting for container:', + waitError, + ); + expect(dependencies.captureException).toHaveBeenCalledWith(waitError, { + tags: { source: 'worker_wait', jobType: 'trello' }, + extra: { jobId: 'job-wait-error' }, + }); + expect(dependencies.removeWorkerContainerBestEffort).toHaveBeenCalledWith( + 'container-launcher-abc123', + ); + expect(dependencies.cleanupWorker).toHaveBeenCalledWith('job-wait-error'); + }); +}); From 146ada0acf5874a98cdc1d280c3fffe57c61dff4 Mon Sep 17 00:00:00 2001 From: Zbigniew Sobiecki Date: Mon, 11 May 2026 11:15:10 +0000 Subject: [PATCH 4/4] 
fix(router): use event.projectId in Linear resolveProject (closes #1332 miss) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #1332 fixed `parseWebhook` to select the right cascade project by team + Linear-Project scope, but missed `resolveProject` in the same file (lines 227-230) which re-looks up by `event.projectIdentifier` (the teamId) and uses the same `.find()` first-match pattern. Live evidence from 2026-05-11 prod: webhook ddcee404 body had `data.projectId: 7108c72e-...` (ucho scope); run d58cd8ab dispatched with projectId=cascade, cloned mongrel-intelligence/cascade. Agent correctly identified the workspace/work-item mismatch and filed a friction report. Fix: resolveProject uses event.projectId (cascade project id parseWebhook already determined) when present, falling back to teamId lookup only for legacy bare-event callers. Audit of all .find() shadow sites — only Linear resolveProject needed fixing. webhookVerification.ts Linear lookup is harmless because Linear webhook secrets are team-scoped. findProjectByLinearTeamId is dead code. JIRA / Trello / GitHub adapters have naturally unique-per-cascade-project discriminators. Tests: extended resolveProject describe block with multi-cascade-project-per-team sub-describe covering: returns ucho regardless of array order, returns cascade for cascade events, null fail-closed when projectId is unconfigured, and legacy bare-event teamId-fallback compat. All 49 LinearRouterAdapter tests pass; full suite (9307 unit tests) clean. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/router/adapters/linear.ts | 17 +++++ tests/unit/router/adapters/linear.test.ts | 90 ++++++++++++++++++++++- 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/src/router/adapters/linear.ts b/src/router/adapters/linear.ts index a9cc6c077..6f34d2483 100644 --- a/src/router/adapters/linear.ts +++ b/src/router/adapters/linear.ts @@ -226,6 +226,23 @@ export class LinearRouterAdapter implements RouterPlatformAdapter { async resolveProject(event: ParsedWebhookEvent): Promise { const config = await loadProjectConfig(); + // parseWebhook already selected the right cascade project (by team + + // Linear Project scope when multiple cascade projects share a Linear + // team) and embedded its id on the LinearParsedEvent extension. Use + // that id directly — re-looking up by teamId would re-introduce the + // `.find()` first-match shadow that PR #1332 fixed in parseWebhook. + // Closes a prod regression on 2026-05-11 where MNG-638 (ucho's + // Linear Project) routed to `cascade` because both cascade projects + // share the same Linear team and `cascade` appeared first in the + // projects array. + const linearEvent = event as LinearParsedEvent; + if (linearEvent.projectId) { + return config.projects.find((p) => p.id === linearEvent.projectId) ?? null; + } + // Fallback: legacy call sites that construct a bare `ParsedWebhookEvent` + // without the Linear extension's projectId still get teamId-based + // lookup. Single-cascade-project-per-team setups continue working + // unchanged. return config.projects.find((p) => p.linear?.teamId === event.projectIdentifier) ?? 
null; } diff --git a/tests/unit/router/adapters/linear.test.ts b/tests/unit/router/adapters/linear.test.ts index 17749075a..036be8344 100644 --- a/tests/unit/router/adapters/linear.test.ts +++ b/tests/unit/router/adapters/linear.test.ts @@ -683,7 +683,10 @@ describe('LinearRouterAdapter', () => { }); describe('resolveProject', () => { - it('returns project matching Linear teamId', async () => { + // Existing tests — bare ParsedWebhookEvent (no Linear extension) + // exercise the teamId-fallback branch. Single-cascade-project-per-team + // setups continue working unchanged. + it('returns project matching Linear teamId (legacy bare-event path)', async () => { const project = await adapter.resolveProject({ projectIdentifier: 'team-abc-123', eventType: 'create/Issue', @@ -692,7 +695,7 @@ describe('LinearRouterAdapter', () => { expect(project?.id).toBe('p1'); }); - it('returns null for unknown teamId', async () => { + it('returns null for unknown teamId (legacy bare-event path)', async () => { const project = await adapter.resolveProject({ projectIdentifier: 'unknown-team', eventType: 'create/Issue', @@ -700,6 +703,89 @@ describe('LinearRouterAdapter', () => { }); expect(project).toBeNull(); }); + + // 2026-05-11: multi-cascade-project-per-Linear-team support. Closes + // the MNG-638 regression that surfaced after `cascade` was migrated + // from Trello → Linear, putting both `cascade` and `ucho` cascade + // projects on the same Linear team. PR #1332 fixed parseWebhook but + // missed THIS call site — `resolveProject` was re-looking up by + // teamId and returning the first array match, discarding the cascade + // project that parseWebhook had correctly selected. 
+ describe('multi-cascade-project-per-team — uses event.projectId from LinearParsedEvent', () => { + const cascadeProject: RouterProjectConfig = { + id: 'cascade', + repo: 'mongrel/cascade', + pmType: 'linear', + linear: { teamId: 'team-abc-123', projectId: 'P-cascade' }, + }; + const uchoProject: RouterProjectConfig = { + id: 'ucho', + repo: 'mongrel/ucho', + pmType: 'linear', + linear: { teamId: 'team-abc-123', projectId: 'P-ucho' }, + }; + + beforeEach(() => { + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [cascadeProject, uchoProject], + fullProjects: [{ id: 'cascade' } as never, { id: 'ucho' } as never], + }); + }); + + it('returns ucho when event.projectId is "ucho", regardless of array order', async () => { + const project = await adapter.resolveProject({ + projectIdentifier: 'team-abc-123', + eventType: 'update/Issue', + isCommentEvent: false, + // @ts-expect-error LinearParsedEvent extension field + projectId: 'ucho', + action: 'update', + resourceType: 'Issue', + }); + // Pre-fix this returned `cascade` (first match by teamId). 
+ expect(project?.id).toBe('ucho'); + }); + + it('returns cascade when event.projectId is "cascade"', async () => { + const project = await adapter.resolveProject({ + projectIdentifier: 'team-abc-123', + eventType: 'update/Issue', + isCommentEvent: false, + // @ts-expect-error LinearParsedEvent extension field + projectId: 'cascade', + action: 'update', + resourceType: 'Issue', + }); + expect(project?.id).toBe('cascade'); + }); + + it('returns null when event.projectId points at no configured cascade project (fail-closed)', async () => { + const project = await adapter.resolveProject({ + projectIdentifier: 'team-abc-123', + eventType: 'update/Issue', + isCommentEvent: false, + // @ts-expect-error LinearParsedEvent extension field + projectId: 'never-configured-project', + action: 'update', + resourceType: 'Issue', + }); + expect(project).toBeNull(); + }); + + it('falls back to teamId lookup when event lacks the projectId extension (legacy compat)', async () => { + // Bare event — no `projectId` field. The fallback `.find()` by + // teamId returns the first match (cascade) — same as the legacy + // behavior. Real production calls go through parseWebhook which + // always populates `projectId`; this only matters for unit tests + // or external callers that construct events directly. + const project = await adapter.resolveProject({ + projectIdentifier: 'team-abc-123', + eventType: 'update/Issue', + isCommentEvent: false, + }); + expect(project?.id).toBe('cascade'); + }); + }); }); describe('dispatchWithCredentials', () => {