diff --git a/README.md b/README.md index 06fed1c2..d580298f 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ ## Table of Contents +- [What's new (2026-06-19) — Transactional Queue](#whats-new-2026-06-19--transactional-queue) - [What's new (2026-06-19) — Unattended Reliability](#whats-new-2026-06-19--unattended-reliability) - [What's new (2026-06-19) — Popup Watchdog](#whats-new-2026-06-19--popup-watchdog) - [What's new (2026-06-19) — Native UI Control](#whats-new-2026-06-19--native-ui-control) @@ -62,6 +63,13 @@ --- +## What's new (2026-06-19) — Transactional Queue + +Turn AutoControl from "run a script" into "run a robot." A SQLite-backed work queue implements the production-RPA dispatcher/performer pattern: enqueue items, process one at a time with per-item status, dedup and retry, so a run of thousands is **resumable after a crash** and parallelizable. Pure stdlib, full stack. Full reference: [`docs/source/Eng/doc/new_features/v10_features_doc.rst`](docs/source/Eng/doc/new_features/v10_features_doc.rst). + +- **Dispatcher/performer** — `WorkQueue.add()` enqueues (dedupes by reference); `get_next()` atomically claims the oldest item; `complete()` / `fail()` record the outcome. `AC_queue_add` / `AC_queue_next` / `AC_queue_complete` / `AC_queue_fail` / `AC_queue_stats`. +- **Failure semantics** — application errors retry up to `max_retries`; **business** errors (`BusinessError` / `kind="business"`) never retry. `stats()` gives per-status counts for dashboards. + ## What's new (2026-06-19) — Unattended Reliability Three practitioner-pain fixes for unattended / login automation, all headless and full-stack. Full reference: [`docs/source/Eng/doc/new_features/v9_features_doc.rst`](docs/source/Eng/doc/new_features/v9_features_doc.rst). diff --git a/README/README_zh-CN.md b/README/README_zh-CN.md index 73ba4e08..a826ee6f 100644 --- a/README/README_zh-CN.md +++ b/README/README_zh-CN.md @@ -12,6 +12,7 @@ ## 目录 +- [本次更新 (2026-06-19) — 事务式工作队列](#本次更新-2026-06-19--事务式工作队列) - [本次更新 (2026-06-19) — 无人值守可靠性](#本次更新-2026-06-19--无人值守可靠性) - [本次更新 (2026-06-19) — 弹窗看门狗](#本次更新-2026-06-19--弹窗看门狗) - [本次更新 (2026-06-19) — 原生 UI 控制](#本次更新-2026-06-19--原生-ui-控制) @@ -61,6 +62,13 @@ --- +## 本次更新 (2026-06-19) — 事务式工作队列 + +把 AutoControl 从「跑脚本」升级成「跑机器人」。以 SQLite 为底的工作队列实作生产级 RPA dispatcher/performer:入列项目、一次处理一项、具每项状态/去重/重试,使上千项执行能**崩溃后续跑**且可并行化。纯标准库、走完整五层。完整参考:[`docs/source/Eng/doc/new_features/v10_features_doc.rst`](../docs/source/Eng/doc/new_features/v10_features_doc.rst)。 + +- **Dispatcher/performer** — `WorkQueue.add()` 入列(依 reference 去重);`get_next()` 原子认领最旧项;`complete()` / `fail()` 记录结果。`AC_queue_add` / `AC_queue_next` / `AC_queue_complete` / `AC_queue_fail` / `AC_queue_stats`。 +- **失败语义** — application 错误重试至 `max_retries`;**business** 错误(`BusinessError` / `kind="business"`)永不重试。`stats()` 给各状态计数供仪表板。 + ## 本次更新 (2026-06-19) — 无人值守可靠性 三个无人值守/登录自动化的社区痛点修复,均 headless 且走完整五层。完整参考:[`docs/source/Eng/doc/new_features/v9_features_doc.rst`](../docs/source/Eng/doc/new_features/v9_features_doc.rst)。 diff --git a/README/README_zh-TW.md b/README/README_zh-TW.md index 8f690127..36ec2782 100644 --- a/README/README_zh-TW.md +++ b/README/README_zh-TW.md @@ -12,6 +12,7 @@ ## 目錄 +- [本次更新 (2026-06-19) — 交易式工作佇列](#本次更新-2026-06-19--交易式工作佇列) - [本次更新 (2026-06-19) — 無人值守可靠性](#本次更新-2026-06-19--無人值守可靠性) - [本次更新 (2026-06-19) — 彈窗看門狗](#本次更新-2026-06-19--彈窗看門狗) - [本次更新 (2026-06-19) — 原生 UI 控制](#本次更新-2026-06-19--原生-ui-控制) @@ -61,6 +62,13 @@ --- +## 本次更新 (2026-06-19) — 交易式工作佇列 + +把 AutoControl 從「跑腳本」升級成「跑機器人」。以 SQLite 為底的工作佇列實作生產級 RPA dispatcher/performer:入列項目、一次處理一項、具每項狀態/去重/重試,使上千項執行能**當機後續跑**且可平行化。純標準庫、走完整五層。完整參考:[`docs/source/Zh/doc/new_features/v10_features_doc.rst`](../docs/source/Zh/doc/new_features/v10_features_doc.rst)。 + +- **Dispatcher/performer** — `WorkQueue.add()` 入列(依 reference 去重);`get_next()` 原子認領最舊項;`complete()` / `fail()` 記錄結果。`AC_queue_add` / `AC_queue_next` / `AC_queue_complete` / `AC_queue_fail` / `AC_queue_stats`。 +- **失敗語意** — application 錯誤重試至 `max_retries`;**business** 錯誤(`BusinessError` / `kind="business"`)永不重試。`stats()` 給各狀態計數供儀表板。 + ## 本次更新 (2026-06-19) — 無人值守可靠性 三個無人值守/登入自動化的社群痛點修復,皆 headless 且走完整五層。完整參考:[`docs/source/Zh/doc/new_features/v9_features_doc.rst`](../docs/source/Zh/doc/new_features/v9_features_doc.rst)。 diff --git a/docs/source/Eng/doc/new_features/v10_features_doc.rst b/docs/source/Eng/doc/new_features/v10_features_doc.rst new file mode 100644 index 00000000..9ad7cdbe --- /dev/null +++ b/docs/source/Eng/doc/new_features/v10_features_doc.rst @@ -0,0 +1,76 @@ +================================================ +New Features (2026-06-19) — Transactional Queue +================================================ + +Turn AutoControl from a "run a script" tool into "run a robot." A +SQLite-backed work queue implements the standard production-RPA +dispatcher/performer pattern: a *dispatcher* enqueues work items, and a +*performer* processes them one at a time with per-item status, dedup and +retry — so a run of thousands of items is **resumable after a crash** and +parallelizable across workers. + +Pure standard library, fully headless, wired through the full stack +(facade, ``AC_*`` executor commands, MCP tools, Script Builder). Surfaced +by the competitor research as the missing piece vs UiPath Orchestrator +queues / REFramework. + +.. contents:: + :local: + :depth: 2 + + +Dispatcher / performer +====================== + +:: + + from je_auto_control import WorkQueue + + q = WorkQueue("run.db", name="invoices") + + # Dispatcher: enqueue work (dedupes on a live reference). + for inv in invoices: + q.add({"path": inv}, reference=inv) + + # Performer: drain the queue, resumable across restarts. + item = q.get_next() + while item is not None: + try: + process(item.data) + q.complete(item.id, output={"ok": True}) + except BusinessError as exc: # bad data — don't retry + q.fail(item.id, str(exc), kind="business") + except Exception as exc: # transient — retry + q.fail(item.id, str(exc), kind="application") + item = q.get_next() + +``get_next`` atomically claims the oldest ``new`` item (marking it +``in_progress``) so multiple performers don't double-process. + + +Failure semantics +================= + +Two failure kinds, mirroring REFramework: + +* **application** (transient — a timeout, a stale element): the item is + retried up to ``max_retries`` (default 3), then marked ``failed``. +* **business** (the data itself is invalid): never retried — marked + ``failed`` immediately. Raise :class:`BusinessError` or pass + ``kind="business"``. + +``stats()`` returns per-status counts (``new`` / ``in_progress`` / +``success`` / ``failed``) for dashboards and run reports. + + +Executor commands +================= + +* ``AC_queue_add`` — enqueue ``data`` (dedup by ``reference``). +* ``AC_queue_next`` — claim the next item (or null when drained). +* ``AC_queue_complete`` — mark an item successful. +* ``AC_queue_fail`` — fail with ``kind`` (``application`` / ``business``). +* ``AC_queue_stats`` — per-status counts. + +The same ``db`` file + ``name`` identify a queue, so a dispatcher script +and a performer script (or many parallel performers) share it. diff --git a/docs/source/Eng/eng_index.rst b/docs/source/Eng/eng_index.rst index 2c63bef7..b0a6be85 100644 --- a/docs/source/Eng/eng_index.rst +++ b/docs/source/Eng/eng_index.rst @@ -32,6 +32,7 @@ Comprehensive guides for all AutoControl features. doc/new_features/v7_features_doc doc/new_features/v8_features_doc doc/new_features/v9_features_doc + doc/new_features/v10_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/docs/source/Zh/doc/new_features/v10_features_doc.rst b/docs/source/Zh/doc/new_features/v10_features_doc.rst new file mode 100644 index 00000000..67e49499 --- /dev/null +++ b/docs/source/Zh/doc/new_features/v10_features_doc.rst @@ -0,0 +1,72 @@ +==================================== +新功能 (2026-06-19) — 交易式工作佇列 +==================================== + +把 AutoControl 從「跑一支腳本」升級成「跑一個機器人」。以 SQLite 為底的 +工作佇列實作了標準生產級 RPA 的 dispatcher/performer 模式:*dispatcher* +把工作項目入列,*performer* 一次處理一項,具備每項狀態、去重與重試—— +因此處理上千項的執行能在**當機後續跑**,並可由多個 worker 平行處理。 + +純標準庫、完全 headless,走完整五層(facade、``AC_*`` 執行器指令、 +MCP 工具、Script Builder)。由競品研究指出為相對 UiPath Orchestrator +佇列 / REFramework 所缺的一塊。 + +.. contents:: + :local: + :depth: 2 + + +Dispatcher / performer +====================== + +:: + + from je_auto_control import WorkQueue + + q = WorkQueue("run.db", name="invoices") + + # Dispatcher:把工作入列(依 live reference 去重)。 + for inv in invoices: + q.add({"path": inv}, reference=inv) + + # Performer:把佇列處理完,可跨重啟續跑。 + item = q.get_next() + while item is not None: + try: + process(item.data) + q.complete(item.id, output={"ok": True}) + except BusinessError as exc: # 資料有問題——不重試 + q.fail(item.id, str(exc), kind="business") + except Exception as exc: # 暫時性——重試 + q.fail(item.id, str(exc), kind="application") + item = q.get_next() + +``get_next`` 會原子性地認領最舊的 ``new`` 項目(標為 ``in_progress``), +因此多個 performer 不會重複處理。 + + +失敗語意 +======== + +兩種失敗類型,對應 REFramework: + +* **application**(暫時性——逾時、stale element):重試至 ``max_retries`` + (預設 3)次,然後標為 ``failed``。 +* **business**(資料本身無效):永不重試——立即標為 ``failed``。請丟出 + :class:`BusinessError` 或傳 ``kind="business"``。 + +``stats()`` 回傳各狀態計數(``new`` / ``in_progress`` / ``success`` / +``failed``),供儀表板與執行報告使用。 + + +執行器指令 +========== + +* ``AC_queue_add`` — 入列 ``data``(依 ``reference`` 去重)。 +* ``AC_queue_next`` — 認領下一項(排空時回 null)。 +* ``AC_queue_complete`` — 標記項目成功。 +* ``AC_queue_fail`` — 以 ``kind``(``application`` / ``business``)失敗。 +* ``AC_queue_stats`` — 各狀態計數。 + +同一個 ``db`` 檔 + ``name`` 識別一個佇列,因此 dispatcher 腳本與 performer +腳本(或多個平行 performer)可共用它。 diff --git a/docs/source/Zh/zh_index.rst b/docs/source/Zh/zh_index.rst index d6fe8a69..d54859dd 100644 --- a/docs/source/Zh/zh_index.rst +++ b/docs/source/Zh/zh_index.rst @@ -32,6 +32,7 @@ AutoControl 所有功能的完整使用指南。 doc/new_features/v7_features_doc doc/new_features/v8_features_doc doc/new_features/v9_features_doc + doc/new_features/v10_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/je_auto_control/__init__.py b/je_auto_control/__init__.py index 77ea570c..df9067c3 100644 --- a/je_auto_control/__init__.py +++ b/je_auto_control/__init__.py @@ -109,6 +109,10 @@ from je_auto_control.utils.session_guard import ( ensure_interactive_session, is_session_locked, ) +# Transactional work queue (dispatcher/performer) +from je_auto_control.utils.work_queue import ( + BusinessError, WorkItem, WorkQueue, +) # Background popup/interrupt watchdog (unattended automation) from je_auto_control.utils.watchdog import ( PopupWatchdog, WatchdogRule, default_popup_watchdog, @@ -509,6 +513,7 @@ def start_autocontrol_gui(*args, **kwargs): "generate_totp", "verify_totp", "generate_secret", "TOTPError", "handle_file_dialog", "FileDialogDriver", "ensure_interactive_session", "is_session_locked", + "WorkQueue", "WorkItem", "BusinessError", # MCP server "AuditLogger", "HttpMCPServer", "MCPContent", "MCPPrompt", "MCPPromptArgument", "MCPResource", "MCPServer", "MCPTool", diff --git a/je_auto_control/gui/script_builder/command_schema.py b/je_auto_control/gui/script_builder/command_schema.py index 54c1fe9e..82cc9ac6 100644 --- a/je_auto_control/gui/script_builder/command_schema.py +++ b/je_auto_control/gui/script_builder/command_schema.py @@ -652,6 +652,46 @@ def _add_misc_specs(specs: List[CommandSpec]) -> None: "AC_assert_session_active", "Flow", "Assert Session Active", description="Fail if the session is locked / non-interactive.", )) + _add_work_queue_specs(specs) + + +def _add_work_queue_specs(specs: List[CommandSpec]) -> None: + db = FieldSpec("db", FieldType.FILE_PATH) + name = FieldSpec("name", FieldType.STRING, optional=True, default="default") + specs.append(CommandSpec( + "AC_queue_add", "Queue", "Queue: Add Item", + fields=(db, FieldSpec("reference", FieldType.STRING, optional=True), + name), + description="Enqueue a work item (data via JSON view); dedupes by " + "reference.", + )) + specs.append(CommandSpec( + "AC_queue_next", "Queue", "Queue: Get Next Item", + fields=(db, name), + description="Atomically claim the next work item (performer).", + )) + specs.append(CommandSpec( + "AC_queue_complete", "Queue", "Queue: Complete Item", + fields=(db, FieldSpec("item_id", FieldType.INT), name), + description="Mark a work item successfully processed.", + )) + specs.append(CommandSpec( + "AC_queue_fail", "Queue", "Queue: Fail Item", + fields=(db, FieldSpec("item_id", FieldType.INT), + FieldSpec("error", FieldType.STRING), + FieldSpec("kind", FieldType.ENUM, + choices=("application", "business"), + optional=True, default="application"), + FieldSpec("max_retries", FieldType.INT, optional=True, + default=3), + name), + description="Fail an item; application errors retry, business don't.", + )) + specs.append(CommandSpec( + "AC_queue_stats", "Queue", "Queue: Stats", + fields=(db, name), + description="Per-status counts for a work queue.", + )) specs.append(CommandSpec( "AC_shell_command", "Shell", "Shell Command", fields=(FieldSpec("shell_command", FieldType.STRING),), diff --git a/je_auto_control/utils/executor/action_executor.py b/je_auto_control/utils/executor/action_executor.py index e2de074c..61e942e2 100644 --- a/je_auto_control/utils/executor/action_executor.py +++ b/je_auto_control/utils/executor/action_executor.py @@ -2306,6 +2306,46 @@ def _assert_session_active() -> Dict[str, Any]: return {"interactive": ensure_interactive_session()} +def _queue(db: str, name: str): + from je_auto_control.utils.work_queue import WorkQueue + return WorkQueue(db, name) + + +def _queue_add(db: str, data: Any, reference: Optional[str] = None, + name: str = "default") -> Dict[str, Any]: + """Adapter: enqueue a work item (skips live duplicate references).""" + return {"id": _queue(db, name).add(data, reference=reference)} + + +def _queue_next(db: str, name: str = "default") -> Optional[Dict[str, Any]]: + """Adapter: atomically claim the next work item (or None).""" + item = _queue(db, name).get_next() + return None if item is None else { + "id": item.id, "reference": item.reference, "data": item.data, + "status": item.status, "retries": item.retries} + + +def _queue_complete(db: str, item_id: int, output: Any = None, + name: str = "default") -> Dict[str, Any]: + """Adapter: mark a work item successful.""" + _queue(db, name).complete(int(item_id), output=output) + return {"id": int(item_id), "status": "success"} + + +def _queue_fail(db: str, item_id: int, error: str, + kind: str = "application", max_retries: int = 3, + name: str = "default") -> Dict[str, Any]: + """Adapter: fail a work item (application errors retry, business don't).""" + status = _queue(db, name).fail(int(item_id), str(error), kind=str(kind), + max_retries=int(max_retries)) + return {"id": int(item_id), "status": status} + + +def _queue_stats(db: str, name: str = "default") -> Dict[str, int]: + """Adapter: return per-status counts for a work queue.""" + return _queue(db, name).stats() + + class Executor: """ Executor @@ -2464,6 +2504,11 @@ def __init__(self): "AC_watchdog_list": _watchdog_list, "AC_handle_file_dialog": _handle_file_dialog, "AC_assert_session_active": _assert_session_active, + "AC_queue_add": _queue_add, + "AC_queue_next": _queue_next, + "AC_queue_complete": _queue_complete, + "AC_queue_fail": _queue_fail, + "AC_queue_stats": _queue_stats, "AC_a11y_record_start": _a11y_record_start, "AC_a11y_record_stop": _a11y_record_stop, "AC_a11y_record_events": _a11y_record_events, diff --git a/je_auto_control/utils/mcp_server/tools/_factories.py b/je_auto_control/utils/mcp_server/tools/_factories.py index d1ddd73e..447bf76d 100644 --- a/je_auto_control/utils/mcp_server/tools/_factories.py +++ b/je_auto_control/utils/mcp_server/tools/_factories.py @@ -1669,6 +1669,64 @@ def process_and_shell_tools() -> List[MCPTool]: ] +def work_queue_tools() -> List[MCPTool]: + _Q = {"db": {"type": "string"}, "name": {"type": "string"}} + return [ + MCPTool( + name="ac_queue_add", + description=("Enqueue a work item into a SQLite-backed queue " + "(dispatcher). 'data' is the item payload; a live " + "duplicate 'reference' is skipped. Returns {id} (null " + "if deduped)."), + input_schema=schema({"data": {"type": "object"}, + "reference": {"type": "string"}, **_Q}, + required=["db", "data"]), + handler=h.queue_add, + annotations=SIDE_EFFECT_ONLY, + ), + MCPTool( + name="ac_queue_next", + description=("Atomically claim the next 'new' work item " + "(performer), marking it in-progress. Returns the " + "item or null when the queue is drained."), + input_schema=schema(dict(_Q), required=["db"]), + handler=h.queue_next, + annotations=SIDE_EFFECT_ONLY, + ), + MCPTool( + name="ac_queue_complete", + description="Mark a claimed work item successfully processed.", + input_schema=schema({"item_id": {"type": "integer"}, + "output": {}, **_Q}, + required=["db", "item_id"]), + handler=h.queue_complete, + annotations=SIDE_EFFECT_ONLY, + ), + MCPTool( + name="ac_queue_fail", + description=("Fail a work item. kind='application' (default) " + "retries up to max_retries then marks failed; " + "kind='business' fails immediately (bad data, no " + "retry). Returns the resulting status."), + input_schema=schema({"item_id": {"type": "integer"}, + "error": {"type": "string"}, + "kind": {"type": "string"}, + "max_retries": {"type": "integer"}, **_Q}, + required=["db", "item_id", "error"]), + handler=h.queue_fail, + annotations=SIDE_EFFECT_ONLY, + ), + MCPTool( + name="ac_queue_stats", + description=("Return per-status item counts (new / in_progress / " + "success / failed) for a work queue."), + input_schema=schema(dict(_Q), required=["db"]), + handler=h.queue_stats, + annotations=READ_ONLY, + ), + ] + + def unattended_tools() -> List[MCPTool]: return [ MCPTool( @@ -2698,7 +2756,7 @@ def media_assert_tools() -> List[MCPTool]: computer_use_tools, dag_tools, presence_tools, chatops_tools, redaction_tools, android_widget_tools, ios_tools, webrunner_tools, scheduler_tools, trigger_tools, hotkey_tools, watchdog_tools, - unattended_tools, + unattended_tools, work_queue_tools, screen_record_tools, process_and_shell_tools, remote_desktop_tools, gamepad_tools, usb_passthrough_tools, assertion_tools, data_source_tools, diff --git a/je_auto_control/utils/mcp_server/tools/_handlers.py b/je_auto_control/utils/mcp_server/tools/_handlers.py index 77b1ddcd..5d9ec22d 100644 --- a/je_auto_control/utils/mcp_server/tools/_handlers.py +++ b/je_auto_control/utils/mcp_server/tools/_handlers.py @@ -793,6 +793,39 @@ def assert_session_active(): return {"interactive": ensure_interactive_session()} +def _work_queue(db, name): + from je_auto_control.utils.work_queue import WorkQueue + return WorkQueue(db, name) + + +def queue_add(db, data, reference=None, name="default"): + return {"id": _work_queue(db, name).add(data, reference=reference)} + + +def queue_next(db, name="default"): + item = _work_queue(db, name).get_next() + return None if item is None else { + "id": item.id, "reference": item.reference, "data": item.data, + "status": item.status, "retries": item.retries} + + +def queue_complete(db, item_id, output=None, name="default"): + _work_queue(db, name).complete(int(item_id), output=output) + return {"id": int(item_id), "status": "success"} + + +def queue_fail(db, item_id, error, kind="application", max_retries=3, + name="default"): + status = _work_queue(db, name).fail(int(item_id), str(error), + kind=str(kind), + max_retries=int(max_retries)) + return {"id": int(item_id), "status": status} + + +def queue_stats(db, name="default"): + return _work_queue(db, name).stats() + + def vlm_locate(description: str, screen_region: Optional[List[int]] = None, model: Optional[str] = None) -> Optional[List[int]]: diff --git a/je_auto_control/utils/work_queue/__init__.py b/je_auto_control/utils/work_queue/__init__.py new file mode 100644 index 00000000..8f351971 --- /dev/null +++ b/je_auto_control/utils/work_queue/__init__.py @@ -0,0 +1,6 @@ +"""Transactional work queue (dispatcher/performer) for resilient bulk runs.""" +from je_auto_control.utils.work_queue.work_queue import ( + BusinessError, WorkItem, WorkQueue, +) + +__all__ = ["BusinessError", "WorkItem", "WorkQueue"] diff --git a/je_auto_control/utils/work_queue/work_queue.py b/je_auto_control/utils/work_queue/work_queue.py new file mode 100644 index 00000000..0e6db16b --- /dev/null +++ b/je_auto_control/utils/work_queue/work_queue.py @@ -0,0 +1,168 @@ +"""SQLite-backed transactional work queue (dispatcher / performer). + +The standard production-RPA pattern: instead of one long script, a +*dispatcher* enqueues work items and a *performer* processes them one at +a time with per-item status, retry, and dedup — so a run of 10k items is +resumable after a crash and parallelizable across workers. + +Two failure kinds, mirroring REFramework: + +* **application error** (transient — a timeout, a stale element): the item + is retried up to ``max_retries`` then marked ``failed``. +* **business error** (the data itself is invalid): never retried — marked + ``failed`` immediately. Raise :class:`BusinessError` (or pass + ``kind="business"``) to signal it. + +Pure standard library (``sqlite3``); imports no ``PySide6``. +""" +import json +import sqlite3 +import time +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +STATUS_NEW = "new" +STATUS_IN_PROGRESS = "in_progress" +STATUS_SUCCESS = "success" +STATUS_FAILED = "failed" + + +class BusinessError(Exception): + """A non-retryable, data-level failure of a work item.""" + + +@dataclass +class WorkItem: + """One unit of work and its processing state.""" + id: int + reference: str + data: Dict[str, Any] + status: str + retries: int + error: str = "" + output: str = "" + + +class WorkQueue: + """A named, SQLite-backed queue of work items.""" + + def __init__(self, db_path: str, name: str = "default") -> None: + self._db_path = db_path + self._name = name + self._ensure_schema() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self._db_path, timeout=30.0, + isolation_level=None) + conn.row_factory = sqlite3.Row + return conn + + def _ensure_schema(self) -> None: + with self._connect() as conn: + conn.execute( + "CREATE TABLE IF NOT EXISTS work_items (" + "id INTEGER PRIMARY KEY AUTOINCREMENT, queue TEXT NOT NULL, " + "reference TEXT, data TEXT NOT NULL, status TEXT NOT NULL, " + "retries INTEGER NOT NULL DEFAULT 0, error TEXT DEFAULT '', " + "output TEXT DEFAULT '', updated REAL NOT NULL)") + + def add(self, data: Dict[str, Any], *, reference: Optional[str] = None, + dedupe: bool = True) -> Optional[int]: + """Enqueue an item; skip (return None) on a live duplicate reference.""" + with self._connect() as conn: + if dedupe and reference and self._has_pending(conn, reference): + return None + cur = conn.execute( + "INSERT INTO work_items (queue, reference, data, status, " + "updated) VALUES (?, ?, ?, ?, ?)", + (self._name, reference or "", json.dumps(data), STATUS_NEW, + time.time())) + return int(cur.lastrowid) + + def _has_pending(self, conn: sqlite3.Connection, reference: str) -> bool: + row = conn.execute( + "SELECT 1 FROM work_items WHERE queue=? AND reference=? AND " + "status IN (?, ?) LIMIT 1", + (self._name, reference, STATUS_NEW, STATUS_IN_PROGRESS)).fetchone() + return row is not None + + def get_next(self) -> Optional[WorkItem]: + """Atomically claim the oldest ``new`` item, marking it in-progress.""" + with self._connect() as conn: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute( + "SELECT * FROM work_items WHERE queue=? AND status=? " + "ORDER BY id LIMIT 1", (self._name, STATUS_NEW)).fetchone() + if row is None: + conn.execute("COMMIT") + return None + conn.execute( + "UPDATE work_items SET status=?, updated=? WHERE id=?", + (STATUS_IN_PROGRESS, time.time(), row["id"])) + conn.execute("COMMIT") + return _row_to_item(dict(row), status=STATUS_IN_PROGRESS) + + def complete(self, item_id: int, *, output: Any = None) -> None: + """Mark an item successfully processed.""" + self._set_status(item_id, STATUS_SUCCESS, + output=json.dumps(output) if output is not None else "") + + def fail(self, item_id: int, error: str, *, kind: str = "application", + max_retries: int = 3) -> str: + """Fail an item; application errors retry, business errors don't. + + Returns the resulting status (``new`` when requeued, else ``failed``). + """ + with self._connect() as conn: + row = conn.execute("SELECT retries FROM work_items WHERE id=?", + (item_id,)).fetchone() + retries = int(row["retries"]) if row else 0 + retryable = kind == "application" and retries < int(max_retries) + status = STATUS_NEW if retryable else STATUS_FAILED + conn.execute( + "UPDATE work_items SET status=?, retries=?, error=?, updated=? " + "WHERE id=?", + (status, retries + 1, str(error), time.time(), item_id)) + return status + + def _set_status(self, item_id: int, status: str, *, output: str = "") -> None: + with self._connect() as conn: + conn.execute( + "UPDATE work_items SET status=?, output=?, updated=? WHERE id=?", + (status, output, time.time(), item_id)) + + def stats(self) -> Dict[str, int]: + """Return a count of items per status for this queue.""" + with self._connect() as conn: + rows = conn.execute( + "SELECT status, COUNT(*) c FROM work_items WHERE queue=? " + "GROUP BY status", (self._name,)).fetchall() + counts = {STATUS_NEW: 0, STATUS_IN_PROGRESS: 0, + STATUS_SUCCESS: 0, STATUS_FAILED: 0} + for row in rows: + counts[row["status"]] = int(row["c"]) + return counts + + def list_items(self, *, status: Optional[str] = None, + limit: int = 100) -> List[WorkItem]: + """List items, optionally filtered by status.""" + with self._connect() as conn: + if status: + rows = conn.execute( + "SELECT * FROM work_items WHERE queue=? AND status=? " + "ORDER BY id LIMIT ?", + (self._name, status, int(limit))).fetchall() + else: + rows = conn.execute( + "SELECT * FROM work_items WHERE queue=? ORDER BY id " + "LIMIT ?", (self._name, int(limit))).fetchall() + return [_row_to_item(dict(row)) for row in rows] + + +def _row_to_item(row: Dict[str, Any], + status: Optional[str] = None) -> WorkItem: + return WorkItem( + id=int(row["id"]), reference=row["reference"] or "", + data=json.loads(row["data"]), status=status or row["status"], + retries=int(row["retries"]), error=row.get("error") or "", + output=row.get("output") or "") diff --git a/test/unit_test/headless/test_work_queue.py b/test/unit_test/headless/test_work_queue.py new file mode 100644 index 00000000..f0617f2e --- /dev/null +++ b/test/unit_test/headless/test_work_queue.py @@ -0,0 +1,94 @@ +"""Headless tests for the transactional work queue (dispatcher/performer). + +Uses a temporary SQLite file; no external services. Covers FIFO claim, +dedup, completion, application-vs-business failure semantics, stats, and +the executor/MCP/builder wiring.""" +import pytest + +import je_auto_control as ac +from je_auto_control.utils.work_queue import WorkQueue + + +@pytest.fixture() +def q(tmp_path): + return WorkQueue(str(tmp_path / "q.db"), "test") + + +def test_add_and_get_next_is_fifo(q): + first, second = q.add({"n": 1}), q.add({"n": 2}) + assert first < second + item = q.get_next() + assert item.data == {"n": 1} + assert item.status == "in_progress" + assert q.get_next().data == {"n": 2} + assert q.get_next() is None + + +def test_dedupe_by_reference(q): + assert q.add({"x": 1}, reference="r1") is not None + assert q.add({"x": 2}, reference="r1") is None # live duplicate + item = q.get_next() + q.complete(item.id) + assert q.add({"x": 3}, reference="r1") is not None # ref free again + + +def test_complete_records_success(q): + q.add({"n": 1}) + item = q.get_next() + q.complete(item.id, output={"ok": True}) + stats = q.stats() + assert stats["success"] == 1 and stats["new"] == 0 + + +def test_application_error_retries_then_fails(q): + q.add({"n": 1}) + item = q.get_next() + assert q.fail(item.id, "boom", kind="application", max_retries=2) == "new" + item = q.get_next() + assert item.retries == 1 + assert q.fail(item.id, "boom", max_retries=2) == "new" + item = q.get_next() + assert item.retries == 2 + assert q.fail(item.id, "boom", max_retries=2) == "failed" + assert q.stats()["failed"] == 1 + + +def test_business_error_never_retries(q): + q.add({"n": 1}) + item = q.get_next() + assert q.fail(item.id, "bad data", kind="business") == "failed" + assert q.stats()["failed"] == 1 + assert q.get_next() is None + + +def test_list_items_by_status(q): + q.add({"n": 1}) + q.add({"n": 2}) + assert len(q.list_items(status="new")) == 2 + assert len(q.list_items(status="success")) == 0 + + +def test_executor_and_mcp_wiring(tmp_path): + db = str(tmp_path / "e.db") + rec = ac.execute_action( + [["AC_queue_add", {"db": db, "data": {"n": 1}, "name": "t"}]]) + assert any("id" in str(v) for v in rec.values()) + nxt = ac.execute_action([["AC_queue_next", {"db": db, "name": "t"}]]) + assert any("'n': 1" in str(v) for v in nxt.values()) + known = ac.executor.known_commands() + assert {"AC_queue_add", "AC_queue_next", "AC_queue_complete", + "AC_queue_fail", "AC_queue_stats"} <= known + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {t.name for t in build_default_tool_registry()} + assert {"ac_queue_add", "ac_queue_next", "ac_queue_complete", + "ac_queue_fail", "ac_queue_stats"} <= names + + +def test_facade_and_builder(): + assert ac.WorkQueue is WorkQueue + from je_auto_control.gui.script_builder.command_schema import _build_specs + cmds = {s.command for s in _build_specs()} + qcmds = {"AC_queue_add", "AC_queue_next", "AC_queue_complete", + "AC_queue_fail", "AC_queue_stats"} + assert qcmds <= cmds + assert qcmds <= ac.executor.known_commands()