From 7eaad7f1ef24df1729f7fd7f7d738a43063e3f83 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 21 May 2026 11:18:02 -0700 Subject: [PATCH 1/4] test(e2e): cover WritableStream passed as start() argument Adds an e2e workflow + test where a parent workflow gets a WritableStream via getWritable(), forwards it through start() to a child workflow, and the child step writes raw bytes to it. Asserts the external reader on the parent's stream observes the exact bytes the child wrote. --- .changeset/tired-pigs-hug.md | 2 + packages/core/e2e/e2e.test.ts | 37 ++++++++++++++++++ workbench/example/workflows/99_e2e.ts | 54 +++++++++++++++++++++++++++ 3 files changed, 93 insertions(+) create mode 100644 .changeset/tired-pigs-hug.md diff --git a/.changeset/tired-pigs-hug.md b/.changeset/tired-pigs-hug.md new file mode 100644 index 0000000000..a845151cc8 --- /dev/null +++ b/.changeset/tired-pigs-hug.md @@ -0,0 +1,2 @@ +--- +--- diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index ad8ed949e1..70f34a1765 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -927,6 +927,43 @@ describe('e2e', () => { expect(await run.returnValue).toEqual('done'); }); + // A WritableStream passed as a workflow argument to start() should + // land raw bytes on the parent's output stream when the child step + // writes to it. + test('writableForwardedToChildWorkflow', { timeout: 120_000 }, async () => { + const payload = `hello-from-child-${Date.now()}\n`; + const run = await start(await e2e('writableForwardedToChildWorkflow'), [ + payload, + ]); + + const reader = run.getReadable().getReader(); + // `fatal: true` makes the decoder throw on any invalid UTF-8 + // sequence, so a successful decode is itself a round-trip + // assertion that the bytes survived intact. + const decoder = new TextDecoder('utf-8', { fatal: true }); + + // The child step performs exactly one write of `payload` as + // UTF-8 bytes, so we should receive a single chunk containing + // exactly those bytes before the stream closes. + const { value, done } = await reader.read(); + expect(done).toBeFalsy(); + assert(value); + assert(value instanceof Uint8Array); + + const expectedBytes = new TextEncoder().encode(payload); + expect(value.byteLength).toBe(expectedBytes.byteLength); + expect(decoder.decode(value)).toBe(payload); + + // Default stream should close cleanly after the parent's + // stepCloseOutputStream call. + expect((await reader.read()).done).toBe(true); + + const returnValue = await run.returnValue; + expect(returnValue).toMatchObject({ + childRunId: expect.stringMatching(/^wrun_/), + }); + }); + test('fetchWorkflow', { timeout: 60_000 }, async () => { const run = await start(await e2e('fetchWorkflow'), []); const returnValue = await run.returnValue; diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 1deb73bf2e..f06674beff 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -3037,3 +3037,57 @@ export class DistributedAbortController { })(); } } + +////////////////////////////////////////////////////////// +// WritableStream passed as argument to start() +// +// A parent workflow gets a WritableStream from getWritable() (its own +// output stream), and passes it through a step that calls +// `start(childWorkflow, [args, parentWritable])`. The child workflow +// receives the WritableStream as a workflow argument and forwards it +// into a step, which writes raw Uint8Array bytes to it. +// +// The external reader on `parentRun.getReadable()` should observe the +// exact bytes the child step wrote. + +async function childStepWritesToParentWritable( + parentWritable: WritableStream, + payload: string +) { + 'use step'; + const writer = parentWritable.getWriter(); + await writer.write(new TextEncoder().encode(payload)); + writer.releaseLock(); +} + +export async function writableForwardedToChildChildWorkflow( + parentWritable: WritableStream, + payload: string +) { + 'use workflow'; + await childStepWritesToParentWritable(parentWritable, payload); + return 'child-done'; +} + +async function spawnChildAndForwardWritable( + parentWritable: WritableStream, + payload: string +) { + 'use step'; + const childRun = await start(writableForwardedToChildChildWorkflow, [ + parentWritable, + payload, + ]); + // Wait for the child to finish writing before letting the parent + // close its own writable. + await childRun.returnValue; + return childRun.runId; +} + +export async function writableForwardedToChildWorkflow(payload: string) { + 'use workflow'; + const writable = getWritable(); + const childRunId = await spawnChildAndForwardWritable(writable, payload); + await stepCloseOutputStream(writable); + return { childRunId }; +} From 429d071319dcb0cc53a7faeeeebf906474f55e64 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 21 May 2026 12:37:30 -0700 Subject: [PATCH 2/4] fix(core): avoid double-framing when WritableStream is forwarded via start() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a workflow's getWritable() handle is passed across start() to a child workflow, the parent step's reviver wraps it in a serialize transform that pipes into a workflow server stream. Until now, getExternalReducers.WritableStream then installed a second serialize transform on top of that — so every chunk the child step wrote got devalue-framed twice but only deframed once on the reader side, and external consumers saw the inner frame instead of the original bytes. Fix: tag every user-visible writable that's already backed by a workflow server stream with its (runId, name). When the external reducer recognizes those tags during dehydration, it bridges bytes straight from the new child-side server stream to the original server stream instead of piping through the user's writable. That leaves the producer-side serialize transform (installed once by the child's step reviver) as the only framing layer in the chain. --- .changeset/tired-pigs-hug.md | 4 ++ packages/core/src/serialization.test.ts | 35 +++++++++++++++ packages/core/src/serialization.ts | 53 +++++++++++++++++++++++ packages/core/src/step/writable-stream.ts | 16 +++++++ packages/core/src/symbols.ts | 14 ++++++ 5 files changed, 122 insertions(+) diff --git a/.changeset/tired-pigs-hug.md b/.changeset/tired-pigs-hug.md index a845151cc8..0d4fdf6030 100644 --- a/.changeset/tired-pigs-hug.md +++ b/.changeset/tired-pigs-hug.md @@ -1,2 +1,6 @@ --- +"@workflow/core": patch +"workflow": patch --- + +Fix double-framing when a `WritableStream` is forwarded across `start()`. A workflow's `getWritable()` handle (or a step-context `getWritable()`) can now be passed as a workflow argument to a child workflow; the child's writes land on the parent's stream as raw chunks instead of devalue-encoded frames. diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 06f5c47c5b..f658de90c8 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -41,6 +41,7 @@ import { ABORT_STREAM_NAME, STABLE_ULID, STREAM_NAME_SYMBOL, + STREAM_SERVER_RUN_ID_SYMBOL, } from './symbols.js'; import { createContext } from './vm/index.js'; @@ -498,6 +499,40 @@ describe('workflow arguments', () => { expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); + // Regression: when a user writable is already backed by a workflow + // server stream (because it was hydrated by a step-side reviver or + // created via step-context `getWritable()`), forwarding it across a + // `start()` boundary must NOT install a second serialize transform on + // top of it. The external WritableStream reducer recognizes the + // STREAM_NAME / STREAM_SERVER_RUN_ID tags and instead bridges bytes at + // the server-stream level, leaving the producer's serialize transform + // (installed once by the child's step reviver) as the only framing + // layer in the chain. Previously, every chunk would have been devalue- + // framed twice on write and only deframed once on read, so the + // external reader saw the inner frame as the chunk payload. + it('does not pipe through a tagged WritableStream (server-to-server bridge)', async () => { + const userWritable = new WritableStream(); + Object.defineProperty(userWritable, STREAM_NAME_SYMBOL, { + value: 'strm_parentstreamname', + writable: false, + }); + Object.defineProperty(userWritable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: 'wrun_parent', + writable: false, + }); + + expect(userWritable.locked).toBe(false); + await dehydrateWorkflowArguments( + userWritable, + 'wrun_child', + noEncryptionKey, + [] + ); + // If the reducer had piped through the user's writable, this would + // be `true` (pipeTo acquires the lock). + expect(userWritable.locked).toBe(false); + }); + it('should work with ReadableStream', async () => { const stream = new ReadableStream(); const serialized = await dehydrateWorkflowArguments( diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 502e79355d..2e31b06964 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -62,6 +62,7 @@ import { BODY_INIT_SYMBOL, STABLE_ULID, STREAM_NAME_SYMBOL, + STREAM_SERVER_RUN_ID_SYMBOL, STREAM_TYPE_SYMBOL, WEBHOOK_RESPONSE_WRITABLE, } from './symbols.js'; @@ -778,6 +779,31 @@ export function getExternalReducers( const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); const name = `strm_${streamId}`; + // Fast path: when the writable is already backed by a workflow + // server stream (e.g. it came from a step-context `getWritable()` + // or was hydrated from a workflow input by `getStepRevivers`), do + // NOT pipe through it — that would compose two serialize + // transforms on every chunk and the consumer would see one + // unrecovered devalue frame of framing. Instead, bridge bytes + // straight from the new child-side server stream to the original + // server stream, leaving the producer-side serialize transform + // (installed once by the child's step reviver) as the only + // framing layer in the chain. + const existingName = (value as any)[STREAM_NAME_SYMBOL]; + const existingRunId = (value as any)[STREAM_SERVER_RUN_ID_SYMBOL]; + if ( + typeof existingName === 'string' && + typeof existingRunId === 'string' + ) { + const childReadable = new WorkflowServerReadableStream(runId, name); + const originalServerWritable = new WorkflowServerWritableStream( + existingRunId, + existingName + ); + ops.push(childReadable.pipeTo(originalServerWritable)); + return { name }; + } + const readable = new WorkflowServerReadableStream(runId, name); ops.push(readable.pipeTo(value)); @@ -1435,6 +1461,18 @@ export function getExternalRevivers( // Start polling to detect when user releases lock pollWritableLock(serialize.writable, state); + // See `getStepRevivers.WritableStream` for the rationale: tagging + // lets later reducers bypass the serialize transform when the + // writable is forwarded across another serialization boundary. + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: value.name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: runId, + writable: false, + }); + return serialize.writable; }, @@ -1763,6 +1801,21 @@ function getStepRevivers( // Start polling to detect when user releases lock pollWritableLock(serialize.writable, state); + // Record the underlying `(runId, name)` so downstream reducers can + // recognize that this writable is already backed by a workflow + // server stream and avoid piping through it a second time. Without + // this, passing the writable as an argument to `start()` would + // double-frame every chunk (one serialize transform here, plus a + // second one installed by `getExternalReducers.WritableStream`). + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: value.name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: runId, + writable: false, + }); + return serialize.writable; }, diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index 5c250522fd..bc8cd69430 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -9,6 +9,7 @@ import { getSerializeStream, WorkflowServerWritableStream, } from '../serialization.js'; +import { STREAM_NAME_SYMBOL, STREAM_SERVER_RUN_ID_SYMBOL } from '../symbols.js'; import { getWorkflowRunStreamId } from '../util.js'; import { contextStorage } from './context-storage.js'; @@ -69,6 +70,21 @@ export function getWritable( pollWritableLock(serialize.writable, state); + // Tag the writable with its underlying `(runId, name)` so downstream + // reducers can recognize it. Without this, calling + // `start(child, [args, theWritable])` from the same step would install + // a second serialize transform on top of this one and double-frame + // every chunk the child writes. See `getStepRevivers.WritableStream` + // for the matching rationale on revived writables. + Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { + value: name, + writable: false, + }); + Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { + value: runId, + writable: false, + }); + // Return the writable side of the transform stream return serialize.writable; } diff --git a/packages/core/src/symbols.ts b/packages/core/src/symbols.ts index 2cbcb03f63..ecd8df9d5c 100644 --- a/packages/core/src/symbols.ts +++ b/packages/core/src/symbols.ts @@ -6,6 +6,20 @@ export const WORKFLOW_GET_STREAM_ID = Symbol.for('WORKFLOW_GET_STREAM_ID'); export const STABLE_ULID = Symbol.for('WORKFLOW_STABLE_ULID'); export const STREAM_NAME_SYMBOL = Symbol.for('WORKFLOW_STREAM_NAME'); export const STREAM_TYPE_SYMBOL = Symbol.for('WORKFLOW_STREAM_TYPE'); +/** + * Stamped on a real `WritableStream` (the user-visible `serialize.writable` + * returned from a step-side reviver or step-context `getWritable()`) to + * record the `(runId, name)` of the underlying `WorkflowServerWritableStream` + * that the writable's chunks ultimately land on. Downstream reducers (most + * importantly `getExternalReducers.WritableStream`, which `start()` uses to + * dehydrate workflow arguments) consult this symbol so they can bridge to + * the original server stream without piping through the user's + * already-installed serialize transform — that's what previously produced + * a second devalue framing layer on every chunk the child wrote. + */ +export const STREAM_SERVER_RUN_ID_SYMBOL = Symbol.for( + 'WORKFLOW_STREAM_SERVER_RUN_ID' +); export const BODY_INIT_SYMBOL = Symbol.for('BODY_INIT'); export const WEBHOOK_RESPONSE_WRITABLE = Symbol.for( 'WEBHOOK_RESPONSE_WRITABLE' From 612062bd5e00cc193c4dc11f4aaadaea649059d8 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 21 May 2026 13:32:37 -0700 Subject: [PATCH 3/4] fix(core): forward (runId, name) when a tagged WritableStream crosses start() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the previous in-process bridge with first-class writable forwarding at the descriptor level. When a parent workflow's getWritable() handle is passed as an argument to a child workflow, the dehydrated descriptor now carries the original (runId, name). The child run's step-side reviver opens the writable against the parent's server stream directly and resolves the parent run's encryption key (encrypt-only) via getEncryptionKeyForRun. This removes the architectural limitation that the bridge could only stay alive for the duration of the parent step process — on Vercel that capped forwarding at ~15 minutes regardless of the child run's lifetime, dropping any writes the child made after the parent step process exited. importKey() now accepts a usages parameter, defaulting to ['encrypt', 'decrypt']. The cross-run forwarding path imports with ['encrypt'] only so a compromised child run cannot decrypt any existing data on the parent's stream — only contribute new writes. --- .changeset/tired-pigs-hug.md | 6 +- packages/core/src/encryption.ts | 25 ++++- packages/core/src/serialization.test.ts | 34 ++++--- packages/core/src/serialization.ts | 118 +++++++++++++++------- packages/core/src/serialization/types.ts | 11 +- packages/core/src/step/writable-stream.ts | 11 +- packages/core/src/symbols.ts | 18 ++-- 7 files changed, 150 insertions(+), 73 deletions(-) diff --git a/.changeset/tired-pigs-hug.md b/.changeset/tired-pigs-hug.md index 0d4fdf6030..de5fa63f89 100644 --- a/.changeset/tired-pigs-hug.md +++ b/.changeset/tired-pigs-hug.md @@ -1,6 +1,6 @@ --- -"@workflow/core": patch -"workflow": patch +"@workflow/core": minor +"workflow": minor --- -Fix double-framing when a `WritableStream` is forwarded across `start()`. A workflow's `getWritable()` handle (or a step-context `getWritable()`) can now be passed as a workflow argument to a child workflow; the child's writes land on the parent's stream as raw chunks instead of devalue-encoded frames. +Support forwarding a `WritableStream` (from a workflow's `getWritable()`) as an argument to a child workflow via `start()`. The child run's writes land on the parent run's stream directly — encrypted with the parent run's key — for the full lifetime of the child run, with no in-process bridge tied to the parent step that invoked `start()`. diff --git a/packages/core/src/encryption.ts b/packages/core/src/encryption.ts index 368eacb269..9cd4c5613e 100644 --- a/packages/core/src/encryption.ts +++ b/packages/core/src/encryption.ts @@ -33,19 +33,34 @@ const KEY_LENGTH = 32; // bytes (AES-256) * Callers should call this once per run (after `getEncryptionKeyForRun()`) * and pass the resulting `CryptoKey` to all subsequent encrypt/decrypt calls. * + * Pass `usages: ['encrypt']` (or `['decrypt']`) for cross-run scenarios + * where the caller should not be able to perform the inverse operation + * with the key — for example a child workflow writing into a parent + * run's forwarded WritableStream only needs to encrypt, never decrypt. + * * @param raw - Raw 32-byte AES-256 key (from World.getEncryptionKeyForRun) + * @param usages - Key usages. Defaults to `['encrypt', 'decrypt']`. * @returns CryptoKey ready for AES-GCM operations */ -export async function importKey(raw: Uint8Array) { +export async function importKey( + raw: Uint8Array, + usages: ReadonlyArray<'encrypt' | 'decrypt'> = ['encrypt', 'decrypt'] +) { if (raw.byteLength !== KEY_LENGTH) { throw new WorkflowRuntimeError( `Encryption key must be exactly ${KEY_LENGTH} bytes, got ${raw.byteLength}` ); } - return globalThis.crypto.subtle.importKey('raw', raw, 'AES-GCM', false, [ - 'encrypt', - 'decrypt', - ]); + return globalThis.crypto.subtle.importKey( + 'raw', + raw, + 'AES-GCM', + false, + // `KeyUsage` is a DOM-lib type that's not in scope under `es2022`. + // The `ReadonlyArray<'encrypt' | 'decrypt'>` parameter type matches + // a strict subset of `KeyUsage[]`, so this cast is sound. + usages as ('encrypt' | 'decrypt')[] + ); } /** diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index f658de90c8..6c5447a279 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -499,18 +499,16 @@ describe('workflow arguments', () => { expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); - // Regression: when a user writable is already backed by a workflow - // server stream (because it was hydrated by a step-side reviver or - // created via step-context `getWritable()`), forwarding it across a - // `start()` boundary must NOT install a second serialize transform on - // top of it. The external WritableStream reducer recognizes the - // STREAM_NAME / STREAM_SERVER_RUN_ID tags and instead bridges bytes at - // the server-stream level, leaving the producer's serialize transform - // (installed once by the child's step reviver) as the only framing - // layer in the chain. Previously, every chunk would have been devalue- - // framed twice on write and only deframed once on read, so the - // external reader saw the inner frame as the chunk payload. - it('does not pipe through a tagged WritableStream (server-to-server bridge)', async () => { + // When a user writable is already backed by a workflow server + // stream (because it was hydrated by a step-side reviver or created + // via step-context `getWritable()`), forwarding it across a + // `start()` boundary must emit the original `(runId, name)` in the + // dehydrated descriptor and MUST NOT install any pipe through the + // user's writable. The child run's step-side reviver then opens a + // server writable against the original `(runId, name)` directly, + // so writes survive for the full lifetime of the child run — not + // just for the dehydrating step's process. + it('forwards original (runId, name) for a tagged WritableStream', async () => { const userWritable = new WritableStream(); Object.defineProperty(userWritable, STREAM_NAME_SYMBOL, { value: 'strm_parentstreamname', @@ -522,15 +520,21 @@ describe('workflow arguments', () => { }); expect(userWritable.locked).toBe(false); - await dehydrateWorkflowArguments( + const serialized = await dehydrateWorkflowArguments( userWritable, 'wrun_child', noEncryptionKey, [] ); - // If the reducer had piped through the user's writable, this would - // be `true` (pipeTo acquires the lock). + // If the reducer had piped through the user's writable, the lock + // would be acquired here. expect(userWritable.locked).toBe(false); + // The dehydrated descriptor should carry both the original name + // and the original runId so the child's reviver can open the + // writable against the parent's server stream directly. + const text = new TextDecoder().decode(serialized as Uint8Array); + expect(text).toContain('strm_parentstreamname'); + expect(text).toContain('wrun_parent'); }); it('should work with ReadableStream', async () => { diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 2e31b06964..f9346c3393 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -5,6 +5,7 @@ import { decrypt as aesGcmDecrypt, encrypt as aesGcmEncrypt, type CryptoKey, + importKey, } from './encryption.js'; import { createFlushableState, @@ -776,34 +777,26 @@ export function getExternalReducers( WritableStream: (value) => { if (!(value instanceof global.WritableStream)) return false; - const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); - const name = `strm_${streamId}`; - // Fast path: when the writable is already backed by a workflow // server stream (e.g. it came from a step-context `getWritable()` - // or was hydrated from a workflow input by `getStepRevivers`), do - // NOT pipe through it — that would compose two serialize - // transforms on every chunk and the consumer would see one - // unrecovered devalue frame of framing. Instead, bridge bytes - // straight from the new child-side server stream to the original - // server stream, leaving the producer-side serialize transform - // (installed once by the child's step reviver) as the only - // framing layer in the chain. + // or was hydrated from a workflow input by `getStepRevivers`), + // forward its underlying `(runId, name)` to the receiving run. + // The receiving run's step-side reviver opens a server writable + // against the original `(runId, name)` and resolves that run's + // encryption key directly, so writes land on the original stream + // for the full lifetime of the receiving run — no in-process + // bridge tied to the dehydrating step's lifetime. const existingName = (value as any)[STREAM_NAME_SYMBOL]; const existingRunId = (value as any)[STREAM_SERVER_RUN_ID_SYMBOL]; if ( typeof existingName === 'string' && typeof existingRunId === 'string' ) { - const childReadable = new WorkflowServerReadableStream(runId, name); - const originalServerWritable = new WorkflowServerWritableStream( - existingRunId, - existingName - ); - ops.push(childReadable.pipeTo(originalServerWritable)); - return { name }; + return { name: existingName, runId: existingRunId }; } + const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); + const name = `strm_${streamId}`; const readable = new WorkflowServerReadableStream(runId, name); ops.push(readable.pipeTo(value)); @@ -887,7 +880,13 @@ export function getWorkflowReducers( if (!name) { throw new WorkflowRuntimeError('WritableStream `name` is not set'); } - return { name }; + const s: SerializableSpecial['WritableStream'] = { name }; + // When the handle was forwarded from another run (parent → child + // via `start()`), preserve the foreign runId so the step-side + // reviver opens the writable against the original stream. + const foreignRunId = value[STREAM_SERVER_RUN_ID_SYMBOL]; + if (typeof foreignRunId === 'string') s.runId = foreignRunId; + return s; }, // AbortController/AbortSignal in workflow context — just read symbols (handles). @@ -987,6 +986,7 @@ function getStepReducers( if (!(value instanceof global.WritableStream)) return false; let name = value[STREAM_NAME_SYMBOL]; + const foreignRunId = (value as any)[STREAM_SERVER_RUN_ID_SYMBOL]; if (!name) { const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); name = `strm_${streamId}`; @@ -1002,7 +1002,9 @@ function getStepReducers( ); } - return { name }; + const s: SerializableSpecial['WritableStream'] = { name }; + if (typeof foreignRunId === 'string') s.runId = foreignRunId; + return s; }, AbortController: (value) => { @@ -1440,12 +1442,25 @@ export function getExternalRevivers( } }, WritableStream: (value) => { + // Same handling as `getStepRevivers.WritableStream` — see comments + // there for the cross-run case (writable carries `runId` from + // parent → child forwarding via `start()`). + const targetRunId = typeof value.runId === 'string' ? value.runId : runId; + const targetKey: EncryptionKeyParam = + targetRunId === runId + ? cryptoKey + : (async () => { + const world = await getWorldLazy(); + const rawKey = await world.getEncryptionKeyForRun?.(targetRunId); + return rawKey ? await importKey(rawKey, ['encrypt']) : undefined; + })(); + const serialize = getSerializeStream( - getExternalReducers(global, ops, runId, cryptoKey), - cryptoKey + getExternalReducers(global, ops, targetRunId, targetKey), + targetKey ); const serverWritable = new WorkflowServerWritableStream( - runId, + targetRunId, value.name ); @@ -1461,15 +1476,12 @@ export function getExternalRevivers( // Start polling to detect when user releases lock pollWritableLock(serialize.writable, state); - // See `getStepRevivers.WritableStream` for the rationale: tagging - // lets later reducers bypass the serialize transform when the - // writable is forwarded across another serialization boundary. Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { value: value.name, writable: false, }); Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { - value: runId, + value: targetRunId, writable: false, }); @@ -1557,12 +1569,22 @@ export function getWorkflowRevivers( }); }, WritableStream: (value) => { - return Object.create(global.WritableStream.prototype, { + const descriptor: PropertyDescriptorMap = { [STREAM_NAME_SYMBOL]: { value: value.name, writable: false, }, - }); + }; + // Preserve the foreign runId, if present, so that when the + // handle is later passed to a step the workflow reducer can + // forward it through to the step reviver. + if (typeof value.runId === 'string') { + descriptor[STREAM_SERVER_RUN_ID_SYMBOL] = { + value: value.runId, + writable: false, + }; + } + return Object.create(global.WritableStream.prototype, descriptor); }, // AbortController/AbortSignal revived inside the workflow VM. Use the @@ -1780,12 +1802,34 @@ function getStepRevivers( } }, WritableStream: (value) => { + // Same-run case: the writable belongs to the current run. Use the + // local cryptoKey and write to the local runId's server stream. + // + // Cross-run case (parent → child via `start()`): the descriptor + // carries the original `runId` and `name`. Open a server writable + // against the original `(runId, name)` and resolve THAT run's key + // for encryption. The resolution is async but doesn't need to + // block reviver return — `getSerializeStream` accepts the + // `Promise` directly and awaits it lazily + // on the first chunk written. The key is imported encrypt-only + // so the receiving run can never decrypt anything else on the + // owning run's stream — it can only contribute new writes. + const targetRunId = typeof value.runId === 'string' ? value.runId : runId; + const targetKey: EncryptionKeyParam = + targetRunId === runId + ? cryptoKey + : (async () => { + const world = await getWorldLazy(); + const rawKey = await world.getEncryptionKeyForRun?.(targetRunId); + return rawKey ? await importKey(rawKey, ['encrypt']) : undefined; + })(); + const serialize = getSerializeStream( - getStepReducers(global, ops, runId, cryptoKey), - cryptoKey + getStepReducers(global, ops, targetRunId, targetKey), + targetKey ); const serverWritable = new WorkflowServerWritableStream( - runId, + targetRunId, value.name ); @@ -1803,16 +1847,16 @@ function getStepRevivers( // Record the underlying `(runId, name)` so downstream reducers can // recognize that this writable is already backed by a workflow - // server stream and avoid piping through it a second time. Without - // this, passing the writable as an argument to `start()` would - // double-frame every chunk (one serialize transform here, plus a - // second one installed by `getExternalReducers.WritableStream`). + // server stream. When forwarded across `start()` again — e.g. + // the child passes this writable on to a grandchild — the + // external reducer needs both to emit the original `runId` in + // the descriptor. Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { value: value.name, writable: false, }); Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, { - value: runId, + value: targetRunId, writable: false, }); diff --git a/packages/core/src/serialization/types.ts b/packages/core/src/serialization/types.ts index 03c5b10b89..15036e29cc 100644 --- a/packages/core/src/serialization/types.ts +++ b/packages/core/src/serialization/types.ts @@ -152,7 +152,16 @@ export interface SerializableSpecial { cause?: unknown; errors: unknown[]; }; - WritableStream: { name: string }; + WritableStream: { + name: string; + /** + * The runId of the workflow run that owns the underlying server + * stream. Present only when the writable was forwarded across a + * `start()` boundary (parent → child). When omitted, the writable + * belongs to the receiving run (the normal in-run case). + */ + runId?: string; + }; AbortController: { streamName: string; hookToken: string; diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index bc8cd69430..f81349242e 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -71,11 +71,12 @@ export function getWritable( pollWritableLock(serialize.writable, state); // Tag the writable with its underlying `(runId, name)` so downstream - // reducers can recognize it. Without this, calling - // `start(child, [args, theWritable])` from the same step would install - // a second serialize transform on top of this one and double-frame - // every chunk the child writes. See `getStepRevivers.WritableStream` - // for the matching rationale on revived writables. + // reducers can recognize that it's already backed by a workflow + // server stream. Calling `start(child, [args, theWritable])` from + // the same step uses these tags to emit `{ name, runId }` in the + // dehydrated descriptor, so the child's reviver can open the + // writable against the original `(runId, name)` directly — no + // in-process bridge tied to this step's lifetime. Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, { value: name, writable: false, diff --git a/packages/core/src/symbols.ts b/packages/core/src/symbols.ts index ecd8df9d5c..6be64a42d3 100644 --- a/packages/core/src/symbols.ts +++ b/packages/core/src/symbols.ts @@ -9,13 +9,17 @@ export const STREAM_TYPE_SYMBOL = Symbol.for('WORKFLOW_STREAM_TYPE'); /** * Stamped on a real `WritableStream` (the user-visible `serialize.writable` * returned from a step-side reviver or step-context `getWritable()`) to - * record the `(runId, name)` of the underlying `WorkflowServerWritableStream` - * that the writable's chunks ultimately land on. Downstream reducers (most - * importantly `getExternalReducers.WritableStream`, which `start()` uses to - * dehydrate workflow arguments) consult this symbol so they can bridge to - * the original server stream without piping through the user's - * already-installed serialize transform — that's what previously produced - * a second devalue framing layer on every chunk the child wrote. + * record the `runId` of the workflow run that owns the underlying server + * stream. Used together with `STREAM_NAME_SYMBOL`. + * + * When `getExternalReducers.WritableStream` (the dehydration path used by + * `start()`) sees both symbols on a writable, it includes the `runId` in + * the descriptor it emits. The child run's step-side reviver then opens + * a server writable against the original `(runId, name)` and resolves + * that run's encryption key directly — so the child's writes land on + * the parent's stream as-is, with no client process in the loop. That + * keeps the forwarding alive for the full lifetime of the child run, + * not just for the parent step that initiated `start()`. */ export const STREAM_SERVER_RUN_ID_SYMBOL = Symbol.for( 'WORKFLOW_STREAM_SERVER_RUN_ID' From 4dce1473327a985d0be8e6bc4ae5fc655ef14072 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 21 May 2026 15:18:27 -0700 Subject: [PATCH 4/4] test: rename writable-forwarded workflows and cover step-context getWritable() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR review: - Rename writableForwardedToChildChildWorkflow → writableForwardedChildWorkflow (drops the duplicated 'Child' segment). - Split writableForwardedToChildWorkflow into two variants covered by a test.each: writableForwardedFromWorkflowWorkflow (workflow-context getWritable, the original test) and writableForwardedFromStepWorkflow (step-context getWritable passed directly into start() from the same step that called getWritable()). - Terser changeset description. --- .changeset/tired-pigs-hug.md | 2 +- packages/core/e2e/e2e.test.ts | 21 +++++++---- workbench/example/workflows/99_e2e.ts | 53 ++++++++++++++++++++------- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/.changeset/tired-pigs-hug.md b/.changeset/tired-pigs-hug.md index de5fa63f89..068dc24f2e 100644 --- a/.changeset/tired-pigs-hug.md +++ b/.changeset/tired-pigs-hug.md @@ -3,4 +3,4 @@ "workflow": minor --- -Support forwarding a `WritableStream` (from a workflow's `getWritable()`) as an argument to a child workflow via `start()`. The child run's writes land on the parent run's stream directly — encrypted with the parent run's key — for the full lifetime of the child run, with no in-process bridge tied to the parent step that invoked `start()`. +A `WritableStream` from a workflow's `getWritable()` can now be passed as an argument to a child workflow via `start()`; the child's writes land on the parent run's stream directly for the full lifetime of the child run. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 70f34a1765..23021b4107 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -929,12 +929,19 @@ describe('e2e', () => { // A WritableStream passed as a workflow argument to start() should // land raw bytes on the parent's output stream when the child step - // writes to it. - test('writableForwardedToChildWorkflow', { timeout: 120_000 }, async () => { + // writes to it. Covered for both: + // - `writableForwardedFromWorkflowWorkflow`: parent calls + // `getWritable()` in workflow context (fake handle revived in the + // intermediary step). + // - `writableForwardedFromStepWorkflow`: parent calls `getWritable()` + // in step context (real `serialize.writable` passed straight to + // `start()`). + test.each([ + 'writableForwardedFromWorkflowWorkflow', + 'writableForwardedFromStepWorkflow', + ] as const)('%s', { timeout: 120_000 }, async (workflowName) => { const payload = `hello-from-child-${Date.now()}\n`; - const run = await start(await e2e('writableForwardedToChildWorkflow'), [ - payload, - ]); + const run = await start(await e2e(workflowName), [payload]); const reader = run.getReadable().getReader(); // `fatal: true` makes the decoder throw on any invalid UTF-8 @@ -954,8 +961,8 @@ describe('e2e', () => { expect(value.byteLength).toBe(expectedBytes.byteLength); expect(decoder.decode(value)).toBe(payload); - // Default stream should close cleanly after the parent's - // stepCloseOutputStream call. + // Default stream should close cleanly after the parent closes its + // writable. expect((await reader.read()).done).toBe(true); const returnValue = await run.returnValue; diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index f06674beff..af4d52af0f 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -3042,39 +3042,43 @@ export class DistributedAbortController { // WritableStream passed as argument to start() // // A parent workflow gets a WritableStream from getWritable() (its own -// output stream), and passes it through a step that calls -// `start(childWorkflow, [args, parentWritable])`. The child workflow -// receives the WritableStream as a workflow argument and forwards it -// into a step, which writes raw Uint8Array bytes to it. +// output stream), and passes it through `start()` to a child +// workflow. The child workflow receives the WritableStream as a +// workflow argument and forwards it into a step, which writes raw +// Uint8Array bytes to it. // // The external reader on `parentRun.getReadable()` should observe the // exact bytes the child step wrote. -async function childStepWritesToParentWritable( - parentWritable: WritableStream, +async function writeBytesToWritable( + writable: WritableStream, payload: string ) { 'use step'; - const writer = parentWritable.getWriter(); + const writer = writable.getWriter(); await writer.write(new TextEncoder().encode(payload)); writer.releaseLock(); } -export async function writableForwardedToChildChildWorkflow( +export async function writableForwardedChildWorkflow( parentWritable: WritableStream, payload: string ) { 'use workflow'; - await childStepWritesToParentWritable(parentWritable, payload); + await writeBytesToWritable(parentWritable, payload); return 'child-done'; } -async function spawnChildAndForwardWritable( +// Variant 1: the parent calls `getWritable()` in the workflow body +// (workflow-context handle), passes the resulting fake handle through +// `start()`. The intermediary step that calls `start()` only exists +// because `start()` cannot be invoked from workflow code directly. +async function startChildWithWorkflowWritable( parentWritable: WritableStream, payload: string ) { 'use step'; - const childRun = await start(writableForwardedToChildChildWorkflow, [ + const childRun = await start(writableForwardedChildWorkflow, [ parentWritable, payload, ]); @@ -3084,10 +3088,33 @@ async function spawnChildAndForwardWritable( return childRun.runId; } -export async function writableForwardedToChildWorkflow(payload: string) { +export async function writableForwardedFromWorkflowWorkflow(payload: string) { 'use workflow'; const writable = getWritable(); - const childRunId = await spawnChildAndForwardWritable(writable, payload); + const childRunId = await startChildWithWorkflowWritable(writable, payload); await stepCloseOutputStream(writable); return { childRunId }; } + +// Variant 2: the parent's `getWritable()` is called inside the step +// that also calls `start()`, so the writable handed to the child is +// the real step-context `serialize.writable` (not a workflow-context +// fake handle that's later revived by a step). This exercises the +// step-side `getWritable()` tagging path directly. +async function startChildWithStepWritable(payload: string) { + 'use step'; + const writable = getWritable(); + const childRun = await start(writableForwardedChildWorkflow, [ + writable, + payload, + ]); + await childRun.returnValue; + await writable.close(); + return childRun.runId; +} + +export async function writableForwardedFromStepWorkflow(payload: string) { + 'use workflow'; + const childRunId = await startChildWithStepWritable(payload); + return { childRunId }; +}