diff --git a/src/backends/shared/envFilter.ts b/src/backends/shared/envFilter.ts
index 676c582e..f4e558d2 100644
--- a/src/backends/shared/envFilter.ts
+++ b/src/backends/shared/envFilter.ts
@@ -31,6 +31,7 @@ export const SHARED_BLOCKED_ENV_EXACT = new Set([
 	'JOB_ID',
 	'JOB_TYPE',
 	'JOB_DATA',
+	'JOB_DATA_REDIS_KEY',
 	'CASCADE_POSTGRES_HOST',
 	'CASCADE_POSTGRES_PORT',
 	'NODE_OPTIONS',
diff --git a/src/router/job-data-offload.ts b/src/router/job-data-offload.ts
new file mode 100644
index 00000000..f7b6d61a
--- /dev/null
+++ b/src/router/job-data-offload.ts
@@ -0,0 +1,117 @@
+/**
+ * Large-job-payload offload to Redis.
+ *
+ * The router passes the full job payload to a worker container via the
+ * `JOB_DATA` environment variable (`worker-env.ts`). Linux caps a single
+ * argv/env string at `MAX_ARG_STRLEN` (128 KiB); when the serialized payload
+ * exceeds it, the kernel rejects the `execve` of the container entrypoint with
+ * `exec ...: argument list too long` and the worker dies in ~260ms before any
+ * JS runs — silently breaking every agent run for that work item.
+ *
+ * This happened in prod on ucho/MNG-1660 (a Linear issue whose ~10KB+ markdown
+ * description was serialized twice — once as the raw webhook `payload`, once
+ * inside the pre-resolved `triggerResult.agentInput`). PM adapters MUST keep
+ * embedding the pre-resolved `triggerResult` (MNG-1053 freshness-gate invariant),
+ * so the fix is to move the payload OFF the env channel, not to shrink it.
+ *
+ * When `JSON.stringify(job.data)` exceeds `JOB_DATA_INLINE_MAX_BYTES`, the
+ * router stores it in Redis under `cascade:jobdata:<jobId>` and passes
+ * `JOB_DATA_REDIS_KEY` instead of `JOB_DATA`. The worker reads + deletes the
+ * key on startup (see `worker-entry.ts`). Small payloads keep using the inline
+ * env var (backward compatible).
+ *
+ * Redis is chosen over a bind-mounted file because the router runs as a
+ * container and spawns sibling workers via the Docker socket: a Dockerode bind
+ * path resolves on the docker host, not inside the router container, so a file
+ * written by the router is not the file mounted into the worker without a
+ * shared host volume. The worker already receives `REDIS_URL` and both
+ * processes can reach the same Redis with zero infra change. Mirrors the
+ * lazy-singleton pattern in `triggers/github/review-dispatch-dedup.ts`.
+ */
+
+import { Redis } from 'ioredis';
+import { captureException } from '../sentry.js';
+import { logger } from '../utils/logging.js';
+import { routerConfig } from './config.js';
+
+/**
+ * Inline-vs-offload threshold, in bytes. 96 KiB leaves headroom under the
+ * 128 KiB `MAX_ARG_STRLEN` per-string kernel limit (for the `JOB_DATA=` prefix
+ * and any multibyte expansion). Measure with `Buffer.byteLength(..., 'utf8')`,
+ * never `String.length`.
+ */
+export const JOB_DATA_INLINE_MAX_BYTES = 96 * 1024;
+
+/**
+ * TTL for an offloaded payload. The worker DELetes the key immediately after a
+ * successful read, so this is only a safety net for the case where the worker
+ * never starts (spawn failure / crash before read) — it bounds the leak.
+ */
+export const JOB_DATA_OFFLOAD_TTL_SEC = 60 * 60;
+
+const KEY_NS = 'cascade:jobdata:';
+
+let redisInstance: Redis | null = null;
+
+/**
+ * Lazy singleton. Reads the URL via `routerConfig.redisUrl` (captured at
+ * config-module load) rather than `process.env.REDIS_URL`, because the worker
+ * scrubs `REDIS_URL` from `process.env` early in startup (`scrubSensitiveEnv`).
+ * The routerConfig snapshot survives the scrub, same as the DB pool's cached
+ * connection string and `review-dispatch-dedup.ts`.
+ */
+function getRedis(): Redis {
+	if (!redisInstance) {
+		if (!routerConfig.redisUrl) {
+			throw new Error('REDIS_URL is required for JOB_DATA offload');
+		}
+		redisInstance = new Redis(routerConfig.redisUrl);
+	}
+	return redisInstance;
+}
+
+export function buildJobDataRedisKey(jobId: string): string {
+	return `${KEY_NS}${jobId}`;
+}
+
+/**
+ * Store a serialized job payload in Redis. Throws loudly on failure so the
+ * caller (`buildWorkerEnvWithProjectId` → `spawnWorker`) burns a BullMQ retry
+ * instead of launching a container that is guaranteed to crash at exec.
+ */
+export async function offloadJobData(jobId: string, serialized: string): Promise<void> {
+	const key = buildJobDataRedisKey(jobId);
+	try {
+		await getRedis().set(key, serialized, 'EX', JOB_DATA_OFFLOAD_TTL_SEC);
+	} catch (err) {
+		captureException(err, { tags: { source: 'job_data_offload_write' }, extra: { jobId } });
+		throw new Error(`Failed to offload JOB_DATA to Redis for job ${jobId}: ${String(err)}`);
+	}
+}
+
+/**
+ * Read (and best-effort delete) an offloaded job payload. Throws a distinct,
+ * grep-able error on a missing key or a Redis failure so the worker exits with
+ * a clear reason — never the cryptic `argument list too long` exec crash.
+ */
+export async function readOffloadedJobData(key: string): Promise<string> {
+	let value: string | null;
+	try {
+		value = await getRedis().get(key);
+	} catch (err) {
+		throw new Error(`Failed to read offloaded JOB_DATA from Redis (key ${key}): ${String(err)}`);
+	}
+	if (value === null) {
+		throw new Error(`Offloaded JOB_DATA key ${key} not found in Redis (expired or never written)`);
+	}
+	// Best-effort cleanup — the TTL reaps the key if this fails.
+	try {
+		await getRedis().del(key);
+	} catch (err) {
+		logger.warn('[job-data-offload] Failed to delete offloaded JOB_DATA key (TTL will reap it)', {
+			key,
+			error: String(err),
+		});
+	}
+	return value;
+}
diff --git a/src/router/worker-env.ts b/src/router/worker-env.ts
index a379c8a6..8a847b15 100644
--- a/src/router/worker-env.ts
+++ b/src/router/worker-env.ts
@@ -11,6 +11,11 @@ import { extractProjectIdFromJobViaRegistry } from '../integrations/pm/_shared/p
 import { captureException } from '../sentry.js';
 import { logger } from '../utils/logging.js';
 import { routerConfig } from './config.js';
+import {
+	buildJobDataRedisKey,
+	JOB_DATA_INLINE_MAX_BYTES,
+	offloadJobData,
+} from './job-data-offload.js';
 import type { CascadeJob } from './queue.js';
 
 /**
@@ -107,7 +112,6 @@ export async function buildWorkerEnvWithProjectId(
 	const env: string[] = [
 		`JOB_ID=${job.id}`,
 		`JOB_TYPE=${job.data.type}`,
-		`JOB_DATA=${JSON.stringify(job.data)}`,
 		// Redis for job completion reporting
 		`REDIS_URL=${routerConfig.redisUrl}`,
 		// Database connection
@@ -119,6 +123,24 @@ export async function buildWorkerEnvWithProjectId(
 		`LOG_LEVEL=${process.env.LOG_LEVEL || 'info'}`,
 	];
 
+	// Pass the job payload inline when it fits under the OS arg-size limit; an
+	// env/argv string over MAX_ARG_STRLEN (128 KiB) makes the kernel reject the
+	// container entrypoint exec with "argument list too long" before any JS runs
+	// (prod incident ucho/MNG-1660). Oversized payloads are offloaded to Redis
+	// and the worker reads them back via JOB_DATA_REDIS_KEY (see worker-entry.ts).
+	const serialized = JSON.stringify(job.data);
+	const jobDataBytes = Buffer.byteLength(serialized, 'utf8');
+	if (jobDataBytes <= JOB_DATA_INLINE_MAX_BYTES) {
+		env.push(`JOB_DATA=${serialized}`);
+	} else {
+		await offloadJobData(String(job.id), serialized);
+		env.push(`JOB_DATA_REDIS_KEY=${buildJobDataRedisKey(String(job.id))}`);
+		logger.info('[WorkerManager] Offloaded large JOB_DATA to Redis', {
+			jobId: job.id,
+			bytes: jobDataBytes,
+		});
+	}
+
 	// Signal snapshot reuse so the worker skips redundant setup (clone, install).
 	if (snapshotReuse) {
 		env.push('CASCADE_SNAPSHOT_REUSE=true');
diff --git a/src/utils/cascadeEnv.ts b/src/utils/cascadeEnv.ts
index a7e8cb5d..c8077c9e 100644
--- a/src/utils/cascadeEnv.ts
+++ b/src/utils/cascadeEnv.ts
@@ -21,6 +21,7 @@ const PROTECTED_ENV_KEYS = new Set([
 	'JOB_ID',
 	'JOB_TYPE',
 	'JOB_DATA',
+	'JOB_DATA_REDIS_KEY',
 ]);
 
 export interface EnvSnapshot {
diff --git a/src/worker-entry.ts b/src/worker-entry.ts
index 630ac7f0..0ddba454 100644
--- a/src/worker-entry.ts
+++ b/src/worker-entry.ts
@@ -29,6 +29,7 @@ import {
 	extractTrelloContext,
 	generateAckMessage,
 } from './router/ackMessageGenerator.js';
+import { readOffloadedJobData } from './router/job-data-offload.js';
 import { dispatchPMAck } from './router/pm-ack-dispatch.js';
 import { captureException, flush, setTag } from './sentry.js';
 import {
@@ -432,26 +433,64 @@ export async function dispatchJob(
 	}
 }
 
+/**
+ * Resolve the raw JOB_DATA JSON string from either the inline env var or the
+ * Redis offload key (prod incident ucho/MNG-1660). Large payloads are stored in
+ * Redis by the router instead of inline, because an env string over the OS
+ * MAX_ARG_STRLEN (128 KiB) makes the kernel reject the container entrypoint exec
+ * with "argument list too long". Must run before scrubSensitiveEnv() strips
+ * REDIS_URL. Exits the process with a clear, grep-able reason on any failure —
+ * never the cryptic exec crash, never a payload-less worker.
+ */
+async function resolveRawJobData(): Promise<string> {
+	const inline = process.env.JOB_DATA;
+	if (inline) return inline;
+
+	const key = process.env.JOB_DATA_REDIS_KEY;
+	if (!key) {
+		// Defensive: main() validates that JOB_DATA or JOB_DATA_REDIS_KEY is present.
+		const err = new Error('JOB_DATA could not be resolved from env or Redis');
+		console.error(`[Worker] ${err.message}`);
+		captureException(err, { tags: { source: 'worker_env' } });
+		await flush();
+		process.exit(1);
+	}
+
+	try {
+		return await readOffloadedJobData(key);
+	} catch (err) {
+		console.error('[Worker] Failed to read offloaded JOB_DATA from Redis:', err);
+		captureException(err, { tags: { source: 'worker_job_data_redis_read' } });
+		await flush();
+		process.exit(1);
+	}
+}
+
 export async function main(): Promise<void> {
 	const jobId = process.env.JOB_ID;
 	const jobType = process.env.JOB_TYPE;
 	const jobDataRaw = process.env.JOB_DATA;
+	const jobDataRedisKey = process.env.JOB_DATA_REDIS_KEY;
 
 	setTag('role', 'worker');
 	if (jobId) setTag('jobId', jobId);
 	if (jobType) setTag('jobType', jobType);
 
-	if (!jobId || !jobType || !jobDataRaw) {
-		const err = new Error('Missing required environment variables: JOB_ID, JOB_TYPE, JOB_DATA');
+	if (!jobId || !jobType || (!jobDataRaw && !jobDataRedisKey)) {
+		const err = new Error(
+			'Missing required environment variables: JOB_ID, JOB_TYPE, JOB_DATA (or JOB_DATA_REDIS_KEY)',
+		);
 		console.error(`[Worker] ${err.message}`);
 		captureException(err, { tags: { source: 'worker_env' } });
 		await flush();
 		process.exit(1);
 	}
 
+	const resolvedJobData = await resolveRawJobData();
+
 	let jobData: JobData;
 	try {
-		jobData = JSON.parse(jobDataRaw);
+		jobData = JSON.parse(resolvedJobData);
 	} catch (err) {
 		console.error('[Worker] Failed to parse JOB_DATA:', err);
 		captureException(err, { tags: { source: 'worker_job_parse' } });
diff --git a/tests/unit/backends/shared-envFilter.test.ts b/tests/unit/backends/shared-envFilter.test.ts
index af7e705d..a3a355a0 100644
--- a/tests/unit/backends/shared-envFilter.test.ts
+++ b/tests/unit/backends/shared-envFilter.test.ts
@@ -262,4 +262,9 @@ describe('SHARED_BLOCKED_ENV_EXACT', () => {
 		expect(SHARED_BLOCKED_ENV_EXACT.has('CREDENTIAL_MASTER_KEY')).toBe(true);
 		expect(SHARED_BLOCKED_ENV_EXACT.has('NODE_OPTIONS')).toBe(true);
 	});
+
+	it('blocks the worker job-payload channels (JOB_DATA and its Redis-offload key)', () => {
+		expect(SHARED_BLOCKED_ENV_EXACT.has('JOB_DATA')).toBe(true);
+		expect(SHARED_BLOCKED_ENV_EXACT.has('JOB_DATA_REDIS_KEY')).toBe(true);
+	});
 });
diff --git a/tests/unit/router/job-data-offload.test.ts b/tests/unit/router/job-data-offload.test.ts
new file mode 100644
index 00000000..10475498
--- /dev/null
+++ b/tests/unit/router/job-data-offload.test.ts
@@ -0,0 +1,107 @@
+import { beforeEach, describe, expect, it, vi } from 'vitest';
+
+// ---------------------------------------------------------------------------
+// Module-level mocks
+// ---------------------------------------------------------------------------
+
+const mockSet = vi.fn();
+const mockGet = vi.fn();
+const mockDel = vi.fn();
+
+vi.mock('ioredis', () => ({
+	Redis: vi.fn().mockImplementation(() => ({
+		set: mockSet,
+		get: mockGet,
+		del: mockDel,
+	})),
+}));
+
+vi.mock('../../../src/router/config.js', () => ({
+	routerConfig: { redisUrl: 'redis://localhost:6379' },
+}));
+
+vi.mock('../../../src/sentry.js', () => ({
+	captureException: vi.fn(),
+}));
+
+vi.mock('../../../src/utils/logging.js', () => ({
+	logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
+}));
+
+import {
+	buildJobDataRedisKey,
+	JOB_DATA_OFFLOAD_TTL_SEC,
+	offloadJobData,
+	readOffloadedJobData,
+} from '../../../src/router/job-data-offload.js';
+import { captureException } from '../../../src/sentry.js';
+
+beforeEach(() => {
+	mockSet.mockReset().mockResolvedValue('OK');
+	mockGet.mockReset();
+	mockDel.mockReset().mockResolvedValue(1);
+	vi.mocked(captureException).mockClear();
+});
+
+describe('buildJobDataRedisKey', () => {
+	it('namespaces the jobId under cascade:jobdata:', () => {
+		expect(buildJobDataRedisKey('coalesce_ucho_MNG-1660_123_abc')).toBe(
+			'cascade:jobdata:coalesce_ucho_MNG-1660_123_abc',
+		);
+	});
+});
+
+describe('offloadJobData', () => {
+	it('SETs the serialized payload under the namespaced key with the TTL', async () => {
+		await offloadJobData('job-1', '{"big":"payload"}');
+		expect(mockSet).toHaveBeenCalledWith(
+			'cascade:jobdata:job-1',
+			'{"big":"payload"}',
+			'EX',
+			JOB_DATA_OFFLOAD_TTL_SEC,
+		);
+	});
+
+	it('throws a descriptive error and captures on Redis failure', async () => {
+		mockSet.mockRejectedValueOnce(new Error('ECONNREFUSED'));
+		await expect(offloadJobData('job-x', '{}')).rejects.toThrow(
+			/Failed to offload JOB_DATA to Redis for job job-x/,
+		);
+		expect(captureException).toHaveBeenCalledWith(
+			expect.anything(),
+			expect.objectContaining({ tags: { source: 'job_data_offload_write' } }),
+		);
+	});
+});
+
+describe('readOffloadedJobData', () => {
+	it('GETs the payload, deletes the key, and returns the value', async () => {
+		mockGet.mockResolvedValueOnce('{"restored":true}');
+		const value = await readOffloadedJobData('cascade:jobdata:job-1');
+		expect(mockGet).toHaveBeenCalledWith('cascade:jobdata:job-1');
+		expect(value).toBe('{"restored":true}');
+		expect(mockDel).toHaveBeenCalledWith('cascade:jobdata:job-1');
+	});
+
+	it('throws a "not found" error when the key is missing/expired (GET returns null)', async () => {
+		mockGet.mockResolvedValueOnce(null);
+		await expect(readOffloadedJobData('cascade:jobdata:gone')).rejects.toThrow(
+			/not found in Redis \(expired or never written\)/,
+		);
+		expect(mockDel).not.toHaveBeenCalled();
+	});
+
+	it('throws when Redis GET fails', async () => {
+		mockGet.mockRejectedValueOnce(new Error('ECONNRESET'));
+		await expect(readOffloadedJobData('cascade:jobdata:job-1')).rejects.toThrow(
+			/Failed to read offloaded JOB_DATA from Redis/,
+		);
+	});
+
+	it('still returns the value when the best-effort DEL fails (TTL reaps it)', async () => {
+		mockGet.mockResolvedValueOnce('{"restored":true}');
+		mockDel.mockRejectedValueOnce(new Error('DEL failed'));
+		const value = await readOffloadedJobData('cascade:jobdata:job-1');
+		expect(value).toBe('{"restored":true}');
+	});
+});
diff --git a/tests/unit/router/job-data-roundtrip.test.ts b/tests/unit/router/job-data-roundtrip.test.ts
new file mode 100644
index 00000000..717679a7
--- /dev/null
+++ b/tests/unit/router/job-data-roundtrip.test.ts
@@ -0,0 +1,90 @@
+import { beforeEach, describe, expect, it, vi } from 'vitest';
+
+/**
+ * Direct regression for the MNG-1660 class: a job payload larger than the OS
+ * MAX_ARG_STRLEN (128 KiB) must NOT appear inline in any worker env string, and
+ * must round-trip losslessly through the Redis offload (router write → worker
+ * read). Uses the REAL job-data-offload module against an in-memory Redis — no
+ * Docker, no live Redis.
+ */
+
+// In-memory Redis shared by the router-write and worker-read paths.
+const { store } = vi.hoisted(() => ({ store: new Map<string, string>() }));
+
+vi.mock('ioredis', () => ({
+	Redis: vi.fn().mockImplementation(() => ({
+		set: async (k: string, v: string) => {
+			store.set(k, v);
+			return 'OK';
+		},
+		get: async (k: string) => store.get(k) ?? null,
+		del: async (k: string) => {
+			store.delete(k);
+			return 1;
+		},
+	})),
+}));
+
+vi.mock('../../../src/router/config.js', () => ({
+	routerConfig: { redisUrl: 'redis://localhost:6379', workerImage: 'test-worker:latest' },
+}));
+
+vi.mock('../../../src/sentry.js', () => ({ captureException: vi.fn() }));
+
+vi.mock('../../../src/utils/logging.js', () => ({
+	logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() },
+}));
+
+vi.mock('../../../src/config/provider.js', () => ({
+	findProjectByRepo: vi.fn(),
+	getAllProjectCredentials: vi.fn().mockResolvedValue({}),
+}));
+
+import {
+	JOB_DATA_INLINE_MAX_BYTES,
+	readOffloadedJobData,
+} from '../../../src/router/job-data-offload.js';
+import type { CascadeJob } from '../../../src/router/queue.js';
+import { buildWorkerEnvWithProjectId } from '../../../src/router/worker-env.js';
+
+beforeEach(() => {
+	store.clear();
+});
+
+describe('large JOB_DATA round-trip (router write → worker read)', () => {
+	it('keeps the oversized payload off the env channel and restores it losslessly', async () => {
+		// ~200 KB description — comfortably over the 128 KiB kernel arg limit.
+		const jobData = {
+			type: 'linear',
+			source: 'linear',
+			payload: { type: 'Issue', data: { id: 'lin-1', description: 'a'.repeat(200 * 1024) } },
+			projectId: 'proj-1',
+			workItemId: 'lin-1',
+			eventType: 'update/Issue',
+			receivedAt: '2024-01-01T00:00:00Z',
+			triggerResult: { agentType: 'implementation', workItemId: 'MNG-1660' },
+		} as unknown as CascadeJob;
+
+		const job = { id: 'coalesce_ucho_MNG-1660_123_abc', data: jobData };
+
+		const env = await buildWorkerEnvWithProjectId(job as never, 'proj-1');
+
+		// 1. The payload was offloaded, not inlined.
+		expect(env.some((e) => e.startsWith('JOB_DATA='))).toBe(false);
+		const keyEntry = env.find((e) => e.startsWith('JOB_DATA_REDIS_KEY='));
+		expect(keyEntry).toBeDefined();
+
+		// 2. No single env string exceeds the inline limit — the exec crash is impossible.
+		for (const e of env) {
+			expect(Buffer.byteLength(e, 'utf8')).toBeLessThanOrEqual(JOB_DATA_INLINE_MAX_BYTES);
+		}
+
+		// 3. The worker read path restores the exact original job.data.
+		const key = keyEntry?.slice('JOB_DATA_REDIS_KEY='.length) as string;
+		const restored = JSON.parse(await readOffloadedJobData(key));
+		expect(restored).toEqual(jobData);
+
+		// 4. Worker DELetes the key after reading (no leak beyond the read).
+		expect(store.has(key)).toBe(false);
+	});
+});
diff --git a/tests/unit/router/worker-env.test.ts b/tests/unit/router/worker-env.test.ts
index 0d4e5499..15f98dd6 100644
--- a/tests/unit/router/worker-env.test.ts
+++ b/tests/unit/router/worker-env.test.ts
@@ -36,11 +36,18 @@ vi.mock('../../../src/router/config.js', () => ({
 	},
 }));
 
+vi.mock('../../../src/router/job-data-offload.js', () => ({
+	JOB_DATA_INLINE_MAX_BYTES: 96 * 1024,
+	offloadJobData: vi.fn().mockResolvedValue(undefined),
+	buildJobDataRedisKey: (jobId: string) => `cascade:jobdata:${jobId}`,
+}));
+
 // ---------------------------------------------------------------------------
 // Imports (after mocks)
 // ---------------------------------------------------------------------------
 
 import { findProjectByRepo, getAllProjectCredentials } from '../../../src/config/provider.js';
+import { offloadJobData } from '../../../src/router/job-data-offload.js';
 // All PM providers (Trello 006/2, JIRA 006/3, Linear 006/4) resolve through
 // the PM provider manifest registry. Side-effect imports register them.
 import '../../../src/integrations/pm/trello/index.js';
@@ -367,3 +374,69 @@ describe('buildWorkerEnvWithProjectId — snapshotEnabled flag', () => {
 		expect(env.some((e) => e.startsWith('CASCADE_SNAPSHOT_ENABLED='))).toBe(false);
 	});
 });
