Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion gr2/python_cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
repo_dirty,
stash_if_dirty,
)
from .events import emit, EventType
from .hooks import HookContext, apply_file_projections, load_repo_hooks, run_lifecycle_stage
from . import spec_apply
from gr2.prototypes import lane_workspace_prototype as lane_proto
Expand Down Expand Up @@ -607,6 +608,28 @@ def lane_create(
)
_exit(lane_proto.create_lane(ns))
_materialize_lane_repos(workspace_root, owner_unit, lane_name, manual_hooks=manual_hooks)
repo_list = [r.strip() for r in repos.split(",")]
branch_parts = branch.split(",")
branch_map = {}
for part in branch_parts:
if "=" in part:
k, v = part.split("=", 1)
branch_map[k.strip()] = v.strip()
else:
for r in repo_list:
branch_map[r] = part.strip()
emit(
event_type=EventType.LANE_CREATED,
workspace_root=workspace_root,
actor=source,
owner_unit=owner_unit,
payload={
"lane_name": lane_name,
"lane_type": lane_type,
"repos": repo_list,
"branch_map": branch_map,
},
)


@lane_app.command("enter")
Expand All @@ -631,6 +654,18 @@ def lane_enter(
recall=recall,
)
_exit(lane_proto.enter_lane(ns))
lane_doc = lane_proto.load_lane_doc(workspace_root, owner_unit, lane_name)
emit(
event_type=EventType.LANE_ENTERED,
workspace_root=workspace_root,
actor=actor,
owner_unit=owner_unit,
payload={
"lane_name": lane_name,
"lane_type": lane_doc.get("type", "feature"),
"repos": lane_doc.get("repos", []),
},
)


@lane_app.command("exit")
Expand All @@ -647,10 +682,12 @@ def lane_exit(
current_doc = lane_proto.load_current_lane_doc(workspace_root, owner_unit)
lane_name = current_doc["current"]["lane_name"]
lane_doc = lane_proto.load_lane_doc(workspace_root, owner_unit, lane_name)
stashed_repos: list[str] = []
for repo_name in lane_doc.get("repos", []):
repo_root = _lane_repo_root(workspace_root, owner_unit, lane_name, repo_name)
if repo_root.exists():
stash_if_dirty(repo_root, f"gr2 exit {owner_unit}/{lane_name}")
if stash_if_dirty(repo_root, f"gr2 exit {owner_unit}/{lane_name}"):
stashed_repos.append(repo_name)
_run_lane_stage(workspace_root, owner_unit, lane_name, "on_exit", manual_hooks=manual_hooks)
ns = SimpleNamespace(
workspace_root=workspace_root,
Expand All @@ -660,6 +697,16 @@ def lane_exit(
recall=recall,
)
_exit(lane_proto.exit_lane(ns))
emit(
event_type=EventType.LANE_EXITED,
workspace_root=workspace_root,
actor=actor,
owner_unit=owner_unit,
payload={
"lane_name": lane_name,
"stashed_repos": stashed_repos,
},
)


@lane_app.command("current")
Expand Down Expand Up @@ -698,6 +745,18 @@ def lane_lease_acquire(
force=force,
)
_exit(lane_proto.acquire_lane_lease(ns))
emit(
event_type=EventType.LEASE_ACQUIRED,
workspace_root=workspace_root,
actor=actor,
owner_unit=owner_unit,
payload={
"lane_name": lane_name,
"mode": mode,
"ttl_seconds": ttl_seconds,
"lease_id": f"{owner_unit}:{lane_name}",
},
)


@lease_app.command("release")
Expand All @@ -715,6 +774,16 @@ def lane_lease_release(
actor=actor,
)
_exit(lane_proto.release_lane_lease(ns))
emit(
event_type=EventType.LEASE_RELEASED,
workspace_root=workspace_root,
actor=actor,
owner_unit=owner_unit,
payload={
"lane_name": lane_name,
"lease_id": f"{owner_unit}:{lane_name}",
},
)


@lease_app.command("show")
Expand Down
113 changes: 113 additions & 0 deletions gr2/python_cli/channel_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""gr2 channel bridge consumer.

Translates outbox events into channel messages per the mapping table in
HOOK-EVENT-CONTRACT.md section 8. Uses cursor-based consumption from
events.read_events().

The bridge is a pure function layer: format_event() maps an event dict to
a message string (or None), and run_bridge() orchestrates cursor reads and
posts via a caller-provided post_fn. This keeps the MCP/recall_channel
dependency out of the module and makes it fully testable.
"""
from __future__ import annotations

from pathlib import Path
from typing import Callable

from .events import read_events


_CONSUMER_NAME = "channel_bridge"


def format_event(event: dict[str, object]) -> str | None:
"""Apply the section 8 mapping table to produce a channel message.

Returns None if the event type is not mapped (silently dropped).
"""
etype = event.get("type", "")

if etype == "lane.created":
return (
f"{event['actor']} created lane {event['lane_name']}"
f" [{event.get('lane_type', 'unknown')}]"
f" repos={event.get('repos', [])}"
)

if etype == "lane.entered":
return f"{event['actor']} entered {event['owner_unit']}/{event['lane_name']}"

if etype == "lane.exited":
return f"{event['actor']} exited {event['owner_unit']}/{event['lane_name']}"

if etype == "pr.created":
repos = event.get("repos", [])
if isinstance(repos, list) and repos and isinstance(repos[0], dict):
repo_names = [r.get("repo", "") for r in repos]
else:
repo_names = repos
return (
f"{event['actor']} opened PR group {event['pr_group_id']}: {repo_names}"
)

if etype == "pr.merged":
return f"{event['actor']} merged PR group {event['pr_group_id']}"

if etype == "pr.checks_failed":
failed = event.get("failed_checks", [])
return f"CI failed on {event['repo']}#{event['pr_number']}: {failed}"

if etype == "hook.failed":
# Only blocking hook failures produce channel messages.
if event.get("on_failure") != "block":
return None
return (
f"Hook {event['hook_name']} failed in {event['repo']}"
f" (blocking): {event.get('stderr_tail', '')}"
)

if etype == "sync.conflict":
files = event.get("conflicting_files", [])
return f"Sync conflict in {event['repo']}: {files}"

if etype == "lease.force_broken":
return (
f"Lease on {event['lane_name']} force-broken"
f" by {event['broken_by']}: {event.get('reason', '')}"
)

if etype == "failure.resolved":
return (
f"{event['resolved_by']} resolved failure"
f" {event['operation_id']} on {event['lane_name']}"
)

if etype == "lease.reclaimed":
return (
f"Stale lease on {event['lane_name']} reclaimed"
f" (was held by {event['previous_holder']})"
)

# Unmapped event type: silently dropped.
return None


def run_bridge(
workspace_root: Path,
*,
post_fn: Callable[[str], object],
) -> int:
"""Read new events from the outbox and post mapped messages.

Uses the 'channel_bridge' cursor. Returns the number of messages posted.
The post_fn receives formatted message strings; the caller decides how to
deliver them (recall_channel, print, log, etc.).
"""
events = read_events(workspace_root, _CONSUMER_NAME)
posted = 0
for event in events:
msg = format_event(event)
if msg is not None:
post_fn(msg)
posted += 1
return posted
Loading