Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/backends/shared/envFilter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const SHARED_BLOCKED_ENV_EXACT = new Set([
'JOB_ID',
'JOB_TYPE',
'JOB_DATA',
'JOB_DATA_REDIS_KEY',
'CASCADE_POSTGRES_HOST',
'CASCADE_POSTGRES_PORT',
'NODE_OPTIONS',
Expand Down
117 changes: 117 additions & 0 deletions src/router/job-data-offload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Large-job-payload offload to Redis.
*
* The router passes the full job payload to a worker container via the
* `JOB_DATA` environment variable (`worker-env.ts`). Linux caps a single
* argv/env string at `MAX_ARG_STRLEN` (128 KiB); when the serialized payload
* exceeds it, the kernel rejects the `execve` of the container entrypoint with
* `exec ...: argument list too long` and the worker dies in ~260ms before any
* JS runs — silently breaking every agent run for that work item.
*
* This happened in prod on ucho/MNG-1660 (a Linear issue whose ~10KB+ markdown
* description was serialized twice — once as the raw webhook `payload`, once
* inside the pre-resolved `triggerResult.agentInput`). PM adapters MUST keep
* embedding the pre-resolved `triggerResult` (MNG-1053 freshness-gate invariant),
* so the fix is to move the payload OFF the env channel, not to shrink it.
*
* When `JSON.stringify(job.data)` exceeds `JOB_DATA_INLINE_MAX_BYTES`, the
* router stores it in Redis under `cascade:jobdata:<jobId>` and passes
* `JOB_DATA_REDIS_KEY` instead of `JOB_DATA`. The worker reads + deletes the
* key on startup (see `worker-entry.ts`). Small payloads keep using the inline
* env var (backward compatible).
*
* Redis is chosen over a bind-mounted file because the router runs as a
* container and spawns sibling workers via the Docker socket: a Dockerode bind
* path resolves on the docker host, not inside the router container, so a file
* written by the router is not the file mounted into the worker without a
* shared host volume. The worker already receives `REDIS_URL` and both
* processes can reach the same Redis with zero infra change. Mirrors the
* lazy-singleton pattern in `triggers/github/review-dispatch-dedup.ts`.
*/

import { Redis } from 'ioredis';
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { routerConfig } from './config.js';

/**
* Inline-vs-offload threshold, in bytes. 96 KiB leaves headroom under the
* 128 KiB `MAX_ARG_STRLEN` per-string kernel limit (for the `JOB_DATA=` prefix
* and any multibyte expansion). Measure with `Buffer.byteLength(..., 'utf8')`,
* never `String.length`.
*/
export const JOB_DATA_INLINE_MAX_BYTES = 96 * 1024;

/**
* TTL for an offloaded payload. The worker DELetes the key immediately after a
* successful read, so this is only a safety net for the case where the worker
* never starts (spawn failure / crash before read) — it bounds the leak.
*/
export const JOB_DATA_OFFLOAD_TTL_SEC = 60 * 60;

const KEY_NS = 'cascade:jobdata:';

let redisInstance: Redis | null = null;

/**
* Lazy singleton. Reads the URL via `routerConfig.redisUrl` (captured at
* config-module load) rather than `process.env.REDIS_URL`, because the worker
* scrubs `REDIS_URL` from `process.env` early in startup (`scrubSensitiveEnv`).
* The routerConfig snapshot survives the scrub, same as the DB pool's cached
* connection string and `review-dispatch-dedup.ts`.
*/
function getRedis(): Redis {
if (!redisInstance) {
if (!routerConfig.redisUrl) {
throw new Error('REDIS_URL is required for JOB_DATA offload');
}
redisInstance = new Redis(routerConfig.redisUrl);
}
return redisInstance;
}

export function buildJobDataRedisKey(jobId: string): string {
return `${KEY_NS}${jobId}`;
}

/**
* Store a serialized job payload in Redis. Throws loudly on failure so the
* caller (`buildWorkerEnvWithProjectId` → `spawnWorker`) burns a BullMQ retry
* instead of launching a container that is guaranteed to crash at exec.
*/
export async function offloadJobData(jobId: string, serialized: string): Promise<void> {
const key = buildJobDataRedisKey(jobId);
try {
await getRedis().set(key, serialized, 'EX', JOB_DATA_OFFLOAD_TTL_SEC);
} catch (err) {
captureException(err, { tags: { source: 'job_data_offload_write' }, extra: { jobId } });
throw new Error(`Failed to offload JOB_DATA to Redis for job ${jobId}: ${String(err)}`);
}
}

/**
* Read (and best-effort delete) an offloaded job payload. Throws a distinct,
* grep-able error on a missing key or a Redis failure so the worker exits with
* a clear reason — never the cryptic `argument list too long` exec crash.
*/
export async function readOffloadedJobData(key: string): Promise<string> {
let value: string | null;
try {
value = await getRedis().get(key);
} catch (err) {
throw new Error(`Failed to read offloaded JOB_DATA from Redis (key ${key}): ${String(err)}`);
}
if (value === null) {
throw new Error(`Offloaded JOB_DATA key ${key} not found in Redis (expired or never written)`);
}
// Best-effort cleanup — the TTL reaps the key if this fails.
try {
await getRedis().del(key);
} catch (err) {
logger.warn('[job-data-offload] Failed to delete offloaded JOB_DATA key (TTL will reap it)', {
key,
error: String(err),
});
}
return value;
}
24 changes: 23 additions & 1 deletion src/router/worker-env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import { extractProjectIdFromJobViaRegistry } from '../integrations/pm/_shared/p
import { captureException } from '../sentry.js';
import { logger } from '../utils/logging.js';
import { routerConfig } from './config.js';
import {
buildJobDataRedisKey,
JOB_DATA_INLINE_MAX_BYTES,
offloadJobData,
} from './job-data-offload.js';
import type { CascadeJob } from './queue.js';

/**
Expand Down Expand Up @@ -107,7 +112,6 @@ export async function buildWorkerEnvWithProjectId(
const env: string[] = [
`JOB_ID=${job.id}`,
`JOB_TYPE=${job.data.type}`,
`JOB_DATA=${JSON.stringify(job.data)}`,
// Redis for job completion reporting
`REDIS_URL=${routerConfig.redisUrl}`,
// Database connection
Expand All @@ -119,6 +123,24 @@ export async function buildWorkerEnvWithProjectId(
`LOG_LEVEL=${process.env.LOG_LEVEL || 'info'}`,
];

// Pass the job payload inline when it fits under the OS arg-size limit; an
// env/argv string over MAX_ARG_STRLEN (128 KiB) makes the kernel reject the
// container entrypoint exec with "argument list too long" before any JS runs
// (prod incident ucho/MNG-1660). Oversized payloads are offloaded to Redis
// and the worker reads them back via JOB_DATA_REDIS_KEY (see worker-entry.ts).
const serialized = JSON.stringify(job.data);
const jobDataBytes = Buffer.byteLength(serialized, 'utf8');
if (jobDataBytes <= JOB_DATA_INLINE_MAX_BYTES) {
env.push(`JOB_DATA=${serialized}`);
} else {
await offloadJobData(String(job.id), serialized);
env.push(`JOB_DATA_REDIS_KEY=${buildJobDataRedisKey(String(job.id))}`);
logger.info('[WorkerManager] Offloaded large JOB_DATA to Redis', {
jobId: job.id,
bytes: jobDataBytes,
});
}

// Signal snapshot reuse so the worker skips redundant setup (clone, install).
if (snapshotReuse) {
env.push('CASCADE_SNAPSHOT_REUSE=true');
Expand Down
1 change: 1 addition & 0 deletions src/utils/cascadeEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const PROTECTED_ENV_KEYS = new Set([
'JOB_ID',
'JOB_TYPE',
'JOB_DATA',
'JOB_DATA_REDIS_KEY',
]);

export interface EnvSnapshot {
Expand Down
45 changes: 42 additions & 3 deletions src/worker-entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
extractTrelloContext,
generateAckMessage,
} from './router/ackMessageGenerator.js';
import { readOffloadedJobData } from './router/job-data-offload.js';
import { dispatchPMAck } from './router/pm-ack-dispatch.js';
import { captureException, flush, setTag } from './sentry.js';
import {
Expand Down Expand Up @@ -432,26 +433,64 @@ export async function dispatchJob(
}
}

/**
* Resolve the raw JOB_DATA JSON string from either the inline env var or the
* Redis offload key (prod incident ucho/MNG-1660). Large payloads are stored in
* Redis by the router instead of inline, because an env string over the OS
* MAX_ARG_STRLEN (128 KiB) makes the kernel reject the container entrypoint exec
* with "argument list too long". Must run before scrubSensitiveEnv() strips
* REDIS_URL. Exits the process with a clear, grep-able reason on any failure —
* never the cryptic exec crash, never a payload-less worker.
*/
async function resolveRawJobData(): Promise<string> {
const inline = process.env.JOB_DATA;
if (inline) return inline;

const key = process.env.JOB_DATA_REDIS_KEY;
if (!key) {
// Defensive: main() validates that JOB_DATA or JOB_DATA_REDIS_KEY is present.
const err = new Error('JOB_DATA could not be resolved from env or Redis');
console.error(`[Worker] ${err.message}`);
captureException(err, { tags: { source: 'worker_env' } });
await flush();
process.exit(1);
}

try {
return await readOffloadedJobData(key);
} catch (err) {
console.error('[Worker] Failed to read offloaded JOB_DATA from Redis:', err);
captureException(err, { tags: { source: 'worker_job_data_redis_read' } });
await flush();
process.exit(1);
}
}

export async function main(): Promise<void> {
const jobId = process.env.JOB_ID;
const jobType = process.env.JOB_TYPE;
const jobDataRaw = process.env.JOB_DATA;
const jobDataRedisKey = process.env.JOB_DATA_REDIS_KEY;

setTag('role', 'worker');
if (jobId) setTag('jobId', jobId);
if (jobType) setTag('jobType', jobType);

if (!jobId || !jobType || !jobDataRaw) {
const err = new Error('Missing required environment variables: JOB_ID, JOB_TYPE, JOB_DATA');
if (!jobId || !jobType || (!jobDataRaw && !jobDataRedisKey)) {
const err = new Error(
'Missing required environment variables: JOB_ID, JOB_TYPE, JOB_DATA (or JOB_DATA_REDIS_KEY)',
);
console.error(`[Worker] ${err.message}`);
captureException(err, { tags: { source: 'worker_env' } });
await flush();
process.exit(1);
}

const resolvedJobData = await resolveRawJobData();

let jobData: JobData;
try {
jobData = JSON.parse(jobDataRaw);
jobData = JSON.parse(resolvedJobData);
} catch (err) {
console.error('[Worker] Failed to parse JOB_DATA:', err);
captureException(err, { tags: { source: 'worker_job_parse' } });
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/backends/shared-envFilter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,9 @@ describe('SHARED_BLOCKED_ENV_EXACT', () => {
expect(SHARED_BLOCKED_ENV_EXACT.has('CREDENTIAL_MASTER_KEY')).toBe(true);
expect(SHARED_BLOCKED_ENV_EXACT.has('NODE_OPTIONS')).toBe(true);
});

it('blocks the worker job-payload channels (JOB_DATA and its Redis-offload key)', () => {
expect(SHARED_BLOCKED_ENV_EXACT.has('JOB_DATA')).toBe(true);
expect(SHARED_BLOCKED_ENV_EXACT.has('JOB_DATA_REDIS_KEY')).toBe(true);
});
});
107 changes: 107 additions & 0 deletions tests/unit/router/job-data-offload.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';

// ---------------------------------------------------------------------------
// Module-level mocks
// ---------------------------------------------------------------------------

const mockSet = vi.fn();
const mockGet = vi.fn();
const mockDel = vi.fn();

vi.mock('ioredis', () => ({
Redis: vi.fn().mockImplementation(() => ({
set: mockSet,
get: mockGet,
del: mockDel,
})),
}));

vi.mock('../../../src/router/config.js', () => ({
routerConfig: { redisUrl: 'redis://localhost:6379' },
}));

vi.mock('../../../src/sentry.js', () => ({
captureException: vi.fn(),
}));

vi.mock('../../../src/utils/logging.js', () => ({
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
}));

import {
buildJobDataRedisKey,
JOB_DATA_OFFLOAD_TTL_SEC,
offloadJobData,
readOffloadedJobData,
} from '../../../src/router/job-data-offload.js';
import { captureException } from '../../../src/sentry.js';

beforeEach(() => {
mockSet.mockReset().mockResolvedValue('OK');
mockGet.mockReset();
mockDel.mockReset().mockResolvedValue(1);
vi.mocked(captureException).mockClear();
});

describe('buildJobDataRedisKey', () => {
it('namespaces the jobId under cascade:jobdata:', () => {
expect(buildJobDataRedisKey('coalesce_ucho_MNG-1660_123_abc')).toBe(
'cascade:jobdata:coalesce_ucho_MNG-1660_123_abc',
);
});
});

describe('offloadJobData', () => {
it('SETs the serialized payload under the namespaced key with the TTL', async () => {
await offloadJobData('job-1', '{"big":"payload"}');
expect(mockSet).toHaveBeenCalledWith(
'cascade:jobdata:job-1',
'{"big":"payload"}',
'EX',
JOB_DATA_OFFLOAD_TTL_SEC,
);
});

it('throws a descriptive error and captures on Redis failure', async () => {
mockSet.mockRejectedValueOnce(new Error('ECONNREFUSED'));
await expect(offloadJobData('job-x', '{}')).rejects.toThrow(
/Failed to offload JOB_DATA to Redis for job job-x/,
);
expect(captureException).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ tags: { source: 'job_data_offload_write' } }),
);
});
});

