import { BackendPostgres } from "@openworkflow/backend-postgres";
import { randomUUID } from "node:crypto";
import { OpenWorkflow, SignalTimeoutError } from "openworkflow";

const databaseUrl = "postgresql://postgres:postgres@localhost:5432/postgres";
// Each invocation connects under a freshly generated namespace ID.
const backend = await BackendPostgres.connect(databaseUrl, {
  namespaceId: randomUUID(),
});
const ow = new OpenWorkflow({ backend });

/** Input passed to the approval workflow when it is started. */
interface ApprovalRequest {
  documentId: string;
  requestedBy: string;
}

/** Payload carried by the "approval-decision" signal. */
interface ApprovalSignal {
  approved: boolean;
  reviewedBy: string;
  comment?: string;
}

/** Value the approval workflow resolves with. */
interface ApprovalResult {
  documentId: string;
  status: "approved" | "rejected" | "timed-out";
  reviewedBy?: string | undefined;
  comment?: string | undefined;
}

/**
 * An approval workflow that pauses and waits for an external signal before
 * continuing. Demonstrates step.waitForSignal() with an optional timeout.
 */
const approvalWorkflow = ow.defineWorkflow<ApprovalRequest, ApprovalResult>(
  { name: "approval-workflow" },
  async ({ input, step }) => {
    // Simulate sending a notification to a reviewer
    await step.run({ name: "send-notification" }, () => {
      console.log(
        `Notification sent to reviewer for document "${input.documentId}" (requested by ${input.requestedBy})`,
      );
    });

    // Pause and wait for an external approval signal (timeout after 10 seconds
    // for demo purposes; in production this would be hours or days)
    let approval: ApprovalSignal;
    try {
      approval = await step.waitForSignal<ApprovalSignal>("approval-decision", {
        timeout: "10s",
      });
    } catch (error) {
      if (error instanceof SignalTimeoutError) {
        console.log("No approval received within timeout — auto-rejecting.");
        return {
          documentId: input.documentId,
          status: "timed-out",
        };
      }
      // Anything other than a signal timeout is unexpected: let it propagate.
      throw error;
    }

    // Continue processing based on the signal payload
    return await step.run({ name: "process-decision" }, () => {
      // Annotated (not asserted) so the compiler checks both literals against
      // the ApprovalResult status union.
      const status: ApprovalResult["status"] = approval.approved
        ? "approved"
        : "rejected";
      console.log(
        `Document "${input.documentId}" ${status} by ${approval.reviewedBy}` +
          (approval.comment ? `: "${approval.comment}"` : ""),
      );
      return {
        documentId: input.documentId,
        status,
        reviewedBy: approval.reviewedBy,
        comment: approval.comment,
      };
    });
  },
);

/**
 * Runs the three demos in sequence: a signal that arrives in time, a wait that
 * times out, and a signal sent through the top-level client.
 *
 * The worker and the backend are stopped in `finally` blocks so that a failing
 * demo step does not leave them running.
 */
async function main(): Promise<void> {
  try {
    const worker = ow.newWorker({ concurrency: 2 });
    await worker.start();
    console.log("Worker started.\n");

    try {
      // --- Demo 1: signal arrives in time ---
      console.log("=== Demo 1: Signal arrives in time ===");
      const handle1 = await approvalWorkflow.run({
        documentId: "doc-001",
        requestedBy: "alice",
      });
      console.log(`Workflow started: ${handle1.workflowRun.id}`);

      // Wait a moment for the workflow to reach the waitForSignal step
      await sleep(500);

      // Send the approval signal from outside the workflow
      const signalResult = await handle1.sendSignal("approval-decision", {
        approved: true,
        reviewedBy: "bob",
        comment: "Looks good!",
      } satisfies ApprovalSignal);
      console.log(`Signal delivered: ${JSON.stringify(signalResult)}`);

      const result1 = await handle1.result();
      console.log(`Result: ${JSON.stringify(result1)}\n`);

      // --- Demo 2: timeout elapses before signal arrives ---
      console.log("=== Demo 2: Timeout (no signal sent) ===");
      const handle2 = await approvalWorkflow.run({
        documentId: "doc-002",
        requestedBy: "charlie",
      });
      console.log(`Workflow started: ${handle2.workflowRun.id}`);
      console.log("Waiting for timeout (10s)...");

      const result2 = await handle2.result({ timeoutMs: 30_000 });
      console.log(`Result: ${JSON.stringify(result2)}\n`);

      // --- Demo 3: sendSignal via ow.sendSignal() with a run ID ---
      console.log("=== Demo 3: sendSignal via ow.sendSignal() ===");
      const handle3 = await approvalWorkflow.run({
        documentId: "doc-003",
        requestedBy: "diana",
      });
      console.log(`Workflow started: ${handle3.workflowRun.id}`);

      await sleep(500);

      // Use the top-level ow.sendSignal() instead of handle.sendSignal()
      const signalResult3 = await ow.sendSignal(
        handle3.workflowRun.id,
        "approval-decision",
        {
          approved: false,
          reviewedBy: "eve",
          comment: "Needs revision.",
        } satisfies ApprovalSignal,
      );
      console.log(`Signal delivered: ${JSON.stringify(signalResult3)}`);

      const result3 = await handle3.result();
      console.log(`Result: ${JSON.stringify(result3)}\n`);
    } finally {
      console.log("Stopping worker...");
      await worker.stop();
    }
  } finally {
    // Runs whether or not the worker started or stopped cleanly.
    await backend.stop();
  }
  console.log("Done.");
}

await main().catch((error: unknown) => {
  console.error(error);
  process.exitCode = 1;
});

