From 838186dbd2575d12bb764fc6cd7a07f05b0527ef Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 21 May 2026 12:40:39 -0700 Subject: [PATCH 1/4] Add experimental dynamic workflow source start --- .changeset/dynamic-workflow-source.md | 6 + .../v5/api-reference/workflow-api/start.mdx | 28 ++ .../docs/v5/foundations/dynamic-workflows.mdx | 131 ++++++++++ docs/content/docs/v5/foundations/index.mdx | 3 + docs/content/docs/v5/foundations/meta.json | 1 + packages/core/src/runtime.test.ts | 93 ++++++- packages/core/src/runtime.ts | 35 ++- packages/core/src/runtime/start.test.ts | 159 ++++++++++++ packages/core/src/runtime/start.ts | 240 +++++++++++++++++- packages/workflow/src/api.ts | 3 + 10 files changed, 681 insertions(+), 18 deletions(-) create mode 100644 .changeset/dynamic-workflow-source.md create mode 100644 docs/content/docs/v5/foundations/dynamic-workflows.mdx diff --git a/.changeset/dynamic-workflow-source.md b/.changeset/dynamic-workflow-source.md new file mode 100644 index 0000000000..3cd43198c5 --- /dev/null +++ b/.changeset/dynamic-workflow-source.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +"workflow": patch +--- + +Add an experimental dynamic workflow source overload for `start()`. diff --git a/docs/content/docs/v5/api-reference/workflow-api/start.mdx b/docs/content/docs/v5/api-reference/workflow-api/start.mdx index a2962c5ee4..ea31443744 100644 --- a/docs/content/docs/v5/api-reference/workflow-api/start.mdx +++ b/docs/content/docs/v5/api-reference/workflow-api/start.mdx @@ -56,6 +56,7 @@ Learn more about [`WorkflowReadableStreamOptions`](/docs/api-reference/workflow- * The function returns immediately after enqueuing the workflow - it doesn't wait for the workflow to complete. * All arguments must be [serializable](/docs/foundations/serialization). * When `deploymentId` is provided, the argument types and return type become `unknown` since there is no guarantee the workflow function's types will be consistent across different deployments. +* You can also start an experimental [dynamic workflow](/docs/foundations/dynamic-workflows) from a trusted source string by passing `options.dynamic`. If `start()` throws `'start' received an invalid workflow function. Ensure the Workflow Development Kit is configured correctly and the function includes a 'use workflow' directive.`, the passed function was not transformed as a workflow. The two most common causes are a missing `"use workflow"` directive or missing framework integration. See [start-invalid-workflow-function](/docs/errors/start-invalid-workflow-function). @@ -83,6 +84,33 @@ const run = await start(myWorkflow, ["arg1", "arg2"], { // [!code highlight] }); // [!code highlight] ``` +### With Dynamic Source + +Pass a trusted JavaScript source string plus `dynamic.steps` when the workflow orchestration is generated at runtime. + +```typescript +import { start } from "workflow/api"; +import { fetchUser, sendEmail } from "./steps"; + +const source = ` +async function workflow(input) { + "use workflow"; + + const user = await steps.fetchUser(input.userId); + await steps.sendEmail(user.email); + + return { ok: true }; +} +`; + +const run = await start(source, [{ userId: "user_123" }], { + dynamic: { + id: "ai-user-followup", + steps: { fetchUser, sendEmail }, + }, +}); +``` + ### Using `deploymentId: "latest"` Set `deploymentId` to `"latest"` to automatically resolve the most recent deployment for the current environment. This is useful when you want to ensure a workflow run targets the latest deployed version of your application rather than the deployment that initiated the call. For when to use this and how it fits with default run pinning, see [Versioning](/docs/foundations/versioning). diff --git a/docs/content/docs/v5/foundations/dynamic-workflows.mdx b/docs/content/docs/v5/foundations/dynamic-workflows.mdx new file mode 100644 index 0000000000..63dbe6f4ea --- /dev/null +++ b/docs/content/docs/v5/foundations/dynamic-workflows.mdx @@ -0,0 +1,131 @@ +--- +title: Dynamic Workflows +description: Start trusted workflow source strings that orchestrate already-registered steps. +type: conceptual +summary: Run workflow code generated at runtime without adding a new build-time workflow file. +prerequisites: + - /docs/foundations/workflows-and-steps + - /docs/foundations/starting-workflows +related: + - /docs/api-reference/workflow-api/start + - /docs/foundations/serialization + - /docs/foundations/versioning +--- + +Dynamic workflows let you start a workflow from a JavaScript source string instead of importing a workflow function that was discovered at build time. This is useful when the orchestration is generated at runtime, such as from a workflow builder UI or an LLM. + + +Dynamic workflows are experimental. Treat the source as trusted application code. The workflow VM enforces the usual deterministic workflow runtime, but it is not a security sandbox for untrusted JavaScript. + + +## How it works + +Dynamic workflows reuse the normal Workflow SDK runtime. The dynamic source is compiled into per-run workflow VM code, stored on the run, and replayed from the event log just like a static workflow. + +For the MVP, dynamic workflows can only orchestrate step functions that are already registered in the deployment. Pass those step references through `dynamic.steps`, then call them from the source through the injected `steps` object. + +```typescript lineNumbers +import { start } from "workflow/api"; +import { fetchUser, sendEmail } from "./steps"; + +const source = ` +async function workflow(input) { + "use workflow"; + + const user = await steps.fetchUser(input.userId); + await steps.sendEmail(user.email); + + return { ok: true }; +} +`; + +const run = await start( + source, + [{ userId: "user_123" }], + { + dynamic: { + id: "ai-user-followup", + steps: { fetchUser, sendEmail }, + }, + }, +); +``` + +The generated workflow ID uses the form `workflow//dynamic///`. If you omit `id`, Workflow SDK derives one from the source and step references. + +## Generated with an LLM + +Keep the step catalog explicit. The model should choose from steps you provide, and your application should review or validate the generated source before starting it. + +```typescript lineNumbers +import { generateText } from "ai"; +import { start } from "workflow/api"; +import { createTicket, lookupCustomer, sendSlackMessage } from "./steps"; + +const stepCatalog = { + lookupCustomer, + createTicket, + sendSlackMessage, +}; + +const { text: source } = await generateText({ + model: myModel, + system: ` +Generate one JavaScript async function named workflow. +The first statement inside the function must be "use workflow". +Only call tools through the injected steps object. +Do not use import, export, TypeScript syntax, or inline step functions. +`, + prompt: "When a customer reports a billing issue, look them up, create a ticket, and notify Slack.", +}); + +const run = await start( + source, + [{ customerId: "cus_123", issue: "Invoice looks wrong" }], + { + dynamic: { + id: "billing-triage", + steps: stepCatalog, + }, + }, +); +``` + +## Options + +`dynamic.steps` is required. Each value can be an imported step function or an explicit `{ stepId }` reference. + +```typescript lineNumbers +import { start } from "workflow/api"; +import { fetchUser, sendEmail } from "./steps"; + +const source = ` +async function workflow(input) { + "use workflow"; + return await steps.fetchUser(input.userId); +} +`; +const input = { userId: "user_123" }; + +await start(source, [input], { + dynamic: { + steps: { + fetchUser, + sendEmail: { stepId: "step//./steps//sendEmail" }, + }, + exportName: "workflow", + }, +}); +``` + +`dynamic.exportName` defaults to `workflow`. Use it when your generated source defines a different async function name. + +## MVP limitations + +- Dynamic source must be JavaScript and must fit within the 32 KB source limit. +- The source must define `async function workflow(...)` by default, or the function named by `dynamic.exportName`. +- The first statement in that function must be `"use workflow";`. +- `import` and `export` syntax are not supported. +- Inline `"use step"` functions are not supported. +- TypeScript syntax, runtime bundling, npm package imports, and custom class serialization registration are not supported. +- Dynamic workflow source and generated VM code are stored with run metadata. Do not put secrets in generated source. diff --git a/docs/content/docs/v5/foundations/index.mdx b/docs/content/docs/v5/foundations/index.mdx index 1eb96ddf69..7dd1c80920 100644 --- a/docs/content/docs/v5/foundations/index.mdx +++ b/docs/content/docs/v5/foundations/index.mdx @@ -17,6 +17,9 @@ Workflow programming can be a slight shift from how you traditionally write real Trigger workflows and track their execution using the `start()` function. + + Start trusted workflow source strings that orchestrate registered steps. + Types of errors and how retrying work in workflows. diff --git a/docs/content/docs/v5/foundations/meta.json b/docs/content/docs/v5/foundations/meta.json index ce49cfe47b..66e859f143 100644 --- a/docs/content/docs/v5/foundations/meta.json +++ b/docs/content/docs/v5/foundations/meta.json @@ -3,6 +3,7 @@ "pages": [ "workflows-and-steps", "starting-workflows", + "dynamic-workflows", "errors-and-retries", "hooks", "streaming", diff --git a/packages/core/src/runtime.test.ts b/packages/core/src/runtime.test.ts index 90603df446..05ccf776c0 100644 --- a/packages/core/src/runtime.test.ts +++ b/packages/core/src/runtime.test.ts @@ -10,6 +10,8 @@ import { workflowEntrypoint } from './runtime.js'; import { dehydrateStepReturnValue, dehydrateWorkflowArguments, + hydrateRunError, + hydrateWorkflowReturnValue, } from './serialization.js'; vi.mock('@vercel/functions', () => ({ @@ -58,7 +60,7 @@ async function runWorkflowHandlerWithEvents( { requestId: 'req_test', attempt: 1, - queueName: '__wkf_workflow_workflow', + queueName: `__wkf_workflow_${workflowRun.workflowName}`, messageId: 'msg_test', } ); @@ -97,6 +99,95 @@ describe('workflowEntrypoint replay guards', () => { `;globalThis.__private_workflows = new Map(); globalThis.__private_workflows.set(${JSON.stringify(workflowName)}, ${workflowName});`; + it('uses dynamic workflow code from the run executionContext when present', async () => { + const ops: Promise[] = []; + const workflowRun: WorkflowRun = { + runId: 'wrun_runtime_dynamic', + workflowName: 'workflow//dynamic/test-run//workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + ['Ada'], + 'wrun_runtime_dynamic', + undefined, + ops + ), + executionContext: { + dynamicWorkflow: { + version: 1, + sourceHash: 'hash', + exportName: 'workflow', + workflowCode: ` + async function workflow(name) { + return "hello " + name; + } + ;globalThis.__private_workflows = new Map(); + globalThis.__private_workflows.set("workflow//dynamic/test-run//workflow", workflow); + `, + }, + }, + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + }; + + const events: Event[] = [ + { + eventId: 'event-0', + runId: workflowRun.runId, + eventType: 'run_created', + eventData: { + input: workflowRun.input, + deploymentId: workflowRun.deploymentId, + workflowName: workflowRun.workflowName, + executionContext: workflowRun.executionContext, + }, + createdAt: new Date('2024-01-01T00:00:00.000Z'), + specVersion: SPEC_VERSION_CURRENT, + }, + { + eventId: 'event-1', + runId: workflowRun.runId, + eventType: 'run_started', + createdAt: new Date('2024-01-01T00:00:00.000Z'), + specVersion: SPEC_VERSION_CURRENT, + }, + ]; + + const createdEvents = await runWorkflowHandlerWithEvents( + `function workflow() { throw new Error("static bundle should not run"); }${getWorkflowTransformCode('workflow')}`, + workflowRun, + events + ); + + const runCompleted = createdEvents.find( + (event: any) => event.eventType === 'run_completed' + ) as any; + if (!runCompleted) { + const runFailed = createdEvents.find( + (event: any) => event.eventType === 'run_failed' + ) as any; + if (runFailed) { + const error = await hydrateRunError( + runFailed.eventData.error, + workflowRun.runId, + undefined, + ops + ); + throw error; + } + } + expect(runCompleted).toBeDefined(); + expect( + await hydrateWorkflowReturnValue( + runCompleted.eventData.output, + workflowRun.runId, + undefined, + ops + ) + ).toBe('hello Ada'); + }); + it('records run_failed when a committed wait_completed targets the wrong wait', async () => { const ops: Promise[] = []; const workflowRun: WorkflowRun = { diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 60df07876a..c1409a035e 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -85,6 +85,9 @@ export { wakeUpRun, } from './runtime/runs.js'; export { + type DynamicStartOptions, + type DynamicWorkflowOptions, + type DynamicWorkflowStepReference, type StartOptions, type StartOptionsBase, type StartOptionsWithDeploymentId, @@ -781,8 +784,12 @@ export function workflowEntrypoint( eventCount: events.length, }); replayStart = Date.now(); - const result = await runWorkflow( + const workflowCodeForRun = getWorkflowCodeForRun( workflowCode, + workflowRun + ); + const result = await runWorkflow( + workflowCodeForRun, workflowRun, events, encryptionKey @@ -1060,10 +1067,14 @@ export function workflowEntrypoint( const parsedName = parseWorkflowName(workflowName); const filename = parsedName?.moduleSpecifier || workflowName; + const workflowCodeForRun = getWorkflowCodeForRun( + workflowCode, + workflowRun + ); errorStack = remapErrorStack( errorStack, filename, - workflowCode + workflowCodeForRun ); } @@ -1157,6 +1168,24 @@ export function workflowEntrypoint( if (!cachedHandler) { cachedHandler = handler(await getWorldHandlers()); } - return cachedHandler(req); + return cachedHandler!(req); }); } + +function getWorkflowCodeForRun( + staticWorkflowCode: string, + workflowRun: WorkflowRun +): string { + const dynamicWorkflow = workflowRun.executionContext?.dynamicWorkflow; + if ( + dynamicWorkflow && + typeof dynamicWorkflow === 'object' && + 'version' in dynamicWorkflow && + dynamicWorkflow.version === 1 && + 'workflowCode' in dynamicWorkflow && + typeof dynamicWorkflow.workflowCode === 'string' + ) { + return dynamicWorkflow.workflowCode; + } + return staticWorkflowCode; +} diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index c30a17da27..23c343d9ec 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -491,6 +491,153 @@ describe('start', () => { }); }); + describe('dynamic workflow source', () => { + let mockEventsCreate: ReturnType; + let mockQueue: ReturnType; + + const validSource = ` +async function workflow(input) { + "use workflow"; + const user = await steps.fetchUser(input.userId); + await steps.sendEmail(user.email); + return { ok: true }; +}`; + + const fetchUser = Object.assign(async () => undefined, { + stepId: 'step//./steps//fetchUser', + }); + const sendEmail = Object.assign(async () => undefined, { + stepId: 'step//./steps//sendEmail', + }); + + beforeEach(() => { + mockEventsCreate = vi.fn().mockImplementation((runId) => { + return Promise.resolve({ + run: { runId: runId ?? 'wrun_test123', status: 'pending' }, + }); + }); + mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + + setWorld({ + specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + }); + + afterEach(() => { + setWorld(undefined); + vi.clearAllMocks(); + }); + + it('stores dynamic workflow code in executionContext and queues the generated workflow name', async () => { + await start(validSource, [{ userId: 'user_123' }], { + dynamic: { + id: 'ai-user-followup', + steps: { fetchUser, sendEmail }, + }, + }); + + const [, runCreated] = mockEventsCreate.mock.calls[0]; + expect(runCreated.eventData.workflowName).toBe( + 'workflow//dynamic/ai-user-followup//workflow' + ); + expect(runCreated.eventData.executionContext.dynamicWorkflow).toEqual( + expect.objectContaining({ + version: 1, + exportName: 'workflow', + sourceHash: expect.any(String), + workflowCode: expect.stringContaining('steps = Object.freeze'), + }) + ); + expect( + runCreated.eventData.executionContext.dynamicWorkflow.workflowCode + ).toContain('__dynamicUseStep("step//./steps//fetchUser")'); + + const [queueName, queuePayload] = mockQueue.mock.calls[0]; + expect(queueName).toBe( + '__wkf_workflow_workflow//dynamic/ai-user-followup//workflow' + ); + expect(queuePayload.runInput.workflowName).toBe( + 'workflow//dynamic/ai-user-followup//workflow' + ); + expect(queuePayload.runInput.executionContext.dynamicWorkflow).toEqual( + runCreated.eventData.executionContext.dynamicWorkflow + ); + }); + + it('accepts explicit stepId references', async () => { + await start(validSource, [{ userId: 'user_123' }], { + dynamic: { + id: 'explicit-step-refs', + steps: { + fetchUser: { stepId: 'step//./steps//fetchUser' }, + sendEmail: { stepId: 'step//./steps//sendEmail' }, + }, + }, + }); + + const [, runCreated] = mockEventsCreate.mock.calls[0]; + expect( + runCreated.eventData.executionContext.dynamicWorkflow.workflowCode + ).toContain('__dynamicUseStep("step//./steps//sendEmail")'); + }); + + it('rejects source strings without dynamic options', async () => { + await expect( + // @ts-expect-error - intentionally missing dynamic options + start(validSource, []) + ).rejects.toThrow('Dynamic workflow source requires options.dynamic'); + }); + + it('rejects missing steps', async () => { + await expect( + start(validSource, [], { dynamic: { steps: {} } }) + ).rejects.toThrow('dynamic.steps'); + }); + + it('rejects step aliases without stepId metadata', async () => { + await expect( + start(validSource, [], { + dynamic: { + steps: { + fetchUser: (async () => undefined) as any, + sendEmail, + }, + }, + }) + ).rejects.toThrow('must be an imported step function'); + }); + + it('rejects unsupported module syntax', async () => { + await expect( + start(`import { x } from './x';\n${validSource}`, [], { + dynamic: { steps: { fetchUser, sendEmail } }, + }) + ).rejects.toThrow('cannot contain import or export syntax'); + }); + + it('rejects source without a use workflow directive', async () => { + await expect( + start('async function workflow() { return 1; }', [], { + dynamic: { steps: { fetchUser, sendEmail } }, + }) + ).rejects.toThrow('must start with a "use workflow" directive'); + }); + + it('rejects unsafe ids', async () => { + await expect( + start(validSource, [], { + dynamic: { + id: 'bad/id', + steps: { fetchUser, sendEmail }, + }, + }) + ).rejects.toThrow('Invalid dynamic workflow id'); + }); + }); + describe('overload type inference', () => { // Type-only assertions that don't execute start() at runtime. // We use expectTypeOf on the function signature's return type directly. @@ -532,5 +679,17 @@ describe('start', () => { ) => Promise>; expectTypeOf().toMatchTypeOf(); }); + + it('should return Run for dynamic workflow source', () => { + expectTypeOf< + ( + source: string, + args: unknown[], + opts: { + dynamic: { steps: Record }; + } + ) => Promise> + >().toMatchTypeOf(); + }); }); }); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 6f20e81c2a..fbf11937d8 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,3 +1,4 @@ +import { createHash } from 'node:crypto'; import { waitUntil } from '@vercel/functions'; import { EntityConflictError, @@ -20,9 +21,9 @@ import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier, trace } from '../telemetry.js'; import { waitedUntil } from '../util.js'; import { version as workflowCoreVersion } from '../version.js'; +import { getWorldLazy } from './get-world-lazy.js'; import { getWorkflowQueueName } from './helpers.js'; import { Run } from './run.js'; -import { getWorldLazy } from './get-world-lazy.js'; /** ULID generator for client-side runId generation */ const ulid = monotonicFactory(); @@ -68,6 +69,33 @@ export type StartOptions = | StartOptionsWithDeploymentId | StartOptionsWithoutDeploymentId; +export type DynamicWorkflowStepReference = { readonly stepId: string }; + +export interface DynamicWorkflowOptions { + /** + * Already-registered step functions exposed to the dynamic workflow source. + * + * Each value may be an imported step function transformed by Workflow SDK + * (with a `.stepId` property) or an explicit `{ stepId }` reference. + */ + steps: Record; + + /** + * Optional stable ID segment for the generated workflow ID. Defaults to a + * hash of the source and referenced steps. + */ + id?: string; + + /** + * Name of the async workflow function in the source. Defaults to "workflow". + */ + exportName?: string; +} + +export type DynamicStartOptions = StartOptions & { + dynamic: DynamicWorkflowOptions; +}; + /** * Represents an imported workflow function. */ @@ -80,6 +108,164 @@ export type WorkflowFunction = ( */ export type WorkflowMetadata = { workflowId: string }; +export interface DynamicWorkflowExecutionContext { + version: 1; + workflowCode: string; + sourceHash: string; + exportName: string; +} + +const DYNAMIC_WORKFLOW_SOURCE_MAX_BYTES = 32 * 1024; +const SAFE_DYNAMIC_ID_SEGMENT = /^[a-zA-Z0-9_.@-]+$/; +const SAFE_DYNAMIC_IDENTIFIER = /^[a-zA-Z_][a-zA-Z0-9_]*$/; +const UNSUPPORTED_DYNAMIC_MODULE_SYNTAX = + /(^|[\s;])(?:import\s*(?:[\w*{]|\(|['"])|export\s+(?:async\s+)?(?:function|const|let|var|class|default|\{|\*))/m; + +function stableJsonStringify(value: unknown): string { + if (Array.isArray(value)) { + return `[${value.map(stableJsonStringify).join(',')}]`; + } + if (value && typeof value === 'object') { + return `{${Object.entries(value as Record) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([key, val]) => `${JSON.stringify(key)}:${stableJsonStringify(val)}`) + .join(',')}}`; + } + return JSON.stringify(value); +} + +function sha256Hex(input: string): string { + return createHash('sha256').update(input).digest('hex'); +} + +function assertDynamicWorkflowIdentifier(kind: string, value: string) { + if (!SAFE_DYNAMIC_IDENTIFIER.test(value)) { + throw new WorkflowRuntimeError( + `Invalid dynamic workflow ${kind} "${value}". Use only letters, numbers, and underscores, and start with a letter or underscore.` + ); + } +} + +function validateDynamicWorkflowSource(source: string, exportName: string) { + if (Buffer.byteLength(source, 'utf8') > DYNAMIC_WORKFLOW_SOURCE_MAX_BYTES) { + throw new WorkflowRuntimeError( + `Dynamic workflow source is too large. The MVP limit is ${DYNAMIC_WORKFLOW_SOURCE_MAX_BYTES} bytes.` + ); + } + + if (UNSUPPORTED_DYNAMIC_MODULE_SYNTAX.test(source)) { + throw new WorkflowRuntimeError( + 'Dynamic workflow source cannot contain import or export syntax in the MVP.' + ); + } + + const functionMatch = new RegExp( + `\\basync\\s+function\\s+${exportName}\\s*\\([^)]*\\)\\s*\\{` + ).exec(source); + if (!functionMatch) { + throw new WorkflowRuntimeError( + `Dynamic workflow source must define async function ${exportName}(...).` + ); + } + + const bodyStart = functionMatch.index + functionMatch[0].length; + const bodyPrefix = source.slice(bodyStart, bodyStart + 200); + if (!/^\s*(?:"use workflow"|'use workflow')\s*;/.test(bodyPrefix)) { + throw new WorkflowRuntimeError( + `Dynamic workflow function "${exportName}" must start with a "use workflow" directive.` + ); + } +} + +function getDynamicStepId(alias: string, value: unknown): string { + const stepId = + (value && typeof value === 'object') || typeof value === 'function' + ? (value as { stepId?: unknown }).stepId + : undefined; + + if (typeof stepId !== 'string' || stepId.length === 0) { + throw new WorkflowRuntimeError( + `Dynamic workflow step "${alias}" must be an imported step function or an object with a non-empty stepId.` + ); + } + + return stepId; +} + +function compileDynamicWorkflowSource( + source: string, + options: DynamicWorkflowOptions +): { + workflowName: string; + dynamicWorkflow: DynamicWorkflowExecutionContext; +} { + const exportName = options.exportName ?? 'workflow'; + assertDynamicWorkflowIdentifier('exportName', exportName); + validateDynamicWorkflowSource(source, exportName); + + if (!options.steps || Object.keys(options.steps).length === 0) { + throw new WorkflowRuntimeError( + 'Dynamic workflow options must include at least one registered step in dynamic.steps.' + ); + } + + const stepEntries = Object.entries(options.steps) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([alias, value]) => { + assertDynamicWorkflowIdentifier(`step alias`, alias); + return [alias, getDynamicStepId(alias, value)] as const; + }); + + const sourceHash = sha256Hex( + `${source}\n${stableJsonStringify(Object.fromEntries(stepEntries))}` + ); + const id = options.id ?? sourceHash.slice(0, 32); + if (!SAFE_DYNAMIC_ID_SEGMENT.test(id)) { + throw new WorkflowRuntimeError( + `Invalid dynamic workflow id "${id}". Use only letters, numbers, underscores, hyphens, dots, and @.` + ); + } + + const workflowName = `workflow//dynamic/${id}//${exportName}`; + const stepProxyEntries = stepEntries + .map( + ([alias, stepId]) => + `${JSON.stringify(alias)}: __dynamicUseStep(${JSON.stringify(stepId)})` + ) + .join(',\n '); + + const workflowCode = ` +globalThis.__private_workflows = new Map(); +const __dynamicUseStep = globalThis[Symbol.for("WORKFLOW_USE_STEP")]; +if (typeof __dynamicUseStep !== "function") { + throw new Error("WORKFLOW_USE_STEP is not available in the workflow VM."); +} +const steps = Object.freeze({ + ${stepProxyEntries} +}); +const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; +const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")]; +${source} +Object.defineProperty(${exportName}, "workflowId", { + value: ${JSON.stringify(workflowName)}, + writable: false, + enumerable: false, + configurable: false +}); +globalThis.__private_workflows.set(${JSON.stringify(workflowName)}, ${exportName}); +`; + + return { + workflowName, + dynamicWorkflow: { + version: 1, + workflowCode, + sourceHash, + exportName, + }, + }; +} + /** * Starts a workflow run. * @@ -114,15 +300,48 @@ export function start( options?: StartOptionsWithoutDeploymentId ): Promise>; +export function start( + source: string, + args: unknown[], + options: DynamicStartOptions +): Promise>; + +export function start( + source: string, + options: DynamicStartOptions +): Promise>; + export async function start( - workflow: WorkflowFunction | WorkflowMetadata, - argsOrOptions?: TArgs | StartOptions, - options?: StartOptions + workflow: WorkflowFunction | WorkflowMetadata | string, + argsOrOptions?: TArgs | StartOptions | DynamicStartOptions, + options?: StartOptions | DynamicStartOptions ) { 'use step'; return await waitedUntil(() => { - // @ts-expect-error this field is added by our client transform - const workflowName = workflow?.workflowId; + let args: Serializable[] = []; + let opts: StartOptions | DynamicStartOptions = options ?? {}; + if (Array.isArray(argsOrOptions)) { + args = argsOrOptions as Serializable[]; + } else if (typeof argsOrOptions === 'object') { + opts = argsOrOptions; + } + + let dynamicWorkflow: DynamicWorkflowExecutionContext | undefined; + let workflowName: string | undefined; + if (typeof workflow === 'string') { + const dynamicOptions = (opts as Partial).dynamic; + if (!dynamicOptions) { + throw new WorkflowRuntimeError( + 'Dynamic workflow source requires options.dynamic.' + ); + } + const compiled = compileDynamicWorkflowSource(workflow, dynamicOptions); + workflowName = compiled.workflowName; + dynamicWorkflow = compiled.dynamicWorkflow; + } else { + // @ts-expect-error this field is added by our client transform + workflowName = workflow?.workflowId; + } if (!workflowName) { throw new WorkflowRuntimeError( @@ -137,14 +356,6 @@ export async function start( ...Attribute.WorkflowOperation('start'), }); - let args: Serializable[] = []; - let opts: StartOptions = options ?? {}; - if (Array.isArray(argsOrOptions)) { - args = argsOrOptions as Serializable[]; - } else if (typeof argsOrOptions === 'object') { - opts = argsOrOptions; - } - span?.setAttributes({ ...Attribute.WorkflowArgumentsCount(args.length), }); @@ -211,6 +422,7 @@ export async function start( traceCarrier, workflowCoreVersion, features: { encryption: !!encryptionKey }, + ...(dynamicWorkflow ? { dynamicWorkflow } : {}), }; // Call events.create (run_created) and queue in parallel. diff --git a/packages/workflow/src/api.ts b/packages/workflow/src/api.ts index a0565a8e2c..5fea46c67b 100644 --- a/packages/workflow/src/api.ts +++ b/packages/workflow/src/api.ts @@ -27,6 +27,9 @@ export { type WorkflowReadableStreamOptions, } from '@workflow/core/runtime/run'; export { + type DynamicStartOptions, + type DynamicWorkflowOptions, + type DynamicWorkflowStepReference, type StartOptions, start, } from '@workflow/core/runtime/start'; From eed88d3d31fbfddc7d801ba9963e5821b69ffad4 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 21 May 2026 13:20:45 -0700 Subject: [PATCH 2/4] Document dynamic workflow runtime globals --- .../docs/v5/foundations/dynamic-workflows.mdx | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/v5/foundations/dynamic-workflows.mdx b/docs/content/docs/v5/foundations/dynamic-workflows.mdx index 63dbe6f4ea..854e4080a6 100644 --- a/docs/content/docs/v5/foundations/dynamic-workflows.mdx +++ b/docs/content/docs/v5/foundations/dynamic-workflows.mdx @@ -51,7 +51,43 @@ const run = await start( ); ``` -The generated workflow ID uses the form `workflow//dynamic///`. If you omit `id`, Workflow SDK derives one from the source and step references. +The generated workflow ID uses the form `workflow//dynamic///`. The `id` field is optional. Use it when you want a stable, human-readable workflow ID segment for queue names, run filtering, and observability. If you omit `id`, Workflow SDK derives one from the source and step references. + +## Predefined runtime globals + +Dynamic workflow source does not use imports. The runtime predefines these bindings inside the generated workflow VM code: + +| Binding | Description | +| --- | --- | +| `steps` | A frozen object containing the step aliases passed through `dynamic.steps`. | +| `sleep` | The Workflow SDK sleep primitive for durable waits and timers. | +| `createHook` | The Workflow SDK hook primitive for durable external resume signals. | + +The source also runs inside the normal deterministic workflow VM, so standard sandbox globals such as `Date`, `Math.random`, `crypto`, `URL`, `URLSearchParams`, `TextEncoder`, `TextDecoder`, `structuredClone`, `atob`, and `btoa` are available with the same determinism constraints as static workflows. + +For the MVP, `createWebhook` and `getWritable` are not predefined in dynamic source. + +```typescript lineNumbers +const source = ` +async function workflow(input) { + "use workflow"; + + await sleep("15m"); + + const approval = createHook({ token: "approval-" + input.userId }); + const result = await Promise.race([ + approval, + sleep("1d").then(() => ({ approved: false, timedOut: true })), + ]); + + if (result.approved) { + await steps.sendEmail(input.email); + } + + return result; +} +`; +``` ## Generated with an LLM @@ -73,8 +109,9 @@ const { text: source } = await generateText({ system: ` Generate one JavaScript async function named workflow. The first statement inside the function must be "use workflow". -Only call tools through the injected steps object. -Do not use import, export, TypeScript syntax, or inline step functions. +Use only these predefined runtime globals: steps, sleep, createHook. +Only call step functions through the injected steps object. +Do not use import, export, TypeScript syntax, inline step functions, createWebhook, or getWritable. `, prompt: "When a customer reports a billing issue, look them up, create a ticket, and notify Slack.", }); @@ -120,6 +157,8 @@ await start(source, [input], { `dynamic.exportName` defaults to `workflow`. Use it when your generated source defines a different async function name. +`dynamic.id` is optional. It does not affect the workflow logic. It only controls the stable ID segment in the generated workflow name. When omitted, Workflow SDK uses a hash of the source and step references. + ## MVP limitations - Dynamic source must be JavaScript and must fit within the 32 KB source limit. From 49371a511683bccba4ecfbd4367e16205eb97d8b Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 21 May 2026 13:24:27 -0700 Subject: [PATCH 3/4] Remove custom dynamic workflow ids --- .../v5/api-reference/workflow-api/start.mdx | 1 - .../docs/v5/foundations/dynamic-workflows.mdx | 6 +---- packages/core/src/runtime/start.test.ts | 24 ++++++++----------- packages/core/src/runtime/start.ts | 21 ++++++---------- 4 files changed, 18 insertions(+), 34 deletions(-) diff --git a/docs/content/docs/v5/api-reference/workflow-api/start.mdx b/docs/content/docs/v5/api-reference/workflow-api/start.mdx index ea31443744..9215aed6ff 100644 --- a/docs/content/docs/v5/api-reference/workflow-api/start.mdx +++ b/docs/content/docs/v5/api-reference/workflow-api/start.mdx @@ -105,7 +105,6 @@ async function workflow(input) { const run = await start(source, [{ userId: "user_123" }], { dynamic: { - id: "ai-user-followup", steps: { fetchUser, sendEmail }, }, }); diff --git a/docs/content/docs/v5/foundations/dynamic-workflows.mdx b/docs/content/docs/v5/foundations/dynamic-workflows.mdx index 854e4080a6..79cc58e0c0 100644 --- a/docs/content/docs/v5/foundations/dynamic-workflows.mdx +++ b/docs/content/docs/v5/foundations/dynamic-workflows.mdx @@ -44,14 +44,13 @@ const run = await start( [{ userId: "user_123" }], { dynamic: { - id: "ai-user-followup", steps: { fetchUser, sendEmail }, }, }, ); ``` -The generated workflow ID uses the form `workflow//dynamic///`. The `id` field is optional. Use it when you want a stable, human-readable workflow ID segment for queue names, run filtering, and observability. If you omit `id`, Workflow SDK derives one from the source and step references. +The generated workflow ID uses the form `workflow//dynamic///`. Workflow SDK derives the hash from the source and step references so the same dynamic workflow source gets a stable generated workflow name. ## Predefined runtime globals @@ -121,7 +120,6 @@ const run = await start( [{ customerId: "cus_123", issue: "Invoice looks wrong" }], { dynamic: { - id: "billing-triage", steps: stepCatalog, }, }, @@ -157,8 +155,6 @@ await start(source, [input], { `dynamic.exportName` defaults to `workflow`. Use it when your generated source defines a different async function name. -`dynamic.id` is optional. It does not affect the workflow logic. It only controls the stable ID segment in the generated workflow name. When omitted, Workflow SDK uses a hash of the source and step references. - ## MVP limitations - Dynamic source must be JavaScript and must fit within the 32 KB source limit. diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index 23c343d9ec..2c0a489795 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -534,15 +534,15 @@ async function workflow(input) { it('stores dynamic workflow code in executionContext and queues the generated workflow name', async () => { await start(validSource, [{ userId: 'user_123' }], { dynamic: { - id: 'ai-user-followup', steps: { fetchUser, sendEmail }, }, }); const [, runCreated] = mockEventsCreate.mock.calls[0]; - expect(runCreated.eventData.workflowName).toBe( - 'workflow//dynamic/ai-user-followup//workflow' - ); + const dynamicWorkflow = + runCreated.eventData.executionContext.dynamicWorkflow; + const expectedWorkflowName = `workflow//dynamic/${dynamicWorkflow.sourceHash.slice(0, 32)}//workflow`; + expect(runCreated.eventData.workflowName).toBe(expectedWorkflowName); expect(runCreated.eventData.executionContext.dynamicWorkflow).toEqual( expect.objectContaining({ version: 1, @@ -556,12 +556,8 @@ async function workflow(input) { ).toContain('__dynamicUseStep("step//./steps//fetchUser")'); const [queueName, queuePayload] = mockQueue.mock.calls[0]; - expect(queueName).toBe( - '__wkf_workflow_workflow//dynamic/ai-user-followup//workflow' - ); - expect(queuePayload.runInput.workflowName).toBe( - 'workflow//dynamic/ai-user-followup//workflow' - ); + expect(queueName).toBe(`__wkf_workflow_${expectedWorkflowName}`); + expect(queuePayload.runInput.workflowName).toBe(expectedWorkflowName); expect(queuePayload.runInput.executionContext.dynamicWorkflow).toEqual( runCreated.eventData.executionContext.dynamicWorkflow ); @@ -570,7 +566,6 @@ async function workflow(input) { it('accepts explicit stepId references', async () => { await start(validSource, [{ userId: 'user_123' }], { dynamic: { - id: 'explicit-step-refs', steps: { fetchUser: { stepId: 'step//./steps//fetchUser' }, sendEmail: { stepId: 'step//./steps//sendEmail' }, @@ -626,15 +621,16 @@ async function workflow(input) { ).rejects.toThrow('must start with a "use workflow" directive'); }); - it('rejects unsafe ids', async () => { + it('rejects custom dynamic workflow ids', async () => { await expect( start(validSource, [], { dynamic: { - id: 'bad/id', + // @ts-expect-error - dynamic IDs are generated from source + steps + id: 'custom-id', steps: { fetchUser, sendEmail }, }, }) - ).rejects.toThrow('Invalid dynamic workflow id'); + ).rejects.toThrow('dynamic.id is not supported'); }); }); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index fbf11937d8..83193801ee 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -80,12 +80,6 @@ export interface DynamicWorkflowOptions { */ steps: Record; - /** - * Optional stable ID segment for the generated workflow ID. Defaults to a - * hash of the source and referenced steps. - */ - id?: string; - /** * Name of the async workflow function in the source. Defaults to "workflow". */ @@ -116,7 +110,6 @@ export interface DynamicWorkflowExecutionContext { } const DYNAMIC_WORKFLOW_SOURCE_MAX_BYTES = 32 * 1024; -const SAFE_DYNAMIC_ID_SEGMENT = /^[a-zA-Z0-9_.@-]+$/; const SAFE_DYNAMIC_IDENTIFIER = /^[a-zA-Z_][a-zA-Z0-9_]*$/; const UNSUPPORTED_DYNAMIC_MODULE_SYNTAX = /(^|[\s;])(?:import\s*(?:[\w*{]|\(|['"])|export\s+(?:async\s+)?(?:function|const|let|var|class|default|\{|\*))/m; @@ -200,6 +193,11 @@ function compileDynamicWorkflowSource( dynamicWorkflow: DynamicWorkflowExecutionContext; } { const exportName = options.exportName ?? 'workflow'; + if ('id' in options) { + throw new WorkflowRuntimeError( + 'dynamic.id is not supported. Dynamic workflow IDs are generated from the source and step references.' + ); + } assertDynamicWorkflowIdentifier('exportName', exportName); validateDynamicWorkflowSource(source, exportName); @@ -219,14 +217,9 @@ function compileDynamicWorkflowSource( const sourceHash = sha256Hex( `${source}\n${stableJsonStringify(Object.fromEntries(stepEntries))}` ); - const id = options.id ?? sourceHash.slice(0, 32); - if (!SAFE_DYNAMIC_ID_SEGMENT.test(id)) { - throw new WorkflowRuntimeError( - `Invalid dynamic workflow id "${id}". Use only letters, numbers, underscores, hyphens, dots, and @.` - ); - } + const workflowIdSegment = sourceHash.slice(0, 32); - const workflowName = `workflow//dynamic/${id}//${exportName}`; + const workflowName = `workflow//dynamic/${workflowIdSegment}//${exportName}`; const stepProxyEntries = stepEntries .map( ([alias, stepId]) => From c03bb28b4d977c8d1561aae37fb5ff23dcd3e94a Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 21 May 2026 13:27:11 -0700 Subject: [PATCH 4/4] Clarify dynamic workflow source storage direction --- docs/content/docs/v5/foundations/dynamic-workflows.mdx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/content/docs/v5/foundations/dynamic-workflows.mdx b/docs/content/docs/v5/foundations/dynamic-workflows.mdx index 79cc58e0c0..a0410621eb 100644 --- a/docs/content/docs/v5/foundations/dynamic-workflows.mdx +++ b/docs/content/docs/v5/foundations/dynamic-workflows.mdx @@ -155,6 +155,10 @@ await start(source, [input], { `dynamic.exportName` defaults to `workflow`. Use it when your generated source defines a different async function name. +## Source storage + +The current prototype stores generated workflow VM code inline with run metadata. Before broad release, dynamic workflow source/code should move to encrypted ref-backed storage so sensitive generated source is protected by the same run encryption model as workflow inputs and step data, and longer workflows do not have to fit inside run metadata storage limits. + ## MVP limitations - Dynamic source must be JavaScript and must fit within the 32 KB source limit. @@ -163,4 +167,4 @@ await start(source, [input], { - `import` and `export` syntax are not supported. - Inline `"use step"` functions are not supported. - TypeScript syntax, runtime bundling, npm package imports, and custom class serialization registration are not supported. -- Dynamic workflow source and generated VM code are stored with run metadata. Do not put secrets in generated source. +- The current prototype stores generated workflow code inline in run metadata. Do not put secrets in generated source until source/code storage moves to encrypted refs.