describe('readOffloadedJobData', () => {
it('GETs the payload, deletes the key, and returns the value', async () => {
mockGet.mockResolvedValueOnce('{"restored":true}');
const value = await readOffloadedJobData('cascade:jobdata:job-1');
expect(mockGet).toHaveBeenCalledWith('cascade:jobdata:job-1');
expect(value).toBe('{"restored":true}');
expect(mockDel).toHaveBeenCalledWith('cascade:jobdata:job-1');
});

it('throws a "not found" error when the key is missing/expired (GET returns null)', async () => {
mockGet.mockResolvedValueOnce(null);
await expect(readOffloadedJobData('cascade:jobdata:gone')).rejects.toThrow(
/not found in Redis \(expired or never written\)/,
);
expect(mockDel).not.toHaveBeenCalled();
});

it('throws when Redis GET fails', async () => {
mockGet.mockRejectedValueOnce(new Error('ECONNRESET'));
await expect(readOffloadedJobData('cascade:jobdata:job-1')).rejects.toThrow(
/Failed to read offloaded JOB_DATA from Redis/,
);
});

it('still returns the value when the best-effort DEL fails (TTL reaps it)', async () => {
mockGet.mockResolvedValueOnce('{"restored":true}');
mockDel.mockRejectedValueOnce(new Error('DEL failed'));
const value = await readOffloadedJobData('cascade:jobdata:job-1');
expect(value).toBe('{"restored":true}');
});
});
Loading
Loading