diff --git a/gr2/python_cli/app.py b/gr2/python_cli/app.py index be29f21..0fa3843 100644 --- a/gr2/python_cli/app.py +++ b/gr2/python_cli/app.py @@ -23,6 +23,7 @@ repo_dirty, stash_if_dirty, ) +from .events import emit, EventType from .hooks import HookContext, apply_file_projections, load_repo_hooks, run_lifecycle_stage from . import spec_apply from gr2.prototypes import lane_workspace_prototype as lane_proto @@ -607,6 +608,28 @@ def lane_create( ) _exit(lane_proto.create_lane(ns)) _materialize_lane_repos(workspace_root, owner_unit, lane_name, manual_hooks=manual_hooks) + repo_list = [r.strip() for r in repos.split(",")] + branch_parts = branch.split(",") + branch_map = {} + for part in branch_parts: + if "=" in part: + k, v = part.split("=", 1) + branch_map[k.strip()] = v.strip() + else: + for r in repo_list: + branch_map[r] = part.strip() + emit( + event_type=EventType.LANE_CREATED, + workspace_root=workspace_root, + actor=source, + owner_unit=owner_unit, + payload={ + "lane_name": lane_name, + "lane_type": lane_type, + "repos": repo_list, + "branch_map": branch_map, + }, + ) @lane_app.command("enter") @@ -631,6 +654,18 @@ def lane_enter( recall=recall, ) _exit(lane_proto.enter_lane(ns)) + lane_doc = lane_proto.load_lane_doc(workspace_root, owner_unit, lane_name) + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace_root, + actor=actor, + owner_unit=owner_unit, + payload={ + "lane_name": lane_name, + "lane_type": lane_doc.get("type", "feature"), + "repos": lane_doc.get("repos", []), + }, + ) @lane_app.command("exit") @@ -647,10 +682,12 @@ def lane_exit( current_doc = lane_proto.load_current_lane_doc(workspace_root, owner_unit) lane_name = current_doc["current"]["lane_name"] lane_doc = lane_proto.load_lane_doc(workspace_root, owner_unit, lane_name) + stashed_repos: list[str] = [] for repo_name in lane_doc.get("repos", []): repo_root = _lane_repo_root(workspace_root, owner_unit, lane_name, repo_name) if repo_root.exists(): - 
stash_if_dirty(repo_root, f"gr2 exit {owner_unit}/{lane_name}") + if stash_if_dirty(repo_root, f"gr2 exit {owner_unit}/{lane_name}"): + stashed_repos.append(repo_name) _run_lane_stage(workspace_root, owner_unit, lane_name, "on_exit", manual_hooks=manual_hooks) ns = SimpleNamespace( workspace_root=workspace_root, @@ -660,6 +697,16 @@ def lane_exit( recall=recall, ) _exit(lane_proto.exit_lane(ns)) + emit( + event_type=EventType.LANE_EXITED, + workspace_root=workspace_root, + actor=actor, + owner_unit=owner_unit, + payload={ + "lane_name": lane_name, + "stashed_repos": stashed_repos, + }, + ) @lane_app.command("current") @@ -698,6 +745,18 @@ def lane_lease_acquire( force=force, ) _exit(lane_proto.acquire_lane_lease(ns)) + emit( + event_type=EventType.LEASE_ACQUIRED, + workspace_root=workspace_root, + actor=actor, + owner_unit=owner_unit, + payload={ + "lane_name": lane_name, + "mode": mode, + "ttl_seconds": ttl_seconds, + "lease_id": f"{owner_unit}:{lane_name}", + }, + ) @lease_app.command("release") @@ -715,6 +774,16 @@ def lane_lease_release( actor=actor, ) _exit(lane_proto.release_lane_lease(ns)) + emit( + event_type=EventType.LEASE_RELEASED, + workspace_root=workspace_root, + actor=actor, + owner_unit=owner_unit, + payload={ + "lane_name": lane_name, + "lease_id": f"{owner_unit}:{lane_name}", + }, + ) @lease_app.command("show") diff --git a/gr2/python_cli/channel_bridge.py b/gr2/python_cli/channel_bridge.py new file mode 100644 index 0000000..a4c08e0 --- /dev/null +++ b/gr2/python_cli/channel_bridge.py @@ -0,0 +1,113 @@ +"""gr2 channel bridge consumer. + +Translates outbox events into channel messages per the mapping table in +HOOK-EVENT-CONTRACT.md section 8. Uses cursor-based consumption from +events.read_events(). + +The bridge is a pure function layer: format_event() maps an event dict to +a message string (or None), and run_bridge() orchestrates cursor reads and +posts via a caller-provided post_fn. 
This keeps the MCP/recall_channel +dependency out of the module and makes it fully testable. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Callable + +from .events import read_events + + +_CONSUMER_NAME = "channel_bridge" + + +def format_event(event: dict[str, object]) -> str | None: + """Apply the section 8 mapping table to produce a channel message. + + Returns None if the event type is not mapped (silently dropped). + """ + etype = event.get("type", "") + + if etype == "lane.created": + return ( + f"{event['actor']} created lane {event['lane_name']}" + f" [{event.get('lane_type', 'unknown')}]" + f" repos={event.get('repos', [])}" + ) + + if etype == "lane.entered": + return f"{event['actor']} entered {event['owner_unit']}/{event['lane_name']}" + + if etype == "lane.exited": + return f"{event['actor']} exited {event['owner_unit']}/{event['lane_name']}" + + if etype == "pr.created": + repos = event.get("repos", []) + if isinstance(repos, list) and repos and isinstance(repos[0], dict): + repo_names = [r.get("repo", "") for r in repos] + else: + repo_names = repos + return ( + f"{event['actor']} opened PR group {event['pr_group_id']}: {repo_names}" + ) + + if etype == "pr.merged": + return f"{event['actor']} merged PR group {event['pr_group_id']}" + + if etype == "pr.checks_failed": + failed = event.get("failed_checks", []) + return f"CI failed on {event['repo']}#{event['pr_number']}: {failed}" + + if etype == "hook.failed": + # Only blocking hook failures produce channel messages. 
+ if event.get("on_failure") != "block": + return None + return ( + f"Hook {event['hook_name']} failed in {event['repo']}" + f" (blocking): {event.get('stderr_tail', '')}" + ) + + if etype == "sync.conflict": + files = event.get("conflicting_files", []) + return f"Sync conflict in {event['repo']}: {files}" + + if etype == "lease.force_broken": + return ( + f"Lease on {event['lane_name']} force-broken" + f" by {event['broken_by']}: {event.get('reason', '')}" + ) + + if etype == "failure.resolved": + return ( + f"{event['resolved_by']} resolved failure" + f" {event['operation_id']} on {event['lane_name']}" + ) + + if etype == "lease.reclaimed": + return ( + f"Stale lease on {event['lane_name']} reclaimed" + f" (was held by {event['previous_holder']})" + ) + + # Unmapped event type: silently dropped. + return None + + +def run_bridge( + workspace_root: Path, + *, + post_fn: Callable[[str], object], +) -> int: + """Read new events from the outbox and post mapped messages. + + Uses the 'channel_bridge' cursor. Returns the number of messages posted. + The post_fn receives formatted message strings; the caller decides how to + deliver them (recall_channel, print, log, etc.). + """ + events = read_events(workspace_root, _CONSUMER_NAME) + posted = 0 + for event in events: + msg = format_event(event) + if msg is not None: + post_fn(msg) + posted += 1 + return posted diff --git a/gr2/python_cli/events.py b/gr2/python_cli/events.py new file mode 100644 index 0000000..2758e0d --- /dev/null +++ b/gr2/python_cli/events.py @@ -0,0 +1,240 @@ +"""gr2 event system runtime. 
+ +Implements the event contract from HOOK-EVENT-CONTRACT.md sections 3-8: +- EventType enum (section 7.2) +- emit() function (sections 4.2, 7.1) +- Outbox management with rotation (sections 4.1-4.4) +- Cursor-based consumer model (section 5.1) +""" +from __future__ import annotations + +import fcntl +import json +import os +import sys +from datetime import datetime, timezone +from enum import Enum +from pathlib import Path + + +# Reserved field names that payload keys must not collide with (section 3.1). +_RESERVED_NAMES = frozenset({ + "version", "event_id", "seq", "timestamp", "type", + "workspace", "actor", "agent_id", "owner_unit", +}) + +_ROTATION_THRESHOLD = 10 * 1024 * 1024 # 10 MB + + +class EventType(str, Enum): + # Lane lifecycle + LANE_CREATED = "lane.created" + LANE_ENTERED = "lane.entered" + LANE_EXITED = "lane.exited" + LANE_SWITCHED = "lane.switched" + LANE_ARCHIVED = "lane.archived" + + # Lease lifecycle + LEASE_ACQUIRED = "lease.acquired" + LEASE_RELEASED = "lease.released" + LEASE_EXPIRED = "lease.expired" + LEASE_FORCE_BROKEN = "lease.force_broken" + + # Hook execution + HOOK_STARTED = "hook.started" + HOOK_COMPLETED = "hook.completed" + HOOK_FAILED = "hook.failed" + HOOK_SKIPPED = "hook.skipped" + + # PR lifecycle + PR_CREATED = "pr.created" + PR_STATUS_CHANGED = "pr.status_changed" + PR_CHECKS_PASSED = "pr.checks_passed" + PR_CHECKS_FAILED = "pr.checks_failed" + PR_REVIEW_SUBMITTED = "pr.review_submitted" + PR_MERGED = "pr.merged" + PR_MERGE_FAILED = "pr.merge_failed" + + # Sync operations + SYNC_STARTED = "sync.started" + SYNC_REPO_UPDATED = "sync.repo_updated" + SYNC_REPO_SKIPPED = "sync.repo_skipped" + SYNC_CONFLICT = "sync.conflict" + SYNC_COMPLETED = "sync.completed" + SYNC_CACHE_SEEDED = "sync.cache_seeded" + SYNC_CACHE_REFRESHED = "sync.cache_refreshed" + + # Recovery + FAILURE_RESOLVED = "failure.resolved" + LEASE_RECLAIMED = "lease.reclaimed" + + # Workspace operations + WORKSPACE_MATERIALIZED = "workspace.materialized" + 
WORKSPACE_FILE_PROJECTED = "workspace.file_projected" + + +def _outbox_path(workspace_root: Path) -> Path: + return workspace_root / ".grip" / "events" / "outbox.jsonl" + + +def _cursors_dir(workspace_root: Path) -> Path: + return workspace_root / ".grip" / "events" / "cursors" + + +def _current_seq(outbox: Path) -> int: + """Return the highest seq in the outbox, or 0 if empty/missing.""" + if not outbox.exists(): + return 0 + try: + text = outbox.read_text() + except OSError: + return 0 + last_seq = 0 + for line in text.strip().split("\n"): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + if isinstance(obj, dict) and "seq" in obj: + last_seq = max(last_seq, obj["seq"]) + except (json.JSONDecodeError, TypeError): + continue + return last_seq + + +def _maybe_rotate(outbox: Path) -> None: + """Rotate the outbox file if it exceeds the size threshold.""" + if not outbox.exists(): + return + try: + size = outbox.stat().st_size + except OSError: + return + if size <= _ROTATION_THRESHOLD: + return + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") + archive = outbox.parent / f"outbox.{ts}.jsonl" + outbox.rename(archive) + + +def emit( + event_type: EventType, + workspace_root: Path, + actor: str, + owner_unit: str, + payload: dict[str, object], + *, + agent_id: str | None = None, +) -> None: + """Emit a single event to the workspace outbox. + + Builds a flat JSON object from envelope + context + payload fields and + appends it as one line to .grip/events/outbox.jsonl. + + Does not raise on write failure (section 10.1). Errors are logged to + stderr so the parent operation can continue. + """ + # Validate payload keys against reserved names. 
+ collisions = _RESERVED_NAMES & payload.keys() + if collisions: + raise ValueError( + f"payload keys collide with reserved envelope/context names: {collisions}" + ) + + try: + outbox = _outbox_path(workspace_root) + outbox.parent.mkdir(parents=True, exist_ok=True) + lock_path = outbox.with_suffix(".lock") + + with lock_path.open("a+") as lock_fh: + fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX) + try: + # Capture seq before rotation (rotation empties the current file). + seq = _current_seq(outbox) + 1 + _maybe_rotate(outbox) + + # Build flat event object. + event: dict[str, object] = { + "version": 1, + "event_id": os.urandom(8).hex(), + "seq": seq, + "timestamp": datetime.now(timezone.utc).isoformat(), + "type": str(event_type.value), + "workspace": workspace_root.name, + "actor": actor, + "owner_unit": owner_unit, + } + if agent_id is not None: + event["agent_id"] = agent_id + event.update(payload) + + # Append as single JSONL line. + with outbox.open("a") as f: + f.write(json.dumps(event, separators=(",", ":")) + "\n") + f.flush() + finally: + fcntl.flock(lock_fh.fileno(), fcntl.LOCK_UN) + + except Exception as exc: + print(f"gr2: event emit failed: {exc}", file=sys.stderr) + + +def read_events(workspace_root: Path, consumer: str) -> list[dict[str, object]]: + """Read new events from the outbox for the named consumer. + + Returns events with seq > cursor's last_seq. Updates the cursor file + atomically after reading. 
+ """ + outbox = _outbox_path(workspace_root) + if not outbox.exists(): + return [] + + cursor = _load_cursor(workspace_root, consumer) + last_seq = cursor.get("last_seq", 0) + + events: list[dict[str, object]] = [] + text = outbox.read_text() + for line in text.strip().split("\n"): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + if not isinstance(obj, dict): + continue + if obj.get("seq", 0) <= last_seq: + continue + events.append(obj) + + if events: + last_event = events[-1] + _save_cursor(workspace_root, consumer, { + "consumer": consumer, + "last_seq": last_event["seq"], + "last_event_id": last_event.get("event_id", ""), + "last_read": datetime.now(timezone.utc).isoformat(), + }) + + return events + + +def _load_cursor(workspace_root: Path, consumer: str) -> dict[str, object]: + cursor_file = _cursors_dir(workspace_root) / f"{consumer}.json" + if not cursor_file.exists(): + return {} + try: + return json.loads(cursor_file.read_text()) + except (json.JSONDecodeError, OSError): + return {} + + +def _save_cursor(workspace_root: Path, consumer: str, data: dict[str, object]) -> None: + cursors = _cursors_dir(workspace_root) + cursors.mkdir(parents=True, exist_ok=True) + cursor_file = cursors / f"{consumer}.json" + tmp = cursor_file.with_suffix(".tmp") + tmp.write_text(json.dumps(data, indent=2)) + tmp.rename(cursor_file) diff --git a/gr2/python_cli/hooks.py b/gr2/python_cli/hooks.py index 460b15f..41c4bf1 100644 --- a/gr2/python_cli/hooks.py +++ b/gr2/python_cli/hooks.py @@ -4,9 +4,12 @@ import json import sys import subprocess +import time import tomllib from pathlib import Path +from .events import emit, EventType + VALID_IF_EXISTS = {"skip", "overwrite", "merge", "error"} VALID_ON_FAILURE = {"block", "warn", "skip"} @@ -282,6 +285,18 @@ def run_lifecycle_stage( first_materialize=first_materialize, allow_manual=allow_manual, ): + emit( + event_type=EventType.HOOK_SKIPPED, + 
workspace_root=ctx.workspace_root, + actor="system", + owner_unit=ctx.lane_owner, + payload={ + "stage": stage, + "hook_name": hook.name, + "repo": ctx.repo_name, + "reason": f"when={hook.when} did not match current invocation", + }, + ) results.append( HookResult( kind="lifecycle", @@ -293,6 +308,20 @@ def run_lifecycle_stage( continue cwd = render_path(hook.cwd, ctx) command = render_text(hook.command, ctx) + emit( + event_type=EventType.HOOK_STARTED, + workspace_root=ctx.workspace_root, + actor="system", + owner_unit=ctx.lane_owner, + payload={ + "stage": stage, + "hook_name": hook.name, + "repo": ctx.repo_name, + "command": command, + "cwd": str(cwd), + }, + ) + t0 = time.monotonic() proc = subprocess.run( command, cwd=cwd, @@ -300,7 +329,21 @@ def run_lifecycle_stage( capture_output=True, text=True, ) + duration_ms = int((time.monotonic() - t0) * 1000) if proc.returncode == 0: + emit( + event_type=EventType.HOOK_COMPLETED, + workspace_root=ctx.workspace_root, + actor="system", + owner_unit=ctx.lane_owner, + payload={ + "stage": stage, + "hook_name": hook.name, + "repo": ctx.repo_name, + "duration_ms": duration_ms, + "exit_code": 0, + }, + ) results.append( HookResult( kind="lifecycle", @@ -315,6 +358,22 @@ def run_lifecycle_stage( ) ) continue + stderr_tail = proc.stderr[-500:] if proc.stderr else "" + emit( + event_type=EventType.HOOK_FAILED, + workspace_root=ctx.workspace_root, + actor="system", + owner_unit=ctx.lane_owner, + payload={ + "stage": stage, + "hook_name": hook.name, + "repo": ctx.repo_name, + "duration_ms": duration_ms, + "exit_code": proc.returncode, + "on_failure": hook.on_failure, + "stderr_tail": stderr_tail, + }, + ) payload = { "kind": "lifecycle", "stage": stage, diff --git a/gr2/python_cli/pr.py b/gr2/python_cli/pr.py new file mode 100644 index 0000000..3ccd113 --- /dev/null +++ b/gr2/python_cli/pr.py @@ -0,0 +1,254 @@ +"""gr2 PR group orchestration. 
+ +Implements multi-repo PR lifecycle from PR-LIFECYCLE.md: +- create_pr_group: Create linked PRs across repos with pr_group_id +- merge_pr_group: Merge all PRs in a group (stops on first failure) +- check_pr_group_status: Poll status/checks and emit change events +- record_pr_review: Record an externally-submitted review event + +The PlatformAdapter is group-unaware. This module assigns pr_group_id, +persists group metadata, and emits events per HOOK-EVENT-CONTRACT.md +section 3.2 (PR Lifecycle). +""" +from __future__ import annotations + +import json +import os +from pathlib import Path + +from .events import emit, EventType +from .platform import AdapterError, CreatePRRequest, PlatformAdapter + + +class PRMergeError(RuntimeError): + """Raised when a PR merge fails.""" + + def __init__(self, repo: str, pr_number: int, reason: str) -> None: + self.repo = repo + self.pr_number = pr_number + self.reason = reason + super().__init__(f"merge failed for {repo}#{pr_number}: {reason}") + + +def _pr_groups_dir(workspace_root: Path) -> Path: + return workspace_root / ".grip" / "pr_groups" + + +def _generate_group_id() -> str: + return "pg_" + os.urandom(4).hex() + + +def _load_group(workspace_root: Path, pr_group_id: str) -> dict: + path = _pr_groups_dir(workspace_root) / f"{pr_group_id}.json" + return json.loads(path.read_text()) + + +def _save_group(workspace_root: Path, group: dict) -> None: + d = _pr_groups_dir(workspace_root) + d.mkdir(parents=True, exist_ok=True) + path = d / f"{group['pr_group_id']}.json" + path.write_text(json.dumps(group, indent=2)) + + +def create_pr_group( + workspace_root: Path, + owner_unit: str, + lane_name: str, + title: str, + base_branch: str, + head_branch: str, + repos: list[str], + adapter: PlatformAdapter, + actor: str, + *, + body: str = "", + draft: bool = False, +) -> dict: + """Create linked PRs across repos and emit pr.created.""" + pr_group_id = _generate_group_id() + prs: list[dict] = [] + + for repo in repos: + request = 
CreatePRRequest( + repo=repo, + title=title, + body=body, + head_branch=head_branch, + base_branch=base_branch, + draft=draft, + ) + ref = adapter.create_pr(request) + prs.append({ + "repo": repo, + "pr_number": ref.number, + "url": ref.url, + }) + + group = { + "pr_group_id": pr_group_id, + "lane_name": lane_name, + "title": title, + "base_branch": base_branch, + "head_branch": head_branch, + "prs": prs, + "status": {repo: "OPEN" for repo in repos}, + } + _save_group(workspace_root, group) + + emit( + event_type=EventType.PR_CREATED, + workspace_root=workspace_root, + actor=actor, + owner_unit=owner_unit, + payload={ + "pr_group_id": pr_group_id, + "lane_name": lane_name, + "repos": prs, + }, + ) + + return group + + +def merge_pr_group( + workspace_root: Path, + pr_group_id: str, + adapter: PlatformAdapter, + actor: str, +) -> dict: + """Merge all PRs in a group. Stops on first failure.""" + group = _load_group(workspace_root, pr_group_id) + merged: list[dict] = [] + + for pr_info in group["prs"]: + repo = pr_info["repo"] + number = pr_info["pr_number"] + try: + adapter.merge_pr(repo, number) + except AdapterError as exc: + emit( + event_type=EventType.PR_MERGE_FAILED, + workspace_root=workspace_root, + actor=actor, + owner_unit=group.get("owner_unit", actor), + payload={ + "pr_group_id": pr_group_id, + "repo": repo, + "pr_number": number, + "reason": str(exc), + }, + ) + raise PRMergeError(repo, number, str(exc)) from exc + merged.append(pr_info) + + emit( + event_type=EventType.PR_MERGED, + workspace_root=workspace_root, + actor=actor, + owner_unit=group.get("owner_unit", actor), + payload={ + "pr_group_id": pr_group_id, + "repos": merged, + }, + ) + + return group + + +def check_pr_group_status( + workspace_root: Path, + pr_group_id: str, + adapter: PlatformAdapter, + actor: str, +) -> dict: + """Poll PR status/checks for all repos in a group. 
Emit change events.""" + group = _load_group(workspace_root, pr_group_id) + cached_status = group.get("status", {}) + + for pr_info in group["prs"]: + repo = pr_info["repo"] + number = pr_info["pr_number"] + status = adapter.pr_status(repo, number) + old_state = cached_status.get(repo, "OPEN") + + # Detect state change (OPEN -> MERGED, OPEN -> CLOSED, etc.) + if status.state != old_state: + emit( + event_type=EventType.PR_STATUS_CHANGED, + workspace_root=workspace_root, + actor=actor, + owner_unit=group.get("owner_unit", actor), + payload={ + "pr_group_id": pr_group_id, + "repo": repo, + "pr_number": number, + "old_status": old_state, + "new_status": status.state, + }, + ) + cached_status[repo] = status.state + + # Detect check results (only when checks are complete) + if status.checks: + completed = [c for c in status.checks if c.status == "COMPLETED"] + if completed and len(completed) == len(status.checks): + failed = [c.name for c in completed if c.conclusion != "SUCCESS"] + if failed: + emit( + event_type=EventType.PR_CHECKS_FAILED, + workspace_root=workspace_root, + actor=actor, + owner_unit=group.get("owner_unit", actor), + payload={ + "pr_group_id": pr_group_id, + "repo": repo, + "pr_number": number, + "failed_checks": failed, + }, + ) + else: + emit( + event_type=EventType.PR_CHECKS_PASSED, + workspace_root=workspace_root, + actor=actor, + owner_unit=group.get("owner_unit", actor), + payload={ + "pr_group_id": pr_group_id, + "repo": repo, + "pr_number": number, + "passed_checks": [c.name for c in completed], + }, + ) + + group["status"] = cached_status + _save_group(workspace_root, group) + return group + + +def record_pr_review( + workspace_root: Path, + pr_group_id: str, + repo: str, + pr_number: int, + reviewer: str, + state: str, + actor: str, +) -> None: + """Record an externally-submitted PR review and emit pr.review_submitted. + + Reviews come from outside gr2 (GitHub webhooks, human action, etc.). 
+ The adapter doesn't query reviews, so this is a push-model entry point. + """ + emit( + event_type=EventType.PR_REVIEW_SUBMITTED, + workspace_root=workspace_root, + actor=actor, + owner_unit=actor, + payload={ + "pr_group_id": pr_group_id, + "repo": repo, + "pr_number": pr_number, + "reviewer": reviewer, + "state": state, + }, + ) diff --git a/gr2/tests/__init__.py b/gr2/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gr2/tests/conftest.py b/gr2/tests/conftest.py new file mode 100644 index 0000000..244cb15 --- /dev/null +++ b/gr2/tests/conftest.py @@ -0,0 +1,15 @@ +"""Shared fixtures for gr2 tests.""" +from __future__ import annotations + +import pytest +from pathlib import Path + + +@pytest.fixture +def workspace(tmp_path: Path) -> Path: + """Create a minimal workspace with .grip/ directory.""" + grip = tmp_path / ".grip" + grip.mkdir() + events = grip / "events" + events.mkdir() + return tmp_path diff --git a/gr2/tests/test_channel_bridge.py b/gr2/tests/test_channel_bridge.py new file mode 100644 index 0000000..5fa7c70 --- /dev/null +++ b/gr2/tests/test_channel_bridge.py @@ -0,0 +1,333 @@ +"""Tests for gr2 channel bridge consumer. + +Tests the event-to-channel-message mapping from HOOK-EVENT-CONTRACT.md +section 8. Written TDD-first. +""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + + +# --------------------------------------------------------------------------- +# 1. 
format_event() message templates (section 8 mapping table) +# --------------------------------------------------------------------------- + +class TestFormatEvent: + """format_event() applies the mapping table to produce channel messages.""" + + def test_lane_created(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "lane.created", + "actor": "agent:apollo", + "owner_unit": "apollo", + "lane_name": "feat/hook-events", + "lane_type": "feature", + "repos": ["grip", "synapt"], + } + msg = format_event(event) + assert msg == "agent:apollo created lane feat/hook-events [feature] repos=['grip', 'synapt']" + + def test_lane_entered(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "lane.entered", + "actor": "agent:apollo", + "owner_unit": "apollo", + "lane_name": "feat/hook-events", + } + msg = format_event(event) + assert msg == "agent:apollo entered apollo/feat/hook-events" + + def test_lane_exited(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "lane.exited", + "actor": "agent:apollo", + "owner_unit": "apollo", + "lane_name": "feat/hook-events", + } + msg = format_event(event) + assert msg == "agent:apollo exited apollo/feat/hook-events" + + def test_pr_created(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "pr.created", + "actor": "agent:apollo", + "pr_group_id": "pg_8a3f1b2c", + "repos": [{"repo": "grip", "pr_number": 570}, {"repo": "synapt", "pr_number": 583}], + } + msg = format_event(event) + assert "pg_8a3f1b2c" in msg + assert "agent:apollo" in msg + + def test_pr_merged(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "pr.merged", + "actor": "agent:apollo", + "pr_group_id": "pg_8a3f1b2c", + } + msg = format_event(event) + assert msg == "agent:apollo merged PR group pg_8a3f1b2c" + + def test_pr_checks_failed(self): + from gr2.python_cli.channel_bridge import format_event + event 
= { + "type": "pr.checks_failed", + "repo": "grip", + "pr_number": 574, + "failed_checks": ["ci/test", "ci/lint"], + } + msg = format_event(event) + assert "grip#574" in msg + assert "ci/test" in msg + + def test_hook_failed_block(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "hook.failed", + "hook_name": "editable-install", + "repo": "synapt", + "on_failure": "block", + "stderr_tail": "pip install failed", + } + msg = format_event(event) + assert "editable-install" in msg + assert "synapt" in msg + assert "blocking" in msg + + def test_hook_failed_warn_not_mapped(self): + """hook.failed with on_failure=warn should NOT produce a channel message.""" + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "hook.failed", + "hook_name": "lint", + "repo": "synapt", + "on_failure": "warn", + "stderr_tail": "lint warnings", + } + msg = format_event(event) + assert msg is None + + def test_hook_failed_skip_not_mapped(self): + """hook.failed with on_failure=skip should NOT produce a channel message.""" + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "hook.failed", + "hook_name": "optional", + "repo": "synapt", + "on_failure": "skip", + "stderr_tail": "skipped", + } + msg = format_event(event) + assert msg is None + + def test_sync_conflict(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "sync.conflict", + "repo": "synapt", + "conflicting_files": ["src/main.py", "tests/test_core.py"], + } + msg = format_event(event) + assert "synapt" in msg + assert "src/main.py" in msg + + def test_lease_force_broken(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "lease.force_broken", + "lane_name": "feat/hook-events", + "broken_by": "agent:sentinel", + "reason": "stale session", + } + msg = format_event(event) + assert "feat/hook-events" in msg + assert "agent:sentinel" in msg + assert "stale session" in msg + + def 
test_failure_resolved(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "failure.resolved", + "resolved_by": "agent:apollo", + "operation_id": "op_9f2a3b4c", + "lane_name": "feat/hook-events", + } + msg = format_event(event) + assert "agent:apollo" in msg + assert "op_9f2a3b4c" in msg + assert "feat/hook-events" in msg + + def test_lease_reclaimed(self): + from gr2.python_cli.channel_bridge import format_event + event = { + "type": "lease.reclaimed", + "lane_name": "feat/hook-events", + "previous_holder": "agent:atlas", + } + msg = format_event(event) + assert "feat/hook-events" in msg + assert "agent:atlas" in msg + + +# --------------------------------------------------------------------------- +# 2. Unmapped events return None (section 8 exclusion list) +# --------------------------------------------------------------------------- + +class TestUnmappedEvents: + """Events not in the mapping table produce no channel message.""" + + @pytest.mark.parametrize("event_type", [ + "hook.started", + "hook.completed", + "hook.skipped", + "lease.acquired", + "lease.released", + "lease.expired", + "sync.started", + "sync.repo_updated", + "sync.repo_skipped", + "sync.completed", + "workspace.materialized", + "workspace.file_projected", + "lane.switched", + "lane.archived", + ]) + def test_unmapped_returns_none(self, event_type): + from gr2.python_cli.channel_bridge import format_event + event = {"type": event_type, "actor": "agent:test", "owner_unit": "test"} + assert format_event(event) is None + + +# --------------------------------------------------------------------------- +# 3. 
run_bridge() cursor-based consumption +# --------------------------------------------------------------------------- + +class TestRunBridge: + """run_bridge() reads events via cursor and calls post_fn for each.""" + + def test_processes_mapped_events(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + from gr2.python_cli.channel_bridge import run_bridge + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + posted: list[str] = [] + run_bridge(workspace, post_fn=posted.append) + assert len(posted) == 1 + assert "agent:apollo entered apollo/feat/test" in posted[0] + + def test_skips_unmapped_events(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + from gr2.python_cli.channel_bridge import run_bridge + emit( + event_type=EventType.LEASE_ACQUIRED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "mode": "edit", "ttl_seconds": 900, "lease_id": "x"}, + ) + posted: list[str] = [] + run_bridge(workspace, post_fn=posted.append) + assert len(posted) == 0 + + def test_cursor_advances(self, workspace: Path): + """Second run_bridge call returns nothing if no new events.""" + from gr2.python_cli.events import emit, EventType + from gr2.python_cli.channel_bridge import run_bridge + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + posted: list[str] = [] + run_bridge(workspace, post_fn=posted.append) + assert len(posted) == 1 + # Second call: cursor advanced, no new events + posted.clear() + run_bridge(workspace, post_fn=posted.append) + assert len(posted) == 0 + + def test_processes_only_new_events(self, workspace: Path): + from gr2.python_cli.events import emit, 
EventType + from gr2.python_cli.channel_bridge import run_bridge + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "first", "lane_type": "feature", "repos": ["grip"]}, + ) + posted: list[str] = [] + run_bridge(workspace, post_fn=posted.append) + assert len(posted) == 1 + # Emit a new event + emit( + event_type=EventType.LANE_EXITED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "first", "stashed_repos": []}, + ) + posted.clear() + run_bridge(workspace, post_fn=posted.append) + assert len(posted) == 1 + assert "exited" in posted[0] + + def test_mixed_mapped_and_unmapped(self, workspace: Path): + """Only mapped events produce messages; unmapped are silently skipped.""" + from gr2.python_cli.events import emit, EventType + from gr2.python_cli.channel_bridge import run_bridge + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + emit( + event_type=EventType.LEASE_ACQUIRED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "mode": "edit", "ttl_seconds": 900, "lease_id": "x"}, + ) + emit( + event_type=EventType.LANE_EXITED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "stashed_repos": ["grip"]}, + ) + posted: list[str] = [] + run_bridge(workspace, post_fn=posted.append) + # lane.entered and lane.exited are mapped; lease.acquired is not + assert len(posted) == 2 + assert "entered" in posted[0] + assert "exited" in posted[1] + + def test_returns_count(self, workspace: Path): + """run_bridge returns the number of messages posted.""" + from gr2.python_cli.events import emit, EventType + from gr2.python_cli.channel_bridge import run_bridge + emit( + 
event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + result = run_bridge(workspace, post_fn=lambda msg: None) + assert result == 1 diff --git a/gr2/tests/test_events.py b/gr2/tests/test_events.py new file mode 100644 index 0000000..77cdf5f --- /dev/null +++ b/gr2/tests/test_events.py @@ -0,0 +1,493 @@ +"""Tests for gr2 event system runtime. + +These tests define the contract from HOOK-EVENT-CONTRACT.md sections 3-8. +Written TDD-first: they must fail until events.py is implemented. +""" +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from pathlib import Path + +import pytest + + +# --------------------------------------------------------------------------- +# 1. EventType enum (section 7.2) +# --------------------------------------------------------------------------- + +class TestEventTypeEnum: + """EventType enum must contain all 31 event types from the taxonomy.""" + + def test_import(self): + from gr2.python_cli.events import EventType + assert EventType is not None + + def test_lane_lifecycle_types(self): + from gr2.python_cli.events import EventType + assert EventType.LANE_CREATED == "lane.created" + assert EventType.LANE_ENTERED == "lane.entered" + assert EventType.LANE_EXITED == "lane.exited" + assert EventType.LANE_SWITCHED == "lane.switched" + assert EventType.LANE_ARCHIVED == "lane.archived" + + def test_lease_lifecycle_types(self): + from gr2.python_cli.events import EventType + assert EventType.LEASE_ACQUIRED == "lease.acquired" + assert EventType.LEASE_RELEASED == "lease.released" + assert EventType.LEASE_EXPIRED == "lease.expired" + assert EventType.LEASE_FORCE_BROKEN == "lease.force_broken" + + def test_hook_execution_types(self): + from gr2.python_cli.events import EventType + assert EventType.HOOK_STARTED == "hook.started" + assert 
EventType.HOOK_COMPLETED == "hook.completed" + assert EventType.HOOK_FAILED == "hook.failed" + assert EventType.HOOK_SKIPPED == "hook.skipped" + + def test_pr_lifecycle_types(self): + from gr2.python_cli.events import EventType + assert EventType.PR_CREATED == "pr.created" + assert EventType.PR_STATUS_CHANGED == "pr.status_changed" + assert EventType.PR_CHECKS_PASSED == "pr.checks_passed" + assert EventType.PR_CHECKS_FAILED == "pr.checks_failed" + assert EventType.PR_REVIEW_SUBMITTED == "pr.review_submitted" + assert EventType.PR_MERGED == "pr.merged" + assert EventType.PR_MERGE_FAILED == "pr.merge_failed" + + def test_sync_operation_types(self): + from gr2.python_cli.events import EventType + assert EventType.SYNC_STARTED == "sync.started" + assert EventType.SYNC_REPO_UPDATED == "sync.repo_updated" + assert EventType.SYNC_REPO_SKIPPED == "sync.repo_skipped" + assert EventType.SYNC_CONFLICT == "sync.conflict" + assert EventType.SYNC_COMPLETED == "sync.completed" + assert EventType.SYNC_CACHE_SEEDED == "sync.cache_seeded" + assert EventType.SYNC_CACHE_REFRESHED == "sync.cache_refreshed" + + def test_recovery_types(self): + from gr2.python_cli.events import EventType + assert EventType.FAILURE_RESOLVED == "failure.resolved" + assert EventType.LEASE_RECLAIMED == "lease.reclaimed" + + def test_workspace_operation_types(self): + from gr2.python_cli.events import EventType + assert EventType.WORKSPACE_MATERIALIZED == "workspace.materialized" + assert EventType.WORKSPACE_FILE_PROJECTED == "workspace.file_projected" + + def test_total_count(self): + from gr2.python_cli.events import EventType + # 5 lane + 4 lease + 4 hook + 7 PR + 7 sync + 2 recovery + 2 workspace = 31 + assert len(EventType) == 31 + + +# --------------------------------------------------------------------------- +# 2. 
emit() function (sections 4.2, 7.1) +# --------------------------------------------------------------------------- + +class TestEmit: + """emit() must produce flat JSONL events in .grip/events/outbox.jsonl.""" + + def test_creates_outbox_file(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + assert outbox.exists() + + def test_single_json_line(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + lines = outbox.read_text().strip().split("\n") + assert len(lines) == 1 + event = json.loads(lines[0]) + assert isinstance(event, dict) + + def test_flat_envelope(self, workspace: Path): + """Event must be flat: domain fields at top level, no nested payload.""" + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + # Domain fields must be top-level + assert event["lane_name"] == "feat/test" + assert event["lane_type"] == "feature" + assert event["repos"] == ["grip"] + # No nested payload key + assert "payload" not in event + + def test_envelope_fields(self, workspace: Path): + """Envelope fields: version, event_id, seq, timestamp, type.""" + from gr2.python_cli.events import emit, EventType + emit( 
+ event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + assert event["version"] == 1 + assert event["type"] == "lane.entered" + assert "event_id" in event + assert "seq" in event + assert "timestamp" in event + + def test_event_id_format(self, workspace: Path): + """event_id must be 16-char hex from os.urandom(8).hex().""" + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + event_id = event["event_id"] + assert len(event_id) == 16 + assert all(c in "0123456789abcdef" for c in event_id) + + def test_context_fields(self, workspace: Path): + """Context fields: workspace, actor, owner_unit.""" + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + assert event["workspace"] == workspace.name + assert event["actor"] == "agent:apollo" + assert event["owner_unit"] == "apollo" + + def test_optional_agent_id(self, workspace: Path): + """agent_id is included when provided, absent when not.""" + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + agent_id="agent_apollo_xyz789", + 
payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + assert event["agent_id"] == "agent_apollo_xyz789" + + def test_agent_id_absent_when_not_provided(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + assert "agent_id" not in event + + def test_timestamp_is_iso8601_with_tz(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + ts = datetime.fromisoformat(event["timestamp"]) + assert ts.tzinfo is not None + + def test_reserved_name_collision_raises(self, workspace: Path): + """Payload keys must not collide with envelope/context field names.""" + from gr2.python_cli.events import emit, EventType + with pytest.raises((ValueError, KeyError)): + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"version": 99, "lane_name": "feat/test"}, + ) + + +# --------------------------------------------------------------------------- +# 3. 
Monotonic sequence numbers (section 4.2) +# --------------------------------------------------------------------------- + +class TestSequenceNumbers: + """seq must be strictly monotonically increasing, starting at 1.""" + + def test_first_event_seq_is_1(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + event = json.loads(outbox.read_text().strip()) + assert event["seq"] == 1 + + def test_monotonic_across_multiple_emits(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + for _ in range(5): + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + lines = outbox.read_text().strip().split("\n") + seqs = [json.loads(line)["seq"] for line in lines] + assert seqs == [1, 2, 3, 4, 5] + + def test_unique_event_ids(self, workspace: Path): + from gr2.python_cli.events import emit, EventType + for _ in range(10): + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + lines = outbox.read_text().strip().split("\n") + ids = [json.loads(line)["event_id"] for line in lines] + assert len(set(ids)) == 10 + + +# --------------------------------------------------------------------------- +# 4. 
Outbox rotation (section 4.3) +# --------------------------------------------------------------------------- + +class TestOutboxRotation: + """Outbox rotates at 10MB threshold.""" + + def test_rotation_creates_timestamped_archive(self, workspace: Path): + from gr2.python_cli.events import emit, EventType, _outbox_path + outbox = _outbox_path(workspace) + # Write a large payload to push past 10MB + outbox.parent.mkdir(parents=True, exist_ok=True) + outbox.write_text("x" * (10 * 1024 * 1024 + 1)) + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + # Old file should be renamed to outbox.{timestamp}.jsonl + archives = list(outbox.parent.glob("outbox.*.jsonl")) + assert len(archives) == 1 + # New outbox should exist with the fresh event + assert outbox.exists() + event = json.loads(outbox.read_text().strip()) + assert event["type"] == "lane.entered" + + def test_seq_continues_after_rotation(self, workspace: Path): + from gr2.python_cli.events import emit, EventType, _outbox_path + outbox = _outbox_path(workspace) + outbox.parent.mkdir(parents=True, exist_ok=True) + # Write 5 fake events to set seq baseline + lines = [] + for i in range(1, 6): + lines.append(json.dumps({"seq": i, "type": "test"})) + outbox.write_text("\n".join(lines) + "\n") + # Pad to trigger rotation + with outbox.open("a") as f: + f.write("x" * (10 * 1024 * 1024)) + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + event = json.loads(outbox.read_text().strip()) + assert event["seq"] == 6 # continues from last seq + + +# --------------------------------------------------------------------------- +# 5. 
Cursor-based consumption (section 5.1) +# --------------------------------------------------------------------------- + +class TestCursorModel: + """Cursor-based reading for event consumers.""" + + def test_read_events_from_empty_outbox(self, workspace: Path): + from gr2.python_cli.events import read_events + events = read_events(workspace, "test_consumer") + assert events == [] + + def test_read_events_returns_all_for_new_consumer(self, workspace: Path): + from gr2.python_cli.events import emit, read_events, EventType + for i in range(3): + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": f"lane-{i}", "lane_type": "feature", "repos": ["grip"]}, + ) + events = read_events(workspace, "test_consumer") + assert len(events) == 3 + assert [e["lane_name"] for e in events] == ["lane-0", "lane-1", "lane-2"] + + def test_cursor_advances_after_read(self, workspace: Path): + from gr2.python_cli.events import emit, read_events, EventType + for i in range(3): + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": f"lane-{i}", "lane_type": "feature", "repos": ["grip"]}, + ) + # First read: get all 3 + events = read_events(workspace, "my_consumer") + assert len(events) == 3 + # Second read: get nothing (cursor advanced) + events = read_events(workspace, "my_consumer") + assert len(events) == 0 + + def test_cursor_only_returns_new_events(self, workspace: Path): + from gr2.python_cli.events import emit, read_events, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "first", "lane_type": "feature", "repos": ["grip"]}, + ) + read_events(workspace, "my_consumer") + # Emit more after cursor advanced + emit( + event_type=EventType.LANE_EXITED, + workspace_root=workspace, + actor="agent:apollo", + 
owner_unit="apollo", + payload={"lane_name": "first", "stashed_repos": []}, + ) + events = read_events(workspace, "my_consumer") + assert len(events) == 1 + assert events[0]["type"] == "lane.exited" + + def test_cursor_file_created(self, workspace: Path): + from gr2.python_cli.events import emit, read_events, EventType + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + read_events(workspace, "test_consumer") + cursor_file = workspace / ".grip" / "events" / "cursors" / "test_consumer.json" + assert cursor_file.exists() + cursor = json.loads(cursor_file.read_text()) + assert cursor["consumer"] == "test_consumer" + assert cursor["last_seq"] == 1 + + def test_independent_cursors(self, workspace: Path): + """Different consumers maintain independent cursors.""" + from gr2.python_cli.events import emit, read_events, EventType + for i in range(3): + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": f"lane-{i}", "lane_type": "feature", "repos": ["grip"]}, + ) + # Consumer A reads all 3 + events_a = read_events(workspace, "consumer_a") + assert len(events_a) == 3 + # Consumer B hasn't read yet, gets all 3 + events_b = read_events(workspace, "consumer_b") + assert len(events_b) == 3 + + +# --------------------------------------------------------------------------- +# 6. Outbox path helper (section 4.1) +# --------------------------------------------------------------------------- + +class TestOutboxPath: + + def test_outbox_path(self, workspace: Path): + from gr2.python_cli.events import _outbox_path + assert _outbox_path(workspace) == workspace / ".grip" / "events" / "outbox.jsonl" + + +# --------------------------------------------------------------------------- +# 7. 
emit() error handling (section 10.1) +# --------------------------------------------------------------------------- + +class TestEmitErrorHandling: + + def test_emit_does_not_raise_on_write_failure(self, workspace: Path): + """emit() logs to stderr but does not crash on write failure.""" + from gr2.python_cli.events import emit, EventType + # Make the events directory read-only to force a write failure + events_dir = workspace / ".grip" / "events" + events_dir.chmod(0o444) + try: + # Should not raise + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + finally: + events_dir.chmod(0o755) + + def test_emit_creates_events_dir_if_missing(self, workspace: Path): + """emit() creates .grip/events/ if it doesn't exist.""" + from gr2.python_cli.events import emit, EventType + events_dir = workspace / ".grip" / "events" + # Remove the events directory + events_dir.rmdir() + emit( + event_type=EventType.LANE_ENTERED, + workspace_root=workspace, + actor="agent:apollo", + owner_unit="apollo", + payload={"lane_name": "feat/test", "lane_type": "feature", "repos": ["grip"]}, + ) + assert (events_dir / "outbox.jsonl").exists() diff --git a/gr2/tests/test_hook_events.py b/gr2/tests/test_hook_events.py new file mode 100644 index 0000000..e4c6109 --- /dev/null +++ b/gr2/tests/test_hook_events.py @@ -0,0 +1,327 @@ +"""Tests for hook execution event emission. + +Verifies that run_lifecycle_stage emits hook.started, hook.completed, +hook.failed, and hook.skipped events per HOOK-EVENT-CONTRACT.md sections +3.2 (Hook Execution) and 6.2-6.4. 
+""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from gr2.python_cli.hooks import ( + HookContext, + HookRuntimeError, + LifecycleHook, + RepoHooks, + run_lifecycle_stage, +) + + +def _make_ctx(workspace: Path) -> HookContext: + repo_root = workspace / "repos" / "grip" + repo_root.mkdir(parents=True, exist_ok=True) + return HookContext( + workspace_root=workspace, + lane_root=workspace / "lanes" / "apollo" / "feat-test", + repo_root=repo_root, + repo_name="grip", + lane_owner="apollo", + lane_subject="grip", + lane_name="feat/test", + ) + + +def _make_hooks(lifecycle_hooks: list[LifecycleHook], stage: str = "on_enter") -> RepoHooks: + kwargs = {"on_materialize": [], "on_enter": [], "on_exit": []} + kwargs[stage] = lifecycle_hooks + return RepoHooks( + repo_name="grip", + file_links=[], + file_copies=[], + policy={}, + path=Path("/fake/.gr2/hooks.toml"), + **kwargs, + ) + + +def _read_outbox(workspace: Path) -> list[dict]: + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + if not outbox.exists(): + return [] + lines = outbox.read_text().strip().split("\n") + return [json.loads(line) for line in lines if line.strip()] + + +# --------------------------------------------------------------------------- +# 1. 
hook.completed (successful hook) +# --------------------------------------------------------------------------- + +class TestHookCompleted: + + def test_emits_started_and_completed(self, workspace: Path): + """Successful hook emits hook.started then hook.completed.""" + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="check-version", command="true", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + types = [e["type"] for e in events] + assert types == ["hook.started", "hook.completed"] + + def test_started_payload(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="check-version", command="echo hello", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + started = events[0] + assert started["type"] == "hook.started" + assert started["stage"] == "on_enter" + assert started["hook_name"] == "check-version" + assert started["repo"] == "grip" + assert "command" in started + assert "cwd" in started + + def test_completed_payload(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="check-version", command="true", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + completed = events[1] + assert completed["type"] == "hook.completed" + assert completed["stage"] == "on_enter" + assert completed["hook_name"] == "check-version" + assert completed["repo"] == "grip" + assert completed["exit_code"] == 0 + assert "duration_ms" in completed + assert 
isinstance(completed["duration_ms"], int) + + +# --------------------------------------------------------------------------- +# 2. hook.failed with on_failure="block" +# --------------------------------------------------------------------------- + +class TestHookFailedBlock: + + def test_emits_started_and_failed(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="install-deps", command="false", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ) + hooks = _make_hooks([hook]) + with pytest.raises(HookRuntimeError): + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + types = [e["type"] for e in events] + assert types == ["hook.started", "hook.failed"] + + def test_failed_payload(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="install-deps", command="echo bad >&2; false", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ) + hooks = _make_hooks([hook]) + with pytest.raises(HookRuntimeError): + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + failed = events[1] + assert failed["type"] == "hook.failed" + assert failed["stage"] == "on_enter" + assert failed["hook_name"] == "install-deps" + assert failed["repo"] == "grip" + assert failed["exit_code"] != 0 + assert failed["on_failure"] == "block" + assert "duration_ms" in failed + assert "stderr_tail" in failed + + +# --------------------------------------------------------------------------- +# 3. 
hook.failed with on_failure="warn" +# --------------------------------------------------------------------------- + +class TestHookFailedWarn: + + def test_emits_started_and_failed(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="lint", command="false", + cwd=str(ctx.repo_root), when="always", on_failure="warn", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + types = [e["type"] for e in events] + assert types == ["hook.started", "hook.failed"] + + def test_failed_payload_on_failure_warn(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="lint", command="false", + cwd=str(ctx.repo_root), when="always", on_failure="warn", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + failed = events[1] + assert failed["on_failure"] == "warn" + + +# --------------------------------------------------------------------------- +# 4. 
hook.failed with on_failure="skip" +# --------------------------------------------------------------------------- + +class TestHookFailedSkip: + + def test_emits_started_and_failed(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="optional", command="false", + cwd=str(ctx.repo_root), when="always", on_failure="skip", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + types = [e["type"] for e in events] + assert types == ["hook.started", "hook.failed"] + + def test_failed_payload_on_failure_skip(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="optional", command="false", + cwd=str(ctx.repo_root), when="always", on_failure="skip", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + failed = events[1] + assert failed["on_failure"] == "skip" + + +# --------------------------------------------------------------------------- +# 5. 
hook.skipped (when condition not met) +# --------------------------------------------------------------------------- + +class TestHookSkipped: + + def test_emits_skipped(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="first-only", command="true", + cwd=str(ctx.repo_root), when="first_materialize", on_failure="block", + ) + hooks = _make_hooks([hook]) + # first_materialize=False -> when=first_materialize does not match + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + assert len(events) == 1 + assert events[0]["type"] == "hook.skipped" + + def test_skipped_payload(self, workspace: Path): + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="first-only", command="true", + cwd=str(ctx.repo_root), when="first_materialize", on_failure="block", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + skipped = events[0] + assert skipped["hook_name"] == "first-only" + assert skipped["repo"] == "grip" + assert skipped["stage"] == "on_enter" + assert "reason" in skipped + + def test_skipped_no_started_event(self, workspace: Path): + """Skipped hooks must NOT emit hook.started.""" + ctx = _make_ctx(workspace) + hook = LifecycleHook( + stage="on_enter", name="first-only", command="true", + cwd=str(ctx.repo_root), when="first_materialize", on_failure="block", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + types = [e["type"] for e in events] + assert "hook.started" not in types + + +# --------------------------------------------------------------------------- +# 6. 
Multiple hooks in sequence +# --------------------------------------------------------------------------- + +class TestMultipleHooks: + + def test_two_hooks_both_succeed(self, workspace: Path): + ctx = _make_ctx(workspace) + hooks = _make_hooks([ + LifecycleHook( + stage="on_enter", name="hook-a", command="true", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ), + LifecycleHook( + stage="on_enter", name="hook-b", command="true", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ), + ]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + types = [e["type"] for e in events] + assert types == [ + "hook.started", "hook.completed", + "hook.started", "hook.completed", + ] + assert events[0]["hook_name"] == "hook-a" + assert events[2]["hook_name"] == "hook-b" + + def test_second_hook_skipped_first_succeeds(self, workspace: Path): + ctx = _make_ctx(workspace) + hooks = _make_hooks([ + LifecycleHook( + stage="on_enter", name="always-hook", command="true", + cwd=str(ctx.repo_root), when="always", on_failure="block", + ), + LifecycleHook( + stage="on_enter", name="dirty-only", command="true", + cwd=str(ctx.repo_root), when="dirty", on_failure="block", + ), + ]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + types = [e["type"] for e in events] + assert types == ["hook.started", "hook.completed", "hook.skipped"] + + +# --------------------------------------------------------------------------- +# 7. 
stderr_tail truncation (section 6.3) +# --------------------------------------------------------------------------- + +class TestStderrTail: + + def test_stderr_tail_truncated_to_500_bytes(self, workspace: Path): + ctx = _make_ctx(workspace) + # Generate > 500 bytes of stderr + long_stderr_cmd = "python3 -c \"import sys; sys.stderr.write('x' * 1000)\"; false" + hook = LifecycleHook( + stage="on_enter", name="noisy", command=long_stderr_cmd, + cwd=str(ctx.repo_root), when="always", on_failure="warn", + ) + hooks = _make_hooks([hook]) + run_lifecycle_stage(hooks, "on_enter", ctx, repo_dirty=False, first_materialize=False) + events = _read_outbox(workspace) + failed = [e for e in events if e["type"] == "hook.failed"][0] + assert len(failed["stderr_tail"]) <= 500 diff --git a/gr2/tests/test_pr_events.py b/gr2/tests/test_pr_events.py new file mode 100644 index 0000000..027df1d --- /dev/null +++ b/gr2/tests/test_pr_events.py @@ -0,0 +1,510 @@ +"""Tests for PR lifecycle event emission. + +Verifies that pr.py emits pr.created, pr.merged, pr.merge_failed, +pr.status_changed, pr.checks_passed, pr.checks_failed, and +pr.review_submitted events per HOOK-EVENT-CONTRACT.md section 3.2 +(PR Lifecycle) and PR-LIFECYCLE.md. + +Uses a FakeAdapter to avoid real GitHub calls. +""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from gr2.python_cli.platform import ( + AdapterError, + CreatePRRequest, + PRCheck, + PRRef, + PRStatus, +) + + +class FakeAdapter: + """Test double for PlatformAdapter. 
Records calls, returns canned data.""" + + name = "fake" + + def __init__(self) -> None: + self.created: list[CreatePRRequest] = [] + self.merged: list[tuple[str, int]] = [] + self.statuses: dict[tuple[str, int], PRStatus] = {} + self._fail_merge: set[tuple[str, int]] = set() + + def create_pr(self, request: CreatePRRequest) -> PRRef: + self.created.append(request) + n = len(self.created) + 100 + return PRRef( + repo=request.repo, + number=n, + url=f"https://github.com/test/{request.repo}/pull/{n}", + head_branch=request.head_branch, + base_branch=request.base_branch, + title=request.title, + ) + + def merge_pr(self, repo: str, number: int) -> PRRef: + if (repo, number) in self._fail_merge: + raise AdapterError(f"merge conflict in {repo}#{number}") + self.merged.append((repo, number)) + return PRRef(repo=repo, number=number) + + def pr_status(self, repo: str, number: int) -> PRStatus: + key = (repo, number) + if key in self.statuses: + return self.statuses[key] + return PRStatus( + ref=PRRef(repo=repo, number=number), + state="OPEN", + checks=[], + ) + + def list_prs(self, repo: str, *, head_branch: str | None = None) -> list[PRRef]: + return [] + + def pr_checks(self, repo: str, number: int) -> list[PRCheck]: + return self.pr_status(repo, number).checks + + def set_fail_merge(self, repo: str, number: int) -> None: + self._fail_merge.add((repo, number)) + + def set_status(self, repo: str, number: int, status: PRStatus) -> None: + self.statuses[(repo, number)] = status + + +def _read_outbox(workspace: Path) -> list[dict]: + outbox = workspace / ".grip" / "events" / "outbox.jsonl" + if not outbox.exists(): + return [] + lines = outbox.read_text().strip().split("\n") + return [json.loads(line) for line in lines if line.strip()] + + +def _events_of_type(workspace: Path, event_type: str) -> list[dict]: + return [e for e in _read_outbox(workspace) if e["type"] == event_type] + + +# --------------------------------------------------------------------------- +# 1. 
pr.created (section 3.2, PR-LIFECYCLE.md section 3.1) +# --------------------------------------------------------------------------- + +class TestPRCreated: + + def test_emits_pr_created(self, workspace: Path): + from gr2.python_cli.pr import create_pr_group + adapter = FakeAdapter() + result = create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/hook-events", + title="feat: hook events", + base_branch="sprint-21", + head_branch="test/event-system-runtime", + repos=["grip", "synapt"], + adapter=adapter, + actor="agent:apollo", + ) + events = _events_of_type(workspace, "pr.created") + assert len(events) == 1 + + def test_pr_created_payload(self, workspace: Path): + from gr2.python_cli.pr import create_pr_group + adapter = FakeAdapter() + result = create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/hook-events", + title="feat: hook events", + base_branch="sprint-21", + head_branch="test/event-system-runtime", + repos=["grip", "synapt"], + adapter=adapter, + actor="agent:apollo", + ) + event = _events_of_type(workspace, "pr.created")[0] + assert "pr_group_id" in event + assert isinstance(event["repos"], list) + assert len(event["repos"]) == 2 + for pr in event["repos"]: + assert "repo" in pr + assert "pr_number" in pr + assert "url" in pr + + def test_pr_group_id_format(self, workspace: Path): + from gr2.python_cli.pr import create_pr_group + adapter = FakeAdapter() + result = create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/hook-events", + title="feat: hook events", + base_branch="sprint-21", + head_branch="test/event-system-runtime", + repos=["grip"], + adapter=adapter, + actor="agent:apollo", + ) + event = _events_of_type(workspace, "pr.created")[0] + gid = event["pr_group_id"] + assert gid.startswith("pg_") + assert len(gid) == 11 # pg_ + 8 hex chars + assert all(c in "0123456789abcdef" for c in gid[3:]) + + def test_pr_group_metadata_stored(self, workspace: 
Path): + from gr2.python_cli.pr import create_pr_group + adapter = FakeAdapter() + result = create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/hook-events", + title="feat: hook events", + base_branch="sprint-21", + head_branch="test/event-system-runtime", + repos=["grip"], + adapter=adapter, + actor="agent:apollo", + ) + gid = result["pr_group_id"] + meta_path = workspace / ".grip" / "pr_groups" / f"{gid}.json" + assert meta_path.exists() + meta = json.loads(meta_path.read_text()) + assert meta["pr_group_id"] == gid + assert meta["lane_name"] == "feat/hook-events" + + def test_calls_adapter_per_repo(self, workspace: Path): + from gr2.python_cli.pr import create_pr_group + adapter = FakeAdapter() + create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/hook-events", + title="feat: hook events", + base_branch="sprint-21", + head_branch="test/event-system-runtime", + repos=["grip", "synapt", "synapt-private"], + adapter=adapter, + actor="agent:apollo", + ) + assert len(adapter.created) == 3 + assert [r.repo for r in adapter.created] == ["grip", "synapt", "synapt-private"] + + +# --------------------------------------------------------------------------- +# 2. 
pr.merged (section 3.2, PR-LIFECYCLE.md section 3.3) +# --------------------------------------------------------------------------- + +class TestPRMerged: + + def _create_group(self, workspace: Path, adapter: FakeAdapter, repos: list[str] | None = None) -> dict: + from gr2.python_cli.pr import create_pr_group + return create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/test", + title="feat: test", + base_branch="sprint-21", + head_branch="feat/test", + repos=repos or ["grip", "synapt"], + adapter=adapter, + actor="agent:apollo", + ) + + def test_emits_pr_merged(self, workspace: Path): + from gr2.python_cli.pr import merge_pr_group + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + merge_pr_group( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + events = _events_of_type(workspace, "pr.merged") + assert len(events) == 1 + + def test_pr_merged_payload(self, workspace: Path): + from gr2.python_cli.pr import merge_pr_group + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + merge_pr_group( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + event = _events_of_type(workspace, "pr.merged")[0] + assert event["pr_group_id"] == group["pr_group_id"] + assert isinstance(event["repos"], list) + assert len(event["repos"]) == 2 + + def test_merges_in_repo_order(self, workspace: Path): + from gr2.python_cli.pr import merge_pr_group + adapter = FakeAdapter() + group = self._create_group(workspace, adapter, repos=["grip", "synapt", "synapt-private"]) + merge_pr_group( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + assert [r for r, _ in adapter.merged] == ["grip", "synapt", "synapt-private"] + + +# --------------------------------------------------------------------------- +# 3. 
pr.merge_failed (section 3.2, PR-LIFECYCLE.md section 4.4) +# --------------------------------------------------------------------------- + +class TestPRMergeFailed: + + def _create_group(self, workspace: Path, adapter: FakeAdapter) -> dict: + from gr2.python_cli.pr import create_pr_group + return create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/test", + title="feat: test", + base_branch="sprint-21", + head_branch="feat/test", + repos=["grip", "synapt"], + adapter=adapter, + actor="agent:apollo", + ) + + def test_emits_merge_failed(self, workspace: Path): + from gr2.python_cli.pr import merge_pr_group, PRMergeError + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + # Make synapt fail + synapt_pr = [p for p in group["prs"] if p["repo"] == "synapt"][0] + adapter.set_fail_merge("synapt", synapt_pr["pr_number"]) + with pytest.raises(PRMergeError): + merge_pr_group( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + events = _events_of_type(workspace, "pr.merge_failed") + assert len(events) == 1 + + def test_merge_failed_payload(self, workspace: Path): + from gr2.python_cli.pr import merge_pr_group, PRMergeError + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + synapt_pr = [p for p in group["prs"] if p["repo"] == "synapt"][0] + adapter.set_fail_merge("synapt", synapt_pr["pr_number"]) + with pytest.raises(PRMergeError): + merge_pr_group( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + event = _events_of_type(workspace, "pr.merge_failed")[0] + assert event["pr_group_id"] == group["pr_group_id"] + assert event["repo"] == "synapt" + assert "reason" in event + + def test_stops_after_first_failure(self, workspace: Path): + """Merge stops at first failure; remaining repos are not attempted.""" + from gr2.python_cli.pr import create_pr_group, merge_pr_group, 
PRMergeError + adapter = FakeAdapter() + group = create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/test", + title="feat: test", + base_branch="sprint-21", + head_branch="feat/test", + repos=["grip", "synapt", "synapt-private"], + adapter=adapter, + actor="agent:apollo", + ) + # Make grip (first repo) fail + grip_pr = [p for p in group["prs"] if p["repo"] == "grip"][0] + adapter.set_fail_merge("grip", grip_pr["pr_number"]) + with pytest.raises(PRMergeError): + merge_pr_group( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + # Only grip was attempted; synapt and synapt-private were not + assert len(adapter.merged) == 0 # grip failed, not in merged list + assert len(_events_of_type(workspace, "pr.merged")) == 0 + + +# --------------------------------------------------------------------------- +# 4. pr.status_changed, pr.checks_passed, pr.checks_failed +# --------------------------------------------------------------------------- + +class TestPRStatusEvents: + + def _create_group(self, workspace: Path, adapter: FakeAdapter) -> dict: + from gr2.python_cli.pr import create_pr_group + return create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/test", + title="feat: test", + base_branch="sprint-21", + head_branch="feat/test", + repos=["grip"], + adapter=adapter, + actor="agent:apollo", + ) + + def test_checks_passed_emitted(self, workspace: Path): + from gr2.python_cli.pr import create_pr_group, check_pr_group_status + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + grip_pr = group["prs"][0] + # Set checks to all passing + adapter.set_status("grip", grip_pr["pr_number"], PRStatus( + ref=PRRef(repo="grip", number=grip_pr["pr_number"]), + state="OPEN", + checks=[ + PRCheck(name="ci/test", status="COMPLETED", conclusion="SUCCESS"), + PRCheck(name="ci/lint", status="COMPLETED", conclusion="SUCCESS"), + ], + )) + 
check_pr_group_status( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + events = _events_of_type(workspace, "pr.checks_passed") + assert len(events) == 1 + assert events[0]["repo"] == "grip" + assert events[0]["pr_group_id"] == group["pr_group_id"] + + def test_checks_failed_emitted(self, workspace: Path): + from gr2.python_cli.pr import create_pr_group, check_pr_group_status + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + grip_pr = group["prs"][0] + adapter.set_status("grip", grip_pr["pr_number"], PRStatus( + ref=PRRef(repo="grip", number=grip_pr["pr_number"]), + state="OPEN", + checks=[ + PRCheck(name="ci/test", status="COMPLETED", conclusion="FAILURE"), + PRCheck(name="ci/lint", status="COMPLETED", conclusion="SUCCESS"), + ], + )) + check_pr_group_status( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + events = _events_of_type(workspace, "pr.checks_failed") + assert len(events) == 1 + assert events[0]["repo"] == "grip" + assert "ci/test" in events[0]["failed_checks"] + + def test_status_changed_emitted(self, workspace: Path): + from gr2.python_cli.pr import create_pr_group, check_pr_group_status + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + grip_pr = group["prs"][0] + adapter.set_status("grip", grip_pr["pr_number"], PRStatus( + ref=PRRef(repo="grip", number=grip_pr["pr_number"]), + state="MERGED", + checks=[], + )) + check_pr_group_status( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + events = _events_of_type(workspace, "pr.status_changed") + assert len(events) == 1 + assert events[0]["repo"] == "grip" + assert events[0]["new_status"] == "MERGED" + + def test_no_event_when_status_unchanged(self, workspace: Path): + """Second status check with no changes emits no events.""" + from gr2.python_cli.pr import 
create_pr_group, check_pr_group_status + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + # Default status is OPEN with no checks -- first check caches it + check_pr_group_status( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + events_before = len(_read_outbox(workspace)) + # Second check, same status + check_pr_group_status( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + adapter=adapter, + actor="agent:apollo", + ) + events_after = len(_read_outbox(workspace)) + # No new status_changed events + assert events_after == events_before + + +# --------------------------------------------------------------------------- +# 5. pr.review_submitted +# --------------------------------------------------------------------------- + +class TestPRReviewSubmitted: + + def _create_group(self, workspace: Path, adapter: FakeAdapter) -> dict: + from gr2.python_cli.pr import create_pr_group + return create_pr_group( + workspace_root=workspace, + owner_unit="apollo", + lane_name="feat/test", + title="feat: test", + base_branch="sprint-21", + head_branch="feat/test", + repos=["grip"], + adapter=adapter, + actor="agent:apollo", + ) + + def test_review_event_emitted(self, workspace: Path): + from gr2.python_cli.pr import record_pr_review + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + record_pr_review( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + repo="grip", + pr_number=group["prs"][0]["pr_number"], + reviewer="agent:sentinel", + state="APPROVED", + actor="agent:sentinel", + ) + events = _events_of_type(workspace, "pr.review_submitted") + assert len(events) == 1 + + def test_review_payload(self, workspace: Path): + from gr2.python_cli.pr import record_pr_review + adapter = FakeAdapter() + group = self._create_group(workspace, adapter) + record_pr_review( + workspace_root=workspace, + pr_group_id=group["pr_group_id"], + repo="grip", 
+ pr_number=group["prs"][0]["pr_number"], + reviewer="agent:sentinel", + state="CHANGES_REQUESTED", + actor="agent:sentinel", + ) + event = _events_of_type(workspace, "pr.review_submitted")[0] + assert event["pr_group_id"] == group["pr_group_id"] + assert event["repo"] == "grip" + assert event["pr_number"] == group["prs"][0]["pr_number"] + assert event["reviewer"] == "agent:sentinel" + assert event["state"] == "CHANGES_REQUESTED"