Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 53 additions & 37 deletions src/router/adapters/linear.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ interface LinearParsedEvent extends ParsedWebhookEvent {
resourceType: string;
}

interface LinearProjectScopeInput {
project: RouterProjectConfig;
interface ResolveIssueProjectIdInput {
candidates: RouterProjectConfig[];
isCommentEvent: boolean;
workItemId: string | undefined;
data: Record<string, unknown>;
issue: Record<string, unknown> | undefined;
eventType: string;
teamId: string;
}

// ============================================================================
Expand Down Expand Up @@ -88,8 +86,8 @@ export class LinearRouterAdapter implements RouterPlatformAdapter {
}

const config = await loadProjectConfig();
const project = config.projects.find((proj) => proj.linear?.teamId === teamId);
if (!project) {
const candidates = config.projects.filter((proj) => proj.linear?.teamId === teamId);
if (candidates.length === 0) {
logger.debug('LinearRouterAdapter: no project found for teamId', { teamId });
return null;
}
Expand All @@ -100,20 +98,40 @@ export class LinearRouterAdapter implements RouterPlatformAdapter {
: (data.id as string | undefined);
const eventType = `${p.action}/${p.type}`;

// Optional project-scope filter: when the CASCADE project has been narrowed
// to a specific Linear Project, drop webhook events whose issue is not in
// that project. Linear cannot scope webhooks to a project, so the filter
// runs here, after team-match.
const matchesProjectScope = await this.matchesConfiguredProjectScope({
project,
// 2026-05-11: multiple cascade projects can share a Linear team when each
// is narrowed to a different Linear Project (e.g. mongrel team hosts both
// `cascade` and `ucho` cascade projects, each scoped to a separate Linear
// Project). The previous `find()` returned only the first team match and
// the follow-up scope filter dropped events for issues that belonged to
// the OTHER cascade project — silently losing webhooks. Select the right
// cascade project up front based on the issue's Linear Project:
// 1. Strong match: candidate whose `linear.projectId` matches the issue's project
// 2. Catch-all: candidate with no `linear.projectId` configured
// 3. Otherwise: drop with diagnostic (no candidate subscribes to this Project)
const issueProjectId = await this.resolveIssueProjectId({
candidates,
isCommentEvent,
workItemId,
data,
issue,
eventType,
teamId,
});
if (!matchesProjectScope) {

const project =
candidates.find((p) => p.linear?.projectId === issueProjectId) ??
candidates.find((p) => !p.linear?.projectId);

if (!project) {
logger.info('LinearRouterAdapter: dropping event outside project scope', {
reason: issueProjectId ? 'no candidate matches issue project' : 'issue has no project',
teamId,
issueProjectId,
candidates: candidates.map((c) => ({
id: c.id,
projectId: c.linear?.projectId,
})),
issueId: workItemId,
eventType,
});
return null;
}

Expand All @@ -128,31 +146,29 @@ export class LinearRouterAdapter implements RouterPlatformAdapter {
};
}

private async matchesConfiguredProjectScope(input: LinearProjectScopeInput): Promise<boolean> {
const configuredProjectId = input.project.linear?.projectId;
if (!configuredProjectId) return true;

/**
* Resolve the issue's Linear Project ID — used to pick the right cascade
* project candidate when multiple are configured for the same team.
*
* Returns the inline `data.projectId` / `data.project.id` (or for Comment
* events the equivalent under `data.issue`) when present in the webhook
* payload. For Comment events without inline issue context, falls back to
* a Linear API lookup. The API call uses the first candidate's credentials
* — Linear creds are per-team so any candidate's credentials work for
* fetching team-scoped issue data.
*/
private async resolveIssueProjectId(
input: ResolveIssueProjectIdInput,
): Promise<string | undefined> {
const payloadProjectId = input.isCommentEvent
? ((input.issue?.projectId as string | undefined) ?? nestedId(input.issue?.project))
: ((input.data.projectId as string | undefined) ?? nestedId(input.data.project));
const issueProjectId =
payloadProjectId ??
(input.isCommentEvent && input.workItemId
? await this.fetchIssueProjectId(input.project.id, input.workItemId)
: undefined);

if (issueProjectId === configuredProjectId) return true;

logger.info('LinearRouterAdapter: dropping event outside project scope', {
reason: issueProjectId ? 'project scope mismatch' : 'issue has no project',
configuredProjectId,
issueProjectId,
issueId: input.workItemId,
teamId: input.teamId,
projectId: input.project.id,
eventType: input.eventType,
});
return false;
if (payloadProjectId) return payloadProjectId;

if (input.isCommentEvent && input.workItemId) {
return this.fetchIssueProjectId(input.candidates[0].id, input.workItemId);
}
return undefined;
}

private async fetchIssueProjectId(
Expand Down
176 changes: 11 additions & 165 deletions src/router/container-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@

import type { Job } from 'bullmq';
import Docker from 'dockerode';
import { failOrphanedRun, failOrphanedRunFallback } from '../db/repositories/runsRepository.js';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { activeWorkers, cleanupWorker } from './active-workers.js';
import { clearAllAgentTypeLocks } from './agent-type-lock.js';
import { loadProjectConfig, routerConfig } from './config.js';
import { routerConfig } from './config.js';
import { ROUTER_INSTANCE_ID } from './instance-id.js';
import { notifyTimeout } from './notifications.js';
import { stopOrphanCleanup } from './orphan-cleanup.js';
import type { CascadeJob } from './queue.js';
import { getSnapshot, invalidateSnapshot, registerSnapshot } from './snapshot-manager.js';
import { invalidateSnapshot, registerSnapshot } from './snapshot-manager.js';
import { clearAllWorkItemLocks } from './work-item-lock.js';
import {
buildWorkerEnvWithProjectId,
extractAgentType,
extractProjectIdFromJob,
extractWorkItemId,
} from './worker-env.js';
import { buildWorkerContainerName, resolveSpawnSettings } from './worker-spawn-settings.js';
import { killWorker } from './worker-timeouts.js';

// Re-export from sub-modules so existing callers importing from container-manager.ts
// continue to work without changes.
Expand All @@ -54,12 +54,15 @@ export {
buildWorkerEnv,
extractProjectIdFromJob,
} from './worker-env.js';
export {
buildWorkerContainerName,
ROUTER_KILL_BUFFER_MS,
resolveSpawnSettings,
} from './worker-spawn-settings.js';
export { killWorker } from './worker-timeouts.js';

const docker = new Docker();

/** Buffer added on top of the in-container watchdog so the router kill is always a backstop. */
const ROUTER_KILL_BUFFER_MS = 2 * 60 * 1000;

/**
* Build a stable Docker image name for a snapshot.
* Uses a sanitised project+workItem key so it's valid as a Docker image tag.
Expand Down Expand Up @@ -256,84 +259,6 @@ async function onWorkerExit(opts: {
cleanupWorker(jobId, result.StatusCode, { oomKilled, exitReason });
}

interface SpawnSettings {
snapshotEnabled: boolean;
workerImage: string;
containerTimeoutMs: number;
snapshotTtlMs: number;
}

/**
* Resolve per-project spawn settings (snapshot flag, image, timeout).
* Centralises all loadProjectConfig() calls so spawnWorker stays simple.
*
* @internal Exported for unit testing only — call `spawnWorker` from app code.
*/
export async function resolveSpawnSettings(
projectId: string | null,
workItemId: string | undefined,
jobId: string,
): Promise<SpawnSettings> {
let snapshotEnabled = false;
let workerImage = routerConfig.workerImage;
let containerTimeoutMs = routerConfig.workerTimeoutMs;
let snapshotTtlMs = routerConfig.snapshotDefaultTtlMs;

if (!projectId) return { snapshotEnabled, workerImage, containerTimeoutMs, snapshotTtlMs };

const { fullProjects } = await loadProjectConfig();
const projectCfg = fullProjects.find((p) => p.id === projectId);

// Project-level snapshotEnabled overrides the global default
snapshotEnabled = projectCfg?.snapshotEnabled ?? routerConfig.snapshotEnabled;

// Per-project TTL overrides the global default
snapshotTtlMs = projectCfg?.snapshotTtlMs ?? routerConfig.snapshotDefaultTtlMs;

if (snapshotEnabled && workItemId) {
const snapshot = getSnapshot(projectId, workItemId, snapshotTtlMs);
if (snapshot) {
logger.info('[WorkerManager] Snapshot hit — using snapshot image:', {
jobId,
imageName: snapshot.imageName,
projectId,
workItemId,
});
workerImage = snapshot.imageName;
} else {
logger.info('[WorkerManager] Snapshot miss — using base worker image:', {
jobId,
projectId,
workItemId,
});
}
}

// Determine container timeout: use project's watchdogTimeoutMs + buffer if available,
// falling back to the global workerTimeoutMs. This makes watchdogTimeoutMs the single source
// of truth — the in-container watchdog fires first, router kill is a backup.
if (projectCfg?.watchdogTimeoutMs) {
containerTimeoutMs = projectCfg.watchdogTimeoutMs + ROUTER_KILL_BUFFER_MS;
}

// Trace-log the actual values that will govern this worker's lifetime so a
// post-mortem can confirm whether the project's watchdogTimeoutMs override
// took effect or the global default leaked through.
logger.info('[WorkerManager] Resolved spawn settings:', {
jobId,
projectId,
workItemId,
workerImage,
snapshotEnabled,
containerTimeoutMs,
containerTimeoutMinutes: Math.round(containerTimeoutMs / 60_000),
projectWatchdogTimeoutMs: projectCfg?.watchdogTimeoutMs ?? null,
globalWorkerTimeoutMs: routerConfig.workerTimeoutMs,
});

return { snapshotEnabled, workerImage, containerTimeoutMs, snapshotTtlMs };
}

/**
* Returns true when a Docker error indicates the requested image does not exist.
* Uses the HTTP statusCode from dockerode's error objects as the primary signal,
Expand Down Expand Up @@ -480,13 +405,7 @@ async function createAndMonitorContainer(
*/
export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
const jobId = job.id ?? `unknown-${Date.now()}`;
// Docker container names accept only `[a-zA-Z0-9][a-zA-Z0-9_.-]`. PR #1226
// introduced coalesced-job IDs shaped `coalesce:${projectId}:${workItemId}`
// where the colons crashed `createContainer` with HTTP 400 — every coalesced
// job that fired post-deploy failed to spawn. Sanitize disallowed chars to
// underscores; the original `jobId` stays intact in logs and dedup keys.
const containerSafeJobId = jobId.replace(/[^a-zA-Z0-9_.-]/g, '_');
const containerName = `cascade-worker-${containerSafeJobId}`;
const containerName = buildWorkerContainerName(jobId);

// Resolve projectId once — used for both credential env and work-item lock tracking
const projectId = await extractProjectIdFromJob(job.data);
Expand Down Expand Up @@ -592,79 +511,6 @@ export async function spawnWorker(job: Job<CascadeJob>): Promise<void> {
}
}

/**
* Kill a worker container with two-phase shutdown:
* 1. SIGTERM via container.stop(t=15) — gives agent watchdog 15s to clean up
* 2. Docker auto-escalates to SIGKILL after 15s
* 3. Router posts its own timeout notification
*/
export async function killWorker(jobId: string): Promise<void> {
const worker = activeWorkers.get(jobId);
if (!worker) return;

try {
const container = docker.getContainer(worker.containerId);
await container.stop({ t: 15 });
logger.info('[WorkerManager] Worker stopped:', { jobId });
} catch (err) {
// Container might already be stopped
logger.warn('[WorkerManager] Error stopping worker (may already be stopped):', {
jobId,
error: String(err),
});
}

const durationMs = Date.now() - worker.startedAt.getTime();

// Update DB run status to timed_out (fire-and-forget, no-op if watchdog already did it).
// cleanupWorker is called below without an exitCode so it skips its own DB update,
// avoiding a race where the wrong status ('failed') could win.
if (worker.projectId) {
const dbUpdate = worker.workItemId
? failOrphanedRun(
worker.projectId,
worker.workItemId,
'Router timeout',
'timed_out',
durationMs,
)
: failOrphanedRunFallback(
worker.projectId,
worker.agentType,
worker.startedAt,
'timed_out',
'Router timeout',
durationMs,
);
dbUpdate
.then((runId) => {
if (runId)
logger.info('[WorkerManager] Marked run timed_out after router kill', {
jobId,
runId,
});
})
.catch((err) =>
logger.error('[WorkerManager] DB update failed after router kill', {
jobId,
error: String(err),
}),
);
}

// Send timeout notification (fire-and-forget)
notifyTimeout(worker.job, {
jobId: worker.jobId,
startedAt: worker.startedAt,
durationMs,
}).catch((err) => {
logger.error('[WorkerManager] Timeout notification error:', String(err));
});

// No exitCode — DB update is handled above with the correct 'timed_out' status
cleanupWorker(jobId);
}

/**
* Detach from all active workers on shutdown.
* Workers continue running as independent containers.
Expand Down
Loading
Loading