From f938636bf968bd27f795ce5e27b9a43fe3cb28fe Mon Sep 17 00:00:00 2001 From: aaight Date: Mon, 11 May 2026 11:28:47 +0200 Subject: [PATCH 1/3] refactor(router): isolate worker spawn settings (#1330) Co-authored-by: Cascade Bot --- src/router/container-manager.ts | 99 +---------- src/router/worker-spawn-settings.ts | 103 ++++++++++++ .../unit/router/worker-spawn-settings.test.ts | 159 ++++++++++++++++++ 3 files changed, 271 insertions(+), 90 deletions(-) create mode 100644 src/router/worker-spawn-settings.ts create mode 100644 tests/unit/router/worker-spawn-settings.test.ts diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts index 79737e36e..4d59d2773 100644 --- a/src/router/container-manager.ts +++ b/src/router/container-manager.ts @@ -18,12 +18,12 @@ import { captureException } from '../sentry.js'; import { logger } from '../utils/logging.js'; import { activeWorkers, cleanupWorker } from './active-workers.js'; import { clearAllAgentTypeLocks } from './agent-type-lock.js'; -import { loadProjectConfig, routerConfig } from './config.js'; +import { routerConfig } from './config.js'; import { ROUTER_INSTANCE_ID } from './instance-id.js'; import { notifyTimeout } from './notifications.js'; import { stopOrphanCleanup } from './orphan-cleanup.js'; import type { CascadeJob } from './queue.js'; -import { getSnapshot, invalidateSnapshot, registerSnapshot } from './snapshot-manager.js'; +import { invalidateSnapshot, registerSnapshot } from './snapshot-manager.js'; import { clearAllWorkItemLocks } from './work-item-lock.js'; import { buildWorkerEnvWithProjectId, @@ -31,6 +31,7 @@ import { extractProjectIdFromJob, extractWorkItemId, } from './worker-env.js'; +import { buildWorkerContainerName, resolveSpawnSettings } from './worker-spawn-settings.js'; // Re-export from sub-modules so existing callers importing from container-manager.ts // continue to work without changes. @@ -54,12 +55,14 @@ export { buildWorkerEnv, extractProjectIdFromJob, } from './worker-env.js'; +export { + buildWorkerContainerName, + ROUTER_KILL_BUFFER_MS, + resolveSpawnSettings, +} from './worker-spawn-settings.js'; const docker = new Docker(); -/** Buffer added on top of the in-container watchdog so the router kill is always a backstop. */ -const ROUTER_KILL_BUFFER_MS = 2 * 60 * 1000; - /** * Build a stable Docker image name for a snapshot. * Uses a sanitised project+workItem key so it's valid as a Docker image tag. @@ -256,84 +259,6 @@ async function onWorkerExit(opts: { cleanupWorker(jobId, result.StatusCode, { oomKilled, exitReason }); } -interface SpawnSettings { - snapshotEnabled: boolean; - workerImage: string; - containerTimeoutMs: number; - snapshotTtlMs: number; -} - -/** - * Resolve per-project spawn settings (snapshot flag, image, timeout). - * Centralises all loadProjectConfig() calls so spawnWorker stays simple. - * - * @internal Exported for unit testing only — call `spawnWorker` from app code. - */ -export async function resolveSpawnSettings( - projectId: string | null, - workItemId: string | undefined, - jobId: string, -): Promise { - let snapshotEnabled = false; - let workerImage = routerConfig.workerImage; - let containerTimeoutMs = routerConfig.workerTimeoutMs; - let snapshotTtlMs = routerConfig.snapshotDefaultTtlMs; - - if (!projectId) return { snapshotEnabled, workerImage, containerTimeoutMs, snapshotTtlMs }; - - const { fullProjects } = await loadProjectConfig(); - const projectCfg = fullProjects.find((p) => p.id === projectId); - - // Project-level snapshotEnabled overrides the global default - snapshotEnabled = projectCfg?.snapshotEnabled ?? routerConfig.snapshotEnabled; - - // Per-project TTL overrides the global default - snapshotTtlMs = projectCfg?.snapshotTtlMs ?? routerConfig.snapshotDefaultTtlMs; - - if (snapshotEnabled && workItemId) { - const snapshot = getSnapshot(projectId, workItemId, snapshotTtlMs); - if (snapshot) { - logger.info('[WorkerManager] Snapshot hit — using snapshot image:', { - jobId, - imageName: snapshot.imageName, - projectId, - workItemId, - }); - workerImage = snapshot.imageName; - } else { - logger.info('[WorkerManager] Snapshot miss — using base worker image:', { - jobId, - projectId, - workItemId, - }); - } - } - - // Determine container timeout: use project's watchdogTimeoutMs + buffer if available, - // falling back to the global workerTimeoutMs. This makes watchdogTimeoutMs the single source - // of truth — the in-container watchdog fires first, router kill is a backup. - if (projectCfg?.watchdogTimeoutMs) { - containerTimeoutMs = projectCfg.watchdogTimeoutMs + ROUTER_KILL_BUFFER_MS; - } - - // Trace-log the actual values that will govern this worker's lifetime so a - // post-mortem can confirm whether the project's watchdogTimeoutMs override - // took effect or the global default leaked through. - logger.info('[WorkerManager] Resolved spawn settings:', { - jobId, - projectId, - workItemId, - workerImage, - snapshotEnabled, - containerTimeoutMs, - containerTimeoutMinutes: Math.round(containerTimeoutMs / 60_000), - projectWatchdogTimeoutMs: projectCfg?.watchdogTimeoutMs ?? null, - globalWorkerTimeoutMs: routerConfig.workerTimeoutMs, - }); - - return { snapshotEnabled, workerImage, containerTimeoutMs, snapshotTtlMs }; -} - /** * Returns true when a Docker error indicates the requested image does not exist. * Uses the HTTP statusCode from dockerode's error objects as the primary signal, @@ -480,13 +405,7 @@ async function createAndMonitorContainer( */ export async function spawnWorker(job: Job): Promise { const jobId = job.id ?? `unknown-${Date.now()}`; - // Docker container names accept only `[a-zA-Z0-9][a-zA-Z0-9_.-]`. PR #1226 - // introduced coalesced-job IDs shaped `coalesce:${projectId}:${workItemId}` - // where the colons crashed `createContainer` with HTTP 400 — every coalesced - // job that fired post-deploy failed to spawn. Sanitize disallowed chars to - // underscores; the original `jobId` stays intact in logs and dedup keys. - const containerSafeJobId = jobId.replace(/[^a-zA-Z0-9_.-]/g, '_'); - const containerName = `cascade-worker-${containerSafeJobId}`; + const containerName = buildWorkerContainerName(jobId); // Resolve projectId once — used for both credential env and work-item lock tracking const projectId = await extractProjectIdFromJob(job.data); diff --git a/src/router/worker-spawn-settings.ts b/src/router/worker-spawn-settings.ts new file mode 100644 index 000000000..0ef4371ac --- /dev/null +++ b/src/router/worker-spawn-settings.ts @@ -0,0 +1,103 @@ +/** + * Worker spawn settings for CASCADE worker containers. + * + * Resolves Docker-free configuration decisions used by container-manager.ts: + * effective worker image, snapshot reuse, router timeout, and safe container + * names. This module intentionally has no Docker dependency. + */ + +import { logger } from '../utils/logging.js'; +import { loadProjectConfig, routerConfig } from './config.js'; +import { getSnapshot } from './snapshot-manager.js'; + +/** Buffer added on top of the in-container watchdog so the router kill is always a backstop. */ +export const ROUTER_KILL_BUFFER_MS = 2 * 60 * 1000; + +export interface SpawnSettings { + snapshotEnabled: boolean; + workerImage: string; + containerTimeoutMs: number; + snapshotTtlMs: number; +} + +/** + * Build the Docker container name for a worker job. + * + * Docker container names accept only `[a-zA-Z0-9][a-zA-Z0-9_.-]`. BullMQ + * coalesced jobs can include colons (`coalesce:project:item`), so disallowed + * chars are replaced with underscores while the original jobId remains intact + * for logs and dedup keys. + */ +export function buildWorkerContainerName(jobId: string): string { + const containerSafeJobId = jobId.replace(/[^a-zA-Z0-9_.-]/g, '_'); + return `cascade-worker-${containerSafeJobId}`; +} + +/** + * Resolve per-project spawn settings (snapshot flag, image, timeout). + * Centralises all loadProjectConfig() calls so spawnWorker stays simple. + */ +export async function resolveSpawnSettings( + projectId: string | null, + workItemId: string | undefined, + jobId: string, +): Promise { + let snapshotEnabled = false; + let workerImage = routerConfig.workerImage; + let containerTimeoutMs = routerConfig.workerTimeoutMs; + let snapshotTtlMs = routerConfig.snapshotDefaultTtlMs; + + if (!projectId) return { snapshotEnabled, workerImage, containerTimeoutMs, snapshotTtlMs }; + + const { fullProjects } = await loadProjectConfig(); + const projectCfg = fullProjects.find((p) => p.id === projectId); + + // Project-level snapshotEnabled overrides the global default. + snapshotEnabled = projectCfg?.snapshotEnabled ?? routerConfig.snapshotEnabled; + + // Per-project TTL overrides the global default. + snapshotTtlMs = projectCfg?.snapshotTtlMs ?? routerConfig.snapshotDefaultTtlMs; + + if (snapshotEnabled && workItemId) { + const snapshot = getSnapshot(projectId, workItemId, snapshotTtlMs); + if (snapshot) { + logger.info('[WorkerManager] Snapshot hit — using snapshot image:', { + jobId, + imageName: snapshot.imageName, + projectId, + workItemId, + }); + workerImage = snapshot.imageName; + } else { + logger.info('[WorkerManager] Snapshot miss — using base worker image:', { + jobId, + projectId, + workItemId, + }); + } + } + + // Use project's watchdogTimeoutMs + buffer if available, falling back to the + // global workerTimeoutMs. The in-container watchdog fires first; router kill + // is a backup. + if (projectCfg?.watchdogTimeoutMs) { + containerTimeoutMs = projectCfg.watchdogTimeoutMs + ROUTER_KILL_BUFFER_MS; + } + + // Trace-log the actual values that govern this worker's lifetime so a + // post-mortem can confirm whether the project's watchdogTimeoutMs override + // took effect or the global default leaked through. + logger.info('[WorkerManager] Resolved spawn settings:', { + jobId, + projectId, + workItemId, + workerImage, + snapshotEnabled, + containerTimeoutMs, + containerTimeoutMinutes: Math.round(containerTimeoutMs / 60_000), + projectWatchdogTimeoutMs: projectCfg?.watchdogTimeoutMs ?? null, + globalWorkerTimeoutMs: routerConfig.workerTimeoutMs, + }); + + return { snapshotEnabled, workerImage, containerTimeoutMs, snapshotTtlMs }; +} diff --git a/tests/unit/router/worker-spawn-settings.test.ts b/tests/unit/router/worker-spawn-settings.test.ts new file mode 100644 index 000000000..38867ea36 --- /dev/null +++ b/tests/unit/router/worker-spawn-settings.test.ts @@ -0,0 +1,159 @@ +import { readFile } from 'node:fs/promises'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const { mockLoggerInfo, mockLoadProjectConfig, mockGetSnapshot } = vi.hoisted(() => ({ + mockLoggerInfo: vi.fn(), + mockLoadProjectConfig: vi.fn(), + mockGetSnapshot: vi.fn(), +})); + +vi.mock('../../../src/utils/logging.js', () => ({ + logger: { + info: (...args: unknown[]) => mockLoggerInfo(...args), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +vi.mock('../../../src/router/config.js', () => ({ + loadProjectConfig: (...args: unknown[]) => mockLoadProjectConfig(...args), + routerConfig: { + workerImage: 'base-worker:latest', + workerTimeoutMs: 30 * 60 * 1000, + snapshotEnabled: false, + snapshotDefaultTtlMs: 24 * 60 * 60 * 1000, + }, +})); + +vi.mock('../../../src/router/snapshot-manager.js', () => ({ + getSnapshot: (...args: unknown[]) => mockGetSnapshot(...args), +})); + +import { + buildWorkerContainerName, + ROUTER_KILL_BUFFER_MS, + resolveSpawnSettings, +} from '../../../src/router/worker-spawn-settings.js'; + +describe('worker-spawn-settings', () => { + beforeEach(() => { + mockLoggerInfo.mockClear(); + mockLoadProjectConfig.mockReset(); + mockGetSnapshot.mockReset(); + }); + + it('has no Docker dependency', async () => { + const source = await readFile('src/router/worker-spawn-settings.ts', 'utf8'); + + expect(source).not.toContain('dockerode'); + expect(source).not.toContain('new Docker'); + }); + + it('returns global defaults without loading project config when projectId is null', async () => { + const settings = await resolveSpawnSettings(null, undefined, 'job-no-project'); + + expect(settings).toEqual({ + snapshotEnabled: false, + workerImage: 'base-worker:latest', + containerTimeoutMs: 30 * 60 * 1000, + snapshotTtlMs: 24 * 60 * 60 * 1000, + }); + expect(mockLoadProjectConfig).not.toHaveBeenCalled(); + }); + + it('adds the router kill buffer to a per-project watchdog timeout', async () => { + mockLoadProjectConfig.mockResolvedValue({ + projects: [], + fullProjects: [ + { + id: 'ucho', + watchdogTimeoutMs: 45 * 60 * 1000, + snapshotEnabled: false, + }, + ], + }); + + const settings = await resolveSpawnSettings('ucho', 'MNG-308', 'job-ucho-1'); + + expect(ROUTER_KILL_BUFFER_MS).toBe(2 * 60 * 1000); + expect(settings.containerTimeoutMs).toBe(47 * 60 * 1000); + expect(mockLoggerInfo).toHaveBeenCalledWith( + '[WorkerManager] Resolved spawn settings:', + expect.objectContaining({ + jobId: 'job-ucho-1', + projectId: 'ucho', + workItemId: 'MNG-308', + containerTimeoutMs: 47 * 60 * 1000, + containerTimeoutMinutes: 47, + projectWatchdogTimeoutMs: 45 * 60 * 1000, + globalWorkerTimeoutMs: 30 * 60 * 1000, + }), + ); + }); + + it('uses a valid snapshot image when project snapshots are enabled and metadata exists', async () => { + mockLoadProjectConfig.mockResolvedValue({ + projects: [], + fullProjects: [ + { + id: 'snapshot-project', + snapshotEnabled: true, + snapshotTtlMs: 10_000, + }, + ], + }); + mockGetSnapshot.mockReturnValue({ + imageName: 'cascade-snapshot-snapshot-project-mng-650:latest', + }); + + const settings = await resolveSpawnSettings('snapshot-project', 'MNG-650', 'job-snapshot'); + + expect(mockGetSnapshot).toHaveBeenCalledWith('snapshot-project', 'MNG-650', 10_000); + expect(settings).toMatchObject({ + snapshotEnabled: true, + workerImage: 'cascade-snapshot-snapshot-project-mng-650:latest', + snapshotTtlMs: 10_000, + }); + expect(mockLoggerInfo).toHaveBeenCalledWith( + '[WorkerManager] Snapshot hit — using snapshot image:', + expect.objectContaining({ + jobId: 'job-snapshot', + imageName: 'cascade-snapshot-snapshot-project-mng-650:latest', + }), + ); + }); + + it('falls back to the base image when snapshots are enabled but no metadata exists', async () => { + mockLoadProjectConfig.mockResolvedValue({ + projects: [], + fullProjects: [{ id: 'snapshot-miss', snapshotEnabled: true }], + }); + mockGetSnapshot.mockReturnValue(undefined); + + const settings = await resolveSpawnSettings('snapshot-miss', 'MNG-651', 'job-miss'); + + expect(settings.workerImage).toBe('base-worker:latest'); + expect(settings.snapshotEnabled).toBe(true); + expect(mockLoggerInfo).toHaveBeenCalledWith( + '[WorkerManager] Snapshot miss — using base worker image:', + expect.objectContaining({ + jobId: 'job-miss', + projectId: 'snapshot-miss', + workItemId: 'MNG-651', + }), + ); + }); + + it('builds Docker-safe worker container names from coalesced job IDs', () => { + expect(buildWorkerContainerName('coalesce:ucho:MNG-413')).toBe( + 'cascade-worker-coalesce_ucho_MNG-413', + ); + }); + + it('passes through Docker-safe worker job IDs unchanged', () => { + expect(buildWorkerContainerName('github-1234567890abcdef')).toBe( + 'cascade-worker-github-1234567890abcdef', + ); + }); +}); From 898033faf12fc4617b8d8f9205acacafb0b2fd95 Mon Sep 17 00:00:00 2001 From: Zbigniew Sobiecki Date: Mon, 11 May 2026 09:42:08 +0000 Subject: [PATCH 2/3] fix(router): route Linear webhooks by team + issue's project, not just team MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit User reported MNG-638 (Linear) wasn't triggering the implementation agent despite multiple moves to Todo and `cascade-ready` label applications. Investigation found: 1. Linear webhooks ARE arriving (`webhooklogs list --source linear`) but every one is `processed=False decisionReason="Event unparseable or not processable"`. 2. Loki shows the real reason — `LinearRouterAdapter: dropping event outside project scope` — fires ~20 times on MNG-638's UUID. The webhooklogs UI hides the specific drop reason because parseWebhook() returns null and the outer wrapper emits the generic "unparseable" message. 3. The actual bug: two cascade projects share one Linear team after `cascade` was migrated from Trello → Linear: - `cascade` → Linear team 310c41fe-..., scoped to Linear Project 83a0f22b-... - `ucho` → same team, scoped to Linear Project 7108c72e-... The router used `config.projects.find((p) => p.linear?.teamId === teamId)` which always returned the first match by team. The follow-up matchesConfiguredProjectScope() then dropped events whose issue belonged to the OTHER cascade project's Linear scope. MNG-638 is in cascade's Linear Project, but the .find() picked ucho first → scope mismatch → drop. Latent until the second cascade project landed on the same team this morning; now a deterministic loss for everything in cascade's Linear Project. Fix: replace "find first by teamId" with "best match by teamId + Linear Project scope". - Collect ALL candidates that share the teamId via `.filter()`. - Extract issue's Linear Project ID BEFORE selecting a candidate (`resolveIssueProjectId` helper — pulls from data.projectId / data.project.id, or fetches via API for Comment events without inline context). - Pick the candidate with deterministic fallback: 1. Strong match: candidate whose `linear.projectId` equals issue's project. 2. Catch-all: candidate with NO `linear.projectId` configured. 3. Otherwise: drop with the existing info-level log (now augmented with the full candidates list for diagnostics — "no candidate matches issue project" vs "issue has no project"). Delete `matchesConfiguredProjectScope` — its job is now done inline by the new selection logic. `fetchIssueProjectId` stays (called by the helper). When `candidates.length === 1` the new logic degrades to the old behavior, so single-project-per-team setups continue working. Tests: - `tests/unit/router/adapters/linear.test.ts`: updated the existing scope- filter assertions to match the new log shape (reason field + candidates list), added a `parseWebhook — multi-cascade-project-per-team` describe block with 7 scenarios: * routes to the project matching the issue's Linear scope (cascade) * routes to the other project when the issue belongs to it (ucho) * drops with candidates list when no candidate subscribes * drops with `'issue has no project'` reason when issue is unscoped and all candidates are scoped * falls back to unscoped catch-all candidate when no scoped match * prefers the scoped match over an unscoped catch-all * Comment event uses API fallback via the first candidate's creds All 45 LinearRouterAdapter tests pass. JIRA and Trello router adapters use their own unique discriminator (project key / board ID) per cascade project so they don't share the same multi-project-per-discriminator vulnerability. Verification: full suite (9280 tests) clean. After deploy, re-moving MNG-638 to Todo should route to `cascade` (issue's projectId `83a0f22b-...` matches cascade's scope) and fire the implementation agent. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/router/adapters/linear.ts | 90 ++++++---- tests/unit/router/adapters/linear.test.ts | 197 +++++++++++++++++++++- 2 files changed, 243 insertions(+), 44 deletions(-) diff --git a/src/router/adapters/linear.ts b/src/router/adapters/linear.ts index e80f68d35..a9cc6c077 100644 --- a/src/router/adapters/linear.ts +++ b/src/router/adapters/linear.ts @@ -45,14 +45,12 @@ interface LinearParsedEvent extends ParsedWebhookEvent { resourceType: string; } -interface LinearProjectScopeInput { - project: RouterProjectConfig; +interface ResolveIssueProjectIdInput { + candidates: RouterProjectConfig[]; isCommentEvent: boolean; workItemId: string | undefined; data: Record; issue: Record | undefined; - eventType: string; - teamId: string; } // ============================================================================ @@ -88,8 +86,8 @@ export class LinearRouterAdapter implements RouterPlatformAdapter { } const config = await loadProjectConfig(); - const project = config.projects.find((proj) => proj.linear?.teamId === teamId); - if (!project) { + const candidates = config.projects.filter((proj) => proj.linear?.teamId === teamId); + if (candidates.length === 0) { logger.debug('LinearRouterAdapter: no project found for teamId', { teamId }); return null; } @@ -100,20 +98,40 @@ export class LinearRouterAdapter implements RouterPlatformAdapter { : (data.id as string | undefined); const eventType = `${p.action}/${p.type}`; - // Optional project-scope filter: when the CASCADE project has been narrowed - // to a specific Linear Project, drop webhook events whose issue is not in - // that project. Linear cannot scope webhooks to a project, so the filter - // runs here, after team-match. - const matchesProjectScope = await this.matchesConfiguredProjectScope({ - project, + // 2026-05-11: multiple cascade projects can share a Linear team when each + // is narrowed to a different Linear Project (e.g. mongrel team hosts both + // `cascade` and `ucho` cascade projects, each scoped to a separate Linear + // Project). The previous `find()` returned only the first team match and + // the follow-up scope filter dropped events for issues that belonged to + // the OTHER cascade project — silently losing webhooks. Select the right + // cascade project up front based on the issue's Linear Project: + // 1. Strong match: candidate whose `linear.projectId` matches the issue's project + // 2. Catch-all: candidate with no `linear.projectId` configured + // 3. Otherwise: drop with diagnostic (no candidate subscribes to this Project) + const issueProjectId = await this.resolveIssueProjectId({ + candidates, isCommentEvent, workItemId, data, issue, - eventType, - teamId, }); - if (!matchesProjectScope) { + + const project = + candidates.find((p) => p.linear?.projectId === issueProjectId) ?? + candidates.find((p) => !p.linear?.projectId); + + if (!project) { + logger.info('LinearRouterAdapter: dropping event outside project scope', { + reason: issueProjectId ? 'no candidate matches issue project' : 'issue has no project', + teamId, + issueProjectId, + candidates: candidates.map((c) => ({ + id: c.id, + projectId: c.linear?.projectId, + })), + issueId: workItemId, + eventType, + }); return null; } @@ -128,31 +146,29 @@ export class LinearRouterAdapter implements RouterPlatformAdapter { }; } - private async matchesConfiguredProjectScope(input: LinearProjectScopeInput): Promise { - const configuredProjectId = input.project.linear?.projectId; - if (!configuredProjectId) return true; - + /** + * Resolve the issue's Linear Project ID — used to pick the right cascade + * project candidate when multiple are configured for the same team. + * + * Returns the inline `data.projectId` / `data.project.id` (or for Comment + * events the equivalent under `data.issue`) when present in the webhook + * payload. For Comment events without inline issue context, falls back to + * a Linear API lookup. The API call uses the first candidate's credentials + * — Linear creds are per-team so any candidate's credentials work for + * fetching team-scoped issue data. + */ + private async resolveIssueProjectId( + input: ResolveIssueProjectIdInput, + ): Promise { const payloadProjectId = input.isCommentEvent ? ((input.issue?.projectId as string | undefined) ?? nestedId(input.issue?.project)) : ((input.data.projectId as string | undefined) ?? nestedId(input.data.project)); - const issueProjectId = - payloadProjectId ?? - (input.isCommentEvent && input.workItemId - ? await this.fetchIssueProjectId(input.project.id, input.workItemId) - : undefined); - - if (issueProjectId === configuredProjectId) return true; - - logger.info('LinearRouterAdapter: dropping event outside project scope', { - reason: issueProjectId ? 'project scope mismatch' : 'issue has no project', - configuredProjectId, - issueProjectId, - issueId: input.workItemId, - teamId: input.teamId, - projectId: input.project.id, - eventType: input.eventType, - }); - return false; + if (payloadProjectId) return payloadProjectId; + + if (input.isCommentEvent && input.workItemId) { + return this.fetchIssueProjectId(input.candidates[0].id, input.workItemId); + } + return undefined; } private async fetchIssueProjectId( diff --git a/tests/unit/router/adapters/linear.test.ts b/tests/unit/router/adapters/linear.test.ts index 14890141e..17749075a 100644 --- a/tests/unit/router/adapters/linear.test.ts +++ b/tests/unit/router/adapters/linear.test.ts @@ -234,15 +234,19 @@ describe('LinearRouterAdapter', () => { data: { ...baseLinearPayload.data, projectId: 'P2' }, }); expect(result).toBeNull(); + // 2026-05-11: log shape changed when project selection moved before + // scope filtering. `reason` is now "no candidate matches issue project" + // (the issue HAS a project, we just didn't find a candidate that + // subscribes to it). The log also lists all candidates so operators + // can see why none matched. expect(mockLoggerInfo).toHaveBeenCalledWith( expect.stringMatching(/LinearRouterAdapter: dropping event/), expect.objectContaining({ - reason: 'project scope mismatch', - configuredProjectId: 'P1', + reason: 'no candidate matches issue project', issueProjectId: 'P2', issueId: 'issue-abc', teamId: 'team-abc-123', - projectId: 'p1', + candidates: [{ id: 'p1', projectId: 'P1' }], eventType: 'create/Issue', }), ); @@ -255,8 +259,8 @@ describe('LinearRouterAdapter', () => { expect.stringMatching(/LinearRouterAdapter: dropping event/), expect.objectContaining({ reason: 'issue has no project', - configuredProjectId: 'P1', issueProjectId: undefined, + candidates: [{ id: 'p1', projectId: 'P1' }], }), ); }); @@ -346,9 +350,9 @@ describe('LinearRouterAdapter', () => { expect(mockLoggerInfo).toHaveBeenCalledWith( expect.stringMatching(/LinearRouterAdapter: dropping event/), expect.objectContaining({ - reason: 'project scope mismatch', - configuredProjectId: 'P1', + reason: 'no candidate matches issue project', issueProjectId: 'P2', + candidates: [{ id: 'p1', projectId: 'P1' }], eventType: 'create/Comment', }), ); @@ -373,7 +377,7 @@ describe('LinearRouterAdapter', () => { expect(mockLoggerInfo).toHaveBeenCalledWith( expect.stringMatching(/LinearRouterAdapter: dropping event/), expect.objectContaining({ - reason: 'project scope mismatch', + reason: 'no candidate matches issue project', eventType: 'create/IssueLabel', }), ); @@ -394,6 +398,185 @@ describe('LinearRouterAdapter', () => { }); }); + // 2026-05-11: multi-cascade-project-per-Linear-team support. Closes the + // MNG-638 regression where cascade was migrated from Trello → Linear and + // both `cascade` and `ucho` ended up scoped to the same Linear team. The + // `.find()` returned only the first match (ucho), then the follow-up + // scope filter dropped the event because the issue's Linear Project + // (`83a0f22b-...`) didn't match ucho's scope (`7108c72e-...`). Now the + // adapter picks the candidate based on teamId AND issue's Linear Project, + // not just team. + describe('parseWebhook — multi-cascade-project-per-team', () => { + const cascadeProject: RouterProjectConfig = { + id: 'cascade', + repo: 'mongrel/cascade', + pmType: 'linear', + linear: { teamId: 'team-abc-123', projectId: 'P-cascade' }, + }; + const uchoProject: RouterProjectConfig = { + id: 'ucho', + repo: 'mongrel/ucho', + pmType: 'linear', + linear: { teamId: 'team-abc-123', projectId: 'P-ucho' }, + }; + + beforeEach(() => { + mockLoggerInfo.mockClear(); + }); + + it('routes to the cascade project whose Linear scope matches the issue project (was the .find()-first-match bug)', async () => { + // Two cascade projects on the same Linear team; ucho appears first + // in the projects array. Pre-fix the .find() would have returned + // ucho regardless of the issue's project, and the follow-up scope + // filter would have dropped the event. The new code looks at all + // candidates and matches on issue's projectId. + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [uchoProject, cascadeProject], + fullProjects: [{ id: 'ucho' } as never, { id: 'cascade' } as never], + }); + + const result = await adapter.parseWebhook({ + ...baseLinearPayload, + data: { ...baseLinearPayload.data, projectId: 'P-cascade' }, + }); + + expect(result).not.toBeNull(); + expect(result?.projectId).toBe('cascade'); + expect(mockLoggerInfo).not.toHaveBeenCalled(); + }); + + it('routes to ucho when the issue belongs to ucho (mirror of the cascade case)', async () => { + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [uchoProject, cascadeProject], + fullProjects: [{ id: 'ucho' } as never, { id: 'cascade' } as never], + }); + + const result = await adapter.parseWebhook({ + ...baseLinearPayload, + data: { ...baseLinearPayload.data, projectId: 'P-ucho' }, + }); + + expect(result).not.toBeNull(); + expect(result?.projectId).toBe('ucho'); + }); + + it('drops the event when no candidate subscribes to the issue project; log includes all candidates', async () => { + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [uchoProject, cascadeProject], + fullProjects: [{ id: 'ucho' } as never, { id: 'cascade' } as never], + }); + + const result = await adapter.parseWebhook({ + ...baseLinearPayload, + data: { ...baseLinearPayload.data, projectId: 'P-orphan' }, + }); + + expect(result).toBeNull(); + expect(mockLoggerInfo).toHaveBeenCalledWith( + expect.stringMatching(/LinearRouterAdapter: dropping event/), + expect.objectContaining({ + reason: 'no candidate matches issue project', + issueProjectId: 'P-orphan', + candidates: [ + { id: 'ucho', projectId: 'P-ucho' }, + { id: 'cascade', projectId: 'P-cascade' }, + ], + }), + ); + }); + + it('drops the event when issue has no project and all candidates are scoped', async () => { + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [uchoProject, cascadeProject], + fullProjects: [{ id: 'ucho' } as never, { id: 'cascade' } as never], + }); + + const result = await adapter.parseWebhook(baseLinearPayload); + + expect(result).toBeNull(); + expect(mockLoggerInfo).toHaveBeenCalledWith( + expect.stringMatching(/LinearRouterAdapter: dropping event/), + expect.objectContaining({ + reason: 'issue has no project', + issueProjectId: undefined, + }), + ); + }); + + it('falls back to an unscoped catch-all candidate when no scoped candidate matches', async () => { + const catchAll: RouterProjectConfig = { + id: 'catch-all', + repo: 'mongrel/catchall', + pmType: 'linear', + linear: { teamId: 'team-abc-123' }, + }; + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [cascadeProject, catchAll], + fullProjects: [{ id: 'cascade' } as never, { id: 'catch-all' } as never], + }); + + const result = await adapter.parseWebhook({ + ...baseLinearPayload, + data: { ...baseLinearPayload.data, projectId: 'P-unmatched' }, + }); + + expect(result).not.toBeNull(); + expect(result?.projectId).toBe('catch-all'); + }); + + it('prefers the scoped match over an unscoped catch-all when both are configured', async () => { + const catchAll: RouterProjectConfig = { + id: 'catch-all', + repo: 'mongrel/catchall', + pmType: 'linear', + linear: { teamId: 'team-abc-123' }, + }; + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [catchAll, cascadeProject], + fullProjects: [{ id: 'catch-all' } as never, { id: 'cascade' } as never], + }); + + const result = await adapter.parseWebhook({ + ...baseLinearPayload, + data: { ...baseLinearPayload.data, projectId: 'P-cascade' }, + }); + + expect(result).not.toBeNull(); + // Scoped match wins even though catch-all comes first in array order. + expect(result?.projectId).toBe('cascade'); + }); + + it('Comment event fetches issue project via API and routes to the right scoped candidate', async () => { + mockGetIssueProjectId.mockResolvedValueOnce('P-cascade'); + vi.mocked(loadProjectConfig).mockResolvedValue({ + projects: [uchoProject, cascadeProject], + fullProjects: [{ id: 'ucho' } as never, { id: 'cascade' } as never], + }); + + const payload = { + action: 'create', + type: 'Comment', + organizationId: 'org-123', + webhookTimestamp: Date.now(), + data: { + id: 'comment-xyz', + body: '@cascade please update', + issueId: 'issue-abc', + issue: { id: 'issue-abc', identifier: 'TEAM-1', teamId: 'team-abc-123' }, + }, + url: 'https://linear.app/issue', + }; + + const result = await adapter.parseWebhook(payload); + + expect(result).not.toBeNull(); + expect(result?.projectId).toBe('cascade'); + // The API call uses the FIRST candidate's id — Linear creds are + // per-team so any candidate's creds work for the issue lookup. + expect(mockGetIssueProjectId).toHaveBeenCalledWith('issue-abc'); + }); + }); + describe('isProcessableEvent', () => { it('returns true for Issue events', () => { expect( From d22f2b7c9830e9a236ef2451bc5b5897edeba1ca Mon Sep 17 00:00:00 2001 From: aaight Date: Mon, 11 May 2026 11:43:25 +0200 Subject: [PATCH 3/3] refactor(router): isolate worker timeout cancellation (#1331) Co-authored-by: Cascade Bot --- src/router/container-manager.ts | 77 +------- src/router/worker-timeouts.ts | 88 +++++++++ tests/unit/router/worker-timeouts.test.ts | 217 ++++++++++++++++++++++ 3 files changed, 307 insertions(+), 75 deletions(-) create mode 100644 src/router/worker-timeouts.ts create mode 100644 tests/unit/router/worker-timeouts.test.ts diff --git a/src/router/container-manager.ts b/src/router/container-manager.ts index 4d59d2773..9bdf5784a 100644 --- a/src/router/container-manager.ts +++ b/src/router/container-manager.ts @@ -13,14 +13,12 @@ import type { Job } from 'bullmq'; import Docker from 'dockerode'; -import { failOrphanedRun, failOrphanedRunFallback } from '../db/repositories/runsRepository.js'; import { captureException } from '../sentry.js'; import { logger } from '../utils/logging.js'; import { activeWorkers, cleanupWorker } from './active-workers.js'; import { clearAllAgentTypeLocks } from './agent-type-lock.js'; import { routerConfig } from './config.js'; import { ROUTER_INSTANCE_ID } from './instance-id.js'; -import { notifyTimeout } from './notifications.js'; import { stopOrphanCleanup } from './orphan-cleanup.js'; import type { CascadeJob } from './queue.js'; import { invalidateSnapshot, registerSnapshot } from './snapshot-manager.js'; @@ -32,6 +30,7 @@ import { extractWorkItemId, } from './worker-env.js'; import { buildWorkerContainerName, resolveSpawnSettings } from './worker-spawn-settings.js'; +import { killWorker } from './worker-timeouts.js'; // Re-export from sub-modules so existing callers importing from container-manager.ts // continue to work without changes. @@ -60,6 +59,7 @@ export { ROUTER_KILL_BUFFER_MS, resolveSpawnSettings, } from './worker-spawn-settings.js'; +export { killWorker } from './worker-timeouts.js'; const docker = new Docker(); @@ -511,79 +511,6 @@ export async function spawnWorker(job: Job): Promise { } } -/** - * Kill a worker container with two-phase shutdown: - * 1. SIGTERM via container.stop(t=15) — gives agent watchdog 15s to clean up - * 2. Docker auto-escalates to SIGKILL after 15s - * 3. Router posts its own timeout notification - */ -export async function killWorker(jobId: string): Promise { - const worker = activeWorkers.get(jobId); - if (!worker) return; - - try { - const container = docker.getContainer(worker.containerId); - await container.stop({ t: 15 }); - logger.info('[WorkerManager] Worker stopped:', { jobId }); - } catch (err) { - // Container might already be stopped - logger.warn('[WorkerManager] Error stopping worker (may already be stopped):', { - jobId, - error: String(err), - }); - } - - const durationMs = Date.now() - worker.startedAt.getTime(); - - // Update DB run status to timed_out (fire-and-forget, no-op if watchdog already did it). - // cleanupWorker is called below without an exitCode so it skips its own DB update, - // avoiding a race where the wrong status ('failed') could win. - if (worker.projectId) { - const dbUpdate = worker.workItemId - ? failOrphanedRun( - worker.projectId, - worker.workItemId, - 'Router timeout', - 'timed_out', - durationMs, - ) - : failOrphanedRunFallback( - worker.projectId, - worker.agentType, - worker.startedAt, - 'timed_out', - 'Router timeout', - durationMs, - ); - dbUpdate - .then((runId) => { - if (runId) - logger.info('[WorkerManager] Marked run timed_out after router kill', { - jobId, - runId, - }); - }) - .catch((err) => - logger.error('[WorkerManager] DB update failed after router kill', { - jobId, - error: String(err), - }), - ); - } - - // Send timeout notification (fire-and-forget) - notifyTimeout(worker.job, { - jobId: worker.jobId, - startedAt: worker.startedAt, - durationMs, - }).catch((err) => { - logger.error('[WorkerManager] Timeout notification error:', String(err)); - }); - - // No exitCode — DB update is handled above with the correct 'timed_out' status - cleanupWorker(jobId); -} - /** * Detach from all active workers on shutdown. * Workers continue running as independent containers. diff --git a/src/router/worker-timeouts.ts b/src/router/worker-timeouts.ts new file mode 100644 index 000000000..ea646c80c --- /dev/null +++ b/src/router/worker-timeouts.ts @@ -0,0 +1,88 @@ +/** + * Timeout cancellation workflow for CASCADE worker containers. + * + * Owns the router-side timeout path: stop the Docker container, mark the run + * `timed_out`, notify the PM/SCM surface, and clear active-worker state without + * letting the generic crash cleanup overwrite the timeout status. + */ + +import Docker from 'dockerode'; +import { failOrphanedRun, failOrphanedRunFallback } from '../db/repositories/runsRepository.js'; +import { logger } from '../utils/logging.js'; +import { activeWorkers, cleanupWorker } from './active-workers.js'; +import { notifyTimeout } from './notifications.js'; + +const docker = new Docker(); + +/** + * Kill a worker container after the router watchdog fires: + * 1. SIGTERM via container.stop(t=15) gives the agent watchdog time to clean up. + * 2. Docker auto-escalates to SIGKILL after 15s. + * 3. Router marks the run timed_out and posts its own timeout notification. + */ +export async function killWorker(jobId: string): Promise { + const worker = activeWorkers.get(jobId); + if (!worker) return; + + try { + const container = docker.getContainer(worker.containerId); + await container.stop({ t: 15 }); + logger.info('[WorkerManager] Worker stopped:', { jobId }); + } catch (err) { + // Container might already be stopped; timeout reporting still needs to run. + logger.warn('[WorkerManager] Error stopping worker (may already be stopped):', { + jobId, + error: String(err), + }); + } + + const durationMs = Date.now() - worker.startedAt.getTime(); + + // Update DB run status to timed_out (fire-and-forget, no-op if watchdog already did it). + // cleanupWorker is called below without an exitCode so it skips its own DB update, + // avoiding a race where the wrong status ('failed') could win. + if (worker.projectId) { + const dbUpdate = worker.workItemId + ? failOrphanedRun( + worker.projectId, + worker.workItemId, + 'Router timeout', + 'timed_out', + durationMs, + ) + : failOrphanedRunFallback( + worker.projectId, + worker.agentType, + worker.startedAt, + 'timed_out', + 'Router timeout', + durationMs, + ); + dbUpdate + .then((runId) => { + if (runId) + logger.info('[WorkerManager] Marked run timed_out after router kill', { + jobId, + runId, + }); + }) + .catch((err) => + logger.error('[WorkerManager] DB update failed after router kill', { + jobId, + error: String(err), + }), + ); + } + + // Send timeout notification (fire-and-forget). + notifyTimeout(worker.job, { + jobId: worker.jobId, + startedAt: worker.startedAt, + durationMs, + }).catch((err) => { + logger.error('[WorkerManager] Timeout notification error:', String(err)); + }); + + // No exitCode: DB update is handled above with the correct timed_out status. + cleanupWorker(jobId); +} diff --git a/tests/unit/router/worker-timeouts.test.ts b/tests/unit/router/worker-timeouts.test.ts new file mode 100644 index 000000000..6dd456566 --- /dev/null +++ b/tests/unit/router/worker-timeouts.test.ts @@ -0,0 +1,217 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +// --------------------------------------------------------------------------- +// Hoisted mock state +// --------------------------------------------------------------------------- + +const { + mockActiveWorkers, + mockCleanupWorker, + mockDockerGetContainer, + mockFailOrphanedRun, + mockFailOrphanedRunFallback, + mockNotifyTimeout, +} = vi.hoisted(() => ({ + mockActiveWorkers: new Map(), + mockCleanupWorker: vi.fn(), + mockDockerGetContainer: vi.fn(), + mockFailOrphanedRun: vi.fn().mockResolvedValue(null), + mockFailOrphanedRunFallback: vi.fn().mockResolvedValue(null), + mockNotifyTimeout: vi.fn().mockResolvedValue(undefined), +})); + +// --------------------------------------------------------------------------- +// Module-level mocks +// --------------------------------------------------------------------------- + +vi.mock('dockerode', () => ({ + default: vi.fn().mockImplementation(() => ({ + getContainer: mockDockerGetContainer, + })), +})); + +vi.mock('../../../src/db/repositories/runsRepository.js', () => ({ + failOrphanedRun: (...args: unknown[]) => mockFailOrphanedRun(...args), + failOrphanedRunFallback: (...args: unknown[]) => mockFailOrphanedRunFallback(...args), +})); + +vi.mock('../../../src/router/active-workers.js', () => ({ + activeWorkers: mockActiveWorkers, + cleanupWorker: (...args: unknown[]) => mockCleanupWorker(...args), +})); + +vi.mock('../../../src/router/notifications.js', () => ({ + notifyTimeout: (...args: unknown[]) => mockNotifyTimeout(...args), +})); + +const { mockLogger } = vi.hoisted(() => ({ + mockLogger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); +vi.mock('../../../src/utils/logging.js', () => ({ + logger: mockLogger, +})); + +// --------------------------------------------------------------------------- +// Imports +// --------------------------------------------------------------------------- + +import type { ActiveWorker } from '../../../src/router/active-workers.js'; +import type { CascadeJob } from '../../../src/router/queue.js'; +import { killWorker } from '../../../src/router/worker-timeouts.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeWorker(overrides: Partial = {}): ActiveWorker { + return { + containerId: overrides.containerId ?? 'container-abc123def456', + jobId: overrides.jobId ?? 'job-1', + startedAt: overrides.startedAt ?? new Date(Date.now() - 1000), + timeoutHandle: overrides.timeoutHandle ?? ({} as NodeJS.Timeout), + job: overrides.job ?? ({ type: 'trello', projectId: 'proj-1' } as CascadeJob), + projectId: overrides.projectId, + workItemId: overrides.workItemId, + agentType: overrides.agentType, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('worker-timeouts killWorker', () => { + beforeEach(() => { + mockActiveWorkers.clear(); + mockCleanupWorker.mockClear(); + mockDockerGetContainer.mockReset(); + mockFailOrphanedRun.mockReset(); + mockFailOrphanedRun.mockResolvedValue(null); + mockFailOrphanedRunFallback.mockReset(); + mockFailOrphanedRunFallback.mockResolvedValue(null); + mockNotifyTimeout.mockReset(); + mockNotifyTimeout.mockResolvedValue(undefined); + mockLogger.info.mockClear(); + mockLogger.warn.mockClear(); + mockLogger.error.mockClear(); + }); + + it('is a no-op for an unknown jobId', async () => { + await expect(killWorker('missing-job')).resolves.toBeUndefined(); + + expect(mockDockerGetContainer).not.toHaveBeenCalled(); + expect(mockFailOrphanedRun).not.toHaveBeenCalled(); + expect(mockFailOrphanedRunFallback).not.toHaveBeenCalled(); + expect(mockNotifyTimeout).not.toHaveBeenCalled(); + expect(mockCleanupWorker).not.toHaveBeenCalled(); + }); + + it('stops the worker container with the timeout grace period', async () => { + const container = { stop: vi.fn().mockResolvedValue(undefined) }; + mockDockerGetContainer.mockReturnValue(container); + mockActiveWorkers.set('job-1', makeWorker({ jobId: 'job-1' })); + + await killWorker('job-1'); + + expect(mockDockerGetContainer).toHaveBeenCalledWith('container-abc123def456'); + expect(container.stop).toHaveBeenCalledWith({ t: 15 }); + }); + + it('warns but still notifies and cleans up when the container is already stopped', async () => { + const container = { stop: vi.fn().mockRejectedValue(new Error('already stopped')) }; + mockDockerGetContainer.mockReturnValue(container); + mockActiveWorkers.set('job-stopped', makeWorker({ jobId: 'job-stopped' })); + + await killWorker('job-stopped'); + + expect(mockLogger.warn).toHaveBeenCalledWith( + '[WorkerManager] Error stopping worker (may already be stopped):', + expect.objectContaining({ jobId: 'job-stopped' }), + ); + expect(mockNotifyTimeout).toHaveBeenCalled(); + expect(mockCleanupWorker).toHaveBeenCalledWith('job-stopped'); + }); + + it('marks a known work item run as timed_out with the router timeout reason', async () => { + const container = { stop: vi.fn().mockResolvedValue(undefined) }; + mockDockerGetContainer.mockReturnValue(container); + mockFailOrphanedRun.mockResolvedValue('run-123'); + mockActiveWorkers.set( + 'job-work-item', + makeWorker({ + jobId: 'job-work-item', + projectId: 'proj-1', + workItemId: 'card-1', + }), + ); + + await killWorker('job-work-item'); + + expect(mockFailOrphanedRun).toHaveBeenCalledWith( + 'proj-1', + 'card-1', + 'Router timeout', + 'timed_out', + expect.any(Number), + ); + expect(mockFailOrphanedRunFallback).not.toHaveBeenCalled(); + }); + + it('marks a run without workItemId via fallback as timed_out with the router timeout reason', async () => { + const container = { stop: vi.fn().mockResolvedValue(undefined) }; + mockDockerGetContainer.mockReturnValue(container); + mockFailOrphanedRunFallback.mockResolvedValue('run-fallback'); + const startedAt = new Date(Date.now() - 2000); + mockActiveWorkers.set( + 'job-fallback', + makeWorker({ + jobId: 'job-fallback', + projectId: 'proj-1', + agentType: 'implementation', + startedAt, + }), + ); + + await killWorker('job-fallback'); + + expect(mockFailOrphanedRunFallback).toHaveBeenCalledWith( + 'proj-1', + 'implementation', + startedAt, + 'timed_out', + 'Router timeout', + expect.any(Number), + ); + expect(mockFailOrphanedRun).not.toHaveBeenCalled(); + }); + + it('cleans up without an exit code so cleanupWorker cannot mark the run failed', async () => { + const container = { stop: vi.fn().mockResolvedValue(undefined) }; + mockDockerGetContainer.mockReturnValue(container); + mockActiveWorkers.set( + 'job-no-double-write', + makeWorker({ + jobId: 'job-no-double-write', + projectId: 'proj-1', + workItemId: 'card-1', + }), + ); + + await killWorker('job-no-double-write'); + + expect(mockFailOrphanedRun).toHaveBeenCalledTimes(1); + expect(mockFailOrphanedRun).toHaveBeenCalledWith( + 'proj-1', + 'card-1', + 'Router timeout', + 'timed_out', + expect.any(Number), + ); + expect(mockCleanupWorker).toHaveBeenCalledTimes(1); + expect(mockCleanupWorker).toHaveBeenCalledWith('job-no-double-write'); + }); +});