diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md
index cbe071d3c..5cc51a817 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -283,7 +283,7 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce
 
 ### Durable Execution
 
-Use durable execution when you wrap a time-consuming or side-effecting operation. The framework persists the result and replays it on recovery when the same call is encountered, so the function will not be called again and side effects are avoided. Action code outside `durable_execute` / `durable_execute_async` is always re-executed during recovery.
+Use durable execution when you wrap a time-consuming or side-effecting operation. The framework persists the result and replays it on recovery when the same call is encountered, so the function will not be called again and side effects are avoided. When recovery re-enters an action that has not been recorded as completed, code outside `durable_execute` / `durable_execute_async` will still be re-executed.
 
 **Constraints:**
 - The function must be deterministic and called in the same order on recovery.
@@ -298,9 +298,17 @@ on how to setup and configure the external action state store.
 
 **Best-effort replay:**
 - Results may not be reused if call order or arguments change (non-deterministic actions), which clears subsequent cached results and re-executes.
-- If a failure happens after a function completes but before its result is persisted, the call will be re-executed.
+- If a failure happens after a function starts but before it completes and its result is persisted, the call will be re-executed. See the "With a reconciler" section below.
 - In Python async actions, if `ctx.durable_execute_async(...)` is not awaited, the result is not recorded and cannot be replayed.
 
+**With a reconciler:**
+
+Use a reconciler for durable calls when the original call may already have completed but its result or failure has not yet been persisted, so the framework cannot determine during recovery whether the call needs to be executed again. A reconciler provides custom logic that can return the result or raise the failure for the durable call instead of re-executing the original call.
+
+- A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no execution result has been persisted for it yet.
+- If the reconciler logic returns a result, the runtime persists and replays that recovered result.
+- If the reconciler logic raises an exception, the runtime persists and replays that recovered failure.
+
 {{< tabs "Durable Execution" >}}
 {{< tab "Python" >}}
 Python actions can call `ctx.durable_execute(...)` to run a synchronous durable code block.
@@ -316,6 +324,28 @@ def process_input(event: InputEvent, ctx: RunnerContext) -> None:
     result = ctx.durable_execute(slow_external_call, event.input)
     ctx.send_event(OutputEvent(output=result))
 ```
+
+You can also pass an optional `reconciler` callable to recover an execution outcome during recovery.
+```python
+@action(InputEvent)
+@staticmethod
+def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+    def submit_payment(order_id: str) -> str:
+        return payment_client.submit(order_id)
+
+    def payment_reconciler() -> str:
+        status = payment_client.get_status(event.input)
+        if status == "SUCCEEDED":
+            return payment_client.lookup_completed_payment(event.input)
+        raise payment_client.get_failure(event.input)
+
+    result = ctx.durable_execute(
+        submit_payment,
+        event.input,
+        reconciler=payment_reconciler,
+    )
+    ctx.send_event(OutputEvent(output=result))
+```
 {{< /tab >}}
 
 {{< tab "Java" >}}
@@ -346,6 +376,43 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce
     ctx.sendEvent(new OutputEvent(result));
 }
 ```
+
+Java actions can also override `reconciler()` to recover an execution outcome during recovery.
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
+    DurableCallable<String> call = new DurableCallable<>() {
+        @Override
+        public String getId() {
+            return "submit_payment";
+        }
+
+        @Override
+        public Class<String> getResultClass() {
+            return String.class;
+        }
+
+        @Override
+        public String call() {
+            return paymentClient.submit(event.getInput());
+        }
+
+        @Override
+        public Callable<String> reconciler() {
+            return () -> {
+                PaymentStatus status = paymentClient.getStatus(event.getInput());
+                if (status == PaymentStatus.SUCCEEDED) {
+                    return paymentClient.lookupCompletedPayment(event.getInput());
+                }
+                throw paymentClient.getFailure(event.getInput());
+            };
+        }
+    };
+
+    String result = ctx.durableExecute(call);
+    ctx.sendEvent(new OutputEvent(result));
+}
+```
 {{< /tab >}}
 
 {{< /tabs >}}
@@ -355,7 +422,7 @@ Async execution uses the same durable semantics but yields while waiting for a t
 
 {{< tabs "Async Execution" >}}
 {{< tab "Python" >}}
-Define an `async def` action and `await ctx.durable_execute_async(...)`.
+Define an `async def` action and `await ctx.durable_execute_async(...)`. The same optional `reconciler=...` argument is available for recovery.
 ```python
 @action(InputEvent)
 @staticmethod
@@ -376,7 +443,7 @@ functions like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
 
 {{< tab "Java" >}}
 Use `ctx.durableExecuteAsync(DurableCallable)`; on **JDK 21+** it yields using Continuation,
-and on **JDK < 21** it falls back to synchronous execution.
+and on **JDK < 21** it falls back to synchronous execution. The same optional `reconciler()` hook can be used for recovery.
 ```java
 @Action(listenEvents = {InputEvent.class})
 public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md
index bfe609c2a..1758c416f 100644
--- a/docs/content/docs/operations/deployment.md
+++ b/docs/content/docs/operations/deployment.md
@@ -171,6 +171,8 @@ After recovery from a checkpoint, Flink Agents reprocess events that arrived aft
 
 To ensure exactly-once action consistency, you must configure an external action state store. Flink Agents record action state in this store on a per-action basis. After recovering from a checkpoint, Flink Agents consult the external store and will not re-execute actions that were already completed. This guarantees each action is executed exactly once after recovering from a checkpoint.
 
+The same persisted action state is also used by fine-grained durable execution.
+
 {{< hint info >}}
 **Note**: Currently, Kafka is supported as the external action state store.
 {{< /hint >}}
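
Review note: the recovery rules in the added "With a reconciler" section can be modeled with a small in-memory sketch. This is not the framework's implementation. The `durable_execute_sketch` function, the dictionary-based store, and the `PENDING` / `OK` / `ERR` markers are all hypothetical names chosen for illustration. In particular, the sketch assumes the runtime writes a pending marker before invoking the call, which is one plausible way to detect "started, but no outcome persisted"; the diff does not say how the real runtime detects that state.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical stand-in for the external action state store:
# call id -> ("PENDING" | "OK" | "ERR", payload).
Store = Dict[str, Tuple[str, Any]]


def durable_execute_sketch(
    store: Store,
    call_id: str,
    func: Callable[[], Any],
    reconciler: Optional[Callable[[], Any]] = None,
) -> Any:
    """Illustrative model of durable execution with an optional reconciler."""
    record = store.get(call_id)
    if record is not None and record[0] == "OK":
        return record[1]  # replay the persisted result; func is not called again
    if record is not None and record[0] == "ERR":
        raise record[1]  # replay the persisted failure

    # Assumption: a leftover PENDING marker means an earlier attempt started
    # but its outcome was never persisted, so this run is a recovery.
    recovering = record is not None
    runner = reconciler if (recovering and reconciler is not None) else func

    store[call_id] = ("PENDING", None)
    try:
        result = runner()
    except Exception as exc:
        store[call_id] = ("ERR", exc)  # persist the failure for later replay
        raise
    store[call_id] = ("OK", result)  # persist the result for later replay
    return result
```

On a first run the original function executes and its result is stored; a second call with the same id replays the stored value. If the store holds only a pending marker, the reconciler runs instead of the original function, and whatever it returns or raises becomes the persisted outcome. Without a reconciler, the pending case falls back to re-executing the original function, matching the "Best-effort replay" bullet.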