+
+// ---------------------------------------------------------------------------
+// buildWorkerEnvWithProjectId — large JOB_DATA offload (MNG-1660)
+// ---------------------------------------------------------------------------
+
+describe('buildWorkerEnvWithProjectId — JOB_DATA offload', () => {
+	const mockOffload = vi.mocked(offloadJobData);
+
+	beforeEach(() => {
+		mockGetAllProjectCredentials.mockResolvedValue({});
+		mockOffload.mockReset().mockResolvedValue(undefined);
+	});
+
+	function jobWithPayload(payload: unknown, id = 'job-big') {
+		return { id, data: { type: 'linear', payload } as unknown as CascadeJob };
+	}
+
+	it('passes JOB_DATA inline and does NOT offload for a small payload', async () => {
+		const env = await buildWorkerEnvWithProjectId(jobWithPayload({ small: 'x' }) as never, 'p');
+		expect(env.some((e) => e.startsWith('JOB_DATA='))).toBe(true);
+		expect(env.some((e) => e.startsWith('JOB_DATA_REDIS_KEY='))).toBe(false);
+		expect(mockOffload).not.toHaveBeenCalled();
+	});
+
+	it('keeps a payload just under the threshold inline', async () => {
+		// Build a payload whose JSON serialization is a few KB under 96 KiB.
+		const description = 'a'.repeat(96 * 1024 - 2048);
+		const env = await buildWorkerEnvWithProjectId(jobWithPayload({ description }) as never, 'p');
+		expect(env.some((e) => e.startsWith('JOB_DATA='))).toBe(true);
+		expect(mockOffload).not.toHaveBeenCalled();
+	});
+
+	it('offloads to Redis and emits JOB_DATA_REDIS_KEY (not JOB_DATA) for an oversized payload', async () => {
+		const description = 'a'.repeat(200 * 1024); // ~200 KB → well over 96 KiB
+		const env = await buildWorkerEnvWithProjectId(
+			jobWithPayload({ description }, 'job-huge') as never,
+			'p',
+		);
+		expect(mockOffload).toHaveBeenCalledTimes(1);
+		expect(mockOffload).toHaveBeenCalledWith('job-huge', expect.any(String));
+		expect(env).toContain('JOB_DATA_REDIS_KEY=cascade:jobdata:job-huge');
+		expect(env.some((e) => e.startsWith('JOB_DATA='))).toBe(false);
+	});
+
+	it('measures the threshold in BYTES, not characters (multibyte payload)', async () => {
+		// Each 😀 is 2 UTF-16 code units but 4 UTF-8 bytes. 30720 of them →
+		// length 61440 (< 96 KiB) but byteLength 122880 (> 96 KiB) — proves
+		// byteLength, not String.length, drives the decision.
+		const description = '😀'.repeat(30 * 1024);
+		expect(description.length).toBeLessThan(96 * 1024); // char count is under
+		const env = await buildWorkerEnvWithProjectId(
+			jobWithPayload({ description }, 'job-emoji') as never,
+			'p',
+		);
+		expect(mockOffload).toHaveBeenCalledTimes(1);
+		expect(env).toContain('JOB_DATA_REDIS_KEY=cascade:jobdata:job-emoji');
+	});
+
+	it('propagates a fail-loud error when the offload write rejects', async () => {
+		mockOffload.mockRejectedValueOnce(new Error('Redis down'));
+		const description = 'a'.repeat(200 * 1024);
+		await expect(
+			buildWorkerEnvWithProjectId(jobWithPayload({ description }) as never, 'p'),
+		).rejects.toThrow('Redis down');
+	});
+});
diff --git a/tests/unit/worker-entry.test.ts b/tests/unit/worker-entry.test.ts
index b73c1af1..f5b4fd4d 100644
--- a/tests/unit/worker-entry.test.ts
+++ b/tests/unit/worker-entry.test.ts
@@ -48,6 +48,10 @@ vi.mock('../../src/router/pm-ack-dispatch.js', () => ({
 	dispatchPMAck: vi.fn(),
 }));
 
