Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
500 changes: 500 additions & 0 deletions docs/design-patterns/activity-dependency-injection.mdx

Large diffs are not rendered by default.

1,028 changes: 1,028 additions & 0 deletions docs/design-patterns/approval.mdx

Large diffs are not rendered by default.

252 changes: 252 additions & 0 deletions docs/design-patterns/batch-iterator.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
---
id: batch-iterator
title: "Batch Iterator"
sidebar_label: "Batch Iterator"
description: "Pages through unbounded datasets using Continue-As-New to prevent history overflow while maintaining exactly-once processing guarantees."
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

:::info[TLDR]
**Process one page at a time** and call Continue-as-New with the next offset after each page so the Workflow's event history never grows without bound. With this method you can process infinite pages. Use this when your record set is arbitrarily large, you need a durable checkpoint after every page, and sequential page-by-page throughput is acceptable.
:::

## Overview

The Batch Iterator pattern processes a large record set one page at a time. Each Workflow run processes a single page and then calls Continue-as-New with the next offset, producing a chain of short-lived runs that together cover the entire record set without accumulating unbounded event history.

## Problem

A single Workflow run is limited to 50,000 history events (aim for 2,000) and 2,000 in-flight Activities. Processing millions of records in one run is not possible within these bounds.

You need a way to process an arbitrarily large record set reliably, with the ability to resume from a checkpoint if the Workflow is interrupted, and without overwhelming downstream systems with a burst of concurrent requests.

## Solution

Each Workflow run fetches one page of records using a persistent `offset` parameter, processes each record sequentially, and then calls `continueAsNew` with the incremented offset. The next run picks up exactly where the previous one left off.

Because each run processes only a bounded number of records, history stays well within limits. The offset acts as a durable checkpoint: if the Workflow is interrupted mid-page, the next run replays only from the start of the current page.

```mermaid
flowchart TD
DB[("Data Source\n(paginated)")]
WF1["Workflow Run 1\n(offset=0)"]
WF2["Workflow Run 2\n(offset=PAGE_SIZE)"]
WF3["Workflow Run N\n(offset=N×PAGE_SIZE)"]
Done(["Complete"])

DB -->|"fetch page 1"| WF1
WF1 -->|"processRecord ×PAGE_SIZE"| Acts1["Activities"]
WF1 -->|"continueAsNew\n(offset=PAGE_SIZE)"| WF2

DB -->|"fetch page 2"| WF2
WF2 -->|"processRecord ×PAGE_SIZE"| Acts2["Activities"]
WF2 -->|"continueAsNew\n(offset=N×PAGE_SIZE)"| WF3

DB -->|"fetch page N"| WF3
WF3 -->|"processRecord ×PAGE_SIZE"| Acts3["Activities"]
WF3 -->|"last page → return"| Done
```

The following describes each step in the diagram:

1. The Workflow starts with `offset=0` and calls `fetchPage(offset, pageSize)` to retrieve the first page of records.
2. It processes each record in the page by executing the `processRecord` Activity.
3. After the page is fully processed, it calls `continueAsNew` with `offset + pageSize`, passing the updated offset to the next run.
4. The next run begins with a clean history and repeats the same steps for the next page.
5. When `fetchPage` returns fewer records than `pageSize`, the Workflow knows it has reached the last page and returns normally.

## Implementation


The following examples show how each SDK implements the Batch Iterator pattern.

<Tabs groupId="language" queryString>
<TabItem value="typescript" label="TypeScript" default>

```typescript
// workflows.ts
import { continueAsNew, log, proxyActivities } from "@temporalio/workflow";
import type * as activities from "./activities";
import { PAGE_SIZE } from "./shared";

const { fetchPage, processRecord } = proxyActivities<typeof activities>({
startToCloseTimeout: "10 seconds",
});

export async function batchIteratorWorkflow(
offset: number = 0,
totalProcessed: number = 0
): Promise<number> {
const page = await fetchPage(offset, PAGE_SIZE);

for (const record of page) {
await processRecord(record);
totalProcessed++;
}

log.info(`Processed page at offset ${offset} (${page.length} records, running total: ${totalProcessed})`);

if (page.length === PAGE_SIZE) {
await continueAsNew<typeof batchIteratorWorkflow>(offset + PAGE_SIZE, totalProcessed);
}

return totalProcessed;
}
```

</TabItem>
<TabItem value="python" label="Python">

```python
# workflows.py
from temporalio import workflow
from temporalio.workflow import continue_as_new
from datetime import timedelta
from activities import fetch_page, process_record
from shared import PAGE_SIZE


@workflow.defn
class BatchIteratorWorkflow:
@workflow.run
async def run(self, offset: int = 0, total_processed: int = 0) -> int:
page = await workflow.execute_activity(
fetch_page,
args=[offset, PAGE_SIZE],
start_to_close_timeout=timedelta(seconds=10),
)

for record in page:
await workflow.execute_activity(
process_record,
record,
start_to_close_timeout=timedelta(seconds=10),
)
total_processed += 1

workflow.logger.info(
f"Processed page at offset {offset} ({len(page)} records, running total: {total_processed})"
)

if len(page) == PAGE_SIZE:
continue_as_new(offset + PAGE_SIZE, total_processed)

return total_processed
```

