-
Notifications
You must be signed in to change notification settings - Fork 265
Attributes MVP (experimental and write-only) #2088
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5183e92
f500572
dd8b01d
260febb
64621ba
8a87a8d
7f45c2c
0281f43
e5fd716
c58fd38
928cfc5
fbb0c59
a8d07a5
f88260e
1a213f9
93afb1b
45aa4c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| --- | ||
| '@workflow/core': patch | ||
| '@workflow/world': patch | ||
| '@workflow/world-local': patch | ||
| '@workflow/world-postgres': patch | ||
| '@workflow/world-vercel': patch | ||
| 'workflow': patch | ||
| --- | ||
|
|
||
| Add experimental `setAttributes()` for attaching plaintext string key/value metadata to a workflow run. Callable from a workflow body; the call is dispatched as a step so the mutation is recorded on the event log. |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| import { FatalError } from '@workflow/errors'; | ||
| import { describe, expect, it } from 'vitest'; | ||
| import { setAttributes } from './set-attributes.js'; | ||
|
|
||
| describe('setAttributes (host-side stub)', () => { | ||
| // The host-side `setAttributes` is the fallback resolved when callers | ||
| // are NOT in the workflow VM. The real implementation lives in | ||
| // `workflow/set-attributes.ts` and is selected via the `workflow` | ||
| // package-exports condition. Reaching this file from a step body or | ||
| // plain host code is unsupported and must surface a clear error. | ||
| it('throws FatalError telling the user setAttributes is workflow-body only', async () => { | ||
| await expect(setAttributes({ phase: 'init' })).rejects.toBeInstanceOf( | ||
| FatalError | ||
| ); | ||
| await expect(setAttributes({ phase: 'init' })).rejects.toThrow( | ||
| /workflow.*function/i | ||
| ); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| import { FatalError } from '@workflow/errors'; | ||
|
|
||
| /** | ||
| * Host-side stub for `setAttributes`. The real implementation lives in | ||
| * `./workflow/set-attributes.ts` and is selected by the `workflow` | ||
| * package-exports condition when the workflow VM bundle is resolved. | ||
| * | ||
| * Reaching this stub means `setAttributes` was called outside a workflow | ||
| * body — most likely from a `'use step'` function or plain host code. | ||
| * That isn't supported: attribute mutations must be event-sourced | ||
| * through the workflow runtime so they survive replay. | ||
| */ | ||
| export async function setAttributes( | ||
| _attrs: Record<string, string | undefined> | ||
| ): Promise<void> { | ||
| throw new FatalError( | ||
| "setAttributes() must be called from a 'use workflow' function. " + | ||
| 'Calling it from a step body or plain host code is not supported.' | ||
| ); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| import { FatalError } from '@workflow/errors'; | ||
| import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; | ||
| import { WORKFLOW_USE_STEP } from '../symbols.js'; | ||
| import { setAttributes } from './set-attributes.js'; | ||
|
|
||
| describe('workflow.setAttributes', () => { | ||
| const dispatchCalls: Array<{ | ||
| stepName: string; | ||
| changes: Array<{ key: string; value: string | null }>; | ||
| }> = []; | ||
|
|
||
| beforeEach(() => { | ||
| dispatchCalls.length = 0; | ||
| (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP] = vi.fn( | ||
| (stepName: string) => | ||
| async (changes: Array<{ key: string; value: string | null }>) => { | ||
| dispatchCalls.push({ stepName, changes }); | ||
| } | ||
| ); | ||
| }); | ||
|
|
||
| afterEach(() => { | ||
| delete (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP]; | ||
| }); | ||
|
|
||
| it('dispatches normalized changes through __builtin_set_attributes', async () => { | ||
| await setAttributes({ phase: 'init', orderId: 'ord_1' }); | ||
| expect(dispatchCalls).toEqual([ | ||
| { | ||
| stepName: '__builtin_set_attributes', | ||
| changes: [ | ||
| { key: 'phase', value: 'init' }, | ||
| { key: 'orderId', value: 'ord_1' }, | ||
| ], | ||
| }, | ||
| ]); | ||
| }); | ||
|
|
||
| it('translates undefined values into null (unset semantics)', async () => { | ||
| await setAttributes({ phase: 'done', stale: undefined }); | ||
| expect(dispatchCalls).toEqual([ | ||
| { | ||
| stepName: '__builtin_set_attributes', | ||
| changes: [ | ||
| { key: 'phase', value: 'done' }, | ||
| { key: 'stale', value: null }, | ||
| ], | ||
| }, | ||
| ]); | ||
| }); | ||
|
|
||
| it('is a no-op for an empty record (no dispatch)', async () => { | ||
| await setAttributes({}); | ||
| expect(dispatchCalls).toHaveLength(0); | ||
| }); | ||
|
|
||
| it('throws FatalError when the workflow runtime has not initialized useStep', async () => { | ||
| delete (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP]; | ||
| await expect(setAttributes({ phase: 'init' })).rejects.toBeInstanceOf( | ||
| FatalError | ||
| ); | ||
| }); | ||
|
|
||
| it('throws FatalError for reserved-prefix keys before any dispatch', async () => { | ||
| await expect(setAttributes({ $sys: 'x' })).rejects.toBeInstanceOf( | ||
| FatalError | ||
| ); | ||
| expect(dispatchCalls).toHaveLength(0); | ||
| }); | ||
|
|
||
| it('throws FatalError when called with a non-object', async () => { | ||
| await expect( | ||
| setAttributes(null as unknown as Record<string, string>) | ||
| ).rejects.toBeInstanceOf(FatalError); | ||
| await expect( | ||
| setAttributes([] as unknown as Record<string, string>) | ||
| ).rejects.toBeInstanceOf(FatalError); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| import { FatalError } from '@workflow/errors'; | ||
| import { | ||
| type AttributeChange, | ||
| AttributeValidationError, | ||
| validateAttributeChanges, | ||
| } from '@workflow/world'; | ||
| import { WORKFLOW_USE_STEP } from '../symbols.js'; | ||
|
|
||
| /** | ||
| * Attach plaintext string key/value metadata to the current workflow run. | ||
| * | ||
| * **EXPERIMENTAL.** Callable only from a workflow body (`'use workflow'`). | ||
| * The call is dispatched through the workflow runtime as a step, so the | ||
| * mutation is recorded in the event log and survives replay. | ||
| * | ||
| * Validation runs in the VM (cheap, deterministic) before the step | ||
| * dispatch — violations throw `FatalError` without queuing a step. An | ||
| * empty record is a no-op. `value: undefined` removes the key from the | ||
| * run's attribute map. | ||
| * | ||
| * **WARNING**: While this feature is experimental, calling e.g. | ||
| * `Promise.all([setAttributes({ a: '1' }), setAttributes({ a: '2' })])` | ||
| * is not guaranteed to be ordered consistently, but | ||
| * `await setAttributes({ a: '1' }).then(() => setAttributes({ a: '2' }))` | ||
| * is. | ||
| * | ||
| * @example | ||
| * ```ts | ||
| * export async function myWorkflow() { | ||
| * 'use workflow'; | ||
| * await setAttributes({ phase: 'init' }); | ||
| * // ... work ... | ||
| * await setAttributes({ phase: 'done', orderId: 'ord_123' }); | ||
| * await setAttributes({ orderId: undefined }); // remove | ||
| * } | ||
| * ``` | ||
| */ | ||
| export async function setAttributes( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naming nit: prefer The repo doesn't yet have an established Suggest applying the same convention to the world-interface method too: |
||
| attrs: Record<string, string | undefined> | ||
| ): Promise<void> { | ||
| if (attrs === null || typeof attrs !== 'object' || Array.isArray(attrs)) { | ||
| throw new FatalError( | ||
| `setAttributes requires a plain object, got ${ | ||
| attrs === null ? 'null' : Array.isArray(attrs) ? 'array' : typeof attrs | ||
| }` | ||
| ); | ||
| } | ||
| const changes: AttributeChange[] = Object.entries(attrs).map( | ||
| ([key, value]) => ({ | ||
| key, | ||
| value: value === undefined ? null : value, | ||
| }) | ||
| ); | ||
| if (changes.length === 0) return; | ||
| try { | ||
| validateAttributeChanges(changes); | ||
| } catch (err) { | ||
| if (err instanceof AttributeValidationError) { | ||
| throw new FatalError(err.message); | ||
| } | ||
| throw err; | ||
| } | ||
| const useStep = (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP] as | ||
| | ((stepName: string) => (changes: AttributeChange[]) => Promise<void>) | ||
| | undefined; | ||
| if (!useStep) { | ||
| throw new FatalError( | ||
| 'setAttributes() called outside a workflow runtime context. ' + | ||
| 'It must be called from within a workflow body (`use workflow`).' | ||
| ); | ||
| } | ||
| await useStep('__builtin_set_attributes')(changes); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please verify the bare-name step lookup for The workflow VM dispatches with the bare name
That The new e2e ( |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,3 +20,54 @@ export async function __builtin_response_text(this: Request | Response) { | |
| 'use step'; | ||
| return this.text(); | ||
| } | ||
|
|
||
| /** | ||
| * Step bridge for workflow-body `setAttributes` calls. The VM-side | ||
| * helper validates input and dispatches here via `useStep`. This step | ||
| * runs in normal Node context with full world access. | ||
| * | ||
| * The dispatch reads the world and current run id directly from | ||
| * `globalThis` symbols populated by the workflow/step runtime — this | ||
| * intentionally avoids importing `@workflow/core` so the Next.js | ||
| * deferred-entries discoverer can't walk a chain into world adapters | ||
| * and `@vercel/queue` from this step file. | ||
| */ | ||
| export async function __builtin_set_attributes( | ||
| changes: Array<{ key: string; value: string | null }> | ||
| ) { | ||
| 'use step'; | ||
| if (changes.length === 0) return; | ||
| const g = globalThis as Record<symbol, unknown>; | ||
|
|
||
| const contextStorage = g[Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE')] as | ||
| | { | ||
| getStore: () => | ||
| | { workflowMetadata?: { workflowRunId?: string } } | ||
| | undefined; | ||
| } | ||
| | undefined; | ||
| const runId = contextStorage?.getStore?.()?.workflowMetadata?.workflowRunId; | ||
| if (!runId) { | ||
| throw new Error( | ||
| '__builtin_set_attributes: no workflow run id available in step context' | ||
| ); | ||
| } | ||
|
|
||
| const world = g[Symbol.for('@workflow/world//cache')] as | ||
| | { | ||
| runs?: { | ||
| experimentalSetAttributes?: ( | ||
| runId: string, | ||
| changes: Array<{ key: string; value: string | null }> | ||
| ) => Promise<unknown>; | ||
| }; | ||
| } | ||
| | undefined; | ||
| if (typeof world?.runs?.experimentalSetAttributes !== 'function') { | ||
| // World adapter doesn't implement attributes yet — silently no-op. | ||
| // The VM-side validation already ran, so input was well-formed. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Silent no-op contradicts the documented behavior. The MVP changelog ( This code returns silently with no log/warning. A user calling At minimum, emit |
||
| return; | ||
| } | ||
|
|
||
| await world.runs.experimentalSetAttributes(runId, changes); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the workflow-body-only restriction load-bearing, or scope?
Step-body support looks like ~10 lines: the host stub would do exactly what
__builtin_set_attributesalready does — read the runId fromSymbol.for('WORKFLOW_STEP_CONTEXT_STORAGE'), read the world fromSymbol.for('@workflow/world//cache'), validate, dispatch directly. Both symbols are already populated when a step body runs (the bridge step proves it).Plain host code outside any run genuinely can't be supported with this signature (no runId to infer) — that part isn't arbitrary, you'd need a separate
experimental_setRunAttributes(runId, {...}). But step body is just a scope decision.Trade-offs of allowing step body:
What you'd gain:
setAttributesworks "anywhere inside a run."What you'd lose:
step_created/step_completedpair via the__builtin_set_attributesbridge; step-body calls would write directly with no extra events. Replay determinism is unaffected (step bodies aren't re-executed during replay anyway).Forward-compat with #1933 is fine either way — the planned
attr_setevent'swriter: { type: 'step', stepId, attempt }discriminator already accounts for step writers.Not asking you to add it in this PR — but if the restriction is just scope-cut rather than something the architecture is leaning on, please mention that in the changelog so it doesn't read as a hard architectural constraint that has to stay until V1.