From b634f91ed5795de3d8a9fc0717e8dfe7f2221685 Mon Sep 17 00:00:00 2001 From: joeyutong Date: Thu, 9 Apr 2026 10:26:27 +0800 Subject: [PATCH 1/8] Document durable reconciler usage --- .../docs/development/workflow_agent.md | 61 ++++++++++++++++++- docs/content/docs/faq/faq.md | 2 + docs/content/docs/operations/deployment.md | 2 + 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index cbe071d3c..9900401e3 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -283,7 +283,7 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce ### Durable Execution -Use durable execution when you wrap a time-consuming or side-effecting operation. The framework persists the result and replays it on recovery when the same call is encountered, so the function will not be called again and side effects are avoided. Action code outside `durable_execute` / `durable_execute_async` is always re-executed during recovery. +Use durable execution when you wrap a time-consuming or side-effecting operation. The framework persists the result and replays it on recovery when the same call is encountered, so the function will not be called again and side effects are avoided. When recovery re-enters an action that has not been recorded as completed, code outside `durable_execute` / `durable_execute_async` will still be re-executed. **Constraints:** - The function must be deterministic and called in the same order on recovery. @@ -301,6 +301,11 @@ on how to setup and configure the external action state store. - If a failure happens after a function completes but before its result is persisted, the call will be re-executed. - In Python async actions, if `ctx.durable_execute_async(...)` is not awaited, the result is not recorded and cannot be replayed. +**Recovered success with a reconciler:** +- A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no terminal outcome has been persisted for it yet. +- If the reconciler returns a result, Flink Agents uses it as the result of the durable call, persists it as the recovered successful outcome, and replays the persisted result on subsequent recovery. +- If the reconciler raises an exception, the exception is propagated to the caller and no recovered terminal outcome is persisted. + {{< tabs "Durable Execution" >}} {{< tab "Python" >}} Python actions can call `ctx.durable_execute(...)` to run a synchronous durable code block. @@ -316,6 +321,25 @@ def process_input(event: InputEvent, ctx: RunnerContext) -> None: result = ctx.durable_execute(slow_external_call, event.input) ctx.send_event(OutputEvent(output=result)) ``` + +You can also pass an optional `reconciler` callable to recover a successful result during recovery. +```python +@action(InputEvent) +@staticmethod +def process_input(event: InputEvent, ctx: RunnerContext) -> None: + def submit_payment(order_id: str) -> str: + return payment_client.submit(order_id) + + def payment_reconciler() -> str: + return payment_client.lookup_completed_payment(event.input) + + result = ctx.durable_execute( + submit_payment, + event.input, + reconciler=payment_reconciler, + ) + ctx.send_event(OutputEvent(output=result)) +``` {{< /tab >}} {{< tab "Java" >}} @@ -346,6 +370,37 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce ctx.sendEvent(new OutputEvent(result)); } ``` + +Java actions can also override `reconciler()` to recover a successful result during recovery. +```java +@Action(listenEvents = {InputEvent.class}) +public static void processInput(InputEvent event, RunnerContext ctx) throws Exception { + DurableCallable call = new DurableCallable<>() { + @Override + public String getId() { + return "submit_payment"; + } + + @Override + public Class getResultClass() { + return String.class; + } + + @Override + public String call() { + return paymentClient.submit(event.getInput()); + } + + @Override + public Callable reconciler() { + return () -> paymentClient.lookupCompletedPayment(event.getInput()); + } + }; + + String result = ctx.durableExecute(call); + ctx.sendEvent(new OutputEvent(result)); +} +``` {{< /tab >}} {{< /tabs >}} @@ -355,7 +410,7 @@ Async execution uses the same durable semantics but yields while waiting for a t {{< tabs "Async Execution" >}} {{< tab "Python" >}} -Define an `async def` action and `await ctx.durable_execute_async(...)`. +Define an `async def` action and `await ctx.durable_execute_async(...)`. The same optional `reconciler=...` argument is available for recovery. ```python @action(InputEvent) @staticmethod @@ -376,7 +431,7 @@ functions like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and {{< tab "Java" >}} Use `ctx.durableExecuteAsync(DurableCallable)`; on **JDK 21+** it yields using Continuation, -and on **JDK < 21** it falls back to synchronous execution. +and on **JDK < 21** it falls back to synchronous execution. The same optional `reconciler()` hook can be used for recovery. ```java @Action(listenEvents = {InputEvent.class}) public static void processInput(InputEvent event, RunnerContext ctx) throws Exception { diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index 6931ec3ca..089c4d679 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -83,6 +83,8 @@ Async execution can significantly improve performance by allowing multiple opera > **Cross-language async limitation**: When using cross-language resources (e.g., calling Java integrations from Python or vice versa), async execution is not supported. Cross-language calls always execute synchronously regardless of your JDK version. +Reconciler-backed durable execution is available in both Python and Java, but it only affects how durable calls recover successful outcomes. It does not change the async execution matrix above. + This is important because: - **For Python users**: Async execution is always available. diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md index bfe609c2a..ef89acea6 100644 --- a/docs/content/docs/operations/deployment.md +++ b/docs/content/docs/operations/deployment.md @@ -171,6 +171,8 @@ After recovery from a checkpoint, Flink Agents reprocess events that arrived aft To ensure exactly-once action consistency, you must configure an external action state store. Flink Agents record action state in this store on a per-action basis. After recovering from a checkpoint, Flink Agents consult the external store and will not re-execute actions that were already completed. This guarantees each action is executed exactly once after recovering from a checkpoint. +The same persisted action state is also used by fine-grained durable execution, including reconciler-based recovery for durable calls that need to recover a successful outcome after failure. + {{< hint info >}} **Note**: Currently, Kafka is supported as the external action state store. {{< /hint >}} From 1a233e85ba4806a02e5c12ab91e3b51958838f10 Mon Sep 17 00:00:00 2001 From: joeyutong Date: Wed, 15 Apr 2026 13:49:06 +0800 Subject: [PATCH 2/8] Update docs for reconciler failure semantics --- docs/content/docs/development/workflow_agent.md | 10 +++++----- docs/content/docs/faq/faq.md | 4 ++-- docs/content/docs/operations/deployment.md | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index 9900401e3..045d73d4b 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -301,10 +301,10 @@ on how to setup and configure the external action state store. - If a failure happens after a function completes but before its result is persisted, the call will be re-executed. - In Python async actions, if `ctx.durable_execute_async(...)` is not awaited, the result is not recorded and cannot be replayed. -**Recovered success with a reconciler:** +**Recovered terminal outcome with a reconciler:** - A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no terminal outcome has been persisted for it yet. -- If the reconciler returns a result, Flink Agents uses it as the result of the durable call, persists it as the recovered successful outcome, and replays the persisted result on subsequent recovery. -- If the reconciler raises an exception, the exception is propagated to the caller and no recovered terminal outcome is persisted. +- If the reconciler returns a result, the runtime persists and replays that recovered result. +- If the reconciler raises an exception, the runtime persists and replays that recovered failure. {{< tabs "Durable Execution" >}} {{< tab "Python" >}} @@ -322,7 +322,7 @@ def process_input(event: InputEvent, ctx: RunnerContext) -> None: ctx.send_event(OutputEvent(output=result)) ``` -You can also pass an optional `reconciler` callable to recover a successful result during recovery. +You can also pass an optional `reconciler` callable to recover a terminal outcome during recovery. ```python @action(InputEvent) @staticmethod @@ -371,7 +371,7 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce } ``` -Java actions can also override `reconciler()` to recover a successful result during recovery. +Java actions can also override `reconciler()` to recover a terminal outcome during recovery. ```java @Action(listenEvents = {InputEvent.class}) public static void processInput(InputEvent event, RunnerContext ctx) throws Exception { diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index 089c4d679..3c29d7822 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -83,7 +83,7 @@ Async execution can significantly improve performance by allowing multiple opera > **Cross-language async limitation**: When using cross-language resources (e.g., calling Java integrations from Python or vice versa), async execution is not supported. Cross-language calls always execute synchronously regardless of your JDK version. -Reconciler-backed durable execution is available in both Python and Java, but it only affects how durable calls recover successful outcomes. It does not change the async execution matrix above. +Reconciler-backed durable execution is available in both Python and Java, but it only affects how durable calls recover persisted terminal outcomes. It does not change the async execution matrix above. This is important because: @@ -133,4 +133,4 @@ Flink Agents provides built-in integrations for many ecosystem providers. Some i To avoid potential conflict with Flink cluster, the scope of the dependencies related to Flink and Flink Agents for agent job are provided. See [Maven Dependencies]({{< ref "docs/get-started/installation#maven-dependencies-for-java" >}}) for details. To run the examples in IDE, users must enable the IDE feature: `add dependencies with provided scope to classpath`. -* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add dependencies with provided scope to classpath`**. See [Run/Debug Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html) for details. \ No newline at end of file +* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add dependencies with provided scope to classpath`**. See [Run/Debug Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html) for details. diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md index ef89acea6..ed720f05f 100644 --- a/docs/content/docs/operations/deployment.md +++ b/docs/content/docs/operations/deployment.md @@ -171,7 +171,7 @@ After recovery from a checkpoint, Flink Agents reprocess events that arrived aft To ensure exactly-once action consistency, you must configure an external action state store. Flink Agents record action state in this store on a per-action basis. After recovering from a checkpoint, Flink Agents consult the external store and will not re-execute actions that were already completed. This guarantees each action is executed exactly once after recovering from a checkpoint. -The same persisted action state is also used by fine-grained durable execution, including reconciler-based recovery for durable calls that need to recover a successful outcome after failure. +The same persisted action state is also used by fine-grained durable execution, including reconciler-based recovery for durable calls that need to recover a terminal outcome after failure. {{< hint info >}} **Note**: Currently, Kafka is supported as the external action state store. From a459603d128e86b1f0696df90a4aa8288ba814e0 Mon Sep 17 00:00:00 2001 From: joeyutong Date: Wed, 15 Apr 2026 13:55:13 +0800 Subject: [PATCH 3/8] Drop FAQ newline-only diff --- docs/content/docs/faq/faq.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index 3c29d7822..c904e273b 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -133,4 +133,4 @@ Flink Agents provides built-in integrations for many ecosystem providers. Some i To avoid potential conflict with Flink cluster, the scope of the dependencies related to Flink and Flink Agents for agent job are provided. See [Maven Dependencies]({{< ref "docs/get-started/installation#maven-dependencies-for-java" >}}) for details. To run the examples in IDE, users must enable the IDE feature: `add dependencies with provided scope to classpath`. -* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add dependencies with provided scope to classpath`**. See [Run/Debug Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html) for details. +* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add dependencies with provided scope to classpath`**. See [Run/Debug Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html) for details. \ No newline at end of file From 92ac1accd812f5f42882c6a2957b9c66564c4e71 Mon Sep 17 00:00:00 2001 From: joeyutong Date: Wed, 15 Apr 2026 15:33:55 +0800 Subject: [PATCH 4/8] Align reconciler docs examples with failure replay --- docs/content/docs/development/workflow_agent.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index 045d73d4b..f6453460b 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -331,7 +331,10 @@ def process_input(event: InputEvent, ctx: RunnerContext) -> None: return payment_client.submit(order_id) def payment_reconciler() -> str: - return payment_client.lookup_completed_payment(event.input) + status = payment_client.get_status(event.input) + if status == "SUCCEEDED": + return payment_client.lookup_completed_payment(event.input) + raise payment_client.get_failure(event.input) result = ctx.durable_execute( submit_payment, @@ -393,7 +396,13 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce @Override public Callable reconciler() { - return () -> paymentClient.lookupCompletedPayment(event.getInput()); + return () -> { + PaymentStatus status = paymentClient.getStatus(event.getInput()); + if (status == PaymentStatus.SUCCEEDED) { + return paymentClient.lookupCompletedPayment(event.getInput()); + } + throw paymentClient.getFailure(event.getInput()); + }; } }; From df3383e4908d3e92f8656cafa1dd38177d511656 Mon Sep 17 00:00:00 2001 From: joeyutong Date: Fri, 17 Apr 2026 13:45:13 +0800 Subject: [PATCH 5/8] Refine reconciler docs wording --- docs/content/docs/development/workflow_agent.md | 13 ++++++++----- docs/content/docs/faq/faq.md | 2 -- docs/content/docs/operations/deployment.md | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index f6453460b..d5c135004 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -298,11 +298,14 @@ on how to setup and configure the external action state store. **Best-effort replay:** - Results may not be reused if call order or arguments change (non-deterministic actions), which clears subsequent cached results and re-executes. -- If a failure happens after a function completes but before its result is persisted, the call will be re-executed. +- If a failure happens after a function starts but before it completes and its result is persisted, the call will be re-executed. See [With a reconciler](#durable-call-reconciler). - In Python async actions, if `ctx.durable_execute_async(...)` is not awaited, the result is not recorded and cannot be replayed. -**Recovered terminal outcome with a reconciler:** -- A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no terminal outcome has been persisted for it yet. + +**With a reconciler:** +Use a reconciler for durable calls when the original call may already have completed but its result or failure has not yet been persisted, so the framework cannot determine during recovery whether the call needs to be executed again. A reconciler provides custom logic that can return the result or raise the failure for the durable call instead of re-executing the original call. + +- A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no terminal result has been persisted for it yet. - If the reconciler returns a result, the runtime persists and replays that recovered result. - If the reconciler raises an exception, the runtime persists and replays that recovered failure. @@ -322,7 +325,7 @@ def process_input(event: InputEvent, ctx: RunnerContext) -> None: ctx.send_event(OutputEvent(output=result)) ``` -You can also pass an optional `reconciler` callable to recover a terminal outcome during recovery. +You can also pass an optional `reconciler` callable to recover an execution outcome during recovery. ```python @action(InputEvent) @staticmethod @@ -374,7 +377,7 @@ public static void processInput(InputEvent event, RunnerContext ctx) throws Exce } ``` -Java actions can also override `reconciler()` to recover a terminal outcome during recovery. +Java actions can also override `reconciler()` to recover an execution outcome during recovery. ```java @Action(listenEvents = {InputEvent.class}) public static void processInput(InputEvent event, RunnerContext ctx) throws Exception { diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index c904e273b..6931ec3ca 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -83,8 +83,6 @@ Async execution can significantly improve performance by allowing multiple opera > **Cross-language async limitation**: When using cross-language resources (e.g., calling Java integrations from Python or vice versa), async execution is not supported. Cross-language calls always execute synchronously regardless of your JDK version. -Reconciler-backed durable execution is available in both Python and Java, but it only affects how durable calls recover persisted terminal outcomes. It does not change the async execution matrix above. - This is important because: - **For Python users**: Async execution is always available. diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md index ed720f05f..1758c416f 100644 --- a/docs/content/docs/operations/deployment.md +++ b/docs/content/docs/operations/deployment.md @@ -171,7 +171,7 @@ After recovery from a checkpoint, Flink Agents reprocess events that arrived aft To ensure exactly-once action consistency, you must configure an external action state store. Flink Agents record action state in this store on a per-action basis. After recovering from a checkpoint, Flink Agents consult the external store and will not re-execute actions that were already completed. This guarantees each action is executed exactly once after recovering from a checkpoint. -The same persisted action state is also used by fine-grained durable execution, including reconciler-based recovery for durable calls that need to recover a terminal outcome after failure. +The same persisted action state is also used by fine-grained durable execution. {{< hint info >}} **Note**: Currently, Kafka is supported as the external action state store. From 8343a10a319416cb58924cf18fb90b480b212812 Mon Sep 17 00:00:00 2001 From: joeyutong Date: Fri, 17 Apr 2026 13:56:29 +0800 Subject: [PATCH 6/8] Polish reconciler docs wording --- docs/content/docs/development/workflow_agent.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index d5c135004..eab71de89 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -305,9 +305,9 @@ on how to setup and configure the external action state store. **With a reconciler:** Use a reconciler for durable calls when the original call may already have completed but its result or failure has not yet been persisted, so the framework cannot determine during recovery whether the call needs to be executed again. A reconciler provides custom logic that can return the result or raise the failure for the durable call instead of re-executing the original call. -- A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no terminal result has been persisted for it yet. -- If the reconciler returns a result, the runtime persists and replays that recovered result. -- If the reconciler raises an exception, the runtime persists and replays that recovered failure. +- A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no execution result has been persisted for it yet. +- If the reconciler logic returns a result, the runtime persists and replays that recovered result. +- If the reconciler logic raises an exception, the runtime persists and replays that recovered failure. {{< tabs "Durable Execution" >}} {{< tab "Python" >}} From 6953dda6cdc66293ae1f98f1b665c42750dfe98a Mon Sep 17 00:00:00 2001 From: joeyutong Date: Fri, 17 Apr 2026 14:19:40 +0800 Subject: [PATCH 7/8] Drop docs reconciler anchor --- docs/content/docs/development/workflow_agent.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index eab71de89..0fa831ab9 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -298,10 +298,9 @@ on how to setup and configure the external action state store. **Best-effort replay:** - Results may not be reused if call order or arguments change (non-deterministic actions), which clears subsequent cached results and re-executes. -- If a failure happens after a function starts but before it completes and its result is persisted, the call will be re-executed. See [With a reconciler](#durable-call-reconciler). +- If a failure happens after a function starts but before it completes and its result is persisted, the call will be re-executed. See the "With a reconciler" section below. - In Python async actions, if `ctx.durable_execute_async(...)` is not awaited, the result is not recorded and cannot be replayed. - **With a reconciler:** Use a reconciler for durable calls when the original call may already have completed but its result or failure has not yet been persisted, so the framework cannot determine during recovery whether the call needs to be executed again. A reconciler provides custom logic that can return the result or raise the failure for the durable call instead of re-executing the original call. From 3978aa0563406f9afde1d3a1d291f4fd41b497f2 Mon Sep 17 00:00:00 2001 From: joeyutong Date: Fri, 17 Apr 2026 14:38:24 +0800 Subject: [PATCH 8/8] Tweak reconciler docs formatting --- docs/content/docs/development/workflow_agent.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md index 0fa831ab9..5cc51a817 100644 --- a/docs/content/docs/development/workflow_agent.md +++ b/docs/content/docs/development/workflow_agent.md @@ -302,6 +302,7 @@ on how to setup and configure the external action state store. - In Python async actions, if `ctx.durable_execute_async(...)` is not awaited, the result is not recorded and cannot be replayed. **With a reconciler:** + Use a reconciler for durable calls when the original call may already have completed but its result or failure has not yet been persisted, so the framework cannot determine during recovery whether the call needs to be executed again. A reconciler provides custom logic that can return the result or raise the failure for the durable call instead of re-executing the original call. - A durable call may optionally provide a reconciler that is used only during recovery, when the same durable call is revisited and no execution result has been persisted for it yet.