You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
#206 generalizes the fire-and-then-process pattern that Context.sendActivity already uses — enqueue work, return the HTTP response immediately, let a separate worker process the payload later — to arbitrary application-defined long-running jobs (digest mailers, image transcoding, periodic cleanup, …). Tasks run on a separate background worker, not in-request; in-process execution via setTimeout/queueMicrotask is explicitly out of scope.
This sub-issue delivers the end-to-end core: define a task, enqueue it from a Context, run it on a worker, retry on failure. It deliberately excludes deduplication (#798 sub-issue) and task-specific telemetry attributes (#799 sub-issue) so the first PR stays small and reviewable.
Public API
Registration
defineTask is declared on a new TaskRegistry<TContextData> interface; Federatable<TContextData> extends it, so both Federation and FederationBuilder inherit the method (they already extend Federatable).
The returned TaskDefinition<TContextData, TData> carries name and schema plus a phantom TContextData marker — no runtime methods. Dispatch lives on Context (below).
Payload handling
schema is required. Every task declares a Standard Schema object (Zod, Valibot, ArkType, …) and TData is inferred from it via StandardSchemaV1.InferOutput; there is no explicit-TData escape hatch and no unknown fallback. The schema validates the envelope of the payload, and an Activity Vocabulary object carried inside it (Note, Create, …) is described as an opaque instanceof leaf (z.instanceof(Create) / v.instance(Create)), so the schema stays small even when the payload carries ActivityPub objects.
Serialization is owned by Fedify, not the caller. The payload is encoded to a wire string with devalue, whose native custom-type hook bridges each vocabulary object through its JSON-LD document: vocabulary objects keep their state in private fields, so they cannot be serialized structurally and must travel as JSON-LD via the asynchronous toJsonLd/fromJsonLd pair. Encoding and decoding are therefore asynchronous, and there is no caller-facing stringify/parse hook.
The serializer (devalue) and the required-schema decision were settled on the umbrella issue; raise any reconsideration here rather than re-opening it on #206.
Dispatch (enqueue)
Two new methods on Context<TContextData>, placed alongside sendActivity / routeActivity / forwardActivity:
enqueueTask<TData>(task: TaskDefinition<TContextData,TData>,data: TData,options?: TaskEnqueueOptions): Promise<void>;enqueueTaskMany<TData>(task: TaskDefinition<TContextData,TData>,payloads: readonlyTData[],options?: TaskEnqueueOptions): Promise<void>;interfaceTaskEnqueueOptions{readonlydelay?: Temporal.DurationLike;readonlyorderingKey?: string;// deduplicationKey is added by the deduplication sub-issue (non-breaking).}
enqueueTask validates data against the task's schema (fail fast), encodes it with devalue, and stores the resulting string in the message before handing it to the queue adapter. enqueueTaskMany resolves the queue once and uses MessageQueue.enqueueMany when available, falling back to parallel single enqueues otherwise. Both go through the private #enqueueTasks helper.
Message is documented as opaque, so the new variant is non-breaking. data is a string (not unknown) because every queue backend serializes the message in its own way (JSON, structured clone, …); a pre-encoded string is the only form that survives all of them unchanged, and it is also what lets a vocabulary object travel as its JSON-LD document.
Queue routing
New task?: MessageQueue slot in FederationQueueOptions.
A task's queue is resolved as: per-task queue (a MessageQueue passed to defineTask) → taskQueue → outboxQueue fallback → throw. There is no per-call queue override on TaskEnqueueOptions; a task's destination is declared once at defineTask.
FederationOptions.taskQueueResolution?: "fallback" | "strict" (default "fallback") opts out of the outboxQueue fallback step for deployments that isolate task work.
Federation.startQueue(ctxData, { queue: "task" }) starts a dedicated task worker (extend the existing "inbox" | "outbox" | "fanout" selector).
Worker behavior
processQueuedTask gains a "task" branch dispatching to #listenTaskMessage, which: looks up the handler by taskName (missing → log + drop); decodes data with devalue (failure → log + drop, no retry); validates the decoded value against schema (invalid → log + drop, no retry); builds a Context and invokes the handler; on throw calls onError, then either rethrows (nativeRetrial) or applies retryPolicy (per-task, else FederationOptions.taskRetryPolicy, else createExponentialBackoffPolicy()) and re-enqueues with attempt + 1.
Forward compatibility (Approach 2)
The TaskRegistry<TContextData> seam exists so Approach 2 (a separated Worker class) lands without call-site churn: Federatable stops extending TaskRegistry, and a Worker-like class implements TaskRegistry instead. TaskMessage.taskName stays a string identifier (location-agnostic lookup), and the task queue slot becomes WorkerOptions.queue verbatim.
ctx.enqueueTask(task, data) round-trips to a typed handler; a wrong-shaped payload is a compile error.
schema (required) infers TData; an invalid decoded payload is dropped without retry.
A payload carrying a vocabulary object (Note/Create) alongside Date/Map round-trips faithfully through the devalue codec — each vocab object comes back as a real instance — and the schema validates the envelope with the vocab object as an instanceof leaf.
A decode failure on a malformed wire string is dropped without retry.
Handler throw → retry with backoff; a nativeRetrial queue rethrows without re-enqueue; per-task retryPolicy overrides the federation default.
Unknown taskName → drop + warning.
Queue routing precedence and taskQueueResolution: "strict" behave as specified; startQueue({ queue: "task" }) starts only the task worker.
enqueueTaskMany uses enqueueMany when available, else parallel enqueue.
defineTask with a duplicate name throws; builder clone isolation holds across two build() calls.
Type-level forward-compat guard: Federatable is assignable to TaskRegistry<TContextData>.
Tests use @fedify/fixturetest() (Cloudflare Workers harness compatibility) and pass under Deno, Node.js, and Bun.
docs/manual/tasks.md covers defining, payload handling, dispatching, retry/error, queue isolation; CHANGES.md updated; new public types carry @since JSDoc; AI usage disclosed per AI_POLICY.md.
First sub-issue of #206.
Background
#206 generalizes the fire-and-then-process pattern that
Context.sendActivityalready uses — enqueue work, return the HTTP response immediately, let a separate worker process the payload later — to arbitrary application-defined long-running jobs (digest mailers, image transcoding, periodic cleanup, …). Tasks run on a separate background worker, not in-request; in-process execution viasetTimeout/queueMicrotaskis explicitly out of scope.This sub-issue delivers the end-to-end core: define a task, enqueue it from a
Context, run it on a worker, retry on failure. It deliberately excludes deduplication (#798 sub-issue) and task-specific telemetry attributes (#799 sub-issue) so the first PR stays small and reviewable.Public API
Registration
defineTaskis declared on a newTaskRegistry<TContextData>interface;Federatable<TContextData>extends it, so bothFederationandFederationBuilderinherit the method (they already extendFederatable).The returned
TaskDefinition<TContextData, TData>carriesnameandschemaplus a phantomTContextDatamarker — no runtime methods. Dispatch lives onContext(below).Payload handling
schemais required. Every task declares a Standard Schema object (Zod, Valibot, ArkType, …) andTDatais inferred from it viaStandardSchemaV1.InferOutput; there is no explicit-TDataescape hatch and nounknownfallback. The schema validates the envelope of the payload, and an Activity Vocabulary object carried inside it (Note,Create, …) is described as an opaqueinstanceofleaf (z.instanceof(Create)/v.instance(Create)), so the schema stays small even when the payload carries ActivityPub objects.Serialization is owned by Fedify, not the caller. The payload is encoded to a wire string with devalue, whose native custom-type hook bridges each vocabulary object through its JSON-LD document: vocabulary objects keep their state in private fields, so they cannot be serialized structurally and must travel as JSON-LD via the asynchronous
toJsonLd/fromJsonLdpair. Encoding and decoding are therefore asynchronous, and there is no caller-facingstringify/parsehook.The serializer (devalue) and the required-
schemadecision were settled on the umbrella issue; raise any reconsideration here rather than re-opening it on #206.Dispatch (enqueue)
Two new methods on
Context<TContextData>, placed alongsidesendActivity/routeActivity/forwardActivity:enqueueTaskvalidatesdataagainst the task'sschema(fail fast), encodes it with devalue, and stores the resultingstringin the message before handing it to the queue adapter.enqueueTaskManyresolves the queue once and usesMessageQueue.enqueueManywhen available, falling back to parallel single enqueues otherwise. Both go through the private#enqueueTaskshelper.Wire format
Messageis documented as opaque, so the new variant is non-breaking.datais a string (notunknown) because every queue backend serializes the message in its own way (JSON, structured clone, …); a pre-encoded string is the only form that survives all of them unchanged, and it is also what lets a vocabulary object travel as its JSON-LD document.Queue routing
task?: MessageQueueslot inFederationQueueOptions.queue(aMessageQueuepassed todefineTask) →taskQueue→outboxQueuefallback → throw. There is no per-call queue override onTaskEnqueueOptions; a task's destination is declared once atdefineTask.FederationOptions.taskQueueResolution?: "fallback" | "strict"(default"fallback") opts out of theoutboxQueuefallback step for deployments that isolate task work.Federation.startQueue(ctxData, { queue: "task" })starts a dedicated task worker (extend the existing"inbox" | "outbox" | "fanout"selector).Worker behavior
processQueuedTaskgains a"task"branch dispatching to#listenTaskMessage, which: looks up the handler bytaskName(missing → log + drop); decodesdatawith devalue (failure → log + drop, no retry); validates the decoded value againstschema(invalid → log + drop, no retry); builds aContextand invokes the handler; on throw callsonError, then either rethrows (nativeRetrial) or appliesretryPolicy(per-task, elseFederationOptions.taskRetryPolicy, elsecreateExponentialBackoffPolicy()) and re-enqueues withattempt + 1.Forward compatibility (Approach 2)
The
TaskRegistry<TContextData>seam exists so Approach 2 (a separatedWorkerclass) lands without call-site churn:Federatablestops extendingTaskRegistry, and aWorker-like class implementsTaskRegistryinstead.TaskMessage.taskNamestays a string identifier (location-agnostic lookup), and thetaskqueue slot becomesWorkerOptions.queueverbatim.Out of scope (separate sub-issues)
MessageQueue.nativeDeduplication,TaskEnqueueOptions.deduplicationKey, KV fallback,taskDeduplicationTtl,taskDeduplicationFallback→ Outbox auto-filler #2 sub-issue.QueueTaskRole"task",fedify.taskspan,fedify.task.name/failure_reasonattributes → Custom WebFinger query #3 sub-issue. This PR implements the dispatch behavior and logging only; Custom WebFinger query #3 layers OTel attributes onto the decision points.AsyncResult/result backend,bind=True/self.retry, per-task priority.Acceptance criteria
ctx.enqueueTask(task, data)round-trips to a typed handler; a wrong-shaped payload is a compile error.schema(required) infersTData; an invalid decoded payload is dropped without retry.Note/Create) alongsideDate/Mapround-trips faithfully through the devalue codec — each vocab object comes back as a real instance — and the schema validates the envelope with the vocab object as aninstanceofleaf.nativeRetrialqueue rethrows without re-enqueue; per-taskretryPolicyoverrides the federation default.taskName→ drop + warning.taskQueueResolution: "strict"behave as specified;startQueue({ queue: "task" })starts only the task worker.enqueueTaskManyusesenqueueManywhen available, else parallel enqueue.defineTaskwith a duplicatenamethrows; builder clone isolation holds across twobuild()calls.Federatableis assignable toTaskRegistry<TContextData>.@fedify/fixturetest()(Cloudflare Workers harness compatibility) and pass under Deno, Node.js, and Bun.@sinceJSDoc; AI usage disclosed per AI_POLICY.md.References
nativeRetrialprior art for the native-capability flag pattern: Add automatic retry capability flag toMessageQueueinterface #250