</TabItem>
<TabItem value="go" label="Go">

```go
// workflows.go
package main

import (
"go.temporal.io/sdk/workflow"
)

func BatchIteratorWorkflow(ctx workflow.Context, offset int, totalProcessed int) (int, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

var page []Record
if err := workflow.ExecuteActivity(ctx, FetchPage, offset, PageSize).Get(ctx, &page); err != nil {
return totalProcessed, err
}

for _, record := range page {
if err := workflow.ExecuteActivity(ctx, ProcessRecord, record).Get(ctx, nil); err != nil {
return totalProcessed, err
}
totalProcessed++
}

workflow.GetLogger(ctx).Info("Processed page",
"offset", offset,
"pageSize", len(page),
"totalProcessed", totalProcessed)

if len(page) == PageSize {
return totalProcessed, workflow.NewContinueAsNewError(ctx, BatchIteratorWorkflow, offset+PageSize, totalProcessed)
}

return totalProcessed, nil
}
```

</TabItem>
<TabItem value="java" label="Java">

```java
// BatchIteratorWorkflow.java
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.*;
import java.time.Duration;
import java.util.List;

@WorkflowInterface
public interface BatchIteratorWorkflow {
@WorkflowMethod
int run(int offset, int totalProcessed);
}

// BatchIteratorWorkflowImpl.java
public class BatchIteratorWorkflowImpl implements BatchIteratorWorkflow {
private final Activities activities = Workflow.newActivityStub(
Activities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.build()
);

@Override
public int run(int offset, int totalProcessed) {
List<Record> page = activities.fetchPage(offset, Shared.PAGE_SIZE);

for (Record record : page) {
activities.processRecord(record);
totalProcessed++;
}

Workflow.getLogger(BatchIteratorWorkflowImpl.class).info(
"Processed page at offset " + offset + " (" + page.size() + " records, total: " + totalProcessed + ")"
);

if (page.size() == Shared.PAGE_SIZE) {
throw Workflow.newContinueAsNewStub(BatchIteratorWorkflow.class)
.run(offset + Shared.PAGE_SIZE, totalProcessed);
}

return totalProcessed;
}
}
```

</TabItem>
</Tabs>

## Best Practices

- **Choose a page size that keeps history under 2,000 events.** Each page produces roughly `3 × pageSize` history events (`ActivityTaskScheduled` + `ActivityTaskStarted` + `ActivityTaskCompleted`). A page size of 500–800 records is a safe target.
- **Include `totalProcessed` (or a similar counter) in the `continueAsNew` args.** This lets you observe overall progress via the Workflow input visible in the UI without querying internal state.
- **Fetch inside an Activity, not the Workflow.** The `fetchPage` call must be an Activity — not inline Workflow code — so it can interact with external systems and be retried independently.
- **Make `processRecord` idempotent.** Activities have at-least-once execution semantics. If a worker crashes after an Activity completes externally but before the completion is recorded in history, Temporal will retry it. Your downstream system must tolerate receiving the same record more than once.
- **Avoid accumulating large local state between pages.** `continueAsNew` does not carry over in-memory state; only the arguments you pass are available in the next run.

## Common Pitfalls

- **Forgetting `continueAsNew` on the last page.** If you call `continueAsNew` unconditionally, the Workflow loops forever even when the data source is exhausted. Check whether the returned page is shorter than `pageSize` before continuing.
- **Passing unnecessary state into `continueAsNew`.** All arguments are serialized and stored in history. Pass only the minimal state needed (offset, counters) — not accumulated result lists or large collections that grow with each page.
- **Sequential processing bottlenecks.** The default implementation processes one record at a time per page. You can fan out Activities concurrently within a page using the SDK's async primitives for higher per-page throughput — note this increases per-page event count accordingly. If record-set-wide throughput matters more than rate limiting, consider [Sliding Window](/design-patterns/sliding-window) or [MapReduce Tree](/design-patterns/mapreduce-tree).

## Related Resources

