Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/attributes-mvp-plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
'@workflow/core': patch
'@workflow/world': patch
'@workflow/world-local': patch
'@workflow/world-postgres': patch
'@workflow/world-vercel': patch
'workflow': patch
---

Add experimental `setAttributes()` for attaching plaintext string key/value metadata to a workflow run. Callable from a workflow body; the call is dispatched as a step so the mutation is recorded on the event log.
3 changes: 3 additions & 0 deletions docs/content/docs/v4/cookbook/common-patterns/timeouts.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ export async function waitForApproval(requestId: string) {
throw new Error("Approval request expired after 7 days");
}

// You may see warnings like `Workflow run completed with 1 uncommitted operations` in your
// logs when the workflow completes. This is expected behavior.

return result.approved;
}
```
Expand Down
282 changes: 282 additions & 0 deletions docs/content/docs/v5/changelog/attributes-mvp.mdx

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3430,4 +3430,44 @@ describe('e2e', () => {
expect(returnValue.reason).toBe('Test complete');
}
);

// ==========================================================================
// setAttributes (experimental MVP)
// ==========================================================================

describe('setAttributes', () => {
test(
'setAttributesWorkflow: workflow-body calls dispatch through the step bridge and merge correctly',
{ timeout: 30_000 },
async () => {
const run = await start(await e2e('setAttributesWorkflow'), [7]);
const output = await run.returnValue;
expect(output).toBe(21);

const world = await getWorld();
const persisted = await world.runs.get(run.runId);

// First call sets {phase: 'init', source: 'workflow-body'}; second
// overwrites phase; third unsets source via undefined → null.
expect(persisted?.attributes).toEqual({ phase: 'done' });
expect(persisted?.attributes ?? {}).not.toHaveProperty('source');

// Dispatch is via a real step — verify at least one
// `step_created`/`step_completed` pair for the `__builtin_set_attributes`
// step exists on the run's event log.
const { data: events } = await world.events.list({ runId: run.runId });
const attrStepEvents = events.filter(
(e) =>
(e.eventType === 'step_created' ||
e.eventType === 'step_completed') &&
typeof (e.eventData as { stepName?: string } | undefined)
?.stepName === 'string' &&
(e.eventData as { stepName: string }).stepName.includes(
'__builtin_set_attributes'
)
);
expect(attrStepEvents.length).toBeGreaterThanOrEqual(2);
}
);
});
});
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export {
type WebhookOptions,
} from './create-hook.js';
export { defineHook, type TypedHook } from './define-hook.js';
export { setAttributes } from './set-attributes.js';
export { sleep } from './sleep.js';
export {
getStepMetadata,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ export async function loadWorkflowRunEvents(
// Preserve the last non-null cursor across pages. A World may
// legitimately return `{ data: [], cursor: null, hasMore: false }`
// on a trailing empty page — for example when the previous page's
// underlying DynamoDB query hit `Limit` exactly and returned a
// underlying DB query hit the limit exactly and returned a
// `LastEvaluatedKey` "just in case". Overwriting with that null
// would lose the position past the last real event we loaded and
// force the runtime into the "no cursor after initial load" full-
Expand Down
19 changes: 19 additions & 0 deletions packages/core/src/set-attributes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { FatalError } from '@workflow/errors';
import { describe, expect, it } from 'vitest';
import { setAttributes } from './set-attributes.js';

describe('setAttributes (host-side stub)', () => {
// The host-side `setAttributes` is the fallback resolved when callers
// are NOT in the workflow VM. The real implementation lives in
// `workflow/set-attributes.ts` and is selected via the `workflow`
// package-exports condition. Reaching this file from a step body or
// plain host code is unsupported and must surface a clear error.
it('throws FatalError telling the user setAttributes is workflow-body only', async () => {
await expect(setAttributes({ phase: 'init' })).rejects.toBeInstanceOf(
FatalError
);
await expect(setAttributes({ phase: 'init' })).rejects.toThrow(
/workflow.*function/i
);
});
});
20 changes: 20 additions & 0 deletions packages/core/src/set-attributes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { FatalError } from '@workflow/errors';

/**
* Host-side stub for `setAttributes`. The real implementation lives in
* `./workflow/set-attributes.ts` and is selected by the `workflow`
* package-exports condition when the workflow VM bundle is resolved.
*
* Reaching this stub means `setAttributes` was called outside a workflow
* body — most likely from a `'use step'` function or plain host code.
* That isn't supported: attribute mutations must be event-sourced
* through the workflow runtime so they survive replay.
*/
export async function setAttributes(
_attrs: Record<string, string | undefined>
): Promise<void> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the workflow-body-only restriction load-bearing, or scope?

Step-body support looks like ~10 lines: the host stub would do exactly what __builtin_set_attributes already does — read the runId from Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE'), read the world from Symbol.for('@workflow/world//cache'), validate, dispatch directly. Both symbols are already populated when a step body runs (the bridge step proves it).

const ctx = globalThis[Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE')]?.getStore?.();
const runId = ctx?.workflowMetadata?.workflowRunId;
if (!runId) throw new FatalError('must be called from workflow or step body');
const world = globalThis[Symbol.for('@workflow/world//cache')];
await world?.runs?.experimentalSetAttributes?.(runId, normalizedChanges);

Plain host code outside any run genuinely can't be supported with this signature (no runId to infer) — that part isn't arbitrary, you'd need a separate experimental_setRunAttributes(runId, {...}). But step body is just a scope decision.

Trade-offs of allowing step body:

What you'd gain:

  • Users writing a step that already has the data don't have to bubble values back to the workflow body to set an attribute.
  • The natural ergonomic model — setAttributes works "anywhere inside a run."

What you'd lose:

  • The "single dispatch path" simplicity the PR description cites.
  • Event-log visibility of the dispatch: workflow-body calls produce a step_created/step_completed pair via the __builtin_set_attributes bridge; step-body calls would write directly with no extra events. Replay determinism is unaffected (step bodies aren't re-executed during replay anyway).

Forward-compat with #1933 is fine either way — the planned attr_set event's writer: { type: 'step', stepId, attempt } discriminator already accounts for step writers.

Not asking you to add it in this PR — but if the restriction is just scope-cut rather than something the architecture is leaning on, please mention that in the changelog so it doesn't read as a hard architectural constraint that has to stay until V1.

throw new FatalError(
"setAttributes() must be called from a 'use workflow' function. " +
'Calling it from a step body or plain host code is not supported.'
);
}
8 changes: 4 additions & 4 deletions packages/core/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
WorkflowRuntimeError,
} from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import { getPortLazy } from './runtime/get-port-lazy.js';
import { parseWorkflowName } from '@workflow/utils/parse-name';
import type { Event, WorkflowRun } from '@workflow/world';
import * as nanoid from 'nanoid';
Expand All @@ -16,9 +15,10 @@ import { EventConsumerResult, EventsConsumer } from './events-consumer.js';
import type { QueueItem } from './global.js';
import { ENOTSUP, WorkflowSuspension } from './global.js';
import { runtimeLogger } from './logger.js';
import type { WorkflowOrchestratorContext } from './private.js';
import { getPortLazy } from './runtime/get-port-lazy.js';
import { handleSuspension } from './runtime/suspension-handler.js';
import { getWorld } from './runtime/world.js';
import type { WorkflowOrchestratorContext } from './private.js';
import {
dehydrateWorkflowReturnValue,
hydrateWorkflowArguments,
Expand All @@ -36,12 +36,12 @@ import * as Attribute from './telemetry/semantic-conventions.js';
import { trace } from './telemetry.js';
import { getWorkflowRunStreamId } from './util.js';
import { createContext } from './vm/index.js';
import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js';
import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js';
import {
createAbortSignalStatics,
createCreateAbortController,
} from './workflow/abort-controller.js';
import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js';
import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js';
import { createCreateHook } from './workflow/hook.js';
import { createSleep } from './workflow/sleep.js';

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/workflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
type RetryableErrorOptions,
} from '@workflow/errors';
export type { Hook, HookOptions } from '../create-hook.js';
export { setAttributes } from './set-attributes.js';
export { sleep } from '../sleep.js';
export { createHook, createWebhook } from './create-hook.js';
export { defineHook } from './define-hook.js';
Expand Down
79 changes: 79 additions & 0 deletions packages/core/src/workflow/set-attributes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { FatalError } from '@workflow/errors';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { WORKFLOW_USE_STEP } from '../symbols.js';
import { setAttributes } from './set-attributes.js';

describe('workflow.setAttributes', () => {
const dispatchCalls: Array<{
stepName: string;
changes: Array<{ key: string; value: string | null }>;
}> = [];

beforeEach(() => {
dispatchCalls.length = 0;
(globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP] = vi.fn(
(stepName: string) =>
async (changes: Array<{ key: string; value: string | null }>) => {
dispatchCalls.push({ stepName, changes });
}
);
});

afterEach(() => {
delete (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP];
});

it('dispatches normalized changes through __builtin_set_attributes', async () => {
await setAttributes({ phase: 'init', orderId: 'ord_1' });
expect(dispatchCalls).toEqual([
{
stepName: '__builtin_set_attributes',
changes: [
{ key: 'phase', value: 'init' },
{ key: 'orderId', value: 'ord_1' },
],
},
]);
});

it('translates undefined values into null (unset semantics)', async () => {
await setAttributes({ phase: 'done', stale: undefined });
expect(dispatchCalls).toEqual([
{
stepName: '__builtin_set_attributes',
changes: [
{ key: 'phase', value: 'done' },
{ key: 'stale', value: null },
],
},
]);
});

it('is a no-op for an empty record (no dispatch)', async () => {
await setAttributes({});
expect(dispatchCalls).toHaveLength(0);
});

it('throws FatalError when the workflow runtime has not initialized useStep', async () => {
delete (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP];
await expect(setAttributes({ phase: 'init' })).rejects.toBeInstanceOf(
FatalError
);
});

it('throws FatalError for reserved-prefix keys before any dispatch', async () => {
await expect(setAttributes({ $sys: 'x' })).rejects.toBeInstanceOf(
FatalError
);
expect(dispatchCalls).toHaveLength(0);
});

it('throws FatalError when called with a non-object', async () => {
await expect(
setAttributes(null as unknown as Record<string, string>)
).rejects.toBeInstanceOf(FatalError);
await expect(
setAttributes([] as unknown as Record<string, string>)
).rejects.toBeInstanceOf(FatalError);
});
});
73 changes: 73 additions & 0 deletions packages/core/src/workflow/set-attributes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { FatalError } from '@workflow/errors';
import {
type AttributeChange,
AttributeValidationError,
validateAttributeChanges,
} from '@workflow/world';
import { WORKFLOW_USE_STEP } from '../symbols.js';

/**
* Attach plaintext string key/value metadata to the current workflow run.
*
* **EXPERIMENTAL.** Callable only from a workflow body (`'use workflow'`).
* The call is dispatched through the workflow runtime as a step, so the
* mutation is recorded in the event log and survives replay.
*
* Validation runs in the VM (cheap, deterministic) before the step
* dispatch — violations throw `FatalError` without queuing a step. An
* empty record is a no-op. `value: undefined` removes the key from the
* run's attribute map.
*
* **WARNING**: While this feature is experimental, calling e.g.
* `Promise.all([setAttributes({ a: '1' }), setAttributes({ a: '2' })])`
* is not guaranteed to be ordered consistently, but
* `await setAttributes({ a: '1' }).then(() => setAttributes({ a: '2' }))`
* is.
*
* @example
* ```ts
* export async function myWorkflow() {
* 'use workflow';
* await setAttributes({ phase: 'init' });
* // ... work ...
* await setAttributes({ phase: 'done', orderId: 'ord_123' });
* await setAttributes({ orderId: undefined }); // remove
* }
* ```
*/
export async function setAttributes(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming nit: prefer experimental_setAttributes for the public export.

The repo doesn't yet have an established experimental_ / unstable_ convention, but adopting one here would make the "this WILL change" signal much louder at the call site — await setAttributes({...}) reads as production-ready, await experimental_setAttributes({...}) does not. This is the Next.js / React pattern (unstable_*, experimental_*) and it's a single codemod to remove when V1 stabilizes the API — which doubles as a forcing function for every consumer to re-review the new contract on the rename.

Suggest applying the same convention to the world-interface method too: experimental_setAttributes? rather than experimentalSetAttributes? in packages/world/src/interfaces.ts. Consistency across the surface, and the underscore separator makes the experimental marker scan visually rather than blending into a camelCase identifier.

attrs: Record<string, string | undefined>
): Promise<void> {
if (attrs === null || typeof attrs !== 'object' || Array.isArray(attrs)) {
throw new FatalError(
`setAttributes requires a plain object, got ${
attrs === null ? 'null' : Array.isArray(attrs) ? 'array' : typeof attrs
}`
);
}
const changes: AttributeChange[] = Object.entries(attrs).map(
([key, value]) => ({
key,
value: value === undefined ? null : value,
})
);
if (changes.length === 0) return;
try {
validateAttributeChanges(changes);
} catch (err) {
if (err instanceof AttributeValidationError) {
throw new FatalError(err.message);
}
throw err;
}
const useStep = (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP] as
| ((stepName: string) => (changes: AttributeChange[]) => Promise<void>)
| undefined;
if (!useStep) {
throw new FatalError(
'setAttributes() called outside a workflow runtime context. ' +
'It must be called from within a workflow body (`use workflow`).'
);
}
await useStep('__builtin_set_attributes')(changes);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please verify the bare-name step lookup for __builtin_set_attributes actually resolves under the production SWC + step-handler pipeline.

The workflow VM dispatches with the bare name '__builtin_set_attributes', which becomes the stepName on the step_created event. On the step worker side, packages/core/src/runtime/step-handler.ts:198 calls getStepFunction(stepName) to resolve it.

getStepFunction (packages/core/src/private.ts:103-122) tries three things in order:

  1. registeredSteps.get(stepName) — but the SWC plugin spec (spec.md:115) registers steps with full IDs like step//<modulePath>//<fnName>, so a bare-name lookup misses.
  2. getStepIdAliasCandidates(stepName) — returns [] when the input isn't already a step//x//y triple.
  3. getBuiltinResponseStepAlias(stepName) — this is the only fallback that handles bare names, and it explicitly hard-codes BUILTIN_RESPONSE_STEP_NAMES = { __builtin_response_array_buffer, __builtin_response_json, __builtin_response_text } at private.ts:30-34. __builtin_set_attributes is not in that set, so the fallback returns undefined.

That private.ts file is not touched by this PR. The way __builtin_response_* work today is precisely that they're in the allowlist; __builtin_set_attributes needs the same treatment (or a generalization of the alias to all step//*//<bareName> matches).

The new e2e (packages/core/e2e/e2e.test.ts) does seem to pass in CI, which is what makes me less than fully confident — but I can't see a code path that makes the bare-name lookup succeed. Worth running the new e2e explicitly with getStepFunction instrumentation, or just adding __builtin_set_attributes to BUILTIN_RESPONSE_STEP_NAMES (and probably renaming that set) to be safe.

}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ const attributeToDisplayFn: Record<
projectId: (_value: unknown) => null,
environment: (_value: unknown) => null,
executionContext: (_value: unknown) => null,
// Attributes MVP — string-string metadata attached to the run.
// Rendered as a JSON block; if empty/missing, hidden by the
// hasDisplayContent gate above.
attributes: (value: unknown) => {
if (!hasDisplayContent(value)) return null;
return JsonBlock(value);
},
// Dates — wrapped with TimestampTooltip showing UTC/local + relative time
createdAt: timestampWithTooltipOrNull,
startedAt: timestampWithTooltipOrNull,
Expand Down
51 changes: 51 additions & 0 deletions packages/workflow/src/internal/builtins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,54 @@ export async function __builtin_response_text(this: Request | Response) {
'use step';
return this.text();
}

/**
* Step bridge for workflow-body `setAttributes` calls. The VM-side
* helper validates input and dispatches here via `useStep`. This step
* runs in normal Node context with full world access.
*
* The dispatch reads the world and current run id directly from
* `globalThis` symbols populated by the workflow/step runtime — this
* intentionally avoids importing `@workflow/core` so the Next.js
* deferred-entries discoverer can't walk a chain into world adapters
* and `@vercel/queue` from this step file.
*/
export async function __builtin_set_attributes(
changes: Array<{ key: string; value: string | null }>
) {
'use step';
if (changes.length === 0) return;
const g = globalThis as Record<symbol, unknown>;

const contextStorage = g[Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE')] as
| {
getStore: () =>
| { workflowMetadata?: { workflowRunId?: string } }
| undefined;
}
| undefined;
const runId = contextStorage?.getStore?.()?.workflowMetadata?.workflowRunId;
if (!runId) {
throw new Error(
'__builtin_set_attributes: no workflow run id available in step context'
);
}

const world = g[Symbol.for('@workflow/world//cache')] as
| {
runs?: {
experimentalSetAttributes?: (
runId: string,
changes: Array<{ key: string; value: string | null }>
) => Promise<unknown>;
};
}
| undefined;
if (typeof world?.runs?.experimentalSetAttributes !== 'function') {
// World adapter doesn't implement attributes yet — silently no-op.
// The VM-side validation already ran, so input was well-formed.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silent no-op contradicts the documented behavior. The MVP changelog (docs/content/docs/v5/changelog/attributes-mvp.mdx) and the JSDoc on Storage.runs.experimentalSetAttributes (packages/world/src/interfaces.ts:177-180) both state: "the SDK detects absence and no-ops setAttributes with a warning so that third-party / community worlds continue to function without adopting the experimental API."

This code returns silently with no log/warning. A user calling setAttributes() against a community world that hasn't adopted the API will see a successful await and have no signal whatsoever that their attributes weren't persisted. That's the exact failure mode the documented warning is supposed to prevent.

At minimum, emit runtimeLogger.warn(...) (or a process-lifetime once-flag if you're worried about noise) before return. Right now the docs/code drift is also itself a maintainability concern — either implement the warning or strike the claim from the docs.

return;
}

await world.runs.experimentalSetAttributes(runId, changes);
}
Loading
Loading