diff --git a/.changeset/tired-pigs-hug.md b/.changeset/tired-pigs-hug.md new file mode 100644 index 0000000000..068dc24f2e --- /dev/null +++ b/.changeset/tired-pigs-hug.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": minor +"workflow": minor +--- + +A `WritableStream` from a workflow's `getWritable()` can now be passed as an argument to a child workflow via `start()`; the child's writes land on the parent run's stream directly for the full lifetime of the child run. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 435a24140f..5ae160d3d7 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -405,7 +405,9 @@ describe('e2e', () => { test( 'hookWorkflow is not resumable via public webhook endpoint', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const token = Math.random().toString(36).slice(2); const customData = Math.random().toString(36).slice(2); @@ -835,7 +837,9 @@ describe('e2e', () => { test( 'outputStreamInsideStepWorkflow - getWritable() called inside step functions', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('outputStreamInsideStepWorkflow'), []); const reader = run.getReadable().getReader(); @@ -877,6 +881,50 @@ describe('e2e', () => { } ); + // A WritableStream passed as a workflow argument to start() should + // land raw bytes on the parent's output stream when the child step + // writes to it. Covered for both: + // - `writableForwardedFromWorkflowWorkflow`: parent calls + // `getWritable()` in workflow context (fake handle revived in the + // intermediary step). + // - `writableForwardedFromStepWorkflow`: parent calls `getWritable()` + // in step context (real `serialize.writable` passed straight to + // `start()`). + test.each([ + 'writableForwardedFromWorkflowWorkflow', + 'writableForwardedFromStepWorkflow', + ] as const)('%s', { timeout: 120_000 }, async (workflowName) => { + const payload = `hello-from-child-${Date.now()}\n`; + const run = await start(await e2e(workflowName), [payload]); + + const reader = run.getReadable().getReader(); + // `fatal: true` makes the decoder throw on any invalid UTF-8 + // sequence, so a successful decode is itself a round-trip + // assertion that the bytes survived intact. + const decoder = new TextDecoder('utf-8', { fatal: true }); + + // The child step performs exactly one write of `payload` as + // UTF-8 bytes, so we should receive a single chunk containing + // exactly those bytes before the stream closes. + const { value, done } = await reader.read(); + expect(done).toBeFalsy(); + assert(value); + assert(value instanceof Uint8Array); + + const expectedBytes = new TextEncoder().encode(payload); + expect(value.byteLength).toBe(expectedBytes.byteLength); + expect(decoder.decode(value)).toBe(payload); + + // Default stream should close cleanly after the parent closes its + // writable. + expect((await reader.read()).done).toBe(true); + + const returnValue = await run.returnValue; + expect(returnValue).toMatchObject({ + childRunId: expect.stringMatching(/^wrun_/), + }); + }); + test('fetchWorkflow', { timeout: 60_000 }, async () => { const run = await start(await e2e('fetchWorkflow'), []); const returnValue = await run.returnValue; @@ -901,7 +949,9 @@ describe('e2e', () => { describe('workflow errors', () => { test( 'nested function calls preserve message and stack trace', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorWorkflowNested'), []); const error = await run.returnValue.catch((e: unknown) => e); @@ -933,7 +983,9 @@ describe('e2e', () => { test( 'cross-file imports preserve message and stack trace', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorWorkflowCrossFile'), []); const error = await run.returnValue.catch((e: unknown) => e); @@ -963,7 +1015,9 @@ describe('e2e', () => { describe('step errors', () => { test( 'basic step error preserves message and stack trace', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorStepBasic'), []); const result = await run.returnValue; @@ -1015,7 +1069,9 @@ describe('e2e', () => { test( 'cross-file step error preserves message and function names in stack', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorStepCrossFile'), []); const result = await run.returnValue; @@ -1072,7 +1128,9 @@ describe('e2e', () => { describe('retry behavior', () => { test( 'regular Error retries until success', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorRetrySuccess'), []); const result = await run.returnValue; @@ -1092,7 +1150,9 @@ describe('e2e', () => { test( 'FatalError fails immediately without retries', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorRetryFatal'), []); const error = await run.returnValue.catch((e: unknown) => e); @@ -1115,7 +1175,9 @@ describe('e2e', () => { test( 'RetryableError respects custom retryAfter delay', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorRetryCustomDelay'), []); const result = await run.returnValue; @@ -1137,7 +1199,9 @@ describe('e2e', () => { describe('catchability', () => { test( 'FatalError can be caught and detected with FatalError.is()', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('errorFatalCatchable'), []); const result = await run.returnValue; @@ -1155,7 +1219,9 @@ describe('e2e', () => { describe('not registered', () => { test( 'WorkflowNotRegisteredError fails the run when workflow does not exist', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Start a run with a workflowId that doesn't exist in the deployment bundle. // This simulates starting a run against a deployment that doesn't have the workflow. @@ -1179,7 +1245,9 @@ describe('e2e', () => { test( 'StepNotRegisteredError fails the step but workflow can catch it', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('stepNotRegisteredCatchable'), []); const result = await run.returnValue; @@ -1205,7 +1273,9 @@ describe('e2e', () => { test( 'StepNotRegisteredError fails the run when not caught in workflow', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('stepNotRegisteredUncaught'), []); const error = await run.returnValue.catch((e: unknown) => e); @@ -1221,7 +1291,9 @@ describe('e2e', () => { test( 'stepDirectCallWorkflow - calling step functions directly outside workflow context', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Call the API route that directly calls a step function (no workflow context) const url = new URL('/api/test-direct-step-call', deploymentUrl); @@ -1251,7 +1323,9 @@ describe('e2e', () => { test( 'hookCleanupTestWorkflow - hook token reuse after workflow completion', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const token = Math.random().toString(36).slice(2); const customData = Math.random().toString(36).slice(2); @@ -1319,7 +1393,9 @@ describe('e2e', () => { test( 'concurrent hook token conflict - two workflows cannot use the same hook token simultaneously', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const token = Math.random().toString(36).slice(2); const customData = Math.random().toString(36).slice(2); @@ -1380,7 +1456,9 @@ describe('e2e', () => { test( 'hookDisposeTestWorkflow - hook token reuse after explicit disposal while workflow still running', - { timeout: 90_000 }, + { + timeout: 90_000, + }, async () => { const token = Math.random().toString(36).slice(2); const customData = Math.random().toString(36).slice(2); @@ -1461,7 +1539,9 @@ describe('e2e', () => { test( 'stepFunctionPassingWorkflow - step function references can be passed as arguments (without closure vars)', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // This workflow passes a step function reference to another step // The receiving step calls the passed function and returns the result @@ -1493,7 +1573,9 @@ describe('e2e', () => { test( 'stepFunctionWithClosureWorkflow - step function with closure variables passed as argument', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // This workflow creates a nested step function with closure variables, // then passes it to another step which invokes it. @@ -1518,7 +1600,9 @@ describe('e2e', () => { test( 'closureVariableWorkflow - nested step functions with closure variables', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // This workflow uses a nested step function that references closure variables // from the parent workflow scope (multiplier, prefix, baseValue) @@ -1532,7 +1616,9 @@ describe('e2e', () => { test( 'spawnWorkflowFromStepWorkflow - spawning a child workflow using start() inside a step', - { timeout: 120_000 }, + { + timeout: 120_000, + }, async () => { // This workflow spawns another workflow using start() inside a step function // This is the recommended pattern for spawning workflows from within workflows @@ -1645,7 +1731,9 @@ describe('e2e', () => { test( 'health check (queue-based) - workflow and step endpoints respond to health check messages', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Tests the queue-based health check using healthCheck() directly. // This bypasses Vercel Deployment Protection by sending messages @@ -1666,7 +1754,9 @@ describe('e2e', () => { test( 'health check (CLI) - workflow health command reports healthy endpoints', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // NOTE: This tests the `workflow health` CLI command which uses the // queue-based health check under the hood. The CLI provides a convenient @@ -1697,7 +1787,9 @@ describe('e2e', () => { test( 'pathsAliasWorkflow - TypeScript path aliases resolve correctly', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // This workflow uses a step that calls a helper function imported via @repo/* path alias // which resolves to a file outside the workbench directory (../../lib/steps/paths-alias-test.ts) @@ -1721,7 +1813,9 @@ describe('e2e', () => { test( 'Calculator.calculate - static workflow method using static step methods from another class', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Calculator.calculate(5, 3) should: // 1. MathService.add(5, 3) = 8 @@ -1743,7 +1837,9 @@ describe('e2e', () => { test( 'AllInOneService.processNumber - static workflow method using sibling static step methods', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // AllInOneService.processNumber(10) should: // 1. AllInOneService.double(10) = 20 @@ -1766,7 +1862,9 @@ describe('e2e', () => { test( 'ChainableService.processWithThis - static step methods using `this` to reference the class', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // ChainableService.processWithThis(5) should: // - ChainableService.multiplyByClassValue(5) uses `this.multiplier` (10) -> 5 * 10 = 50 @@ -1800,7 +1898,9 @@ describe('e2e', () => { test( 'thisSerializationWorkflow - step function invoked with .call() and .apply()', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // thisSerializationWorkflow(10) should: // 1. multiplyByFactor.call({ factor: 2 }, 10) = 20 @@ -1823,7 +1923,9 @@ describe('e2e', () => { test( 'customSerializationWorkflow - custom class serialization with WORKFLOW_SERIALIZE/WORKFLOW_DESERIALIZE', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // This workflow tests custom serialization of user-defined class instances. // The Point class uses WORKFLOW_SERIALIZE and WORKFLOW_DESERIALIZE symbols @@ -1860,7 +1962,9 @@ describe('e2e', () => { test( 'instanceMethodStepWorkflow - instance methods with "use step" directive', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // This workflow tests instance methods marked with "use step". // The Counter class has custom serialization so the `this` context @@ -1955,7 +2059,9 @@ describe('e2e', () => { test( 'crossContextSerdeWorkflow - classes defined in step code are deserializable in workflow context', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // This is a critical test for the cross-context class registration feature. // @@ -2008,7 +2114,9 @@ describe('e2e', () => { test( 'stepFunctionAsStartArgWorkflow - step function reference passed as start() argument', - { timeout: 120_000 }, + { + timeout: 120_000, + }, async () => { // This test verifies that step function references can be: // 1. Serialized in the client bundle (the SWC plugin sets stepId property on the function) @@ -2073,7 +2181,9 @@ describe('e2e', () => { // ==================== CANCEL TESTS ==================== test( 'cancelRun - cancelling a running workflow', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Start a long-running workflow with a 30s sleep to provide a wide // window for the cancel to arrive while the workflow is still running. @@ -2100,7 +2210,9 @@ describe('e2e', () => { test( 'cancelRun via CLI - cancelling a running workflow', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Start a long-running workflow with a 30s sleep to provide a wide // window for the cancel to arrive while the workflow is still running. @@ -2145,7 +2257,9 @@ describe('e2e', () => { test( 'promiseAllWorkflow via pages router', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await startWorkflowViaHttp( 'promiseAllWorkflow', @@ -2177,7 +2291,9 @@ describe('e2e', () => { test( 'hookWithSleepWorkflow - hook payloads delivered correctly with concurrent sleep', - { timeout: 90_000 }, + { + timeout: 90_000, + }, async () => { // Regression test: when a hook and sleep run concurrently, multiple // hook_received events should all be processed even though the sleep @@ -2237,7 +2353,9 @@ describe('e2e', () => { test( 'sleepInLoopWorkflow - sleep inside loop with steps actually delays each iteration', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('sleepInLoopWorkflow'), []); const returnValue = await run.returnValue; @@ -2255,7 +2373,9 @@ describe('e2e', () => { test( 'sleepWithSequentialStepsWorkflow - sequential steps work with concurrent sleep (control)', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Control test: proves that void sleep('1d').then() does NOT break // sequential step execution. Steps have per-event consumption so the @@ -2276,7 +2396,9 @@ describe('e2e', () => { test( 'importMetaUrlWorkflow - import.meta.url is available in step bundles', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('importMetaUrlWorkflow'), []); const returnValue = await run.returnValue; @@ -2290,7 +2412,9 @@ describe('e2e', () => { test( 'metadataFromHelperWorkflow - getWorkflowMetadata/getStepMetadata work from module-level helper (#1577)', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { const run = await start(await e2e('metadataFromHelperWorkflow'), [ 'smoke-test', @@ -2309,7 +2433,9 @@ describe('e2e', () => { // ============================================================ test( 'resilient start: addTenWorkflow completes when run_created returns 500', - { timeout: 60_000 }, + { + timeout: 60_000, + }, async () => { // Get the real world and wrap it so the first events.create call // (run_created) throws a 500 server error. The queue should still diff --git a/packages/core/src/encryption.ts b/packages/core/src/encryption.ts index 730e4ee115..6bf0bbbc32 100644 --- a/packages/core/src/encryption.ts +++ b/packages/core/src/encryption.ts @@ -31,19 +31,34 @@ const KEY_LENGTH = 32; // bytes (AES-256) * Callers should call this once per run (after `getEncryptionKeyForRun()`) * and pass the resulting `CryptoKey` to all subsequent encrypt/decrypt calls. * + * Pass `usages: ['encrypt']` (or `['decrypt']`) for cross-run scenarios + * where the caller should not be able to perform the inverse operation + * with the key — for example a child workflow writing into a parent + * run's forwarded WritableStream only needs to encrypt, never decrypt. + * * @param raw - Raw 32-byte AES-256 key (from World.getEncryptionKeyForRun) + * @param usages - Key usages. Defaults to `['encrypt', 'decrypt']`. * @returns CryptoKey ready for AES-GCM operations */ -export async function importKey(raw: Uint8Array) { +export async function importKey( + raw: Uint8Array, + usages: ReadonlyArray<'encrypt' | 'decrypt'> = ['encrypt', 'decrypt'] +) { if (raw.byteLength !== KEY_LENGTH) { throw new Error( `Encryption key must be exactly ${KEY_LENGTH} bytes, got ${raw.byteLength}` ); } - return globalThis.crypto.subtle.importKey('raw', raw, 'AES-GCM', false, [ - 'encrypt', - 'decrypt', - ]); + return globalThis.crypto.subtle.importKey( + 'raw', + raw, + 'AES-GCM', + false, + // `KeyUsage` is a DOM-lib type that's not in scope under `es2022`. + // The `ReadonlyArray<'encrypt' | 'decrypt'>` parameter type matches + // a strict subset of `KeyUsage[]`, so this cast is sound. + usages as ('encrypt' | 'decrypt')[] + ); } /** diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 51cf0018a8..6380ea5704 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -25,7 +25,11 @@ import { maybeEncrypt, SerializationFormat, } from './serialization.js'; -import { STABLE_ULID, STREAM_NAME_SYMBOL } from './symbols.js'; +import { + STABLE_ULID, + STREAM_NAME_SYMBOL, + STREAM_SERVER_RUN_ID_SYMBOL, +} from './symbols.js'; import { createContext } from './vm/index.js'; const mockRunId = 'wrun_mockidnumber0001'; @@ -454,6 +458,44 @@ describe('workflow arguments', () => { expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); + // When a user writable is already backed by a workflow server + // stream (because it was hydrated by a step-side reviver or created + // via step-context `getWritable()`), forwarding it across a + // `start()` boundary must emit the original `(runId, name)` in the + // dehydrated descriptor and MUST NOT install any pipe through the + // user's writable. The child run's step-side reviver then opens a + // server writable against the original `(runId, name)` directly, + // so writes survive for the full lifetime of the child run — not + // just for the dehydrating step's process. + it('forwards original (runId, name) for a tagged WritableStream', async () => { + const userWritable = new WritableStream(); + Object.defineProperty(userWritable, STREAM_NAME_SYMBOL, { + value: 'strm_parentstreamname', + writable: false, + }); + Object.defineProperty(userWritable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: 'wrun_parent', + writable: false, + }); + + expect(userWritable.locked).toBe(false); + const serialized = await dehydrateWorkflowArguments( + userWritable, + 'wrun_child', + noEncryptionKey, + [] + ); + // If the reducer had piped through the user's writable, the lock + // would be acquired here. + expect(userWritable.locked).toBe(false); + // The dehydrated descriptor should carry both the original name + // and the original runId so the child's reviver can open the + // writable against the parent's server stream directly. + const text = new TextDecoder().decode(serialized as Uint8Array); + expect(text).toContain('strm_parentstreamname'); + expect(text).toContain('wrun_parent'); + }); + it('should work with ReadableStream', async () => { const stream = new ReadableStream(); const serialized = await dehydrateWorkflowArguments( diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 4ee143c9a8..7d51d55398 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -8,6 +8,7 @@ import { decrypt as aesGcmDecrypt, encrypt as aesGcmEncrypt, type CryptoKey, + importKey, } from './encryption.js'; /** @@ -35,6 +36,7 @@ import { BODY_INIT_SYMBOL, STABLE_ULID, STREAM_NAME_SYMBOL, + STREAM_SERVER_RUN_ID_SYMBOL, STREAM_TYPE_SYMBOL, WEBHOOK_RESPONSE_WRITABLE, } from './symbols.js'; @@ -652,7 +654,16 @@ export interface SerializableSpecial { Uint8ClampedArray: string; // base64 string Uint16Array: string; // base64 string Uint32Array: string; // base64 string - WritableStream: { name: string }; + WritableStream: { + name: string; + /** + * The runId of the workflow run that owns the underlying server + * stream. Present only when the writable was forwarded across a + * `start()` boundary (parent → child). When omitted, the writable + * belongs to the receiving run (the normal in-run case). + */ + runId?: string; + }; } type Reducers = { @@ -921,6 +932,24 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; + // Fast path: when the writable is already backed by a workflow + // server stream (e.g. it came from a step-context `getWritable()` + // or was hydrated from a workflow input by `getStepRevivers`), + // forward its underlying `(runId, name)` to the receiving run. + // The receiving run's step-side reviver opens a server writable + // against the original `(runId, name)` and resolves that run's + // encryption key directly, so writes land on the original stream + // for the full lifetime of the receiving run — no in-process + // bridge tied to the dehydrating step's lifetime. + const existingName = (value as any)[STREAM_NAME_SYMBOL]; + const existingRunId = (value as any)[STREAM_SERVER_RUN_ID_SYMBOL]; + if ( + typeof existingName === 'string' && + typeof existingRunId === 'string' + ) { + return { name: existingName, runId: existingRunId }; + } + const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); const name = `strm_${streamId}`; @@ -973,7 +1002,13 @@ export function getWorkflowReducers( if (!name) { throw new Error('WritableStream `name` is not set'); } - return { name }; + const s: SerializableSpecial['WritableStream'] = { name }; + // When the handle was forwarded from another run (parent → child + // via `start()`), preserve the foreign runId so the step-side + // reviver opens the writable against the original stream. + const foreignRunId = value[STREAM_SERVER_RUN_ID_SYMBOL]; + if (typeof foreignRunId === 'string') s.runId = foreignRunId; + return s; }, }; } @@ -1041,6 +1076,7 @@ function getStepReducers( if (!(value instanceof global.WritableStream)) return false; let name = value[STREAM_NAME_SYMBOL]; + const foreignRunId = (value as any)[STREAM_SERVER_RUN_ID_SYMBOL]; if (!name) { const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); name = `strm_${streamId}`; @@ -1056,7 +1092,9 @@ function getStepReducers( ); } - return { name }; + const s: SerializableSpecial['WritableStream'] = { name }; + if (typeof foreignRunId === 'string') s.runId = foreignRunId; + return s; }, }; } @@ -1274,13 +1312,26 @@ export function getExternalRevivers( } }, WritableStream: (value) => { + // Same handling as `getStepRevivers.WritableStream` — see comments + // there for the cross-run case (writable carries `runId` from + // parent → child forwarding via `start()`). + const targetRunId = typeof value.runId === 'string' ? value.runId : runId; + const targetKey: EncryptionKeyParam = + targetRunId === runId + ? cryptoKey + : (async () => { + const world = getWorld(); + const rawKey = await world.getEncryptionKeyForRun?.(targetRunId); + return rawKey ? await importKey(rawKey, ['encrypt']) : undefined; + })(); + const serialize = getSerializeStream( - getExternalReducers(global, ops, runId, cryptoKey), - cryptoKey + getExternalReducers(global, ops, targetRunId, targetKey), + targetKey ); const serverWritable = new WorkflowServerWritableStream( value.name, - runId + targetRunId ); // Create flushable state for this stream @@ -1295,6 +1346,15 @@ export function getExternalRevivers( // Start polling to detect when user releases lock pollWritableLock(serialize.writable, state); + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: value.name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: targetRunId, + writable: false, + }); + return serialize.writable; }, }; @@ -1394,12 +1454,22 @@ export function getWorkflowRevivers( }); }, WritableStream: (value) => { - return Object.create(global.WritableStream.prototype, { + const descriptor: PropertyDescriptorMap = { [STREAM_NAME_SYMBOL]: { value: value.name, writable: false, }, - }); + }; + // Preserve the foreign runId, if present, so that when the + // handle is later passed to a step the workflow reducer can + // forward it through to the step reviver. + if (typeof value.runId === 'string') { + descriptor[STREAM_SERVER_RUN_ID_SYMBOL] = { + value: value.runId, + writable: false, + }; + } + return Object.create(global.WritableStream.prototype, descriptor); }, }; } @@ -1572,13 +1642,35 @@ function getStepRevivers( } }, WritableStream: (value) => { + // Same-run case: the writable belongs to the current run. Use the + // local cryptoKey and write to the local runId's server stream. + // + // Cross-run case (parent → child via `start()`): the descriptor + // carries the original `runId` and `name`. Open a server writable + // against the original `(runId, name)` and resolve THAT run's key + // for encryption. The resolution is async but doesn't need to + // block reviver return — `getSerializeStream` accepts the + // `Promise` directly and awaits it lazily + // on the first chunk written. The key is imported encrypt-only + // so the receiving run can never decrypt anything else on the + // owning run's stream — it can only contribute new writes. + const targetRunId = typeof value.runId === 'string' ? value.runId : runId; + const targetKey: EncryptionKeyParam = + targetRunId === runId + ? cryptoKey + : (async () => { + const world = getWorld(); + const rawKey = await world.getEncryptionKeyForRun?.(targetRunId); + return rawKey ? await importKey(rawKey, ['encrypt']) : undefined; + })(); + const serialize = getSerializeStream( - getStepReducers(global, ops, runId, cryptoKey), - cryptoKey + getStepReducers(global, ops, targetRunId, targetKey), + targetKey ); const serverWritable = new WorkflowServerWritableStream( value.name, - runId + targetRunId ); // Create flushable state for this stream @@ -1593,6 +1685,21 @@ function getStepRevivers( // Start polling to detect when user releases lock pollWritableLock(serialize.writable, state); + // Record the underlying `(runId, name)` so downstream reducers can + // recognize that this writable is already backed by a workflow + // server stream. When forwarded across `start()` again — e.g. + // the child passes this writable on to a grandchild — the + // external reducer needs both to emit the original `runId` in + // the descriptor. + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: value.name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: targetRunId, + writable: false, + }); + return serialize.writable; }, }; diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index 550ac46a4d..54136f0e9c 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -8,6 +8,7 @@ import { getSerializeStream, WorkflowServerWritableStream, } from '../serialization.js'; +import { STREAM_NAME_SYMBOL, STREAM_SERVER_RUN_ID_SYMBOL } from '../symbols.js'; import { getWorkflowRunStreamId } from '../util.js'; import { contextStorage } from './context-storage.js'; @@ -66,6 +67,22 @@ export function getWritable( pollWritableLock(serialize.writable, state); + // Tag the writable with its underlying `(runId, name)` so downstream + // reducers can recognize that it's already backed by a workflow + // server stream. Calling `start(child, [args, theWritable])` from + // the same step uses these tags to emit `{ name, runId }` in the + // dehydrated descriptor, so the child's reviver can open the + // writable against the original `(runId, name)` directly — no + // in-process bridge tied to this step's lifetime. + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: runId, + writable: false, + }); + // Return the writable side of the transform stream return serialize.writable; } diff --git a/packages/core/src/symbols.ts b/packages/core/src/symbols.ts index 92df4058db..0bef37ef6f 100644 --- a/packages/core/src/symbols.ts +++ b/packages/core/src/symbols.ts @@ -6,6 +6,24 @@ export const WORKFLOW_GET_STREAM_ID = Symbol.for('WORKFLOW_GET_STREAM_ID'); export const STABLE_ULID = Symbol.for('WORKFLOW_STABLE_ULID'); export const STREAM_NAME_SYMBOL = Symbol.for('WORKFLOW_STREAM_NAME'); export const STREAM_TYPE_SYMBOL = Symbol.for('WORKFLOW_STREAM_TYPE'); +/** + * Stamped on a real `WritableStream` (the user-visible `serialize.writable` + * returned from a step-side reviver or step-context `getWritable()`) to + * record the `runId` of the workflow run that owns the underlying server + * stream. Used together with `STREAM_NAME_SYMBOL`. + * + * When `getExternalReducers.WritableStream` (the dehydration path used by + * `start()`) sees both symbols on a writable, it includes the `runId` in + * the descriptor it emits. The child run's step-side reviver then opens + * a server writable against the original `(runId, name)` and resolves + * that run's encryption key directly — so the child's writes land on + * the parent's stream as-is, with no client process in the loop. That + * keeps the forwarding alive for the full lifetime of the child run, + * not just for the parent step that initiated `start()`. + */ +export const STREAM_SERVER_RUN_ID_SYMBOL = Symbol.for( + 'WORKFLOW_STREAM_SERVER_RUN_ID' +); export const BODY_INIT_SYMBOL = Symbol.for('BODY_INIT'); export const WEBHOOK_RESPONSE_WRITABLE = Symbol.for( 'WEBHOOK_RESPONSE_WRITABLE' diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 92df6f70c4..17cd948f6b 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -1609,3 +1609,84 @@ export async function metadataFromHelperWorkflow(label: string): Promise<{ return await metadataHelperStep(label); } + +////////////////////////////////////////////////////////// +// WritableStream passed as argument to start() +// +// A parent workflow gets a WritableStream from getWritable() (its own +// output stream), and passes it through `start()` to a child +// workflow. The child workflow receives the WritableStream as a +// workflow argument and forwards it into a step, which writes raw +// Uint8Array bytes to it. +// +// The external reader on `parentRun.getReadable()` should observe the +// exact bytes the child step wrote. + +async function writeBytesToWritable( + writable: WritableStream, + payload: string +) { + 'use step'; + const writer = writable.getWriter(); + await writer.write(new TextEncoder().encode(payload)); + writer.releaseLock(); +} + +export async function writableForwardedChildWorkflow( + parentWritable: WritableStream, + payload: string +) { + 'use workflow'; + await writeBytesToWritable(parentWritable, payload); + return 'child-done'; +} + +// Variant 1: the parent calls `getWritable()` in the workflow body +// (workflow-context handle), passes the resulting fake handle through +// `start()`. The intermediary step that calls `start()` only exists +// because `start()` cannot be invoked from workflow code directly. +async function startChildWithWorkflowWritable( + parentWritable: WritableStream, + payload: string +) { + 'use step'; + const childRun = await start(writableForwardedChildWorkflow, [ + parentWritable, + payload, + ]); + // Wait for the child to finish writing before letting the parent + // close its own writable. + await childRun.returnValue; + return childRun.runId; +} + +export async function writableForwardedFromWorkflowWorkflow(payload: string) { + 'use workflow'; + const writable = getWritable(); + const childRunId = await startChildWithWorkflowWritable(writable, payload); + await stepCloseOutputStream(writable); + return { childRunId }; +} + +// Variant 2: the parent's `getWritable()` is called inside the step +// that also calls `start()`, so the writable handed to the child is +// the real step-context `serialize.writable` (not a workflow-context +// fake handle that's later revived by a step). This exercises the +// step-side `getWritable()` tagging path directly. +async function startChildWithStepWritable(payload: string) { + 'use step'; + const writable = getWritable(); + const childRun = await start(writableForwardedChildWorkflow, [ + writable, + payload, + ]); + await childRun.returnValue; + await writable.close(); + return childRun.runId; +} + +export async function writableForwardedFromStepWorkflow(payload: string) { + 'use workflow'; + const childRunId = await startChildWithStepWritable(payload); + return { childRunId }; +}