diff --git a/README.md b/README.md index 3f97c263..92f11326 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ ## Table of Contents +- [What's new (2026-06-18)](#whats-new-2026-06-18) - [What's new (2026-06-17)](#whats-new-2026-06-17) - [What's new (2026-06)](#whats-new-2026-06) - [What's new (2026-05)](#whats-new-2026-05) @@ -57,6 +58,35 @@ --- +## What's new (2026-06-18) + +Eight headless capabilities that round out scripting, integration, and CI +use: a real command-line interface, recording-to-code generation, and +first-class HTTP / SQL / email / PDF / wait steps. Each ships a headless +Python API, an `AC_*` executor command, an MCP tool, and a visual Script +Builder entry, and is covered by headless tests (network / SMTP / PDF +backends are injected, so nothing touches the outside world). Full +reference page: +[`docs/source/Eng/doc/new_features/v5_features_doc.rst`](docs/source/Eng/doc/new_features/v5_features_doc.rst). + +**Command-line interface** +- **`je_auto_control` console script** — run and inspect action files from a shell / CI: `run` (with `--var`, `--dry-run`), `validate` (alias `lint`), `list-commands`, `fmt`, `record`, `codegen`, `version`. + +**Code generation** +- **Recording → code** — `generate_code` / `generate_code_file` (`AC_generate_code`, `je_auto_control codegen`) turn a recording or action file into a pytest test, standalone Python, or Robot suite. The default `calls` style emits readable `ac.(...)` statements, falling back to `ac.execute_action([...])` for flow control. + +**Integrations** +- **HTTP / API** — `http_request` (`AC_http_request`): method, headers, JSON or raw body, basic / bearer auth, explicit timeout; non-2xx responses are returned (not raised) so you can assert on status. `AC_http_to_var` now shares the client and can POST bodies. +- **SQL** — `query_sqlite` (`AC_sql_to_var` / `AC_assert_db`): read-only, parameter-bound SQLite queries into a variable, or a scalar assertion (e.g. `SELECT COUNT(*) ... == 0`). +- **Email (SMTP)** — `send_email` (`AC_send_email`): stdlib SMTP with TLS on by default (STARTTLS or implicit SSL over a verified context), attachments, and multiple recipients. +- **PDF** — `extract_pdf_text` / `pdf_metadata` / `assert_pdf_text` (`AC_pdf_to_var` / `AC_assert_pdf_text`): text extraction and content assertions, backed by the optional `pypdf` extra (`pip install je_auto_control[pdf]`). + +**Smart waits** +- **Wait for a file** — `wait_until_file` (`AC_wait_for_file`) blocks until a file exists and its size stops growing (a download finished writing). +- **Wait for a TCP port** — `wait_until_port` (`AC_wait_for_port`) blocks until `host:port` accepts connections (pairs with `launch_process`). + +**Security** — HTTP / SMTP enforce http/https or TLS with verified certificates and explicit timeouts; SQL is read-only and parameter-bound; file paths are resolved before I/O. + ## What's new (2026-06-17) Thirty-plus automation primitives across input realism, vision, flow diff --git a/README/README_zh-CN.md b/README/README_zh-CN.md index ddba273e..037d66e2 100644 --- a/README/README_zh-CN.md +++ b/README/README_zh-CN.md @@ -12,6 +12,7 @@ ## 目录 +- [本次更新 (2026-06-18)](#本次更新-2026-06-18) - [本次更新 (2026-06-17)](#本次更新-2026-06-17) - [本次更新 (2026-06)](#本次更新-2026-06) - [本次更新 (2026-05)](#本次更新-2026-05) @@ -56,6 +57,29 @@ --- +## 本次更新 (2026-06-18) + +八项 headless 能力,补齐脚本化、集成与 CI 场景:真正的命令行界面、把录制转成代码,以及一级的 HTTP / SQL / Email / PDF / 等待步骤。每项都附带 headless API、`AC_*` 执行器指令、MCP 工具与可视化脚本构建器项目,并有 headless 测试(网络 / SMTP / PDF 后端均注入,不接触外部系统)。完整参考页: +[`docs/source/Eng/doc/new_features/v5_features_doc.rst`](../docs/source/Eng/doc/new_features/v5_features_doc.rst)。 + +**命令行界面** +- **`je_auto_control` console script** — 在 shell/CI 运行与检查动作文件:`run`(含 `--var`、`--dry-run`)、`validate`(别名 `lint`)、`list-commands`、`fmt`、`record`、`codegen`、`version`。 + +**代码生成** +- **录制 → 代码** — `generate_code` / `generate_code_file`(`AC_generate_code`、`je_auto_control codegen`):把录制或动作文件转成 pytest/独立 Python/Robot 脚本。默认 `calls` 风格生成可读的 `ac.(...)`,流程控制退回 `ac.execute_action([...])`。 + +**集成** +- **HTTP / API** — `http_request`(`AC_http_request`):method、headers、JSON/原始 body、basic/bearer 认证、明确超时;非 2xx 返回而非抛异常。`AC_http_to_var` 现共用此客户端,可发送 body。 +- **SQL** — `query_sqlite`(`AC_sql_to_var` / `AC_assert_db`):只读、参数绑定的 SQLite 查询,存入变量或做标量断言。 +- **Email(SMTP)** — `send_email`(`AC_send_email`):标准库 SMTP,默认 TLS(STARTTLS/SSL、已验证证书),支持附件与多收件人。 +- **PDF** — `extract_pdf_text` / `pdf_metadata` / `assert_pdf_text`(`AC_pdf_to_var` / `AC_assert_pdf_text`):文本提取与内容断言,后端为可选 `pypdf`(`pip install je_auto_control[pdf]`)。 + +**智能等待** +- **等待文件** — `wait_until_file`(`AC_wait_for_file`):等到文件存在且大小停止增长(下载写完)。 +- **等待 TCP 端口** — `wait_until_port`(`AC_wait_for_port`):等到 `host:port` 可连接(与 `launch_process` 互补)。 + +**安全性** — HTTP/SMTP 强制 http/https 或已验证 TLS 与明确超时;SQL 只读且参数绑定;文件路径 I/O 前以 `realpath` 解析。 + ## 本次更新 (2026-06-17) 新增 30+ 个自动化原语,涵盖输入拟真、视觉、流程控制、触发器、窗口管理与文件安全, diff --git a/README/README_zh-TW.md b/README/README_zh-TW.md index 4fd4a569..97e00192 100644 --- a/README/README_zh-TW.md +++ b/README/README_zh-TW.md @@ -12,6 +12,7 @@ ## 目錄 +- [本次更新 (2026-06-18)](#本次更新-2026-06-18) - [本次更新 (2026-06-17)](#本次更新-2026-06-17) - [本次更新 (2026-06)](#本次更新-2026-06) - [本次更新 (2026-05)](#本次更新-2026-05) @@ -56,6 +57,29 @@ --- +## 本次更新 (2026-06-18) + +八項 headless 能力,補齊腳本化、整合與 CI 情境:真正的命令列介面、把錄製轉成程式碼,以及一級的 HTTP / SQL / Email / PDF / 等待步驟。每項都附帶 headless API、`AC_*` 執行器指令、MCP 工具與視覺化腳本建構器項目,並有 headless 測試(網路 / SMTP / PDF 後端皆注入,不碰外部系統)。完整參考頁: +[`docs/source/Zh/doc/new_features/v5_features_doc.rst`](../docs/source/Zh/doc/new_features/v5_features_doc.rst)。 + +**命令列介面** +- **`je_auto_control` console script** — 在 shell/CI 執行與檢查動作檔:`run`(含 `--var`、`--dry-run`)、`validate`(別名 `lint`)、`list-commands`、`fmt`、`record`、`codegen`、`version`。 + +**程式碼產生** +- **錄製 → 程式碼** — `generate_code` / `generate_code_file`(`AC_generate_code`、`je_auto_control codegen`):把錄製或動作檔轉成 pytest/獨立 Python/Robot 腳本。預設 `calls` 風格產生可讀的 `ac.(...)`,流程控制退回 `ac.execute_action([...])`。 + +**整合** +- **HTTP / API** — `http_request`(`AC_http_request`):method、headers、JSON/原始 body、basic/bearer 認證、明確逾時;非 2xx 回傳而非丟例外。`AC_http_to_var` 現共用此客戶端,可送 body。 +- **SQL** — `query_sqlite`(`AC_sql_to_var` / `AC_assert_db`):唯讀、參數綁定的 SQLite 查詢,存入變數或做純量斷言。 +- **Email(SMTP)** — `send_email`(`AC_send_email`):標準庫 SMTP,預設 TLS(STARTTLS/SSL、已驗證憑證),支援附件與多收件人。 +- **PDF** — `extract_pdf_text` / `pdf_metadata` / `assert_pdf_text`(`AC_pdf_to_var` / `AC_assert_pdf_text`):文字抽取與內容斷言,後端為可選 `pypdf`(`pip install je_auto_control[pdf]`)。 + +**智慧等待** +- **等待檔案** — `wait_until_file`(`AC_wait_for_file`):等到檔案存在且大小停止增長(下載寫完)。 +- **等待 TCP 連接埠** — `wait_until_port`(`AC_wait_for_port`):等到 `host:port` 可連線(與 `launch_process` 互補)。 + +**安全性** — HTTP/SMTP 強制 http/https 或已驗證 TLS 與明確逾時;SQL 唯讀且參數綁定;檔案路徑 I/O 前以 `realpath` 解析。 + ## 本次更新 (2026-06-17) 新增 30+ 個自動化原語,涵蓋輸入擬真、視覺、流程控制、觸發器、視窗管理與檔案安全, diff --git a/docs/source/Eng/doc/new_features/v5_features_doc.rst b/docs/source/Eng/doc/new_features/v5_features_doc.rst new file mode 100644 index 00000000..d927eae0 --- /dev/null +++ b/docs/source/Eng/doc/new_features/v5_features_doc.rst @@ -0,0 +1,162 @@ +================================================ +New Features (2026-06-18) — CLI & Integrations +================================================ + +Eight headless capabilities that round out scripting, integration, and +continuous-integration use: a real command-line interface, +recording-to-code generation, and first-class HTTP / SQL / email / PDF / +wait steps. Every feature ships a headless Python API, an ``AC_*`` +executor command, an MCP tool, and a visual Script Builder entry, and is +covered by headless tests — the network, SMTP, and PDF backends are +injected, so nothing touches the outside world. + +.. contents:: + :local: + :depth: 2 + + +Command-line interface +====================== + +The package now installs a ``je_auto_control`` console script for running +and inspecting action files from a shell or CI pipeline:: + + je_auto_control run script.json --var user=alice --dry-run + je_auto_control validate script.json # alias: lint + je_auto_control list-commands --filter mouse --json + je_auto_control fmt script.json --check + je_auto_control record out.json --duration 5 + je_auto_control codegen script.json --target pytest -o test_flow.py + je_auto_control version + +``run`` executes (``--dry-run`` validates and lists steps without acting), +``validate`` / ``lint`` checks structure and rejects unknown commands, +``fmt`` canonicalises the JSON, ``record`` captures input, ``codegen`` +emits source (below), and ``list-commands`` prints the live executor +catalogue. + + +Code generation +=============== + +Turn a recording or an action file into committable, runnable source:: + + from je_auto_control import generate_code, generate_code_file + + code = generate_code(actions, target="pytest", style="calls") + generate_code_file("flow.json", "test_flow.py", target="pytest") + +``target`` is ``pytest`` / ``python`` / ``robot``. The default ``calls`` +style maps each ``AC_*`` command to its facade call +(``ac.click_mouse(...)``) and falls back to ``ac.execute_action([...])`` +for flow control and private adapters; the ``actions`` style embeds the +list and replays it through the executor. + +Executor command: ``AC_generate_code``. CLI: ``je_auto_control codegen``. + + +HTTP / API +========== + +A dependency-free HTTP(S) client for hybrid UI + API flows:: + + from je_auto_control import http_request + + resp = http_request( + "https://api.example/items", method="POST", + json_body={"name": "Sam"}, + headers={"X-Trace": "1"}, + auth={"type": "bearer", "token": "..."}, + timeout=30.0) + assert resp["status"] == 201 + +Returns ``{status, ok, headers, text, json, url}``; non-2xx responses are +returned rather than raised, so you can assert on the status code. Only +``http`` / ``https`` schemes are allowed. ``AC_http_to_var`` now shares +the same client, so it can POST bodies and send headers / auth. + +Executor command: ``AC_http_request``. + + +SQL +=== + +Read-only, parameter-bound SQLite queries:: + + from je_auto_control import query_sqlite + + rows = query_sqlite("app.db", "SELECT id, name FROM users") + count = query_sqlite("app.db", + "SELECT COUNT(*) FROM users WHERE active = ?", + params=[1], fetch="scalar") + +Queries are restricted to a single read-only ``SELECT`` / ``WITH`` +statement, run over a read-only connection, with values always bound as +parameters (never string-interpolated). + +Executor commands: ``AC_sql_to_var`` (rows / one row / scalar into a +variable) and ``AC_assert_db`` (a scalar query asserted with +eq / ne / lt / gt / contains / ...). + + +Email (SMTP) +============ + +Send mail — for example a flow's report — over the standard library:: + + from je_auto_control import send_email + + send_email( + {"sender": "bot@x.com", "to": ["qa@x.com"], + "subject": "Run passed", "body": "All green", + "attachments": ["report.html"]}, + {"host": "smtp.x.com", "port": 587, + "username": "bot@x.com", "password": "..."}) + +TLS is enabled by default (STARTTLS, or implicit SSL when ``use_ssl`` is +set) over a verified default context; supports multiple recipients, CC, +HTML bodies, and file attachments. + +Executor command: ``AC_send_email``. + + +PDF +=== + +Extract text from and assert on PDF documents (optional ``pypdf`` +backend — ``pip install je_auto_control[pdf]``):: + + from je_auto_control import extract_pdf_text, assert_pdf_text + + text = extract_pdf_text("invoice.pdf", pages=1) + assert_pdf_text("invoice.pdf", "Total: $50.00") + +Executor commands: ``AC_pdf_to_var`` (text into a variable) and +``AC_assert_pdf_text`` (text present / absent, optionally on a page). + + +Smart waits +=========== + +Two waits that replace unreliable ``sleep`` calls:: + + from je_auto_control import wait_until_file, wait_until_port + + wait_until_file("~/Downloads/report.pdf", stable_for_s=1.0) + wait_until_port("127.0.0.1", 8080, timeout_s=30.0) + +``wait_until_file`` returns once a file exists, is at least ``min_size`` +bytes, and its size has held steady for ``stable_for_s`` (a download has +finished). ``wait_until_port`` returns once a TCP connection to +``host:port`` succeeds — the companion to launching a server. Both return +a ``WaitOutcome`` and honour a hard ``timeout_s`` cap. + +Executor commands: ``AC_wait_for_file``, ``AC_wait_for_port``. + + +Security +======== + +HTTP and SMTP enforce ``http`` / ``https`` or TLS with verified +certificates and explicit timeouts; SQL is read-only and parameter-bound; +all user-supplied file paths are resolved with ``realpath`` before I/O. diff --git a/docs/source/Eng/eng_index.rst b/docs/source/Eng/eng_index.rst index a914ac4f..95cf59e2 100644 --- a/docs/source/Eng/eng_index.rst +++ b/docs/source/Eng/eng_index.rst @@ -27,6 +27,7 @@ Comprehensive guides for all AutoControl features. doc/new_features/v2_features_doc doc/new_features/v3_features_doc doc/new_features/v4_features_doc + doc/new_features/v5_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/docs/source/Zh/doc/new_features/v4_features_doc.rst b/docs/source/Zh/doc/new_features/v4_features_doc.rst new file mode 100644 index 00000000..16abbb84 --- /dev/null +++ b/docs/source/Zh/doc/new_features/v4_features_doc.rst @@ -0,0 +1,138 @@ +========================================== +新功能 (2026-06-17) — 自動化工具箱 +========================================== + +三十多個自動化原語,涵蓋輸入擬真、視覺、流程控制、觸發器、視窗管理與 +檔案安全——另加「可還原(資源回收桶)刪除」與「錄製編輯器 Undo」。每項 +功能都附帶 headless Python API、``AC_*`` 執行器指令,以及視覺化 Script +Builder 項目。視覺與視窗功能的 geometry / IO 操作皆可注入,因此邏輯無需 +真實螢幕或視窗即可完整單元測試。 + +.. contents:: + :local: + :depth: 2 + + +擬人化輸入 +========== + +像真人一樣移動游標與打字——適合 demo、擬真自動化,以及會偵測機械式時序 +的應用。路徑與延遲產生器在給定 ``seed`` 時為純函式且可重現:: + + from je_auto_control import move_mouse_humanized, type_text_humanized + + # 曲線、eased Bezier 路徑,含 overshoot 與 jitter。 + move_mouse_humanized(800, 400, duration_s=0.5, + motion=None, seed=None) + + # 逐字打字,每字隨機微延遲。 + type_text_humanized("Hello, world", base_delay=0.05, + jitter=0.04, pause_chance=0.1, seed=1) + +執行器指令:``AC_human_move``、``AC_human_type``。 + + +視覺 +==== + +* **VLM 自然語言斷言** — ``assert_by_description("a green success toast")`` + 以視覺語言模型判斷畫面是否符合描述(``locate_by_description`` 的 + ``verify()`` 搭檔)。``AC_assert_vlm``。 +* **捲動找元素** — ``scroll_until_visible(target, kind="image", + direction="down", max_scrolls=10)`` 往某方向捲動直到樣板圖或 OCR 文字 + 出現,回傳 ``{found, coords, scrolls}``。``AC_scroll_to_find``。 +* **區域顏色統計** — ``region_color_stats(source, region)`` 回傳區域的 + ``average_rgb``、``dominant_rgb`` 及該色的像素占比(量化色彩空間 → 取 + 最多的 bucket → 平均其真實像素)。``AC_region_color_stats``。 +* **讀取 QR code** — ``read_qr_codes(source, region)`` 以 OpenCV 的 + ``QRCodeDetector`` 解碼 QR(不需新相依)。``AC_read_qr``。 + + +流程控制與變數 +============== + +* **可重用巨集** — ``AC_define_macro`` 註冊具名、帶參數的動作子程序; + ``AC_call_macro`` 以 ``${arg}`` 綁定呼叫它——補上 loop / if 原語表達 + 不了的「可呼叫函式」。 +* **同進程平行** — ``AC_parallel`` 讓多個分支動作清單並行執行,各自在 + 獨立的全新 executor 上,因此分支不會在共享變數上互相 race(跨主機 DAG + 的同進程版)。 +* **效能預算斷言** — ``assert_duration(action, max_ms)`` / + ``AC_assert_duration`` 在區塊耗時超過預算時判失敗——銜接 profiler 與 + 斷言 DSL 的延遲回歸守門。 +* **讀進變數** — 把外部資料綁進流程範圍供後續 ``${var}`` 使用: + ``AC_ocr_to_var``(區域文字)、``AC_shell_to_var``(命令 stdout)、 + ``AC_read_file_to_var``(檔案文字)、``AC_http_to_var``(GET body 或 + dotted JSON path)、``AC_now_to_var``(strftime)、``AC_random_to_var`` + (seeded int / float / choice)。 +* **變數轉換** — ``AC_transform_var`` 套用 upper / lower / strip / title / + replace / regex 取出 / slice,可就地或寫入新變數——與「讀進變數」系列 + 搭配,在使用前清理原始文字。 +* **斷言變數** — ``assert_variable(value, op, expected)`` / + ``AC_assert_var`` 在變數不滿足 eq / ne / lt / gt / contains / regex 時 + 判失敗(分支 ``if_var`` 的斷言 DSL 搭檔)。 + + +觸發器與智慧等待 +================ + +* **複合觸發器** — ``AllOfTrigger`` / ``AnyOfTrigger`` / ``SequenceTrigger`` + 以布林 AND、OR 或有序序列組合任何現有觸發器;子項重用各觸發器的 + ``is_fired()``,因此任何型別都能自由巢狀。 +* **Cron 觸發器** — ``CronTrigger("0 9 * * *")`` 以五欄 cron 運算式觸發, + 每個符合的分鐘最多一次,並可與布林觸發器組合(例如 *在 09:00 且只在 + 圖片可見時*)。 +* **更多智慧等待** — ``wait_until_clipboard_changes``(changed / equals / + contains,``AC_wait_clipboard_change``)與 ``wait_until_window_closed`` + (``AC_wait_window_closed``)補齊 screen / pixel / region 等待。 + + +視窗管理 +======== + +* **單一視窗擷取** — ``capture_window(title, output_path)`` 以標題解析視窗 + geometry(Win32 ``GetWindowRect``)並精確擷取其範圍。``AC_capture_window``。 +* **版面儲存 / 還原** — ``save_window_layout(path)`` 把每個視窗的位置快照 + 成 JSON;``restore_window_layout(path)`` 再把它們全部移回(方便測試 + setup / teardown)。``AC_save_window_layout`` / ``AC_restore_window_layout``。 +* **貼齊 / 平鋪** — ``snap_window(title, "left")`` 把視窗移到螢幕一半 + (left / right / top / bottom)、四分之一(四個角)或 ``"max"``。 + ``AC_snap_window``。 + + +檔案安全 +======== + +* **動作檔簽章** — ``sign_action_file`` 寫出 HMAC-SHA256 的 ``.sig`` + sidecar;``verify_action_file`` 以常數時間驗證。設定 + ``JE_AUTOCONTROL_REQUIRE_SIGNED_ACTIONS`` 時,``execute_files`` 會強制 + 簽章(opt-in)。``AC_sign_action_file`` / ``AC_verify_action_file``。 +* **動作檔加密** — ``encrypt_action_file`` / ``decrypt_action_file`` 以 + Fernet(AES-128-CBC + HMAC)讓腳本內容在靜態時保密,金鑰來自通行碼或 + 每位使用者的 0600 金鑰。``AC_encrypt_action_file`` / + ``AC_decrypt_action_file``。 +* **可還原刪除** — ``move_to_trash(path)`` 把檔案送進 OS 資源回收桶 + (Win32 ``SHFileOperation`` undo flag / macOS Trash / Linux XDG trash, + 優先使用 ``send2trash``),讓「刪除」的檔案能還原。``AC_move_to_trash``。 + + +報告與通知 +========== + +* **截圖標註** — ``annotate_screenshot(source, annotations, output_path)`` + 在截圖上畫出帶標籤的方框、半透明高亮、箭頭與文字(redaction 模糊化的 + 標記版搭檔)。``AC_annotate_screenshot``。 +* **桌面通知** — ``notify(title, message)`` 顯示跨平台通知 + (``notify-send`` / ``osascript`` / PowerShell);防注入(Linux 用 argv, + macOS / Windows 用從環境變數讀字串的固定腳本)。``AC_notify``。 + + +GUI +=== + +* **錄製編輯器 Undo** — 每次編輯(刪步驟、trim、rescale、調整延遲、 + filter)都會快照進 undo stack;**Ctrl+Z** 與 Undo 按鈕可還原前一狀態。 +* **觸發器分頁** — *Combine selected* 把選取的觸發器包成 AllOf / AnyOf / + Sequence 複合觸發器;新增 **Cron** 觸發器型別。 +* **斷言分頁** — 新增 **VLM**(「畫面符合描述」)斷言型別。 +* 每個新的 ``AC_*`` 指令都可在視覺化 **Script Builder** 中建構。 diff --git a/docs/source/Zh/doc/new_features/v5_features_doc.rst b/docs/source/Zh/doc/new_features/v5_features_doc.rst new file mode 100644 index 00000000..b95665cc --- /dev/null +++ b/docs/source/Zh/doc/new_features/v5_features_doc.rst @@ -0,0 +1,153 @@ +==================================== +新功能 (2026-06-18) — CLI 與整合 +==================================== + +八項 headless 能力,補齊腳本化、整合與 CI 使用情境:一個真正的命令列 +介面、把錄製轉成程式碼,以及一級的 HTTP / SQL / Email / PDF / 等待步驟。 +每項功能都提供 headless Python API、``AC_*`` 執行器指令、MCP 工具,以及 +視覺化 Script Builder 項目,並有 headless 測試覆蓋——網路、SMTP、PDF +後端皆以注入方式測試,完全不會碰到外部系統。 + +.. contents:: + :local: + :depth: 2 + + +命令列介面 +========== + +套件現在會安裝 ``je_auto_control`` console script,可在 shell 或 CI 中 +執行與檢查動作檔:: + + je_auto_control run script.json --var user=alice --dry-run + je_auto_control validate script.json # 別名:lint + je_auto_control list-commands --filter mouse --json + je_auto_control fmt script.json --check + je_auto_control record out.json --duration 5 + je_auto_control codegen script.json --target pytest -o test_flow.py + je_auto_control version + +``run`` 直接執行(``--dry-run`` 只驗證並列出步驟而不實際操作), +``validate`` / ``lint`` 檢查結構並拒絕未知指令,``fmt`` 標準化 JSON, +``record`` 錄製輸入,``codegen`` 產生程式碼(見下),``list-commands`` +列出執行器即時的指令目錄。 + + +程式碼產生 +========== + +把錄製或動作檔轉成可提交、可執行的程式碼:: + + from je_auto_control import generate_code, generate_code_file + + code = generate_code(actions, target="pytest", style="calls") + generate_code_file("flow.json", "test_flow.py", target="pytest") + +``target`` 可為 ``pytest`` / ``python`` / ``robot``。預設的 ``calls`` +風格會把每個 ``AC_*`` 指令對應到 facade 呼叫(``ac.click_mouse(...)``), +流程控制與私有 adapter 則退回 ``ac.execute_action([...])``;``actions`` +風格則直接嵌入動作清單並透過執行器重播。 + +執行器指令:``AC_generate_code``。CLI:``je_auto_control codegen``。 + + +HTTP / API +========== + +不需額外相依的 HTTP(S) 客戶端,適合 UI + API 混合流程:: + + from je_auto_control import http_request + + resp = http_request( + "https://api.example/items", method="POST", + json_body={"name": "Sam"}, + headers={"X-Trace": "1"}, + auth={"type": "bearer", "token": "..."}, + timeout=30.0) + assert resp["status"] == 201 + +回傳 ``{status, ok, headers, text, json, url}``;非 2xx 回應會被回傳而非 +丟出例外,因此可直接對狀態碼斷言。僅允許 ``http`` / ``https``。 +``AC_http_to_var`` 現在共用同一個客戶端,因此也能送 body、headers 與認證。 + +執行器指令:``AC_http_request``。 + + +SQL +=== + +唯讀、參數化的 SQLite 查詢:: + + from je_auto_control import query_sqlite + + rows = query_sqlite("app.db", "SELECT id, name FROM users") + count = query_sqlite("app.db", + "SELECT COUNT(*) FROM users WHERE active = ?", + params=[1], fetch="scalar") + +查詢僅限單句唯讀的 ``SELECT`` / ``WITH``,以唯讀連線執行,且值一律以 +參數綁定(絕不字串拼接)。 + +執行器指令:``AC_sql_to_var``(列 / 單列 / 純量存入變數)與 +``AC_assert_db``(對純量查詢以 eq / ne / lt / gt / contains / ... 斷言)。 + + +Email(SMTP) +============ + +透過標準庫寄信——例如流程的報告:: + + from je_auto_control import send_email + + send_email( + {"sender": "bot@x.com", "to": ["qa@x.com"], + "subject": "Run passed", "body": "All green", + "attachments": ["report.html"]}, + {"host": "smtp.x.com", "port": 587, + "username": "bot@x.com", "password": "..."}) + +預設啟用 TLS(STARTTLS,或設定 ``use_ssl`` 時用隱式 SSL),使用已驗證 +憑證的預設 context;支援多收件人、CC、HTML 內文與檔案附件。 + +執行器指令:``AC_send_email``。 + + +PDF +=== + +從 PDF 文件抽取文字並斷言其內容(可選的 ``pypdf`` 後端—— +``pip install je_auto_control[pdf]``):: + + from je_auto_control import extract_pdf_text, assert_pdf_text + + text = extract_pdf_text("invoice.pdf", pages=1) + assert_pdf_text("invoice.pdf", "Total: $50.00") + +執行器指令:``AC_pdf_to_var``(文字存入變數)與 ``AC_assert_pdf_text`` +(文字存在 / 不存在,可指定頁碼)。 + + +智慧等待 +======== + +兩個用來取代不可靠 ``sleep`` 的等待:: + + from je_auto_control import wait_until_file, wait_until_port + + wait_until_file("~/Downloads/report.pdf", stable_for_s=1.0) + wait_until_port("127.0.0.1", 8080, timeout_s=30.0) + +``wait_until_file`` 會在檔案存在、達到 ``min_size`` 位元組、且大小持續 +``stable_for_s`` 秒不變(下載寫完)後回傳。``wait_until_port`` 會在 +``host:port`` 可接受 TCP 連線後回傳——是啟動伺服器的最佳搭檔。兩者都 +回傳 ``WaitOutcome`` 並有硬性 ``timeout_s`` 上限。 + +執行器指令:``AC_wait_for_file``、``AC_wait_for_port``。 + + +安全性 +====== + +HTTP 與 SMTP 強制 ``http`` / ``https`` 或使用已驗證憑證的 TLS,並設定明確 +逾時;SQL 為唯讀且參數綁定;所有使用者提供的檔案路徑在 I/O 前都會以 +``realpath`` 解析。 diff --git a/docs/source/Zh/zh_index.rst b/docs/source/Zh/zh_index.rst index bfbc8e2b..eef64e83 100644 --- a/docs/source/Zh/zh_index.rst +++ b/docs/source/Zh/zh_index.rst @@ -27,6 +27,7 @@ AutoControl 所有功能的完整使用指南。 doc/new_features/v2_features_doc doc/new_features/v3_features_doc doc/new_features/v4_features_doc + doc/new_features/v5_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/je_auto_control/__init__.py b/je_auto_control/__init__.py index 87fc5687..39ec3034 100644 --- a/je_auto_control/__init__.py +++ b/je_auto_control/__init__.py @@ -148,9 +148,9 @@ ) # Smart waits (frame-diff replacements for time.sleep) from je_auto_control.utils.smart_waits import ( - WaitOutcome, wait_until_clipboard_changes, wait_until_pixel_changes, - wait_until_region_idle, wait_until_screen_stable, - wait_until_window_closed, + WaitOutcome, wait_until_clipboard_changes, wait_until_file, + wait_until_pixel_changes, wait_until_port, wait_until_region_idle, + wait_until_screen_stable, wait_until_window_closed, ) # Assertion DSL (verify screen state; raise on mismatch) from je_auto_control.utils.assertion import ( @@ -347,6 +347,22 @@ # json from je_auto_control.utils.json.json_file import read_action_json from je_auto_control.utils.json.json_file import write_action_json +from je_auto_control.utils.json.json_file import format_action_json +# codegen: action list -> pytest / python / robot source +from je_auto_control.utils.codegen.codegen import ( + generate_code, + generate_code_file, +) +# HTTP/API request action (dependency-free, stdlib urllib) +from je_auto_control.utils.http_client.http_client import http_request +# Ad-hoc read-only SQL query against SQLite +from je_auto_control.utils.sql.sql_query import query_sqlite +# Send email via SMTP +from je_auto_control.utils.email_send.email_sender import send_email +# PDF document text extraction + assertion (optional pypdf backend) +from je_auto_control.utils.pdf.pdf_reader import ( + assert_pdf_text, extract_pdf_text, pdf_metadata, pdf_page_count, +) # package manager from je_auto_control.utils.package_manager.package_manager_class import \ package_manager @@ -397,6 +413,7 @@ # record from je_auto_control.wrapper.auto_control_record import record from je_auto_control.wrapper.auto_control_record import stop_record +from je_auto_control.wrapper.auto_control_record import record_to_json # Screen wrappers from je_auto_control.wrapper.auto_control_screen import screen_size from je_auto_control.wrapper.auto_control_screen import screenshot @@ -434,8 +451,12 @@ def start_autocontrol_gui(*args, **kwargs): "AutoControlMouseException", "AutoControlCantFindKeyException", "AutoControlScreenException", "ImageNotFoundException", "AutoControlJsonActionException", "AutoControlRecordException", "AutoControlActionNullException", "AutoControlActionException", "record", - "stop_record", "read_action_json", "write_action_json", "execute_action", "execute_files", "executor", - "execute_action_with_vars", + "stop_record", "read_action_json", "write_action_json", "format_action_json", + "execute_action", "execute_files", "executor", + "execute_action_with_vars", "record_to_json", + "generate_code", "generate_code_file", "http_request", "query_sqlite", + "send_email", "assert_pdf_text", "extract_pdf_text", "pdf_metadata", + "pdf_page_count", "add_command_to_executor", "test_record_instance", "pil_screenshot", # OCR "TextMatch", "find_text_matches", "locate_text_center", "wait_for_text", @@ -548,6 +569,7 @@ def start_autocontrol_gui(*args, **kwargs): "WaitOutcome", "wait_until_pixel_changes", "wait_until_region_idle", "wait_until_screen_stable", "wait_until_clipboard_changes", "wait_until_window_closed", + "wait_until_file", "wait_until_port", # Assertion DSL "AssertionResult", "assert_image", "assert_pixel", "assert_text", "assert_window", "assert_clipboard", "assert_process", diff --git a/je_auto_control/cli.py b/je_auto_control/cli.py index 2468aa80..4a59595d 100644 --- a/je_auto_control/cli.py +++ b/je_auto_control/cli.py @@ -1,11 +1,20 @@ """Command-line entry point. +Installed as the ``je_auto_control`` console script; also runnable via +``python -m je_auto_control.cli``. + Usage:: - python -m je_auto_control.cli run script.json [--var x=10 --var y=20] - python -m je_auto_control.cli list-jobs - python -m je_auto_control.cli start-server --port 9938 - python -m je_auto_control.cli start-rest --port 9939 + je_auto_control run script.json [--var x=10 --var y=20] [--dry-run] + je_auto_control validate script.json # alias: lint + je_auto_control list-commands [--filter mouse] [--json] + je_auto_control fmt script.json [--check] + je_auto_control record out.json [--duration 5] + je_auto_control codegen script.json [--target pytest] [-o test_flow.py] + je_auto_control version + je_auto_control list-jobs + je_auto_control start-server --port 9938 + je_auto_control start-rest --port 9939 The CLI is a thin wrapper around the headless APIs so every feature works without ever importing PySide6. @@ -14,9 +23,16 @@ import json import signal import sys +import threading import time from typing import Dict, List, Optional, Sequence +from je_auto_control.utils.exception.exceptions import ( + AutoControlActionException, + AutoControlException, + AutoControlJsonActionException, +) + def _parse_vars(pairs: Optional[Sequence[str]]) -> Dict[str, object]: """Parse ``--var name=value`` entries into a dict (JSON value when parseable).""" @@ -56,6 +72,100 @@ def cmd_run(args: argparse.Namespace) -> int: return 0 +def cmd_validate(args: argparse.Namespace) -> int: + """Validate an action file's structure without executing it.""" + from je_auto_control.utils.executor.action_executor import executor + from je_auto_control.utils.executor.action_schema import validate_actions + from je_auto_control.utils.json.json_file import read_action_json + actions = read_action_json(args.script) + validate_actions(actions, executor.known_commands()) + sys.stdout.write(f"OK: {len(actions)} action(s)\n") + return 0 + + +def cmd_list_commands(args: argparse.Namespace) -> int: + """Print the known ``AC_*`` commands as text or JSON.""" + from je_auto_control.utils.executor.action_executor import executor + names = sorted(executor.known_commands()) + if args.filter: + needle = args.filter.lower() + names = [name for name in names if needle in name.lower()] + if args.json: + json.dump(names, sys.stdout, ensure_ascii=False, indent=2) + sys.stdout.write("\n") + else: + for name in names: + sys.stdout.write(f"{name}\n") + return 0 + + +def cmd_fmt(args: argparse.Namespace) -> int: + """Canonicalise an action file's JSON, or report drift under ``--check``.""" + from je_auto_control.utils.json.json_file import format_action_json + changed = format_action_json(args.script, check=args.check) + if args.check: + if changed: + sys.stderr.write(f"would reformat: {args.script}\n") + return 1 + return 0 + sys.stdout.write( + f"{'reformatted' if changed else 'unchanged'}: {args.script}\n") + return 0 + + +def cmd_record(args: argparse.Namespace) -> int: + """Record mouse/keyboard input into an action file.""" + if sys.platform == "darwin": + sys.stderr.write("record is not supported on macOS\n") + return 1 + from je_auto_control.wrapper.auto_control_record import record_to_json + stop_event = threading.Event() + if args.duration is None: + sys.stderr.write("Recording... press Enter to stop.\n") + threading.Thread( + target=_set_on_enter, args=(stop_event,), daemon=True).start() + actions = record_to_json( + args.output, stop_event=stop_event, timeout=args.duration) + sys.stderr.write(f"Recorded {len(actions)} action(s) to {args.output}\n") + return 0 + + +def _set_on_enter(stop_event: threading.Event) -> None: + """Block on a line of stdin, then signal the recorder to stop.""" + try: + sys.stdin.readline() + except (EOFError, OSError): + pass + stop_event.set() + + +def cmd_codegen(args: argparse.Namespace) -> int: + """Generate pytest/python/robot source from an action file.""" + from je_auto_control.utils.codegen.codegen import ( + generate_code, generate_code_file, + ) + from je_auto_control.utils.json.json_file import read_action_json + if args.output: + generate_code_file(args.script, args.output, target=args.target, + name=args.name, style=args.style) + sys.stderr.write(f"Wrote {args.target} code to {args.output}\n") + else: + code = generate_code(read_action_json(args.script), target=args.target, + name=args.name, style=args.style) + sys.stdout.write(code) + return 0 + + +def cmd_version(_: argparse.Namespace) -> int: + """Print the installed ``je_auto_control`` version.""" + from importlib.metadata import PackageNotFoundError, version + try: + sys.stdout.write(version("je_auto_control") + "\n") + except PackageNotFoundError: + sys.stdout.write("unknown\n") + return 0 + + def cmd_list_jobs(_: argparse.Namespace) -> int: from je_auto_control.utils.scheduler.scheduler import default_scheduler jobs = default_scheduler.list_jobs() @@ -112,6 +222,42 @@ def build_parser() -> argparse.ArgumentParser: help="record actions without calling them") p_run.set_defaults(func=cmd_run) + for name in ("validate", "lint"): + p_validate = sub.add_parser(name, help="Validate an action JSON file") + p_validate.add_argument("script") + p_validate.set_defaults(func=cmd_validate) + + p_list = sub.add_parser("list-commands", help="List known AC_* commands") + p_list.add_argument("--filter", help="Only names containing this text") + p_list.add_argument("--json", action="store_true", help="Emit JSON") + p_list.set_defaults(func=cmd_list_commands) + + p_fmt = sub.add_parser("fmt", help="Canonicalise an action file's JSON") + p_fmt.add_argument("script") + p_fmt.add_argument("--check", action="store_true", + help="Exit 1 if the file is not already formatted") + p_fmt.set_defaults(func=cmd_fmt) + + p_record = sub.add_parser("record", help="Record input into an action file") + p_record.add_argument("output") + p_record.add_argument("--duration", type=float, default=None, + help="Auto-stop after N seconds (else press Enter)") + p_record.set_defaults(func=cmd_record) + + p_codegen = sub.add_parser( + "codegen", help="Generate test code from an action file") + p_codegen.add_argument("script") + p_codegen.add_argument("--target", choices=("pytest", "python", "robot"), + default="pytest") + p_codegen.add_argument("--style", choices=("calls", "actions"), + default="calls") + p_codegen.add_argument("--name", default="recorded_flow") + p_codegen.add_argument("-o", "--output", help="Write to file instead of stdout") + p_codegen.set_defaults(func=cmd_codegen) + + p_version = sub.add_parser("version", help="Print the installed version") + p_version.set_defaults(func=cmd_version) + p_jobs = sub.add_parser("list-jobs", help="List scheduler jobs") p_jobs.set_defaults(func=cmd_list_jobs) @@ -130,7 +276,13 @@ def build_parser() -> argparse.ArgumentParser: def main(argv: Optional[List[str]] = None) -> int: parser = build_parser() args = parser.parse_args(argv) - return args.func(args) + try: + return args.func(args) + except (AutoControlActionException, AutoControlException, + AutoControlJsonActionException, OSError, RuntimeError, + ValueError) as error: + sys.stderr.write(f"error: {error}\n") + return 1 if __name__ == "__main__": diff --git a/je_auto_control/gui/language_wrapper/english.py b/je_auto_control/gui/language_wrapper/english.py index e3f00d57..66da26d2 100644 --- a/je_auto_control/gui/language_wrapper/english.py +++ b/je_auto_control/gui/language_wrapper/english.py @@ -787,6 +787,8 @@ "re_browse": "Browse", "re_load": "Load", "re_save_as": "Save As", + "re_export_code": "Export as code", + "re_export_target": "Choose code target", "re_trim_start": "Trim start:", "re_trim_end": "end:", "re_apply_trim": "Apply trim", diff --git a/je_auto_control/gui/language_wrapper/japanese.py b/je_auto_control/gui/language_wrapper/japanese.py index 8b505e56..83dbfe6b 100644 --- a/je_auto_control/gui/language_wrapper/japanese.py +++ b/je_auto_control/gui/language_wrapper/japanese.py @@ -673,6 +673,8 @@ "re_browse": "参照", "re_load": "読込", "re_save_as": "名前を付けて保存", + "re_export_code": "コードとして書き出す", + "re_export_target": "コードの出力先を選択", "re_trim_start": "トリム開始:", "re_trim_end": "終了:", "re_apply_trim": "トリム適用", diff --git a/je_auto_control/gui/language_wrapper/simplified_chinese.py b/je_auto_control/gui/language_wrapper/simplified_chinese.py index dcd79ec2..b4bb4930 100644 --- a/je_auto_control/gui/language_wrapper/simplified_chinese.py +++ b/je_auto_control/gui/language_wrapper/simplified_chinese.py @@ -662,6 +662,8 @@ "re_browse": "浏览", "re_load": "载入", "re_save_as": "另存为", + "re_export_code": "导出为代码", + "re_export_target": "选择代码目标", "re_trim_start": "裁剪起点:", "re_trim_end": "终点:", "re_apply_trim": "应用裁剪", diff --git a/je_auto_control/gui/language_wrapper/traditional_chinese.py b/je_auto_control/gui/language_wrapper/traditional_chinese.py index d67aee4a..ca6012c8 100644 --- a/je_auto_control/gui/language_wrapper/traditional_chinese.py +++ b/je_auto_control/gui/language_wrapper/traditional_chinese.py @@ -666,6 +666,8 @@ "re_browse": "瀏覽", "re_load": "載入", "re_save_as": "另存為", + "re_export_code": "匯出為程式碼", + "re_export_target": "選擇程式碼目標", "re_trim_start": "裁剪起點:", "re_trim_end": "終點:", "re_apply_trim": "套用裁剪", diff --git a/je_auto_control/gui/recording_editor_tab.py b/je_auto_control/gui/recording_editor_tab.py index c2fcdb9f..3d325397 100644 --- a/je_auto_control/gui/recording_editor_tab.py +++ b/je_auto_control/gui/recording_editor_tab.py @@ -4,14 +4,15 @@ from PySide6.QtGui import QKeySequence, QShortcut from PySide6.QtWidgets import ( - QFileDialog, QHBoxLayout, QLabel, QLineEdit, QListWidget, QMessageBox, - QPushButton, QTextEdit, QVBoxLayout, QWidget, + QFileDialog, QHBoxLayout, QInputDialog, QLabel, QLineEdit, QListWidget, + QMessageBox, QPushButton, QTextEdit, QVBoxLayout, QWidget, ) from je_auto_control.gui._i18n_helpers import TranslatableMixin from je_auto_control.gui.language_wrapper.multi_language_wrapper import ( language_wrapper, ) +from je_auto_control.utils.codegen.codegen import generate_code_file from je_auto_control.utils.json.json_file import read_action_json, write_action_json from je_auto_control.utils.recording_edit.editor import ( adjust_delays, filter_actions, remove_action, scale_coordinates, trim_actions, @@ -72,6 +73,7 @@ def _build_layout(self) -> None: ("re_browse", self._browse), ("re_load", self._load), ("re_save_as", self._save_as), + ("re_export_code", self._export_code), ): btn = self._tr(QPushButton(), key) btn.clicked.connect(handler) @@ -166,6 +168,34 @@ def _save_as(self) -> None: return self._status.setText(f"Saved to {path}") + _TARGET_FILTERS = { + "pytest": "Python (*.py)", + "python": "Python (*.py)", + "robot": "Robot (*.robot)", + } + + def _export_code(self) -> None: + """Generate pytest/python/robot source from the loaded actions.""" + if not self._actions: + return + target, ok = QInputDialog.getItem( + self, _t("re_export_target"), "target", + list(self._TARGET_FILTERS), 0, False, + ) + if not ok: + return + path, _filter = QFileDialog.getSaveFileName( + self, _t("re_export_code"), "", self._TARGET_FILTERS[target], + ) + if not path: + return + try: + generate_code_file(self._actions, path, target=target) + except (OSError, ValueError) as error: + QMessageBox.warning(self, "Error", str(error)) + return + self._status.setText(f"Exported {target} code to {path}") + def _refresh(self) -> None: self._list.clear() for idx, action in enumerate(self._actions): diff --git a/je_auto_control/gui/script_builder/command_schema.py b/je_auto_control/gui/script_builder/command_schema.py index 10bb562a..1f400733 100644 --- a/je_auto_control/gui/script_builder/command_schema.py +++ b/je_auto_control/gui/script_builder/command_schema.py @@ -326,6 +326,35 @@ def _add_window_specs(specs: List[CommandSpec]) -> None: ), description="Wait until a window matching the title disappears.", )) + specs.append(CommandSpec( + "AC_wait_for_file", "Flow", "Wait for File", + fields=( + FieldSpec("path", FieldType.FILE_PATH), + FieldSpec("timeout_s", FieldType.FLOAT, optional=True, + default=30.0), + FieldSpec("stable_for_s", FieldType.FLOAT, optional=True, + default=1.0, min_value=0.0), + FieldSpec("min_size", FieldType.INT, optional=True, default=1, + min_value=0), + FieldSpec("poll_interval_s", FieldType.FLOAT, optional=True, + default=0.25, min_value=0.01), + ), + description="Wait until a file appears and stops growing (download done).", + )) + specs.append(CommandSpec( + "AC_wait_for_port", "Flow", "Wait for TCP Port", + fields=( + FieldSpec("host", FieldType.STRING, default="127.0.0.1"), + FieldSpec("port", FieldType.INT, min_value=1, max_value=65535), + FieldSpec("timeout_s", FieldType.FLOAT, optional=True, + default=30.0), + FieldSpec("connect_timeout_s", FieldType.FLOAT, optional=True, + default=1.0, min_value=0.01), + FieldSpec("poll_interval_s", FieldType.FLOAT, optional=True, + default=0.25, min_value=0.01), + ), + description="Wait until a TCP host:port accepts connections.", + )) specs.append(CommandSpec("AC_list_windows", "Window", "List Windows")) specs.append(CommandSpec( "AC_capture_window", "Window", "Capture Window", @@ -553,15 +582,88 @@ def _add_misc_specs(specs: List[CommandSpec]) -> None: description="Read a file's text content into a flow variable.", )) specs.append(CommandSpec( - "AC_http_to_var", "Report", "HTTP GET into Variable", + "AC_sql_to_var", "Report", "SQL Query into Variable", + fields=( + FieldSpec("database", FieldType.FILE_PATH), + FieldSpec("query", FieldType.STRING, + placeholder="SELECT name FROM users WHERE id = ?"), + FieldSpec("var", FieldType.STRING, default="sql_result"), + FieldSpec("fetch", FieldType.ENUM, + choices=("all", "one", "scalar"), + optional=True, default="all"), + ), + description=("Run a read-only SQLite SELECT; store rows / a row / a " + "scalar in a variable. Bind values via params (JSON view)."), + )) + specs.append(CommandSpec( + "AC_assert_db", "Report", "Assert SQL Result", + fields=( + FieldSpec("database", FieldType.FILE_PATH), + FieldSpec("query", FieldType.STRING, + placeholder="SELECT COUNT(*) FROM users"), + FieldSpec("op", FieldType.ENUM, + choices=("eq", "ne", "lt", "le", "gt", "ge", + "contains", "startswith", "endswith"), + optional=True, default="eq"), + FieldSpec("expected", FieldType.STRING, optional=True), + ), + description=("Run a scalar SELECT and assert its value (use the JSON " + "view for non-string expected values / params)."), + )) + specs.append(CommandSpec( + "AC_http_to_var", "Report", "HTTP Request into Variable", fields=( FieldSpec("url", FieldType.STRING, placeholder="https://..."), + FieldSpec("method", FieldType.ENUM, + choices=("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD"), + optional=True, default="GET"), FieldSpec("var", FieldType.STRING, default="http_response"), FieldSpec("json_path", FieldType.STRING, optional=True, placeholder="data.0.name"), FieldSpec("timeout", FieldType.FLOAT, optional=True, default=30.0), ), - description="GET a URL; store the body or a JSON field in a variable.", + description="Request a URL; store the body or a JSON field in a variable.", + )) + specs.append(CommandSpec( + "AC_pdf_to_var", "Report", "PDF Text into Variable", + fields=( + FieldSpec("path", FieldType.FILE_PATH), + FieldSpec("var", FieldType.STRING, default="pdf_text"), + FieldSpec("page", FieldType.INT, optional=True, min_value=1), + ), + description="Extract a PDF's text (all pages or one) into a variable.", + )) + specs.append(CommandSpec( + "AC_assert_pdf_text", "Report", "Assert PDF Text", + fields=( + FieldSpec("path", FieldType.FILE_PATH), + FieldSpec("text", FieldType.STRING), + FieldSpec("present", FieldType.BOOL, optional=True, default=True), + FieldSpec("page", FieldType.INT, optional=True, min_value=1), + FieldSpec("case_sensitive", FieldType.BOOL, optional=True, + default=True), + ), + description="Assert text is present (or absent) in a PDF document.", + )) + specs.append(CommandSpec( + "AC_send_email", "Report", "Send Email", + description=("Send an email via SMTP. Configure the 'message' " + "{sender,to,subject,body,attachments} and 'smtp' " + "{host,port,username,password} dicts in the JSON view."), + )) + specs.append(CommandSpec( + "AC_http_request", "Report", "HTTP Request", + fields=( + FieldSpec("url", FieldType.STRING, placeholder="https://..."), + FieldSpec("method", FieldType.ENUM, + choices=("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD"), + default="GET"), + FieldSpec("data", FieldType.STRING, optional=True, + placeholder="raw request body"), + FieldSpec("timeout", FieldType.FLOAT, optional=True, default=30.0), + ), + description=("Perform an HTTP(S) request; returns status/headers/text/" + "json. Use the JSON view for headers/json_body/auth."), )) specs.append(CommandSpec( "AC_execute_process", "Shell", "Start Executable", diff --git a/je_auto_control/utils/codegen/__init__.py b/je_auto_control/utils/codegen/__init__.py new file mode 100644 index 00000000..83dd5826 --- /dev/null +++ b/je_auto_control/utils/codegen/__init__.py @@ -0,0 +1,7 @@ +"""Generate runnable test code from AutoControl action lists.""" +from je_auto_control.utils.codegen.codegen import ( + generate_code, + generate_code_file, +) + +__all__ = ["generate_code", "generate_code_file"] diff --git a/je_auto_control/utils/codegen/codegen.py b/je_auto_control/utils/codegen/codegen.py new file mode 100644 index 00000000..72a2f6b6 --- /dev/null +++ b/je_auto_control/utils/codegen/codegen.py @@ -0,0 +1,134 @@ +"""Generate runnable test code from AutoControl action lists. + +Turns a recording or JSON action file into a committable pytest test, a +standalone Python script, or a Robot Framework suite. The default +``calls`` style emits readable ``ac.(...)`` statements (mapping +each ``AC_*`` command to its facade function via the executor registry), +with a safe fall-back to ``ac.execute_action([...])`` for flow-control +commands and private adapters. The ``actions`` style instead embeds the +whole list and replays it through the executor for exact fidelity. + +This module imports no ``PySide6`` so codegen works fully headlessly. +""" +import json +import os +import pprint +import re +import textwrap +from typing import Sequence, Tuple + +from je_auto_control.utils.json.json_file import read_action_json + +_HEADER = "Generated by AutoControl codegen. Edit freely." + + +def _slug(name: str) -> str: + """Return a valid lower_snake identifier derived from ``name``.""" + slug = re.sub(r"\W+", "_", name.strip()).strip("_").lower() + if not slug: + return "recorded_flow" + return f"flow_{slug}" if slug[0].isdigit() else slug + + +def _call_context() -> Tuple[dict, set]: + """Return ``(event_dict, public_names)`` for mapping commands to calls.""" + import je_auto_control as ac + from je_auto_control.utils.executor.action_executor import executor + return executor.event_dict, set(getattr(ac, "__all__", ())) + + +def _action_to_call(action: Sequence, event_dict: dict, public: set) -> str: + """Render one action as a Python statement (call or executor fall-back).""" + name = action[0] + params = action[1] if len(action) == 2 else None + func = event_dict.get(name) + direct = func is not None and getattr(func, "__name__", "") in public + if direct and (params is None or isinstance(params, dict)): + if params: + kwargs = ", ".join( + f"{key}={value!r}" for key, value in params.items()) + return f"ac.{func.__name__}({kwargs})" + return f"ac.{func.__name__}()" + return f"ac.execute_action({[list(action)]!r})" + + +def _calls_body(actions: Sequence) -> str: + event_dict, public = _call_context() + return "\n".join( + _action_to_call(action, event_dict, public) for action in actions) + + +def _actions_body(actions: Sequence) -> str: + literal = pprint.pformat([list(action) for action in actions], + indent=4, width=88) + return f"actions = {literal}\nac.execute_action(actions)" + + +def _body(actions: Sequence, style: str) -> str: + if style == "actions": + return _actions_body(actions) + if style == "calls": + return _calls_body(actions) + raise ValueError(f"unknown codegen style: {style!r}") + + +def _render_pytest(actions: Sequence, name: str, style: str) -> str: + body = textwrap.indent(_body(actions, style), " ") + return (f'"""{_HEADER}"""\n' + "import je_auto_control as ac\n\n\n" + f"def test_{_slug(name)}():\n{body}\n") + + +def _render_python(actions: Sequence, name: str, style: str) -> str: + slug = _slug(name) + body = textwrap.indent(_body(actions, style), " ") + return (f'"""{_HEADER}"""\n' + "import je_auto_control as ac\n\n\n" + f"def {slug}():\n{body}\n\n\n" + 'if __name__ == "__main__":\n' + f" {slug}()\n") + + +def _render_robot(actions: Sequence, name: str, _style: str) -> str: + payload = json.dumps([list(action) for action in actions], + ensure_ascii=False) + test_name = name.replace("_", " ").strip().title() or "Recorded Flow" + return "\n".join([ + "*** Settings ***", + f"Documentation {_HEADER}", + "", + "*** Test Cases ***", + test_name, + f" ${{actions}}= Evaluate json.loads(r'''{payload}''') json", + " Evaluate __import__('je_auto_control').execute_action($actions)", + "", + ]) + + +_RENDERERS = { + "pytest": _render_pytest, + "python": _render_python, + "robot": _render_robot, +} + + +def generate_code(actions: Sequence, target: str = "pytest", + name: str = "recorded_flow", style: str = "calls") -> str: + """Render ``actions`` as source code for ``target`` (pytest/python/robot).""" + if not isinstance(actions, list) or not actions: + raise ValueError("actions must be a non-empty list") + renderer = _RENDERERS.get(target) + if renderer is None: + raise ValueError(f"unknown codegen target: {target!r}") + return renderer(actions, name, style) + + +def generate_code_file(source, output_path: str, target: str = "pytest", + name: str = "recorded_flow", style: str = "calls") -> str: + """Generate code from a list or JSON action-file path; write and return it.""" + actions = source if isinstance(source, list) else read_action_json( + os.path.realpath(source)) + code = generate_code(actions, target=target, name=name, style=style) + with open(os.path.realpath(output_path), "w", encoding="utf-8") as handle: + handle.write(code) + return code diff --git a/je_auto_control/utils/email_send/__init__.py b/je_auto_control/utils/email_send/__init__.py new file mode 100644 index 00000000..dfeeed3e --- /dev/null +++ b/je_auto_control/utils/email_send/__init__.py @@ -0,0 +1,4 @@ +"""Send email via SMTP (the sending companion to the email trigger).""" +from je_auto_control.utils.email_send.email_sender import send_email + +__all__ = ["send_email"] diff --git a/je_auto_control/utils/email_send/email_sender.py b/je_auto_control/utils/email_send/email_sender.py new file mode 100644 index 00000000..4a421391 --- /dev/null +++ b/je_auto_control/utils/email_send/email_sender.py @@ -0,0 +1,112 @@ +"""Send email via SMTP for the ``AC_send_email`` action step. + +Dependency-free (stdlib ``smtplib`` + ``email``). Parameters are grouped +into two dicts so the action stays JSON-friendly and within the project's +argument-count limit: + +* ``message`` — ``{sender, to, subject, body, cc?, html?, attachments?}`` +* ``smtp`` — ``{host, port?, username?, password?, use_tls?, use_ssl?, + timeout?}`` + +Security: TLS is on by default (STARTTLS on 587, or implicit SSL when +``use_ssl`` is set), the connection uses a verified default SSL context, +every call has an explicit timeout, and credentials are never logged. +Imports no ``PySide6`` so it stays fully headless. +""" +import mimetypes +import os +import smtplib +import ssl +from email.message import EmailMessage +from typing import Any, Dict, List, Mapping, Optional + + +def _as_list(value: Any) -> List[str]: + """Normalise a string / iterable of addresses into a list of strings.""" + if value is None: + return [] + if isinstance(value, str): + return [value] + return [str(item) for item in value] + + +def _attach_files(mime: EmailMessage, attachments: Any) -> None: + """Attach each file path in ``attachments`` to ``mime``.""" + for raw in attachments or []: + path = os.path.realpath(str(raw)) + if not os.path.isfile(path): + raise FileNotFoundError(f"attachment not found: {raw}") + ctype, _ = mimetypes.guess_type(path) + maintype, subtype = ( + ctype.split("/", 1) if ctype else ("application", "octet-stream")) + with open(path, "rb") as handle: + mime.add_attachment(handle.read(), maintype=maintype, + subtype=subtype, filename=os.path.basename(path)) + + +def _build_message(message: Mapping[str, Any]) -> EmailMessage: + """Assemble an :class:`EmailMessage` from the ``message`` spec.""" + sender = message.get("sender") or message.get("from") + recipients = _as_list(message.get("to")) + if not sender or not recipients: + raise ValueError("email requires 'sender' and at least one 'to'") + mime = EmailMessage() + mime["From"] = str(sender) + mime["To"] = ", ".join(recipients) + cc = _as_list(message.get("cc")) + if cc: + mime["Cc"] = ", ".join(cc) + mime["Subject"] = str(message.get("subject", "")) + body = str(message.get("body", "")) + mime.set_content(body, subtype="html" if message.get("html") else "plain") + _attach_files(mime, message.get("attachments")) + return mime + + +def _login_send(server: smtplib.SMTP, username: Optional[str], + password: Optional[str], mime: EmailMessage) -> None: + """Authenticate (when credentials are given) and send the message.""" + if username and password: + server.login(username, password) + server.send_message(mime) + + +def _tls_context() -> ssl.SSLContext: + """Return a hardened TLS context: verified certs over TLS 1.2 or newer.""" + context = ssl.create_default_context() + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + return context + + +def _deliver(mime: EmailMessage, smtp: Mapping[str, Any]) -> None: + """Open an SMTP(S) connection per ``smtp`` config and send ``mime``.""" + host = smtp.get("host") + if not host: + raise ValueError("smtp 'host' is required") + port = int(smtp.get("port", 587)) + timeout = float(smtp.get("timeout", 30.0)) + username, password = smtp.get("username"), smtp.get("password") + if bool(smtp.get("use_ssl", False)): + with smtplib.SMTP_SSL(str(host), port, timeout=timeout, + context=_tls_context()) as server: + _login_send(server, username, password, mime) + return + with smtplib.SMTP(str(host), port, timeout=timeout) as server: + if bool(smtp.get("use_tls", True)): + server.starttls(context=_tls_context()) + _login_send(server, username, password, mime) + + +def send_email(message: Mapping[str, Any], + smtp: Mapping[str, Any]) -> Dict[str, Any]: + """Send an email and return a small result dict. + + :param message: ``{sender, to, subject, body, cc?, html?, attachments?}``. + :param smtp: ``{host, port?, username?, password?, use_tls?, use_ssl?, + timeout?}``; TLS is enabled by default. + """ + mime = _build_message(message) + _deliver(mime, smtp) + return {"sent": True, "to": mime["To"], "subject": mime["Subject"]} diff --git a/je_auto_control/utils/executor/action_executor.py b/je_auto_control/utils/executor/action_executor.py index 90e2b29f..67e21a71 100644 --- a/je_auto_control/utils/executor/action_executor.py +++ b/je_auto_control/utils/executor/action_executor.py @@ -55,6 +55,7 @@ interpolate_actions, interpolate_value, ) from je_auto_control.utils.script_vars.scope import VariableScope +from je_auto_control.utils.http_client.http_client import http_request from je_auto_control.utils.generate_report.generate_html_report import generate_html, generate_html_report from je_auto_control.utils.generate_report.generate_json_report import generate_json, generate_json_report from je_auto_control.utils.generate_report.generate_xml_report import generate_xml, generate_xml_report @@ -386,6 +387,31 @@ def _wait_region_idle(region: List[int], ).to_dict() +def _wait_for_file(path: str, timeout_s: float = 30.0, + poll_interval_s: float = 0.25, + stable_for_s: float = 1.0, + min_size: int = 1) -> Dict[str, Any]: + """Executor adapter: wait until a file exists and finishes being written.""" + from je_auto_control.utils.smart_waits import wait_until_file + return wait_until_file( + path, timeout_s=float(timeout_s), + poll_interval_s=float(poll_interval_s), + stable_for_s=float(stable_for_s), min_size=int(min_size), + ).to_dict() + + +def _wait_for_port(host: str, port: int, timeout_s: float = 30.0, + poll_interval_s: float = 0.25, + connect_timeout_s: float = 1.0) -> Dict[str, Any]: + """Executor adapter: wait until a TCP port accepts connections.""" + from je_auto_control.utils.smart_waits import wait_until_port + return wait_until_port( + host, int(port), timeout_s=float(timeout_s), + poll_interval_s=float(poll_interval_s), + connect_timeout_s=float(connect_timeout_s), + ).to_dict() + + def _ocr_read_structure(region: Optional[List[int]] = None, lang: str = "eng", min_confidence: float = 60.0, @@ -2103,6 +2129,36 @@ def _region_color_stats(region: Optional[Union[List[int], str]] = None, pass +def _generate_code(source: Any, output: Optional[str] = None, + target: str = "pytest", name: str = "recorded_flow", + style: str = "calls") -> str: + """Render an action list/file as code, optionally writing a file.""" + from je_auto_control.utils.codegen.codegen import ( + generate_code, generate_code_file, + ) + if output: + return generate_code_file(source, output, target=target, + name=name, style=style) + actions = source if isinstance(source, list) else read_action_json(source) + return generate_code(actions, target=target, name=name, style=style) + + +def _send_email(message: Any, smtp: Any) -> Dict[str, Any]: + """Adapter: send an email via SMTP (message/smtp config dicts).""" + from je_auto_control.utils.email_send.email_sender import send_email + return send_email(message, smtp) + + +def _assert_pdf_text(path: str, text: str, present: bool = True, + page: Any = None, case_sensitive: bool = True, + raise_on_fail: bool = True) -> Dict[str, Any]: + """Adapter: assert text is present/absent in a PDF document.""" + from je_auto_control.utils.pdf.pdf_reader import assert_pdf_text + return assert_pdf_text(path, text, present=bool(present), page=page, + case_sensitive=bool(case_sensitive), + raise_on_fail=bool(raise_on_fail)) + + class Executor: """ Executor @@ -2165,6 +2221,10 @@ def __init__(self): "AC_generate_html_report": generate_html_report, "AC_generate_json_report": generate_json_report, "AC_generate_xml_report": generate_xml_report, + "AC_generate_code": _generate_code, + "AC_send_email": _send_email, + "AC_assert_pdf_text": _assert_pdf_text, + "AC_http_request": http_request, # Record 錄製 "AC_record": record, @@ -2331,6 +2391,8 @@ def __init__(self): "AC_wait_screen_stable": _wait_screen_stable, "AC_wait_pixel_changes": _wait_pixel_changes, "AC_wait_region_idle": _wait_region_idle, + "AC_wait_for_file": _wait_for_file, + "AC_wait_for_port": _wait_for_port, "AC_wait_clipboard_change": _wait_clipboard_change, "AC_wait_window_closed": _wait_window_closed, diff --git a/je_auto_control/utils/executor/flow_control.py b/je_auto_control/utils/executor/flow_control.py index cae96138..fc13ebaa 100644 --- a/je_auto_control/utils/executor/flow_control.py +++ b/je_auto_control/utils/executor/flow_control.py @@ -406,7 +406,7 @@ def exec_random_to_var(executor: Any, args: Mapping[str, Any]) -> Dict[str, Any]: """Store a random value (int / float / choice) in a flow variable.""" import random - rng = random.Random(args.get("seed")) + rng = random.Random(args.get("seed")) # nosec B311 # reason: non-crypto test data kind = str(args.get("kind", "int")) if kind == "choice": value: Any = rng.choice(list(args.get("choices") or [None])) @@ -431,6 +431,39 @@ def exec_assert_var(executor: Any, args: Mapping[str, Any]) -> Dict[str, Any]: ).to_dict() +def exec_pdf_to_var(executor: Any, args: Mapping[str, Any]) -> Dict[str, Any]: + """Extract a PDF's text (all pages or one page) into a flow variable.""" + from je_auto_control.utils.pdf.pdf_reader import extract_pdf_text + text = extract_pdf_text(args["path"], pages=args.get("page")) + var_name = args.get("var", "pdf_text") + executor.variables.set(var_name, text) + return {"var": var_name, "length": len(text)} + + +def exec_sql_to_var(executor: Any, args: Mapping[str, Any]) -> Dict[str, Any]: + """Run a read-only SQLite query and store its result in a flow variable.""" + from je_auto_control.utils.sql.sql_query import query_sqlite + fetch = str(args.get("fetch", "all")) + result = query_sqlite(args["database"], args["query"], + params=args.get("params"), fetch=fetch) + var_name = args.get("var", "sql_result") + executor.variables.set(var_name, result) + return {"var": var_name, "fetch": fetch} + + +def exec_assert_db(executor: Any, args: Mapping[str, Any]) -> Dict[str, Any]: + """Assert a scalar SQLite query result satisfies a condition.""" + from je_auto_control.utils.assertion import assert_variable + from je_auto_control.utils.sql.sql_query import query_sqlite + value = query_sqlite(args["database"], args["query"], + params=args.get("params"), fetch="scalar") + return assert_variable( + value, op=str(args.get("op", "eq")), expected=args.get("expected"), + name="AC_assert_db", + raise_on_fail=bool(args.get("raise_on_fail", True)), + ).to_dict() + + def exec_read_file_to_var(executor: Any, args: Mapping[str, Any]) -> Dict[str, Any]: """Read a file's text content into a flow variable.""" @@ -441,21 +474,6 @@ def exec_read_file_to_var(executor: Any, return {"var": var_name, "length": len(text)} -def _http_get(url: str, method: str, timeout: float) -> tuple: - """Issue an HTTP(S) GET, returning ``(status, body)``. http/https only.""" - import urllib.request - scheme = url.split("://", 1)[0].lower() if "://" in url else "" - if scheme not in ("http", "https"): - raise AutoControlActionException( - f"AC_http_to_var: only http/https URLs allowed, got {url!r}" - ) - request = urllib.request.Request(url, method=method.upper()) - with urllib.request.urlopen( # nosec B310 — scheme allow-listed above - request, timeout=timeout) as response: - return int(response.status), response.read().decode( - "utf-8", errors="replace") - - def _dig_json(body: str, path: str) -> Any: """Navigate a dotted JSON path, e.g. ``data.0.name``.""" data = json.loads(body) @@ -465,16 +483,24 @@ def _dig_json(body: str, path: str) -> Any: def exec_http_to_var(executor: Any, args: Mapping[str, Any]) -> Dict[str, Any]: - """GET a URL and store the body (or a JSON field) in a flow variable.""" - status, body = _http_get( - args["url"], str(args.get("method", "GET")), - float(args.get("timeout", 30.0)), + """Request a URL and store the body (or a JSON field) in a flow variable. + + Supports method/headers/json_body/data/auth via the shared HTTP client, + so the same command drives plain GET reads and POST/PUT API calls. + """ + from je_auto_control.utils.http_client.http_client import http_request + response = http_request( + args["url"], method=str(args.get("method", "GET")), + headers=args.get("headers"), json_body=args.get("json_body"), + data=args.get("data"), auth=args.get("auth"), + timeout=float(args.get("timeout", 30.0)), ) json_path = args.get("json_path") + body = response["text"] value = _dig_json(body, json_path) if json_path else body var_name = args.get("var", "http_response") executor.variables.set(var_name, value) - return {"var": var_name, "status": status} + return {"var": var_name, "status": response["status"]} _SIMPLE_TRANSFORMS: Dict[str, Callable[[str], str]] = { @@ -643,6 +669,9 @@ def exec_call_macro(executor: Any, args: Mapping[str, Any]) -> Any: "AC_ocr_to_var": exec_ocr_to_var, "AC_shell_to_var": exec_shell_to_var, "AC_read_file_to_var": exec_read_file_to_var, + "AC_pdf_to_var": exec_pdf_to_var, + "AC_sql_to_var": exec_sql_to_var, + "AC_assert_db": exec_assert_db, "AC_http_to_var": exec_http_to_var, "AC_transform_var": exec_transform_var, "AC_now_to_var": exec_now_to_var, diff --git a/je_auto_control/utils/http_client/__init__.py b/je_auto_control/utils/http_client/__init__.py new file mode 100644 index 00000000..5ea5dcf2 --- /dev/null +++ b/je_auto_control/utils/http_client/__init__.py @@ -0,0 +1,4 @@ +"""Dependency-free HTTP(S) client for AutoControl action steps.""" +from je_auto_control.utils.http_client.http_client import http_request + +__all__ = ["http_request"] diff --git a/je_auto_control/utils/http_client/http_client.py b/je_auto_control/utils/http_client/http_client.py new file mode 100644 index 00000000..2548e62f --- /dev/null +++ b/je_auto_control/utils/http_client/http_client.py @@ -0,0 +1,104 @@ +"""Perform HTTP(S) requests headlessly for first-class API action steps. + +A small, dependency-free client built on the standard library, so the +package needs no ``requests`` dependency. Supports method, headers, a JSON +or raw body, basic/bearer auth, and an explicit timeout; it returns a +plain response dict (status / ok / headers / text / json / url) so that +non-2xx responses are inspectable rather than raised. Imports no +``PySide6`` and only allows http/https schemes (Bandit B310). +""" +import base64 +import json +import urllib.error +import urllib.request +from typing import Any, Dict, Mapping, Optional + +# NOSONAR python:S5332 — http is allow-listed deliberately (other schemes +# are rejected); plain http is required for internal/localhost endpoints. +_ALLOWED_SCHEMES = ("http://", "https://") # NOSONAR python:S5332 +_DEFAULT_TIMEOUT = 30.0 + + +def _validate_url(url: str) -> None: + if not isinstance(url, str) or not url.lower().startswith(_ALLOWED_SCHEMES): + raise ValueError(f"only http/https URLs are allowed, got {url!r}") + + +def _apply_auth(headers: Dict[str, str], + auth: Optional[Mapping[str, Any]]) -> None: + if not auth: + return + kind = str(auth.get("type", "")).lower() + if kind == "bearer": + headers["Authorization"] = f"Bearer {auth.get('token', '')}" + elif kind == "basic": + raw = f"{auth.get('username', '')}:{auth.get('password', '')}".encode() + headers["Authorization"] = "Basic " + base64.b64encode(raw).decode() + else: + raise ValueError(f"unknown auth type: {auth.get('type')!r}") + + +def _build_headers(headers: Optional[Mapping[str, Any]], json_body: Any, + auth: Optional[Mapping[str, Any]]) -> Dict[str, str]: + result = {str(key): str(value) for key, value in (headers or {}).items()} + has_content_type = any(key.lower() == "content-type" for key in result) + if json_body is not None and not has_content_type: + result["Content-Type"] = "application/json" + _apply_auth(result, auth) + return result + + +def _encode_body(json_body: Any, data: Any) -> Optional[bytes]: + if json_body is not None: + return json.dumps(json_body).encode("utf-8") + if data is None: + return None + return data.encode("utf-8") if isinstance(data, str) else bytes(data) + + +def _try_json(text: str) -> Any: + try: + return json.loads(text) + except (ValueError, TypeError): + return None + + +def _read_response(response: Any) -> Dict[str, Any]: + status = int(getattr(response, "status", None) or getattr(response, "code", 0)) + text = response.read().decode("utf-8", errors="replace") + raw_headers = getattr(response, "headers", None) + headers = dict(raw_headers.items()) if raw_headers else {} + return { + "status": status, + "ok": 200 <= status < 400, + "headers": headers, + "text": text, + "json": _try_json(text), + "url": getattr(response, "url", None), + } + + +def http_request(url: str, method: str = "GET", + headers: Optional[Mapping[str, Any]] = None, + json_body: Any = None, data: Any = None, + auth: Optional[Mapping[str, Any]] = None, + timeout: float = _DEFAULT_TIMEOUT) -> Dict[str, Any]: + """Perform an HTTP(S) request and return a response dict. + + ``json_body`` is serialised to JSON (setting Content-Type when absent); + ``data`` sends a raw string/bytes body. ``auth`` is a dict such as + ``{"type": "bearer", "token": ...}`` or + ``{"type": "basic", "username": ..., "password": ...}``. Non-2xx/3xx + responses are returned (with their body) rather than raised, so callers + can assert on status codes. + """ + _validate_url(url) + request = urllib.request.Request( + url, data=_encode_body(json_body, data), method=str(method).upper(), + headers=_build_headers(headers, json_body, auth)) + try: + with urllib.request.urlopen( # nosec B310 — scheme allow-listed + request, timeout=float(timeout)) as response: + return _read_response(response) + except urllib.error.HTTPError as error: + return _read_response(error) diff --git a/je_auto_control/utils/humanize/motion.py b/je_auto_control/utils/humanize/motion.py index f6c5b37e..5a21eafc 100644 --- a/je_auto_control/utils/humanize/motion.py +++ b/je_auto_control/utils/humanize/motion.py @@ -60,7 +60,7 @@ def humanized_path(start: Point, end: Point, is set. """ motion = motion or HumanizedMotion() - rng = random.Random(motion.seed) + rng = random.Random(motion.seed) # nosec B311 # reason: non-crypto motion jitter sx, sy = float(start[0]), float(start[1]) ex, ey = float(end[0]), float(end[1]) dx, dy = ex - sx, ey - sy diff --git a/je_auto_control/utils/humanize/typing.py b/je_auto_control/utils/humanize/typing.py index fe6524ec..e6059e46 100644 --- a/je_auto_control/utils/humanize/typing.py +++ b/je_auto_control/utils/humanize/typing.py @@ -20,7 +20,7 @@ def humanized_key_delays(text: str, *, base_delay: float = 0.05, ``pause_chance`` probability of an extra ``pause_delay`` (a human pausing to think). Deterministic when ``seed`` is set. """ - rng = random.Random(seed) + rng = random.Random(seed) # nosec B311 # reason: non-crypto typing jitter delays: List[float] = [] for _ in text: delay = max(0.0, base_delay + rng.uniform(-jitter, jitter)) diff --git a/je_auto_control/utils/json/json_file.py b/je_auto_control/utils/json/json_file.py index cd762888..effb6b6e 100644 --- a/je_auto_control/utils/json/json_file.py +++ b/je_auto_control/utils/json/json_file.py @@ -1,4 +1,5 @@ import json +import os from pathlib import Path from threading import Lock from typing import List, Dict @@ -41,4 +42,25 @@ def write_action_json(json_save_path: str, action_json: list) -> None: with open(json_save_path, "w+", encoding="utf-8") as file_to_write: json.dump(action_json, file_to_write, indent=4, ensure_ascii=False) except (OSError, TypeError, ValueError) as error: - raise AutoControlJsonActionException(f"{cant_save_json_error_message}: {repr(error)}") from error \ No newline at end of file + raise AutoControlJsonActionException(f"{cant_save_json_error_message}: {repr(error)}") from error + + +def format_action_json(json_file_path: str, check: bool = False) -> bool: + """ + Canonicalise an action JSON file's layout (4-space indent, UTF-8). + 標準化動作 JSON 檔案的排版 + + :param json_file_path: JSON 檔案路徑 + :param check: 若為 True,只回報是否需要重新排版而不寫入 + :return: 內容是否改變 (或在 check 模式下是否將會改變) + """ + real_path = os.path.realpath(json_file_path) + actions = read_action_json(real_path) + canonical = json.dumps(actions, indent=4, ensure_ascii=False) + with open(real_path, encoding="utf-8") as read_file: + current = read_file.read() + changed = current.strip() != canonical.strip() + if check or not changed: + return changed + write_action_json(real_path, actions) + return True \ No newline at end of file diff --git a/je_auto_control/utils/mcp_server/tools/_factories.py b/je_auto_control/utils/mcp_server/tools/_factories.py index c40c991d..0fbc2a7f 100644 --- a/je_auto_control/utils/mcp_server/tools/_factories.py +++ b/je_auto_control/utils/mcp_server/tools/_factories.py @@ -1263,6 +1263,38 @@ def smart_wait_tools() -> List[MCPTool]: handler=h.wait_region_idle, annotations=READ_ONLY, ), + MCPTool( + name="ac_wait_for_file", + description=("Block until a file exists, is >= min_size bytes, and " + "its size has held steady for stable_for_s seconds " + "(i.e. a download finished writing). Returns a " + "WaitOutcome (succeeded/reason/elapsed_s)."), + input_schema=schema({ + "path": {"type": "string"}, + "timeout_s": {"type": "number"}, + "poll_interval_s": {"type": "number"}, + "stable_for_s": {"type": "number"}, + "min_size": {"type": "integer"}, + }, required=["path"]), + handler=h.wait_for_file, + annotations=READ_ONLY, + ), + MCPTool( + name="ac_wait_for_port", + description=("Block until a TCP connection to host:port succeeds " + "(e.g. wait for a server to come up after launching " + "it). Returns a WaitOutcome (succeeded/reason/" + "elapsed_s)."), + input_schema=schema({ + "host": {"type": "string"}, + "port": {"type": "integer"}, + "timeout_s": {"type": "number"}, + "poll_interval_s": {"type": "number"}, + "connect_timeout_s": {"type": "number"}, + }, required=["host", "port"]), + handler=h.wait_for_port, + annotations=READ_ONLY, + ), ] @@ -2110,6 +2142,154 @@ def data_source_tools() -> List[MCPTool]: ] +def pdf_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_extract_pdf_text", + description=("Extract text from a PDF file. 'pages' is null (all " + "pages), a 1-based page number, or a list of them. " + "Requires the optional pypdf package."), + input_schema=schema({ + "path": {"type": "string"}, + "pages": {"type": ["integer", "array", "null"]}, + }, required=["path"]), + handler=h.extract_pdf_text, + annotations=READ_ONLY, + ), + MCPTool( + name="ac_assert_pdf_text", + description=("Assert that text is present (or absent when " + "present=false) in a PDF, optionally restricted to a " + "1-based 'page'. Set case_sensitive=false for a " + "case-insensitive match. Raises on failure unless " + "raise_on_fail is false."), + input_schema=schema({ + "path": {"type": "string"}, + "text": {"type": "string"}, + "present": {"type": "boolean"}, + "page": {"type": "integer"}, + "case_sensitive": {"type": "boolean"}, + "raise_on_fail": {"type": "boolean"}, + }, required=["path", "text"]), + handler=h.assert_pdf_text, + annotations=READ_ONLY, + ), + ] + + +def email_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_send_email", + description=("Send an email via SMTP. 'message' = {sender, to, " + "subject, body, cc?, html?, attachments?} (to/cc may " + "be a string or list; attachments are file paths). " + "'smtp' = {host, port?, username?, password?, " + "use_tls?, use_ssl?, timeout?}; TLS is on by default. " + "Sends mail (irreversible side effect)."), + input_schema=schema({ + "message": {"type": "object"}, + "smtp": {"type": "object"}, + }, required=["message", "smtp"]), + handler=h.send_email, + annotations=SIDE_EFFECT_ONLY, + ), + ] + + +def sql_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_sql_query", + description=("Run a read-only SELECT/WITH query against a SQLite " + "database file and return the result. fetch=all (list " + "of row objects), one (a single row or null), or " + "scalar (first column of first row). Bind values via " + "'params' (?/:name placeholders) — never interpolate. " + "A single read-only statement only."), + input_schema=schema({ + "database": {"type": "string"}, + "query": {"type": "string"}, + "params": {"type": ["array", "object"]}, + "fetch": {"type": "string", "enum": ["all", "one", "scalar"]}, + }, required=["database", "query"]), + handler=h.sql_query, + annotations=READ_ONLY, + ), + MCPTool( + name="ac_assert_db", + description=("Run a scalar SELECT and assert its value against " + "'expected' with op=eq|ne|lt|le|gt|ge|contains|" + "startswith|endswith (e.g. SELECT COUNT(*) ... == 0). " + "Bind values via 'params'. Raises on failure unless " + "raise_on_fail is false."), + input_schema=schema({ + "database": {"type": "string"}, + "query": {"type": "string"}, + "params": {"type": ["array", "object"]}, + "op": {"type": "string"}, + "expected": {}, + "raise_on_fail": {"type": "boolean"}, + }, required=["database", "query"]), + handler=h.assert_db, + annotations=READ_ONLY, + ), + ] + + +def http_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_http_request", + description=("Perform an HTTP(S) request and return a response dict " + "(status, ok, headers, text, json, url). method=GET|" + "POST|PUT|PATCH|DELETE|HEAD; send a JSON body via " + "'json_body' or a raw body via 'data'; 'auth' is " + "{type:bearer, token} or {type:basic, username, " + "password}. Non-2xx responses are returned, not raised, " + "so you can assert on status. http/https only."), + input_schema=schema({ + "url": {"type": "string"}, + "method": {"type": "string", + "enum": ["GET", "POST", "PUT", "PATCH", + "DELETE", "HEAD"]}, + "headers": {"type": "object"}, + "json_body": {"type": "object"}, + "data": {"type": "string"}, + "auth": {"type": "object"}, + "timeout": {"type": "number"}, + }, required=["url"]), + handler=h.http_request, + annotations=SIDE_EFFECT_ONLY, + ), + ] + + +def codegen_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_generate_code", + description=("Generate runnable test code from an action list or a " + "JSON action-file path. target=pytest|python|robot; " + "style=calls (readable ac.(...) statements, the " + "default) or actions (embed the list and replay via " + "execute_action). Pass 'output' to also write the file. " + "Returns the generated source code."), + input_schema=schema({ + "source": {"type": ["array", "string"], + "description": "Action list, or path to a JSON action file."}, + "target": {"type": "string", + "enum": ["pytest", "python", "robot"]}, + "style": {"type": "string", "enum": ["calls", "actions"]}, + "name": {"type": "string"}, + "output": {"type": "string"}, + }, required=["source"]), + handler=h.generate_code, + annotations=SIDE_EFFECT_ONLY, + ), + ] + + def flakiness_tools() -> List[MCPTool]: return [ MCPTool( @@ -2304,6 +2484,7 @@ def media_assert_tools() -> List[MCPTool]: scheduler_tools, trigger_tools, hotkey_tools, screen_record_tools, process_and_shell_tools, remote_desktop_tools, gamepad_tools, usb_passthrough_tools, assertion_tools, data_source_tools, + sql_tools, http_tools, email_tools, pdf_tools, codegen_tools, flakiness_tools, suite_tools, quarantine_tools, a11y_audit_tools, device_matrix_tools, media_assert_tools, ) diff --git a/je_auto_control/utils/mcp_server/tools/_handlers.py b/je_auto_control/utils/mcp_server/tools/_handlers.py index 74740503..e1fad24d 100644 --- a/je_auto_control/utils/mcp_server/tools/_handlers.py +++ b/je_auto_control/utils/mcp_server/tools/_handlers.py @@ -953,6 +953,29 @@ def wait_screen_stable(region: Optional[List[int]] = None, ).to_dict() +def wait_for_file(path: str, timeout_s: float = 30.0, + poll_interval_s: float = 0.25, + stable_for_s: float = 1.0, + min_size: int = 1) -> Dict[str, Any]: + from je_auto_control.utils.smart_waits import wait_until_file + return wait_until_file( + path, timeout_s=float(timeout_s), + poll_interval_s=float(poll_interval_s), + stable_for_s=float(stable_for_s), min_size=int(min_size), + ).to_dict() + + +def wait_for_port(host: str, port: int, timeout_s: float = 30.0, + poll_interval_s: float = 0.25, + connect_timeout_s: float = 1.0) -> Dict[str, Any]: + from je_auto_control.utils.smart_waits import wait_until_port + return wait_until_port( + host, int(port), timeout_s=float(timeout_s), + poll_interval_s=float(poll_interval_s), + connect_timeout_s=float(connect_timeout_s), + ).to_dict() + + def wait_pixel_changes(x: int, y: int, timeout_s: float = 10.0, poll_interval_s: float = 0.1, @@ -1746,6 +1769,87 @@ def load_data(source: Dict[str, Any], return load_rows(source, limit=limit if limit is None else int(limit)) +# --- Ad-hoc SQL query + assertion ------------------------------------------ + +def sql_query(database: str, query: str, + params: Any = None, fetch: str = "all") -> Any: + from je_auto_control.utils.sql.sql_query import query_sqlite + return query_sqlite(database, query, params=params, fetch=fetch) + + +def assert_db(database: str, query: str, params: Any = None, + op: str = "eq", expected: Any = None, + raise_on_fail: bool = True) -> Dict[str, Any]: + from je_auto_control.utils.assertion import assert_variable + from je_auto_control.utils.sql.sql_query import query_sqlite + value = query_sqlite(database, query, params=params, fetch="scalar") + return assert_variable( + value, op=op, expected=expected, name="ac_assert_db", + raise_on_fail=bool(raise_on_fail), + ).to_dict() + + +# --- PDF text + assertion -------------------------------------------------- + +def extract_pdf_text(path: str, pages: Any = None) -> str: + from je_auto_control.utils.pdf.pdf_reader import ( + extract_pdf_text as _extract, + ) + return _extract(path, pages=pages) + + +def assert_pdf_text(path: str, text: str, present: bool = True, + page: Any = None, case_sensitive: bool = True, + raise_on_fail: bool = True) -> Dict[str, Any]: + from je_auto_control.utils.pdf.pdf_reader import ( + assert_pdf_text as _assert, + ) + return _assert(path, text, present=bool(present), page=page, + case_sensitive=bool(case_sensitive), + raise_on_fail=bool(raise_on_fail)) + + +# --- Send email (SMTP) ----------------------------------------------------- + +def send_email(message: Dict[str, Any], + smtp: Dict[str, Any]) -> Dict[str, Any]: + from je_auto_control.utils.email_send.email_sender import ( + send_email as _send, + ) + return _send(message, smtp) + + +# --- HTTP / API request ---------------------------------------------------- + +def http_request(url: str, method: str = "GET", + headers: Optional[Dict[str, Any]] = None, + json_body: Any = None, data: Any = None, + auth: Optional[Dict[str, Any]] = None, + timeout: float = 30.0) -> Dict[str, Any]: + from je_auto_control.utils.http_client.http_client import ( + http_request as _request, + ) + return _request(url, method=method, headers=headers, json_body=json_body, + data=data, auth=auth, timeout=float(timeout)) + + +# --- Codegen: action list -> source code ----------------------------------- + +def generate_code(source: Any, target: str = "pytest", + name: str = "recorded_flow", style: str = "calls", + output: Optional[str] = None) -> Dict[str, Any]: + from je_auto_control.utils.codegen.codegen import ( + generate_code as _gen, generate_code_file, + ) + from je_auto_control.utils.json.json_file import read_action_json + if output: + code = generate_code_file(source, output, target=target, + name=name, style=style) + return {"output": output, "code": code} + actions = source if isinstance(source, list) else read_action_json(source) + return {"code": _gen(actions, target=target, name=name, style=style)} + + # --- Flaky-test detection -------------------------------------------------- def flaky_report(limit: int = 500, diff --git a/je_auto_control/utils/pdf/__init__.py b/je_auto_control/utils/pdf/__init__.py new file mode 100644 index 00000000..43fe8267 --- /dev/null +++ b/je_auto_control/utils/pdf/__init__.py @@ -0,0 +1,11 @@ +"""Read and assert on PDF documents (optional pypdf backend).""" +from je_auto_control.utils.pdf.pdf_reader import ( + assert_pdf_text, + extract_pdf_text, + pdf_metadata, + pdf_page_count, +) + +__all__ = [ + "assert_pdf_text", "extract_pdf_text", "pdf_metadata", "pdf_page_count", +] diff --git a/je_auto_control/utils/pdf/pdf_reader.py b/je_auto_control/utils/pdf/pdf_reader.py new file mode 100644 index 00000000..26690d23 --- /dev/null +++ b/je_auto_control/utils/pdf/pdf_reader.py @@ -0,0 +1,76 @@ +"""Read text from PDF documents and assert on their content. + +Backed by the optional ``pypdf`` package (pure-Python, MIT). The single +``_open_pdf`` seam imports it lazily and raises a clear ``RuntimeError`` +when it is missing, mirroring the optional Excel backend in the +data-source loader; the rest of the module stays import-light. Imports no +``PySide6`` so PDF checks run fully headlessly. +""" +import os +from typing import Any, Dict, Iterable, List, Optional, Union + +from je_auto_control.utils.exception.exceptions import AutoControlAssertionException + +PageSelector = Optional[Union[int, Iterable[int]]] + + +def _open_pdf(path: str): + """Return a ``pypdf.PdfReader`` for an existing PDF; raise if unavailable.""" + try: + from pypdf import PdfReader + except ImportError as error: + raise RuntimeError( + "PDF features require pypdf (pip install pypdf).") from error + real = os.path.realpath(path) + if not os.path.isfile(real): + raise FileNotFoundError(f"PDF not found: {path}") + return PdfReader(real) + + +def _page_indices(pages: PageSelector, total: int) -> List[int]: + """Translate a 1-based page selector into 0-based indices (or all pages).""" + if pages is None: + return list(range(total)) + wanted = [pages] if isinstance(pages, int) else [int(p) for p in pages] + indices: List[int] = [] + for one_based in wanted: + index = int(one_based) - 1 + if not 0 <= index < total: + raise ValueError(f"page {one_based} out of range 1..{total}") + indices.append(index) + return indices + + +def extract_pdf_text(path: str, pages: PageSelector = None) -> str: + """Extract text from a PDF; ``pages`` is None (all), a 1-based page, or list.""" + reader = _open_pdf(path) + indices = _page_indices(pages, len(reader.pages)) + return "\n".join(reader.pages[i].extract_text() or "" for i in indices) + + +def pdf_page_count(path: str) -> int: + """Return the number of pages in a PDF.""" + return len(_open_pdf(path).pages) + + +def pdf_metadata(path: str) -> Dict[str, Any]: + """Return the PDF's document metadata (keys without the leading slash).""" + meta = _open_pdf(path).metadata or {} + return {str(key).lstrip("/"): str(value) for key, value in dict(meta).items()} + + +def assert_pdf_text(path: str, text: str, *, present: bool = True, + page: PageSelector = None, case_sensitive: bool = True, + raise_on_fail: bool = True) -> Dict[str, Any]: + """Assert ``text`` is present (or absent) in a PDF, optionally on ``page``.""" + extracted = extract_pdf_text(path, pages=page) + haystack = extracted if case_sensitive else extracted.lower() + needle = str(text) if case_sensitive else str(text).lower() + found = needle in haystack + passed = (found == bool(present)) + where = f" on page {page}" if page is not None else "" + state = "contains" if found else "does not contain" + message = f"assert_pdf_text: PDF{where} {state} {text!r} (present={present})" + if not passed and raise_on_fail: + raise AutoControlAssertionException(message) + return {"kind": "pdf_text", "passed": passed, "message": message} diff --git a/je_auto_control/utils/smart_waits/__init__.py b/je_auto_control/utils/smart_waits/__init__.py index 80b9e167..d70b69a3 100644 --- a/je_auto_control/utils/smart_waits/__init__.py +++ b/je_auto_control/utils/smart_waits/__init__.py @@ -8,16 +8,17 @@ ) """ from je_auto_control.utils.smart_waits.waits import ( - ClipboardReader, Frame, ScreenSampler, WaitOutcome, WindowFinder, - wait_until_clipboard_changes, wait_until_pixel_changes, - wait_until_region_idle, wait_until_screen_stable, - wait_until_window_closed, + ClipboardReader, FileStatReader, Frame, PortConnector, ScreenSampler, + WaitOutcome, WindowFinder, wait_until_clipboard_changes, wait_until_file, + wait_until_pixel_changes, wait_until_port, wait_until_region_idle, + wait_until_screen_stable, wait_until_window_closed, ) __all__ = [ - "ClipboardReader", "Frame", "ScreenSampler", "WaitOutcome", - "WindowFinder", "wait_until_clipboard_changes", - "wait_until_pixel_changes", "wait_until_region_idle", + "ClipboardReader", "FileStatReader", "Frame", "PortConnector", + "ScreenSampler", "WaitOutcome", "WindowFinder", + "wait_until_clipboard_changes", "wait_until_file", + "wait_until_pixel_changes", "wait_until_port", "wait_until_region_idle", "wait_until_screen_stable", "wait_until_window_closed", ] diff --git a/je_auto_control/utils/smart_waits/waits.py b/je_auto_control/utils/smart_waits/waits.py index 9b03fe4f..bb203236 100644 --- a/je_auto_control/utils/smart_waits/waits.py +++ b/je_auto_control/utils/smart_waits/waits.py @@ -22,6 +22,9 @@ from dataclasses import asdict, dataclass from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +_TIMEOUT_POSITIVE = "timeout_s must be positive" +_POLL_POSITIVE = "poll_interval_s must be positive" + @dataclass(frozen=True) class WaitOutcome: @@ -78,9 +81,9 @@ def wait_until_screen_stable(*, samples; ``timeout_s`` is the absolute cap. """ if timeout_s <= 0: - raise ValueError("timeout_s must be positive") + raise ValueError(_TIMEOUT_POSITIVE) if poll_interval_s <= 0: - raise ValueError("poll_interval_s must be positive") + raise ValueError(_POLL_POSITIVE) if stable_for_s < 0: raise ValueError("stable_for_s must be >= 0") grab = sampler or _default_sampler @@ -114,7 +117,7 @@ def wait_until_pixel_changes(*, x: int, y: int, ) -> WaitOutcome: """Return when the pixel at ``(x, y)`` changes beyond ``rgb_tolerance``.""" if timeout_s <= 0: - raise ValueError("timeout_s must be positive") + raise ValueError(_TIMEOUT_POSITIVE) grab = sampler or _default_sampler started = time.monotonic() deadline = started + float(timeout_s) @@ -169,9 +172,9 @@ def wait_until_clipboard_changes(*, tests need no real clipboard. """ if timeout_s <= 0: - raise ValueError("timeout_s must be positive") + raise ValueError(_TIMEOUT_POSITIVE) if poll_interval_s <= 0: - raise ValueError("poll_interval_s must be positive") + raise ValueError(_POLL_POSITIVE) read = reader or _default_clipboard_reader started = time.monotonic() deadline = started + float(timeout_s) @@ -214,9 +217,9 @@ def wait_until_window_closed(title: str, *, case_sensitive: bool = False, a matching window still exists; it is injectable for tests. """ if timeout_s <= 0: - raise ValueError("timeout_s must be positive") + raise ValueError(_TIMEOUT_POSITIVE) if poll_interval_s <= 0: - raise ValueError("poll_interval_s must be positive") + raise ValueError(_POLL_POSITIVE) exists = finder or _default_window_finder started = time.monotonic() deadline = started + float(timeout_s) @@ -235,6 +238,113 @@ def _default_window_finder(title: str, case_sensitive: bool) -> bool: return find_window(title, case_sensitive=case_sensitive) is not None +FileStatReader = Callable[[str], Optional[int]] + + +def wait_until_file(path: str, *, + timeout_s: float = 30.0, + poll_interval_s: float = 0.25, + stable_for_s: float = 1.0, + min_size: int = 1, + stat_reader: Optional[FileStatReader] = None, + ) -> WaitOutcome: + """Return when ``path`` exists, is >= ``min_size`` bytes, and its size has + held steady for ``stable_for_s`` (i.e. a download has finished writing). + + ``stat_reader(path) -> size or None`` is injectable so tests need no real + growing file; the default reports the on-disk size (``None`` when absent). + """ + if timeout_s <= 0: + raise ValueError(_TIMEOUT_POSITIVE) + if poll_interval_s <= 0: + raise ValueError(_POLL_POSITIVE) + if stable_for_s < 0: + raise ValueError("stable_for_s must be >= 0") + read = stat_reader or _default_file_size + tracker = _StableSize(float(stable_for_s), int(min_size)) + started = time.monotonic() + deadline = started + float(timeout_s) + samples = 0 + while time.monotonic() < deadline: + samples += 1 + if tracker.ready(read(str(path))): + return _finish(True, "file ready", started, samples) + time.sleep(float(poll_interval_s)) + return _finish(False, "timeout while waiting for file", started, samples) + + +class _StableSize: + """Track whether a file's size has stayed >= min for long enough.""" + + def __init__(self, stable_for_s: float, min_size: int) -> None: + self._stable_for_s = stable_for_s + self._min_size = min_size + self._last: Optional[int] = None + self._since: Optional[float] = None + + def ready(self, size: Optional[int]) -> bool: + if size is None or size < self._min_size: + self._last, self._since = size, None + return False + now = time.monotonic() + if size != self._last: + self._last, self._since = size, now + return self._since is not None and now - self._since >= self._stable_for_s + + +def _default_file_size(path: str) -> Optional[int]: + """Return the on-disk byte size of ``path``, or None if it is absent.""" + import os + try: + return os.path.getsize(path) + except OSError: + return None + + +PortConnector = Callable[[str, int, float], bool] + + +def wait_until_port(host: str, port: int, *, + timeout_s: float = 30.0, + poll_interval_s: float = 0.25, + connect_timeout_s: float = 1.0, + connector: Optional[PortConnector] = None, + ) -> WaitOutcome: + """Return when a TCP connection to ``(host, port)`` succeeds, else timeout. + + The closing companion to launching a server: poll until the port + accepts connections. ``connector(host, port, timeout) -> bool`` is + injectable so tests need no real listener. + """ + if timeout_s <= 0: + raise ValueError(_TIMEOUT_POSITIVE) + if poll_interval_s <= 0: + raise ValueError(_POLL_POSITIVE) + if not 0 < int(port) <= 65535: + raise ValueError("port must be in 1..65535") + probe = connector or _default_port_connector + started = time.monotonic() + deadline = started + float(timeout_s) + samples = 0 + while time.monotonic() < deadline: + samples += 1 + if probe(str(host), int(port), float(connect_timeout_s)): + return _finish(True, f"port {host}:{port} open", started, samples) + time.sleep(float(poll_interval_s)) + return _finish(False, f"timeout waiting for {host}:{port}", + started, samples) + + +def _default_port_connector(host: str, port: int, timeout: float) -> bool: + """Return True if a TCP connection to ``(host, port)`` can be opened.""" + import socket + try: + with socket.create_connection((host, port), timeout=timeout): + return True + except OSError: + return False + + # --- internals ------------------------------------------------- def _frame_diff(a: Frame, b: Frame) -> int: @@ -271,8 +381,9 @@ def _finish(succeeded: bool, reason: str, started: float, __all__ = [ - "ClipboardReader", "Frame", "ScreenSampler", "WaitOutcome", - "WindowFinder", "wait_until_clipboard_changes", - "wait_until_pixel_changes", "wait_until_region_idle", + "ClipboardReader", "FileStatReader", "Frame", "PortConnector", + "ScreenSampler", "WaitOutcome", "WindowFinder", + "wait_until_clipboard_changes", "wait_until_file", + "wait_until_pixel_changes", "wait_until_port", "wait_until_region_idle", "wait_until_screen_stable", "wait_until_window_closed", ] diff --git a/je_auto_control/utils/sql/__init__.py b/je_auto_control/utils/sql/__init__.py new file mode 100644 index 00000000..016fb681 --- /dev/null +++ b/je_auto_control/utils/sql/__init__.py @@ -0,0 +1,4 @@ +"""Ad-hoc read-only SQL queries against a SQLite database.""" +from je_auto_control.utils.sql.sql_query import query_sqlite + +__all__ = ["query_sqlite"] diff --git a/je_auto_control/utils/sql/sql_query.py b/je_auto_control/utils/sql/sql_query.py new file mode 100644 index 00000000..30cef5b1 --- /dev/null +++ b/je_auto_control/utils/sql/sql_query.py @@ -0,0 +1,52 @@ +"""Run ad-hoc read-only SQL queries for ``AC_sql_to_var`` / ``AC_assert_db``. + +Generalises the SQLite reading already used by the data-source loader into +a standalone query helper that returns rows, a single row, or a scalar. +Queries are restricted to a single read-only ``SELECT``/``WITH`` statement +and run against a read-only connection; values are always bound as +parameters (never string-interpolated) to avoid SQL injection. Imports no +``PySide6`` so it stays fully headless. +""" +import sqlite3 +from typing import Any, Dict, List, Optional, Union + +from je_auto_control.utils.data_source.data_source import ( + _resolve_path, _validate_select, +) + +_FetchResult = Union[List[Dict[str, Any]], Dict[str, Any], Any, None] + + +def _shape(cursor: sqlite3.Cursor, fetch: str) -> _FetchResult: + """Reduce a cursor to the requested result shape.""" + if fetch in ("all", "rows"): + return [dict(row) for row in cursor.fetchall()] + if fetch == "one": + row = cursor.fetchone() + return dict(row) if row is not None else None + if fetch == "scalar": + row = cursor.fetchone() + return row[0] if row is not None else None + raise ValueError( + f"unknown fetch mode {fetch!r}; expected all/one/scalar") + + +def query_sqlite(database: str, query: str, + params: Optional[Union[list, tuple, dict]] = None, + fetch: str = "all") -> _FetchResult: + """Run a read-only SELECT/WITH against ``database`` and return its result. + + :param database: path to a SQLite database file (opened read-only). + :param query: a single SELECT/WITH statement; reject anything else. + :param params: values bound to ``?``/``:name`` placeholders in ``query``. + :param fetch: ``all`` (list of row dicts), ``one`` (a row dict or None), + or ``scalar`` (the first column of the first row, or None). + """ + path = _resolve_path(database) + statement = _validate_select(str(query)) + uri = f"file:{path}?mode=ro" + with sqlite3.connect(uri, uri=True) as connection: + connection.row_factory = sqlite3.Row + cursor = connection.execute( + statement, params if params is not None else ()) + return _shape(cursor, fetch) diff --git a/je_auto_control/wrapper/auto_control_record.py b/je_auto_control/wrapper/auto_control_record.py index acea3fb5..bdb40602 100644 --- a/je_auto_control/wrapper/auto_control_record.py +++ b/je_auto_control/wrapper/auto_control_record.py @@ -1,8 +1,12 @@ +import os import sys +import threading +from typing import Optional from je_auto_control.utils.exception.exception_tags import macos_record_error_message from je_auto_control.utils.exception.exceptions import AutoControlException from je_auto_control.utils.exception.exceptions import AutoControlJsonActionException +from je_auto_control.utils.json.json_file import write_action_json from je_auto_control.utils.logging.logging_instance import autocontrol_logger from je_auto_control.utils.test_record.record_test_class import record_action_to_list from je_auto_control.wrapper.platform_wrapper import recorder @@ -46,3 +50,26 @@ def stop_record() -> list: except (OSError, RuntimeError, AttributeError, TypeError, ValueError, AutoControlException, AutoControlJsonActionException) as error: record_action_to_list("stop_record", None, repr(error)) autocontrol_logger.error(f"stop_record, failed: {repr(error)}") + + +def record_to_json(output_path: str, *, stop_event: threading.Event, + timeout: Optional[float] = None) -> list: + """ + Record input until ``stop_event`` is set (or ``timeout``), saving to JSON. + + The caller owns ``stop_event`` and signals it (for example when the user + presses Enter), keeping terminal I/O out of this headless helper. + + :param output_path: 錄製結果的儲存路徑 + :param stop_event: 設定後即停止錄製的事件旗標 + :param timeout: 最長錄製秒數,None 代表等到 stop_event 為止 + :return: 錄製到的動作列表 + """ + target = os.path.realpath(output_path) + record() + try: + stop_event.wait(timeout) + finally: + actions = stop_record() or [] + write_action_json(target, actions) + return actions diff --git a/pyproject.toml b/pyproject.toml index e98b04b2..da75d458 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ classifiers = [ ] [project.scripts] +je_auto_control = "je_auto_control.cli:main" je_auto_control_mcp = "je_auto_control.utils.mcp_server.__main__:main" [project.entry-points.pytest11] @@ -68,6 +69,7 @@ gui = ["PySide6==6.11.1", "qt-material==2.17"] webrtc = ["aiortc>=1.14.0", "av>=14.0.0"] signaling = ["fastapi>=0.115", "uvicorn>=0.32"] discovery = ["zeroconf>=0.130"] +pdf = ["pypdf>=4.0"] [tool.bandit] exclude_dirs = [ diff --git a/test/unit_test/headless/test_cli.py b/test/unit_test/headless/test_cli.py index c2974928..3a097ec2 100644 --- a/test/unit_test/headless/test_cli.py +++ b/test/unit_test/headless/test_cli.py @@ -1,8 +1,11 @@ """Tests for the CLI entry point (argument parsing + dry-run execution).""" import json +import sys +import threading import pytest +import je_auto_control as ac from je_auto_control.cli import _parse_vars, build_parser, main @@ -51,3 +54,111 @@ def test_list_jobs_prints_nothing_when_empty(capsys): out = capsys.readouterr().out for line in out.splitlines(): assert "\t" in line + + +def _write(tmp_path, actions, name="s.json"): + path = tmp_path / name + path.write_text(json.dumps(actions), encoding="utf-8") + return str(path) + + +def test_validate_accepts_valid_file(tmp_path): + assert main(["validate", _write(tmp_path, [["AC_screen_size"]])]) == 0 + + +def test_lint_alias_accepts_valid_file(tmp_path): + assert main(["lint", _write(tmp_path, [["AC_screen_size"]])]) == 0 + + +def test_validate_rejects_unknown_command(tmp_path): + assert main(["validate", _write(tmp_path, [["AC_not_a_real_command"]])]) == 1 + + +def test_validate_missing_file_returns_one(tmp_path): + assert main(["validate", str(tmp_path / "missing.json")]) == 1 + + +def test_list_commands_text_filtered(capsys): + assert main(["list-commands", "--filter", "mouse"]) == 0 + out = capsys.readouterr().out + assert "AC_click_mouse" in out + + +def test_list_commands_json_is_sorted(capsys): + assert main(["list-commands", "--json"]) == 0 + names = json.loads(capsys.readouterr().out) + assert "AC_screen_size" in names + assert names == sorted(names) + + +def test_fmt_check_then_reformat(tmp_path, capsys): + path = _write(tmp_path, [["AC_screen_size"]]) + assert main(["fmt", "--check", path]) == 1 # compact -> drift + assert main(["fmt", path]) == 0 # reformat in place + capsys.readouterr() + assert main(["fmt", "--check", path]) == 0 # now canonical + + +def test_format_action_json_helper_is_idempotent(tmp_path): + path = _write(tmp_path, [["AC_screen_size"]]) + assert ac.format_action_json(path, check=True) is True + assert ac.format_action_json(path) is True + assert ac.format_action_json(path) is False + assert ac.format_action_json(path, check=True) is False + + +def test_record_subcommand_delegates_to_helper(tmp_path, monkeypatch): + captured = {} + + def fake_record_to_json(output_path, *, stop_event, timeout=None): + captured["output"] = output_path + captured["timeout"] = timeout + return [["AC_type_keyboard", {"keycode": "a"}]] + + monkeypatch.setattr( + "je_auto_control.wrapper.auto_control_record.record_to_json", + fake_record_to_json) + out = str(tmp_path / "rec.json") + rc = main(["record", out, "--duration", "0"]) + if sys.platform == "darwin": + assert rc == 1 + else: + assert rc == 0 + assert captured["output"] == out + assert captured["timeout"] == pytest.approx(0.0) + + +def test_record_to_json_helper_writes_file(tmp_path, monkeypatch): + recorded = [["AC_type_keyboard", {"keycode": "x"}]] + import je_auto_control.wrapper.auto_control_record as rec_mod + monkeypatch.setattr(rec_mod, "record", lambda: None) + monkeypatch.setattr(rec_mod, "stop_record", lambda: recorded) + out = str(tmp_path / "rec2.json") + event = threading.Event() + event.set() # return immediately without real recording + actions = ac.record_to_json(out, stop_event=event) + assert actions == recorded + assert json.loads( + (tmp_path / "rec2.json").read_text(encoding="utf-8")) == recorded + + +def test_version_subcommand(capsys): + assert main(["version"]) == 0 + assert capsys.readouterr().out.strip() + + +def test_top_level_import_stays_qt_free(): + """Import the facade in a fresh interpreter so the check isn't polluted + by GUI tests that may have already imported PySide6 in this session.""" + import subprocess + script = ( + "import sys, je_auto_control as ac\n" + "ac.format_action_json; ac.record_to_json # touch CLI helpers\n" + "qt = [m for m in sys.modules if 'PySide6' in m]\n" + "import json; print(json.dumps(qt))\n" + ) + result = subprocess.run( # nosec B603 # nosemgrep + [sys.executable, "-c", script], + capture_output=True, text=True, check=True, timeout=60, + ) + assert result.stdout.strip() in ("[]", "") diff --git a/test/unit_test/headless/test_codegen.py b/test/unit_test/headless/test_codegen.py new file mode 100644 index 00000000..472e8e68 --- /dev/null +++ b/test/unit_test/headless/test_codegen.py @@ -0,0 +1,109 @@ +"""Headless tests for AutoControl codegen (action list -> source code). + +No PySide6 is imported; generated Python is checked for valid syntax via +``compile`` so the tests stay fast and side-effect free. +""" +import json + +import pytest + +import je_auto_control as ac +from je_auto_control.utils.codegen.codegen import generate_code, generate_code_file + +_ACTIONS = [ + ["AC_click_mouse", {"mouse_keycode": "mouse_left", "x": 500, "y": 500}], + ["AC_type_keyboard", {"keycode": "a"}], + ["AC_loop", {"times": 2, "body": [["AC_screen_size"]]}], +] + + +def _compiles(source: str) -> bool: + compile(source, "", "exec") + return True + + +def test_calls_style_maps_commands_to_facade_calls(): + code = generate_code(_ACTIONS, target="pytest", style="calls") + assert "import je_auto_control as ac" in code + assert "def test_recorded_flow():" in code + assert "ac.click_mouse(mouse_keycode='mouse_left', x=500, y=500)" in code + assert "ac.type_keyboard(keycode='a')" in code + # flow-control falls back to the executor + assert "ac.execute_action([['AC_loop'," in code + assert _compiles(code) + + +def test_actions_style_embeds_and_replays(): + code = generate_code(_ACTIONS, target="python", style="actions") + assert "actions = [" in code + assert "ac.execute_action(actions)" in code + assert 'if __name__ == "__main__":' in code + assert _compiles(code) + + +def test_python_target_has_main_guard(): + code = generate_code(_ACTIONS, target="python", name="My Flow") + assert "def my_flow():" in code + assert " my_flow()" in code + assert _compiles(code) + + +def test_robot_target_is_self_contained(): + code = generate_code(_ACTIONS, target="robot", name="login_flow") + assert "*** Test Cases ***" in code + assert "Login Flow" in code + assert "Evaluate" in code + payload_line = [ln for ln in code.splitlines() if "json.loads" in ln][0] + assert "AC_click_mouse" in payload_line + + +def test_name_is_slugged_to_valid_identifier(): + code = generate_code(_ACTIONS, target="pytest", name="123 weird-name!") + assert "def test_flow_123_weird_name():" in code + assert _compiles(code) + + +def test_unknown_target_rejected(): + with pytest.raises(ValueError): + generate_code(_ACTIONS, target="java") + + +def test_unknown_style_rejected(): + with pytest.raises(ValueError): + generate_code(_ACTIONS, style="bogus") + + +def test_empty_actions_rejected(): + with pytest.raises(ValueError): + generate_code([]) + + +def test_generate_code_file_from_path(tmp_path): + src = tmp_path / "flow.json" + src.write_text(json.dumps(_ACTIONS), encoding="utf-8") + out = tmp_path / "test_flow.py" + code = generate_code_file(str(src), str(out), target="pytest") + assert out.read_text(encoding="utf-8") == code + assert _compiles(code) + + +def test_facade_reexports_codegen(): + assert ac.generate_code is generate_code + assert ac.generate_code_file is generate_code_file + + +def test_executor_command_registered(): + assert "AC_generate_code" in ac.executor.known_commands() + code = ac.execute_action( + [["AC_generate_code", {"source": _ACTIONS, "target": "pytest"}]]) + # execute_action returns a record dict; the value is the generated code + assert any("import je_auto_control" in str(value) for value in code.values()) + + +def test_cli_codegen_to_file(tmp_path): + from je_auto_control.cli import main + src = tmp_path / "flow.json" + src.write_text(json.dumps(_ACTIONS), encoding="utf-8") + out = tmp_path / "gen.py" + assert main(["codegen", str(src), "--target", "pytest", "-o", str(out)]) == 0 + assert _compiles(out.read_text(encoding="utf-8")) diff --git a/test/unit_test/headless/test_email_send.py b/test/unit_test/headless/test_email_send.py new file mode 100644 index 00000000..ae4f459c --- /dev/null +++ b/test/unit_test/headless/test_email_send.py @@ -0,0 +1,119 @@ +"""Headless tests for the email-send action (send_email / AC_send_email). + +No real SMTP server is contacted: smtplib.SMTP / SMTP_SSL are replaced +with a fake that records the message and the calls made. +""" +import pytest + +import je_auto_control as ac +from je_auto_control.utils.email_send import email_sender + + +class _FakeSMTP: + """Records send_message / starttls / login without any network I/O.""" + + instances = [] + + def __init__(self, host, port, timeout=None, context=None): + self.host, self.port, self.timeout = host, port, timeout + self.started_tls = False + self.logged_in = None + self.sent = None + _FakeSMTP.instances.append(self) + + def __enter__(self): + return self + + def __exit__(self, *_exc): + return False + + def starttls(self, context=None): + self.started_tls = True + + def login(self, username, password): + self.logged_in = (username, password) + + def send_message(self, mime): + self.sent = mime + + +@pytest.fixture(autouse=True) +def _fake_smtp(monkeypatch): + _FakeSMTP.instances = [] + monkeypatch.setattr(email_sender.smtplib, "SMTP", _FakeSMTP) + monkeypatch.setattr(email_sender.smtplib, "SMTP_SSL", _FakeSMTP) + return _FakeSMTP + + +def _msg(**over): + base = {"sender": "a@x.com", "to": "b@y.com", + "subject": "Hi", "body": "Body"} + base.update(over) + return base + + +def test_sends_with_starttls_by_default(): + result = ac.send_email(_msg(), {"host": "smtp.x.com"}) + assert result["sent"] is True + server = _FakeSMTP.instances[-1] + assert server.started_tls is True + assert server.sent["Subject"] == "Hi" + assert server.sent["To"] == "b@y.com" + + +def test_login_when_credentials_given(): + ac.send_email(_msg(), {"host": "smtp.x.com", "username": "u", + "password": "p"}) # NOSONAR python:S2068 + assert _FakeSMTP.instances[-1].logged_in == ("u", "p") + + +def test_ssl_path_skips_starttls(): + ac.send_email(_msg(), {"host": "smtp.x.com", "port": 465, + "use_ssl": True}) + server = _FakeSMTP.instances[-1] + assert server.port == 465 + assert server.started_tls is False + + +def test_multiple_recipients_and_cc(): + ac.send_email(_msg(to=["b@y.com", "c@y.com"], cc="d@y.com"), + {"host": "smtp.x.com"}) + server = _FakeSMTP.instances[-1] + assert server.sent["To"] == "b@y.com, c@y.com" + assert server.sent["Cc"] == "d@y.com" + + +def test_attachment_is_added(tmp_path): + attach = tmp_path / "report.txt" + attach.write_text("hello", encoding="utf-8") + ac.send_email(_msg(attachments=[str(attach)]), {"host": "smtp.x.com"}) + mime = _FakeSMTP.instances[-1].sent + names = [part.get_filename() for part in mime.iter_attachments()] + assert "report.txt" in names + + +def test_missing_attachment_raises(tmp_path): + with pytest.raises(FileNotFoundError): + ac.send_email(_msg(attachments=[str(tmp_path / "nope.txt")]), + {"host": "smtp.x.com"}) + + +def test_missing_sender_or_recipient_raises(): + with pytest.raises(ValueError): + ac.send_email({"subject": "x"}, {"host": "smtp.x.com"}) + + +def test_missing_host_raises(): + with pytest.raises(ValueError): + ac.send_email(_msg(), {}) + + +def test_facade_executor_and_mcp_wiring(): + assert ac.send_email is email_sender.send_email + assert "AC_send_email" in ac.executor.known_commands() + record = ac.execute_action( + [["AC_send_email", {"message": _msg(), "smtp": {"host": "smtp.x.com"}}]]) + assert any("sent" in str(value) for value in record.values()) + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {tool.name for tool in build_default_tool_registry()} + assert "ac_send_email" in names diff --git a/test/unit_test/headless/test_flow_var_commands.py b/test/unit_test/headless/test_flow_var_commands.py index 6879a093..cc524238 100644 --- a/test/unit_test/headless/test_flow_var_commands.py +++ b/test/unit_test/headless/test_flow_var_commands.py @@ -25,8 +25,9 @@ def test_read_file_to_var(tmp_path): def test_http_to_var_stores_body(monkeypatch): - monkeypatch.setattr(flow_control, "_http_get", - lambda url, method, timeout: (200, "BODYTEXT")) + import je_auto_control.utils.http_client.http_client as hc + monkeypatch.setattr(hc, "http_request", + lambda *a, **k: {"status": 200, "text": "BODYTEXT"}) executor = Executor() result = exec_http_to_var(executor, {"url": "https://x", "var": "resp"}) assert result["status"] == 200 @@ -34,9 +35,10 @@ def test_http_to_var_stores_body(monkeypatch): def test_http_to_var_extracts_json_path(monkeypatch): + import je_auto_control.utils.http_client.http_client as hc monkeypatch.setattr( - flow_control, "_http_get", - lambda url, method, timeout: (200, '{"data": [{"name": "Sam"}]}')) + hc, "http_request", + lambda *a, **k: {"status": 200, "text": '{"data": [{"name": "Sam"}]}'}) executor = Executor() exec_http_to_var(executor, {"url": "https://x", "var": "n", "json_path": "data.0.name"}) diff --git a/test/unit_test/headless/test_http_client.py b/test/unit_test/headless/test_http_client.py new file mode 100644 index 00000000..f77632ec --- /dev/null +++ b/test/unit_test/headless/test_http_client.py @@ -0,0 +1,129 @@ +"""Headless tests for the HTTP/API request action. + +No real network is used: urllib.request.urlopen is monkeypatched with a +fake response/HTTPError so the tests stay deterministic and offline. +""" +import io +import json +import urllib.error + +import pytest + +import je_auto_control as ac +from je_auto_control.utils.http_client import http_client + + +class _FakeResponse: + def __init__(self, status, body, headers=None, url="https://x"): + self.status = status + self._body = body.encode("utf-8") if isinstance(body, str) else body + self.headers = headers or {"Content-Type": "application/json"} + self.url = url + + def read(self): + return self._body + + def __enter__(self): + return self + + def __exit__(self, *_exc): + return False + + +def _capture_request(captured): + def fake_urlopen(request, timeout=None): + captured["request"] = request + captured["timeout"] = timeout + return _FakeResponse(200, json.dumps({"ok": True})) + return fake_urlopen + + +def test_get_parses_json_and_status(monkeypatch): + monkeypatch.setattr(http_client.urllib.request, "urlopen", + lambda req, timeout=None: _FakeResponse( + 200, '{"hello": "world"}')) + resp = http_client.http_request("https://api.example/data") + assert resp["status"] == 200 + assert resp["ok"] is True + assert resp["json"] == {"hello": "world"} + assert resp["text"] == '{"hello": "world"}' + + +def test_post_sends_json_body_and_content_type(monkeypatch): + captured = {} + monkeypatch.setattr(http_client.urllib.request, "urlopen", + _capture_request(captured)) + http_client.http_request("https://api.example/items", method="post", + json_body={"name": "Sam"}) + request = captured["request"] + assert request.method == "POST" + assert request.data == b'{"name": "Sam"}' + assert request.headers.get("Content-type") == "application/json" + + +def test_bearer_auth_header(monkeypatch): + captured = {} + monkeypatch.setattr(http_client.urllib.request, "urlopen", + _capture_request(captured)) + http_client.http_request("https://api.example", auth={ + "type": "bearer", "token": "abc123"}) + assert captured["request"].headers.get("Authorization") == "Bearer abc123" + + +def test_basic_auth_header(monkeypatch): + captured = {} + monkeypatch.setattr(http_client.urllib.request, "urlopen", + _capture_request(captured)) + http_client.http_request("https://api.example", auth={ + "type": "basic", "username": "u", "password": "p"}) # NOSONAR python:S2068 + # base64("u:p") == "dTpw" + assert captured["request"].headers.get("Authorization") == "Basic dTpw" + + +def test_unknown_auth_type_rejected(monkeypatch): + monkeypatch.setattr(http_client.urllib.request, "urlopen", + lambda req, timeout=None: _FakeResponse(200, "{}")) + with pytest.raises(ValueError): + http_client.http_request("https://x", auth={"type": "oauth"}) + + +def test_http_error_is_returned_not_raised(monkeypatch): + def raise_http_error(req, timeout=None): + raise urllib.error.HTTPError( + "https://x", 404, "Not Found", {"Content-Type": "text/plain"}, + io.BytesIO(b"missing")) + monkeypatch.setattr(http_client.urllib.request, "urlopen", raise_http_error) + resp = http_client.http_request("https://x") + assert resp["status"] == 404 + assert resp["ok"] is False + assert resp["text"] == "missing" + + +def test_non_http_scheme_rejected(): + with pytest.raises(ValueError): + http_client.http_request("file:///etc/passwd") + + +def test_timeout_is_passed_through(monkeypatch): + captured = {} + monkeypatch.setattr(http_client.urllib.request, "urlopen", + _capture_request(captured)) + http_client.http_request("https://x", timeout=5) + assert captured["timeout"] == pytest.approx(5.0) + + +def test_facade_and_executor_wiring(monkeypatch): + monkeypatch.setattr(http_client.urllib.request, "urlopen", + lambda req, timeout=None: _FakeResponse(201, '{"id": 7}')) + assert ac.http_request is http_client.http_request + assert "AC_http_request" in ac.executor.known_commands() + record = ac.execute_action( + [["AC_http_request", {"url": "https://x", "method": "POST", + "json_body": {"a": 1}}]]) + assert any("201" in str(value) for value in record.values()) + + +def test_mcp_tool_registered(): + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {tool.name for tool in build_default_tool_registry()} + assert "ac_http_request" in names diff --git a/test/unit_test/headless/test_pdf.py b/test/unit_test/headless/test_pdf.py new file mode 100644 index 00000000..05c93048 --- /dev/null +++ b/test/unit_test/headless/test_pdf.py @@ -0,0 +1,125 @@ +"""Headless tests for PDF text extraction and assertion. + +The single ``_open_pdf`` seam is monkeypatched with a fake reader, so the +tests need neither the optional pypdf package nor a real PDF file. +""" +import pytest + +import je_auto_control as ac +from je_auto_control.utils.exception.exceptions import AutoControlAssertionException +from je_auto_control.utils.executor.action_executor import Executor +from je_auto_control.utils.executor.flow_control import exec_pdf_to_var +from je_auto_control.utils.pdf import pdf_reader + + +class _FakePage: + def __init__(self, text): + self._text = text + + def extract_text(self): + return self._text + + +class _FakeReader: + def __init__(self, pages, metadata=None): + self.pages = [_FakePage(t) for t in pages] + self.metadata = metadata or {} + + +@pytest.fixture() +def fake_pdf(monkeypatch): + reader = _FakeReader( + ["Invoice 123\nAmount due", "Total: $50.00", "Thank you"], + metadata={"/Title": "Inv", "/Author": "Acme"}) + monkeypatch.setattr(pdf_reader, "_open_pdf", lambda _path: reader) + return reader + + +def test_extract_all_pages(fake_pdf): + text = pdf_reader.extract_pdf_text("x.pdf") + assert "Invoice 123" in text + assert "Total: $50.00" in text + assert "Thank you" in text + + +def test_extract_single_page(fake_pdf): + assert pdf_reader.extract_pdf_text("x.pdf", pages=2) == "Total: $50.00" + + +def test_extract_page_out_of_range(fake_pdf): + with pytest.raises(ValueError): + pdf_reader.extract_pdf_text("x.pdf", pages=9) + + +def test_page_count_and_metadata(fake_pdf): + assert pdf_reader.pdf_page_count("x.pdf") == 3 + meta = pdf_reader.pdf_metadata("x.pdf") + assert meta == {"Title": "Inv", "Author": "Acme"} + + +def test_assert_present_passes(fake_pdf): + result = pdf_reader.assert_pdf_text("x.pdf", "Invoice 123") + assert result["passed"] is True + + +def test_assert_absent_passes(fake_pdf): + result = pdf_reader.assert_pdf_text("x.pdf", "Refund", present=False) + assert result["passed"] is True + + +def test_assert_present_fails_raises(fake_pdf): + with pytest.raises(AutoControlAssertionException): + pdf_reader.assert_pdf_text("x.pdf", "Nonexistent") + + +def test_assert_no_raise_returns_false(fake_pdf): + result = pdf_reader.assert_pdf_text("x.pdf", "Nope", raise_on_fail=False) + assert result["passed"] is False + + +def test_assert_case_insensitive(fake_pdf): + result = pdf_reader.assert_pdf_text("x.pdf", "invoice 123", + case_sensitive=False) + assert result["passed"] is True + + +def test_assert_on_specific_page(fake_pdf): + ok = pdf_reader.assert_pdf_text("x.pdf", "Total", page=2) + assert ok["passed"] is True + with pytest.raises(AutoControlAssertionException): + pdf_reader.assert_pdf_text("x.pdf", "Total", page=1) + + +def test_pdf_to_var_flow(fake_pdf): + executor = Executor() + result = exec_pdf_to_var(executor, {"path": "x.pdf", "var": "doc", "page": 1}) + assert result["var"] == "doc" + assert executor.variables.get_value("doc") == "Invoice 123\nAmount due" + + +def test_missing_pypdf_raises_runtimeerror(monkeypatch): + import builtins + real_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if name == "pypdf": + raise ImportError("no pypdf") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + with pytest.raises(RuntimeError): + pdf_reader._open_pdf("x.pdf") + + +def test_facade_executor_and_mcp_wiring(fake_pdf): + assert ac.assert_pdf_text is pdf_reader.assert_pdf_text + assert ac.extract_pdf_text is pdf_reader.extract_pdf_text + known = ac.executor.known_commands() + assert {"AC_assert_pdf_text", "AC_pdf_to_var"} <= known + record = ac.execute_action( + [["AC_assert_pdf_text", {"path": "x.pdf", "text": "Invoice 123"}]]) + assert any("passed" in str(value) or "pdf_text" in str(value) + for value in record.values()) + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {tool.name for tool in build_default_tool_registry()} + assert {"ac_extract_pdf_text", "ac_assert_pdf_text"} <= names diff --git a/test/unit_test/headless/test_sql_steps.py b/test/unit_test/headless/test_sql_steps.py new file mode 100644 index 00000000..29aeffe4 --- /dev/null +++ b/test/unit_test/headless/test_sql_steps.py @@ -0,0 +1,118 @@ +"""Headless tests for the generic SQL steps (sql_to_var / assert_db). + +Uses a real temporary SQLite database file (stdlib sqlite3); no PySide6. +""" +import sqlite3 + +import pytest + +import je_auto_control as ac +from je_auto_control.utils.exception.exceptions import AutoControlAssertionException +from je_auto_control.utils.executor.action_executor import Executor +from je_auto_control.utils.executor.flow_control import ( + exec_assert_db, exec_sql_to_var, +) +from je_auto_control.utils.sql.sql_query import query_sqlite + + +@pytest.fixture() +def users_db(tmp_path): + path = tmp_path / "app.db" + with sqlite3.connect(str(path)) as conn: + conn.execute("CREATE TABLE users (id INTEGER, name TEXT, active INT)") + conn.executemany( + "INSERT INTO users VALUES (?, ?, ?)", + [(1, "Sam", 1), (2, "Lee", 0), (3, "Ann", 1)], + ) + conn.commit() + return str(path) + + +def test_query_all_returns_row_dicts(users_db): + rows = query_sqlite(users_db, "SELECT id, name FROM users ORDER BY id") + assert rows == [{"id": 1, "name": "Sam"}, + {"id": 2, "name": "Lee"}, + {"id": 3, "name": "Ann"}] + + +def test_query_scalar_and_params(users_db): + count = query_sqlite(users_db, "SELECT COUNT(*) FROM users WHERE active = ?", + params=[1], fetch="scalar") + assert count == 2 + + +def test_query_one_or_none(users_db): + row = query_sqlite(users_db, "SELECT name FROM users WHERE id = ?", + params=[2], fetch="one") + assert row == {"name": "Lee"} + missing = query_sqlite(users_db, "SELECT name FROM users WHERE id = ?", + params=[99], fetch="one") + assert missing is None + + +def test_named_params(users_db): + name = query_sqlite(users_db, "SELECT name FROM users WHERE id = :id", + params={"id": 3}, fetch="scalar") + assert name == "Ann" + + +def test_non_select_rejected(users_db): + with pytest.raises(ValueError): + query_sqlite(users_db, "DELETE FROM users") + + +def test_multi_statement_rejected(users_db): + with pytest.raises(ValueError): + query_sqlite(users_db, "SELECT 1; SELECT 2") + + +def test_unknown_fetch_rejected(users_db): + with pytest.raises(ValueError): + query_sqlite(users_db, "SELECT 1", fetch="many") + + +def test_missing_db_rejected(tmp_path): + with pytest.raises((FileNotFoundError, ValueError)): + query_sqlite(str(tmp_path / "nope.db"), "SELECT 1") + + +def test_sql_to_var_stores_scalar(users_db): + executor = Executor() + exec_sql_to_var(executor, { + "database": users_db, "query": "SELECT COUNT(*) FROM users", + "var": "n", "fetch": "scalar"}) + assert executor.variables.get_value("n") == 3 + + +def test_assert_db_passes_and_fails(users_db): + executor = Executor() + ok = exec_assert_db(executor, { + "database": users_db, + "query": "SELECT COUNT(*) FROM users WHERE active = 1", + "op": "eq", "expected": 2}) + assert ok["passed"] is True + with pytest.raises(AutoControlAssertionException): + exec_assert_db(executor, { + "database": users_db, "query": "SELECT COUNT(*) FROM users", + "op": "eq", "expected": 999}) + + +def test_facade_and_executor_wiring(users_db): + assert ac.query_sqlite is query_sqlite + known = ac.executor.known_commands() + assert "AC_sql_to_var" in known + assert "AC_assert_db" in known + record = ac.execute_action([ + ["AC_sql_to_var", {"database": users_db, "var": "total", + "query": "SELECT COUNT(*) FROM users", + "fetch": "scalar"}], + ["AC_assert_db", {"database": users_db, "op": "ge", "expected": 1, + "query": "SELECT COUNT(*) FROM users"}], + ]) + assert record # both steps executed + + +def test_mcp_tools_registered(): + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {tool.name for tool in build_default_tool_registry()} + assert {"ac_sql_query", "ac_assert_db"} <= names diff --git a/test/unit_test/headless/test_wait_for_file.py b/test/unit_test/headless/test_wait_for_file.py new file mode 100644 index 00000000..b8bdf37f --- /dev/null +++ b/test/unit_test/headless/test_wait_for_file.py @@ -0,0 +1,83 @@ +"""Headless tests for AC_wait_for_file / wait_until_file. + +The file-size reader is injectable, so most cases are deterministic and +need no real filesystem; one case exercises the real default reader. +""" +import pytest + +import je_auto_control as ac +from je_auto_control.utils.smart_waits.waits import wait_until_file + + +def _reader(values): + """Return a stat_reader yielding ``values`` then repeating the last one.""" + seq = list(values) + + def read(_path): + return seq.pop(0) if len(seq) > 1 else seq[0] + return read + + +def test_succeeds_when_file_present(): + outcome = wait_until_file("f", stable_for_s=0, stat_reader=_reader([100])) + assert outcome.succeeded is True + assert outcome.reason == "file ready" + + +def test_succeeds_after_file_appears(): + outcome = wait_until_file( + "f", stable_for_s=0, timeout_s=1.0, poll_interval_s=0.01, + stat_reader=_reader([None, None, 50])) + assert outcome.succeeded is True + + +def test_missing_file_times_out(): + outcome = wait_until_file( + "f", timeout_s=0.2, poll_interval_s=0.02, stable_for_s=0, + stat_reader=_reader([None])) + assert outcome.succeeded is False + assert "timeout" in outcome.reason + + +def test_min_size_gate_blocks_small_file(): + outcome = wait_until_file( + "f", timeout_s=0.2, poll_interval_s=0.02, stable_for_s=0, + min_size=10, stat_reader=_reader([3])) + assert outcome.succeeded is False + + +def test_stability_requires_steady_size(): + outcome = wait_until_file( + "f", timeout_s=1.0, poll_interval_s=0.02, stable_for_s=0.1, + stat_reader=_reader([200])) + assert outcome.succeeded is True + assert outcome.elapsed_s >= 0.1 + + +@pytest.mark.parametrize("kwargs", [ + {"timeout_s": 0}, {"poll_interval_s": 0}, {"stable_for_s": -1}, +]) +def test_validation_errors(kwargs): + with pytest.raises(ValueError): + wait_until_file("f", stat_reader=_reader([1]), **kwargs) + + +def test_default_reader_on_real_file(tmp_path): + target = tmp_path / "download.bin" + target.write_bytes(b"hello") + outcome = wait_until_file(str(target), stable_for_s=0, timeout_s=1.0) + assert outcome.succeeded is True + + +def test_facade_executor_and_mcp_wiring(tmp_path): + target = tmp_path / "done.txt" + target.write_text("ok", encoding="utf-8") + assert ac.wait_until_file is wait_until_file + assert "AC_wait_for_file" in ac.executor.known_commands() + record = ac.execute_action( + [["AC_wait_for_file", {"path": str(target), "stable_for_s": 0, + "timeout_s": 1.0}]]) + assert any("file ready" in str(value) for value in record.values()) + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {tool.name for tool in build_default_tool_registry()} + assert "ac_wait_for_file" in names diff --git a/test/unit_test/headless/test_wait_for_port.py b/test/unit_test/headless/test_wait_for_port.py new file mode 100644 index 00000000..469c100a --- /dev/null +++ b/test/unit_test/headless/test_wait_for_port.py @@ -0,0 +1,87 @@ +"""Headless tests for AC_wait_for_port / wait_until_port. + +The TCP connector is injectable, so most cases are deterministic and need +no real socket; one case binds a real loopback listener. +""" +import socket +import threading + +import pytest + +import je_auto_control as ac +from je_auto_control.utils.smart_waits.waits import wait_until_port + + +def _connector(values): + """Return a connector yielding ``values`` then repeating the last one.""" + seq = list(values) + + def connect(_host, _port, _timeout): + return seq.pop(0) if len(seq) > 1 else seq[0] + return connect + + +def test_succeeds_when_port_open(): + outcome = wait_until_port("localhost", 8080, + connector=_connector([True])) + assert outcome.succeeded is True + assert "open" in outcome.reason + + +def test_succeeds_after_port_comes_up(): + outcome = wait_until_port( + "localhost", 9000, timeout_s=1.0, poll_interval_s=0.01, + connector=_connector([False, False, True])) + assert outcome.succeeded is True + assert outcome.samples_taken >= 3 + + +def test_times_out_when_port_closed(): + outcome = wait_until_port( + "localhost", 9000, timeout_s=0.2, poll_interval_s=0.02, + connector=_connector([False])) + assert outcome.succeeded is False + assert "timeout" in outcome.reason + + +@pytest.mark.parametrize("kwargs", [ + {"timeout_s": 0}, {"poll_interval_s": 0}, {"port": 0}, {"port": 70000}, +]) +def test_validation_errors(kwargs): + args = {"host": "localhost", "port": 8080, "timeout_s": 10.0, + "poll_interval_s": 0.25, "connect_timeout_s": 1.0} + args.update(kwargs) + with pytest.raises(ValueError): + wait_until_port( + args["host"], args["port"], timeout_s=args["timeout_s"], + poll_interval_s=args["poll_interval_s"], + connect_timeout_s=args["connect_timeout_s"], + connector=_connector([True])) + + +def test_real_loopback_listener(): + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.bind(("127.0.0.1", 0)) + server.listen(1) + port = server.getsockname()[1] + accepted = [] + threading.Thread( + target=lambda: accepted.append(server.accept()), daemon=True).start() + try: + outcome = wait_until_port("127.0.0.1", port, timeout_s=2.0) + assert outcome.succeeded is True + finally: + server.close() + + +def test_facade_executor_and_mcp_wiring(): + assert ac.wait_until_port is wait_until_port + assert "AC_wait_for_port" in ac.executor.known_commands() + record = ac.execute_action( + [["AC_wait_for_port", {"host": "127.0.0.1", "port": 9, "timeout_s": 0.15, + "poll_interval_s": 0.02, "connect_timeout_s": 0.05}]]) + # port 9 (discard) is almost certainly closed -> a timeout outcome recorded + assert any("9" in str(value) for value in record.values()) + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {tool.name for tool in build_default_tool_registry()} + assert "ac_wait_for_port" in names