Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 289 additions & 2 deletions gr2/python_cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import contextlib
import io
import json
import os
from pathlib import Path
from types import SimpleNamespace
from typing import Optional

import typer

from . import execops
from . import failures
from . import migration
from . import pr as pr_ops
from . import syncops
from .gitops import (
branch_exists,
Expand All @@ -24,7 +27,8 @@
stash_if_dirty,
)
from .events import emit, EventType
from .hooks import HookContext, apply_file_projections, load_repo_hooks, run_lifecycle_stage
from .hooks import HookContext, HookRuntimeError, apply_file_projections, load_repo_hooks, run_lifecycle_stage
from .platform import PRRef, get_platform_adapter
from . import spec_apply
from gr2.prototypes import lane_workspace_prototype as lane_proto
from gr2.prototypes import repo_maintenance_prototype as repo_proto
Expand All @@ -37,6 +41,7 @@
lane_app = typer.Typer(help="Lane creation and navigation")
lease_app = typer.Typer(help="Lane lease operations")
review_app = typer.Typer(help="Review and reviewer requirement operations")
pr_app = typer.Typer(help="Cross-repo PR orchestration")
workspace_app = typer.Typer(help="Workspace bootstrap and materialization")
spec_app = typer.Typer(help="Declarative workspace spec operations")
exec_app = typer.Typer(help="Lane-aware execution planning and execution")
Expand All @@ -46,6 +51,7 @@
app.add_typer(lane_app, name="lane")
lane_app.add_typer(lease_app, name="lease")
app.add_typer(review_app, name="review")
app.add_typer(pr_app, name="pr")
app.add_typer(workspace_app, name="workspace")
app.add_typer(spec_app, name="spec")
app.add_typer(exec_app, name="exec")
Expand Down Expand Up @@ -196,6 +202,48 @@ def _repo_hook_context(workspace_root: Path, repo_root: Path) -> HookContext:
)


def _resolve_lane_name(workspace_root: Path, owner_unit: str, lane_name: Optional[str]) -> str:
if lane_name:
return lane_name
current_doc = lane_proto.load_current_lane_doc(workspace_root, owner_unit)
return str(current_doc["current"]["lane_name"])


def _find_pr_group(workspace_root: Path, owner_unit: str, lane_name: str) -> tuple[Path, dict[str, object]]:
root = workspace_root / ".grip" / "pr_groups"
if not root.exists():
raise SystemExit(f"pr group not found for {owner_unit}/{lane_name}: {root}")
for path in sorted(root.glob("*.json")):
doc = json.loads(path.read_text())
if doc.get("owner_unit") == owner_unit and doc.get("lane_name") == lane_name:
return path, doc
raise SystemExit(f"pr group not found for {owner_unit}/{lane_name}: {root}")


def _group_state_from_statuses(statuses: list[dict[str, object]]) -> str:
states = [str(item.get("state", "")).upper() for item in statuses]
if not states:
return "empty"
if all(state == "MERGED" for state in states):
return "merged"
if any(state == "MERGED" for state in states):
return "partially_merged"
if all(state in {"OPEN", "MERGEABLE", "CLEAN"} for state in states):
return "open"
return "mixed"


def _repo_slug_from_url(url: str, fallback_name: str) -> str:
cleaned = url.strip()
if cleaned.startswith("git@github.com:"):
slug = cleaned.split("git@github.com:", 1)[1]
return slug.removesuffix(".git")
if cleaned.startswith("https://github.com/"):
slug = cleaned.split("https://github.com/", 1)[1]
return slug.removesuffix(".git")
return fallback_name


