diff --git a/.changeset/concurrent-e2e.md b/.changeset/concurrent-e2e.md new file mode 100644 index 0000000000..f050cf9eed --- /dev/null +++ b/.changeset/concurrent-e2e.md @@ -0,0 +1,4 @@ +--- +--- + +chore(tests): re-enable `describe.concurrent` in the e2e suite to cut CI wall-clock per workbench, and isolate diagnostic tracking state per-task so concurrent tests no longer clobber each other's `trackedRuns` (each failing test now surfaces its own run's diagnostics, and every tracked run is logged with `[trackRun] <testName> → <runId>` for stdout correlation even when diagnostics fetching fails). diff --git a/.changeset/readjson-diagnostics.md b/.changeset/readjson-diagnostics.md new file mode 100644 index 0000000000..4deaccf760 --- /dev/null +++ b/.changeset/readjson-diagnostics.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-local': patch +--- + +Fix a concurrent-read race in `writeExclusive`. `fs.writeFile(path, data, { flag: 'wx' })` is atomic for file *creation* (`O_CREAT | O_EXCL`) but the subsequent data write is not, so a concurrent reader could observe a half-written file (e.g. just `{`) before the writer's content was flushed. Callers that combined exclusive claim semantics with a follow-up read of the payload — notably the hook-token claim path and run-creation — would intermittently see `SyntaxError: Unexpected end of JSON input` (or worse, treat a partial file as an empty claim, masking conflicts). Switch to a two-step pattern: write the full content to a temp file first, then `link(2)` it into place. POSIX `link` is atomic and fails with `EEXIST` if the target already exists, so readers either see `ENOENT` or the fully-populated inode. `readJSON` also annotates any `SyntaxError` with the file path, on-disk size, and a snippet of the offending content, making future cases of this class actionable from CI logs (the error type is preserved so callers that intentionally swallow `SyntaxError` continue to work). 
diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 5f00f28ae1..cc478b637b 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -182,9 +182,7 @@ async function startWorkflowViaHttp( return run; } -// NOTE: Temporarily disabling concurrent tests to avoid flakiness. -// TODO: Re-enable concurrent tests after conf when we have more time to investigate. -describe('e2e', () => { +describe.concurrent('e2e', () => { // Configure the World for the test runner process so that start() and // run.returnValue can communicate with the same backend as the workbench app. beforeAll(async () => { diff --git a/packages/core/e2e/utils.ts b/packages/core/e2e/utils.ts index 509775fbd3..d84e96eff4 100644 --- a/packages/core/e2e/utils.ts +++ b/packages/core/e2e/utils.ts @@ -4,7 +4,8 @@ import path, { dirname } from 'node:path'; import { setTimeout as sleep } from 'node:timers/promises'; import { fileURLToPath } from 'node:url'; import { createVercelWorld } from '@workflow/world-vercel'; -import { onTestFailed } from 'vitest'; +import { onTestFailed, onTestFinished } from 'vitest'; +import { getCurrentTest } from 'vitest/suite'; import { getTrustedSourcesHeaders } from '../../../scripts/trusted-sources-headers.mjs'; import { parseEnvironmentFlag } from '../../next/src/environment-flag.js'; import type { Run } from '../src/runtime'; @@ -460,8 +461,28 @@ interface TrackedRun { workflowFn?: string; } -// Per-test tracked runs — reset between tests via setupRunTracking() -let trackedRuns: TrackedRun[] = []; +interface PerTaskState { + testName: string; + trackedRuns: TrackedRun[]; +} + +// Keyed by vitest task id so concurrent tests get isolated state. 
+// Stored on a module-level Map (rather than a `let` slot) because +// `describe.concurrent` interleaves beforeEach/test/onTestFailed across +// tasks — a shared `let` gets clobbered by whichever task last entered +// beforeEach, surfacing the wrong run's diagnostics on the failing test. +const perTaskState = new Map(); + +function currentTaskState(): PerTaskState | undefined { + const task = getCurrentTest(); + if (!task) return undefined; + let state = perTaskState.get(task.id); + if (!state) { + state = { testName: task.name, trackedRuns: [] }; + perTaskState.set(task.id, state); + } + return state; +} // Global list of run IDs collected for metadata (observability links) const globalCollectedRunIds: { @@ -492,17 +513,24 @@ export function trackRun( workflowFn?: string; } ): Run { - const testName = options?.testName ?? currentTestName; - trackedRuns.push({ - run, - workflowFile: options?.workflowFile, - workflowFn: options?.workflowFn, - }); + const state = currentTaskState(); + const testName = options?.testName ?? state?.testName ?? 'unknown'; + if (state) { + state.trackedRuns.push({ + run, + workflowFile: options?.workflowFile, + workflowFn: options?.workflowFn, + }); + } globalCollectedRunIds.push({ testName, runId: run.runId, timestamp: new Date().toISOString(), }); + // Unconditionally log run-id <-> test correlation so that if diagnostics + // fail to fetch (or the test hangs past the timeout), we can still trace + // back from CI stdout which workflow run belongs to which test. + console.log(`[trackRun] ${testName} → ${run.runId}`); return run; } @@ -661,13 +689,27 @@ function emitGitHubAnnotation( * beforeEach((ctx) => { setupRunTracking(ctx.task.name); }); */ export function setupRunTracking(testName: string) { - currentTestName = testName; - trackedRuns = []; + const task = getCurrentTest(); + if (!task) { + // Outside of a test context — nothing to wire up. 
+ return; + } + const state: PerTaskState = { testName, trackedRuns: [] }; + perTaskState.set(task.id, state); + onTestFailed( async (result) => { const errorMessage = result.errors?.[0]?.message || 'Test failed'; - for (const tracked of trackedRuns) { + if (state.trackedRuns.length === 0) { + // Make the absence of tracked runs explicit: helps distinguish "test + // failed before start()" from "diagnostics infrastructure is broken". + console.error( + `[diagnostics] ${testName}: no tracked runs for this test` + ); + } + + for (const tracked of state.trackedRuns) { try { const diagnostics = await getRunDiagnostics(tracked); console.error(diagnostics); @@ -681,10 +723,12 @@ export function setupRunTracking(testName: string) { }, 30_000 // Allow 30s for diagnostics fetching (default hookTimeout is 10s) ); -} -// Current test name for auto-tracking -let currentTestName = 'unknown'; + // Drop the per-task slot after the test settles to keep the Map bounded. + onTestFinished(() => { + perTaskState.delete(task.id); + }); +} /** * Write diagnostics sidecar file with per-test run info for the aggregation script. diff --git a/packages/world-local/src/fs.ts b/packages/world-local/src/fs.ts index 5cccf744a0..a86ce4718f 100644 --- a/packages/world-local/src/fs.ts +++ b/packages/world-local/src/fs.ts @@ -357,13 +357,29 @@ export async function readJSON( filePath: string, decoder: z.ZodType ): Promise { + let content: string; try { - const content = await fs.readFile(filePath, 'utf-8'); - return decoder.parse(JSON.parse(content, jsonReviver)); + content = await fs.readFile(filePath, 'utf-8'); } catch (error) { if ((error as any).code === 'ENOENT') return null; throw error; } + try { + return decoder.parse(JSON.parse(content, jsonReviver)); + } catch (error) { + if (error instanceof SyntaxError) { + // Annotate the SyntaxError with the file path, size, and a snippet of + // the (possibly empty / partial) content. 
Some writers — notably + // `writeExclusive` — are atomic for file *creation* but not for the + // data write that follows, so a concurrent reader can briefly observe + // a half-written file. Preserve the SyntaxError type so callers like + // `readHookTokenClaim` that intentionally swallow it via + // `error instanceof SyntaxError` continue to work. + const stat = await fs.stat(filePath).catch(() => null); + error.message = `${error.message} (parsing ${filePath}, size=${stat?.size ?? '?'}, content=${JSON.stringify(content.slice(0, 200))})`; + } + throw error; + } } export async function readBuffer(filePath: string): Promise { @@ -380,23 +396,47 @@ export async function deleteJSON(filePath: string): Promise { } /** - * Atomically create a file using O_CREAT | O_EXCL flags. - * Returns true if the file was created, false if it already exists. - * This is atomic at the OS level, safe for concurrent access. + * Atomically create a file containing `data`, failing if the target already + * exists. Returns true on successful claim, false if the file is already + * present. + * + * The naive `fs.writeFile(path, data, { flag: 'wx' })` form is atomic for the + * file *creation* (`O_CREAT | O_EXCL`) but the subsequent content write is + * not — a concurrent reader can observe the file with partial bytes (e.g. + * just `{`) before the writer's content has been flushed. Callers that + * combine `writeExclusive` for claim-style exclusion with a follow-up read + * of the claim payload would intermittently see a `SyntaxError` instead of + * the claim data. + * + * Use a two-step atomic pattern instead: write the full content to a temp + * file first, then `link(2)` it into place. POSIX guarantees `link` is + * atomic and fails with `EEXIST` if the target already exists. The reader + * either sees nothing (ENOENT) or the fully-populated linked inode — never + * a half-written file. 
*/ export async function writeExclusive( filePath: string, data: string ): Promise { await ensureDir(path.dirname(filePath)); + const tempPath = `${filePath}.tmp.${ulid()}`; + let tempCreated = false; try { - await fs.writeFile(filePath, data, { flag: 'wx' }); + await fs.writeFile(tempPath, data); + tempCreated = true; + await withWindowsRetry(() => fs.link(tempPath, filePath)); return true; } catch (error: any) { if (error.code === 'EEXIST') { return false; } throw error; + } finally { + if (tempCreated) { + // Best-effort cleanup of the temp inode regardless of whether the + // link succeeded — leaving it behind would leak files in the data dir. + await withWindowsRetry(() => fs.unlink(tempPath), 3).catch(() => {}); + } } }