Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changeset/concurrent-e2e.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
---

chore(tests): re-enable `describe.concurrent` in the e2e suite to cut CI wall-clock time per workbench, and isolate diagnostic tracking state per task so that concurrent tests no longer clobber each other's `trackedRuns`. Each failing test now surfaces its own run's diagnostics, and every tracked run is logged as `[trackRun] <test> → <runId>` for stdout correlation even when fetching diagnostics fails.
5 changes: 5 additions & 0 deletions .changeset/readjson-diagnostics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-local': patch
---

Fix a concurrent-read race in `writeExclusive`. `fs.writeFile(path, data, { flag: 'wx' })` is atomic for file *creation* (`O_CREAT | O_EXCL`) but the subsequent data write is not, so a concurrent reader could observe a half-written file (e.g. just `{`) before the writer's content was flushed. Callers that combined exclusive claim semantics with a follow-up read of the payload — notably the hook-token claim path and run-creation — would intermittently see `SyntaxError: Unexpected end of JSON input` (or worse, treat a partial file as an empty claim, masking conflicts). Switch to a two-step pattern: write the full content to a temp file first, then `link(2)` it into place. POSIX `link` is atomic and fails with `EEXIST` if the target already exists, so readers either see `ENOENT` or the fully-populated inode. `readJSON` also annotates any `SyntaxError` with the file path, on-disk size, and a snippet of the offending content, making future cases of this class actionable from CI logs (the error type is preserved so callers that intentionally swallow `SyntaxError` continue to work).
4 changes: 1 addition & 3 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,7 @@ async function startWorkflowViaHttp(
return run;
}