+vi.mock('../../src/router/job-data-offload.js', () => ({
+	readOffloadedJobData: vi.fn(),
+}));
+
 vi.mock('../../src/router/ackMessageGenerator.js', () => ({
 	extractTrelloContext: vi.fn().mockReturnValue(''),
 	extractJiraContext: vi.fn().mockReturnValue(''),
@@ -105,6 +109,7 @@ import {
 	extractTrelloContext,
 	generateAckMessage,
 } from '../../src/router/ackMessageGenerator.js';
+import { readOffloadedJobData } from '../../src/router/job-data-offload.js';
 import { dispatchPMAck } from '../../src/router/pm-ack-dispatch.js';
 import { captureException, flush } from '../../src/sentry.js';
 import { processGitHubWebhook, processJiraWebhook } from '../../src/triggers/index.js';
@@ -887,6 +892,8 @@ describe('main() - environment variable validation', () => {
 		delete process.env.JOB_ID;
 		delete process.env.JOB_TYPE;
 		delete process.env.JOB_DATA;
+		delete process.env.JOB_DATA_REDIS_KEY;
+		vi.mocked(readOffloadedJobData).mockReset();
 		exitSpy = vi.spyOn(process, 'exit').mockImplementation((code?) => {
 			throw new Error(`process.exit(${code ?? 0})`);
 		});
@@ -897,6 +904,7 @@ describe('main() - environment variable validation', () => {
 		delete process.env.JOB_ID;
 		delete process.env.JOB_TYPE;
 		delete process.env.JOB_DATA;
+		delete process.env.JOB_DATA_REDIS_KEY;
 	});
 
 	it('calls captureException with worker_env tag and exits 1 when all env vars are absent', async () => {
@@ -904,7 +912,8 @@ describe('main() - environment variable validation', () => {
 
 		expect(captureException).toHaveBeenCalledWith(
 			expect.objectContaining({
-				message: 'Missing required environment variables: JOB_ID, JOB_TYPE, JOB_DATA',
+				message:
+					'Missing required environment variables: JOB_ID, JOB_TYPE, JOB_DATA (or JOB_DATA_REDIS_KEY)',
 			}),
 			expect.objectContaining({ tags: { source: 'worker_env' } }),
 		);
@@ -1010,4 +1019,59 @@ describe('main() - environment variable validation', () => {
 		await expect(main()).rejects.toThrow('process.exit(2)');
 		expect(flush).toHaveBeenCalled();
 	});
+
+	it('reads the offloaded payload from Redis when JOB_DATA_REDIS_KEY is set, then dispatches', async () => {
+		process.env.JOB_ID = 'job-offload-1';
+		process.env.JOB_TYPE = 'linear';
+		process.env.JOB_DATA_REDIS_KEY = 'cascade:jobdata:job-offload-1';
+		// JOB_DATA intentionally absent — payload lives in Redis.
+		vi.mocked(readOffloadedJobData).mockResolvedValueOnce(
+			JSON.stringify({
+				type: 'linear',
+				source: 'linear',
+				payload: { type: 'Issue', data: { id: 'lin-1' } },
+				projectId: 'proj-1',
+				workItemId: 'lin-1',
+				eventType: 'update/Issue',
+				receivedAt: '2024-01-01T00:00:00Z',
+			}),
+		);
+
+		await expect(main()).rejects.toThrow('process.exit(');
+
+		expect(readOffloadedJobData).toHaveBeenCalledWith('cascade:jobdata:job-offload-1');
+		expect(processLinearWebhook).toHaveBeenCalled();
+		expect(flush).toHaveBeenCalled();
+	});
+
+	it('exits 1 with worker_job_data_redis_read tag when the offloaded key is missing/expired', async () => {
+		process.env.JOB_ID = 'job-offload-missing';
+		process.env.JOB_TYPE = 'linear';
+		process.env.JOB_DATA_REDIS_KEY = 'cascade:jobdata:gone';
+		vi.mocked(readOffloadedJobData).mockRejectedValueOnce(
+			new Error('Offloaded JOB_DATA key cascade:jobdata:gone not found in Redis'),
+		);
+
+		await expect(main()).rejects.toThrow('process.exit(1)');
+
+		expect(captureException).toHaveBeenCalledWith(
+			expect.objectContaining({ message: expect.stringContaining('not found in Redis') }),
+			expect.objectContaining({ tags: { source: 'worker_job_data_redis_read' } }),
+		);
+		expect(flush).toHaveBeenCalled();
+	});
+
+	it('exits 1 with worker_job_data_redis_read tag when Redis read throws', async () => {
+		process.env.JOB_ID = 'job-offload-down';
+		process.env.JOB_TYPE = 'linear';
+		process.env.JOB_DATA_REDIS_KEY = 'cascade:jobdata:job-offload-down';
+		vi.mocked(readOffloadedJobData).mockRejectedValueOnce(new Error('ECONNREFUSED'));
+
+		await expect(main()).rejects.toThrow('process.exit(1)');
+
+		expect(captureException).toHaveBeenCalledWith(
+			expect.anything(),
+			expect.objectContaining({ tags: { source: 'worker_job_data_redis_read' } }),
+		);
+	});
 });