diff --git a/.changeset/replay-history-harness.md b/.changeset/replay-history-harness.md new file mode 100644 index 0000000000..81c7ad285a --- /dev/null +++ b/.changeset/replay-history-harness.md @@ -0,0 +1,5 @@ +--- +'@workflow/core': minor +--- + +Add a workflow history replay helper for validating recorded event trajectories without committing new runtime events. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index f4cd051592..85cf9c3ac1 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -63,6 +63,12 @@ export { type HealthCheckResult, healthCheck, } from './runtime/helpers.js'; +export { + type ReplayPendingOperation, + type ReplayWorkflowHistoryOptions, + type ReplayWorkflowHistoryResult, + replayWorkflowHistory, +} from './runtime/replay-history.js'; export { getHookByToken, resumeHook, diff --git a/packages/core/src/runtime/replay-history.test.ts b/packages/core/src/runtime/replay-history.test.ts new file mode 100644 index 0000000000..d76405fb56 --- /dev/null +++ b/packages/core/src/runtime/replay-history.test.ts @@ -0,0 +1,251 @@ +import { CorruptedEventLogError } from '@workflow/errors'; +import { + type Event, + SPEC_VERSION_CURRENT, + type WorkflowRun, +} from '@workflow/world'; +import { describe, expect, it } from 'vitest'; +import { + dehydrateStepReturnValue, + dehydrateWorkflowArguments, + hydrateWorkflowReturnValue, +} from '../serialization.js'; +import { replayWorkflowHistory } from './replay-history.js'; + +const noEncryptionKey = undefined; + +function getWorkflowTransformCode(workflowName: string) { + return `;globalThis.__private_workflows = new Map([[${JSON.stringify(workflowName)}, ${workflowName}]]);`; +} + +async function createWorkflowRun(runId = 'wrun_replay_test') { + return { + runId, + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments([], runId, noEncryptionKey), + createdAt: new Date('2026-05-21T00:00:00.000Z'), + updatedAt: new Date('2026-05-21T00:00:00.000Z'), + startedAt: new Date('2026-05-21T00:00:00.000Z'), + deploymentId: 'test-deployment', + specVersion: SPEC_VERSION_CURRENT, + } satisfies WorkflowRun; +} + +function createWorkflowCode() { + return ` + const add = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("add"); + async function workflow() { + return await add(1, 2); + } + ${getWorkflowTransformCode('workflow')} + `; +} + +function createEvent( + workflowRun: WorkflowRun, + index: number, + event: Omit +): Event { + return { + eventId: `evnt_${index}`, + runId: workflowRun.runId, + createdAt: new Date(`2026-05-21T00:00:0${index}.000Z`), + ...event, + } as Event; +} + +async function discoverStepCorrelationId( + workflowRun: WorkflowRun, + workflowCode: string +) { + const result = await replayWorkflowHistory({ + workflowCode, + workflowRun, + events: [], + encryptionKey: noEncryptionKey, + }); + + expect(result.status).toBe('suspended'); + if (result.status !== 'suspended') { + throw new Error('expected suspended replay'); + } + + expect(result.counts).toMatchObject({ + steps: 1, + hooks: 0, + waits: 0, + }); + expect(result.pendingOperations).toHaveLength(1); + expect(result.pendingOperations[0]).toMatchObject({ + type: 'step', + stepName: 'add', + }); + + return result.pendingOperations[0].correlationId; +} + +describe('replayWorkflowHistory', () => { + it('returns pending operations for an incomplete history', async () => { + const workflowRun = await createWorkflowRun(); + const result = await replayWorkflowHistory({ + workflowCode: createWorkflowCode(), + workflowRun, + events: [], + encryptionKey: noEncryptionKey, + }); + + expect(result).toMatchObject({ + status: 'suspended', + counts: { + steps: 1, + hooks: 0, + waits: 0, + hookDisposals: 0, + aborts: 0, + }, + }); + expect(result.status).toBe('suspended'); + if (result.status === 'suspended') { + expect(result.pendingOperations[0]).toMatchObject({ + type: 'step', + stepName: 'add', + }); + } + }); + + it('replays a completed history with a terminal event', async () => { + const workflowRun = await createWorkflowRun(); + const workflowCode = createWorkflowCode(); + const correlationId = await discoverStepCorrelationId( + workflowRun, + workflowCode + ); + const stepResult = await dehydrateStepReturnValue( + 3, + workflowRun.runId, + noEncryptionKey + ); + const events: Event[] = [ + createEvent(workflowRun, 0, { + eventType: 'step_started', + correlationId, + specVersion: SPEC_VERSION_CURRENT, + eventData: { + stepName: 'add', + }, + }), + createEvent(workflowRun, 1, { + eventType: 'step_completed', + correlationId, + specVersion: SPEC_VERSION_CURRENT, + eventData: { + stepName: 'add', + result: stepResult, + }, + }), + createEvent(workflowRun, 2, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + output: stepResult, + }, + }), + ]; + + const result = await replayWorkflowHistory({ + workflowCode, + workflowRun, + events, + encryptionKey: noEncryptionKey, + }); + + expect(result.status).toBe('completed'); + if (result.status === 'completed') { + expect(result.terminalEvent?.eventType).toBe('run_completed'); + await expect( + hydrateWorkflowReturnValue( + result.output, + workflowRun.runId, + noEncryptionKey + ) + ).resolves.toBe(3); + } + }); + + it('throws when an event with the expected correlation id has the wrong step name', async () => { + const workflowRun = await createWorkflowRun(); + const workflowCode = createWorkflowCode(); + const correlationId = await discoverStepCorrelationId( + workflowRun, + workflowCode + ); + + await expect( + replayWorkflowHistory({ + workflowCode, + workflowRun, + events: [ + createEvent(workflowRun, 0, { + eventType: 'step_completed', + correlationId, + specVersion: SPEC_VERSION_CURRENT, + eventData: { + stepName: 'subtract', + result: await dehydrateStepReturnValue( + 3, + workflowRun.runId, + noEncryptionKey + ), + }, + }), + ], + encryptionKey: noEncryptionKey, + }) + ).rejects.toBeInstanceOf(CorruptedEventLogError); + }); + + it('throws when a history contains events for another run', async () => { + const workflowRun = await createWorkflowRun(); + const workflowCode = createWorkflowCode(); + + await expect( + replayWorkflowHistory({ + workflowCode, + workflowRun, + events: [ + { + ...createEvent(workflowRun, 0, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }), + runId: 'wrun_other', + }, + ], + }) + ).rejects.toBeInstanceOf(CorruptedEventLogError); + }); + + it('throws when a non-terminal event appears after a terminal event', async () => { + const workflowRun = await createWorkflowRun(); + const workflowCode = createWorkflowCode(); + + await expect( + replayWorkflowHistory({ + workflowCode, + workflowRun, + events: [ + createEvent(workflowRun, 0, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: {}, + }), + createEvent(workflowRun, 1, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }), + ], + }) + ).rejects.toBeInstanceOf(CorruptedEventLogError); + }); +}); diff --git a/packages/core/src/runtime/replay-history.ts b/packages/core/src/runtime/replay-history.ts new file mode 100644 index 0000000000..e668525152 --- /dev/null +++ b/packages/core/src/runtime/replay-history.ts @@ -0,0 +1,203 @@ +import { CorruptedEventLogError } from '@workflow/errors'; +import type { Event, WorkflowRun } from '@workflow/world'; +import type { CryptoKey } from '../encryption.js'; +import { type QueueItem, WorkflowSuspension } from '../global.js'; +import { runWorkflow } from '../workflow.js'; + +type TerminalRunEventType = 'run_completed' | 'run_failed' | 'run_cancelled'; + +export interface ReplayPendingOperation { + type: QueueItem['type']; + correlationId: string; + stepName?: string; + token?: string; + resumeAt?: Date; + hasCreatedEvent?: boolean; + disposed?: boolean; + isWebhook?: boolean; + isSystem?: boolean; + abortRequested?: boolean; +} + +export type ReplayWorkflowHistoryResult = + | { + status: 'completed'; + output: Uint8Array | unknown; + terminalEvent?: Extract; + } + | { + status: 'suspended'; + pendingOperations: ReplayPendingOperation[]; + counts: { + steps: number; + hooks: number; + waits: number; + hookDisposals: number; + aborts: number; + }; + } + | { + status: 'failed'; + error: unknown; + terminalEvent?: Extract; + } + | { + status: 'cancelled'; + terminalEvent: Extract; + }; + +export interface ReplayWorkflowHistoryOptions { + workflowCode: string; + workflowRun: WorkflowRun; + events: Event[]; + encryptionKey?: CryptoKey; +} + +function isTerminalRunEvent( + event: Event +): event is Extract { + return ( + event.eventType === 'run_completed' || + event.eventType === 'run_failed' || + event.eventType === 'run_cancelled' + ); +} + +function prepareReplayEvents( + workflowRun: WorkflowRun, + events: Event[] +): { + replayEvents: Event[]; + terminalEvent?: Extract; +} { + let terminalEvent: + | Extract + | undefined; + const replayEvents: Event[] = []; + + for (const event of events) { + if (event.runId !== workflowRun.runId) { + throw new CorruptedEventLogError( + `Replay history contains event "${event.eventId}" for run "${event.runId}", but expected run "${workflowRun.runId}"` + ); + } + + if (terminalEvent) { + throw new CorruptedEventLogError( + `Replay history contains event "${event.eventId}" after terminal event "${terminalEvent.eventId}"` + ); + } + + if (isTerminalRunEvent(event)) { + terminalEvent = event; + } else { + replayEvents.push(event); + } + } + + return { replayEvents, terminalEvent }; +} + +function serializePendingOperation(item: QueueItem): ReplayPendingOperation { + switch (item.type) { + case 'step': + return { + type: item.type, + correlationId: item.correlationId, + stepName: item.stepName, + hasCreatedEvent: item.hasCreatedEvent, + }; + case 'hook': + return { + type: item.type, + correlationId: item.correlationId, + token: item.token, + hasCreatedEvent: item.hasCreatedEvent, + disposed: item.disposed, + isWebhook: item.isWebhook, + isSystem: item.isSystem, + abortRequested: item.abortRequested, + }; + case 'wait': + return { + type: item.type, + correlationId: item.correlationId, + resumeAt: item.resumeAt, + hasCreatedEvent: item.hasCreatedEvent, + }; + } +} + +export async function replayWorkflowHistory({ + workflowCode, + workflowRun, + events, + encryptionKey, +}: ReplayWorkflowHistoryOptions): Promise { + const { replayEvents, terminalEvent } = prepareReplayEvents( + workflowRun, + events + ); + + if (terminalEvent?.eventType === 'run_cancelled') { + return { + status: 'cancelled', + terminalEvent, + }; + } + + try { + const output = await runWorkflow( + workflowCode, + workflowRun, + replayEvents, + encryptionKey, + { drainPendingQueueItems: false } + ); + + if (terminalEvent?.eventType === 'run_failed') { + throw new CorruptedEventLogError( + `Replay history ended with "${terminalEvent.eventType}", but workflow replay completed` + ); + } + + return { + status: 'completed', + output, + terminalEvent: + terminalEvent?.eventType === 'run_completed' + ? terminalEvent + : undefined, + }; + } catch (error) { + if (WorkflowSuspension.is(error)) { + if (terminalEvent) { + throw new CorruptedEventLogError( + `Replay history ended with "${terminalEvent.eventType}", but workflow replay suspended` + ); + } + + return { + status: 'suspended', + pendingOperations: error.steps.map(serializePendingOperation), + counts: { + steps: error.stepCount, + hooks: error.hookCount, + waits: error.waitCount, + hookDisposals: error.hookDisposedCount, + aborts: error.abortCount, + }, + }; + } + + if (terminalEvent?.eventType === 'run_failed') { + return { + status: 'failed', + error, + terminalEvent, + }; + } + + throw error; + } +} diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 43fc54566e..6f04b21d95 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -6,7 +6,6 @@ import { WorkflowRuntimeError, } from '@workflow/errors'; import { withResolvers } from '@workflow/utils'; -import { getPortLazy } from './runtime/get-port-lazy.js'; import { parseWorkflowName } from '@workflow/utils/parse-name'; import type { Event, WorkflowRun } from '@workflow/world'; import * as nanoid from 'nanoid'; @@ -16,9 +15,10 @@ import { EventConsumerResult, EventsConsumer } from './events-consumer.js'; import type { QueueItem } from './global.js'; import { ENOTSUP, WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; +import type { WorkflowOrchestratorContext } from './private.js'; +import { getPortLazy } from './runtime/get-port-lazy.js'; import { handleSuspension } from './runtime/suspension-handler.js'; import { getWorld } from './runtime/world.js'; -import type { WorkflowOrchestratorContext } from './private.js'; import { dehydrateWorkflowReturnValue, hydrateWorkflowArguments, @@ -36,12 +36,12 @@ import * as Attribute from './telemetry/semantic-conventions.js'; import { trace } from './telemetry.js'; import { getWorkflowRunStreamId } from './util.js'; import { createContext } from './vm/index.js'; -import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js'; -import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js'; import { createAbortSignalStatics, createCreateAbortController, } from './workflow/abort-controller.js'; +import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js'; +import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js'; import { createCreateHook } from './workflow/hook.js'; import { createSleep } from './workflow/sleep.js'; @@ -107,13 +107,19 @@ async function drainPendingQueueItems( } } +interface RunWorkflowOptions { + drainPendingQueueItems?: boolean; +} + export async function runWorkflow( workflowCode: string, workflowRun: WorkflowRun, events: Event[], - encryptionKey: CryptoKey | undefined + encryptionKey: CryptoKey | undefined, + options: RunWorkflowOptions = {} ): Promise { return trace(`workflow.run ${workflowRun.workflowName}`, async (span) => { + const shouldDrainPendingQueueItems = options.drainPendingQueueItems ?? true; span?.setAttributes({ ...Attribute.WorkflowName(workflowRun.workflowName), ...Attribute.WorkflowRunId(workflowRun.runId), @@ -797,13 +803,15 @@ export async function runWorkflow( ...Attribute.WorkflowResultType(typeof result), }); - await drainPendingQueueItems( - workflowRun.runId, - workflowContext.invocationsQueue, - vmGlobalThis, - workflowRun, - 'completed' - ); + if (shouldDrainPendingQueueItems) { + await drainPendingQueueItems( + workflowRun.runId, + workflowContext.invocationsQueue, + vmGlobalThis, + workflowRun, + 'completed' + ); + } return dehydrated; } catch (err) { @@ -812,13 +820,15 @@ export async function runWorkflow( throw err; } - await drainPendingQueueItems( - workflowRun.runId, - workflowContext.invocationsQueue, - vmGlobalThis, - workflowRun, - 'failed' - ); + if (shouldDrainPendingQueueItems) { + await drainPendingQueueItems( + workflowRun.runId, + workflowContext.invocationsQueue, + vmGlobalThis, + workflowRun, + 'failed' + ); + } throw err; } diff --git a/workbench/nextjs-turbopack/package.json b/workbench/nextjs-turbopack/package.json index 6f298210ea..4ff1b171f9 100644 --- a/workbench/nextjs-turbopack/package.json +++ b/workbench/nextjs-turbopack/package.json @@ -4,7 +4,7 @@ "private": true, "license": "Apache-2.0", "scripts": { - "generate:workflows": "node ../scripts/generate-workflows-registry.js", + "generate:workflows": "node ../scripts/generate-workflows-registry.js --include app/.well-known/agent/v1/steps.ts", "predev": "pnpm generate:workflows", "prebuild": "pnpm generate:workflows", "dev": "next dev --turbopack", diff --git a/workbench/scripts/generate-workflows-registry.js b/workbench/scripts/generate-workflows-registry.js index 8b42645c53..fa03bff455 100644 --- a/workbench/scripts/generate-workflows-registry.js +++ b/workbench/scripts/generate-workflows-registry.js @@ -3,7 +3,7 @@ /** * Auto-generates _workflows.ts registry file for workbenches * - * Usage: node generate-workflows-registry.js [workflowsDir] [outputPath] [--esm] + * Usage: node generate-workflows-registry.js [workflowsDir] [outputPath] [--esm] [--include path] * * Defaults: * workflowsDir: ./workflows @@ -19,17 +19,37 @@ const path = require('node:path'); // Parse arguments const args = process.argv.slice(2); const esmMode = args.includes('--esm'); -const nonFlagArgs = args.filter((arg) => !arg.startsWith('--')); +const includeFiles = []; +const nonFlagArgs = []; + +for (let index = 0; index < args.length; index++) { + const arg = args[index]; + if (arg === '--esm') { + continue; + } + if (arg === '--include') { + const includePath = args[index + 1]; + if (!includePath || includePath.startsWith('--')) { + console.error('Error: --include requires a file path'); + process.exit(1); + } + includeFiles.push(includePath); + index++; + continue; + } + if (arg.startsWith('--')) { + console.error(`Error: Unknown option: ${arg}`); + process.exit(1); + } + nonFlagArgs.push(arg); +} // Get arguments or use defaults const workflowsDir = nonFlagArgs[0] || './workflows'; const outputPath = nonFlagArgs[1] || './_workflows.ts'; -// Calculate relative path from output to workflows directory +// Calculate relative path from output to each registered file. const outputDir = path.dirname(outputPath); -const relativeWorkflowsPath = path - .relative(outputDir, workflowsDir) - .replace(/\\/g, '/'); // Files to skip const SKIP_FILES = ['helpers.ts']; @@ -38,9 +58,9 @@ const SKIP_PREFIX = '_'; function generateSafeIdentifier(filename) { // Convert filename to safe JS identifier // e.g., "1_simple.ts" -> "workflow_1_simple" - return ( - 'workflow_' + filename.replace(/\.tsx?$/, '').replace(/[^a-zA-Z0-9_]/g, '_') - ); + return `workflow_${filename + .replace(/\.tsx?$/, '') + .replace(/[^a-zA-Z0-9_]/g, '_')}`; } function generateRegistry() { @@ -51,7 +71,7 @@ function generateRegistry() { } // Read all files from workflows directory - const files = fs + const workflowFiles = fs .readdirSync(workflowsDir) .filter((file) => { // Only .ts files @@ -61,7 +81,26 @@ function generateRegistry() { if (file.startsWith(SKIP_PREFIX)) return false; return true; }) - .sort(); // Sort for consistent output + .map((file) => ({ + filePath: path.join(workflowsDir, file).replace(/\\/g, '/'), + registryKey: `workflows/${file}`, + })); + + const extraFiles = includeFiles.map((file) => { + const filePath = file.replace(/\\/g, '/'); + if (!fs.existsSync(filePath)) { + console.error(`Error: Included workflow file not found: ${filePath}`); + process.exit(1); + } + return { + filePath, + registryKey: filePath, + }; + }); + + const files = [...workflowFiles, ...extraFiles].sort((a, b) => + a.registryKey.localeCompare(b.registryKey) + ); if (files.length === 0) { console.warn('Warning: No workflow files found to register'); @@ -72,25 +111,22 @@ function generateRegistry() { // Generate imports const imports = files - .map((file) => { - const identifier = generateSafeIdentifier(file); - // Use relative path from output directory to workflows directory - let importPath; - const baseName = file.replace(/\.tsx?$/, ''); - if (relativeWorkflowsPath && relativeWorkflowsPath !== 'workflows') { - importPath = `${relativeWorkflowsPath}/${baseName}${importExtension}`; - } else { - importPath = `./workflows/${baseName}${importExtension}`; + .map(({ filePath, registryKey }) => { + const identifier = generateSafeIdentifier(registryKey); + let importPath = path.relative(outputDir, filePath).replace(/\\/g, '/'); + if (!importPath.startsWith('.')) { + importPath = `./${importPath}`; } + importPath = importPath.replace(/\.tsx?$/, importExtension); return `import * as ${identifier} from '${importPath}';`; }) .join('\n'); // Generate registry object entries const registryEntries = files - .map((file) => { - const identifier = generateSafeIdentifier(file); - return ` 'workflows/${file}': ${identifier},`; + .map(({ registryKey }) => { + const identifier = generateSafeIdentifier(registryKey); + return ` '${registryKey}': ${identifier},`; }) .join('\n'); @@ -109,7 +145,9 @@ ${registryEntries} fs.writeFileSync(outputPath, content, 'utf-8'); console.log(`✓ Generated ${outputPath} with ${files.length} workflow(s)`); - files.forEach((file) => console.log(` - workflows/${file}`)); + files.forEach(({ registryKey }) => { + console.log(` - ${registryKey}`); + }); } // Run the generator