/**
 * Resolve after a delay.
 * @param ms - Delay in milliseconds
 * @returns A promise that resolves once the delay has elapsed
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
"resolved": "examples/signals", + "link": true + }, "node_modules/example-with-zod-schema": { "resolved": "examples/with-schema-validation", "link": true diff --git a/packages/docs/docs.json b/packages/docs/docs.json index 13eb3f90..843a9733 100644 --- a/packages/docs/docs.json +++ b/packages/docs/docs.json @@ -39,6 +39,7 @@ "group": "Guides", "pages": [ "docs/sleeping", + "docs/signals", "docs/parallel-steps", "docs/dynamic-steps", "docs/child-workflows", diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx new file mode 100644 index 00000000..2178e103 --- /dev/null +++ b/packages/docs/docs/signals.mdx @@ -0,0 +1,308 @@ +--- +title: Signals +description: Pause workflows and resume them with external data +--- + +Sometimes a workflow needs to stop and wait for something that happens outside +the process — a human approving a request, a webhook arriving, or another +service completing work. `step.waitForSignal()` pauses a workflow durably until +an external caller delivers a named signal. + +While the workflow is waiting, no worker slot is held. The run sits in `running` +state with `workerId = null` until a signal arrives, at which point it's woken +up and resumed with the signal's payload as the return value. + +## Basic Usage + +```ts +import { OpenWorkflow, SignalTimeoutError } from "openworkflow"; + +const ow = new OpenWorkflow({ backend }); + +interface ApprovalSignal { + approved: boolean; + comment?: string; +} + +const approvalWorkflow = ow.defineWorkflow( + { name: "approval-workflow" }, + async ({ input, step }) => { + await step.run({ name: "notify-reviewer" }, async () => { + await email.send({ to: input.reviewerEmail, subject: "Approval needed" }); + }); + + // Pause until someone sends the "approval-decision" signal + const decision = + await step.waitForSignal<ApprovalSignal>("approval-decision"); + + return decision.approved ? 
"approved" : "rejected"; + }, +); +``` + +## Sending a Signal + +Send a signal using the workflow run handle or the top-level client: + +```ts +// From the run handle +const handle = await approvalWorkflow.run({ reviewerEmail: "bob@example.com" }); +await handle.sendSignal("approval-decision", { approved: true }); + +// From the top-level client, using the run ID +await ow.sendSignal(handle.workflowRun.id, "approval-decision", { + approved: true, +}); +``` + +`sendSignal` returns a result indicating whether the signal was delivered: + +```ts +const result = await handle.sendSignal("approval-decision", { approved: true }); + +if (result.delivered) { + console.log("Signal delivered"); +} else { + console.log("Not delivered:", result.reason); + // reason: "workflow_not_found" | "signal_not_waiting" +} +``` + +## How Signals Work + +When a workflow encounters `step.waitForSignal()`: + +1. A signal step attempt is created with `status = running` +2. The workflow is durably parked with `workerId = null` +3. The worker slot is freed for other work + +When `sendSignal()` is called: + +1. The signal payload is written to the step attempt +2. The workflow run is woken up (`availableAt` set to now) +3. A worker claims the run and replays the workflow +4. `waitForSignal()` returns the payload and execution continues + +## Timeout + +By default, a workflow waits indefinitely for a signal (bounded by the +workflow's own deadline). 
Pass `timeout` to fail the step if no signal +arrives in time: + +```ts +const decision = await step.waitForSignal("approval-decision", { + timeout: "7d", +}); +``` + +When the timeout elapses, `waitForSignal()` throws a `SignalTimeoutError` that +you can catch inside the workflow: + +```ts +import { SignalTimeoutError } from "openworkflow"; + +const approvalWorkflow = ow.defineWorkflow( + { name: "approval-workflow" }, + async ({ input, step }) => { + try { + const decision = await step.waitForSignal<ApprovalSignal>( + "approval-decision", + { timeout: "7d" }, + ); + return decision.approved ? "approved" : "rejected"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + return "timed-out"; + } + throw error; + } + }, +); +``` + +`timeout` accepts a [duration string](/docs/sleeping#duration-formats), a +number of milliseconds, or a `Date`: + +```ts +// Duration string +await step.waitForSignal("sig", { timeout: "24h" }); + +// Milliseconds +await step.waitForSignal("sig", { timeout: 60_000 }); + +// Absolute deadline +await step.waitForSignal("sig", { timeout: new Date("2026-12-31") }); +``` + +## Signal Names + +Each signal name must match exactly between `waitForSignal()` and +`sendSignal()`. Names follow the same rules as step names — they're deduplicated +automatically if you use the same name more than once in a workflow: + +```ts +// First call uses "approval" +const first = await step.waitForSignal("approval"); + +// Second call becomes "approval:1" +const second = await step.waitForSignal("approval"); +``` + +<Note> + Signal names are case-sensitive. `"Approval"` and `"approval"` are different + signals. +</Note> + +## Payload + +Signals carry a typed JSON payload. 
Use a type parameter on `waitForSignal` to +type the resolved value: + +```ts +interface PaymentConfirmed { + transactionId: string; + amount: number; +} + +const payment = await step.waitForSignal<PaymentConfirmed>("payment-confirmed"); +console.log(payment.transactionId); +``` + +Signals with no payload send `null` and are still treated as delivered: + +```ts +// Sender +await handle.sendSignal("proceed"); + +// Workflow +const payload = await step.waitForSignal("proceed"); // null +``` + +## Common Patterns + +### Human-in-the-Loop Approval + +```ts +const reviewWorkflow = ow.defineWorkflow( + { name: "content-review" }, + async ({ input, step }) => { + await step.run({ name: "submit-for-review" }, async () => { + await db.reviews.create({ + contentId: input.contentId, + status: "pending", + }); + await email.send({ to: "editor@example.com", subject: "Review needed" }); + }); + + try { + const decision = await step.waitForSignal<{ approved: boolean }>( + "review-decision", + { timeout: "3d" }, + ); + + await step.run({ name: "apply-decision" }, async () => { + await db.content.update(input.contentId, { + status: decision.approved ? "published" : "rejected", + }); + }); + + return decision.approved ? 
"published" : "rejected"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + await step.run({ name: "escalate" }, async () => { + await email.send({ + to: "manager@example.com", + subject: "Review overdue", + }); + }); + return "escalated"; + } + throw error; + } + }, +); +``` + +### Webhook Confirmation + +```ts +const orderWorkflow = ow.defineWorkflow( + { name: "order-workflow" }, + async ({ input, step }) => { + const charge = await step.run({ name: "create-charge" }, async () => { + return await payments.createCharge({ amount: input.amount }); + }); + + // Wait for the payment provider's webhook to confirm the charge + const confirmation = await step.waitForSignal<{ status: string }>( + "payment-webhook", + { timeout: "10m" }, + ); + + if (confirmation.status !== "succeeded") { + throw new Error(`Payment failed: ${confirmation.status}`); + } + + await step.run({ name: "fulfill-order" }, async () => { + await orders.fulfill(input.orderId); + }); + }, +); + +// In your webhook handler: +app.post("/webhooks/payment", async (req, res) => { + const { workflowRunId, status } = req.body; + await ow.sendSignal(workflowRunId, "payment-webhook", { status }); + res.sendStatus(200); +}); +``` + +### Gate on External Completion + +```ts +const deployWorkflow = ow.defineWorkflow( + { name: "deploy-workflow" }, + async ({ input, step }) => { + await step.run({ name: "trigger-ci" }, async () => { + await ci.triggerPipeline({ + repo: input.repo, + sha: input.sha, + callbackRunId: input.workflowRunId, + }); + }); + + // Wait for CI to report back (up to 1 hour) + const result = await step.waitForSignal<{ passed: boolean }>("ci-result", { + timeout: "1h", + }); + + if (!result.passed) { + throw new Error("CI pipeline failed"); + } + + await step.run({ name: "deploy" }, async () => { + await deploy.release(input.repo, input.sha); + }); + }, +); +``` + +## Memoization + +Once a signal is received, the step is memoized. 
If the workflow replays after +the signal has been delivered, `waitForSignal()` returns the stored payload +immediately without parking again: + +```ts +// On first execution: parks and waits for "approval-decision" +// On replay after delivery: returns immediately with the stored payload +const decision = await step.waitForSignal("approval-decision"); +``` + + + Sending a signal to a workflow that is not currently waiting for it (e.g. + it's still running steps before `waitForSignal`) returns + `{ delivered: false, reason: "signal_not_waiting" }`. You can either retry + delivery or store the signal externally and deliver it once the workflow + parks. + diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index bb936dce..bbf622bb 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -1,5 +1,7 @@ -import type { Backend } from "../core/backend.js"; +import type { Backend, DeliverSignalResult } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; +import type { JsonValue } from "../core/json.js"; +import { resolveSignalName, type SignalSpec } from "../core/signal-spec.js"; import type { StandardSchemaV1 } from "../core/standard-schema.js"; import { calculateDateFromDuration } from "../core/step-attempt.js"; import { @@ -173,6 +175,49 @@ export class OpenWorkflow { async cancelWorkflowRun(workflowRunId: string): Promise { await this.backend.cancelWorkflowRun({ workflowRunId }); } + + /** + * Send a typed signal to a waiting workflow run using a {@link SignalSpec} + * descriptor. The payload type is enforced by the spec. 
+ * @param workflowRunId - The ID of the workflow run to signal + * @param spec - A `SignalSpec` created with {@link defineSignalSpec} + * @param payload - Signal payload (typed to the spec's `Payload` generic) + * @returns Result indicating whether the signal was delivered + */ + async sendSignal( + workflowRunId: string, + spec: SignalSpec, + payload?: Payload, + ): Promise; + + /** + * Send a signal to a waiting workflow run by name. + * @param workflowRunId - The ID of the workflow run to signal + * @param signalName - The signal name (must match the name used in `step.waitForSignal`) + * @param payload - Optional data to pass to the waiting workflow step + * @returns Result indicating whether the signal was delivered + * @example + * ```ts + * await ow.sendSignal("run-id", "approval-received", { approved: true }); + * ``` + */ + async sendSignal( + workflowRunId: string, + signalName: string, + payload?: JsonValue, + ): Promise; + + async sendSignal( + workflowRunId: string, + nameOrSpec: string | SignalSpec, + payload?: JsonValue, + ): Promise { + return this.backend.deliverSignal({ + workflowRunId, + signalName: resolveSignalName(nameOrSpec), + payload: payload ?? null, + }); + } } /** @@ -346,4 +391,38 @@ class WorkflowRunHandle { workflowRunId: this.workflowRun.id, }); } + + /** + * Send a typed signal to this workflow run using a {@link SignalSpec} + * descriptor. The payload type is enforced by the spec. + * @param spec - A `SignalSpec` created with {@link defineSignalSpec} + * @param payload - Signal payload (typed to the spec's `Payload` generic) + * @returns Result indicating whether the signal was delivered + */ + async sendSignal( + spec: SignalSpec, + payload?: Payload, + ): Promise; + + /** + * Send a signal to this workflow run by name. 
/**
 * Parameters accepted by `Backend.deliverSignal`.
 */
export interface DeliverSignalParams {
  /** ID of the workflow run the signal is addressed to. */
  workflowRunId: string;
  /** Name of the signal being delivered. */
  signalName: string;
  /** Signal payload; `null` when the signal carries no data. */
  payload: JsonValue | null;
}

/**
 * Outcome of a `deliverSignal` call, discriminated on `delivered`. When
 * `delivered` is `false`, `reason` says why the signal was not delivered.
 */
export type DeliverSignalResult =
  | { delivered: true }
  | {
      delivered: false;
      reason: "workflow_not_found" | "signal_not_waiting";
    };