def _write_workspace_spec(workspace_root: Path, repos: list[dict[str, str]], default_unit: str) -> Path:
spec_path = _workspace_spec_path(workspace_root)
spec_path.parent.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -644,7 +692,38 @@ def lane_enter(
) -> None:
"""Enter a lane and optionally emit channel/recall-compatible events."""
workspace_root = workspace_root.resolve()
_run_lane_stage(workspace_root, owner_unit, lane_name, "on_enter", manual_hooks=manual_hooks)
unresolved = failures.unresolved_lane_failure(workspace_root, owner_unit, lane_name)
if unresolved:
typer.echo(
json.dumps(
{
"status": "blocked",
"code": "unresolved_failure_marker",
"operation_id": unresolved["operation_id"],
"lane_name": lane_name,
},
indent=2,
)
)
raise typer.Exit(code=1)
try:
_run_lane_stage(workspace_root, owner_unit, lane_name, "on_enter", manual_hooks=manual_hooks)
except HookRuntimeError as exc:
payload = exc.payload
repo_name = Path(str(payload.get("cwd", ""))).name or lane_name
event = failures.write_failure_marker(
workspace_root,
operation="lane.enter",
stage=str(payload.get("stage", "on_enter")),
hook_name=str(payload.get("hook", payload.get("name", "unknown"))),
repo=repo_name,
owner_unit=owner_unit,
lane_name=lane_name,
partial_state={},
event_id=None,
)
typer.echo(json.dumps(event, indent=2))
raise typer.Exit(code=1)
ns = SimpleNamespace(
workspace_root=workspace_root,
owner_unit=owner_unit,
Expand All @@ -668,6 +747,30 @@ def lane_enter(
)


@lane_app.command("resolve")
def lane_resolve(
workspace_root: Path,
owner_unit: str,
operation_id: str,
actor: str = typer.Option(..., help="Actor label, e.g. agent:atlas"),
resolution: str = typer.Option(..., help="Resolution note: retry | skip | escalate"),
json_output: bool = typer.Option(False, "--json", help="Emit machine-readable JSON"),
) -> None:
"""Resolve a blocking failure marker for a lane-scoped operation."""
workspace_root = workspace_root.resolve()
payload = failures.resolve_failure_marker(
workspace_root,
operation_id=operation_id,
resolved_by=actor,
resolution=resolution,
owner_unit=owner_unit,
)
if json_output:
typer.echo(json.dumps(payload, indent=2))
else:
typer.echo(json.dumps(payload, indent=2))


@lane_app.command("exit")
def lane_exit(
workspace_root: Path,
Expand Down Expand Up @@ -873,5 +976,189 @@ def review_checkout_pr(
typer.echo(json.dumps(payload, indent=2))


@pr_app.command("create")
def pr_create(
workspace_root: Path,
owner_unit: str,
lane_name: Optional[str] = typer.Argument(None, help="Lane name. Defaults to the unit's current lane."),
platform: str = typer.Option("github", "--platform", help="Platform adapter name"),
base_branch: str = typer.Option("main", "--base", help="Base branch for created PRs"),
draft: bool = typer.Option(False, "--draft", help="Create PRs as drafts"),
json_output: bool = typer.Option(False, "--json", help="Emit machine-readable JSON"),
) -> None:
"""Create a grouped set of per-repo PRs for a lane."""
workspace_root = workspace_root.resolve()
resolved_lane = _resolve_lane_name(workspace_root, owner_unit, lane_name)
lane_doc = lane_proto.load_lane_doc(workspace_root, owner_unit, resolved_lane)
spec = lane_proto.load_workspace_spec(workspace_root)
adapter = get_platform_adapter(platform)
branch_map = dict(lane_doc.get("branch_map", {}))
repos: list[str] = []
for repo_name in lane_doc.get("repos", []):
repo_spec = next(repo for repo in spec.get("repos", []) if repo.get("name") == repo_name)
repos.append(_repo_slug_from_url(str(repo_spec.get("url", "")), repo_name))
payload = pr_ops.create_pr_group(
workspace_root=workspace_root,
owner_unit=owner_unit,
lane_name=resolved_lane,
title=resolved_lane,
base_branch=base_branch,
head_branch=str(branch_map.get(next(iter(lane_doc.get("repos", [])), resolved_lane), resolved_lane)),
repos=repos,
adapter=adapter,
actor=f"agent:{owner_unit}",
body=f"gr2 PR group for {owner_unit}/{resolved_lane}",
draft=draft,
)
if json_output:
typer.echo(json.dumps(payload, indent=2))
else:
typer.echo(json.dumps(payload, indent=2))


@pr_app.command("status")
def pr_status(
workspace_root: Path,
owner_unit: str,
lane_name: Optional[str] = typer.Argument(None, help="Lane name. Defaults to the unit's current lane."),
json_output: bool = typer.Option(False, "--json", help="Emit machine-readable JSON"),
) -> None:
"""Show grouped PR status for a lane."""
workspace_root = workspace_root.resolve()
resolved_lane = _resolve_lane_name(workspace_root, owner_unit, lane_name)
group_path, group = _find_pr_group(workspace_root, owner_unit, resolved_lane)
adapter = get_platform_adapter(str(group.get("platform", "github")))
group = pr_ops.check_pr_group_status(
workspace_root=workspace_root,
pr_group_id=str(group["pr_group_id"]),
adapter=adapter,
actor=f"agent:{owner_unit}",
)
statuses = []
for pr_info in group.get("prs", []):
repo = str(pr_info["repo"])
number = int(pr_info["pr_number"])
statuses.append(adapter.pr_status(repo, number).as_dict())
payload = {
"pr_group_id": group["pr_group_id"],
"owner_unit": owner_unit,
"lane_name": resolved_lane,
"group_state": _group_state_from_statuses(statuses),
"statuses": statuses,
"state_path": str(group_path),
}
if json_output:
typer.echo(json.dumps(payload, indent=2))
else:
typer.echo(json.dumps(payload, indent=2))


@pr_app.command("checks")
def pr_checks(
workspace_root: Path,
owner_unit: str,
lane_name: Optional[str] = typer.Argument(None, help="Lane name. Defaults to the unit's current lane."),
json_output: bool = typer.Option(False, "--json", help="Emit machine-readable JSON"),
) -> None:
"""Show grouped PR checks for a lane."""
workspace_root = workspace_root.resolve()
resolved_lane = _resolve_lane_name(workspace_root, owner_unit, lane_name)
group_path, group = _find_pr_group(workspace_root, owner_unit, resolved_lane)
adapter = get_platform_adapter(str(group.get("platform", "github")))
rows = []
for pr_info in group.get("prs", []):
ref = PRRef(repo=str(pr_info["repo"]), number=int(pr_info["pr_number"]), url=pr_info.get("url"))
rows.append(
{
"repo": ref.repo,
"number": ref.number,
"checks": [item.as_dict() for item in adapter.pr_checks(ref.repo, int(ref.number))],
}
)
payload = {
"pr_group_id": group["pr_group_id"],
"owner_unit": owner_unit,
"lane_name": resolved_lane,
"checks": rows,
"state_path": str(group_path),
}
if json_output:
typer.echo(json.dumps(payload, indent=2))
else:
typer.echo(json.dumps(payload, indent=2))


@pr_app.command("merge")
def pr_merge(
workspace_root: Path,
owner_unit: str,
lane_name: Optional[str] = typer.Argument(None, help="Lane name. Defaults to the unit's current lane."),
json_output: bool = typer.Option(False, "--json", help="Emit machine-readable JSON"),
) -> None:
"""Merge grouped PRs for a lane."""
workspace_root = workspace_root.resolve()
resolved_lane = _resolve_lane_name(workspace_root, owner_unit, lane_name)
group_path, group = _find_pr_group(workspace_root, owner_unit, resolved_lane)
adapter = get_platform_adapter(str(group.get("platform", "github")))
merged: list[str] = []
failed: list[dict[str, object]] = []
if failed:
group["group_state"] = "partially_merged" if merged else "merge_failed"
group["merged"] = merged
group_path.write_text(json.dumps(group, indent=2) + "\n")
payload = {
"status": "partial_failure" if merged else "failed",
"pr_group_id": group["pr_group_id"],
"owner_unit": owner_unit,
"lane_name": resolved_lane,
"merged": merged,
"failed": failed,
"state_path": str(group_path),
}
if json_output:
typer.echo(json.dumps(payload, indent=2))
else:
typer.echo(json.dumps(payload, indent=2))
raise typer.Exit(code=1)
try:
pr_ops.merge_pr_group(
workspace_root=workspace_root,
pr_group_id=str(group["pr_group_id"]),
adapter=adapter,
actor=f"agent:{owner_unit}",
)
merged = [str(pr_info["repo"]) for pr_info in group.get("prs", [])]
payload = {
"pr_group_id": group["pr_group_id"],
"owner_unit": owner_unit,
"lane_name": resolved_lane,
"merged": merged,
"state_path": str(group_path),
}
except pr_ops.PRMergeError as exc:
merged = [str(pr_info["repo"]) for pr_info in group.get("prs", []) if str(pr_info["repo"]) != exc.repo]
group["group_state"] = "partially_merged" if merged else "merge_failed"
group["merged"] = merged
group_path.write_text(json.dumps(group, indent=2) + "\n")
payload = {
"status": "partial_failure" if merged else "failed",
"pr_group_id": group["pr_group_id"],
"owner_unit": owner_unit,
"lane_name": resolved_lane,
"merged": merged,
"failed": [{"repo": exc.repo, "number": exc.pr_number, "reason": exc.reason}],
"state_path": str(group_path),
}
if json_output:
typer.echo(json.dumps(payload, indent=2))
else:
typer.echo(json.dumps(payload, indent=2))
raise typer.Exit(code=1)
if json_output:
typer.echo(json.dumps(payload, indent=2))
else:
typer.echo(json.dumps(payload, indent=2))


if __name__ == "__main__":
app()
Loading