- [Continue-as-New pattern](/design-patterns/continue-as-new) — core concepts for history management via `continueAsNew`
- [Sliding Window](/design-patterns/sliding-window) — bounded concurrency that progresses at the rate of the fastest processor
- [MapReduce Tree](/design-patterns/mapreduce-tree) — fully parallel processing for maximum speed
- [Temporal limits reference](https://docs.temporal.io/cloud/limits)
- [Batch samples (Java)](https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/batch/iterator)
134 changes: 134 additions & 0 deletions docs/design-patterns/batch-processing-patterns.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
---
id: batch-processing-patterns
title: Batch Processing Patterns
sidebar_label: Overview
description: Patterns for processing large volumes of records reliably, at scale, and without overwhelming downstream systems.
---

import PatternCards from '@site/src/components/PatternCards';

Patterns for processing large volumes of records reliably, at scale, and without overwhelming downstream systems.

Choose based on your throughput requirements, record set size, and whether you need rate limiting or maximum parallelism.

## When to use which pattern

| Pattern | Record set size | Parallelism model | Workflow-based rate control |
|---|---|---|---|
| [Basic Workflow](#basic-workflow-single-tier-fan-out) | Small (up to a few hundred records) | Sequential or parallel activities in one Workflow | No |
| [Fan-Out with Child Workflows](/design-patterns/fanout-child-workflows) | Up to ~4M records | Fixed concurrency (one child per chunk) | No |
| [Batch Iterator](/design-patterns/batch-iterator) | Unlimited | Limited (activities per page) | Yes — fixed page rate |
| [Sliding Window](/design-patterns/sliding-window) | Unlimited | Bounded window of concurrent children | Yes — configurable window |
| [MapReduce Tree](/design-patterns/mapreduce-tree) | Unlimited | Fully parallel recursive tree | No — maximum speed |

<PatternCards items={[
{
href: "/design-patterns/fanout-child-workflows",
title: "Fan-Out with Child Workflows",
description: "Splits a record set into fixed-size chunks and assigns each to an independent child Workflow. Simple to reason about; best for record sets up to ~4M items.",
},
{
href: "/design-patterns/batch-iterator",
title: "Batch Iterator",
description: "Processes one page of records per Workflow run and continues as new with the next page offset. Handles unlimited record sets while controlling downstream traffic.",
},
{
href: "/design-patterns/sliding-window",
title: "Sliding Window",
description: "Maintains a fixed-size window of concurrent child Workflows. As each child completes it signals the parent, which immediately starts a replacement — maximizing throughput within a concurrency budget.",
},
{
href: "/design-patterns/mapreduce-tree",
title: "MapReduce Tree",
description: "Recursively splits a record set into chunks, fans out to leaf Workflows for parallel processing, and signals results back up the tree. Maximizes speed for embarrassingly parallel workloads.",
},
]} />

---

## Schedules

Schedules allow Workflows to be executed on a recurring basis — think of them as a more powerful cron.

- Supports `start` / `pause` / `stop` / `update` / `backfill` of scheduled Workflow executions
- Configurable **Overlap Policies** control what happens when the previous run is still running
- Full execution history visibility in the Temporal UI
- Schedules can be created via the UI, CLI, or SDK

```bash
temporal schedule create \
--schedule-id 'your-schedule-id' \
--workflow-id 'your-workflow-id' \
--task-queue 'your-task-queue' \
--workflow-type 'YourWorkflowType'
```

**References:**
- [Temporal Schedules](https://docs.temporal.io/workflows#schedule)
- [CLI schedule commands](https://docs.temporal.io/cli/schedule)

---

## Basic Workflow (single-tier fan-out)

The simplest form of batch processing: the Workflow fetches or receives record IDs and executes one Activity per record.

- Activities can be executed sequentially or concurrently (using the SDK's async primitives)
- **Limit: 2,000 in-flight Activities per Workflow run** (aim for 500)
- If total event count is likely to exceed 2,000 (hard limit: 50,000), use the [Batch Iterator](/design-patterns/batch-iterator) instead

**Pros:** Simple
**Cons:** Hard cap on concurrent Activities; all-or-nothing failure model; can overwhelm downstream systems

```mermaid
flowchart TD
Records["📋 Record IDs<br/>(fetched or passed in)"]
WF["Workflow"]
A1["Activity"]
A2["Activity"]
AN["Activity ..."]

Records --> WF
WF --> A1
WF --> A2
WF --> AN
```

---

## Batch Signalling

The Temporal CLI lets you signal, reset, cancel, or terminate multiple Workflows with a single command using a visibility query.

- 1 running batch job per namespace
- 50 Workflows per second per batch

```bash
# Signal all running Workflows of a given type
temporal workflow signal \
--name MySignal \
--input '{"Input": "As-JSON"}' \
--query 'ExecutionStatus = "Running" AND WorkflowType="YourWorkflow"' \
--reason "Testing"

# Terminate all running Workflows of a given type
temporal workflow terminate \
--query 'ExecutionStatus = "Running" AND WorkflowType="SomeWorkflowType"' \
--reason "Terminate Test Workflows"
```

**Reference:** [CLI batch commands](https://docs.temporal.io/cli/batch)

---

## Key Limits

Full reference: [Temporal Cloud limits](https://docs.temporal.io/cloud/limits)

| Limit | Value |
|---|---|
| Unfinished actions per Workflow | 2,000 max (aim for 500). Includes Activities, Signals, Child Workflows, cancellation requests |
| Events per Workflow history | 50,000 events max (aim for 2,000) **or** 50 MB total history size |
| Signals per Workflow | 10,000 |
| Updates per Workflow | 10 in-flight, 2,000 total |
| Batch Signalling | 1 batch job per namespace; 50 Workflows/sec per batch |
Loading