diff --git a/.changeset/dynamic-workflow-source.md b/.changeset/dynamic-workflow-source.md new file mode 100644 index 0000000000..3cd43198c5 --- /dev/null +++ b/.changeset/dynamic-workflow-source.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +"workflow": patch +--- + +Add an experimental dynamic workflow source overload for `start()`. diff --git a/docs/content/docs/v5/api-reference/workflow-api/start.mdx b/docs/content/docs/v5/api-reference/workflow-api/start.mdx index a2962c5ee4..9215aed6ff 100644 --- a/docs/content/docs/v5/api-reference/workflow-api/start.mdx +++ b/docs/content/docs/v5/api-reference/workflow-api/start.mdx @@ -56,6 +56,7 @@ Learn more about [`WorkflowReadableStreamOptions`](/docs/api-reference/workflow- * The function returns immediately after enqueuing the workflow - it doesn't wait for the workflow to complete. * All arguments must be [serializable](/docs/foundations/serialization). * When `deploymentId` is provided, the argument types and return type become `unknown` since there is no guarantee the workflow function's types will be consistent across different deployments. +* You can also start an experimental [dynamic workflow](/docs/foundations/dynamic-workflows) from a trusted source string by passing `options.dynamic`. If `start()` throws `'start' received an invalid workflow function. Ensure the Workflow Development Kit is configured correctly and the function includes a 'use workflow' directive.`, the passed function was not transformed as a workflow. The two most common causes are a missing `"use workflow"` directive or missing framework integration. See [start-invalid-workflow-function](/docs/errors/start-invalid-workflow-function). @@ -83,6 +84,32 @@ const run = await start(myWorkflow, ["arg1", "arg2"], { // [!code highlight] }); // [!code highlight] ``` +### With Dynamic Source + +Pass a trusted JavaScript source string plus `dynamic.steps` when the workflow orchestration is generated at runtime. + +```typescript +import { start } from "workflow/api"; +import { fetchUser, sendEmail } from "./steps"; + +const source = ` +async function workflow(input) { + "use workflow"; + + const user = await steps.fetchUser(input.userId); + await steps.sendEmail(user.email); + + return { ok: true }; +} +`; + +const run = await start(source, [{ userId: "user_123" }], { + dynamic: { + steps: { fetchUser, sendEmail }, + }, +}); +``` + ### Using `deploymentId: "latest"` Set `deploymentId` to `"latest"` to automatically resolve the most recent deployment for the current environment. This is useful when you want to ensure a workflow run targets the latest deployed version of your application rather than the deployment that initiated the call. For when to use this and how it fits with default run pinning, see [Versioning](/docs/foundations/versioning). diff --git a/docs/content/docs/v5/foundations/dynamic-workflows.mdx b/docs/content/docs/v5/foundations/dynamic-workflows.mdx new file mode 100644 index 0000000000..a0410621eb --- /dev/null +++ b/docs/content/docs/v5/foundations/dynamic-workflows.mdx @@ -0,0 +1,170 @@ +--- +title: Dynamic Workflows +description: Start trusted workflow source strings that orchestrate already-registered steps. +type: conceptual +summary: Run workflow code generated at runtime without adding a new build-time workflow file. +prerequisites: + - /docs/foundations/workflows-and-steps + - /docs/foundations/starting-workflows +related: + - /docs/api-reference/workflow-api/start + - /docs/foundations/serialization + - /docs/foundations/versioning +--- + +Dynamic workflows let you start a workflow from a JavaScript source string instead of importing a workflow function that was discovered at build time. This is useful when the orchestration is generated at runtime, such as from a workflow builder UI or an LLM. + + +Dynamic workflows are experimental. Treat the source as trusted application code. The workflow VM enforces the usual deterministic workflow runtime, but it is not a security sandbox for untrusted JavaScript. + + +## How it works + +Dynamic workflows reuse the normal Workflow SDK runtime. The dynamic source is compiled into per-run workflow VM code, stored on the run, and replayed from the event log just like a static workflow. + +For the MVP, dynamic workflows can only orchestrate step functions that are already registered in the deployment. Pass those step references through `dynamic.steps`, then call them from the source through the injected `steps` object. + +```typescript lineNumbers +import { start } from "workflow/api"; +import { fetchUser, sendEmail } from "./steps"; + +const source = ` +async function workflow(input) { + "use workflow"; + + const user = await steps.fetchUser(input.userId); + await steps.sendEmail(user.email); + + return { ok: true }; +} +`; + +const run = await start( + source, + [{ userId: "user_123" }], + { + dynamic: { + steps: { fetchUser, sendEmail }, + }, + }, +); +``` + +The generated workflow ID uses the form `workflow//dynamic///`. Workflow SDK derives the hash from the source and step references so the same dynamic workflow source gets a stable generated workflow name. + +## Predefined runtime globals + +Dynamic workflow source does not use imports. The runtime predefines these bindings inside the generated workflow VM code: + +| Binding | Description | +| --- | --- | +| `steps` | A frozen object containing the step aliases passed through `dynamic.steps`. | +| `sleep` | The Workflow SDK sleep primitive for durable waits and timers. | +| `createHook` | The Workflow SDK hook primitive for durable external resume signals. | + +The source also runs inside the normal deterministic workflow VM, so standard sandbox globals such as `Date`, `Math.random`, `crypto`, `URL`, `URLSearchParams`, `TextEncoder`, `TextDecoder`, `structuredClone`, `atob`, and `btoa` are available with the same determinism constraints as static workflows. + +For the MVP, `createWebhook` and `getWritable` are not predefined in dynamic source. + +```typescript lineNumbers +const source = ` +async function workflow(input) { + "use workflow"; + + await sleep("15m"); + + const approval = createHook({ token: "approval-" + input.userId }); + const result = await Promise.race([ + approval, + sleep("1d").then(() => ({ approved: false, timedOut: true })), + ]); + + if (result.approved) { + await steps.sendEmail(input.email); + } + + return result; +} +`; +``` + +## Generated with an LLM + +Keep the step catalog explicit. The model should choose from steps you provide, and your application should review or validate the generated source before starting it. + +```typescript lineNumbers +import { generateText } from "ai"; +import { start } from "workflow/api"; +import { createTicket, lookupCustomer, sendSlackMessage } from "./steps"; + +const stepCatalog = { + lookupCustomer, + createTicket, + sendSlackMessage, +}; + +const { text: source } = await generateText({ + model: myModel, + system: ` +Generate one JavaScript async function named workflow. +The first statement inside the function must be "use workflow". +Use only these predefined runtime globals: steps, sleep, createHook. +Only call step functions through the injected steps object. +Do not use import, export, TypeScript syntax, inline step functions, createWebhook, or getWritable. +`, + prompt: "When a customer reports a billing issue, look them up, create a ticket, and notify Slack.", +}); + +const run = await start( + source, + [{ customerId: "cus_123", issue: "Invoice looks wrong" }], + { + dynamic: { + steps: stepCatalog, + }, + }, +); +``` + +## Options + +`dynamic.steps` is required. Each value can be an imported step function or an explicit `{ stepId }` reference. + +```typescript lineNumbers +import { start } from "workflow/api"; +import { fetchUser, sendEmail } from "./steps"; + +const source = ` +async function workflow(input) { + "use workflow"; + return await steps.fetchUser(input.userId); +} +`; +const input = { userId: "user_123" }; + +await start(source, [input], { + dynamic: { + steps: { + fetchUser, + sendEmail: { stepId: "step//./steps//sendEmail" }, + }, + exportName: "workflow", + }, +}); +``` + +`dynamic.exportName` defaults to `workflow`. Use it when your generated source defines a different async function name. + +## Source storage + +The current prototype stores generated workflow VM code inline with run metadata. Before broad release, dynamic workflow source/code should move to encrypted ref-backed storage so sensitive generated source is protected by the same run encryption model as workflow inputs and step data, and longer workflows do not have to fit inside run metadata storage limits. + +## MVP limitations + +- Dynamic source must be JavaScript and must fit within the 32 KB source limit. +- The source must define `async function workflow(...)` by default, or the function named by `dynamic.exportName`. +- The first statement in that function must be `"use workflow";`. +- `import` and `export` syntax are not supported. +- Inline `"use step"` functions are not supported. +- TypeScript syntax, runtime bundling, npm package imports, and custom class serialization registration are not supported. +- The current prototype stores generated workflow code inline in run metadata. Do not put secrets in generated source until source/code storage moves to encrypted refs. diff --git a/docs/content/docs/v5/foundations/index.mdx b/docs/content/docs/v5/foundations/index.mdx index 1eb96ddf69..7dd1c80920 100644 --- a/docs/content/docs/v5/foundations/index.mdx +++ b/docs/content/docs/v5/foundations/index.mdx @@ -17,6 +17,9 @@ Workflow programming can be a slight shift from how you traditionally write real Trigger workflows and track their execution using the `start()` function. + + Start trusted workflow source strings that orchestrate registered steps. + Types of errors and how retrying work in workflows. diff --git a/docs/content/docs/v5/foundations/meta.json b/docs/content/docs/v5/foundations/meta.json index ce49cfe47b..66e859f143 100644 --- a/docs/content/docs/v5/foundations/meta.json +++ b/docs/content/docs/v5/foundations/meta.json @@ -3,6 +3,7 @@ "pages": [ "workflows-and-steps", "starting-workflows", + "dynamic-workflows", "errors-and-retries", "hooks", "streaming", diff --git a/packages/core/src/runtime.test.ts b/packages/core/src/runtime.test.ts index 90603df446..05ccf776c0 100644 --- a/packages/core/src/runtime.test.ts +++ b/packages/core/src/runtime.test.ts @@ -10,6 +10,8 @@ import { workflowEntrypoint } from './runtime.js'; import { dehydrateStepReturnValue, dehydrateWorkflowArguments, + hydrateRunError, + hydrateWorkflowReturnValue, } from './serialization.js'; vi.mock('@vercel/functions', () => ({ @@ -58,7 +60,7 @@ async function runWorkflowHandlerWithEvents( { requestId: 'req_test', attempt: 1, - queueName: '__wkf_workflow_workflow', + queueName: `__wkf_workflow_${workflowRun.workflowName}`, messageId: 'msg_test', } ); @@ -97,6 +99,95 @@ describe('workflowEntrypoint replay guards', () => { `;globalThis.__private_workflows = new Map(); globalThis.__private_workflows.set(${JSON.stringify(workflowName)}, ${workflowName});`; + it('uses dynamic workflow code from the run executionContext when present', async () => { + const ops: Promise[] = []; + const workflowRun: WorkflowRun = { + runId: 'wrun_runtime_dynamic', + workflowName: 'workflow//dynamic/test-run//workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + ['Ada'], + 'wrun_runtime_dynamic', + undefined, + ops + ), + executionContext: { + dynamicWorkflow: { + version: 1, + sourceHash: 'hash', + exportName: 'workflow', + workflowCode: ` + async function workflow(name) { + return "hello " + name; + } + ;globalThis.__private_workflows = new Map(); + globalThis.__private_workflows.set("workflow//dynamic/test-run//workflow", workflow); + `, + }, + }, + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + }; + + const events: Event[] = [ + { + eventId: 'event-0', + runId: workflowRun.runId, + eventType: 'run_created', + eventData: { + input: workflowRun.input, + deploymentId: workflowRun.deploymentId, + workflowName: workflowRun.workflowName, + executionContext: workflowRun.executionContext, + }, + createdAt: new Date('2024-01-01T00:00:00.000Z'), + specVersion: SPEC_VERSION_CURRENT, + }, + { + eventId: 'event-1', + runId: workflowRun.runId, + eventType: 'run_started', + createdAt: new Date('2024-01-01T00:00:00.000Z'), + specVersion: SPEC_VERSION_CURRENT, + }, + ]; + + const createdEvents = await runWorkflowHandlerWithEvents( + `function workflow() { throw new Error("static bundle should not run"); }${getWorkflowTransformCode('workflow')}`, + workflowRun, + events + ); + + const runCompleted = createdEvents.find( + (event: any) => event.eventType === 'run_completed' + ) as any; + if (!runCompleted) { + const runFailed = createdEvents.find( + (event: any) => event.eventType === 'run_failed' + ) as any; + if (runFailed) { + const error = await hydrateRunError( + runFailed.eventData.error, + workflowRun.runId, + undefined, + ops + ); + throw error; + } + } + expect(runCompleted).toBeDefined(); + expect( + await hydrateWorkflowReturnValue( + runCompleted.eventData.output, + workflowRun.runId, + undefined, + ops + ) + ).toBe('hello Ada'); + }); + it('records run_failed when a committed wait_completed targets the wrong wait', async () => { const ops: Promise[] = []; const workflowRun: WorkflowRun = { diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 60df07876a..c1409a035e 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -85,6 +85,9 @@ export { wakeUpRun, } from './runtime/runs.js'; export { + type DynamicStartOptions, + type DynamicWorkflowOptions, + type DynamicWorkflowStepReference, type StartOptions, type StartOptionsBase, type StartOptionsWithDeploymentId, @@ -781,8 +784,12 @@ export function workflowEntrypoint( eventCount: events.length, }); replayStart = Date.now(); - const result = await runWorkflow( + const workflowCodeForRun = getWorkflowCodeForRun( workflowCode, + workflowRun + ); + const result = await runWorkflow( + workflowCodeForRun, workflowRun, events, encryptionKey @@ -1060,10 +1067,14 @@ export function workflowEntrypoint( const parsedName = parseWorkflowName(workflowName); const filename = parsedName?.moduleSpecifier || workflowName; + const workflowCodeForRun = getWorkflowCodeForRun( + workflowCode, + workflowRun + ); errorStack = remapErrorStack( errorStack, filename, - workflowCode + workflowCodeForRun ); } @@ -1157,6 +1168,24 @@ export function workflowEntrypoint( if (!cachedHandler) { cachedHandler = handler(await getWorldHandlers()); } - return cachedHandler(req); + return cachedHandler!(req); }); } + +function getWorkflowCodeForRun( + staticWorkflowCode: string, + workflowRun: WorkflowRun +): string { + const dynamicWorkflow = workflowRun.executionContext?.dynamicWorkflow; + if ( + dynamicWorkflow && + typeof dynamicWorkflow === 'object' && + 'version' in dynamicWorkflow && + dynamicWorkflow.version === 1 && + 'workflowCode' in dynamicWorkflow && + typeof dynamicWorkflow.workflowCode === 'string' + ) { + return dynamicWorkflow.workflowCode; + } + return staticWorkflowCode; +} diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index c30a17da27..2c0a489795 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -491,6 +491,149 @@ describe('start', () => { }); }); + describe('dynamic workflow source', () => { + let mockEventsCreate: ReturnType; + let mockQueue: ReturnType; + + const validSource = ` +async function workflow(input) { + "use workflow"; + const user = await steps.fetchUser(input.userId); + await steps.sendEmail(user.email); + return { ok: true }; +}`; + + const fetchUser = Object.assign(async () => undefined, { + stepId: 'step//./steps//fetchUser', + }); + const sendEmail = Object.assign(async () => undefined, { + stepId: 'step//./steps//sendEmail', + }); + + beforeEach(() => { + mockEventsCreate = vi.fn().mockImplementation((runId) => { + return Promise.resolve({ + run: { runId: runId ?? 'wrun_test123', status: 'pending' }, + }); + }); + mockQueue = vi.fn().mockResolvedValue({ messageId: null }); + + setWorld({ + specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + }); + + afterEach(() => { + setWorld(undefined); + vi.clearAllMocks(); + }); + + it('stores dynamic workflow code in executionContext and queues the generated workflow name', async () => { + await start(validSource, [{ userId: 'user_123' }], { + dynamic: { + steps: { fetchUser, sendEmail }, + }, + }); + + const [, runCreated] = mockEventsCreate.mock.calls[0]; + const dynamicWorkflow = + runCreated.eventData.executionContext.dynamicWorkflow; + const expectedWorkflowName = `workflow//dynamic/${dynamicWorkflow.sourceHash.slice(0, 32)}//workflow`; + expect(runCreated.eventData.workflowName).toBe(expectedWorkflowName); + expect(runCreated.eventData.executionContext.dynamicWorkflow).toEqual( + expect.objectContaining({ + version: 1, + exportName: 'workflow', + sourceHash: expect.any(String), + workflowCode: expect.stringContaining('steps = Object.freeze'), + }) + ); + expect( + runCreated.eventData.executionContext.dynamicWorkflow.workflowCode + ).toContain('__dynamicUseStep("step//./steps//fetchUser")'); + + const [queueName, queuePayload] = mockQueue.mock.calls[0]; + expect(queueName).toBe(`__wkf_workflow_${expectedWorkflowName}`); + expect(queuePayload.runInput.workflowName).toBe(expectedWorkflowName); + expect(queuePayload.runInput.executionContext.dynamicWorkflow).toEqual( + runCreated.eventData.executionContext.dynamicWorkflow + ); + }); + + it('accepts explicit stepId references', async () => { + await start(validSource, [{ userId: 'user_123' }], { + dynamic: { + steps: { + fetchUser: { stepId: 'step//./steps//fetchUser' }, + sendEmail: { stepId: 'step//./steps//sendEmail' }, + }, + }, + }); + + const [, runCreated] = mockEventsCreate.mock.calls[0]; + expect( + runCreated.eventData.executionContext.dynamicWorkflow.workflowCode + ).toContain('__dynamicUseStep("step//./steps//sendEmail")'); + }); + + it('rejects source strings without dynamic options', async () => { + await expect( + // @ts-expect-error - intentionally missing dynamic options + start(validSource, []) + ).rejects.toThrow('Dynamic workflow source requires options.dynamic'); + }); + + it('rejects missing steps', async () => { + await expect( + start(validSource, [], { dynamic: { steps: {} } }) + ).rejects.toThrow('dynamic.steps'); + }); + + it('rejects step aliases without stepId metadata', async () => { + await expect( + start(validSource, [], { + dynamic: { + steps: { + fetchUser: (async () => undefined) as any, + sendEmail, + }, + }, + }) + ).rejects.toThrow('must be an imported step function'); + }); + + it('rejects unsupported module syntax', async () => { + await expect( + start(`import { x } from './x';\n${validSource}`, [], { + dynamic: { steps: { fetchUser, sendEmail } }, + }) + ).rejects.toThrow('cannot contain import or export syntax'); + }); + + it('rejects source without a use workflow directive', async () => { + await expect( + start('async function workflow() { return 1; }', [], { + dynamic: { steps: { fetchUser, sendEmail } }, + }) + ).rejects.toThrow('must start with a "use workflow" directive'); + }); + + it('rejects custom dynamic workflow ids', async () => { + await expect( + start(validSource, [], { + dynamic: { + // @ts-expect-error - dynamic IDs are generated from source + steps + id: 'custom-id', + steps: { fetchUser, sendEmail }, + }, + }) + ).rejects.toThrow('dynamic.id is not supported'); + }); + }); + describe('overload type inference', () => { // Type-only assertions that don't execute start() at runtime. // We use expectTypeOf on the function signature's return type directly. @@ -532,5 +675,17 @@ describe('start', () => { ) => Promise>; expectTypeOf().toMatchTypeOf(); }); + + it('should return Run for dynamic workflow source', () => { + expectTypeOf< + ( + source: string, + args: unknown[], + opts: { + dynamic: { steps: Record }; + } + ) => Promise> + >().toMatchTypeOf(); + }); }); }); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 6f20e81c2a..83193801ee 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,3 +1,4 @@ +import { createHash } from 'node:crypto'; import { waitUntil } from '@vercel/functions'; import { EntityConflictError, @@ -20,9 +21,9 @@ import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier, trace } from '../telemetry.js'; import { waitedUntil } from '../util.js'; import { version as workflowCoreVersion } from '../version.js'; +import { getWorldLazy } from './get-world-lazy.js'; import { getWorkflowQueueName } from './helpers.js'; import { Run } from './run.js'; -import { getWorldLazy } from './get-world-lazy.js'; /** ULID generator for client-side runId generation */ const ulid = monotonicFactory(); @@ -68,6 +69,27 @@ export type StartOptions = | StartOptionsWithDeploymentId | StartOptionsWithoutDeploymentId; +export type DynamicWorkflowStepReference = { readonly stepId: string }; + +export interface DynamicWorkflowOptions { + /** + * Already-registered step functions exposed to the dynamic workflow source. + * + * Each value may be an imported step function transformed by Workflow SDK + * (with a `.stepId` property) or an explicit `{ stepId }` reference. + */ + steps: Record; + + /** + * Name of the async workflow function in the source. Defaults to "workflow". + */ + exportName?: string; +} + +export type DynamicStartOptions = StartOptions & { + dynamic: DynamicWorkflowOptions; +}; + /** * Represents an imported workflow function. */ @@ -80,6 +102,163 @@ export type WorkflowFunction = ( */ export type WorkflowMetadata = { workflowId: string }; +export interface DynamicWorkflowExecutionContext { + version: 1; + workflowCode: string; + sourceHash: string; + exportName: string; +} + +const DYNAMIC_WORKFLOW_SOURCE_MAX_BYTES = 32 * 1024; +const SAFE_DYNAMIC_IDENTIFIER = /^[a-zA-Z_][a-zA-Z0-9_]*$/; +const UNSUPPORTED_DYNAMIC_MODULE_SYNTAX = + /(^|[\s;])(?:import\s*(?:[\w*{]|\(|['"])|export\s+(?:async\s+)?(?:function|const|let|var|class|default|\{|\*))/m; + +function stableJsonStringify(value: unknown): string { + if (Array.isArray(value)) { + return `[${value.map(stableJsonStringify).join(',')}]`; + } + if (value && typeof value === 'object') { + return `{${Object.entries(value as Record) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([key, val]) => `${JSON.stringify(key)}:${stableJsonStringify(val)}`) + .join(',')}}`; + } + return JSON.stringify(value); +} + +function sha256Hex(input: string): string { + return createHash('sha256').update(input).digest('hex'); +} + +function assertDynamicWorkflowIdentifier(kind: string, value: string) { + if (!SAFE_DYNAMIC_IDENTIFIER.test(value)) { + throw new WorkflowRuntimeError( + `Invalid dynamic workflow ${kind} "${value}". Use only letters, numbers, and underscores, and start with a letter or underscore.` + ); + } +} + +function validateDynamicWorkflowSource(source: string, exportName: string) { + if (Buffer.byteLength(source, 'utf8') > DYNAMIC_WORKFLOW_SOURCE_MAX_BYTES) { + throw new WorkflowRuntimeError( + `Dynamic workflow source is too large. The MVP limit is ${DYNAMIC_WORKFLOW_SOURCE_MAX_BYTES} bytes.` + ); + } + + if (UNSUPPORTED_DYNAMIC_MODULE_SYNTAX.test(source)) { + throw new WorkflowRuntimeError( + 'Dynamic workflow source cannot contain import or export syntax in the MVP.' + ); + } + + const functionMatch = new RegExp( + `\\basync\\s+function\\s+${exportName}\\s*\\([^)]*\\)\\s*\\{` + ).exec(source); + if (!functionMatch) { + throw new WorkflowRuntimeError( + `Dynamic workflow source must define async function ${exportName}(...).` + ); + } + + const bodyStart = functionMatch.index + functionMatch[0].length; + const bodyPrefix = source.slice(bodyStart, bodyStart + 200); + if (!/^\s*(?:"use workflow"|'use workflow')\s*;/.test(bodyPrefix)) { + throw new WorkflowRuntimeError( + `Dynamic workflow function "${exportName}" must start with a "use workflow" directive.` + ); + } +} + +function getDynamicStepId(alias: string, value: unknown): string { + const stepId = + (value && typeof value === 'object') || typeof value === 'function' + ? (value as { stepId?: unknown }).stepId + : undefined; + + if (typeof stepId !== 'string' || stepId.length === 0) { + throw new WorkflowRuntimeError( + `Dynamic workflow step "${alias}" must be an imported step function or an object with a non-empty stepId.` + ); + } + + return stepId; +} + +function compileDynamicWorkflowSource( + source: string, + options: DynamicWorkflowOptions +): { + workflowName: string; + dynamicWorkflow: DynamicWorkflowExecutionContext; +} { + const exportName = options.exportName ?? 'workflow'; + if ('id' in options) { + throw new WorkflowRuntimeError( + 'dynamic.id is not supported. Dynamic workflow IDs are generated from the source and step references.' + ); + } + assertDynamicWorkflowIdentifier('exportName', exportName); + validateDynamicWorkflowSource(source, exportName); + + if (!options.steps || Object.keys(options.steps).length === 0) { + throw new WorkflowRuntimeError( + 'Dynamic workflow options must include at least one registered step in dynamic.steps.' + ); + } + + const stepEntries = Object.entries(options.steps) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([alias, value]) => { + assertDynamicWorkflowIdentifier(`step alias`, alias); + return [alias, getDynamicStepId(alias, value)] as const; + }); + + const sourceHash = sha256Hex( + `${source}\n${stableJsonStringify(Object.fromEntries(stepEntries))}` + ); + const workflowIdSegment = sourceHash.slice(0, 32); + + const workflowName = `workflow//dynamic/${workflowIdSegment}//${exportName}`; + const stepProxyEntries = stepEntries + .map( + ([alias, stepId]) => + `${JSON.stringify(alias)}: __dynamicUseStep(${JSON.stringify(stepId)})` + ) + .join(',\n '); + + const workflowCode = ` +globalThis.__private_workflows = new Map(); +const __dynamicUseStep = globalThis[Symbol.for("WORKFLOW_USE_STEP")]; +if (typeof __dynamicUseStep !== "function") { + throw new Error("WORKFLOW_USE_STEP is not available in the workflow VM."); +} +const steps = Object.freeze({ + ${stepProxyEntries} +}); +const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; +const createHook = globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")]; +${source} +Object.defineProperty(${exportName}, "workflowId", { + value: ${JSON.stringify(workflowName)}, + writable: false, + enumerable: false, + configurable: false +}); +globalThis.__private_workflows.set(${JSON.stringify(workflowName)}, ${exportName}); +`; + + return { + workflowName, + dynamicWorkflow: { + version: 1, + workflowCode, + sourceHash, + exportName, + }, + }; +} + /** * Starts a workflow run. * @@ -114,15 +293,48 @@ export function start( options?: StartOptionsWithoutDeploymentId ): Promise>; +export function start( + source: string, + args: unknown[], + options: DynamicStartOptions +): Promise>; + +export function start( + source: string, + options: DynamicStartOptions +): Promise>; + export async function start( - workflow: WorkflowFunction | WorkflowMetadata, - argsOrOptions?: TArgs | StartOptions, - options?: StartOptions + workflow: WorkflowFunction | WorkflowMetadata | string, + argsOrOptions?: TArgs | StartOptions | DynamicStartOptions, + options?: StartOptions | DynamicStartOptions ) { 'use step'; return await waitedUntil(() => { - // @ts-expect-error this field is added by our client transform - const workflowName = workflow?.workflowId; + let args: Serializable[] = []; + let opts: StartOptions | DynamicStartOptions = options ?? {}; + if (Array.isArray(argsOrOptions)) { + args = argsOrOptions as Serializable[]; + } else if (typeof argsOrOptions === 'object') { + opts = argsOrOptions; + } + + let dynamicWorkflow: DynamicWorkflowExecutionContext | undefined; + let workflowName: string | undefined; + if (typeof workflow === 'string') { + const dynamicOptions = (opts as Partial).dynamic; + if (!dynamicOptions) { + throw new WorkflowRuntimeError( + 'Dynamic workflow source requires options.dynamic.' + ); + } + const compiled = compileDynamicWorkflowSource(workflow, dynamicOptions); + workflowName = compiled.workflowName; + dynamicWorkflow = compiled.dynamicWorkflow; + } else { + // @ts-expect-error this field is added by our client transform + workflowName = workflow?.workflowId; + } if (!workflowName) { throw new WorkflowRuntimeError( @@ -137,14 +349,6 @@ export async function start( ...Attribute.WorkflowOperation('start'), }); - let args: Serializable[] = []; - let opts: StartOptions = options ?? {}; - if (Array.isArray(argsOrOptions)) { - args = argsOrOptions as Serializable[]; - } else if (typeof argsOrOptions === 'object') { - opts = argsOrOptions; - } - span?.setAttributes({ ...Attribute.WorkflowArgumentsCount(args.length), }); @@ -211,6 +415,7 @@ export async function start( traceCarrier, workflowCoreVersion, features: { encryption: !!encryptionKey }, + ...(dynamicWorkflow ? { dynamicWorkflow } : {}), }; // Call events.create (run_created) and queue in parallel. diff --git a/packages/workflow/src/api.ts b/packages/workflow/src/api.ts index a0565a8e2c..5fea46c67b 100644 --- a/packages/workflow/src/api.ts +++ b/packages/workflow/src/api.ts @@ -27,6 +27,9 @@ export { type WorkflowReadableStreamOptions, } from '@workflow/core/runtime/run'; export { + type DynamicStartOptions, + type DynamicWorkflowOptions, + type DynamicWorkflowStepReference, type StartOptions, start, } from '@workflow/core/runtime/start';