diff --git a/CLAUDE.md b/CLAUDE.md index 142fa503..3d6d190a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -242,6 +242,7 @@ generic_automation_mcp/ │ ├── _mcp_names.py # MCP prefix detection │ ├── _onboarding.py # First-run detection + guided menu │ ├── _prompts.py # Orchestrator prompt builder +│ ├── _timed_input.py # timed_prompt() and status_line() CLI primitives │ ├── _update.py # run_update_command(): first-class upgrade path for `autoskillit update` │ ├── _update_checks.py # Unified startup update check: version/hook/source-drift signals, branch-aware dismissal │ ├── _workspace.py # Workspace clean helpers diff --git a/src/autoskillit/cli/_cook.py b/src/autoskillit/cli/_cook.py index 69ffc008..a6cac5cb 100644 --- a/src/autoskillit/cli/_cook.py +++ b/src/autoskillit/cli/_cook.py @@ -115,12 +115,13 @@ def cook(*, resume: bool = False, session_id: str | None = None) -> None: print() from autoskillit.cli._ansi import permissions_warning - from autoskillit.cli._init_helpers import _require_interactive_stdin + from autoskillit.cli._timed_input import timed_prompt print(permissions_warning()) - _require_interactive_stdin("autoskillit cook") - confirm = input("\nLaunch session? [Enter/n]: ").strip().lower() - if confirm in ("n", "no"): + confirm = timed_prompt( + "\nLaunch session? [Enter/n]", default="", timeout=120, label="autoskillit cook" + ) + if confirm.lower() in ("n", "no"): return from autoskillit.cli._onboarding import is_first_run, run_onboarding_menu diff --git a/src/autoskillit/cli/_init_helpers.py b/src/autoskillit/cli/_init_helpers.py index 24711d03..b1164040 100644 --- a/src/autoskillit/cli/_init_helpers.py +++ b/src/autoskillit/cli/_init_helpers.py @@ -73,7 +73,8 @@ def _require_interactive_stdin(command_name: str) -> None: def _prompt_recipe_choice() -> str: - _require_interactive_stdin("autoskillit order") + from autoskillit.cli._timed_input import timed_prompt + available = list_recipes(Path.cwd()).items if not available: print("No recipes found. Run 'autoskillit recipes list' to check.") @@ -81,13 +82,16 @@ def _prompt_recipe_choice() -> str: print("Available recipes:") for i, r in enumerate(available, 1): print(f" {i}. {r.name}") - return input("Recipe name: ").strip() + return timed_prompt("Recipe name:", default="", timeout=120, label="autoskillit order") def _prompt_test_command() -> list[str]: - _require_interactive_stdin("autoskillit init") + from autoskillit.cli._timed_input import timed_prompt + default = "task test-all" - answer = input(f"Test command [{default}]: ").strip() + answer = timed_prompt( + f"Test command [{default}]:", default=default, timeout=120, label="autoskillit init" + ) return (answer if answer else default).split() @@ -116,12 +120,19 @@ def _prompt_github_repo() -> str | None: _B, _C, _D, _G, _Y, _R = _colors() detected = _detect_github_repo() + from autoskillit.cli._timed_input import timed_prompt + if detected: print(f"\n {_Y}GitHub repo{_R} {_G}{detected}{_R} {_D}(detected from git remote){_R}") - value = input(f" {_D}Press Enter to confirm, or type a different repo:{_R} ").strip() + value = timed_prompt( + "Press Enter to confirm, or type a different repo:", + default=detected or "", + timeout=120, + label="init", + ) else: print(f"\n {_Y}GitHub repo{_R} {_D}owner/repo, URL, or blank to skip{_R}") - value = input(f" {_D}Repository:{_R} ").strip() + value = timed_prompt("Repository:", default="", timeout=120, label="init") if not value: return detected @@ -251,9 +262,11 @@ def _check_secret_scanning(project_dir: Path) -> _ScanResult: " Recommended: add gitleaks to .pre-commit-config.yaml\n" " before proceeding.\n" ) + from autoskillit.cli._timed_input import timed_prompt + print(" To bypass, type exactly:\n") print(f" {_D}{_SECRET_SCAN_BYPASS_PHRASE}{_R}\n") - response = input(" > ").strip() + response = timed_prompt(">", default="", timeout=120, label="secret-scan-bypass") if response != _SECRET_SCAN_BYPASS_PHRASE: print(f"\n {_B}Aborted.{_R} Phrase did not match.") return _ScanResult(False) diff --git a/src/autoskillit/cli/_onboarding.py b/src/autoskillit/cli/_onboarding.py index 02b319a2..73ebe8ba 100644 --- a/src/autoskillit/cli/_onboarding.py +++ b/src/autoskillit/cli/_onboarding.py @@ -152,12 +152,13 @@ def run_onboarding_menu(project_dir: Path, *, color: bool = True) -> str | None: _Y = "\x1b[33m" if color else "" _R = "\x1b[0m" if color else "" + from autoskillit.cli._timed_input import timed_prompt + print(f"\n{_B}It looks like this is your first time using AutoSkillit in this project.{_R}") - try: - ans = input("Would you like help getting started? [Y/n]: ").strip().lower() - except EOFError: - ans = "" - if ans in ("n", "no"): + ans = timed_prompt( + "Would you like help getting started? [Y/n]", default="", timeout=120, label="onboarding" + ) + if ans.lower() in ("n", "no"): mark_onboarded(project_dir) return None @@ -175,10 +176,7 @@ def run_onboarding_menu(project_dir: Path, *, color: bool = True) -> str | None: print(f" {_Y}D{_R} — {_C}Write a custom recipe{_R} (runs /autoskillit:write-recipe)") print(f" {_Y}E{_R} — {_C}Skip{_R} (start a normal session)") - try: - choice = input(f"\n{_B}[A/B/C/D/E]: {_R}").strip().upper() - except EOFError: - choice = "" + choice = timed_prompt("\n[A/B/C/D/E]", default="E", timeout=120, label="onboarding").upper() # Wait up to 5s for gather_intel to complete, then shut down cleanly try: @@ -193,10 +191,7 @@ def run_onboarding_menu(project_dir: Path, *, color: bool = True) -> str | None: return "/autoskillit:setup-project" if choice == "B": - try: - ref = input("Issue URL or number: ").strip() - except EOFError: - ref = "" + ref = timed_prompt("Issue URL or number:", default="", timeout=120, label="onboarding") if ref: return f"/autoskillit:prepare-issue {ref}" return "/autoskillit:setup-project" diff --git a/src/autoskillit/cli/_timed_input.py b/src/autoskillit/cli/_timed_input.py new file mode 100644 index 00000000..a74df7fb --- /dev/null +++ b/src/autoskillit/cli/_timed_input.py @@ -0,0 +1,89 @@ +"""Timed prompt primitive with TTY guard, ANSI formatting, and timeout. + +Every user-facing ``input()`` in the CLI layer must go through +``timed_prompt()`` rather than calling ``input()`` directly. A structural +test (``test_input_tty_contracts.py``) enforces this invariant. + +``status_line()`` is the "pre-flight feedback" primitive — it prints a +single status message before any blocking I/O so the user always sees +output immediately. +""" + +from __future__ import annotations + +import select +import sys + +from autoskillit.cli._ansi import supports_color +from autoskillit.cli._init_helpers import _require_interactive_stdin + + +def timed_prompt( + text: str, + *, + default: str = "", + timeout: int = 30, + label: str = "prompt", +) -> str: + """Display a formatted prompt with a bounded wait. + + Composes three invariants into one call: + + 1. **TTY guard** — calls ``_require_interactive_stdin(label)``. + 2. **ANSI formatting** — bold prompt, dim timeout hint. + 3. **Timeout** — ``select.select`` bounds the wait; returns *default* + on expiry instead of blocking forever. + + Parameters + ---------- + text + The prompt text shown to the user (plain text — ANSI is applied + automatically based on ``supports_color()``). + default + Value returned when the user does not respond within *timeout* + seconds, or on ``EOFError``. + timeout + Maximum seconds to wait for input. ``0`` disables the timeout + (waits indefinitely — use only for prompts the user explicitly + initiated). + label + Human-readable label for the TTY-guard error message. + """ + _require_interactive_stdin(label) + + color = supports_color() + _B = "\x1b[1m" if color else "" + _D = "\x1b[2m" if color else "" + _R = "\x1b[0m" if color else "" + + hint = f" {_D}(auto-continues in {timeout}s){_R}" if timeout else "" + formatted = f"{_B}{text}{_R}{hint} " + print(formatted, end="", flush=True) + + if timeout: + try: + ready, _, _ = select.select([sys.stdin], [], [], timeout) + except (OSError, TypeError, ValueError): + # stdin lacks fileno() or fileno() returns non-int + # (e.g. pytest capture, piped input) — fall back to blocking input(). + ready = [sys.stdin] + if not ready: + print(f"\n{_D}(timed out, continuing...){_R}", flush=True) + return default + + try: + return input("").strip() + except EOFError: + return default + + +def status_line(message: str) -> None: + """Print a single-line status message with appropriate formatting. + + This is the "pre-flight feedback" primitive: call it before any + blocking I/O so the terminal is never silent. + """ + color = supports_color() + _D = "\x1b[2m" if color else "" + _R = "\x1b[0m" if color else "" + print(f"{_D}{message}{_R}", flush=True) diff --git a/src/autoskillit/cli/_update_checks.py b/src/autoskillit/cli/_update_checks.py index c4b648c5..0c0c6245 100644 --- a/src/autoskillit/cli/_update_checks.py +++ b/src/autoskillit/cli/_update_checks.py @@ -45,7 +45,6 @@ _DISMISS_FILE = "update_check.json" _STABLE_DISMISS_WINDOW = timedelta(days=7) _DEV_DISMISS_WINDOW = timedelta(hours=12) -_SNOOZE_WINDOW = timedelta(hours=1) # Disposable on-disk cache for GitHub API GETs. This is intentionally a plain # JSON dict (not ``write_versioned_json``) — it is a transient performance @@ -639,7 +638,7 @@ def run_update_checks(home: Path | None = None) -> None: InstallType.UNKNOWN, InstallType.LOCAL_PATH, InstallType.LOCAL_EDITABLE, - ): + ) and not os.environ.get("AUTOSKILLIT_FORCE_UPDATE_CHECK"): return import autoskillit as _pkg @@ -649,6 +648,11 @@ def run_update_checks(home: Path | None = None) -> None: window = dismissal_window(info) state = _read_dismiss_state(_home) + # Pre-flight feedback: ensure the terminal is never silent during network I/O + from autoskillit.cli._timed_input import status_line + + status_line("Checking for updates...") + # Gather signals raw_signals: list[Signal | None] = [ _binary_signal(info, _home, current), @@ -673,12 +677,19 @@ def run_update_checks(home: Path | None = None) -> None: return # Consolidated prompt + from autoskillit.cli._timed_input import timed_prompt + + _TIMEOUT_SENTINEL = "__timeout__" bullet_lines = "\n".join(f" - {s.message}" for s in firing_signals) - prompt_text = f"\nAutoSkillit has updates available:\n{bullet_lines}\nUpdate now? [Y/n] " - print(prompt_text, end="", flush=True) - answer = input("").strip().lower() + prompt_text = f"\nAutoSkillit has updates available:\n{bullet_lines}\nUpdate now? [Y/n]" + answer = timed_prompt(prompt_text, default=_TIMEOUT_SENTINEL, timeout=30, label="update check") + + # On timeout: proceed to app() without writing a dismissal record so the + # prompt reappears on the next invocation. + if answer == _TIMEOUT_SENTINEL: + return - if answer in ("", "y", "yes"): + if answer.lower() in ("", "y", "yes"): _run_update_sequence(info, current, _home, state, _skip_env) return diff --git a/src/autoskillit/cli/_workspace.py b/src/autoskillit/cli/_workspace.py index c1dad17d..119a3a49 100644 --- a/src/autoskillit/cli/_workspace.py +++ b/src/autoskillit/cli/_workspace.py @@ -74,12 +74,16 @@ async def run_workspace_clean( print() if not force: - from autoskillit.cli._init_helpers import _require_interactive_stdin + from autoskillit.cli._timed_input import timed_prompt - _require_interactive_stdin("autoskillit workspace clean") suffix = "ies" if len(stale) != 1 else "y" - answer = input(f"Remove {len(stale)} director{suffix}? [y/N]: ") - if answer.strip().lower() != "y": + answer = timed_prompt( + f"Remove {len(stale)} director{suffix}? [y/N]", + default="n", + timeout=120, + label="autoskillit workspace clean", + ) + if answer.lower() != "y": print("Aborted.") return @@ -148,12 +152,16 @@ def _safe_mtime(p: Path) -> float | None: print() if not force: - from autoskillit.cli._init_helpers import _require_interactive_stdin + from autoskillit.cli._timed_input import timed_prompt - _require_interactive_stdin("autoskillit workspace clean") suffix = "ies" if len(stale_wts) != 1 else "y" - answer = input(f"Remove {len(stale_wts)} worktree director{suffix}? [y/N]: ") - if answer.strip().lower() != "y": + answer = timed_prompt( + f"Remove {len(stale_wts)} worktree director{suffix}? [y/N]", + default="n", + timeout=120, + label="autoskillit workspace clean", + ) + if answer.lower() != "y": print("Aborted.") return diff --git a/src/autoskillit/cli/app.py b/src/autoskillit/cli/app.py index ebf674a2..4a005835 100644 --- a/src/autoskillit/cli/app.py +++ b/src/autoskillit/cli/app.py @@ -27,7 +27,6 @@ _log_secret_scan_bypass, _prompt_test_command, _register_all, - _require_interactive_stdin, ) from autoskillit.cli._terminal import terminal_guard from autoskillit.core import ClaudeFlags, RecipeSource, atomic_write, pkg_root @@ -587,6 +586,8 @@ def order(recipe: str | None = None, session_id: str | None = None, *, resume: b mcp_prefix = detect_autoskillit_mcp_prefix() + from autoskillit.cli._timed_input import timed_prompt + if recipe is None: from autoskillit.cli._prompts import ( _OPEN_KITCHEN_CHOICE, @@ -594,7 +595,6 @@ def order(recipe: str | None = None, session_id: str | None = None, *, resume: b _resolve_recipe_input, ) - _require_interactive_stdin("autoskillit order") available = list_recipes(Path.cwd()).items if not available: print("No recipes found. Run 'autoskillit recipes list' to check.") @@ -603,7 +603,12 @@ def order(recipe: str | None = None, session_id: str | None = None, *, resume: b print(" 0. Open kitchen (no recipe)") for i, r in enumerate(available, 1): print(f" {i}. {r.name}") - raw = input(f"Select recipe [0-{len(available)}]: ").strip() + raw = timed_prompt( + f"Select recipe [0-{len(available)}]:", + default="", + timeout=120, + label="autoskillit order", + ) resolved = _resolve_recipe_input(raw, available) if resolved is _OPEN_KITCHEN_CHOICE: from autoskillit.cli._prompts import _OPEN_KITCHEN_GREETINGS @@ -665,11 +670,12 @@ def order(recipe: str | None = None, session_id: str | None = None, *, resume: b if _needed: subset_list = ", ".join(sorted(_needed)) print(f"\nThis recipe requires subset(s): {subset_list}") - _require_interactive_stdin("autoskillit order") print(" 1. Enable temporarily (for this run only)") print(" 2. Enable permanently (update .autoskillit/config.yaml)") print(" 3. Cancel") - _choice = input("Choose [1/2/3]: ").strip() + _choice = timed_prompt( + "Choose [1/2/3]:", default="3", timeout=120, label="autoskillit order" + ) if _choice == "1": _extra_env["AUTOSKILLIT_SUBSETS__DISABLED"] = "@json []" elif _choice == "2": @@ -691,11 +697,12 @@ def order(recipe: str | None = None, session_id: str | None = None, *, resume: b if _packs_needed: pack_list = ", ".join(sorted(_packs_needed)) print(f"\nThis recipe requires pack(s): {pack_list}") - _require_interactive_stdin("autoskillit order") print(" 1. Enable temporarily (for this run only)") print(" 2. Enable permanently (update .autoskillit/config.yaml)") print(" 3. Cancel") - _pack_choice = input("Choose [1/2/3]: ").strip() + _pack_choice = timed_prompt( + "Choose [1/2/3]:", default="3", timeout=120, label="autoskillit order" + ) if _pack_choice == "1": import json as _json @@ -714,9 +721,10 @@ def order(recipe: str | None = None, session_id: str | None = None, *, resume: b from autoskillit.cli._ansi import permissions_warning print(permissions_warning()) - _require_interactive_stdin("autoskillit order") - confirm = input("Launch session? [Enter/n]: ").strip().lower() - if confirm in ("n", "no"): + confirm = timed_prompt( + "Launch session? [Enter/n]", default="", timeout=120, label="autoskillit order" + ) + if confirm.lower() in ("n", "no"): return greeting = random.choice(_COOK_GREETINGS).format(recipe_name=recipe) diff --git a/src/autoskillit/core/_type_constants.py b/src/autoskillit/core/_type_constants.py index f53a9fd2..95a83742 100644 --- a/src/autoskillit/core/_type_constants.py +++ b/src/autoskillit/core/_type_constants.py @@ -37,6 +37,7 @@ "AUTOSKILLIT_HEADLESS", "AUTOSKILLIT_SKIP_STALE_CHECK", "AUTOSKILLIT_SKIP_SOURCE_DRIFT_CHECK", + "AUTOSKILLIT_FORCE_UPDATE_CHECK", } ) diff --git a/tests/arch/_rules.py b/tests/arch/_rules.py index 6415d89e..b89da022 100644 --- a/tests/arch/_rules.py +++ b/tests/arch/_rules.py @@ -59,6 +59,7 @@ class RuleDescriptor: "_cook.py", "_init_helpers.py", "_onboarding.py", + "_timed_input.py", "_update.py", "_update_checks.py", "app.py", diff --git a/tests/arch/test_subpackage_isolation.py b/tests/arch/test_subpackage_isolation.py index 8980f5c2..ce55cc3f 100644 --- a/tests/arch/test_subpackage_isolation.py +++ b/tests/arch/test_subpackage_isolation.py @@ -691,7 +691,7 @@ def test_no_subpackage_exceeds_10_files() -> None: "recipe": 31, "execution": 26, "core": 17, - "cli": 17, + "cli": 18, "hooks": 20, } violations: list[str] = [] diff --git a/tests/cli/test_cook.py b/tests/cli/test_cook.py index 05546b1b..5f3b91d5 100644 --- a/tests/cli/test_cook.py +++ b/tests/cli/test_cook.py @@ -647,13 +647,12 @@ def test_order_named_recipe_only_confirmation_prompt( args=[], returncode=0, stdout="", stderr="" ) - prompts_seen: list[str] = [] - monkeypatch.setattr("builtins.input", lambda prompt="": prompts_seen.append(prompt) or "") + input_calls = [] + monkeypatch.setattr("builtins.input", lambda prompt="": input_calls.append(prompt) or "") cli.order("test-script") - assert len(prompts_seen) == 1, "input() should be called exactly once (confirmation)" - assert "Launch session" in prompts_seen[0] + assert len(input_calls) == 1, "input() should be called exactly once (confirmation)" @patch("autoskillit.cli.subprocess.run") def test_order_command_includes_positional_greeting( diff --git a/tests/cli/test_input_tty_contracts.py b/tests/cli/test_input_tty_contracts.py index f0d035b5..ff583b7d 100644 --- a/tests/cli/test_input_tty_contracts.py +++ b/tests/cli/test_input_tty_contracts.py @@ -1,7 +1,7 @@ """ -Structural enforcement: every input() call in src/autoskillit/cli/ must be -preceded by _require_interactive_stdin(), or the function must be in the -allowlist (_TTY_EXEMPT_FUNCTIONS). +Structural enforcement: every input() call in src/autoskillit/cli/ must go +through timed_prompt() in _timed_input.py, or the function must be in the +allowlist (_RAW_INPUT_EXEMPT_FILES). """ from __future__ import annotations @@ -11,79 +11,82 @@ import pytest -# Functions exempt from the _require_interactive_stdin() contract. -# Two allowlist categories: -# - call-site-guarded: only called from contexts that already guarantee isatty() -# - custom-handled: implement their own non-interactive path (return a result, -# not SystemExit) — cannot use _require_interactive_stdin by design -_TTY_EXEMPT_FUNCTIONS: frozenset[str] = frozenset( +# Files that are *allowed* to contain raw input() calls. +# _timed_input.py is the sole module that wraps input() with timeout/TTY/ANSI. +_RAW_INPUT_EXEMPT_FILES: frozenset[str] = frozenset( { - "_prompt_github_repo", # call-site-guarded: only caller (_register_all) wraps in isatty() - "_check_secret_scanning", # custom-handled: returns _ScanResult(False) non-interactively - "run_onboarding_menu", # custom-handled: catches EOFError on each input() - "run_update_checks", # custom-handled: guards with isatty() check at entry + "_timed_input.py", # the prompt primitive itself } ) -_CLI_DIR = Path(__file__).resolve().parent.parent.parent / "src" / "autoskillit" / "cli" +_SRC_DIR = Path(__file__).resolve().parent.parent.parent / "src" +_CLI_DIR = _SRC_DIR / "autoskillit" / "cli" -def _has_tty_guard_before_input(func_body: list[ast.stmt]) -> bool: - """Return True if the function body contains a _require_interactive_stdin() call. - - The canonical TTY guard is exclusively _require_interactive_stdin(). Inline - isatty() checks are NOT accepted — they are too permissive: a function with - multiple input() calls passes even when only one call site is guarded, and - the enforcement silently breaks when the guard is removed. - """ - for node in ast.walk(ast.Module(body=func_body, type_ignores=[])): - if isinstance(node, ast.Call): - if isinstance(node.func, ast.Name) and node.func.id == "_require_interactive_stdin": - return True - return False +# --------------------------------------------------------------------------- +# Test 1a — no raw input() outside _timed_input.py +# --------------------------------------------------------------------------- -def test_all_input_calls_in_cli_are_tty_guarded() -> None: - """Every function in src/autoskillit/cli/ that calls input() must call - _require_interactive_stdin() in its body, OR be listed in _TTY_EXEMPT_FUNCTIONS. +def test_all_cli_prompts_use_timed_prompt_or_are_exempt() -> None: + """No function in src/autoskillit/cli/ may call input() directly except + those inside files listed in _RAW_INPUT_EXEMPT_FILES. - This prevents silent EOFError crashes in non-interactive environments and makes - the class of bugs in issue #470 structurally impossible to re-introduce. + All user-facing prompts must go through timed_prompt() which composes + TTY guard, ANSI formatting, and timeout into a single call. """ violations: list[str] = [] for py_file in sorted(_CLI_DIR.rglob("*.py")): + if py_file.name in _RAW_INPUT_EXEMPT_FILES: + continue source = py_file.read_text() tree = ast.parse(source, filename=str(py_file)) + rel_path = py_file.relative_to(_SRC_DIR) + + # Check inside function/method bodies for node in ast.walk(tree): if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): continue func_name = node.name - if func_name in _TTY_EXEMPT_FUNCTIONS: - continue - # Check if this function contains any input() call has_input = any( isinstance(n, ast.Call) and isinstance(n.func, ast.Name) and n.func.id == "input" for n in ast.walk(ast.Module(body=node.body, type_ignores=[])) ) - if not has_input: - continue - # Function has input() — verify TTY guard - if not _has_tty_guard_before_input(node.body): - rel_path = py_file.relative_to(Path("src")) + if has_input: violations.append( - f"{rel_path}:{node.lineno}: {func_name}() calls input() " - f"without _require_interactive_stdin()" + f"{rel_path}:{node.lineno}: {func_name}() calls raw input() — " + f"use timed_prompt() from _timed_input.py instead" ) + # Check module-level code (outside any function or class) + for stmt in tree.body: + if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): + continue + for n in ast.walk(stmt): + if ( + isinstance(n, ast.Call) + and isinstance(n.func, ast.Name) + and n.func.id == "input" + ): + violations.append( + f"{rel_path}:{n.lineno}: calls raw input() — " + f"use timed_prompt() from _timed_input.py instead" + ) + assert not violations, ( - "The following CLI functions call input() without _require_interactive_stdin().\n" - "Add _require_interactive_stdin(context) at the start of each function,\n" - "or add the function to _TTY_EXEMPT_FUNCTIONS with a justification comment:\n\n" + "The following CLI functions call input() directly instead of timed_prompt().\n" + "All user-facing prompts must use timed_prompt() which composes TTY guard,\n" + "ANSI formatting, and select.select timeout into a single call:\n\n" + "\n".join(f" • {v}" for v in violations) ) +# --------------------------------------------------------------------------- +# _require_interactive_stdin behavioural tests (kept for regression coverage) +# --------------------------------------------------------------------------- + + def test_require_interactive_stdin_raises_system_exit_when_not_tty( monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture ) -> None: diff --git a/tests/cli/test_onboarding.py b/tests/cli/test_onboarding.py index 72828f75..d49bc607 100644 --- a/tests/cli/test_onboarding.py +++ b/tests/cli/test_onboarding.py @@ -93,6 +93,7 @@ def test_run_onboarding_menu_decline_returns_none_and_marks( """User inputs 'n' to the initial prompt. Returns None and marker is created.""" _make_initialized_project(tmp_path) inputs = iter(["n"]) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) monkeypatch.setattr("builtins.input", lambda _prompt="": next(inputs)) result = run_onboarding_menu(tmp_path, color=False) assert result is None @@ -106,6 +107,7 @@ def test_run_onboarding_menu_skip_e_returns_none_and_marks( """User inputs 'y', then 'E'. Returns None and marker is created.""" _make_initialized_project(tmp_path) inputs = iter(["y", "E"]) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) monkeypatch.setattr("builtins.input", lambda _prompt="": next(inputs)) monkeypatch.setattr("autoskillit.cli._onboarding.gather_intel", lambda _p: OnboardingIntel()) result = run_onboarding_menu(tmp_path, color=False) @@ -120,6 +122,7 @@ def test_run_onboarding_menu_option_a_returns_setup_project( """User inputs 'y', then 'A'. Returns /autoskillit:setup-project. Marker NOT created yet.""" _make_initialized_project(tmp_path) inputs = iter(["y", "A"]) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) monkeypatch.setattr("builtins.input", lambda _prompt="": next(inputs)) monkeypatch.setattr("autoskillit.cli._onboarding.gather_intel", lambda _p: OnboardingIntel()) result = run_onboarding_menu(tmp_path, color=False) @@ -135,6 +138,7 @@ def test_run_onboarding_menu_option_b_with_url_returns_prepare_issue( """User inputs 'y', then 'B', then a URL. Returns string with /autoskillit:prepare-issue.""" _make_initialized_project(tmp_path) inputs = iter(["y", "B", "https://github.com/org/repo/issues/42"]) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) monkeypatch.setattr("builtins.input", lambda _prompt="": next(inputs)) monkeypatch.setattr("autoskillit.cli._onboarding.gather_intel", lambda _p: OnboardingIntel()) result = run_onboarding_menu(tmp_path, color=False) @@ -149,6 +153,7 @@ def test_run_onboarding_menu_option_d_returns_write_recipe( """User inputs 'y', then 'D'. Returns string with /autoskillit:write-recipe.""" _make_initialized_project(tmp_path) inputs = iter(["y", "D"]) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) monkeypatch.setattr("builtins.input", lambda _prompt="": next(inputs)) monkeypatch.setattr("autoskillit.cli._onboarding.gather_intel", lambda _p: OnboardingIntel()) result = run_onboarding_menu(tmp_path, color=False) @@ -163,6 +168,7 @@ def test_run_onboarding_menu_option_c_returns_setup_project_prompt( """User inputs 'y', then 'C'. Returns string starting with /autoskillit:setup-project.""" _make_initialized_project(tmp_path) inputs = iter(["y", "C"]) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) monkeypatch.setattr("builtins.input", lambda _prompt="": next(inputs)) monkeypatch.setattr("autoskillit.cli._onboarding.gather_intel", lambda _p: OnboardingIntel()) result = run_onboarding_menu(tmp_path, color=False) diff --git a/tests/cli/test_update_checks.py b/tests/cli/test_update_checks.py index a3d5aba6..39064674 100644 --- a/tests/cli/test_update_checks.py +++ b/tests/cli/test_update_checks.py @@ -169,6 +169,7 @@ def test_run_update_checks_skips_local_and_unknown_install_types( monkeypatch.delenv("CI", raising=False) monkeypatch.delenv("AUTOSKILLIT_SKIP_STALE_CHECK", raising=False) monkeypatch.delenv("AUTOSKILLIT_SKIP_SOURCE_DRIFT_CHECK", raising=False) + monkeypatch.delenv("AUTOSKILLIT_FORCE_UPDATE_CHECK", raising=False) fake_stdin = MagicMock() fake_stdin.isatty.return_value = True @@ -443,12 +444,15 @@ def _setup_run_checks( state: dict | None = None, ) -> tuple[list[str], list[str]]: """Set up mocks for run_update_checks and return (printed_lines, input_calls).""" + import select as _select_mod + from autoskillit.cli._update_checks import Signal monkeypatch.delenv("CLAUDECODE", raising=False) monkeypatch.delenv("CI", raising=False) monkeypatch.delenv("AUTOSKILLIT_SKIP_STALE_CHECK", raising=False) monkeypatch.delenv("AUTOSKILLIT_SKIP_SOURCE_DRIFT_CHECK", raising=False) + monkeypatch.delenv("AUTOSKILLIT_FORCE_UPDATE_CHECK", raising=False) fake_stdin = MagicMock() fake_stdin.isatty.return_value = True @@ -457,6 +461,12 @@ def _setup_run_checks( monkeypatch.setattr(sys, "stdin", fake_stdin) monkeypatch.setattr(sys, "stdout", fake_stdout) + # timed_prompt uses select.select to implement timeout; mock it to + # report "stdin is ready" so tests proceed without real file descriptors. + monkeypatch.setattr( + _select_mod, "select", lambda rlist, wlist, xlist, timeout=None: (rlist, [], []) + ) + _info = info or _make_stable_info() monkeypatch.setattr("autoskillit.cli._update_checks.detect_install", lambda: _info) @@ -1131,6 +1141,199 @@ def get(self, url, headers=None, **kw): assert received_headers["Authorization"] == "Bearer my-secret-token" +# --------------------------------------------------------------------------- +# timed_prompt primitive tests +# --------------------------------------------------------------------------- + + +def test_timed_prompt_returns_default_on_timeout(monkeypatch: pytest.MonkeyPatch) -> None: + """timed_prompt returns the default value when select.select times out.""" + import select as _select_mod + + from autoskillit.cli._timed_input import timed_prompt + + monkeypatch.setattr("sys.stdin.isatty", lambda: True) + monkeypatch.setattr("sys.stdout.isatty", lambda: True) + monkeypatch.delenv("NO_COLOR", raising=False) + + # select.select returns empty list = timeout + monkeypatch.setattr( + _select_mod, "select", lambda rlist, wlist, xlist, timeout=None: ([], [], []) + ) + + printed: list[str] = [] + monkeypatch.setattr("builtins.print", lambda *args, **kw: printed.append(str(args))) + + result = timed_prompt("Test prompt?", default="n", timeout=30, label="test") + assert result == "n" + assert any("timed out" in p for p in printed) + + +def test_timed_prompt_applies_ansi_formatting(monkeypatch: pytest.MonkeyPatch) -> None: + """timed_prompt output includes ANSI escape sequences when color is supported.""" + import select as _select_mod + + from autoskillit.cli._timed_input import timed_prompt + + monkeypatch.setattr("sys.stdin.isatty", lambda: True) + monkeypatch.setattr("sys.stdout.isatty", lambda: True) + monkeypatch.delenv("NO_COLOR", raising=False) + monkeypatch.delenv("TERM", raising=False) + + monkeypatch.setattr( + _select_mod, "select", lambda rlist, wlist, xlist, timeout=None: (rlist, [], []) + ) + monkeypatch.setattr("builtins.input", lambda _="": "y") + + raw_output: list[str] = [] + monkeypatch.setattr( + "builtins.print", + lambda *args, **kw: raw_output.append(" ".join(str(a) for a in args)), + ) + + timed_prompt("Update now? [Y/n]", default="n", timeout=30, label="test") + combined = " ".join(raw_output) + assert "\x1b[" in combined # ANSI escape present + + +def test_timed_prompt_respects_no_color(monkeypatch: pytest.MonkeyPatch) -> None: + """timed_prompt output has no ANSI sequences when NO_COLOR is set.""" + import select as _select_mod + + from autoskillit.cli._timed_input import timed_prompt + + monkeypatch.setattr("sys.stdin.isatty", lambda: True) + monkeypatch.setattr("sys.stdout.isatty", lambda: True) + monkeypatch.setenv("NO_COLOR", "1") + + monkeypatch.setattr( + _select_mod, "select", lambda rlist, wlist, xlist, timeout=None: (rlist, [], []) + ) + monkeypatch.setattr("builtins.input", lambda _="": "y") + + raw_output: list[str] = [] + monkeypatch.setattr( + "builtins.print", + lambda *args, **kw: raw_output.append(" ".join(str(a) for a in args)), + ) + + timed_prompt("Update now? [Y/n]", default="n", timeout=30, label="test") + combined = " ".join(raw_output) + assert "\x1b[" not in combined # No ANSI escapes + + +# --------------------------------------------------------------------------- +# Pre-flight feedback ordering test +# --------------------------------------------------------------------------- + + +def test_run_update_checks_emits_status_before_network( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """The first print() call in run_update_checks occurs before any signal + gatherer is invoked (the 'time to first output' structural test).""" + import select as _select_mod + + from autoskillit.cli._update_checks import Signal + + monkeypatch.delenv("CLAUDECODE", raising=False) + monkeypatch.delenv("CI", raising=False) + monkeypatch.delenv("AUTOSKILLIT_SKIP_STALE_CHECK", raising=False) + monkeypatch.delenv("AUTOSKILLIT_SKIP_SOURCE_DRIFT_CHECK", raising=False) + monkeypatch.delenv("AUTOSKILLIT_FORCE_UPDATE_CHECK", raising=False) + + fake_stdin = MagicMock() + fake_stdin.isatty.return_value = True + fake_stdout = MagicMock() + fake_stdout.isatty.return_value = True + monkeypatch.setattr(sys, "stdin", fake_stdin) + monkeypatch.setattr(sys, "stdout", fake_stdout) + + monkeypatch.setattr( + _select_mod, + "select", + lambda rlist, wlist, xlist, timeout=None: (rlist, [], []), + ) + + info = _make_stable_info() + monkeypatch.setattr("autoskillit.cli._update_checks.detect_install", lambda: info) + monkeypatch.setattr( + "autoskillit.cli._update_checks._claude_settings_path", + lambda scope: tmp_path / "settings.json", + ) + + import autoskillit as _pkg + + monkeypatch.setattr(_pkg, "__version__", "0.7.77") + + # Track call order: print vs signal gatherers + call_log: list[str] = [] + + def tracking_print(*args: Any, **kw: Any) -> None: + call_log.append("print") + + monkeypatch.setattr("builtins.print", tracking_print) + + monkeypatch.setattr( + "autoskillit.cli._update_checks._binary_signal", + lambda info, home, current: ( + call_log.append("binary_signal") + or Signal("binary", "New release: 0.9.0 (you have 0.7.77)") + ), + ) + monkeypatch.setattr( + "autoskillit.cli._update_checks._hooks_signal", + lambda settings_path: call_log.append("hooks_signal"), + ) + monkeypatch.setattr( + "autoskillit.cli._update_checks._source_drift_signal", + lambda info, home: call_log.append("source_drift_signal"), + ) + + monkeypatch.setattr("builtins.input", lambda _="": "n") + + run_update_checks(home=tmp_path) + + # The first print (status_line) must come before any signal gatherer + assert call_log[0] == "print", ( + f"Expected first call to be print() (status_line), got: {call_log}" + ) + signal_calls = [c for c in call_log if c.endswith("_signal")] + assert signal_calls, f"No signal gatherers were called — call_log: {call_log}" + first_signal_idx = call_log.index(signal_calls[0]) + first_print_idx = call_log.index("print") + assert first_print_idx < first_signal_idx, ( + f"print() must occur before signal gatherers. Order: {call_log}" + ) + + +# --------------------------------------------------------------------------- +# AUTOSKILLIT_FORCE_UPDATE_CHECK override +# --------------------------------------------------------------------------- + + +def test_force_update_check_env_overrides_local_editable( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """AUTOSKILLIT_FORCE_UPDATE_CHECK=1 bypasses the LOCAL_EDITABLE early return.""" + info = InstallInfo( + install_type=InstallType.LOCAL_EDITABLE, + commit_id=None, + requested_revision=None, + url=None, + editable_source=Path(tmp_path), + ) + printed, input_calls = _setup_run_checks( + monkeypatch, tmp_path, info=info, binary_signal=True, answer="n" + ) + monkeypatch.setenv("AUTOSKILLIT_FORCE_UPDATE_CHECK", "1") + run_update_checks(home=tmp_path) + # The prompt should have been reached (not early-returned) + assert len(input_calls) == 1 + + def test_fetch_sends_if_none_match_when_cached_etag(tmp_path: Path) -> None: import time diff --git a/tests/infra/test_schema_version_convention.py b/tests/infra/test_schema_version_convention.py index d07e7247..c17f8196 100644 --- a/tests/infra/test_schema_version_convention.py +++ b/tests/infra/test_schema_version_convention.py @@ -134,7 +134,7 @@ def _is_yaml_dump(node: ast.expr) -> bool: # _hooks.py — settings.json dict (co-owned with Claude CLI) ("src/autoskillit/cli/_hooks.py", 23), # _init_helpers.py — ~/.claude.json (co-owned) - ("src/autoskillit/cli/_init_helpers.py", 342), + ("src/autoskillit/cli/_init_helpers.py", 355), # _marketplace.py — installed_plugins.json (co-owned with Claude plugin system) ("src/autoskillit/cli/_marketplace.py", 44), # _marketplace.py — marketplace.json (co-owned) @@ -142,9 +142,9 @@ def _is_yaml_dump(node: ast.expr) -> bool: # _marketplace.py — hooks.json (co-owned) ("src/autoskillit/cli/_marketplace.py", 143), # _update_checks.py — dismissal state file - ("src/autoskillit/cli/_update_checks.py", 103), + ("src/autoskillit/cli/_update_checks.py", 102), # _update_checks.py — fetch cache - ("src/autoskillit/cli/_update_checks.py", 130), + ("src/autoskillit/cli/_update_checks.py", 129), # smoke_utils.py — domain partitions dict, hunk ranges list, merge queue list ("src/autoskillit/smoke_utils.py", 53), ("src/autoskillit/smoke_utils.py", 83),