Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 71 additions & 4 deletions docs/content/docs/development/workflow_agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce

### Durable Execution

Use durable execution when you wrap a time-consuming or side-effecting operation. The framework persists the result and replays it on recovery when the same call is encountered, so the function will not be called again and side effects are avoided. Action code outside `durable_execute` / `durable_execute_async` is always re-executed during recovery.
Use durable execution when you wrap a time-consuming or side-effecting operation. The framework persists the result and replays it on recovery when the same call is encountered, so the function will not be called again and side effects are avoided. When recovery re-enters an action that has not been recorded as completed, code outside `durable_execute` / `durable_execute_async` will still be re-executed.

**Constraints:**
- The function must be deterministic and called in the same order on recovery.
Expand All @@ -298,9 +298,17 @@ on how to setup and configure the external action state store.

**Best-effort replay:**
- Results may not be reused if call order or arguments change (non-deterministic actions), which clears subsequent cached results and re-executes.
- If a failure happens after a function completes but before its result is persisted, the call will be re-executed.
- If a failure happens after a function starts but before it completes and its result is persisted, the call will be re-executed. See the "With a reconciler" section below.
- In Python async actions, if `ctx.durable_execute_async(...)` is not awaited, the result is not recorded and cannot be replayed.

**With a reconciler:**

Use a reconciler for durable calls when the original call may already have completed but its result or failure has not yet been persisted, so the framework cannot determine during recovery whether the call needs to be executed again. A reconciler provides custom logic that can return the result or raise the failure for the durable call instead of re-executing the original call.

- A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no execution result has been persisted for it yet.
- If the reconciler logic returns a result, the runtime persists and replays that recovered result.
- If the reconciler logic raises an exception, the runtime persists and replays that recovered failure.

{{< tabs "Durable Execution" >}}
{{< tab "Python" >}}
Python actions can call `ctx.durable_execute(...)` to run a synchronous durable code block.
Expand All @@ -316,6 +324,28 @@ def process_input(event: InputEvent, ctx: RunnerContext) -> None:
result = ctx.durable_execute(slow_external_call, event.input)
ctx.send_event(OutputEvent(output=result))
```

You can also pass an optional `reconciler` callable to recover an execution outcome during recovery.
```python
@action(InputEvent)
@staticmethod
def process_input(event: InputEvent, ctx: RunnerContext) -> None:
def submit_payment(order_id: str) -> str:
return payment_client.submit(order_id)

def payment_reconciler() -> str:
status = payment_client.get_status(event.input)
if status == "SUCCEEDED":
return payment_client.lookup_completed_payment(event.input)
raise payment_client.get_failure(event.input)

result = ctx.durable_execute(
submit_payment,
event.input,
reconciler=payment_reconciler,
)
ctx.send_event(OutputEvent(output=result))
```
{{< /tab >}}

{{< tab "Java" >}}
Expand Down Expand Up @@ -346,6 +376,43 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce
ctx.sendEvent(new OutputEvent(result));
}
```

Java actions can also override `reconciler()` to recover an execution outcome during recovery.
```java
@Action(listenEvents = {InputEvent.class})
public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
DurableCallable<String> call = new DurableCallable<>() {
@Override
public String getId() {
return "submit_payment";
}

@Override
public Class<String> getResultClass() {
return String.class;
}

@Override
public String call() {
return paymentClient.submit(event.getInput());
}

@Override
public Callable<String> reconciler() {
return () -> {
PaymentStatus status = paymentClient.getStatus(event.getInput());
if (status == PaymentStatus.SUCCEEDED) {
return paymentClient.lookupCompletedPayment(event.getInput());
}
throw paymentClient.getFailure(event.getInput());
};
}
};

String result = ctx.durableExecute(call);
ctx.sendEvent(new OutputEvent(result));
}
```
{{< /tab >}}
{{< /tabs >}}

Expand All @@ -355,7 +422,7 @@ Async execution uses the same durable semantics but yields while waiting for a t

{{< tabs "Async Execution" >}}
{{< tab "Python" >}}
Define an `async def` action and `await ctx.durable_execute_async(...)`.
Define an `async def` action and `await ctx.durable_execute_async(...)`. The same optional `reconciler=...` argument is available for recovery.
```python
@action(InputEvent)
@staticmethod
Expand All @@ -376,7 +443,7 @@ functions like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and

{{< tab "Java" >}}
Use `ctx.durableExecuteAsync(DurableCallable)`; on **JDK 21+** it yields using Continuation,
and on **JDK < 21** it falls back to synchronous execution.
and on **JDK < 21** it falls back to synchronous execution. The same optional `reconciler()` hook can be used for recovery.
```java
@Action(listenEvents = {InputEvent.class})
public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
Expand Down
2 changes: 2 additions & 0 deletions docs/content/docs/operations/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ After recovery from a checkpoint, Flink Agents reprocess events that arrived aft

To ensure exactly-once action consistency, you must configure an external action state store. Flink Agents record action state in this store on a per-action basis. After recovering from a checkpoint, Flink Agents consult the external store and will not re-execute actions that were already completed. This guarantees each action is executed exactly once after recovering from a checkpoint.

The same persisted action state is also used by fine-grained durable execution.

{{< hint info >}}
**Note**: Currently, Kafka is supported as the external action state store.
{{< /hint >}}
Expand Down
Loading