diff --git a/.changeset/attributes-mvp-plan.md b/.changeset/attributes-mvp-plan.md new file mode 100644 index 0000000000..fbe8426d1c --- /dev/null +++ b/.changeset/attributes-mvp-plan.md @@ -0,0 +1,10 @@ +--- +'@workflow/core': patch +'@workflow/world': patch +'@workflow/world-local': patch +'@workflow/world-postgres': patch +'@workflow/world-vercel': patch +'workflow': patch +--- + +Add experimental `setAttributes()` for attaching plaintext string key/value metadata to a workflow run. Callable from a workflow body; the call is dispatched as a step so the mutation is recorded on the event log. diff --git a/docs/content/docs/v4/cookbook/common-patterns/timeouts.mdx b/docs/content/docs/v4/cookbook/common-patterns/timeouts.mdx index f5f05c7f16..7253d4d5b4 100644 --- a/docs/content/docs/v4/cookbook/common-patterns/timeouts.mdx +++ b/docs/content/docs/v4/cookbook/common-patterns/timeouts.mdx @@ -68,6 +68,9 @@ export async function waitForApproval(requestId: string) { throw new Error("Approval request expired after 7 days"); } + // You may see warnings like `Workflow run completed with 1 uncommitted operations` in your + // logs when the workflow completes. This is expected behavior. + return result.approved; } ``` diff --git a/docs/content/docs/v5/changelog/attributes-mvp.mdx b/docs/content/docs/v5/changelog/attributes-mvp.mdx new file mode 100644 index 0000000000..225b344dc6 --- /dev/null +++ b/docs/content/docs/v5/changelog/attributes-mvp.mdx @@ -0,0 +1,282 @@ +--- +title: Workflow Attributes (MVP, experimental) +description: A minimal, write-only subset of the planned Workflow Attributes feature, forward-compatible with the full 5.0.0 release. Lets workflow and step code attach plaintext string key/value metadata to a run. +--- + +# Workflow Attributes (MVP) + +This is a minimal, **experimental** subset of the [planned Workflow Attributes feature for 5.0.0](https://github.com/vercel/workflow/pull/1933). See [discussion #132](https://github.com/vercel/workflow/discussions/132) for broader background on the use cases and the full design space. + +The MVP lets workflow and step code attach plaintext `string → string` metadata to a run, viewable in any observability surface that reads the `WorkflowRun` entity. It is deliberately narrow: write-only, no reads from inside a run, no list/filter endpoints, no event-log representation. The wire format and SDK surface are chosen so the full 5.0.0 implementation replaces this without source-level breaking changes for end users. + +## What MVP supports + +- `setAttributes(record)` callable from a **workflow body** (`"use workflow"` function), dispatched via an internal `__builtin_set_attributes` step bridge so the mutation gets a `step_created → step_completed` event pair +- Attributes are materialized onto the `WorkflowRun` entity, plaintext, and visible via `world.runs.get()` / `world.runs.list()` and any observability UI built on top of those +- World implementations emit a side-channel observability record per write (in `world-vercel`, this hooks into the same observability/analytics pipeline already used for other run lifecycle events) + +Calling `setAttributes` from a step body or plain host code is intentionally not supported in the MVP — the host-side export throws `FatalError` directing callers back to a workflow body. This keeps the implementation a single dispatch path; step-body support can be added later without breaking the workflow-body contract. + +## What MVP does **not** support (deferred to 5.0.0) + +- Reading attributes from inside a workflow or step (`getAttribute` / `getAttributes`) +- `start(workflow, input, { attributes })` (initial attributes at run creation) +- Filtering runs by attribute value (`runs.list({ attributes: { ... } })`) +- Enumerating attribute keys or values (`listAttributeKeys`, `listAttributeValues`) +- Writer attribution / event-log history of attribute changes +- Any non-string value type +- Reserved-key (`$`-prefixed) namespace + +See [PR #1933](https://github.com/vercel/workflow/pull/1933) for the design of those features. + +## Why the MVP defers an `attr_set` event type + +The full design represents attribute changes as a new `attr_set` event type in the event log, replayed by the workflow runtime VM to reconstruct the attribute snapshot. That requires bumping `SPEC_VERSION_CURRENT`, because every world implementation (including community worlds) needs to handle the new event during replay, and the runtime's reconstruction logic gains a new case. + +A spec version bump is expensive: it gates every world adapter and ties the rollout to coordinated upgrades. We do not want to pay that cost twice — once for the MVP, again for the full feature. + +The MVP instead writes attributes via a direct entity-mutation path (outside the event log) which does not require a spec version bump. The downside is that MVP-era attributes have **no representation in the event log** and will not be visible to event-based reconstruction (e.g. a materialization rebuild). When the full feature ships, new writes use `attr_set` events; old runs created during the MVP window retain whatever attributes were materialized at the time, but their history is not recoverable. + +## Implementation plan + +### 1. `@workflow/world` — Storage interface addition + +Add an `experimentalSetAttributes` method to `Storage.runs`: + +{/*@skip-typecheck - snippet, not runnable code*/} + +```ts +runs: { + get: /* unchanged */; + list: /* unchanged */; + + /** + * Apply a set of attribute changes to a run. Merge semantics: + * keys with a string value are upserted, keys with `value: null` + * are removed. Other keys on the run are untouched. + * + * OPTIONAL. World implementations may omit this method; the SDK + * detects absence and no-ops `setAttributes` with a warning so that + * third-party / community worlds continue to function without + * adopting the experimental API. + * + * EXPERIMENTAL: this method exists as a stopgap until the + * `attr_set` event type lands. See the 5.0.0 attributes design. + */ + experimentalSetAttributes?( + runId: string, + changes: Array<{ key: string; value: string | null }> + ): Promise<{ attributes: Record }>; +} +``` + +The method is **optional** to avoid forcing every World implementation (especially community-maintained adapters such as Redis, MongoDB, Turso, and similar) to ship support before the API stabilises. World implementations that do support it return the post-merge attribute snapshot so callers — notably the SDK helper and world adapters emitting observability records — have it without a follow-up read. + +Add `attributes?: Record` to `WorkflowRunBaseSchema` in `packages/world/src/runs.ts`. Optional for backward compatibility: runs created before this field landed have no `attributes` and read as `undefined`. + +### 2. Wire format (used by `world-vercel`) + +`world-vercel` calls into a remote endpoint to persist attributes: + +``` +POST /v2/runs/:runId/attributes + +{ + "changes": [ + { "key": "phase", "value": "done" }, + { "key": "stale", "value": null } + ] +} +``` + +Response: `{ "attributes": { "phase": "done", "tenant": "t1" } }`. + +The body shape **deliberately mirrors** the eventual `attr_set` event's `eventData.changes`. When the full feature ships, this endpoint goes away — the same body shape is posted to `POST /v2/runs/:id/events` with `eventType: 'attr_set'` (plus a `writer` discriminator). No SDK signature change, no client-side migration. + +### 3. `@workflow/core` — SDK surface + +A new export from `@workflow/core` (re-exported by `workflow`): + +{/*@skip-typecheck - snippet, not runnable code*/} + +```ts +function setAttributes( + attrs: Record +): Promise +``` + +`undefined` is normalized to `null` (unset). An empty object is a no-op (no RPC, no events). + +Validation (shared helper, applied both client-side and server-side): + +- Key: 1–256 chars, must not start with `$` (reserved) +- Value: ≤ 256 bytes UTF-8 +- Maximum 64 attributes per run (validated against the post-merge snapshot when the server applies) +- Violations throw `FatalError` from `@workflow/errors` + +`setAttributes` is callable only from a workflow body: + +```ts +import { setAttributes } from 'workflow'; + +export async function myWorkflow(orderId: string) { + 'use workflow'; + await setAttributes({ phase: 'init', orderId }); + // ... + await setAttributes({ phase: 'done' }); +} +``` + +The workflow-body path validates input inside the VM and then dispatches the canonical `AttributeChange[]` through an internal `__builtin_set_attributes` step bridge — see "How workflow-body dispatch works" below. The mutation is materialized on the run entity by the step body. + +The host-side export (the one resolved when the `workflow` package-exports condition is **not** `workflow`, i.e. step bodies and plain host code) throws `FatalError` with a message pointing callers back to the workflow body. There is no step-side dispatch path in the MVP. + +#### Optional world support + +Because `runs.experimentalSetAttributes` is **optional** on the World interface (see §1), the `__builtin_set_attributes` step body checks for its presence before dispatching and silently no-ops when absent. User code does not need to feature-detect; calling `setAttributes` against an unsupporting world is safe but ineffective. + +### 4. World implementations + +#### `world-local` + +Implement `experimentalSetAttributes(runId, changes)` by reading the run's JSON file, merging the changes into `run.attributes` (set on string value, delete on null), and writing back atomically using the same per-run write gate that protects entity writes today. Apply validation server-side before merging. + +#### `world-postgres` + +Add an `attributes JSONB` column to the runs table (default `'{}'::jsonb`, NOT NULL). Apply the merge in SQL using `jsonb_set` / `jsonb_strip_nulls` so the database does the merge atomically without a read-modify-write cycle, returning the post-merge map via `RETURNING attributes`. + +#### `world-vercel` + +Pure HTTP wrapper. Calls the wire endpoint described in §2 and returns the response's `attributes`. The backing service materializes the attribute map onto its run-row storage; where the underlying data store supports atomic per-key map updates, the merge is a single atomic operation rather than a read-modify-write cycle — the same shape the future `attr_set` event handler will use, so the storage layout is forward-compatible. + +After the persistence ack, the service emits a side-channel observability record carrying the post-merge attribute snapshot, decoupled from the request path so the runtime never waits on analytics emission. + +### 5. Observability surfaces + +Because attributes are stored plaintext on the `WorkflowRun` entity, any UI that already calls `world.runs.get()` / `world.runs.list()` can read them with no additional plumbing. The render details (where attributes appear in the run-detail view, formatting, etc.) are out of scope for this MVP and tracked separately from the SDK work. + +## Trade-offs and known limitations + +### Concurrent writes to the same run + +The MVP applies **last-write-wins by arrival order**. Two concurrent `setAttributes` calls writing to the same key produce a final state matching whichever request the world processes second. There is no conditional / `expectedValue` semantic and no `unique: true` mode. + +Workflow body writes are serialized by the workflow VM (one step at a time within a single workflow body), so the concurrent case in practice arises when: + +- Multiple steps within the same workflow run write the same key in parallel (`Promise.all`). +- A step writes a key while another concurrent step is also writing. + +Both are well-defined under LWW-by-arrival, but applications that need conditional semantics should wait for the 5.0.0 release. We will not retrofit conditional writes onto the MVP path. + +### No event-log history + +Attribute changes do not appear in `world.events.list(runId)`. There is no record of *when* a key changed or *which step attempt* set it. The current snapshot on the run entity is authoritative; the history is lost. + +When the full feature ships, new writes carry writer attribution (`writer: { type: 'workflow' }` or `writer: { type: 'step', stepId, attempt }`) in their `attr_set` events. MVP-era writes will not have this — history starts at the `attr_set` cutover. + +### MVP attributes do not survive materialization rebuild + +Any tooling that reconstructs the run entity from the event log (disaster recovery, debugging, audit) will see no attributes on MVP-era runs, because the writes are not in the event log. This is the chief reason `experimentalSetAttributes` is named "experimental" — it is a known break from the otherwise-strict event-sourced model. + +The 5.0.0 path closes this gap. + +### SDK surface stability + +`setAttributes(record)` is intended to be stable across MVP and 5.0.0. User code calling it under the MVP will continue to work after the full feature lands; only the runtime dispatch path changes (`"use step"` indirection → workflow-VM-native intercept, parallel to `sleep`). + +If you need behavior the MVP does not provide (read, list, filter, initial attributes at `start`, writer attribution), wait for 5.0.0 rather than building around the MVP surface. + +## Test plan + +- Unit tests in `@workflow/core` for: + - Validation rules (length, `$` prefix, post-merge count cap) + - `setAttributes({})` is a no-op (no dispatch, no events) + - `undefined` value normalizes to a `null`-valued change on the wire + - Workflow VM with no `useStep` bound (= called outside a workflow) throws `FatalError` + - Host-side stub (resolved from step or plain code) throws `FatalError` +- `world-local` integration tests: + - `setAttributes` from a workflow body materializes onto the run row + - Merge semantics: setting + unsetting in a single call leaves the run with the expected snapshot + - Idempotency: repeated identical calls converge to the same final state +- `world-postgres` integration tests: same shape as `world-local` +- `world-vercel` contract test: verifies the request body shape matches the documented wire format +- E2E in `workbench/nextjs-turbopack`: + 1. Start a workflow + 2. Workflow body calls `setAttributes({ tenant: 't1', phase: 'init' })` + 3. Awaits a step that calls `setAttributes({ phase: 'processing', orderId: 'ord_123' })` + 4. Then `setAttributes({ phase: 'done', orderId: undefined })` + 5. Assert via `world.runs.get(runId)`: `attributes === { tenant: 't1', phase: 'done' }` + +## Migration to 5.0.0 + +When the full attributes feature ships: + +- `setAttributes` (SDK) — unchanged signature, new dispatch path +- `runs.experimentalSetAttributes` (world interface) — deprecated, then removed; replaced by `events.create(runId, { eventType: 'attr_set', eventData: { changes, writer } })` +- Wire endpoint — `POST /v2/runs/:runId/attributes` removed; the same body shape posts to `POST /v2/runs/:id/events` +- Pre-existing attribute values on MVP-era runs remain on the run entity but are not represented in the event log + +Skew protection means workflows started under the MVP will continue to run with the MVP dispatch path on their original deployment. New deployments use the new path. No in-place data migration is needed. + +## Implementation notes (decisions made during the MVP build-out) + +This section records concrete decisions taken while landing the MVP that weren't in the original plan, so the next iteration has them in one place. + +### How workflow-body dispatch works + +`setAttributes` from a workflow body is wired through an internal built-in step, `__builtin_set_attributes`, defined in `packages/workflow/src/internal/builtins.ts` alongside the other workflow-side builtins (`__builtin_response_json`, etc.). The mechanism: + +1. The workflow VM bundle resolves `setAttributes` to `packages/core/src/workflow/set-attributes.ts` (via the `workflow` package-exports condition). +2. That helper validates the input record inline (no shared helper, no cross-file dependency from a 'use step' file) and produces canonical `AttributeChange[]`. +3. It dispatches through the standard workflow-VM step mechanism: `globalThis[WORKFLOW_USE_STEP]('__builtin_set_attributes')(changes)`. The `useStep` dispatcher is the same one used by every other step call from a workflow body, populated by `packages/core/src/workflow.ts` at VM bootstrap. +4. The dispatch queues a step (`step_created`), the host runs `__builtin_set_attributes(changes)` from `packages/workflow/src/internal/builtins.ts`. The step body reads the active world and current run id directly from `globalThis` symbols (`Symbol.for('@workflow/world//cache')` and `Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE')`) — populated by the host runtime — and calls `world.runs.experimentalSetAttributes(runId, changes)`. +5. The step completes (`step_completed`), the workflow resumes. + +This puts the mutation on the event log as a normal `step_created → step_completed` pair without inventing a new event type — that stays for the full 5.0.0 cutover. + +The step body intentionally does **not** import anything from `@workflow/core`. That keeps the Next.js deferred-entries discoverer from walking a `__builtin_set_attributes` → `@workflow/core/...` → world adapter → `@vercel/queue` chain, which earlier drafts triggered (blowing the call stack of webpack's regex-based extractor with `RangeError: Maximum call stack size exceeded at RegExpStringIterator.next` on tarball-installed `nextjs-webpack` builds). + +The host-side `setAttributes` export (`packages/core/src/set-attributes.ts`, resolved by everything that isn't the workflow VM) throws `FatalError` with a message pointing the caller back to a workflow body. Step-body support can be added in a follow-up without changing this contract. + +When the full 5.0.0 attributes feature lands, `__builtin_set_attributes` is replaced by an `events.create(runId, { eventType: 'attr_set', ... })` dispatch path; SDK signatures don't change. + +### Endpoint lives under `v2`, not a fresh namespace + +The initial draft placed the new endpoint at `POST /v3/runs/:runId/attributes`, on the assumption that introducing a new wire feature warranted a major namespace bump. In practice `world-vercel` mixes `/v1/...` and `/v2/...` endpoints already, and creating a `v3Api` subrouter just for a single endpoint would have required duplicating the auth / flags / rate-limit middleware stack. The MVP endpoint is therefore mounted under the existing `v2Api`. The wire body shape is unchanged, so the migration path described above (rerouting from `/v2/runs/:runId/attributes` to `/v2/runs/:id/events`) still holds — just within the same namespace. + +### Concurrent writes: read-modify-write, not per-key atomic + +The plan called for per-key atomic `UpdateExpression` updates (`SET #attrs.#k = :v` / `REMOVE #attrs.#k`) in the `world-vercel` backing store, on the basis that it eliminates the read-modify-write race. The MVP ships with the simpler read-modify-write path instead: + +- **In `world-postgres`** the SQL-side `jsonb_set` / `-` chain *is* used and is genuinely atomic on the run row, so the only race is the cap check (a separate `SELECT`). Documented as LWW-by-arrival for the cap; the merge itself is atomic. +- **In the `world-vercel` backing service** the attributes column is laid out as a native key-addressable map so the atomic `UpdateItem` variant can be enabled later without a data migration. The MVP commits the merged map via the existing entity update path. Two concurrent writers therefore race; whichever lands second wins on shared keys, and any write to a non-overlapping key is preserved. +- **In `world-local`** an in-process per-run mutex serializes the read-merge-write sequence so parallel `setAttributes` calls from concurrent steps do not lose writes within a single process. There is a corresponding test that exercises 20 parallel writes to the same run. + +This is consistent with the original "concurrent writes are LWW by arrival" caveat. Promoting to per-key atomic writes is a no-API-break change once the event-sourced path lands. + +### `WORKFLOW_ATTRIBUTES` usage-fact schema: not introduced + +The plan called for a dedicated `WORKFLOW_ATTRIBUTES` usage fact carrying the post-merge snapshot. Landing that schema would have required a coordinated change to the shared usage-facts package (used by `world-vercel`'s backing service) plus an ingest-side update before the endpoint could ship. + +For the MVP the endpoint reuses the existing `WORKFLOW_EVENT` fact with `eventType: 'attr_set'`. That mirrors how other run-lifecycle events are reported, and it's enough for "did an attribute mutation happen" debugging without adding an analytics dependency to the critical path. A dedicated fact carrying the full snapshot can land alongside the event-sourced path without touching the SDK wire contract. + +### Validation rules are shared between SDK and world + +Validation lives in a single helper exported from `@workflow/world` (`validateAttributeChanges`, `validateAttributeKey`, `validateAttributeValue`). Both the SDK `setAttributes` helper and the `world-local` / `world-postgres` implementations call it; the `world-vercel` backing service applies the same rules independently. The shared module is the authoritative spec for the limits (256-char keys, 256-byte values, max 64 attributes per run, `$`-prefixed keys reserved) — any future change goes through one file. + +### Run row reconstruction had to thread `attributes` through + +`world-local`'s events storage rebuilds the run row on every lifecycle event (`run_started`, `run_completed`, `run_failed`, `run_cancelled`) by explicitly listing fields rather than spreading. Without forwarding `attributes: currentRun.attributes` through each branch, any attribute set before the run completed would be silently dropped by the next lifecycle event. The fix was local to the four reconstruction sites — the field is otherwise untouched by event-sourced code. A subtler race remains: a `setAttributes` write landing in the same async window as a `run_completed` event read can be clobbered, but that's the same LWW-by-arrival semantic documented above. + +### Optional world method: feature-detect, warn once + +`runs.experimentalSetAttributes` is optional on the `World` interface so community worlds (Redis, MongoDB, Turso, etc.) continue to build and run without adopting the experimental API. The SDK helper feature-detects the method's presence on first dispatch; if absent, it logs a single `console.warn` for the lifetime of the process and resolves silently for that call and all subsequent calls. Users do not need to feature-detect in their own code — calling `setAttributes` against an unsupporting world is safe but ineffective. + +### Tests + +- **`@workflow/world`** — 18 unit tests covering validation rules (key length, reserved prefix, value byte cap, batch cap, duplicate keys) and the `applyAttributeChanges` merge helper. +- **`@workflow/core`** — 10 unit tests covering context detection (workflow body / step body / neither), undefined-to-null normalization, empty-record no-op, validation surface (throws `FatalError`), and feature-detect-with-single-warning when the world lacks support. +- **`@workflow/world-local`** — 10 integration tests covering upsert, merge across calls, unset via null, set-and-unset in a single batch, not-found errors, all four validation flavors, and concurrent writes (per-run mutex prevents lost writes). +- **`@workflow/world-postgres`** — 3 integration tests added under the existing Postgres container suite covering upsert, merge across calls, and unset via null. The SQL-side merge (`jsonb_set` / `-`) is exercised through these. + +End-to-end coverage in the workbench app is intentionally deferred — running through the full SWC plugin + workflow VM path can land once the workflow-server preview deployment carrying the endpoint is live. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 5f00f28ae1..6d3f60b63b 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -3430,4 +3430,44 @@ describe('e2e', () => { expect(returnValue.reason).toBe('Test complete'); } ); + + // ========================================================================== + // setAttributes (experimental MVP) + // ========================================================================== + + describe('setAttributes', () => { + test( + 'setAttributesWorkflow: workflow-body calls dispatch through the step bridge and merge correctly', + { timeout: 30_000 }, + async () => { + const run = await start(await e2e('setAttributesWorkflow'), [7]); + const output = await run.returnValue; + expect(output).toBe(21); + + const world = await getWorld(); + const persisted = await world.runs.get(run.runId); + + // First call sets {phase: 'init', source: 'workflow-body'}; second + // overwrites phase; third unsets source via undefined → null. + expect(persisted?.attributes).toEqual({ phase: 'done' }); + expect(persisted?.attributes ?? {}).not.toHaveProperty('source'); + + // Dispatch is via a real step — verify at least one + // `step_created`/`step_completed` pair for the `__builtin_set_attributes` + // step exists on the run's event log. + const { data: events } = await world.events.list({ runId: run.runId }); + const attrStepEvents = events.filter( + (e) => + (e.eventType === 'step_created' || + e.eventType === 'step_completed') && + typeof (e.eventData as { stepName?: string } | undefined) + ?.stepName === 'string' && + (e.eventData as { stepName: string }).stepName.includes( + '__builtin_set_attributes' + ) + ); + expect(attrStepEvents.length).toBeGreaterThanOrEqual(2); + } + ); + }); }); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 1d969aeaa6..e5bbb0bb3b 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -25,6 +25,7 @@ export { type WebhookOptions, } from './create-hook.js'; export { defineHook, type TypedHook } from './define-hook.js'; +export { setAttributes } from './set-attributes.js'; export { sleep } from './sleep.js'; export { getStepMetadata, diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 582975af57..761df62bfb 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -354,7 +354,7 @@ export async function loadWorkflowRunEvents( // Preserve the last non-null cursor across pages. A World may // legitimately return `{ data: [], cursor: null, hasMore: false }` // on a trailing empty page — for example when the previous page's - // underlying DynamoDB query hit `Limit` exactly and returned a + // underlying DB query hit the limit exactly and returned a // `LastEvaluatedKey` "just in case". Overwriting with that null // would lose the position past the last real event we loaded and // force the runtime into the "no cursor after initial load" full- diff --git a/packages/core/src/set-attributes.test.ts b/packages/core/src/set-attributes.test.ts new file mode 100644 index 0000000000..f365c503e3 --- /dev/null +++ b/packages/core/src/set-attributes.test.ts @@ -0,0 +1,19 @@ +import { FatalError } from '@workflow/errors'; +import { describe, expect, it } from 'vitest'; +import { setAttributes } from './set-attributes.js'; + +describe('setAttributes (host-side stub)', () => { + // The host-side `setAttributes` is the fallback resolved when callers + // are NOT in the workflow VM. The real implementation lives in + // `workflow/set-attributes.ts` and is selected via the `workflow` + // package-exports condition. Reaching this file from a step body or + // plain host code is unsupported and must surface a clear error. + it('throws FatalError telling the user setAttributes is workflow-body only', async () => { + await expect(setAttributes({ phase: 'init' })).rejects.toBeInstanceOf( + FatalError + ); + await expect(setAttributes({ phase: 'init' })).rejects.toThrow( + /workflow.*function/i + ); + }); +}); diff --git a/packages/core/src/set-attributes.ts b/packages/core/src/set-attributes.ts new file mode 100644 index 0000000000..8888fd6cad --- /dev/null +++ b/packages/core/src/set-attributes.ts @@ -0,0 +1,20 @@ +import { FatalError } from '@workflow/errors'; + +/** + * Host-side stub for `setAttributes`. The real implementation lives in + * `./workflow/set-attributes.ts` and is selected by the `workflow` + * package-exports condition when the workflow VM bundle is resolved. + * + * Reaching this stub means `setAttributes` was called outside a workflow + * body — most likely from a `'use step'` function or plain host code. + * That isn't supported: attribute mutations must be event-sourced + * through the workflow runtime so they survive replay. + */ +export async function setAttributes( + _attrs: Record +): Promise { + throw new FatalError( + "setAttributes() must be called from a 'use workflow' function. " + + 'Calling it from a step body or plain host code is not supported.' + ); +} diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 43fc54566e..0990f8d573 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -6,7 +6,6 @@ import { WorkflowRuntimeError, } from '@workflow/errors'; import { withResolvers } from '@workflow/utils'; -import { getPortLazy } from './runtime/get-port-lazy.js'; import { parseWorkflowName } from '@workflow/utils/parse-name'; import type { Event, WorkflowRun } from '@workflow/world'; import * as nanoid from 'nanoid'; @@ -16,9 +15,10 @@ import { EventConsumerResult, EventsConsumer } from './events-consumer.js'; import type { QueueItem } from './global.js'; import { ENOTSUP, WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; +import type { WorkflowOrchestratorContext } from './private.js'; +import { getPortLazy } from './runtime/get-port-lazy.js'; import { handleSuspension } from './runtime/suspension-handler.js'; import { getWorld } from './runtime/world.js'; -import type { WorkflowOrchestratorContext } from './private.js'; import { dehydrateWorkflowReturnValue, hydrateWorkflowArguments, @@ -36,12 +36,12 @@ import * as Attribute from './telemetry/semantic-conventions.js'; import { trace } from './telemetry.js'; import { getWorkflowRunStreamId } from './util.js'; import { createContext } from './vm/index.js'; -import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js'; -import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js'; import { createAbortSignalStatics, createCreateAbortController, } from './workflow/abort-controller.js'; +import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js'; +import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js'; import { createCreateHook } from './workflow/hook.js'; import { createSleep } from './workflow/sleep.js'; diff --git a/packages/core/src/workflow/index.ts b/packages/core/src/workflow/index.ts index 73b92f2fcb..e8fe14323b 100644 --- a/packages/core/src/workflow/index.ts +++ b/packages/core/src/workflow/index.ts @@ -10,6 +10,7 @@ export { type RetryableErrorOptions, } from '@workflow/errors'; export type { Hook, HookOptions } from '../create-hook.js'; +export { setAttributes } from './set-attributes.js'; export { sleep } from '../sleep.js'; export { createHook, createWebhook } from './create-hook.js'; export { defineHook } from './define-hook.js'; diff --git a/packages/core/src/workflow/set-attributes.test.ts b/packages/core/src/workflow/set-attributes.test.ts new file mode 100644 index 0000000000..e3484d1023 --- /dev/null +++ b/packages/core/src/workflow/set-attributes.test.ts @@ -0,0 +1,79 @@ +import { FatalError } from '@workflow/errors'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { WORKFLOW_USE_STEP } from '../symbols.js'; +import { setAttributes } from './set-attributes.js'; + +describe('workflow.setAttributes', () => { + const dispatchCalls: Array<{ + stepName: string; + changes: Array<{ key: string; value: string | null }>; + }> = []; + + beforeEach(() => { + dispatchCalls.length = 0; + (globalThis as Record)[WORKFLOW_USE_STEP] = vi.fn( + (stepName: string) => + async (changes: Array<{ key: string; value: string | null }>) => { + dispatchCalls.push({ stepName, changes }); + } + ); + }); + + afterEach(() => { + delete (globalThis as Record)[WORKFLOW_USE_STEP]; + }); + + it('dispatches normalized changes through __builtin_set_attributes', async () => { + await setAttributes({ phase: 'init', orderId: 'ord_1' }); + expect(dispatchCalls).toEqual([ + { + stepName: '__builtin_set_attributes', + changes: [ + { key: 'phase', value: 'init' }, + { key: 'orderId', value: 'ord_1' }, + ], + }, + ]); + }); + + it('translates undefined values into null (unset semantics)', async () => { + await setAttributes({ phase: 'done', stale: undefined }); + expect(dispatchCalls).toEqual([ + { + stepName: '__builtin_set_attributes', + changes: [ + { key: 'phase', value: 'done' }, + { key: 'stale', value: null }, + ], + }, + ]); + }); + + it('is a no-op for an empty record (no dispatch)', async () => { + await setAttributes({}); + expect(dispatchCalls).toHaveLength(0); + }); + + it('throws FatalError when the workflow runtime has not initialized useStep', async () => { + delete (globalThis as Record)[WORKFLOW_USE_STEP]; + await expect(setAttributes({ phase: 'init' })).rejects.toBeInstanceOf( + FatalError + ); + }); + + it('throws FatalError for reserved-prefix keys before any dispatch', async () => { + await expect(setAttributes({ $sys: 'x' })).rejects.toBeInstanceOf( + FatalError + ); + expect(dispatchCalls).toHaveLength(0); + }); + + it('throws FatalError when called with a non-object', async () => { + await expect( + setAttributes(null as unknown as Record) + ).rejects.toBeInstanceOf(FatalError); + await expect( + setAttributes([] as unknown as Record) + ).rejects.toBeInstanceOf(FatalError); + }); +}); diff --git a/packages/core/src/workflow/set-attributes.ts b/packages/core/src/workflow/set-attributes.ts new file mode 100644 index 0000000000..e2d54d8207 --- /dev/null +++ b/packages/core/src/workflow/set-attributes.ts @@ -0,0 +1,73 @@ +import { FatalError } from '@workflow/errors'; +import { + type AttributeChange, + AttributeValidationError, + validateAttributeChanges, +} from '@workflow/world'; +import { WORKFLOW_USE_STEP } from '../symbols.js'; + +/** + * Attach plaintext string key/value metadata to the current workflow run. + * + * **EXPERIMENTAL.** Callable only from a workflow body (`'use workflow'`). + * The call is dispatched through the workflow runtime as a step, so the + * mutation is recorded in the event log and survives replay. + * + * Validation runs in the VM (cheap, deterministic) before the step + * dispatch — violations throw `FatalError` without queuing a step. An + * empty record is a no-op. `value: undefined` removes the key from the + * run's attribute map. + * + * **WARNING**: While this feature is experimental, calling e.g. + * `Promise.all([setAttributes({ a: '1' }), setAttributes({ a: '2' })])` + * is not guaranteed to be ordered consistently, but + * `await setAttributes({ a: '1' }).then(() => setAttributes({ a: '2' }))` + * is. + * + * @example + * ```ts + * export async function myWorkflow() { + * 'use workflow'; + * await setAttributes({ phase: 'init' }); + * // ... work ... + * await setAttributes({ phase: 'done', orderId: 'ord_123' }); + * await setAttributes({ orderId: undefined }); // remove + * } + * ``` + */ +export async function setAttributes( + attrs: Record +): Promise { + if (attrs === null || typeof attrs !== 'object' || Array.isArray(attrs)) { + throw new FatalError( + `setAttributes requires a plain object, got ${ + attrs === null ? 'null' : Array.isArray(attrs) ? 'array' : typeof attrs + }` + ); + } + const changes: AttributeChange[] = Object.entries(attrs).map( + ([key, value]) => ({ + key, + value: value === undefined ? null : value, + }) + ); + if (changes.length === 0) return; + try { + validateAttributeChanges(changes); + } catch (err) { + if (err instanceof AttributeValidationError) { + throw new FatalError(err.message); + } + throw err; + } + const useStep = (globalThis as Record)[WORKFLOW_USE_STEP] as + | ((stepName: string) => (changes: AttributeChange[]) => Promise) + | undefined; + if (!useStep) { + throw new FatalError( + 'setAttributes() called outside a workflow runtime context. ' + + 'It must be called from within a workflow body (`use workflow`).' + ); + } + await useStep('__builtin_set_attributes')(changes); +} diff --git a/packages/web-shared/src/components/sidebar/attribute-panel.tsx b/packages/web-shared/src/components/sidebar/attribute-panel.tsx index c708dc32ba..cf15f2f742 100644 --- a/packages/web-shared/src/components/sidebar/attribute-panel.tsx +++ b/packages/web-shared/src/components/sidebar/attribute-panel.tsx @@ -417,6 +417,13 @@ const attributeToDisplayFn: Record< projectId: (_value: unknown) => null, environment: (_value: unknown) => null, executionContext: (_value: unknown) => null, + // Attributes MVP — string-string metadata attached to the run. + // Rendered as a JSON block; if empty/missing, hidden by the + // hasDisplayContent gate above. + attributes: (value: unknown) => { + if (!hasDisplayContent(value)) return null; + return JsonBlock(value); + }, // Dates — wrapped with TimestampTooltip showing UTC/local + relative time createdAt: timestampWithTooltipOrNull, startedAt: timestampWithTooltipOrNull, diff --git a/packages/workflow/src/internal/builtins.ts b/packages/workflow/src/internal/builtins.ts index 886686e50e..f7ea9cd465 100644 --- a/packages/workflow/src/internal/builtins.ts +++ b/packages/workflow/src/internal/builtins.ts @@ -20,3 +20,54 @@ export async function __builtin_response_text(this: Request | Response) { 'use step'; return this.text(); } + +/** + * Step bridge for workflow-body `setAttributes` calls. The VM-side + * helper validates input and dispatches here via `useStep`. This step + * runs in normal Node context with full world access. + * + * The dispatch reads the world and current run id directly from + * `globalThis` symbols populated by the workflow/step runtime — this + * intentionally avoids importing `@workflow/core` so the Next.js + * deferred-entries discoverer can't walk a chain into world adapters + * and `@vercel/queue` from this step file. + */ +export async function __builtin_set_attributes( + changes: Array<{ key: string; value: string | null }> +) { + 'use step'; + if (changes.length === 0) return; + const g = globalThis as Record; + + const contextStorage = g[Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE')] as + | { + getStore: () => + | { workflowMetadata?: { workflowRunId?: string } } + | undefined; + } + | undefined; + const runId = contextStorage?.getStore?.()?.workflowMetadata?.workflowRunId; + if (!runId) { + throw new Error( + '__builtin_set_attributes: no workflow run id available in step context' + ); + } + + const world = g[Symbol.for('@workflow/world//cache')] as + | { + runs?: { + experimentalSetAttributes?: ( + runId: string, + changes: Array<{ key: string; value: string | null }> + ) => Promise; + }; + } + | undefined; + if (typeof world?.runs?.experimentalSetAttributes !== 'function') { + // World adapter doesn't implement attributes yet — silently no-op. + // The VM-side validation already ran, so input was well-formed. + return; + } + + await world.runs.experimentalSetAttributes(runId, changes); +} diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 91ebc82942..cfc9635edc 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -554,6 +554,7 @@ export function createEventsStorage( completedAt: undefined, startedAt: currentRun.startedAt ?? now, updatedAt: now, + attributes: currentRun.attributes, }; await writeJSON( taggedPath(basedir, 'runs', effectiveRunId, tag), @@ -580,6 +581,7 @@ export function createEventsStorage( error: undefined, completedAt: now, updatedAt: now, + attributes: currentRun.attributes, }; await writeJSON( taggedPath(basedir, 'runs', effectiveRunId, tag), @@ -617,6 +619,7 @@ export function createEventsStorage( errorCode: failedData.errorCode, completedAt: now, updatedAt: now, + attributes: currentRun.attributes, }; await writeJSON( taggedPath(basedir, 'runs', effectiveRunId, tag), @@ -646,6 +649,7 @@ export function createEventsStorage( error: undefined, completedAt: now, updatedAt: now, + attributes: currentRun.attributes, }; await writeJSON( taggedPath(basedir, 'runs', effectiveRunId, tag), diff --git a/packages/world-local/src/storage/runs-storage.test.ts b/packages/world-local/src/storage/runs-storage.test.ts new file mode 100644 index 0000000000..8eddf9d121 --- /dev/null +++ b/packages/world-local/src/storage/runs-storage.test.ts @@ -0,0 +1,163 @@ +import { promises as fs } from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { WorkflowRunNotFoundError } from '@workflow/errors'; +import { + ATTRIBUTE_KEY_MAX_LENGTH, + AttributeValidationError, + RESERVED_ATTRIBUTE_KEY_PREFIX, + type Storage, +} from '@workflow/world'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { createStorage } from '../storage.js'; +import { createRun } from '../test-helpers.js'; + +describe('runs.experimentalSetAttributes (world-local)', () => { + let testDir: string; + let storage: Storage; + + beforeEach(async () => { + testDir = await fs.mkdtemp(path.join(os.tmpdir(), 'attrs-test-')); + storage = createStorage(testDir); + }); + + afterEach(async () => { + await fs.rm(testDir, { recursive: true, force: true }); + }); + + async function newRun() { + return createRun(storage, { + deploymentId: 'dpl_test', + workflowName: 'test-workflow', + input: new Uint8Array([1]), + }); + } + + it('upserts new keys', async () => { + const run = await newRun(); + + const result = await storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'phase', value: 'init' }, + { key: 'tenant', value: 't1' }, + ]); + + expect(result.attributes).toEqual({ phase: 'init', tenant: 't1' }); + const refreshed = await storage.runs.get(run.runId); + expect(refreshed.attributes).toEqual({ phase: 'init', tenant: 't1' }); + }); + + it('updates existing keys (merge semantics)', async () => { + const run = await newRun(); + await storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'phase', value: 'init' }, + { key: 'tenant', value: 't1' }, + ]); + + const result = await storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'phase', value: 'done' }, + ]); + + expect(result.attributes).toEqual({ phase: 'done', tenant: 't1' }); + }); + + it('removes keys when value is null', async () => { + const run = await newRun(); + await storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'phase', value: 'init' }, + { key: 'orderId', value: 'ord_123' }, + ]); + + const result = await storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'orderId', value: null }, + ]); + + expect(result.attributes).toEqual({ phase: 'init' }); + expect(result.attributes).not.toHaveProperty('orderId'); + }); + + it('applies set and unset in a single call', async () => { + const run = await newRun(); + await storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'stale', value: 'yes' }, + ]); + + const result = await storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'stale', value: null }, + { key: 'fresh', value: 'yes' }, + ]); + + expect(result.attributes).toEqual({ fresh: 'yes' }); + }); + + it('throws WorkflowRunNotFoundError for unknown run', async () => { + await expect( + storage.runs.experimentalSetAttributes!('wrun_doesnotexist', [ + { key: 'phase', value: 'init' }, + ]) + ).rejects.toBeInstanceOf(WorkflowRunNotFoundError); + }); + + it('rejects keys starting with reserved prefix', async () => { + const run = await newRun(); + await expect( + storage.runs.experimentalSetAttributes!(run.runId, [ + { key: `${RESERVED_ATTRIBUTE_KEY_PREFIX}sys`, value: 'x' }, + ]) + ).rejects.toBeInstanceOf(AttributeValidationError); + }); + + it('rejects keys over the max length', async () => { + const run = await newRun(); + await expect( + storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'k'.repeat(ATTRIBUTE_KEY_MAX_LENGTH + 1), value: 'x' }, + ]) + ).rejects.toBeInstanceOf(AttributeValidationError); + }); + + it('rejects values exceeding the byte cap', async () => { + const run = await newRun(); + await expect( + storage.runs.experimentalSetAttributes!(run.runId, [ + { key: 'big', value: 'a'.repeat(257) }, + ]) + ).rejects.toBeInstanceOf(AttributeValidationError); + }); + + it('rejects when post-merge count exceeds limit', async () => { + const run = await newRun(); + // Pre-fill to within the cap. The MVP cap is 64. + const initial = Array.from({ length: 60 }, (_, i) => ({ + key: `k${i}`, + value: 'v', + })); + await storage.runs.experimentalSetAttributes!(run.runId, initial); + + // Adding 5 new keys would push us over. + const overflow = Array.from({ length: 5 }, (_, i) => ({ + key: `extra${i}`, + value: 'v', + })); + await expect( + storage.runs.experimentalSetAttributes!(run.runId, overflow) + ).rejects.toBeInstanceOf(AttributeValidationError); + }); + + it('serializes concurrent writes to the same run (no lost writes)', async () => { + const run = await newRun(); + + // 20 concurrent writes; each adds a unique key. Per-run mutex must + // serialize them so all keys land — without it, the read-merge-write + // race loses some. + await Promise.all( + Array.from({ length: 20 }, (_, i) => + storage.runs.experimentalSetAttributes!(run.runId, [ + { key: `k${i}`, value: `v${i}` }, + ]) + ) + ); + + const refreshed = await storage.runs.get(run.runId); + expect(Object.keys(refreshed.attributes ?? {})).toHaveLength(20); + }); +}); diff --git a/packages/world-local/src/storage/runs-storage.ts b/packages/world-local/src/storage/runs-storage.ts index 76e7891404..54c8eeff92 100644 --- a/packages/world-local/src/storage/runs-storage.ts +++ b/packages/world-local/src/storage/runs-storage.ts @@ -1,18 +1,27 @@ import path from 'node:path'; import { WorkflowRunNotFoundError } from '@workflow/errors'; import type { + AttributeChange, + ExperimentalSetAttributesResult, ListWorkflowRunsParams, PaginatedResponse, Storage, WorkflowRun, WorkflowRunWithoutData, } from '@workflow/world'; -import { WorkflowRunSchema } from '@workflow/world'; +import { + applyAttributeChanges, + AttributeValidationError, + validateAttributeChanges, + WorkflowRunSchema, +} from '@workflow/world'; import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js'; import { assertSafeEntityId, paginatedFileSystemQuery, readJSONWithFallback, + taggedPath, + writeJSON, } from '../fs.js'; import { filterRunData } from './filters.js'; import { getObjectCreatedAt } from './helpers.js'; @@ -40,6 +49,39 @@ export interface LocalRunsStorage { params?: LocalListWorkflowRunsParams ): Promise>; }; + experimentalSetAttributes( + runId: string, + changes: AttributeChange[] + ): Promise; +} + +/** + * Per-run in-process async mutex. Serializes concurrent attribute writes + * to the same run so the read-merge-write sequence is atomic. Without this + * two parallel `setAttributes` calls (e.g. from `Promise.all` steps) can + * both read the same prior snapshot and one of the updates is lost. + */ +const runAttributeLocks = new Map>(); + +function withRunAttributeLock( + key: string, + fn: () => Promise +): Promise { + const prev = runAttributeLocks.get(key); + const taskBox: { task?: Promise } = {}; + const task = (async () => { + if (prev) await prev.catch(() => undefined); + try { + return await fn(); + } finally { + if (runAttributeLocks.get(key) === taskBox.task) { + runAttributeLocks.delete(key); + } + } + })(); + taskBox.task = task; + runAttributeLocks.set(key, task); + return task; } /** @@ -107,5 +149,51 @@ export function createRunsStorage( return result; }) as LocalRunsStorage['list'], + + experimentalSetAttributes: async (runId, changes) => { + assertSafeEntityId('runId', runId); + + return withRunAttributeLock(runId, async () => { + const run = await readJSONWithFallback( + basedir, + 'runs', + runId, + WorkflowRunSchema, + tag + ); + if (!run) { + throw new WorkflowRunNotFoundError(runId); + } + + // Server-side validation. The SDK validates before sending, but + // the world is the final authority — re-check so direct callers + // (tests, other consumers) cannot bypass the limits. + try { + validateAttributeChanges(changes, { + existingCount: Object.keys(run.attributes ?? {}).length, + }); + } catch (err) { + if (err instanceof AttributeValidationError) { + // Re-throw as a plain error; callers (the SDK) wrap as + // FatalError on their side. + throw err; + } + throw err; + } + + const nextAttributes = applyAttributeChanges(run.attributes, changes); + const updatedRun = { + ...run, + attributes: nextAttributes, + updatedAt: new Date(), + }; + + await writeJSON(taggedPath(basedir, 'runs', runId, tag), updatedRun, { + overwrite: true, + }); + + return { attributes: nextAttributes }; + }); + }, }; } diff --git a/packages/world-postgres/src/drizzle/migrations/0013_add_attributes.sql b/packages/world-postgres/src/drizzle/migrations/0013_add_attributes.sql new file mode 100644 index 0000000000..de472fe108 --- /dev/null +++ b/packages/world-postgres/src/drizzle/migrations/0013_add_attributes.sql @@ -0,0 +1 @@ +ALTER TABLE "workflow"."workflow_runs" ADD COLUMN "attributes" jsonb DEFAULT '{}'::jsonb NOT NULL; diff --git a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json index f55371e3c8..e2ad71c47e 100644 --- a/packages/world-postgres/src/drizzle/migrations/meta/_journal.json +++ b/packages/world-postgres/src/drizzle/migrations/meta/_journal.json @@ -92,6 +92,13 @@ "when": 1775600000000, "tag": "0012_add_is_system", "breakpoints": true + }, + { + "idx": 13, + "version": "7", + "when": 1779609600000, + "tag": "0013_add_attributes", + "breakpoints": true } ] } diff --git a/packages/world-postgres/src/drizzle/schema.ts b/packages/world-postgres/src/drizzle/schema.ts index 44a2785f90..3449576ba4 100644 --- a/packages/world-postgres/src/drizzle/schema.ts +++ b/packages/world-postgres/src/drizzle/schema.ts @@ -95,6 +95,17 @@ export const runs = schema.table( * decryption or hydration. */ errorCode: varchar('error_code'), + /** + * Plaintext string-string metadata attached to the run via + * `setAttributes()`. EXPERIMENTAL MVP: stored as JSONB to allow + * SQL-side merge (`jsonb_set` / `jsonb_strip_nulls`) without a + * read-modify-write cycle. Defaults to `{}` so existing rows + * (pre-migration) read as the empty map. + */ + attributes: jsonb('attributes') + .$type>() + .default({}) + .notNull(), createdAt: timestamp('created_at').defaultNow().notNull(), updatedAt: timestamp('updated_at') .defaultNow() diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index 6383cf39de..72d81beb34 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -8,8 +8,10 @@ import { WorkflowWorldError, } from '@workflow/errors'; import type { + AttributeChange, Event, EventResult, + ExperimentalSetAttributesResult, GetEventParams, Hook, ListEventsParams, @@ -25,6 +27,7 @@ import type { WorkflowRunWithoutData, } from '@workflow/world'; import { + AttributeValidationError, EventSchema, HookSchema, isLegacySpecVersion, @@ -32,6 +35,7 @@ import { SPEC_VERSION_CURRENT, StepSchema, stripEventDataRefs, + validateAttributeChanges, validateUlidTimestamp, WorkflowRunSchema, } from '@workflow/world'; @@ -140,6 +144,62 @@ export function createRunsStorage(drizzle: Drizzle): Storage['runs'] { cursor: values.at(-1)?.runId ?? null, }; }) as Storage['runs']['list'], + + experimentalSetAttributes: async ( + runId: string, + changes: AttributeChange[] + ): Promise => { + // Load existing attributes for the per-run cap check. Postgres + // applies the merge atomically via the UPDATE below, but the + // count cap requires knowing the existing size — fetch it first. + // The narrow window between this read and the UPDATE is + // documented as last-write-wins by arrival (concurrent writes + // limitation in the MVP changelog). + const [existing] = await drizzle + .select({ attributes: runs.attributes }) + .from(runs) + .where(eq(runs.runId, runId)) + .limit(1); + if (!existing) { + throw new WorkflowRunNotFoundError(runId); + } + + try { + validateAttributeChanges(changes, { + existingCount: Object.keys(existing.attributes ?? {}).length, + }); + } catch (err) { + if (err instanceof AttributeValidationError) throw err; + throw err; + } + + // Build a single SQL expression that applies all changes atomically. + // Sets fold into nested `jsonb_set` calls; removes fold into + // chained `-` (delete) operators. Returns the post-merge map. + let expr = sql`COALESCE(${runs.attributes}, '{}'::jsonb)`; + for (const { key, value } of changes) { + if (value === null) { + expr = sql`${expr} - ${key}`; + } else { + expr = sql`jsonb_set(${expr}, ARRAY[${key}]::text[], to_jsonb(${value}::text), true)`; + } + } + + const [updated] = await drizzle + .update(runs) + .set({ + attributes: expr as any, + updatedAt: new Date(), + }) + .where(eq(runs.runId, runId)) + .returning({ attributes: runs.attributes }); + + if (!updated) { + throw new WorkflowRunNotFoundError(runId); + } + + return { attributes: updated.attributes ?? {} }; + }, }; } diff --git a/packages/world-postgres/test/storage.test.ts b/packages/world-postgres/test/storage.test.ts index 6879f5deb7..432785a9f5 100644 --- a/packages/world-postgres/test/storage.test.ts +++ b/packages/world-postgres/test/storage.test.ts @@ -352,6 +352,57 @@ describe('Storage (Postgres integration)', () => { expect(page2.data[0].runId).not.toBe(page1.data[0].runId); }); }); + + describe('experimentalSetAttributes', () => { + it('upserts new keys', async () => { + const run = await createRun(events, { + deploymentId: 'd', + workflowName: 'w', + input: new Uint8Array(), + }); + + const result = await runs.experimentalSetAttributes!(run.runId, [ + { key: 'phase', value: 'init' }, + { key: 'tenant', value: 't1' }, + ]); + expect(result.attributes).toEqual({ phase: 'init', tenant: 't1' }); + + const fresh = await runs.get(run.runId); + expect(fresh.attributes).toEqual({ phase: 'init', tenant: 't1' }); + }); + + it('merges across calls without clobbering prior keys', async () => { + const run = await createRun(events, { + deploymentId: 'd', + workflowName: 'w', + input: new Uint8Array(), + }); + + await runs.experimentalSetAttributes!(run.runId, [ + { key: 'a', value: '1' }, + ]); + const result = await runs.experimentalSetAttributes!(run.runId, [ + { key: 'b', value: '2' }, + ]); + expect(result.attributes).toEqual({ a: '1', b: '2' }); + }); + + it('removes keys when value is null', async () => { + const run = await createRun(events, { + deploymentId: 'd', + workflowName: 'w', + input: new Uint8Array(), + }); + await runs.experimentalSetAttributes!(run.runId, [ + { key: 'a', value: '1' }, + { key: 'b', value: '2' }, + ]); + const result = await runs.experimentalSetAttributes!(run.runId, [ + { key: 'a', value: null }, + ]); + expect(result.attributes).toEqual({ b: '2' }); + }); + }); }); describe('steps', () => { diff --git a/packages/world-vercel/src/runs.ts b/packages/world-vercel/src/runs.ts index 466d3ae661..bc1e723c3d 100644 --- a/packages/world-vercel/src/runs.ts +++ b/packages/world-vercel/src/runs.ts @@ -1,7 +1,9 @@ import { WorkflowRunNotFoundError, WorkflowWorldError } from '@workflow/errors'; import { + type AttributeChange, type CancelWorkflowRunParams, type CreateWorkflowRunRequest, + type ExperimentalSetAttributesResult, type GetWorkflowRunParams, type ListWorkflowRunsParams, type PaginatedResponse, @@ -254,3 +256,43 @@ export async function cancelWorkflowRunV1( throw error; } } + +/** + * Wire response schema for `experimentalSetAttributes`. The backend + * returns the post-merge attribute snapshot so callers don't need to + * issue a follow-up read. + */ +const ExperimentalSetAttributesResponseSchema = z.object({ + attributes: z.record(z.string(), z.string()), +}); + +/** + * Apply attribute changes to a workflow run. The body shape mirrors the + * future `attr_set` event's `eventData.changes`, so the wire contract is + * forward-compatible with the full 5.0.0 attributes feature — only the + * endpoint path changes. + * + * EXPERIMENTAL: tied to the MVP write-only attributes API. See + * `docs/content/docs/v5/changelog/attributes-mvp.mdx`. + */ +export async function experimentalSetAttributes( + runId: string, + changes: AttributeChange[], + config?: APIConfig +): Promise { + try { + const response = await makeRequest({ + endpoint: `/v2/runs/${encodeURIComponent(runId)}/attributes`, + options: { method: 'POST' }, + data: { changes }, + config, + schema: ExperimentalSetAttributesResponseSchema, + }); + return { attributes: response.attributes }; + } catch (error) { + if (error instanceof WorkflowWorldError && error.status === 404) { + throw new WorkflowRunNotFoundError(runId); + } + throw error; + } +} diff --git a/packages/world-vercel/src/storage.ts b/packages/world-vercel/src/storage.ts index 38c62b78cb..966a5691af 100644 --- a/packages/world-vercel/src/storage.ts +++ b/packages/world-vercel/src/storage.ts @@ -6,7 +6,11 @@ import { } from './events.js'; import { getHook, getHookByToken, listHooks } from './hooks.js'; import { instrumentObject } from './instrumentObject.js'; -import { getWorkflowRun, listWorkflowRuns } from './runs.js'; +import { + experimentalSetAttributes, + getWorkflowRun, + listWorkflowRuns, +} from './runs.js'; import { getStep, listWorkflowRunSteps } from './steps.js'; import type { APIConfig } from './utils.js'; @@ -18,6 +22,8 @@ export function createStorage(config?: APIConfig): Storage { getWorkflowRun(id, params, config)) as Storage['runs']['get'], list: ((params?: any) => listWorkflowRuns(params, config)) as Storage['runs']['list'], + experimentalSetAttributes: (runId, changes) => + experimentalSetAttributes(runId, changes, config), }, steps: { get: ((runId: string, stepId: string, params?: any) => diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 8acefbaa69..de96d16ecb 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -59,7 +59,8 @@ function httpLog( * `main` — rewritten by external CI for branch-deployment testing. * Prefer `VERCEL_WORKFLOW_SERVER_URL` for deployment-time configuration. */ -const WORKFLOW_SERVER_URL_OVERRIDE = ''; +const WORKFLOW_SERVER_URL_OVERRIDE = + 'https://workflow-server-git-peter-attributes-mvp.vercel.sh'; /** * Per-request timeout for HTTP calls to workflow-server (in ms). diff --git a/packages/world/src/attributes.test.ts b/packages/world/src/attributes.test.ts new file mode 100644 index 0000000000..2d78366d3e --- /dev/null +++ b/packages/world/src/attributes.test.ts @@ -0,0 +1,136 @@ +import { describe, expect, it } from 'vitest'; +import { + applyAttributeChanges, + ATTRIBUTE_KEY_MAX_LENGTH, + ATTRIBUTE_MAX_PER_RUN, + AttributeValidationError, + validateAttributeChanges, + validateAttributeKey, + validateAttributeValue, +} from './attributes.js'; + +describe('validateAttributeKey', () => { + it('accepts a normal key', () => { + expect(validateAttributeKey('phase')).toBeNull(); + }); + + it('rejects empty keys', () => { + expect(validateAttributeKey('')).toBeInstanceOf(AttributeValidationError); + }); + + it('rejects keys over the length cap', () => { + expect( + validateAttributeKey('k'.repeat(ATTRIBUTE_KEY_MAX_LENGTH + 1)) + ).toBeInstanceOf(AttributeValidationError); + }); + + it('accepts keys exactly at the length cap', () => { + expect( + validateAttributeKey('k'.repeat(ATTRIBUTE_KEY_MAX_LENGTH)) + ).toBeNull(); + }); + + it('rejects keys starting with the reserved prefix', () => { + expect(validateAttributeKey('$internal')).toBeInstanceOf( + AttributeValidationError + ); + }); +}); + +describe('validateAttributeValue', () => { + it('accepts null (unset)', () => { + expect(validateAttributeValue(null)).toBeNull(); + }); + + it('accepts a normal string', () => { + expect(validateAttributeValue('hello')).toBeNull(); + }); + + it('rejects values over the byte cap', () => { + expect(validateAttributeValue('a'.repeat(257))).toBeInstanceOf( + AttributeValidationError + ); + }); + + it('counts UTF-8 bytes, not characters', () => { + // 4-byte UTF-8 emoji; 64 of them = 256 bytes exactly (at the cap) + const at = '💥'.repeat(64); + expect(validateAttributeValue(at)).toBeNull(); + const over = '💥'.repeat(65); // 260 bytes, over + expect(validateAttributeValue(over)).toBeInstanceOf( + AttributeValidationError + ); + }); +}); + +describe('validateAttributeChanges', () => { + it('accepts a small batch of valid changes', () => { + expect(() => + validateAttributeChanges([ + { key: 'phase', value: 'init' }, + { key: 'stale', value: null }, + ]) + ).not.toThrow(); + }); + + it('rejects duplicate keys within a single batch', () => { + expect(() => + validateAttributeChanges([ + { key: 'phase', value: 'init' }, + { key: 'phase', value: 'done' }, + ]) + ).toThrow(AttributeValidationError); + }); + + it('rejects when post-merge count exceeds the per-run cap', () => { + const changes = Array.from({ length: ATTRIBUTE_MAX_PER_RUN }, (_, i) => ({ + key: `k${i}`, + value: 'v', + })); + expect(() => + validateAttributeChanges(changes, { existingCount: 1 }) + ).toThrow(AttributeValidationError); + }); +}); + +describe('applyAttributeChanges', () => { + it('upserts new keys', () => { + expect( + applyAttributeChanges({ a: '1' }, [{ key: 'b', value: '2' }]) + ).toEqual({ a: '1', b: '2' }); + }); + + it('overwrites existing keys', () => { + expect( + applyAttributeChanges({ a: '1' }, [{ key: 'a', value: '2' }]) + ).toEqual({ a: '2' }); + }); + + it('removes keys when value is null', () => { + expect( + applyAttributeChanges({ a: '1', b: '2' }, [{ key: 'a', value: null }]) + ).toEqual({ b: '2' }); + }); + + it('applies set and unset in a single batch', () => { + expect( + applyAttributeChanges({ a: '1', stale: 'x' }, [ + { key: 'stale', value: null }, + { key: 'fresh', value: 'yes' }, + ]) + ).toEqual({ a: '1', fresh: 'yes' }); + }); + + it('returns a new object (does not mutate input)', () => { + const before = { a: '1' }; + const after = applyAttributeChanges(before, [{ key: 'b', value: '2' }]); + expect(before).toEqual({ a: '1' }); + expect(after).not.toBe(before); + }); + + it('treats undefined existing as the empty record', () => { + expect( + applyAttributeChanges(undefined, [{ key: 'a', value: '1' }]) + ).toEqual({ a: '1' }); + }); +}); diff --git a/packages/world/src/attributes.ts b/packages/world/src/attributes.ts new file mode 100644 index 0000000000..18b86527a0 --- /dev/null +++ b/packages/world/src/attributes.ts @@ -0,0 +1,177 @@ +import { z } from 'zod'; + +/** + * Reserved key prefix for system-managed attributes. User code may not set + * keys starting with `$` — those are blocked at validation time so the + * namespace remains available for future system use. + */ +export const RESERVED_ATTRIBUTE_KEY_PREFIX = '$'; + +/** Max length of an attribute key, in characters. */ +export const ATTRIBUTE_KEY_MAX_LENGTH = 256; + +/** Max length of an attribute value, in bytes (UTF-8). */ +export const ATTRIBUTE_VALUE_MAX_BYTES = 256; + +/** Max number of attributes on a single run (post-merge). */ +export const ATTRIBUTE_MAX_PER_RUN = 64; + +/** + * A single change in an `experimentalSetAttributes` call. `value: null` + * means "remove this key from the run's attributes". + * + * The shape is deliberately the same as the future `attr_set` event's + * `eventData.changes` entries so the SDK and wire format do not change + * when the full attributes feature lands. + */ +export const AttributeChangeSchema = z.object({ + key: z.string(), + value: z.union([z.string(), z.null()]), +}); + +export type AttributeChange = z.infer; + +export const AttributeChangesSchema = z.array(AttributeChangeSchema); + +/** + * Result returned by `runs.experimentalSetAttributes` — the post-merge + * snapshot of all attributes on the run. Provided so callers (notably + * `setAttributes` and observability emitters) do not need a follow-up read. + */ +export interface ExperimentalSetAttributesResult { + attributes: Record; +} + +export interface AttributeValidationContext { + /** + * Existing attribute count on the run, used to enforce the per-run cap + * after merging in the incoming changes. Defaults to 0 so client-side + * validation (which does not know the existing snapshot) can still + * catch single-batch violations. + */ + existingCount?: number; +} + +/** + * Thrown when an attribute key or value violates one of the validation + * rules. Use a plain `Error` here so the world layer can decide whether + * to wrap as `FatalError` (SDK) or return a 400 (server endpoint). + */ +export class AttributeValidationError extends Error { + constructor(message: string) { + super(message); + this.name = 'AttributeValidationError'; + } +} + +const valueByteLength = (value: string): number => + new TextEncoder().encode(value).length; + +/** + * Validate a single attribute key. Returns an `AttributeValidationError` + * on violation, or `null` if the key is valid. Returning instead of + * throwing lets callers aggregate or wrap the failure as needed. + */ +export function validateAttributeKey( + key: string +): AttributeValidationError | null { + if (typeof key !== 'string') { + return new AttributeValidationError( + `Attribute key must be a string, got ${typeof key}` + ); + } + if (key.length === 0) { + return new AttributeValidationError('Attribute key must not be empty'); + } + if (key.length > ATTRIBUTE_KEY_MAX_LENGTH) { + return new AttributeValidationError( + `Attribute key length ${key.length} exceeds limit ${ATTRIBUTE_KEY_MAX_LENGTH}: ${JSON.stringify(key.slice(0, 32))}…` + ); + } + if (key.startsWith(RESERVED_ATTRIBUTE_KEY_PREFIX)) { + return new AttributeValidationError( + `Attribute key ${JSON.stringify(key)} starts with reserved prefix "${RESERVED_ATTRIBUTE_KEY_PREFIX}"` + ); + } + return null; +} + +/** + * Validate a single attribute value. `null` represents an unset and is + * always valid. Returns an `AttributeValidationError` on violation or + * `null` if the value is valid. + */ +export function validateAttributeValue( + value: string | null +): AttributeValidationError | null { + if (value === null) return null; + if (typeof value !== 'string') { + return new AttributeValidationError( + `Attribute value must be a string or null, got ${typeof value}` + ); + } + const bytes = valueByteLength(value); + if (bytes > ATTRIBUTE_VALUE_MAX_BYTES) { + return new AttributeValidationError( + `Attribute value byte length ${bytes} exceeds limit ${ATTRIBUTE_VALUE_MAX_BYTES}` + ); + } + return null; +} + +/** + * Validate a batch of attribute changes. Throws `AttributeValidationError` + * on the first violation found. Use `existingCount` (in `context`) to + * enforce the per-run cap against the post-merge total. + */ +export function validateAttributeChanges( + changes: AttributeChange[], + context: AttributeValidationContext = {} +): void { + const seenKeys = new Set(); + let netAdds = 0; + for (const change of changes) { + const keyError = validateAttributeKey(change.key); + if (keyError) throw keyError; + const valueError = validateAttributeValue(change.value); + if (valueError) throw valueError; + if (seenKeys.has(change.key)) { + throw new AttributeValidationError( + `Attribute key ${JSON.stringify(change.key)} appears more than once in the same batch` + ); + } + seenKeys.add(change.key); + // Net adds counted optimistically — an existing key being set is also + // counted as +1 here, which makes the cap check slightly conservative. + // For the MVP cap of 64 this is acceptable; the server's authoritative + // check uses the real post-merge size. + if (change.value !== null) netAdds += 1; + } + const existing = context.existingCount ?? 0; + if (existing + netAdds > ATTRIBUTE_MAX_PER_RUN) { + throw new AttributeValidationError( + `Run attribute count would exceed limit ${ATTRIBUTE_MAX_PER_RUN} (existing ${existing} + incoming ${netAdds})` + ); + } +} + +/** + * Apply a batch of validated changes to an existing attribute map. Returns + * a new map; does not mutate the input. The world layer uses this to + * compute the post-merge snapshot when the underlying store cannot do the + * merge in a single atomic operation. + */ +export function applyAttributeChanges( + existing: Record | undefined, + changes: AttributeChange[] +): Record { + const next: Record = { ...(existing ?? {}) }; + for (const { key, value } of changes) { + if (value === null) { + delete next[key]; + } else { + next[key] = value; + } + } + return next; +} diff --git a/packages/world/src/index.ts b/packages/world/src/index.ts index 06ce262df3..412d1df02b 100644 --- a/packages/world/src/index.ts +++ b/packages/world/src/index.ts @@ -1,3 +1,17 @@ +export type * from './attributes.js'; +export { + applyAttributeChanges, + ATTRIBUTE_KEY_MAX_LENGTH, + ATTRIBUTE_MAX_PER_RUN, + ATTRIBUTE_VALUE_MAX_BYTES, + AttributeChangeSchema, + AttributeChangesSchema, + AttributeValidationError, + RESERVED_ATTRIBUTE_KEY_PREFIX, + validateAttributeChanges, + validateAttributeKey, + validateAttributeValue, +} from './attributes.js'; export type * from './events.js'; export { BaseEventSchema, diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index 45d8143c4b..7dc2560ab9 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -1,3 +1,7 @@ +import type { + AttributeChange, + ExperimentalSetAttributesResult, +} from './attributes.js'; import type { CreateEventParams, CreateEventRequest, @@ -154,6 +158,31 @@ export interface Storage { list( params?: ListWorkflowRunsParams ): Promise>; + + /** + * Apply a batch of attribute changes to a run. Merge semantics: + * - `value: string` upserts the key + * - `value: null` removes the key + * - keys not listed in `changes` are untouched + * + * Returns the post-merge attribute snapshot on the run. + * + * OPTIONAL. World implementations may omit this method; the SDK + * helper (`setAttributes` in `@workflow/core`) feature-detects its + * absence and no-ops with a one-time warning so third-party / + * community worlds keep working without adopting the experimental + * API. + * + * EXPERIMENTAL: this method exists as a stopgap until the + * `attr_set` event type lands in a future spec version. When that + * happens, `setAttributes` will dispatch through `events.create` + * instead, and this method is expected to be removed. See the + * `attributes-mvp` changelog entry for the migration shape. + */ + experimentalSetAttributes?( + runId: string, + changes: AttributeChange[] + ): Promise; }; steps: { diff --git a/packages/world/src/runs.ts b/packages/world/src/runs.ts index cd4fbffb0a..07fd150c95 100644 --- a/packages/world/src/runs.ts +++ b/packages/world/src/runs.ts @@ -63,6 +63,19 @@ export const WorkflowRunBaseSchema = z.object({ * without needing to decrypt the full error payload. */ errorCode: z.string().optional(), + /** + * Plaintext string-string metadata attached to the run via + * `setAttributes()` (or, in the future, materialized from `attr_set` + * events). Stored unencrypted alongside other plaintext fields so + * observability surfaces can read it without going through the + * decryption pipeline. + * + * EXPERIMENTAL (MVP): runs created before this field landed read as + * `undefined`. The full Workflow Attributes feature replaces the + * direct-mutation MVP path with an event-sourced model — see the + * attributes-mvp changelog entry. + */ + attributes: z.record(z.string(), z.string()).optional(), expiredAt: z.coerce.date().optional(), startedAt: z.coerce.date().optional(), completedAt: z.coerce.date().optional(), diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 41ed11de25..07cbffbaa5 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -11,6 +11,7 @@ import { getWritable, type RequestWithResponse, RetryableError, + setAttributes, sleep, } from 'workflow'; import { getHookByToken, getRun, Run, resumeHook, start } from 'workflow/api'; @@ -3169,3 +3170,22 @@ export async function writableForwardedFromStepWorkflow(payload: string) { const childRunId = await startChildWithStepWritable(payload); return { childRunId }; } + +////////////////////////////////////////////////////////// +// Workflow Attributes MVP — workflow-body-only API. + +/** + * Calls `setAttributes` directly from the workflow body. The call is + * dispatched through the `__builtin_set_attributes` step bridge, so the + * mutation gets a `step_created`/`step_completed` event pair. The third + * call sets a key to `undefined` and the test verifies the key is + * absent from the final attribute map. + */ +export async function setAttributesWorkflow(input: number) { + 'use workflow'; + await setAttributes({ phase: 'init', source: 'workflow-body' }); + const tripled = input * 3; + await setAttributes({ phase: 'done' }); + await setAttributes({ source: undefined }); + return tripled; +}