// NOTE: Temporarily disabling concurrent tests to avoid flakiness.
// TODO: Re-enable concurrent tests after conf when we have more time to investigate.
describe('e2e', () => {
describe.concurrent('e2e', () => {
// Configure the World for the test runner process so that start() and
// run.returnValue can communicate with the same backend as the workbench app.
beforeAll(async () => {
Expand Down
74 changes: 59 additions & 15 deletions packages/core/e2e/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import path, { dirname } from 'node:path';
import { setTimeout as sleep } from 'node:timers/promises';
import { fileURLToPath } from 'node:url';
import { createVercelWorld } from '@workflow/world-vercel';
import { onTestFailed } from 'vitest';
import { onTestFailed, onTestFinished } from 'vitest';
import { getCurrentTest } from 'vitest/suite';
import { getTrustedSourcesHeaders } from '../../../scripts/trusted-sources-headers.mjs';
import { parseEnvironmentFlag } from '../../next/src/environment-flag.js';
import type { Run } from '../src/runtime';
Expand Down Expand Up @@ -460,8 +461,28 @@ interface TrackedRun {
workflowFn?: string;
}

// Per-test tracked runs — reset between tests via setupRunTracking()
let trackedRuns: TrackedRun[] = [];
// Run-tracking state owned by a single vitest task (one test).
interface PerTaskState {
// Test name used when logging tracked runs and reporting diagnostics.
testName: string;
// Runs registered for this test via trackRun(); iterated on failure to
// fetch and print diagnostics for each one.
trackedRuns: TrackedRun[];
}

// Keyed by vitest task id so concurrent tests get isolated state.
// Stored on a module-level Map (rather than a `let` slot) because
// `describe.concurrent` interleaves beforeEach/test/onTestFailed across
// tasks — a shared `let` gets clobbered by whichever task last entered
// beforeEach, surfacing the wrong run's diagnostics on the failing test.
// Entries are created by setupRunTracking() (or lazily by
// currentTaskState()) and removed again in an onTestFinished hook.
const perTaskState = new Map<string, PerTaskState>();

/**
 * Resolve the tracking state for the vitest task that is currently running.
 *
 * The state is created on first access and cached in `perTaskState` under the
 * task id, so repeated calls from the same test share one `trackedRuns` list.
 *
 * @returns The state for the current task, or `undefined` when there is no
 * current test (i.e. `getCurrentTest()` returned nothing).
 */
function currentTaskState(): PerTaskState | undefined {
  const activeTest = getCurrentTest();
  if (!activeTest) {
    return undefined;
  }

  const existing = perTaskState.get(activeTest.id);
  if (existing) {
    return existing;
  }

  // First access for this task: seed an empty slot named after the test.
  const created: PerTaskState = { testName: activeTest.name, trackedRuns: [] };
  perTaskState.set(activeTest.id, created);
  return created;
}

// Global list of run IDs collected for metadata (observability links)
const globalCollectedRunIds: {
Expand Down Expand Up @@ -492,17 +513,24 @@ export function trackRun<T>(
workflowFn?: string;
}
): Run<T> {
const testName = options?.testName ?? currentTestName;
trackedRuns.push({
run,
workflowFile: options?.workflowFile,
workflowFn: options?.workflowFn,
});
const state = currentTaskState();
const testName = options?.testName ?? state?.testName ?? 'unknown';
if (state) {
state.trackedRuns.push({
run,
workflowFile: options?.workflowFile,
workflowFn: options?.workflowFn,
});
}
globalCollectedRunIds.push({
testName,
runId: run.runId,
timestamp: new Date().toISOString(),
});
// Unconditionally log run-id <-> test correlation so that if diagnostics
// fail to fetch (or the test hangs past the timeout), we can still trace
// back from CI stdout which workflow run belongs to which test.
console.log(`[trackRun] ${testName} → ${run.runId}`);
return run;
}

Expand Down Expand Up @@ -661,13 +689,27 @@ function emitGitHubAnnotation(
* beforeEach((ctx) => { setupRunTracking(ctx.task.name); });
*/
export function setupRunTracking(testName: string) {
currentTestName = testName;
trackedRuns = [];
const task = getCurrentTest();
if (!task) {
// Outside of a test context — nothing to wire up.
return;
}
const state: PerTaskState = { testName, trackedRuns: [] };
perTaskState.set(task.id, state);

onTestFailed(
async (result) => {
const errorMessage = result.errors?.[0]?.message || 'Test failed';

for (const tracked of trackedRuns) {
if (state.trackedRuns.length === 0) {
// Make the absence of tracked runs explicit: helps distinguish "test
// failed before start()" from "diagnostics infrastructure is broken".
console.error(
`[diagnostics] ${testName}: no tracked runs for this test`
);
}

for (const tracked of state.trackedRuns) {
try {
const diagnostics = await getRunDiagnostics(tracked);
console.error(diagnostics);
Expand All @@ -681,10 +723,12 @@ export function setupRunTracking(testName: string) {
},
30_000 // Allow 30s for diagnostics fetching (default hookTimeout is 10s)
);
}

// Current test name for auto-tracking
let currentTestName = 'unknown';
// Drop the per-task slot after the test settles to keep the Map bounded.
onTestFinished(() => {
perTaskState.delete(task.id);
});
}

/**
* Write diagnostics sidecar file with per-test run info for the aggregation script.
Expand Down
52 changes: 46 additions & 6 deletions packages/world-local/src/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,29 @@ export async function readJSON<T>(
filePath: string,
decoder: z.ZodType<T>
): Promise<T | null> {
let content: string;
try {
const content = await fs.readFile(filePath, 'utf-8');
return decoder.parse(JSON.parse(content, jsonReviver));
content = await fs.readFile(filePath, 'utf-8');
} catch (error) {
if ((error as any).code === 'ENOENT') return null;
throw error;
}
try {
return decoder.parse(JSON.parse(content, jsonReviver));
} catch (error) {
if (error instanceof SyntaxError) {
// Annotate the SyntaxError with the file path, size, and a snippet of
// the (possibly empty / partial) content. Some writers — notably
// `writeExclusive` — are atomic for file *creation* but not for the
// data write that follows, so a concurrent reader can briefly observe
// a half-written file. Preserve the SyntaxError type so callers like
// `readHookTokenClaim` that intentionally swallow it via
// `error instanceof SyntaxError` continue to work.
const stat = await fs.stat(filePath).catch(() => null);
error.message = `${error.message} (parsing ${filePath}, size=${stat?.size ?? '?'}, content=${JSON.stringify(content.slice(0, 200))})`;
}
throw error;
}
}

export async function readBuffer(filePath: string): Promise<Buffer> {
Expand All @@ -380,23 +396,47 @@ export async function deleteJSON(filePath: string): Promise<void> {
}

/**
 * Atomically create a file containing `data`, failing if the target already
 * exists.
 *
 * The naive `fs.writeFile(path, data, { flag: 'wx' })` form is atomic for the
 * file *creation* (`O_CREAT | O_EXCL`) but the subsequent content write is
 * not — a concurrent reader can observe the file with partial bytes (e.g.
 * just `{`) before the writer's content has been flushed. Callers that
 * combine `writeExclusive` for claim-style exclusion with a follow-up read
 * of the claim payload would intermittently see a `SyntaxError` instead of
 * the claim data.
 *
 * Use a two-step atomic pattern instead: write the full content to a temp
 * file first, then `link(2)` it into place. POSIX guarantees `link` is
 * atomic and fails with `EEXIST` if the target already exists. The reader
 * either sees nothing (ENOENT) or the fully-populated linked inode — never
 * a half-written file.
 *
 * The target path itself is never opened for writing here: doing so would
 * both reintroduce the partial-read window and make the subsequent `link`
 * fail with `EEXIST` on every call.
 *
 * @param filePath - Path of the file to claim; its parent directory is
 * created if needed.
 * @param data - Full content the file must contain once it becomes visible.
 * @returns `true` if this call created the file, `false` if it already
 * existed.
 * @throws Any error other than `EEXIST` raised while writing the temp file
 * or linking it into place.
 */
export async function writeExclusive(
  filePath: string,
  data: string
): Promise<boolean> {
  await ensureDir(path.dirname(filePath));
  const tempPath = `${filePath}.tmp.${ulid()}`;
  let tempCreated = false;
  try {
    // Step 1: stage the complete payload under a unique temp name.
    await fs.writeFile(tempPath, data);
    tempCreated = true;
    // Step 2: publish it. `link` is the exclusive-create step; it either
    // succeeds with the full content in place or fails with EEXIST.
    await withWindowsRetry(() => fs.link(tempPath, filePath));
    return true;
  } catch (error: unknown) {
    // Narrow structurally instead of using `any`; non-object throwables
    // simply fall through to the rethrow.
    const code = (error as { code?: unknown } | null | undefined)?.code;
    if (code === 'EEXIST') {
      return false;
    }
    throw error;
  } finally {
    if (tempCreated) {
      // Best-effort cleanup of the temp inode regardless of whether the
      // link succeeded — leaving it behind would leak files in the data dir.
      await withWindowsRetry(() => fs.unlink(tempPath), 3).catch(() => {});
    }
  }
}

Expand Down
Loading