diff --git a/README.md b/README.md index 83343b8..ca7a81f 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ ## Table of Contents +- [What's new (2026-06-20) — Saga / Compensating Rollback](#whats-new-2026-06-20--saga--compensating-rollback) - [What's new (2026-06-20) — JSONPath Querying](#whats-new-2026-06-20--jsonpath-querying) - [What's new (2026-06-20) — Multi-Channel Webhook Notifications](#whats-new-2026-06-20--multi-channel-webhook-notifications) - [What's new (2026-06-20) — Outbound CloudEvents Emitter](#whats-new-2026-06-20--outbound-cloudevents-emitter) @@ -104,6 +105,12 @@ --- +## What's new (2026-06-20) — Saga / Compensating Rollback + +Undo completed steps when a later one fails. Full reference: [`docs/source/Eng/doc/new_features/v52_features_doc.rst`](docs/source/Eng/doc/new_features/v52_features_doc.rst). + +- **`Saga` / `run_saga`** (`AC_run_saga`, `ac_run_saga`): records a compensating action per step; on any failure runs the completed steps' compensations in **LIFO** order — the durable-transaction primitive `AC_try` (single-block) couldn't provide. Forward actions/compensations are callables (or JSON action lists), so it's fully unit-tested with no side effects; compensation is best-effort (a failing undo is logged, rollback continues). Returns `{ok, completed, compensated, failed_step, error}`. + ## What's new (2026-06-20) — JSONPath Querying Query API/DB JSON with wildcards, recursion, filters. Full reference: [`docs/source/Eng/doc/new_features/v51_features_doc.rst`](docs/source/Eng/doc/new_features/v51_features_doc.rst). diff --git a/README/README_zh-CN.md b/README/README_zh-CN.md index ccb5bce..a66115d 100644 --- a/README/README_zh-CN.md +++ b/README/README_zh-CN.md @@ -12,6 +12,7 @@ ## 目录 +- [本次更新 (2026-06-20) — Saga / 补偿回滚](#本次更新-2026-06-20--saga--补偿回滚) - [本次更新 (2026-06-20) — JSONPath 查询](#本次更新-2026-06-20--jsonpath-查询) - [本次更新 (2026-06-20) — 多通道 Webhook 通知](#本次更新-2026-06-20--多通道-webhook-通知) - [本次更新 (2026-06-20) — 对外 CloudEvents 发送器](#本次更新-2026-06-20--对外-cloudevents-发送器) @@ -103,6 +104,12 @@ --- +## 本次更新 (2026-06-20) — Saga / 补偿回滚 + +后续步骤失败时回滚已完成步骤。完整参考:[`docs/source/Zh/doc/new_features/v52_features_doc.rst`](../docs/source/Zh/doc/new_features/v52_features_doc.rst)。 + +- **`Saga` / `run_saga`**(`AC_run_saga`、`ac_run_saga`):为每个步骤记录补偿动作;任何失败时以 **LIFO** 顺序对已完成步骤执行补偿 —— 单一区块的 `AC_try` 无法提供的持久性事务原语。前向动作/补偿为可调用对象(或 JSON 动作列表),因此可在无副作用下完整单元测试;补偿为尽力而为(失败的回滚会记录,回滚继续)。返回 `{ok, completed, compensated, failed_step, error}`。 + ## 本次更新 (2026-06-20) — JSONPath 查询 以通配符、递归、过滤查询 API/DB JSON。完整参考:[`docs/source/Zh/doc/new_features/v51_features_doc.rst`](../docs/source/Zh/doc/new_features/v51_features_doc.rst)。 diff --git a/README/README_zh-TW.md b/README/README_zh-TW.md index 24d5d16..022d08e 100644 --- a/README/README_zh-TW.md +++ b/README/README_zh-TW.md @@ -12,6 +12,7 @@ ## 目錄 +- [本次更新 (2026-06-20) — Saga / 補償回溯](#本次更新-2026-06-20--saga--補償回溯) - [本次更新 (2026-06-20) — JSONPath 查詢](#本次更新-2026-06-20--jsonpath-查詢) - [本次更新 (2026-06-20) — 多通道 Webhook 通知](#本次更新-2026-06-20--多通道-webhook-通知) - [本次更新 (2026-06-20) — 對外 CloudEvents 發送器](#本次更新-2026-06-20--對外-cloudevents-發送器) @@ -103,6 +104,12 @@ --- +## 本次更新 (2026-06-20) — Saga / 補償回溯 + +後續步驟失敗時復原已完成步驟。完整參考:[`docs/source/Zh/doc/new_features/v52_features_doc.rst`](../docs/source/Zh/doc/new_features/v52_features_doc.rst)。 + +- **`Saga` / `run_saga`**(`AC_run_saga`、`ac_run_saga`):為每個步驟記錄補償動作;任何失敗時以 **LIFO** 順序對已完成步驟執行補償 —— 單一區塊的 `AC_try` 無法提供的持久性交易原語。前向動作/補償為可呼叫物件(或 JSON 動作清單),因此可在無副作用下完整單元測試;補償為盡力而為(失敗的復原會記錄,回溯繼續)。回傳 `{ok, completed, compensated, failed_step, error}`。 + ## 本次更新 (2026-06-20) — JSONPath 查詢 以萬用字元、遞迴、過濾查詢 API/DB JSON。完整參考:[`docs/source/Zh/doc/new_features/v51_features_doc.rst`](../docs/source/Zh/doc/new_features/v51_features_doc.rst)。 diff --git a/docs/source/Eng/doc/new_features/v52_features_doc.rst b/docs/source/Eng/doc/new_features/v52_features_doc.rst new file mode 100644 index 0000000..7be081f --- /dev/null +++ b/docs/source/Eng/doc/new_features/v52_features_doc.rst @@ -0,0 +1,45 @@ +Saga / Compensating Rollback +============================ + +Some automations span several irreversible-looking steps — create a record, send +an email, move a file. If a later step fails, the already-completed steps should +be **undone**, but the executor's ``AC_try`` only does try/catch/finally for one +block; nothing tracked "what to undo" across N completed steps. A ``Saga`` +records a compensating action per step and, on any failure, runs the +compensations for the completed steps in **LIFO** order. + +Forward actions and compensations are plain callables (or, via the executor, JSON +action lists), so the orchestration is fully unit-testable with no real side +effects. Compensation is best-effort: a failing compensation is logged and the +rollback continues. Imports no ``PySide6``. + +Headless API +------------ + +.. code-block:: python + + from je_auto_control import Saga + + result = (Saga() + .step("create", create_record, delete_record) + .step("notify", send_email, None) # nothing to undo + .step("move", move_file, restore_file) + .run()) + + if not result.ok: + result.failed_step # which step raised + result.completed # steps that ran forward + result.compensated # steps undone (LIFO over completed) + +``run()`` returns a ``SagaResult`` (``ok`` / ``completed`` / ``compensated`` / +``failed_step`` / ``error``). A step "fails" when its action raises; steps with no +compensation are simply skipped during rollback. + +Executor command +---------------- + +``AC_run_saga`` takes ``steps`` — a list (or JSON string) of ``{name, action: +[...], compensation: [...]}`` where each ``action`` / ``compensation`` is an +AutoControl action list. It returns ``{ok, completed, compensated, failed_step, +error}``. The same operation is exposed as the MCP tool ``ac_run_saga`` and as a +Script Builder command under **Flow**. diff --git a/docs/source/Eng/eng_index.rst b/docs/source/Eng/eng_index.rst index 137f68a..ea387f6 100644 --- a/docs/source/Eng/eng_index.rst +++ b/docs/source/Eng/eng_index.rst @@ -74,6 +74,7 @@ Comprehensive guides for all AutoControl features. doc/new_features/v49_features_doc doc/new_features/v50_features_doc doc/new_features/v51_features_doc + doc/new_features/v52_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/docs/source/Zh/doc/new_features/v52_features_doc.rst b/docs/source/Zh/doc/new_features/v52_features_doc.rst new file mode 100644 index 0000000..ff8d1f9 --- /dev/null +++ b/docs/source/Zh/doc/new_features/v52_features_doc.rst @@ -0,0 +1,41 @@ +Saga / 補償回溯 +=============== + +有些自動化橫跨數個看似不可逆的步驟 —— 建立紀錄、寄送郵件、移動檔案。若後續步驟失敗, +已完成的步驟應被**復原**,但執行器的 ``AC_try`` 只對單一區塊做 try/catch/finally;沒有 +任何機制追蹤跨 N 個已完成步驟「該復原什麼」。``Saga`` 為每個步驟記錄一個補償動作,並在 +任何失敗時以 **LIFO** 順序對已完成步驟執行補償。 + +前向動作與補償皆為純可呼叫物件(或經執行器以 JSON 動作清單),因此編排可在無任何真實副 +作用下完整單元測試。補償為盡力而為:失敗的補償會被記錄,回溯繼續進行。不匯入 +``PySide6``。 + +無頭 API +-------- + +.. code-block:: python + + from je_auto_control import Saga + + result = (Saga() + .step("create", create_record, delete_record) + .step("notify", send_email, None) # 無需復原 + .step("move", move_file, restore_file) + .run()) + + if not result.ok: + result.failed_step # 哪個步驟拋出例外 + result.completed # 前向執行過的步驟 + result.compensated # 已復原的步驟(對已完成者 LIFO) + +``run()`` 回傳 ``SagaResult``(``ok`` / ``completed`` / ``compensated`` / +``failed_step`` / ``error``)。步驟「失敗」即其動作拋出例外;沒有補償的步驟在回溯時直接略 +過。 + +執行器指令 +---------- + +``AC_run_saga`` 接受 ``steps`` —— 一個 ``{name, action: [...], compensation: [...]}`` +的清單(或 JSON 字串),其中 ``action`` / ``compensation`` 各為一個 AutoControl 動作清單。 +回傳 ``{ok, completed, compensated, failed_step, error}``。相同操作亦提供為 MCP 工具 +``ac_run_saga``,以及 Script Builder 中 **Flow** 分類下的指令。 diff --git a/docs/source/Zh/zh_index.rst b/docs/source/Zh/zh_index.rst index 6bed021..6271153 100644 --- a/docs/source/Zh/zh_index.rst +++ b/docs/source/Zh/zh_index.rst @@ -74,6 +74,7 @@ AutoControl 所有功能的完整使用指南。 doc/new_features/v49_features_doc doc/new_features/v50_features_doc doc/new_features/v51_features_doc + doc/new_features/v52_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/je_auto_control/__init__.py b/je_auto_control/__init__.py index c9030ce..838b5d1 100644 --- a/je_auto_control/__init__.py +++ b/je_auto_control/__init__.py @@ -280,6 +280,8 @@ from je_auto_control.utils.jsonpath import ( json_extract, json_query, json_query_one, ) +# Saga orchestrator: multi-step flow with compensating rollback +from je_auto_control.utils.saga import Saga, SagaResult, run_saga # Background popup/interrupt watchdog (unattended automation) from je_auto_control.utils.watchdog import ( PopupWatchdog, WatchdogRule, default_popup_watchdog, @@ -743,6 +745,7 @@ def start_autocontrol_gui(*args, **kwargs): "EventEmitter", "post_cloudevent", "to_cloudevent", "WebhookChannel", "WebhookResult", "notify_webhook", "set_default_poster", "json_extract", "json_query", "json_query_one", + "Saga", "SagaResult", "run_saga", # MCP server "AuditLogger", "HttpMCPServer", "MCPContent", "MCPPrompt", "MCPPromptArgument", "MCPResource", "MCPServer", "MCPTool", diff --git a/je_auto_control/gui/script_builder/command_schema.py b/je_auto_control/gui/script_builder/command_schema.py index df004e8..be698dc 100644 --- a/je_auto_control/gui/script_builder/command_schema.py +++ b/je_auto_control/gui/script_builder/command_schema.py @@ -1145,6 +1145,15 @@ def _add_misc_specs(specs: List[CommandSpec]) -> None: ), description="Extract a {key: jsonpath} mapping into a flat object.", )) + specs.append(CommandSpec( + "AC_run_saga", "Flow", "Run Saga (Compensating Rollback)", + fields=( + FieldSpec("steps", FieldType.STRING, + placeholder='[{"name": "s1", "action": [...], ' + '"compensation": [...]}]'), + ), + description="Run steps; on failure undo completed steps LIFO.", + )) specs.append(CommandSpec( "AC_generate_sop", "Report", "Generate SOP Document", fields=( diff --git a/je_auto_control/utils/executor/action_executor.py b/je_auto_control/utils/executor/action_executor.py index 6a006ad..dafe824 100644 --- a/je_auto_control/utils/executor/action_executor.py +++ b/je_auto_control/utils/executor/action_executor.py @@ -3316,6 +3316,18 @@ def _json_extract(data: Any, mapping: Any) -> Dict[str, Any]: return {"result": json_extract(data, mapping)} +def _run_saga(steps: Any) -> Dict[str, Any]: + """Adapter: run a saga (steps with compensating rollback) from a spec.""" + import json + from je_auto_control.utils.saga import run_saga + if isinstance(steps, str): + steps = json.loads(steps) + result = run_saga(steps) + return {"ok": result.ok, "completed": result.completed, + "compensated": result.compensated, + "failed_step": result.failed_step, "error": result.error} + + class Executor: """ Executor @@ -3598,6 +3610,7 @@ def __init__(self): "AC_notify_webhook": _notify_webhook, "AC_json_query": _json_query, "AC_json_extract": _json_extract, + "AC_run_saga": _run_saga, "AC_a11y_record_start": _a11y_record_start, "AC_a11y_record_stop": _a11y_record_stop, "AC_a11y_record_events": _a11y_record_events, diff --git a/je_auto_control/utils/mcp_server/tools/_factories.py b/je_auto_control/utils/mcp_server/tools/_factories.py index 8e6a77a..4d76283 100644 --- a/je_auto_control/utils/mcp_server/tools/_factories.py +++ b/je_auto_control/utils/mcp_server/tools/_factories.py @@ -3226,6 +3226,24 @@ def jsonpath_tools() -> List[MCPTool]: ] +def saga_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_run_saga", + description=("Run a saga: a list of steps {name, action:[AC...], " + "compensation:[AC...]}. On any step failure, the " + "compensations of completed steps run in LIFO order. " + "Returns {ok, completed, compensated, failed_step, " + "error}."), + input_schema=schema( + {"steps": {"type": "array", "items": {"type": "object"}}}, + ["steps"]), + handler=h.run_saga, + annotations=SIDE_EFFECT_ONLY, + ), + ] + + def unattended_tools() -> List[MCPTool]: return [ MCPTool( @@ -4289,7 +4307,7 @@ def media_assert_tools() -> List[MCPTool]: video_report_tools, fuzzy_tools, artifact_store_tools, image_dedup_tools, locale_tools, voice_tools, coordinate_space_tools, loop_guard_tools, process_mining_tools, asset_tools, events_tools, notify_channel_tools, - jsonpath_tools, + jsonpath_tools, saga_tools, screen_record_tools, process_and_shell_tools, remote_desktop_tools, gamepad_tools, usb_passthrough_tools, assertion_tools, data_source_tools, diff --git a/je_auto_control/utils/mcp_server/tools/_handlers.py b/je_auto_control/utils/mcp_server/tools/_handlers.py index 4f06fa5..13203da 100644 --- a/je_auto_control/utils/mcp_server/tools/_handlers.py +++ b/je_auto_control/utils/mcp_server/tools/_handlers.py @@ -1556,6 +1556,14 @@ def json_extract(data, mapping): return {"result": _x(data, mapping)} +def run_saga(steps): + from je_auto_control.utils.saga import run_saga as _run + result = _run(steps) + return {"ok": result.ok, "completed": result.completed, + "compensated": result.compensated, + "failed_step": result.failed_step, "error": result.error} + + def vlm_locate(description: str, screen_region: Optional[List[int]] = None, model: Optional[str] = None) -> Optional[List[int]]: diff --git a/je_auto_control/utils/saga/__init__.py b/je_auto_control/utils/saga/__init__.py new file mode 100644 index 0000000..c0c0646 --- /dev/null +++ b/je_auto_control/utils/saga/__init__.py @@ -0,0 +1,4 @@ +"""Saga orchestrator: run steps with LIFO compensating rollback on failure.""" +from je_auto_control.utils.saga.saga import Saga, SagaResult, run_saga + +__all__ = ["Saga", "SagaResult", "run_saga"] diff --git a/je_auto_control/utils/saga/saga.py b/je_auto_control/utils/saga/saga.py new file mode 100644 index 0000000..2115303 --- /dev/null +++ b/je_auto_control/utils/saga/saga.py @@ -0,0 +1,89 @@ +"""Run a multi-step flow with compensating rollback (the saga pattern). + +Some automations span several irreversible-looking steps (create record, send +email, move file). If a later step fails, the already-completed steps should be +*undone* — but the executor's ``AC_try`` only does try/catch/finally for one +block; nothing tracks "what to undo" across N completed steps. A ``Saga`` records +a compensating action per step and, on any failure, runs the compensations for +the completed steps in **LIFO** order. + +Forward actions and compensations are plain callables (or, via :func:`run_saga`, +JSON action lists), so the orchestration is fully unit-testable without any real +side effects. Compensation is best-effort: a failing compensation is logged and +the rollback continues. Imports no ``PySide6``. +""" +from dataclasses import dataclass, field +from typing import Any, Callable, List, Optional, Tuple + + +@dataclass +class SagaResult: + """Outcome of running a saga.""" + + ok: bool + completed: List[str] = field(default_factory=list) + compensated: List[str] = field(default_factory=list) + failed_step: Optional[str] = None + error: str = "" + + +class Saga: + """A sequence of steps, each with an optional compensating action.""" + + def __init__(self) -> None: + self._steps: List[Tuple[str, Callable[[], Any], + Optional[Callable[[], Any]]]] = [] + + def step(self, name: str, action: Callable[[], Any], + compensation: Optional[Callable[[], Any]] = None) -> "Saga": + """Append a step; returns self for chaining.""" + self._steps.append((name, action, compensation)) + return self + + def _compensate(self, upto: int, result: SagaResult) -> None: + for name, _action, compensation in reversed(self._steps[:upto]): + if compensation is None: + continue + try: + compensation() + except Exception as error: # best-effort: log, keep rolling back + from je_auto_control.utils.logging.logging_instance import ( + autocontrol_logger) + autocontrol_logger.warning( + "saga compensation for %r failed: %r", name, error) + result.compensated.append(name) + + def run(self) -> SagaResult: + """Run steps forward; on failure compensate completed steps LIFO.""" + result = SagaResult(ok=True) + for index, (name, action, _compensation) in enumerate(self._steps): + try: + action() + except Exception as error: # noqa: BLE001 # saga catches any step failure + result.ok = False + result.failed_step = name + result.error = str(error) + self._compensate(index, result) + return result + result.completed.append(name) + return result + + +def run_saga(steps: Any) -> SagaResult: + """Run a saga from a JSON-style spec of action lists. + + ``steps`` is a sequence of ``{"name", "action": [...], "compensation": + [...]}`` mappings; each ``action`` / ``compensation`` is an AutoControl + action list run through the executor. + """ + from je_auto_control.utils.executor.action_executor import execute_action + + def _runner(action_list: Any) -> Callable[[], Any]: + return lambda: execute_action(action_list) + + saga = Saga() + for spec in steps: + comp = spec.get("compensation") + saga.step(str(spec.get("name", "")), _runner(spec.get("action", [])), + _runner(comp) if comp else None) + return saga.run() diff --git a/test/unit_test/headless/test_saga_batch.py b/test/unit_test/headless/test_saga_batch.py new file mode 100644 index 0000000..f6e4e82 --- /dev/null +++ b/test/unit_test/headless/test_saga_batch.py @@ -0,0 +1,109 @@ +"""Headless tests for the saga / compensating-rollback orchestrator. Steps are +plain callables recording call order — fully deterministic, no side effects. +Pure stdlib, no Qt imports.""" +import je_auto_control as ac +from je_auto_control.utils.saga import Saga + + +def _recorder(): + log = [] + return log, (lambda tag: (lambda: log.append(tag))) + + +def test_all_steps_succeed_no_compensation(): + log, make = _recorder() + result = (Saga() + .step("a", make("do-a"), make("undo-a")) + .step("b", make("do-b"), make("undo-b")) + .run()) + assert result.ok is True + assert result.completed == ["a", "b"] + assert result.compensated == [] + assert log == ["do-a", "do-b"] + + +def test_failure_triggers_lifo_compensation(): + log, make = _recorder() + + def boom(): + raise RuntimeError("step c failed") + + result = (Saga() + .step("a", make("do-a"), make("undo-a")) + .step("b", make("do-b"), make("undo-b")) + .step("c", boom, make("undo-c")) + .run()) + assert result.ok is False + assert result.failed_step == "c" + assert "step c failed" in result.error + assert result.completed == ["a", "b"] # c never completed + assert result.compensated == ["b", "a"] # LIFO over completed + # forward a,b then undo b,a (c's action raised before completing) + assert log == ["do-a", "do-b", "undo-b", "undo-a"] + + +def test_steps_without_compensation_are_skipped(): + log, make = _recorder() + + def boom(): + raise ValueError("x") + + result = (Saga() + .step("a", make("do-a")) # no compensation + .step("b", make("do-b"), make("undo-b")) + .step("c", boom) + .run()) + assert result.failed_step == "c" + assert result.compensated == ["b"] # a had none to run + assert log == ["do-a", "do-b", "undo-b"] + + +def test_compensation_failure_is_best_effort(): + log, make = _recorder() + + def bad_undo(): + raise RuntimeError("undo failed") + + def boom(): + raise RuntimeError("fail") + + result = (Saga() + .step("a", make("do-a"), make("undo-a")) + .step("b", make("do-b"), bad_undo) # its undo raises + .step("c", boom, None) + .run()) + # rollback continues past the failing compensation; a still undone + assert result.compensated == ["b", "a"] + assert log == ["do-a", "do-b", "undo-a"] # undo-a still ran + + +# --- wiring --------------------------------------------------------------- + +def test_executor_round_trip_success(): + rec = ac.execute_action([[ + "AC_run_saga", + {"steps": [ + {"name": "q", + "action": [["AC_json_query", {"data": {"a": 1}, "path": "$.a"}]]}, + ]}, + ]]) + out = next(v for v in rec.values() if isinstance(v, dict) + and "completed" in v) + assert out["ok"] is True and out["completed"] == ["q"] + + +def test_wiring(): + assert "AC_run_saga" in ac.executor.known_commands() + from je_auto_control.utils.mcp_server.tools import ( + build_default_tool_registry) + names = {t.name for t in build_default_tool_registry()} + assert "ac_run_saga" in names + from je_auto_control.gui.script_builder.command_schema import _build_specs + cmds = {s.command for s in _build_specs()} + assert "AC_run_saga" in cmds + + +def test_facade_exports(): + for attr in ("Saga", "SagaResult", "run_saga"): + assert hasattr(ac, attr) + assert attr in ac.__all__