Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

## Table of Contents

- [What's new (2026-06-20) — Saga / Compensating Rollback](#whats-new-2026-06-20--saga--compensating-rollback)
- [What's new (2026-06-20) — JSONPath Querying](#whats-new-2026-06-20--jsonpath-querying)
- [What's new (2026-06-20) — Multi-Channel Webhook Notifications](#whats-new-2026-06-20--multi-channel-webhook-notifications)
- [What's new (2026-06-20) — Outbound CloudEvents Emitter](#whats-new-2026-06-20--outbound-cloudevents-emitter)
Expand Down Expand Up @@ -104,6 +105,12 @@

---

## What's new (2026-06-20) — Saga / Compensating Rollback

Undo completed steps when a later one fails. Full reference: [`docs/source/Eng/doc/new_features/v52_features_doc.rst`](docs/source/Eng/doc/new_features/v52_features_doc.rst).

- **`Saga` / `run_saga`** (`AC_run_saga`, `ac_run_saga`): records a compensating action per step; on any failure runs the completed steps' compensations in **LIFO** order — the durable-transaction primitive `AC_try` (single-block) couldn't provide. Forward actions/compensations are callables (or JSON action lists), so it's fully unit-tested with no side effects; compensation is best-effort (a failing undo is logged, rollback continues). Returns `{ok, completed, compensated, failed_step, error}`.

## What's new (2026-06-20) — JSONPath Querying

Query API/DB JSON with wildcards, recursion, filters. Full reference: [`docs/source/Eng/doc/new_features/v51_features_doc.rst`](docs/source/Eng/doc/new_features/v51_features_doc.rst).
Expand Down
7 changes: 7 additions & 0 deletions README/README_zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

## 目录

- [本次更新 (2026-06-20) — Saga / 补偿回滚](#本次更新-2026-06-20--saga--补偿回滚)
- [本次更新 (2026-06-20) — JSONPath 查询](#本次更新-2026-06-20--jsonpath-查询)
- [本次更新 (2026-06-20) — 多通道 Webhook 通知](#本次更新-2026-06-20--多通道-webhook-通知)
- [本次更新 (2026-06-20) — 对外 CloudEvents 发送器](#本次更新-2026-06-20--对外-cloudevents-发送器)
Expand Down Expand Up @@ -103,6 +104,12 @@

---

## 本次更新 (2026-06-20) — Saga / 补偿回滚

后续步骤失败时回滚已完成步骤。完整参考:[`docs/source/Zh/doc/new_features/v52_features_doc.rst`](../docs/source/Zh/doc/new_features/v52_features_doc.rst)。

- **`Saga` / `run_saga`**(`AC_run_saga`、`ac_run_saga`):为每个步骤记录补偿动作;任何失败时以 **LIFO** 顺序对已完成步骤执行补偿 —— 单一区块的 `AC_try` 无法提供的持久性事务原语。前向动作/补偿为可调用对象(或 JSON 动作列表),因此可在无副作用下完整单元测试;补偿为尽力而为(失败的回滚会记录,回滚继续)。返回 `{ok, completed, compensated, failed_step, error}`。

## 本次更新 (2026-06-20) — JSONPath 查询

以通配符、递归、过滤查询 API/DB JSON。完整参考:[`docs/source/Zh/doc/new_features/v51_features_doc.rst`](../docs/source/Zh/doc/new_features/v51_features_doc.rst)。
Expand Down
7 changes: 7 additions & 0 deletions README/README_zh-TW.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

## 目錄

- [本次更新 (2026-06-20) — Saga / 補償回溯](#本次更新-2026-06-20--saga--補償回溯)
- [本次更新 (2026-06-20) — JSONPath 查詢](#本次更新-2026-06-20--jsonpath-查詢)
- [本次更新 (2026-06-20) — 多通道 Webhook 通知](#本次更新-2026-06-20--多通道-webhook-通知)
- [本次更新 (2026-06-20) — 對外 CloudEvents 發送器](#本次更新-2026-06-20--對外-cloudevents-發送器)
Expand Down Expand Up @@ -103,6 +104,12 @@

---

## 本次更新 (2026-06-20) — Saga / 補償回溯

後續步驟失敗時復原已完成步驟。完整參考:[`docs/source/Zh/doc/new_features/v52_features_doc.rst`](../docs/source/Zh/doc/new_features/v52_features_doc.rst)。

- **`Saga` / `run_saga`**(`AC_run_saga`、`ac_run_saga`):為每個步驟記錄補償動作;任何失敗時以 **LIFO** 順序對已完成步驟執行補償 —— 單一區塊的 `AC_try` 無法提供的持久性交易原語。前向動作/補償為可呼叫物件(或 JSON 動作清單),因此可在無副作用下完整單元測試;補償為盡力而為(失敗的復原會記錄,回溯繼續)。回傳 `{ok, completed, compensated, failed_step, error}`。

## 本次更新 (2026-06-20) — JSONPath 查詢

以萬用字元、遞迴、過濾查詢 API/DB JSON。完整參考:[`docs/source/Zh/doc/new_features/v51_features_doc.rst`](../docs/source/Zh/doc/new_features/v51_features_doc.rst)。
Expand Down
45 changes: 45 additions & 0 deletions docs/source/Eng/doc/new_features/v52_features_doc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
Saga / Compensating Rollback
============================

Some automations span several irreversible-looking steps — create a record, send
an email, move a file. If a later step fails, the already-completed steps should
be **undone**, but the executor's ``AC_try`` only does try/catch/finally for one
block; nothing tracked "what to undo" across N completed steps. A ``Saga``
records a compensating action per step and, on any failure, runs the
compensations for the completed steps in **LIFO** order.

Forward actions and compensations are plain callables (or, via the executor, JSON
action lists), so the orchestration is fully unit-testable with no real side
effects. Compensation is best-effort: a failing compensation is logged and the
rollback continues. Imports no ``PySide6``.

Headless API
------------

.. code-block:: python

from je_auto_control import Saga

result = (Saga()
.step("create", create_record, delete_record)
.step("notify", send_email, None) # nothing to undo
.step("move", move_file, restore_file)
.run())

if not result.ok:
result.failed_step # which step raised
result.completed # steps that ran forward
result.compensated # steps undone (LIFO over completed)

``run()`` returns a ``SagaResult`` (``ok`` / ``completed`` / ``compensated`` /
``failed_step`` / ``error``). A step "fails" when its action raises; steps with no
compensation are simply skipped during rollback.

Executor command
----------------

``AC_run_saga`` takes ``steps`` — a list (or JSON string) of ``{name, action:
[...], compensation: [...]}`` where each ``action`` / ``compensation`` is an
AutoControl action list. It returns ``{ok, completed, compensated, failed_step,
error}``. The same operation is exposed as the MCP tool ``ac_run_saga`` and as a
Script Builder command under **Flow**.
1 change: 1 addition & 0 deletions docs/source/Eng/eng_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Comprehensive guides for all AutoControl features.
doc/new_features/v49_features_doc
doc/new_features/v50_features_doc
doc/new_features/v51_features_doc
doc/new_features/v52_features_doc
doc/ocr_backends/ocr_backends_doc
doc/observability/observability_doc
doc/operations_layer/operations_layer_doc
Expand Down
41 changes: 41 additions & 0 deletions docs/source/Zh/doc/new_features/v52_features_doc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Saga / 補償回溯
===============

有些自動化橫跨數個看似不可逆的步驟 —— 建立紀錄、寄送郵件、移動檔案。若後續步驟失敗,
已完成的步驟應被**復原**,但執行器的 ``AC_try`` 只對單一區塊做 try/catch/finally;沒有
任何機制追蹤跨 N 個已完成步驟「該復原什麼」。``Saga`` 為每個步驟記錄一個補償動作,並在
任何失敗時以 **LIFO** 順序對已完成步驟執行補償。

前向動作與補償皆為純可呼叫物件(或經執行器以 JSON 動作清單),因此編排可在無任何真實副
作用下完整單元測試。補償為盡力而為:失敗的補償會被記錄,回溯繼續進行。不匯入
``PySide6``。

無頭 API
--------

.. code-block:: python

from je_auto_control import Saga

result = (Saga()
.step("create", create_record, delete_record)
.step("notify", send_email, None) # 無需復原
.step("move", move_file, restore_file)
.run())

if not result.ok:
result.failed_step # 哪個步驟拋出例外
result.completed # 前向執行過的步驟
result.compensated # 已復原的步驟(對已完成者 LIFO)

``run()`` 回傳 ``SagaResult``(``ok`` / ``completed`` / ``compensated`` /
``failed_step`` / ``error``)。步驟「失敗」即其動作拋出例外;沒有補償的步驟在回溯時直接略
過。

執行器指令
----------

``AC_run_saga`` 接受 ``steps`` —— 一個 ``{name, action: [...], compensation: [...]}``
的清單(或 JSON 字串),其中 ``action`` / ``compensation`` 各為一個 AutoControl 動作清單。
回傳 ``{ok, completed, compensated, failed_step, error}``。相同操作亦提供為 MCP 工具
``ac_run_saga``,以及 Script Builder 中 **Flow** 分類下的指令。
1 change: 1 addition & 0 deletions docs/source/Zh/zh_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ AutoControl 所有功能的完整使用指南。
doc/new_features/v49_features_doc
doc/new_features/v50_features_doc
doc/new_features/v51_features_doc
doc/new_features/v52_features_doc
doc/ocr_backends/ocr_backends_doc
doc/observability/observability_doc
doc/operations_layer/operations_layer_doc
Expand Down
3 changes: 3 additions & 0 deletions je_auto_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@
from je_auto_control.utils.jsonpath import (
json_extract, json_query, json_query_one,
)
# Saga orchestrator: multi-step flow with compensating rollback
from je_auto_control.utils.saga import Saga, SagaResult, run_saga
# Background popup/interrupt watchdog (unattended automation)
from je_auto_control.utils.watchdog import (
PopupWatchdog, WatchdogRule, default_popup_watchdog,
Expand Down Expand Up @@ -743,6 +745,7 @@ def start_autocontrol_gui(*args, **kwargs):
"EventEmitter", "post_cloudevent", "to_cloudevent",
"WebhookChannel", "WebhookResult", "notify_webhook", "set_default_poster",
"json_extract", "json_query", "json_query_one",
"Saga", "SagaResult", "run_saga",
# MCP server
"AuditLogger", "HttpMCPServer", "MCPContent", "MCPPrompt",
"MCPPromptArgument", "MCPResource", "MCPServer", "MCPTool",
Expand Down
9 changes: 9 additions & 0 deletions je_auto_control/gui/script_builder/command_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,15 @@ def _add_misc_specs(specs: List[CommandSpec]) -> None:
),
description="Extract a {key: jsonpath} mapping into a flat object.",
))
specs.append(CommandSpec(
"AC_run_saga", "Flow", "Run Saga (Compensating Rollback)",
fields=(
FieldSpec("steps", FieldType.STRING,
placeholder='[{"name": "s1", "action": [...], '
'"compensation": [...]}]'),
),
description="Run steps; on failure undo completed steps LIFO.",
))
specs.append(CommandSpec(
"AC_generate_sop", "Report", "Generate SOP Document",
fields=(
Expand Down
13 changes: 13 additions & 0 deletions je_auto_control/utils/executor/action_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3316,6 +3316,18 @@ def _json_extract(data: Any, mapping: Any) -> Dict[str, Any]:
return {"result": json_extract(data, mapping)}


def _run_saga(steps: Any) -> Dict[str, Any]:
"""Adapter: run a saga (steps with compensating rollback) from a spec."""
import json
from je_auto_control.utils.saga import run_saga
if isinstance(steps, str):
steps = json.loads(steps)
result = run_saga(steps)
return {"ok": result.ok, "completed": result.completed,
"compensated": result.compensated,
"failed_step": result.failed_step, "error": result.error}


class Executor:
"""
Executor
Expand Down Expand Up @@ -3598,6 +3610,7 @@ def __init__(self):
"AC_notify_webhook": _notify_webhook,
"AC_json_query": _json_query,
"AC_json_extract": _json_extract,
"AC_run_saga": _run_saga,
"AC_a11y_record_start": _a11y_record_start,
"AC_a11y_record_stop": _a11y_record_stop,
"AC_a11y_record_events": _a11y_record_events,
Expand Down
20 changes: 19 additions & 1 deletion je_auto_control/utils/mcp_server/tools/_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -3226,6 +3226,24 @@ def jsonpath_tools() -> List[MCPTool]:
]


def saga_tools() -> List[MCPTool]:
return [
MCPTool(
name="ac_run_saga",
description=("Run a saga: a list of steps {name, action:[AC...], "
"compensation:[AC...]}. On any step failure, the "
"compensations of completed steps run in LIFO order. "
"Returns {ok, completed, compensated, failed_step, "
"error}."),
input_schema=schema(
{"steps": {"type": "array", "items": {"type": "object"}}},
["steps"]),
handler=h.run_saga,
annotations=SIDE_EFFECT_ONLY,
),
]


def unattended_tools() -> List[MCPTool]:
return [
MCPTool(
Expand Down Expand Up @@ -4289,7 +4307,7 @@ def media_assert_tools() -> List[MCPTool]:
video_report_tools, fuzzy_tools, artifact_store_tools, image_dedup_tools,
locale_tools, voice_tools, coordinate_space_tools, loop_guard_tools,
process_mining_tools, asset_tools, events_tools, notify_channel_tools,
jsonpath_tools,
jsonpath_tools, saga_tools,
screen_record_tools,
process_and_shell_tools, remote_desktop_tools, gamepad_tools,
usb_passthrough_tools, assertion_tools, data_source_tools,
Expand Down
8 changes: 8 additions & 0 deletions je_auto_control/utils/mcp_server/tools/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,14 @@ def json_extract(data, mapping):
return {"result": _x(data, mapping)}


def run_saga(steps):
from je_auto_control.utils.saga import run_saga as _run
result = _run(steps)
return {"ok": result.ok, "completed": result.completed,
"compensated": result.compensated,
"failed_step": result.failed_step, "error": result.error}


def vlm_locate(description: str,
screen_region: Optional[List[int]] = None,
model: Optional[str] = None) -> Optional[List[int]]:
Expand Down
4 changes: 4 additions & 0 deletions je_auto_control/utils/saga/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Saga orchestrator: run steps with LIFO compensating rollback on failure."""
from je_auto_control.utils.saga.saga import Saga, SagaResult, run_saga

__all__ = ["Saga", "SagaResult", "run_saga"]
89 changes: 89 additions & 0 deletions je_auto_control/utils/saga/saga.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Run a multi-step flow with compensating rollback (the saga pattern).

Some automations span several irreversible-looking steps (create record, send
email, move file). If a later step fails, the already-completed steps should be
*undone* — but the executor's ``AC_try`` only does try/catch/finally for one
block; nothing tracks "what to undo" across N completed steps. A ``Saga`` records
a compensating action per step and, on any failure, runs the compensations for
the completed steps in **LIFO** order.

Forward actions and compensations are plain callables (or, via :func:`run_saga`,
JSON action lists), so the orchestration is fully unit-testable without any real
side effects. Compensation is best-effort: a failing compensation is logged and
the rollback continues. Imports no ``PySide6``.
"""
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional, Tuple


@dataclass
class SagaResult:
"""Outcome of running a saga."""

ok: bool
completed: List[str] = field(default_factory=list)
compensated: List[str] = field(default_factory=list)
failed_step: Optional[str] = None
error: str = ""


class Saga:
"""A sequence of steps, each with an optional compensating action."""

def __init__(self) -> None:
self._steps: List[Tuple[str, Callable[[], Any],
Optional[Callable[[], Any]]]] = []

def step(self, name: str, action: Callable[[], Any],
compensation: Optional[Callable[[], Any]] = None) -> "Saga":
"""Append a step; returns self for chaining."""
self._steps.append((name, action, compensation))
return self

def _compensate(self, upto: int, result: SagaResult) -> None:
for name, _action, compensation in reversed(self._steps[:upto]):
if compensation is None:
continue
try:
compensation()
except Exception as error: # best-effort: log, keep rolling back
from je_auto_control.utils.logging.logging_instance import (
autocontrol_logger)
autocontrol_logger.warning(
"saga compensation for %r failed: %r", name, error)
result.compensated.append(name)

def run(self) -> SagaResult:
"""Run steps forward; on failure compensate completed steps LIFO."""
result = SagaResult(ok=True)
for index, (name, action, _compensation) in enumerate(self._steps):
try:
action()
except Exception as error: # noqa: BLE001 # saga catches any step failure
result.ok = False
result.failed_step = name
result.error = str(error)
self._compensate(index, result)
return result
result.completed.append(name)
return result


def run_saga(steps: Any) -> SagaResult:
"""Run a saga from a JSON-style spec of action lists.

``steps`` is a sequence of ``{"name", "action": [...], "compensation":
[...]}`` mappings; each ``action`` / ``compensation`` is an AutoControl
action list run through the executor.
"""
from je_auto_control.utils.executor.action_executor import execute_action

def _runner(action_list: Any) -> Callable[[], Any]:
return lambda: execute_action(action_list)

saga = Saga()
for spec in steps:
comp = spec.get("compensation")
saga.step(str(spec.get("name", "")), _runner(spec.get("action", [])),
_runner(comp) if comp else None)
return saga.run()
Loading
Loading