/**
 * A typed descriptor for a named signal. The `Payload` generic is compile-time
 * only — only `name` exists at runtime.
 *
 * Create one with {@link defineSignalSpec} and use it on both sides of the
 * signal API to get end-to-end type safety:
 *
 * ```ts
 * const approvalSignal = defineSignalSpec<{ approved: boolean }>("approval");
 *
 * // Workflow:
 * const decision = await step.waitForSignal(approvalSignal);
 *
 * // Sender:
 * await handle.sendSignal(approvalSignal, { approved: true });
 * ```
 */
export interface SignalSpec<Payload = unknown> {
  /** The signal name matched between `step.waitForSignal` and `sendSignal`. */
  readonly name: string;
  /**
   * Phantom type carrier — does NOT exist at runtime.
   * Prevents structural collapse between different `SignalSpec` instantiations.
   * @internal
   */
  readonly __types?: { payload: Payload };
}

/**
 * Create a typed signal descriptor.
 * @param name - Signal name. Must match the name passed to both
 * `step.waitForSignal` and `sendSignal`.
 * @returns A `SignalSpec` descriptor whose only runtime property is `name`.
 * @example
 * ```ts
 * const approvalSignal = defineSignalSpec<{ approved: boolean; comment?: string }>(
 *   "approval-decision",
 * );
 * ```
 */
export function defineSignalSpec<Payload = unknown>(
  name: string,
): SignalSpec<Payload> {
  // `__types` is intentionally left unset: it only carries the payload type.
  return { name };
}

/**
 * Extract the signal name from a `string` or `SignalSpec`.
 * @param nameOrSpec - Signal name string or `SignalSpec` descriptor.
 * @returns The signal name string.
 */
export function resolveSignalName(nameOrSpec: string | SignalSpec): string {
  return typeof nameOrSpec === "string" ? nameOrSpec : nameOrSpec.name;
}
/**
 * Context for a signal step attempt.
 */
export interface SignalStepAttemptContext {
  kind: "signal";
  /** ISO-8601 timestamp of the wait deadline, or null when there is none. */
  timeoutAt: string | null;
  /** Set to true by deliverSignal once the signal payload has been written. */
  delivered?: boolean;
}

/**
 * Create the context object for a signal step attempt.
 * @param timeoutAt - Signal timeout deadline, or null for no timeout
 * @returns The context object for a signal step; `delivered` is left unset
 */
export function createSignalContext(
  timeoutAt: Readonly<Date> | null,
): SignalStepAttemptContext {
  return {
    // The return type annotation already pins the literal, so no assertion
    // is needed here.
    kind: "signal",
    timeoutAt: timeoutAt?.toISOString() ?? null,
  };
}
Used within a * workflow handler to define steps by calling `step.run()`, `step.sleep()`, @@ -56,6 +82,16 @@ export interface StepApi { options?: Readonly, ) => Promise; sleep: (name: string, duration: DurationString) => Promise; + waitForSignal: ( + nameOrSpec: string | SignalSpec, + options?: Readonly, + ) => Promise; + sendSignal: ( + targetRunId: string, + nameOrSpec: string | SignalSpec, + payload?: Payload, + options?: Readonly, + ) => Promise; } /** diff --git a/packages/openworkflow/index.ts b/packages/openworkflow/index.ts index d59653d9..67e64e3f 100644 --- a/packages/openworkflow/index.ts +++ b/packages/openworkflow/index.ts @@ -4,7 +4,18 @@ export { OpenWorkflow } from "./client/client.js"; // core export type { RetryPolicy, Workflow } from "./core/workflow-definition.js"; -export type { WorkflowRunMetadata } from "./core/workflow-function.js"; +export type { + WorkflowRunMetadata, + StepWaitForSignalOptions, + StepSendSignalOptions, +} from "./core/workflow-function.js"; +export type { + DeliverSignalParams, + DeliverSignalResult, +} from "./core/backend.js"; +export type { SignalSpec } from "./core/signal-spec.js"; +export { defineSignalSpec } from "./core/signal-spec.js"; +export { SignalTimeoutError } from "./worker/execution.js"; export { defineWorkflowSpec, defineWorkflow, diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index c2ac3187..f76f6a97 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -21,6 +21,8 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + DeliverSignalParams, + DeliverSignalResult, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; import { JsonValue } from "../core/json.js"; @@ -44,6 +46,11 @@ interface BackendPostgresOptions { schema?: string; } +/** Shape of the row returned by the deliverSignal CTE query. 
/**
 * Shape of the row returned by the deliverSignal CTE query.
 */
