From ad7ff2db5d5acba9358cab6087e3df86863923fc Mon Sep 17 00:00:00 2001 From: Carlos Escobar Date: Tue, 5 May 2026 13:15:13 -0500 Subject: [PATCH] fix(p9): watcher folds gh exit into state; +abandon, +cleanup, --background alias MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes five bugs surfaced by a fresh-session test against this very primitive — proving the dogfood loop works as a feedback channel. B1 (doc/code drift): AGENTS.md reflexive rule references `p9 watch --background`, but the engine only had `--dry-run`. Every session following the rule hit `error: unrecognized arguments: --background`. Fix: add `--background` and `--block` as aliases for the (now-default) foreground behavior. B2 (architectural — silent state drop): `cmd_watch` spawned `gh pr checks --watch` detached and returned immediately. When the gh subprocess exited, *nothing* wrote a state transition. State.jsonl stayed at WATCHING forever even after CI completed — a direct violation of the "never silently drop state" cardinal rule. Fix: foreground-by-default. `p9 watch` blocks on the gh subprocess, then folds the exit code into a state event: - exit 0 → WATCHING → GREEN - exit non-0 → WATCHING → RED_UNCLASSIFIED (caller runs `p9 heal --classify` next) The agent invokes `p9 watch` via `run_in_background`, so the bg-task notification fires when the *whole* watch+fold completes — which is what the cardinal protocol actually wants. The bg-wrapper is the agent's harness; `p9 watch` no longer second-guesses with its own detach. Old behavior is preserved via the new `--detach` flag for fire-and-forget callers. B3 (no terminal subcommand for orphans): when a PR is closed via `gh pr close` (or merged outside `p9 auto-merge`), the local state row was orphaned. With `max_concurrent_prs=1`, that orphan would deadlock all future watches. Fix: - `p9 abandon [--reason TEXT]` — explicit terminal transition (idempotent on already-terminal states; error on unknown PR). - `p9 cleanup` — polls `gh pr view` for every open row; PRs that GitHub reports as MERGED or CLOSED get a terminal ABANDONED event (with reason). PRs still OPEN are left alone. Failed gh queries are reported but not abandoned (no false-positive drain). State-machine table extended: ABANDONED now reachable from PUSHED, GREEN, and MERGE_READY (already had WATCHING/HEALING/RED_*). All terminal states remain absorbing. B5 (`load_policy` crashes on str): the function signature claimed `Path | None` but the body called `.exists()` directly without coercion. A test caller `p9.load_policy('/some/path')` raised AttributeError. Fix: accept `Path | str | None`; coerce with `Path(path)` at the boundary. Tests: 25 new under tests/test_p9_pr_e.py — watch fold (green/red/dry/ detach/--background/--block aliases), abandon (open/unknown/idempotent/ from-green), cleanup (no-op/merged/closed/open/gh-failure), state- machine transitions (parametrized over all non-terminals → ABANDONED). Total now 108 passing (~15s). Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/p9.py | 173 +++++++++++++++++++++++++-- tests/test_p9_pr_e.py | 265 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 428 insertions(+), 10 deletions(-) create mode 100644 tests/test_p9_pr_e.py diff --git a/scripts/p9.py b/scripts/p9.py index 2b77b23..3868c9c 100644 --- a/scripts/p9.py +++ b/scripts/p9.py @@ -160,11 +160,16 @@ class PRState(str, enum.Enum): (PRState.GREEN, PRState.MERGE_READY), (PRState.MERGE_READY, PRState.MERGED), (PRState.MERGE_READY, PRState.WATCHING), # rare: human pushed amend post-green - # Terminal "abandoned" reachable from any non-terminal + # Terminal "abandoned" reachable from any non-terminal — needed for + # `p9 abandon` and `p9 cleanup` to drain orphans regardless of the + # state they're parked in. + (PRState.PUSHED, PRState.ABANDONED), (PRState.WATCHING, PRState.ABANDONED), (PRState.HEALING, PRState.ABANDONED), (PRState.RED_CLASSIFIED, PRState.ABANDONED), (PRState.RED_UNCLASSIFIED, PRState.ABANDONED), + (PRState.GREEN, PRState.ABANDONED), + (PRState.MERGE_READY, PRState.ABANDONED), } @@ -524,9 +529,13 @@ def _scalar(s: str) -> Any: return s -def load_policy(path: Path | None = None) -> PolicyConfig: - """Load .control/policy.yaml. **Fail-closed** on missing/malformed blocks.""" - p = path or policy_yaml_path() +def load_policy(path: Path | str | None = None) -> PolicyConfig: + """Load .control/policy.yaml. **Fail-closed** on missing/malformed blocks. + + Accepts both `Path` and `str` (str is coerced — historic callers were + inconsistent). None falls back to `policy_yaml_path()`. + """ + p = Path(path) if path is not None else policy_yaml_path() if not p.exists(): raise PolicyError(f"policy.yaml not found at {p}") try: @@ -1061,6 +1070,87 @@ def cmd_doctor(_args: argparse.Namespace) -> int: return EXIT_DEGRADED +def cmd_abandon(args: argparse.Namespace) -> int: + """Mark a PR as ABANDONED. + + Idempotent on already-terminal states. Emits a terminal-state event so + `max_concurrent_prs` accounting is freed and `p9 cleanup` doesn't + re-flag it. + """ + pr = int(args.pr) + state = current_pr_state(pr) + if state is None: + print(f"PR #{pr}: no state to abandon", file=sys.stderr) + return EXIT_DEGRADED + if is_terminal(state): + print(f"PR #{pr}: already terminal ({state.value}); no-op") + return EXIT_OK + repo = args.repo or _detect_repo() or "" + append_state_event(PRStateEvent( + ts=_utcnow(), pr=pr, repo=repo, + from_state=state.value, + to_state=PRState.ABANDONED.value, + watcher_id="abandon", + extra={"reason": args.reason or "manual abandon"}, + )) + print(f"PR #{pr}: {state.value} → ABANDONED") + return EXIT_OK + + +def cmd_cleanup(args: argparse.Namespace) -> int: + """Drain orphan watchers by polling GitHub for each open row's true state. + + For every PR in a non-terminal local state, queries + `gh pr view --json state,mergedAt`. If GitHub reports MERGED or CLOSED, + appends a terminal ABANDONED event with the reason. If GitHub reports + OPEN, leaves the row alone. PRs that fail to query are reported but + not abandoned (no false-positive cleanup). + """ + rows = open_prs() + if not rows: + print("p9 cleanup: no open PRs") + return EXIT_OK + cleaned = 0 + skipped = 0 + for row in rows: + pr = row["pr"] + repo = row.get("repo") or _detect_repo() or "" + cmd = ["gh", "pr", "view", str(pr), "--json", "state,mergedAt"] + if repo: + cmd += ["--repo", repo] + result = subprocess.run(cmd, capture_output=True, text=True, + timeout=30, check=False) + if result.returncode != 0: + print(f" #{pr}: cannot query gh; leaving as {row['to_state']} " + f"({result.stderr.strip()[:80]})") + skipped += 1 + continue + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + print(f" #{pr}: gh returned non-JSON; leaving as {row['to_state']}") + skipped += 1 + continue + gh_state = (data.get("state") or "").upper() + from_state = PRState(row["to_state"]) + if gh_state in ("MERGED", "CLOSED"): + reason = ("merged outside p9" if gh_state == "MERGED" + else "closed outside p9") + append_state_event(PRStateEvent( + ts=_utcnow(), pr=pr, repo=repo, + from_state=from_state.value, + to_state=PRState.ABANDONED.value, + watcher_id="cleanup", + extra={"reason": reason, "gh_state": gh_state}, + )) + print(f" #{pr}: {from_state.value} → ABANDONED ({reason})") + cleaned += 1 + else: + print(f" #{pr}: still OPEN; leaving as {from_state.value}") + print(f"p9 cleanup: drained {cleaned}, skipped {skipped}") + return EXIT_OK + + def cmd_auto_merge(args: argparse.Namespace) -> int: """Auto-merge actuator. Closes the gap between MERGE_READY signal and actual `gh pr merge` execution. @@ -1195,6 +1285,20 @@ def cmd_conformance(args: argparse.Namespace) -> int: def cmd_watch(args: argparse.Namespace) -> int: + """Watch CI on a PR. + + Default behavior (PR E onwards): foreground — block on + `gh pr checks --watch`, then fold the subprocess exit code into a state + transition (WATCHING → GREEN on exit 0, WATCHING → RED_UNCLASSIFIED + otherwise). Callers (the agent) wrap this in `run_in_background` so the + bg-task notification fires when the *whole* watch+fold has finished — + which is what the cardinal protocol actually wants. + + --detach reverts to the old fire-and-forget behavior (no fold; the + caller is responsible for polling state). --background and --block are + aliases for the default; they exist so historic AGENTS.md guidance + using `p9 watch --background` keeps working. + """ policy = load_policy() if not policy.ci_watch.enabled: print("ci_watch.enabled=false in policy; refusing to watch", file=sys.stderr) @@ -1205,7 +1309,7 @@ def cmd_watch(args: argparse.Namespace) -> int: watcher_id = uuid.uuid4().hex[:12] proc = spawn_watcher(pr, repo, dry_run=args.dry_run) pid = proc.pid if proc else 0 - event = PRStateEvent( + append_state_event(PRStateEvent( ts=_utcnow(), pr=pr, repo=repo or "", @@ -1213,18 +1317,41 @@ def cmd_watch(args: argparse.Namespace) -> int: to_state=PRState.WATCHING.value, watcher_id=watcher_id, attempt=0, - extra={"pid": pid, "dry_run": args.dry_run}, - ) - append_state_event(event) + extra={"pid": pid, "dry_run": args.dry_run, "detach": args.detach}, + )) if args.json: print(json.dumps({ "watcher_id": watcher_id, "pid": pid, "pr": pr, "repo": repo, + "mode": "detach" if args.detach else "foreground", })) else: - print(f"watcher_id={watcher_id} pid={pid} pr={pr} repo={repo}") + mode = "detach" if args.detach else "foreground" + print(f"watcher_id={watcher_id} pid={pid} pr={pr} repo={repo} mode={mode}") + + # Detach / dry-run: do NOT block; caller polls state.jsonl. + if args.detach or args.dry_run or proc is None: + return EXIT_OK + + # Foreground: block on subprocess, then fold result into a state event. + rc = proc.wait() + next_state = PRState.GREEN if rc == 0 else PRState.RED_UNCLASSIFIED + append_state_event(PRStateEvent( + ts=_utcnow(), + pr=pr, + repo=repo or "", + from_state=PRState.WATCHING.value, + to_state=next_state.value, + watcher_id=watcher_id, + attempt=0, + extra={"gh_exit_code": rc, "folded_by": "p9 watch"}, + )) + if args.json: + print(json.dumps({"watcher_id": watcher_id, "result": next_state.value, "gh_exit_code": rc})) + else: + print(f"folded: {next_state.value} (gh exit {rc})") return EXIT_OK @@ -1378,11 +1505,24 @@ def build_parser() -> argparse.ArgumentParser: ) sub = p.add_subparsers(dest="cmd", required=True) - pw = sub.add_parser("watch", help="Start a CI watcher for a PR") + pw = sub.add_parser("watch", help="Watch CI on a PR (foreground; folds result into state)") pw.add_argument("pr", help="PR number") pw.add_argument("--repo", help="OWNER/REPO (auto-detected if omitted)") pw.add_argument("--dry-run", action="store_true", help="Do not actually spawn `gh pr checks --watch` (test mode)") + pw.add_argument("--detach", action="store_true", + help="Fire-and-forget: spawn the watcher but do not block " + "or fold its exit into a state event. The caller is " + "responsible for finalizing state. Default is " + "foreground (block + fold).") + # `--background` and `--block` are aliases for the default foreground + # behavior. They exist so historic AGENTS.md guidance using + # `p9 watch --background` keeps working without surprise errors. + pw.add_argument("--background", action="store_true", + help="Alias for default foreground behavior (kept for " + "backwards compatibility with reflexive-rule guidance)") + pw.add_argument("--block", action="store_true", + help="Alias for default foreground behavior") pw.add_argument("--json", action="store_true") pw.set_defaults(func=cmd_watch) @@ -1424,6 +1564,19 @@ def build_parser() -> argparse.ArgumentParser: pm.add_argument("--repo", default=None) pm.set_defaults(func=cmd_merge_ready) + pab = sub.add_parser("abandon", + help="Mark a PR as ABANDONED (frees concurrency slot)") + pab.add_argument("pr") + pab.add_argument("--repo", default=None) + pab.add_argument("--reason", default=None, + help="Free-text reason recorded in extra.reason") + pab.set_defaults(func=cmd_abandon) + + pcu = sub.add_parser("cleanup", + help="Drain orphan WATCHING/HEALING rows by polling " + "GitHub for each open PR's true state") + pcu.set_defaults(func=cmd_cleanup) + pa = sub.add_parser("auto-merge", help="Run policy-gated auto-merge on a MERGE_READY PR") pa.add_argument("pr") diff --git a/tests/test_p9_pr_e.py b/tests/test_p9_pr_e.py new file mode 100644 index 0000000..865eb1a --- /dev/null +++ b/tests/test_p9_pr_e.py @@ -0,0 +1,265 @@ +"""Tests for PR E — watcher state-folding, abandon, cleanup, str-path policy. + +Closes the bugs surfaced in the fresh-session test: + B1: --background flag mismatch with reflexive-rule guidance. + B2: detached watcher never folds exit into state transition. + B3: no terminal subcommand for orphaned PRs (closed externally). + B5: load_policy crashes on str input. +""" + +from __future__ import annotations + +import importlib +import json +import sys +from pathlib import Path + +import pytest + + +_HERE = Path(__file__).resolve().parent +_SCRIPTS = _HERE.parent / "scripts" +_FIXTURES = _HERE / "fixtures" +sys.path.insert(0, str(_SCRIPTS)) + + +@pytest.fixture() +def p9(tmp_path, monkeypatch): + monkeypatch.setenv("BROOMVA_P9_HOME", str(tmp_path)) + monkeypatch.setenv("BROOMVA_P9_POLICY", str(_FIXTURES / "policy-good.yaml")) + if "p9" in sys.modules: + del sys.modules["p9"] + return importlib.import_module("p9") + + +# ───────────────────────────────────────────────────────────────────────────── +# B5: load_policy accepts str path +# ───────────────────────────────────────────────────────────────────────────── +def test_load_policy_accepts_str(p9): + cfg = p9.load_policy(str(_FIXTURES / "policy-good.yaml")) + assert cfg.ci_watch.enabled is True + + +def test_load_policy_accepts_path(p9): + cfg = p9.load_policy(_FIXTURES / "policy-good.yaml") + assert cfg.ci_watch.enabled is True + + +# ───────────────────────────────────────────────────────────────────────────── +# B2: cmd_watch folds gh exit into state transition +# ───────────────────────────────────────────────────────────────────────────── +class _FakeProc: + def __init__(self, returncode: int): + self._rc = returncode + self.pid = 12345 + + def wait(self): + return self._rc + + +class TestWatchFold: + def test_foreground_green_writes_watching_then_green(self, p9, monkeypatch): + monkeypatch.setattr(p9, "spawn_watcher", + lambda *a, **kw: _FakeProc(returncode=0)) + rc = p9.main(["watch", "100", "--repo", "broomva/test"]) + assert rc == 0 + assert p9.current_pr_state(100) == p9.PRState.GREEN + # State.jsonl should hold both events + rows, _ = p9.jsonl_read_all(p9.state_jsonl()) + states = [(r["from_state"], r["to_state"]) for r in rows if r["pr"] == 100] + assert ("PUSHED", "WATCHING") in states + assert ("WATCHING", "GREEN") in states + + def test_foreground_red_writes_red_unclassified(self, p9, monkeypatch): + monkeypatch.setattr(p9, "spawn_watcher", + lambda *a, **kw: _FakeProc(returncode=1)) + rc = p9.main(["watch", "200", "--repo", "broomva/test"]) + assert rc == 0 + assert p9.current_pr_state(200) == p9.PRState.RED_UNCLASSIFIED + + def test_detach_skips_fold(self, p9, monkeypatch): + called = [] + class _NeverWait: + pid = 99 + def wait(self): + called.append("wait") # should NOT happen + return 0 + monkeypatch.setattr(p9, "spawn_watcher", + lambda *a, **kw: _NeverWait()) + rc = p9.main(["watch", "300", "--repo", "broomva/test", "--detach"]) + assert rc == 0 + assert called == [] # detach must not block + assert p9.current_pr_state(300) == p9.PRState.WATCHING + + def test_background_alias_is_foreground(self, p9, monkeypatch): + # B1 fix: --background must not error and must behave like default + monkeypatch.setattr(p9, "spawn_watcher", + lambda *a, **kw: _FakeProc(returncode=0)) + rc = p9.main(["watch", "400", "--repo", "broomva/test", "--background"]) + assert rc == 0 + assert p9.current_pr_state(400) == p9.PRState.GREEN + + def test_block_alias_is_foreground(self, p9, monkeypatch): + monkeypatch.setattr(p9, "spawn_watcher", + lambda *a, **kw: _FakeProc(returncode=0)) + rc = p9.main(["watch", "401", "--repo", "broomva/test", "--block"]) + assert rc == 0 + assert p9.current_pr_state(401) == p9.PRState.GREEN + + def test_dry_run_emits_watching_only(self, p9): + rc = p9.main(["watch", "500", "--repo", "broomva/test", "--dry-run"]) + assert rc == 0 + assert p9.current_pr_state(500) == p9.PRState.WATCHING + + +# ───────────────────────────────────────────────────────────────────────────── +# B3: cmd_abandon +# ───────────────────────────────────────────────────────────────────────────── +class TestAbandon: + def test_abandon_open_pr(self, p9): + # Seed a WATCHING state + p9.append_state_event(p9.PRStateEvent( + ts="2026-05-05T20:00:00+00:00", + pr=600, repo="broomva/test", + from_state=p9.PRState.PUSHED.value, + to_state=p9.PRState.WATCHING.value, + watcher_id="seed", + )) + rc = p9.main(["abandon", "600", "--reason", "test"]) + assert rc == 0 + assert p9.current_pr_state(600) == p9.PRState.ABANDONED + + def test_abandon_unknown_pr(self, p9): + rc = p9.main(["abandon", "999"]) + assert rc == p9.EXIT_DEGRADED + + def test_abandon_idempotent_on_terminal(self, p9): + p9.append_state_event(p9.PRStateEvent( + ts="2026-05-05T20:00:00+00:00", + pr=700, repo="broomva/test", + from_state=p9.PRState.PUSHED.value, + to_state=p9.PRState.WATCHING.value, + watcher_id="seed", + )) + # Move to terminal MERGED via the legal chain + for prev, curr in [ + (p9.PRState.WATCHING, p9.PRState.GREEN), + (p9.PRState.GREEN, p9.PRState.MERGE_READY), + (p9.PRState.MERGE_READY, p9.PRState.MERGED), + ]: + p9.append_state_event(p9.PRStateEvent( + ts="2026-05-05T20:00:00+00:00", + pr=700, repo="broomva/test", + from_state=prev.value, to_state=curr.value, + watcher_id="seed", + )) + # Abandon should be no-op + rc = p9.main(["abandon", "700"]) + assert rc == 0 + assert p9.current_pr_state(700) == p9.PRState.MERGED + + def test_abandon_from_green_legal(self, p9): + p9.append_state_event(p9.PRStateEvent( + ts="2026-05-05T20:00:00+00:00", + pr=750, repo="broomva/test", + from_state=p9.PRState.PUSHED.value, + to_state=p9.PRState.WATCHING.value, + watcher_id="seed", + )) + p9.append_state_event(p9.PRStateEvent( + ts="2026-05-05T20:00:00+00:00", + pr=750, repo="broomva/test", + from_state=p9.PRState.WATCHING.value, + to_state=p9.PRState.GREEN.value, + watcher_id="seed", + )) + rc = p9.main(["abandon", "750"]) + assert rc == 0 + assert p9.current_pr_state(750) == p9.PRState.ABANDONED + + +# ───────────────────────────────────────────────────────────────────────────── +# cmd_cleanup — orphan drainer +# ───────────────────────────────────────────────────────────────────────────── +class _FakeRun: + def __init__(self, *, stdout="", stderr="", returncode=0): + self.stdout = stdout + self.stderr = stderr + self.returncode = returncode + + +def _seed_watching(p9, pr): + p9.append_state_event(p9.PRStateEvent( + ts="2026-05-05T20:00:00+00:00", + pr=pr, repo="broomva/test", + from_state=p9.PRState.PUSHED.value, + to_state=p9.PRState.WATCHING.value, + watcher_id="seed", + )) + + +class TestCleanup: + def test_cleanup_no_open_prs(self, p9, capsys): + rc = p9.main(["cleanup"]) + assert rc == 0 + assert "no open PRs" in capsys.readouterr().out + + def test_cleanup_drains_merged(self, p9, monkeypatch): + _seed_watching(p9, 800) + monkeypatch.setattr(p9.subprocess, "run", + lambda *a, **kw: _FakeRun( + stdout=json.dumps({"state": "MERGED", + "mergedAt": "2026-05-05T19:00:00Z"}))) + rc = p9.main(["cleanup"]) + assert rc == 0 + assert p9.current_pr_state(800) == p9.PRState.ABANDONED + + def test_cleanup_drains_closed(self, p9, monkeypatch): + _seed_watching(p9, 801) + monkeypatch.setattr(p9.subprocess, "run", + lambda *a, **kw: _FakeRun( + stdout=json.dumps({"state": "CLOSED", + "mergedAt": None}))) + rc = p9.main(["cleanup"]) + assert rc == 0 + assert p9.current_pr_state(801) == p9.PRState.ABANDONED + + def test_cleanup_leaves_open_alone(self, p9, monkeypatch): + _seed_watching(p9, 802) + monkeypatch.setattr(p9.subprocess, "run", + lambda *a, **kw: _FakeRun( + stdout=json.dumps({"state": "OPEN", + "mergedAt": None}))) + rc = p9.main(["cleanup"]) + assert rc == 0 + # Untouched + assert p9.current_pr_state(802) == p9.PRState.WATCHING + + def test_cleanup_skips_on_gh_failure(self, p9, monkeypatch, capsys): + _seed_watching(p9, 803) + monkeypatch.setattr(p9.subprocess, "run", + lambda *a, **kw: _FakeRun(returncode=1, + stderr="auth required")) + rc = p9.main(["cleanup"]) + assert rc == 0 + # Untouched — false-positive cleanup is forbidden + assert p9.current_pr_state(803) == p9.PRState.WATCHING + out = capsys.readouterr().out + assert "skipped 1" in out + + +# ───────────────────────────────────────────────────────────────────────────── +# State machine: new transitions for ABANDONED from any non-terminal +# ───────────────────────────────────────────────────────────────────────────── +class TestAbandonTransitions: + @pytest.mark.parametrize("from_state", [ + "PUSHED", "WATCHING", "HEALING", "RED_CLASSIFIED", + "RED_UNCLASSIFIED", "GREEN", "MERGE_READY", + ]) + def test_any_non_terminal_to_abandoned_is_legal(self, p9, from_state): + # Should not raise + p9.assert_legal_transition(p9.PRState(from_state), p9.PRState.ABANDONED) + + def test_terminal_to_abandoned_is_illegal(self, p9): + with pytest.raises(p9.IllegalTransitionError): + p9.assert_legal_transition(p9.PRState.MERGED, p9.PRState.ABANDONED)