interface DeliverSignalResultRow {
  /** Outcome label produced by the query's `CASE ... END AS result` column. */
  result: "delivered" | "workflow_not_found" | "signal_not_waiting";
}
'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM ${stepAttemptsTable} sa + WHERE sa."namespace_id" = wr."namespace_id" + AND sa."workflow_run_id" = wr."id" + AND sa."kind" = 'signal' + AND sa."status" = 'running' + AND sa."context" @> '{"delivered":true}' + ) ) RETURNING wr.* `; @@ -689,6 +707,101 @@ export class BackendPostgres implements Backend { `; } + async deliverSignal( + params: DeliverSignalParams, + ): Promise { + const workflowRunsTable = this.workflowRunsTable(); + const stepAttemptsTable = this.stepAttemptsTable(); + + // Write the signal payload into the step attempt and mark it delivered. + // Use context.delivered as the delivery flag so null payloads work correctly. + // A single CTE handles three cases atomically: + // - delivery: update an existing running signal step (normal path) + // - pre_delivery: insert a buffered signal step for parked/pending workflows + // (signal sent before waitForSignal creates its step) + // - wake: advance available_at so the worker picks it up promptly + const [row] = await this.pg` + WITH delivery AS ( + UPDATE ${stepAttemptsTable} + SET + "output" = ${this.pg.json(params.payload)}, + "context" = "context" || '{"delivered":true}'::jsonb, + "updated_at" = NOW() + WHERE "namespace_id" = ${this.namespaceId} + AND "workflow_run_id" = ${params.workflowRunId} + AND "step_name" = ${params.signalName} + AND "kind" = 'signal' + AND "status" = 'running' + AND NOT ("context" @> '{"delivered":true}') + RETURNING "id" + ), + pre_delivery AS ( + INSERT INTO ${stepAttemptsTable} ( + "namespace_id", "id", "workflow_run_id", "step_name", + "kind", "status", "config", "context", "output", + "started_at", "created_at", "updated_at" + ) + SELECT + ${this.namespaceId}, gen_random_uuid(), ${params.workflowRunId}, ${params.signalName}, + 'signal', 'running', '{}', + ${this.pg.json({ kind: "signal", delivered: true, timeoutAt: null })}, + ${this.pg.json(params.payload)}, + NOW(), NOW(), NOW() + WHERE NOT EXISTS 
(SELECT 1 FROM delivery) + AND NOT EXISTS ( + SELECT 1 FROM ${stepAttemptsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "workflow_run_id" = ${params.workflowRunId} + AND "step_name" = ${params.signalName} + AND "kind" = 'signal' + ) + AND EXISTS ( + SELECT 1 FROM ${workflowRunsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "id" = ${params.workflowRunId} + AND "status" IN ('pending', 'running') + AND "worker_id" IS NULL + ) + RETURNING "id" + ), + wake AS ( + UPDATE ${workflowRunsTable} + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > NOW() + THEN NOW() + ELSE "available_at" + END, + "updated_at" = NOW() + WHERE "namespace_id" = ${this.namespaceId} + AND "id" = ${params.workflowRunId} + AND "status" IN ('pending', 'running') + AND (EXISTS (SELECT 1 FROM delivery) OR EXISTS (SELECT 1 FROM pre_delivery)) + RETURNING "id" + ) + SELECT + CASE + WHEN (SELECT count(*) FROM delivery) > 0 + OR (SELECT count(*) FROM pre_delivery) > 0 THEN 'delivered' + WHEN NOT EXISTS ( + SELECT 1 FROM ${workflowRunsTable} + WHERE "namespace_id" = ${this.namespaceId} AND "id" = ${params.workflowRunId} + ) THEN 'workflow_not_found' + ELSE 'signal_not_waiting' + END AS result + `; + + if (!row) { + return { delivered: false, reason: "workflow_not_found" }; + } + + if (row.result === "delivered") { + return { delivered: true }; + } + + return { delivered: false, reason: row.result }; + } + async createStepAttempt( params: CreateStepAttemptParams, ): Promise { diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index e12c4aed..49342232 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -20,6 +20,8 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + DeliverSignalParams, + DeliverSignalResult, toWorkflowRunCounts, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; 
@@ -367,17 +369,28 @@ export class BackendSqlite implements Backend { SET "status" = 'running', "available_at" = CASE - WHEN EXISTS ( - SELECT 1 - FROM "step_attempts" sa - JOIN "workflow_runs" child - ON child."namespace_id" = sa."child_workflow_run_namespace_id" - AND child."id" = sa."child_workflow_run_id" - WHERE sa."namespace_id" = "workflow_runs"."namespace_id" - AND sa."workflow_run_id" = "workflow_runs"."id" - AND sa."kind" = 'workflow' - AND sa."status" = 'running' - AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + WHEN ( + EXISTS ( + SELECT 1 + FROM "step_attempts" sa + JOIN "workflow_runs" child + ON child."namespace_id" = sa."child_workflow_run_namespace_id" + AND child."id" = sa."child_workflow_run_id" + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'workflow' + AND sa."status" = 'running' + AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM "step_attempts" sa + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'signal' + AND sa."status" = 'running' + AND json_extract(sa."context", '$.delivered') IS TRUE + ) ) AND ? > ? THEN ? ELSE ? END, @@ -601,6 +614,164 @@ export class BackendSqlite implements Backend { return updated; } + // eslint-disable-next-line @typescript-eslint/require-await + async deliverSignal( + params: DeliverSignalParams, + ): Promise { + const currentTime = now(); + + // --- Path 1: Normal delivery — update an existing running signal step --- + // Use context.delivered as the delivery flag so null payloads work correctly. + const updateResult = this.db + .prepare( + ` + UPDATE "step_attempts" + SET + "output" = ?, + "context" = json_patch("context", '{"delivered":true}'), + "updated_at" = ? + WHERE "namespace_id" = ? + AND "workflow_run_id" = ? + AND "step_name" = ? 
+ AND "kind" = 'signal' + AND "status" = 'running' + AND json_extract("context", '$.delivered') IS NOT TRUE + `, + ) + .run( + toJSON(params.payload), + currentTime, + this.namespaceId, + params.workflowRunId, + params.signalName, + ); + + if (updateResult.changes > 0) { + // Wake the workflow run so it picks up the signal on next execution. + // No worker_id guard: signal can arrive while the worker still holds the + // lease, and sleepWorkflowRun's reconcile step will correct available_at + // after the worker parks. + this.db + .prepare( + ` + UPDATE "workflow_runs" + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > ? THEN ? + ELSE "available_at" + END, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" IN ('pending', 'running') + `, + ) + .run( + currentTime, + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + ); + return { delivered: true }; + } + + // --- Path 2: Pre-delivery to a parked / pending workflow --- + // If the workflow is active but the worker is not currently executing it + // (worker_id IS NULL), and no signal step for this name exists yet, buffer + // the signal by inserting a running step with delivered=true. The pre-pass + // in executeWorkflow will complete it on the next execution and the payload + // will be available in the cache when waitForSignal is replayed. + const insertResult = this.db + .prepare( + ` + INSERT INTO "step_attempts" ( + "namespace_id", "id", "workflow_run_id", "step_name", + "kind", "status", "config", "context", "output", + "started_at", "created_at", "updated_at" + ) + SELECT ?, ?, ?, ?, 'signal', 'running', '{}', + ?, ?, ?, ?, ? + WHERE NOT EXISTS ( + SELECT 1 FROM "step_attempts" + WHERE "namespace_id" = ? + AND "workflow_run_id" = ? + AND "step_name" = ? + AND "kind" = 'signal' + ) + AND EXISTS ( + SELECT 1 FROM "workflow_runs" + WHERE "namespace_id" = ? + AND "id" = ? 
+ AND "status" IN ('pending', 'running') + AND "worker_id" IS NULL + ) + `, + ) + .run( + this.namespaceId, + generateUUID(), + params.workflowRunId, + params.signalName, + toJSON({ kind: "signal", delivered: true, timeoutAt: null }), + toJSON(params.payload), + currentTime, + currentTime, + currentTime, + // NOT EXISTS params + this.namespaceId, + params.workflowRunId, + params.signalName, + // EXISTS params + this.namespaceId, + params.workflowRunId, + ); + + if (insertResult.changes > 0) { + this.db + .prepare( + ` + UPDATE "workflow_runs" + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > ? THEN ? + ELSE "available_at" + END, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" IN ('pending', 'running') + `, + ) + .run( + currentTime, + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + ); + return { delivered: true }; + } + + // --- Path 3: Delivery not possible — determine why --- + const workflowRow = this.db + .prepare( + ` + SELECT "id" FROM "workflow_runs" + WHERE "namespace_id" = ? AND "id" = ? 
+ LIMIT 1 + `, + ) + .get(this.namespaceId, params.workflowRunId) as + | { id: string } + | undefined; + + if (!workflowRow) { + return { delivered: false, reason: "workflow_not_found" }; + } + return { delivered: false, reason: "signal_not_waiting" }; + } + private wakeParentWorkflowRun(childWorkflowRun: Readonly): void { if ( !childWorkflowRun.parentStepAttemptNamespaceId || diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 7ecd8346..939ee007 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -2230,6 +2230,262 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); }); + + describe("deliverSignal()", () => { + test("returns workflow_not_found when workflow run does not exist", async () => { + const backend = await setup(); + try { + const result = await backend.deliverSignal({ + workflowRunId: randomUUID(), + signalName: "test-signal", + payload: { value: 42 }, + }); + expect(result.delivered).toBe(false); + if (!result.delivered) { + expect(result.reason).toBe("workflow_not_found"); + } + } finally { + await teardown(backend); + } + }); + + test("buffers signal to pending workflow before any signal step is created", async () => { + const backend = await setup(); + try { + const run = await createPendingWorkflowRun(backend); + const result = await backend.deliverSignal({ + workflowRunId: run.id, + signalName: "test-signal", + payload: { value: 42 }, + }); + expect(result.delivered).toBe(true); + + // Verify a pre-delivered running step was created + const steps = await backend.listStepAttempts({ + workflowRunId: run.id, + limit: 10, + }); + const signalStep = steps.data.find( + (s) => s.stepName === "test-signal" && s.kind === "signal", + ); + expect(signalStep).toBeDefined(); + expect(signalStep?.status).toBe("running"); + expect(signalStep?.output).toEqual({ value: 42 }); + 
expect( + (signalStep?.context as { delivered?: boolean } | null)?.delivered, + ).toBe(true); + } finally { + await teardown(backend); + } + }); + + test("returns signal_not_waiting when workflow is currently executing (worker_id set)", async () => { + const backend = await setup(); + try { + const run = await createPendingWorkflowRun(backend); + const workerId = randomUUID(); + // Claim the run (simulates an active worker executing the workflow) + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 60_000, + }); + expect(claimed).not.toBeNull(); + + // Worker is actively executing — no signal step created yet + const result = await backend.deliverSignal({ + workflowRunId: run.id, + signalName: "test-signal", + payload: { value: 42 }, + }); + expect(result.delivered).toBe(false); + if (!result.delivered) { + expect(result.reason).toBe("signal_not_waiting"); + } + } finally { + await teardown(backend); + } + }); + + test("returns signal_not_waiting when signal was already buffered (idempotency guard)", async () => { + const backend = await setup(); + try { + const run = await createPendingWorkflowRun(backend); + + // First delivery — buffers the signal + const first = await backend.deliverSignal({ + workflowRunId: run.id, + signalName: "my-signal", + payload: { value: 1 }, + }); + expect(first.delivered).toBe(true); + + // Second delivery — slot is already taken + const second = await backend.deliverSignal({ + workflowRunId: run.id, + signalName: "my-signal", + payload: { value: 2 }, + }); + expect(second.delivered).toBe(false); + if (!second.delivered) { + expect(second.reason).toBe("signal_not_waiting"); + } + + // Payload should still be from the first delivery + const steps = await backend.listStepAttempts({ + workflowRunId: run.id, + limit: 10, + }); + const signalStep = steps.data.find( + (s) => s.stepName === "my-signal" && s.kind === "signal", + ); + expect(signalStep?.output).toEqual({ value: 1 }); + } finally { + await 
teardown(backend); + } + }); + + test("delivers signal and wakes the workflow run", async () => { + const backend = await setup(); + try { + const run = await createPendingWorkflowRun(backend); + const workerId = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 60_000, + }); + if (!claimed) throw new Error("Expected to claim a workflow run"); + + // Create a signal step attempt (simulates what executeWorkflow does) + const attempt = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId, + stepName: "my-signal", + kind: "signal", + config: {}, + context: { kind: "signal", timeoutAt: null }, + }); + expect(attempt.status).toBe("running"); + + // Park the workflow (simulates sleepWorkflowRun after waitForSignal) + const futureDate = new Date(Date.now() + 60 * 60 * 1000); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId, + availableAt: futureDate, + }); + + // Deliver the signal + const result = await backend.deliverSignal({ + workflowRunId: run.id, + signalName: "my-signal", + payload: { answer: 42 }, + }); + expect(result.delivered).toBe(true); + + // The signal step should now have the payload and be marked delivered + const updatedAttempt = await backend.getStepAttempt({ + stepAttemptId: attempt.id, + }); + expect(updatedAttempt?.output).toEqual({ answer: 42 }); + + // The workflow run should be woken up (available_at moved to NOW) + const updatedRun = await backend.getWorkflowRun({ + workflowRunId: run.id, + }); + expect(updatedRun?.availableAt).not.toBeNull(); + if (updatedRun?.availableAt) { + // Should be woken up (available_at significantly earlier than futureDate) + expect(updatedRun.availableAt.getTime()).toBeLessThan( + futureDate.getTime() - 60 * 60 * 500, + ); + } + } finally { + await teardown(backend); + } + }); + + test("does not deliver signal to a completed step attempt", async () => { + const backend = await setup(); + try { + const run = await 
createPendingWorkflowRun(backend); + const workerId = randomUUID(); + await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 60_000, + }); + + // Create and complete a signal attempt + const attempt = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId, + stepName: "done-signal", + kind: "signal", + config: {}, + context: { kind: "signal", timeoutAt: null }, + }); + await backend.completeStepAttempt({ + workflowRunId: run.id, + stepAttemptId: attempt.id, + workerId, + output: { original: true }, + }); + + // Try to deliver a second time — should fail + const result = await backend.deliverSignal({ + workflowRunId: run.id, + signalName: "done-signal", + payload: { overwrite: true }, + }); + expect(result.delivered).toBe(false); + if (!result.delivered) { + expect(result.reason).toBe("signal_not_waiting"); + } + } finally { + await teardown(backend); + } + }); + + test("null payload is treated as a valid delivery", async () => { + const backend = await setup(); + try { + const run = await createPendingWorkflowRun(backend); + const workerId = randomUUID(); + await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 60_000, + }); + + const attempt = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId, + stepName: "ping", + kind: "signal", + config: {}, + context: { kind: "signal", timeoutAt: null }, + }); + + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId, + availableAt: new Date(Date.now() + 3_600_000), + }); + + const result = await backend.deliverSignal({ + workflowRunId: run.id, + signalName: "ping", + payload: null, + }); + expect(result.delivered).toBe(true); + + const updatedAttempt = await backend.getStepAttempt({ + stepAttemptId: attempt.id, + }); + expect(updatedAttempt?.output).toBeNull(); + } finally { + await teardown(backend); + } + }); + }); }); } diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 2dba8674..99dcc2af 
100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -1,7 +1,11 @@ import { OpenWorkflow } from "../client/client.js"; import type { Backend } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; -import type { StepAttempt } from "../core/step-attempt.js"; +import { defineSignalSpec } from "../core/signal-spec.js"; +import type { + SignalStepAttemptContext, + StepAttempt, +} from "../core/step-attempt.js"; import { DEFAULT_WORKFLOW_RETRY_POLICY } from "../core/workflow-definition.js"; import type { WorkflowFunctionParams } from "../core/workflow-function.js"; import type { WorkflowRun } from "../core/workflow-run.js"; @@ -10,6 +14,7 @@ import { DEFAULT_POSTGRES_URL } from "../postgres/postgres.js"; import { WORKFLOW_STEP_LIMIT, STEP_LIMIT_EXCEEDED_ERROR_CODE, + SignalTimeoutError, createStepExecutionStateFromAttempts, executeWorkflow, } from "./execution.js"; @@ -3258,8 +3263,1155 @@ describe("executeWorkflow", () => { expect(snapshots[0]).toEqual(snapshots[1]); }); }); + + // ---- step.waitForSignal tests ----------------------------------------- + + describe("step.waitForSignal", () => { + test("parks workflow waiting for signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-parks-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ value: number }>( + "my-signal", + ); + return payload.value; + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + const parkedRun = await tickUntilParked( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + + expect(parkedRun.status).toBe("running"); + expect(parkedRun.workerId).toBeNull(); + expect(parkedRun.availableAt).not.toBeNull(); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + 
limit: 100, + }); + const signalAttempt = attempts.data.find((a) => a.kind === "signal"); + expect(signalAttempt).toBeDefined(); + expect(signalAttempt?.stepName).toBe("my-signal"); + expect(signalAttempt?.status).toBe("running"); + }); + + test("resumes workflow with signal payload after sendSignal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-resume-${randomUUID()}` }, + async ({ step }) => { + const before = await step.run({ name: "before" }, () => 10); + const payload = await step.waitForSignal<{ multiplier: number }>( + "multiply-signal", + ); + return before * payload.multiplier; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + // Tick until the workflow parks waiting for the signal + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Send the signal + await handle.sendSignal("multiply-signal", { multiplier: 3 }); + + // Tick until completed + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe(30); + }); + + test("throws SignalTimeoutError when timeout elapses", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-timeout-${randomUUID()}` }, + async ({ step }) => { + try { + await step.waitForSignal("timed-signal", { timeout: "20ms" }); + return "signal-received"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + return "timed-out"; + } + throw error; + } + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + // Tick until parked (waiting for signal) + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Wait for timeout to elapse + 
await sleep(50); + + // Tick again — timeout should be detected, workflow completes + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("timed-out"); + }); + + test("steps after waitForSignal are skipped on replay", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + let sideEffectCount = 0; + + const workflow = client.defineWorkflow( + { name: `signal-replay-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ value: string }>( + "my-signal", + ); + // This step should only run once (not on the replay pass that parks) + await step.run({ name: "after-signal" }, () => { + sideEffectCount++; + }); + return payload.value; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + expect(sideEffectCount).toBe(0); + + await handle.sendSignal("my-signal", { value: "hello" }); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + expect(sideEffectCount).toBe(1); + await expect(handle.result()).resolves.toBe("hello"); + }); + + test("ow.sendSignal() with run ID delivers signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-ow-send-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ approved: boolean }>( + "approval", + ); + return payload.approved ? 
"approved" : "rejected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Use top-level ow.sendSignal() instead of handle.sendSignal() + const result = await client.sendSignal( + handle.workflowRun.id, + "approval", + { approved: true }, + ); + expect(result.delivered).toBe(true); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe("approved"); + }); + + test("sendSignal returns signal_not_waiting when no signal step exists", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-not-waiting-${randomUUID()}` }, + async ({ step }) => { + await step.run({ name: "only-step" }, () => "done"); + return "complete"; + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + // Workflow is already complete — signal delivery should fail + const result = await client.sendSignal( + handle.workflowRun.id, + "some-signal", + {}, + ); + expect(result.delivered).toBe(false); + }); + + test("signal sent before workflow reaches waitForSignal is buffered and received", async () => { + // Pre-delivery: sendSignal before the workflow has parked on waitForSignal. + // The signal is buffered as a pre-delivered running step; the pre-pass in + // executeWorkflow completes it so waitForSignal finds the payload in cache. 
+ const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-pre-deliver-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ answer: number }>( + "pre-signal", + ); + return payload.answer; + }, + ); + + const handle = await workflow.run(); + + // Deliver BEFORE any worker tick — workflow is pending, no signal step yet + const result = await handle.sendSignal("pre-signal", { answer: 99 }); + expect(result.delivered).toBe(true); + + // Now let the worker process; it should receive the buffered signal + const worker = client.newWorker({ concurrency: 2 }); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 300, 15); + await expect(handle.result()).resolves.toBe(99); + }); + + test("signal sent after workflow parks is received normally (pre-delivery not involved)", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-normal-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ answer: number }>( + "normal-signal", + ); + return payload.answer; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + // Wait until workflow parks + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Deliver normally (workflow already parked on the signal step) + const result = await handle.sendSignal("normal-signal", { answer: 77 }); + expect(result.delivered).toBe(true); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe(77); + }); + + test("signal step stores timeoutAt in context", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: 
`signal-context-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("my-signal", { timeout: "1h" }); + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalAttempt = attempts.data.find((a) => a.kind === "signal"); + const signalContext = signalAttempt?.context as SignalStepAttemptContext; + expect(signalContext.kind).toBe("signal"); + // timeoutAt should be approximately 1 hour from now + expect(signalContext.timeoutAt).toBeTypeOf("string"); + const timeoutAt = new Date(signalContext.timeoutAt ?? ""); + const diffMs = timeoutAt.getTime() - Date.now(); + expect(diffMs).toBeGreaterThan(59 * 60 * 1000); + expect(diffMs).toBeLessThan(61 * 60 * 1000); + }); + + test("signal step without timeout stores null timeoutAt", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-no-timeout-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("my-signal"); + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalAttempt = attempts.data.find((a) => a.kind === "signal"); + expect(signalAttempt?.context).toMatchObject({ + kind: "signal", + timeoutAt: null, + }); + }); + + test("duplicate waitForSignal names are auto-indexed", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-duplicate-${randomUUID()}` }, + async ({ step }) => { + const a = await 
step.waitForSignal<{ v: number }>("sig"); + const b = await step.waitForSignal<{ v: number }>("sig"); + return a.v + b.v; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + // Parks on first signal + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await handle.sendSignal("sig", { v: 10 }); + + // Parks on second signal + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await handle.sendSignal("sig:1", { v: 20 }); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe(30); + }); + + test("timeout: 0 times out immediately", async () => { + // Regression: timeout: 0 is falsy, so the truthiness check + // `options?.timeout ? ...` skipped resolveWorkflowTimeoutAt and + // treated it as no timeout, causing the workflow to park until + // deadline/default instead of timing out immediately. + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-timeout-zero-${randomUUID()}` }, + async ({ step }) => { + try { + await step.waitForSignal("never", { timeout: 0 }); + return "received"; + } catch (error) { + if (error instanceof SignalTimeoutError) return "timed-out"; + throw error; + } + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("timed-out"); + }); + + test("signal received before timeout does not trigger timeout on replay", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-before-timeout-${randomUUID()}` }, + async ({ step }) => { + try { + const payload = await 
step.waitForSignal<{ value: string }>( + "quick-signal", + { timeout: "500ms" }, + ); + return `received:${payload.value}`; + } catch (error) { + if (error instanceof SignalTimeoutError) return "timed-out"; + throw error; + } + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + // Send signal before timeout elapses + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await handle.sendSignal("quick-signal", { value: "fast" }); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe("received:fast"); + }); + + test("signal timeout carries correct signalName on the error", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + let caughtError: SignalTimeoutError | undefined; + + const workflow = client.defineWorkflow( + { name: `signal-error-name-${randomUUID()}` }, + async ({ step }) => { + try { + await step.waitForSignal("named-signal", { timeout: "20ms" }); + return "received"; + } catch (error) { + if (error instanceof SignalTimeoutError) { + caughtError = error; + return "caught"; + } + throw error; + } + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await sleep(50); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + await expect(handle.result()).resolves.toBe("caught"); + expect(caughtError).toBeInstanceOf(SignalTimeoutError); + expect(caughtError?.signalName).toBe("named-signal"); + expect(caughtError?.code).toBe("SIGNAL_TIMEOUT"); + }); + + test("null payload signal is treated as delivered", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-null-payload-${randomUUID()}` }, + async ({ step }) => { + const 
payload = await step.waitForSignal("ping"); + return payload === null ? "got-null" : "unexpected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Send signal with no payload — defaults to null + const result = await handle.sendSignal("ping"); + expect(result.delivered).toBe(true); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("got-null"); + }); + + test("explicit null payload signal is treated as delivered", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-explicit-null-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal("ping"); + return payload === null ? "got-null" : "unexpected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + // Send signal with explicit null payload + const result = await client.sendSignal( + handle.workflowRun.id, + "ping", + null, + ); + expect(result.delivered).toBe(true); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe("got-null"); + }); + + test("waitForSignal step is not created again on replay after signal received", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-no-duplicate-attempt-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal<{ ok: boolean }>("once"); + return "done"; + }, + ); + + const 
worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + await handle.sendSignal("once", { ok: true }); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + + // There should be exactly one signal step attempt + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalAttempts = attempts.data.filter((a) => a.kind === "signal"); + expect(signalAttempts).toHaveLength(1); + const signalAttempt = signalAttempts.at(0); + expect(signalAttempt?.status).toBe("completed"); + expect(signalAttempt?.output).toEqual({ ok: true }); + }); + + test("signal delivered while worker holds lease still wakes the run", async () => { + // Regression: deliverSignal previously guarded the wake-up query with + // "worker_id IS NULL". If the signal arrived while the worker still held + // its lease, no wake-up was recorded and the run slept until + // timeout/deadline. sleepWorkflowRun's reconcile step must correct this. + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-race-lease-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ v: number }>("race"); + return payload.v * 2; + }, + ); + + // Long deadline so the bug would cause a hang, not a spurious pass. + const handle = await workflow.run( + {}, + { deadlineAt: new Date(Date.now() + 60_000) }, + ); + const worker = client.newWorker({ concurrency: 1 }); + + // Start the tick (worker claims run, creates signal step, then parks). + // Deliver the signal concurrently so it may arrive while worker_id is set. 
+ const tickPromise = worker.tick(); + await sleep(20); // give the worker time to create the signal step + const deliverResult = await handle.sendSignal("race", { v: 7 }); + expect(deliverResult.delivered).toBe(true); + await tickPromise; + + // reconcileWorkflowSleepWakeUp should have reset available_at to NOW(), + // so the next tick picks it up immediately. + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toBe(14); + }); + + test("workflow deadline is used as park time when no signal timeout is set", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + // Deadline 2 seconds from now — well in the future but measurable + const deadlineAt = new Date(Date.now() + 2000); + + const workflow = client.defineWorkflow( + { name: `signal-deadline-park-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("any-signal"); + return "received"; + }, + ); + + const handle = await workflow.run({}, { deadlineAt }); + const worker = client.newWorker(); + const parkedRun = await tickUntilParked( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + + // availableAt should be at (or very close to) deadlineAt since no + // signal timeout was set — the workflow parks until its own deadline + expect(parkedRun.availableAt).not.toBeNull(); + const diff = Math.abs( + parkedRun.availableAt.getTime() - deadlineAt.getTime(), + ); + expect(diff).toBeLessThan(2000); + }); + + test("workflow with expired deadline fails while waiting for signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-deadline-fail-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("any-signal"); + return "received"; + }, + ); + + // deadline already in the past + const 
handle = await workflow.run( + {}, + { deadlineAt: new Date(Date.now() - 1) }, + ); + const worker = client.newWorker(); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + expect(status).toBe("failed"); + }); + + test("waitForSignal accepts a SignalSpec descriptor", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const approvalSignal = defineSignalSpec<{ approved: boolean }>( + "approval", + ); + + const workflow = client.defineWorkflow( + { name: `signal-spec-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal(approvalSignal); + return payload.approved ? "approved" : "rejected"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + const result = await handle.sendSignal(approvalSignal, { + approved: true, + }); + expect(result.delivered).toBe(true); + + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe("approved"); + }); + + test("parallel Promise.all signals: second signal delivered first is processed correctly", async () => { + // Regression: the pre-pass previously threw SleepSignal at the FIRST + // undelivered signal, skipping processing of later delivered signals. + // This test verifies the fix using a controlled mock backend so both + // signal step attempts are present from the start. 
+ const sigAId = "sig-a-attempt"; + const sigBId = "sig-b-attempt"; + + const sigAAttempt = createMockStepAttempt({ + id: sigAId, + stepName: "sig-a", + kind: "signal", + status: "running", + context: { kind: "signal", timeoutAt: null }, + output: null, + finishedAt: null, + }); + // sig-b is already delivered + const sigBAttempt = createMockStepAttempt({ + id: sigBId, + stepName: "sig-b", + kind: "signal", + status: "running", + context: { kind: "signal", timeoutAt: null, delivered: true }, + output: { v: 20 }, + finishedAt: null, + }); + + const listStepAttempts = vi.fn(() => + Promise.resolve({ + data: [sigAAttempt, sigBAttempt], + pagination: { next: null, prev: null }, + }), + ); + const completeStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + stepName: params.stepAttemptId === sigAId ? "sig-a" : "sig-b", + kind: "signal", + status: "completed", + output: params.output, + }), + ), + ); + const sleepWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + workerId: null, + availableAt: params.availableAt, + }), + ), + ); + + const workflowRun = createMockWorkflowRun({ + id: "parallel-signal-run", + workerId: "worker-1", + deadlineAt: new Date(Date.now() + 3_600_000), + }); + + await executeWorkflow({ + backend: { + listStepAttempts, + completeStepAttempt, + sleepWorkflowRun, + } as unknown as Backend, + workflowRun, + workflowFn: vi.fn(), + workflowVersion: null, + workerId: "worker-1", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + // sig-b must be completed (its delivery was processed) + expect(completeStepAttempt).toHaveBeenCalledWith( + expect.objectContaining({ stepAttemptId: sigBId, output: { v: 20 } }), + ); + + // sig-a must NOT be completed (still waiting) + expect(completeStepAttempt).not.toHaveBeenCalledWith( + expect.objectContaining({ stepAttemptId: sigAId }), + ); + + // Workflow must be parked (waiting for sig-a) + 
expect(sleepWorkflowRun).toHaveBeenCalledTimes(1); + }); + }); + + // ---- step.sendSignal tests ---------------------------------------------- + + describe("step.sendSignal", () => { + test("sends a signal to another workflow run and returns delivered result", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + // Receiver workflow: parks on waitForSignal + const receiver = client.defineWorkflow( + { name: `send-signal-recv-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ value: number }>("ping"); + return payload.value * 2; + }, + ); + + // Sender workflow: sends a signal to the receiver using its run ID + const sender = client.defineWorkflow< + { targetRunId: string }, + boolean + >( + { name: `send-signal-send-${randomUUID()}` }, + async ({ step, input }) => { + const result = await step.sendSignal(input.targetRunId, "ping", { + value: 21, + }); + return result.delivered; + }, + ); + + const worker = client.newWorker({ concurrency: 4 }); + + // Start receiver and wait until it is parked + const recvHandle = await receiver.run(); + await tickUntilParked(backend, worker, recvHandle.workflowRun.id, 200, 10); + + // Start sender pointing at receiver + const sendHandle = await sender.run({ + targetRunId: recvHandle.workflowRun.id, + }); + await tickUntilTerminal( + backend, + worker, + sendHandle.workflowRun.id, + 200, + 10, + ); + await expect(sendHandle.result()).resolves.toBe(true); + + // Receiver should wake and complete with doubled value + await tickUntilTerminal( + backend, + worker, + recvHandle.workflowRun.id, + 200, + 10, + ); + await expect(recvHandle.result()).resolves.toBe(42); + }); + + test("step.sendSignal is memoized on replay — signal not re-delivered", async () => { + // Durability: the send result is memoized in the step-attempt cache. 
+ // On a second execution pass (replay), the cached result is returned and + // deliverSignal is NOT called again (signal slot is already consumed). + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const receiver = client.defineWorkflow( + { name: `send-signal-replay-recv-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal<{ v: number }>("the-signal"); + return payload.v; + }, + ); + + // This sender has a step AFTER sendSignal so a replay pass is forced + // once the send step is cached, verifying the memoized result is used. + const sender = client.defineWorkflow<{ targetRunId: string }, boolean>( + { name: `send-signal-replay-send-${randomUUID()}` }, + async ({ step, input }) => { + const deliverResult = await step.sendSignal( + input.targetRunId, + "the-signal", + { v: 10 }, + ); + // Second step forces a replay pass after sendSignal completes + const confirmed = await step.run({ name: "confirm" }, () => + deliverResult.delivered, + ); + return confirmed; + }, + ); + + const worker = client.newWorker({ concurrency: 4 }); + + const recvHandle = await receiver.run(); + await tickUntilParked(backend, worker, recvHandle.workflowRun.id, 200, 10); + + const sendHandle = await sender.run({ + targetRunId: recvHandle.workflowRun.id, + }); + await tickUntilTerminal( + backend, + worker, + sendHandle.workflowRun.id, + 200, + 10, + ); + await expect(sendHandle.result()).resolves.toBe(true); + + // Receiver should also complete + await tickUntilTerminal( + backend, + worker, + recvHandle.workflowRun.id, + 200, + 10, + ); + await expect(recvHandle.result()).resolves.toBe(10); + + // Verify only one sendSignal step attempt was created + const attempts = await backend.listStepAttempts({ + workflowRunId: sendHandle.workflowRun.id, + limit: 100, + }); + const sendAttempts = attempts.data.filter((a) => + a.stepName.startsWith("send:"), + ); + expect(sendAttempts).toHaveLength(1); + 
expect(sendAttempts[0]?.status).toBe("completed"); + }); + + test("step.sendSignal to unknown run returns signal_not_waiting and is memoized", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `send-signal-unknown-${randomUUID()}` }, + async ({ step }) => { + const result = await step.sendSignal( + "non-existent-run-id", + "ghost-signal", + {}, + ); + return result.delivered ? "ok" : "not-delivered"; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + await tickUntilTerminal(backend, worker, handle.workflowRun.id, 200, 10); + await expect(handle.result()).resolves.toBe("not-delivered"); + }); + + test("step.sendSignal works with a SignalSpec descriptor", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const greeting = defineSignalSpec<{ message: string }>("greeting"); + + const receiver = client.defineWorkflow( + { name: `send-signal-spec-recv-${randomUUID()}` }, + async ({ step }) => { + const payload = await step.waitForSignal(greeting); + return payload.message; + }, + ); + + const sender = client.defineWorkflow<{ targetRunId: string }, boolean>( + { name: `send-signal-spec-send-${randomUUID()}` }, + async ({ step, input }) => { + const r = await step.sendSignal(input.targetRunId, greeting, { + message: "hello", + }); + return r.delivered; + }, + ); + + const worker = client.newWorker({ concurrency: 4 }); + const recvHandle = await receiver.run(); + await tickUntilParked(backend, worker, recvHandle.workflowRun.id, 200, 10); + + const sendHandle = await sender.run({ + targetRunId: recvHandle.workflowRun.id, + }); + await tickUntilTerminal( + backend, + worker, + sendHandle.workflowRun.id, + 200, + 10, + ); + await expect(sendHandle.result()).resolves.toBe(true); + + await tickUntilTerminal( + backend, + worker, + recvHandle.workflowRun.id, + 200, + 10, + ); + await 
expect(recvHandle.result()).resolves.toBe("hello"); + }); + + test("step.sendSignal default step name includes signal name", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const receiver = client.defineWorkflow( + { name: `send-signal-name-recv-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("my-event"); + }, + ); + + const sender = client.defineWorkflow<{ targetRunId: string }, void>( + { name: `send-signal-name-send-${randomUUID()}` }, + async ({ step, input }) => { + await step.sendSignal(input.targetRunId, "my-event", {}); + }, + ); + + const worker = client.newWorker({ concurrency: 4 }); + const recvHandle = await receiver.run(); + await tickUntilParked(backend, worker, recvHandle.workflowRun.id, 200, 10); + + const sendHandle = await sender.run({ + targetRunId: recvHandle.workflowRun.id, + }); + await tickUntilTerminal( + backend, + worker, + sendHandle.workflowRun.id, + 200, + 10, + ); + + const attempts = await backend.listStepAttempts({ + workflowRunId: sendHandle.workflowRun.id, + limit: 100, + }); + const sendStep = attempts.data.find((a) => a.stepName === "send:my-event"); + expect(sendStep).toBeDefined(); + expect(sendStep?.config).toEqual({ + targetRunId: recvHandle.workflowRun.id, + signalName: "my-event", + }); + }); + + test("step.sendSignal with explicit name overrides default", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const receiver = client.defineWorkflow( + { name: `send-signal-cname-recv-${randomUUID()}` }, + async ({ step }) => { + await step.waitForSignal("event-x"); + }, + ); + + const sender = client.defineWorkflow<{ targetRunId: string }, void>( + { name: `send-signal-cname-send-${randomUUID()}` }, + async ({ step, input }) => { + await step.sendSignal(input.targetRunId, "event-x", {}, { + name: "notify-reviewer", + }); + }, + ); + + const worker = client.newWorker({ concurrency: 4 }); + 
const recvHandle = await receiver.run(); + await tickUntilParked(backend, worker, recvHandle.workflowRun.id, 200, 10); + + const sendHandle = await sender.run({ + targetRunId: recvHandle.workflowRun.id, + }); + await tickUntilTerminal( + backend, + worker, + sendHandle.workflowRun.id, + 200, + 10, + ); + + const attempts = await backend.listStepAttempts({ + workflowRunId: sendHandle.workflowRun.id, + limit: 100, + }); + const sendStep = attempts.data.find( + (a) => a.stepName === "notify-reviewer", + ); + expect(sendStep).toBeDefined(); + }); + + test("parallel cross-workflow signals: both signals delivered and both receivers complete", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const receiver = client.defineWorkflow( + { name: `parallel-recv-${randomUUID()}` }, + async ({ step }) => { + const p = await step.waitForSignal<{ n: number }>("go"); + return p.n; + }, + ); + + const sender = client.defineWorkflow< + { recvA: string; recvB: string }, + boolean + >( + { name: `parallel-send-${randomUUID()}` }, + async ({ step, input }) => { + // Send to both receivers in parallel — tests that duplicate default + // names are auto-indexed ("send:go" and "send:go:1") + const [a, b] = await Promise.all([ + step.sendSignal(input.recvA, "go", { n: 1 }), + step.sendSignal(input.recvB, "go", { n: 2 }), + ]); + return a.delivered && b.delivered; + }, + ); + + const worker = client.newWorker({ concurrency: 6 }); + + const recvA = await receiver.run(); + const recvB = await receiver.run(); + await tickUntilParked(backend, worker, recvA.workflowRun.id, 200, 10); + await tickUntilParked(backend, worker, recvB.workflowRun.id, 200, 10); + + const sendHandle = await sender.run({ + recvA: recvA.workflowRun.id, + recvB: recvB.workflowRun.id, + }); + await tickUntilTerminal( + backend, + worker, + sendHandle.workflowRun.id, + 200, + 10, + ); + await expect(sendHandle.result()).resolves.toBe(true); + + await tickUntilTerminal( + 
backend, + worker, + recvA.workflowRun.id, + 200, + 10, + ); + await tickUntilTerminal( + backend, + worker, + recvB.workflowRun.id, + 200, + 10, + ); + + await expect(recvA.result()).resolves.toBe(1); + await expect(recvB.result()).resolves.toBe(2); + + // Verify the two step names were auto-indexed + const sendAttempts = ( + await backend.listStepAttempts({ + workflowRunId: sendHandle.workflowRun.id, + limit: 100, + }) + ).data.filter((a) => a.stepName.startsWith("send:go")); + expect(sendAttempts).toHaveLength(2); + const names = sendAttempts.map((a) => a.stepName).sort(); + expect(names).toEqual(["send:go", "send:go:1"]); + }); + }); }); + describe("createStepExecutionStateFromAttempts", () => { test("builds successful cache and failed-count map from mixed history", () => { const completed = createMockStepAttempt({ diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index d825d737..9ce945a7 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -1,4 +1,4 @@ -import type { Backend } from "../core/backend.js"; +import type { Backend, DeliverSignalResult } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; import { deserializeError, @@ -6,6 +6,8 @@ import { type SerializedError, } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; +import type { SignalSpec } from "../core/signal-spec.js"; +import { resolveSignalName } from "../core/signal-spec.js"; import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; import { getCachedStepAttempt, @@ -14,6 +16,7 @@ import { calculateDateFromDuration, createSleepContext, createWorkflowContext, + createSignalContext, } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate, @@ -23,6 +26,8 @@ import { } from "../core/workflow-definition.js"; import type { StepRunWorkflowOptions, + StepWaitForSignalOptions, + StepSendSignalOptions, 
StepApi, StepFunction, StepFunctionConfig, @@ -49,6 +54,20 @@ class SleepSignal extends Error { } } +/** + * Error thrown when a signal step times out before receiving a signal. + */ +export class SignalTimeoutError extends Error { + readonly code = "SIGNAL_TIMEOUT"; + readonly signalName: string; + + constructor(signalName: string) { + super(`Timed out waiting for signal "${signalName}"`); + this.name = "SignalTimeoutError"; + this.signalName = signalName; + } +} + /** * Raised when a parallel branch continues after the parent execution has been * parked or otherwise finalized for this replay pass. @@ -337,6 +356,13 @@ function getRunningWaitAttemptResumeAt( return Number.isFinite(resumeAt.getTime()) ? resumeAt : null; } + if (attempt.kind === "signal" && attempt.context?.kind === "signal") { + const { timeoutAt } = attempt.context; + if (!timeoutAt) return null; // no timeout — waits until signal arrives + const signalTimeoutAt = new Date(timeoutAt); + return Number.isFinite(signalTimeoutAt.getTime()) ? signalTimeoutAt : null; + } + if (attempt.kind !== "workflow") { return null; } @@ -470,6 +496,7 @@ export interface StepExecutorOptions { backend: Backend; workflowRunId: string; workerId: string; + deadlineAt: Date | null; attempts: StepAttempt[]; stepLimit?: number; executionFence: ExecutionFenceController; @@ -493,6 +520,7 @@ class StepExecutor implements StepApi { private readonly backend: Backend; private readonly workflowRunId: string; private readonly workerId: string; + private readonly deadlineAt: Date | null; private readonly stepLimit: number; private stepCount: number; private cache: StepAttemptCache; @@ -507,6 +535,7 @@ class StepExecutor implements StepApi { this.backend = options.backend; this.workflowRunId = options.workflowRunId; this.workerId = options.workerId; + this.deadlineAt = options.deadlineAt; this.stepLimit = Math.max(1, options.stepLimit ?? 
WORKFLOW_STEP_LIMIT); this.stepCount = options.attempts.length; @@ -674,6 +703,110 @@ class StepExecutor implements StepApi { throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt)); } + // ---- step.waitForSignal ------------------------------------------------- + + async waitForSignal( + nameOrSpec: string | SignalSpec, + options?: Readonly, + ): Promise { + const stepName = this.resolveStepName(resolveSignalName(nameOrSpec)); + + // return cached result if signal already completed on a prior replay + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) return existingAttempt.output as Payload; + + // if signal previously timed out (failed), surface the error so the + // workflow function can catch it with a try/catch around waitForSignal() + const failedCount = this.failedCountsByStepName.get(stepName); + if (failedCount !== undefined && failedCount > 0) { + throw new SignalTimeoutError(stepName); + } + + // create new step attempt for the signal + const timeoutAt = + options?.timeout === undefined + ? null + : resolveWorkflowTimeoutAt(options.timeout); + const context = createSignalContext(timeoutAt); + + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal", + config: {}, + context, + }); + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + + // park workflow waiting for signal (or until timeout) + throw new SleepSignal( + timeoutAt ?? this.deadlineAt ?? defaultWorkflowTimeoutAt(), + ); + } + + // ---- step.sendSignal ---------------------------------------------------- + + async sendSignal( + targetRunId: string, + nameOrSpec: string | SignalSpec, + payload?: Payload, + options?: Readonly, + ): Promise { + const signalName = resolveSignalName(nameOrSpec); + const stepName = this.resolveStepName(options?.name ?? 
`send:${signalName}`); + + // Return cached result if this send already completed on a prior replay + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) return existingAttempt.output as DeliverSignalResult; + + // Create the durable step attempt — kind "function" since the semantics + // are identical to step.run(): execute once, memoize result, retry on error. + // Config records the target for observability (dashboards, logs, introspection). + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "function", + config: { targetRunId, signalName }, + context: null, + }); + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + + try { + const result = await this.backend.deliverSignal({ + workflowRunId: targetRunId, + signalName, + payload: (payload ?? null) as JsonValue, + }); + + const savedAttempt = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output: result as JsonValue, + }); + + this.cache = addToStepAttemptCache(this.cache, savedAttempt); + this.runningByStepName.delete(stepName); + + return savedAttempt.output as DeliverSignalResult; + } catch (error) { + return this.failStepWithError( + stepName, + attempt.id, + error, + resolveStepRetryPolicy(), + ); + } + } + // ---- step.runWorkflow ----------------------------------------------- async runWorkflow( @@ -1078,10 +1211,74 @@ export async function executeWorkflow( } } + // Pre-pass: process running signal step attempts — complete delivered + // signals, fail timed-out signals, then re-park on the earliest still- + // pending signal. 
Two-phase approach ensures all delivered/timed-out + // signals are settled before we decide whether to park (important for + // parallel Promise.all([waitForSignal("A"), waitForSignal("B")]) patterns + // where B might be delivered before A). + let earliestPendingSignalResumeAt: Date | null = null; + for (let i = 0; i < attempts.length; i += 1) { + const attempt = attempts[i]; + if (!attempt) continue; + + if ( + attempt.status !== "running" || + attempt.kind !== "signal" || + attempt.context?.kind !== "signal" + ) { + continue; + } + + const { timeoutAt } = attempt.context; + + // Signal has been delivered (context.delivered set by deliverSignal) + if (attempt.context.delivered) { + const completed = await backend.completeStepAttempt({ + workflowRunId: workflowRun.id, + stepAttemptId: attempt.id, + workerId, + output: attempt.output, + }); + // update cache w/ completed attempt + attempts[i] = completed; + continue; + } + + // Signal timeout has elapsed without receiving the signal + if (timeoutAt && Date.now() >= new Date(timeoutAt).getTime()) { + const failed = await backend.failStepAttempt({ + workflowRunId: workflowRun.id, + stepAttemptId: attempt.id, + workerId, + error: serializeError(new SignalTimeoutError(attempt.stepName)), + }); + attempts[i] = failed; + continue; + } + + // Signal not yet received — record earliest pending wake-up time + const signalResumeAt = timeoutAt + ? new Date(timeoutAt) + : (workflowRun.deadlineAt ?? defaultWorkflowTimeoutAt()); + if ( + !earliestPendingSignalResumeAt || + signalResumeAt.getTime() < earliestPendingSignalResumeAt.getTime() + ) { + earliestPendingSignalResumeAt = signalResumeAt; + } + } + + // Park workflow if any signal is still waiting + if (earliestPendingSignalResumeAt !== null) { + throw new SleepSignal(earliestPendingSignalResumeAt); + } + const executor = new StepExecutor({ backend, workflowRunId: workflowRun.id, workerId, + deadlineAt: workflowRun.deadlineAt, attempts, executionFence, });