diff --git a/scripts/p9.py b/scripts/p9.py index 2c17288..2b77b23 100644 --- a/scripts/p9.py +++ b/scripts/p9.py @@ -49,6 +49,7 @@ EXIT_EXTERNAL_ERROR = 4 # gh/Linear/network failure EXIT_CONCURRENCY_CEILING = 5 # max_concurrent_prs reached EXIT_HEAL_LOCK_TIMEOUT = 6 +EXIT_AUTO_MERGE_BLOCKED = 7 # auto_merge policy says require_human / notify EXIT_INVARIANT_VIOLATION = 99 # cardinal-rule breach: cannot persist state # ───────────────────────────────────────────────────────────────────────────── @@ -217,10 +218,41 @@ class CIHealPolicy: escalation_channel: EscalationChannel = field(default_factory=EscalationChannel) +@dataclass(frozen=True) +class AutoMergeRule: + """One auto-merge rule. Either branch_pattern or path_touched (not both). + + branch_pattern: fnmatch glob against the PR's head branch (e.g. "docs/*"). + path_touched: substring match against any file in the PR diff + (e.g. "CLAUDE.md" matches the literal path). + action: "auto" → run gh pr merge; "require_human" → hard-block, + exit 7; "notify" → emit MERGE_NOTIFIED event, exit 7. + + Rules are evaluated in declaration order; first match wins. + Matches are short-circuited: `require_human` and `notify` are blocking + states even if a later rule would auto-merge. + """ + branch_pattern: str | None = None + path_touched: str | None = None + action: str = "notify" + + +@dataclass(frozen=True) +class AutoMergePolicy: + enabled: bool = False + require_no_requested_changes: bool = True + require_branch_up_to_date: bool = True + merge_method: str = "squash" # squash | merge | rebase + delete_branch: bool = True + rules: tuple[AutoMergeRule, ...] = () + default_action: str = "notify" # fail-safe default + + @dataclass(frozen=True) class PolicyConfig: ci_watch: CIWatchPolicy ci_heal: CIHealPolicy + auto_merge: AutoMergePolicy = field(default_factory=AutoMergePolicy) @dataclass @@ -359,44 +391,85 @@ def _yaml_loader(): def _minimal_yaml_load(text: str) -> dict[str, Any]: - """Tiny YAML subset: top-level mappings, nested mappings, scalars, lists. + """Tiny YAML subset: top-level mappings, nested mappings, scalars, + inline lists `[a,b]`, and block-style list-of-dicts `- key: val`. - Sufficient for `.control/policy.yaml` ci_watch / ci_heal blocks. Not a - general YAML parser — intentionally narrow so it fails noisily on anything - unexpected and forces real YAML if added later. + Sufficient for `.control/policy.yaml` ci_watch / ci_heal / auto_merge + blocks. Not a general YAML parser — intentionally narrow so it fails + noisily on anything unexpected. """ out: dict[str, Any] = {} + # stack tracks (indent, dict) frames for nested mappings. stack: list[tuple[int, dict[str, Any]]] = [(-1, out)] + # list_stack tracks (indent, list) frames for active block lists. list_stack: list[tuple[int, list[Any]]] = [] + for raw_line in text.splitlines(): line = raw_line.rstrip() if not line or line.lstrip().startswith("#"): continue indent = len(line) - len(line.lstrip(" ")) body = line.strip() + if body.startswith("- "): - # list item + # block list item; pop deeper frames first while list_stack and list_stack[-1][0] >= indent: list_stack.pop() + while stack and stack[-1][0] >= indent: + stack.pop() if not list_stack: - continue - value_str = body[2:].strip() - list_stack[-1][1].append(_scalar(value_str)) + continue # orphan list item — ignore + target_list = list_stack[-1][1] + inner = body[2:].strip() + if ":" in inner and not (inner.startswith("[") or inner.startswith('"')): + # block-style list-of-dicts: `- key: val` starts a new dict + # and any subsequent more-indented `key: val` lines populate it + key, _, value_str = inner.partition(":") + value_str = value_str.strip() + new_dict: dict[str, Any] = {} + if value_str: + new_dict[key.strip()] = _scalar(value_str) + target_list.append(new_dict) + # Subsequent same-indent `- ` items pop this frame; nested + # `key: val` lines at deeper indent populate new_dict. + stack.append((indent, new_dict)) + else: + # scalar list item + target_list.append(_scalar(inner)) continue + if ":" not in body: continue key, _, value_str = body.partition(":") value_str = value_str.strip() - # Pop deeper stacks + # Pop deeper frames while stack and stack[-1][0] >= indent: stack.pop() while list_stack and list_stack[-1][0] >= indent: list_stack.pop() parent = stack[-1][1] if stack else out + if value_str == "": - new: dict[str, Any] = {} - parent[key.strip()] = new - stack.append((indent, new)) + # Mapping or list — we don't know yet. Default to mapping; + # if the next non-blank line is a `- `, the open list_stack + # frame below catches it. + new_map: dict[str, Any] = {} + new_list: list[Any] = [] + parent[key.strip()] = new_map + stack.append((indent, new_map)) + # Tentatively register a list at deeper indent; whichever + # the next line uses (mapping vs list) wins via stack pop. + # We need a sentinel: register the list only if the next + # `-` arrives at deeper indent. Easiest: pre-register both + # but only commit to one once content arrives. + # Concretely: replace mapping with list lazily on first `-`. + list_stack.append((indent, new_list)) + # If the next content is a `- ` deeper than `indent`, we'll + # convert: replace parent[key] with new_list and pop the dict. + stack[-1] = (indent, new_map) + # Stash a reference so a `- ` line can swap mapping → list + new_map.setdefault("__p9_yaml_pending_list_holder__", + (parent, key.strip(), new_list)) elif value_str.startswith("[") and value_str.endswith("]"): inner = value_str[1:-1].strip() parent[key.strip()] = ( @@ -405,9 +478,33 @@ def _minimal_yaml_load(text: str) -> dict[str, Any]: ) else: parent[key.strip()] = _scalar(value_str) + + # Second pass: any dict that still holds the pending-list sentinel + # AND has no other keys is actually an empty dict; any dict whose + # corresponding list got populated had its mapping replaced. + _resolve_pending_lists(out) return out +def _resolve_pending_lists(node: Any) -> None: + """Walk the parsed tree; convert mapping→list where a `-` block was + used. Sentinel removal must come before semantic validation.""" + if isinstance(node, dict): + sentinel_key = "__p9_yaml_pending_list_holder__" + if sentinel_key in node: + holder = node[sentinel_key] + del node[sentinel_key] + parent, key, the_list = holder + if the_list: + # `-` lines populated the list; replace mapping with list + parent[key] = the_list + for v in list(node.values()): + _resolve_pending_lists(v) + elif isinstance(node, list): + for item in node: + _resolve_pending_lists(item) + + def _scalar(s: str) -> Any: s = s.strip().strip('"').strip("'") if s.lower() == "true": @@ -482,9 +579,112 @@ def _parse_policy(data: dict[str, Any]) -> PolicyConfig: ), ), ), + auto_merge=_parse_auto_merge(data.get("auto_merge")), + ) + + +_AUTO_MERGE_ACTIONS = ("auto", "require_human", "notify") + + +def _parse_auto_merge(raw: Any) -> AutoMergePolicy: + """Parse the optional auto_merge: block. Absence is *not* an error — + auto-merge is opt-in; missing block defaults to disabled (fail-safe).""" + if raw is None: + return AutoMergePolicy() + if not isinstance(raw, dict): + raise PolicyError("auto_merge must be a mapping if present") + rules_raw = raw.get("rules") or [] + if not isinstance(rules_raw, list): + raise PolicyError("auto_merge.rules must be a list") + rules: list[AutoMergeRule] = [] + for i, r in enumerate(rules_raw): + if not isinstance(r, dict): + raise PolicyError(f"auto_merge.rules[{i}] must be a mapping") + action = str(r.get("action", "notify")) + if action not in _AUTO_MERGE_ACTIONS: + raise PolicyError( + f"auto_merge.rules[{i}].action must be one of {_AUTO_MERGE_ACTIONS}, " + f"got {action!r}" + ) + bp = r.get("branch_pattern") + pt = r.get("path_touched") + if bp and pt: + raise PolicyError( + f"auto_merge.rules[{i}] cannot set both branch_pattern and path_touched" + ) + if not bp and not pt: + raise PolicyError( + f"auto_merge.rules[{i}] must set either branch_pattern or path_touched" + ) + rules.append(AutoMergeRule( + branch_pattern=str(bp) if bp else None, + path_touched=str(pt) if pt else None, + action=action, + )) + default = str(raw.get("default_action", "notify")) + if default not in _AUTO_MERGE_ACTIONS: + raise PolicyError( + f"auto_merge.default_action must be one of {_AUTO_MERGE_ACTIONS}, " + f"got {default!r}" + ) + method = str(raw.get("merge_method", "squash")) + if method not in ("squash", "merge", "rebase"): + raise PolicyError( + f"auto_merge.merge_method must be squash|merge|rebase, got {method!r}" + ) + return AutoMergePolicy( + enabled=bool(raw.get("enabled", False)), + require_no_requested_changes=bool(raw.get("require_no_requested_changes", True)), + require_branch_up_to_date=bool(raw.get("require_branch_up_to_date", True)), + merge_method=method, + delete_branch=bool(raw.get("delete_branch", True)), + rules=tuple(rules), + default_action=default, ) +# ───────────────────────────────────────────────────────────────────────────── +# Auto-merge matcher +# ───────────────────────────────────────────────────────────────────────────── +import fnmatch # noqa: E402 (kept near use site for clarity) + + +def match_auto_merge_action( + policy: AutoMergePolicy, + *, + branch: str, + paths_touched: Iterable[str], +) -> tuple[str, str]: + """First-match-wins evaluator. Returns (action, reason). + + Path rules are evaluated FIRST regardless of order — a path-touched + `require_human` rule (e.g. CLAUDE.md) is a hard block that cannot be + overridden by a later branch rule. This implements the "governance + paths always block" invariant from the original brainstorming. + """ + paths = list(paths_touched) + + # Pass 1: any path rule with require_human is a hard block. + for rule in policy.rules: + if rule.path_touched and rule.action == "require_human": + for p in paths: + if rule.path_touched in p: + return ("require_human", + f"path rule blocks: {rule.path_touched!r} in {p}") + + # Pass 2: first match wins (path or branch). + for rule in policy.rules: + if rule.path_touched: + for p in paths: + if rule.path_touched in p: + return (rule.action, + f"path rule matched: {rule.path_touched!r} in {p}") + if rule.branch_pattern and fnmatch.fnmatch(branch, rule.branch_pattern): + return (rule.action, f"branch rule matched: {rule.branch_pattern!r}") + + return (policy.default_action, "no rule matched; using default_action") + + # ───────────────────────────────────────────────────────────────────────────── # Failure classifier (rubric-driven regex matcher) # ───────────────────────────────────────────────────────────────────────────── @@ -861,6 +1061,112 @@ def cmd_doctor(_args: argparse.Namespace) -> int: return EXIT_DEGRADED +def cmd_auto_merge(args: argparse.Namespace) -> int: + """Auto-merge actuator. Closes the gap between MERGE_READY signal and + actual `gh pr merge` execution. + + Flow: + 1. Load policy → bail if auto_merge.enabled is false. + 2. Verify PR is in MERGE_READY state. + 3. Fetch branch + touched paths via `gh pr view`. + 4. Match against policy rules → action ∈ {auto, require_human, notify}. + 5. auto → run `gh pr merge` (or print plan in --dry-run mode), transition + MERGE_READY → MERGED, return 0. + require_human / notify → idempotent self-transition with extra payload + indicating block reason, return 7 (EXIT_AUTO_MERGE_BLOCKED). + """ + pr = int(args.pr) + repo = args.repo or _detect_repo() or "" + policy = load_policy() + if not policy.auto_merge.enabled: + print("auto_merge.enabled=false in policy; refusing to merge", file=sys.stderr) + return EXIT_POLICY_ERROR + + state = current_pr_state(pr) + if state != PRState.MERGE_READY: + print( + f"PR #{pr} not in MERGE_READY (current={state.value if state else 'UNKNOWN'}); " + f"call `p9 merge-ready` first", + file=sys.stderr, + ) + return EXIT_DEGRADED + + branch, paths = _gh_pr_branch_and_paths(pr, repo) + action, reason = match_auto_merge_action( + policy.auto_merge, branch=branch, paths_touched=paths, + ) + + if action != "auto": + # Block: emit idempotent state event with rationale; never merge. + append_state_event(PRStateEvent( + ts=_utcnow(), + pr=pr, repo=repo, + from_state=PRState.MERGE_READY.value, + to_state=PRState.MERGE_READY.value, + watcher_id="auto-merge", + extra={"auto_merge": {"action": action, "reason": reason, + "branch": branch, "paths": list(paths)[:20]}}, + )) + print(f"auto-merge blocked: action={action}; reason={reason}", + file=sys.stderr) + return EXIT_AUTO_MERGE_BLOCKED + + # Auto path + if args.dry_run: + print(f"auto-merge dry-run: would merge PR #{pr} ({branch}) via " + f"`gh pr merge --{policy.auto_merge.merge_method}`") + return EXIT_OK + + rc = _gh_pr_merge( + pr, repo, + method=policy.auto_merge.merge_method, + delete_branch=policy.auto_merge.delete_branch, + ) + if rc != 0: + print(f"gh pr merge exited {rc}; PR not merged", file=sys.stderr) + return EXIT_EXTERNAL_ERROR + + append_state_event(PRStateEvent( + ts=_utcnow(), + pr=pr, repo=repo, + from_state=PRState.MERGE_READY.value, + to_state=PRState.MERGED.value, + watcher_id="auto-merge", + extra={"auto_merge": {"action": "auto", "reason": reason, + "branch": branch, + "method": policy.auto_merge.merge_method}}, + )) + print(f"auto-merge: PR #{pr} merged ({branch})") + return EXIT_OK + + +def _gh_pr_branch_and_paths(pr: int, repo: str) -> tuple[str, list[str]]: + """Return (head_branch, [files_touched]) for a PR via `gh pr view`.""" + cmd = ["gh", "pr", "view", str(pr), + "--json", "headRefName,files", + "-q", '{branch: .headRefName, files: [.files[].path]}'] + if repo: + cmd += ["--repo", repo] + out = subprocess.run(cmd, capture_output=True, text=True, timeout=30, check=False) + if out.returncode != 0: + raise P9Error(f"gh pr view failed: {out.stderr.strip()[:200]}") + try: + data = json.loads(out.stdout) + except json.JSONDecodeError as e: + raise P9Error(f"gh pr view returned non-JSON: {e}") from e + return str(data.get("branch", "")), list(data.get("files") or []) + + +def _gh_pr_merge(pr: int, repo: str, *, method: str, delete_branch: bool) -> int: + """Invoke `gh pr merge` with the configured method. Returns exit code.""" + cmd = ["gh", "pr", "merge", str(pr), f"--{method}"] + if delete_branch: + cmd.append("--delete-branch") + if repo: + cmd += ["--repo", repo] + return subprocess.run(cmd, check=False).returncode + + def cmd_conformance(args: argparse.Namespace) -> int: """Run the full pytest battery (unit + integration + chaos). @@ -1118,6 +1424,14 @@ def build_parser() -> argparse.ArgumentParser: pm.add_argument("--repo", default=None) pm.set_defaults(func=cmd_merge_ready) + pa = sub.add_parser("auto-merge", + help="Run policy-gated auto-merge on a MERGE_READY PR") + pa.add_argument("pr") + pa.add_argument("--repo", default=None) + pa.add_argument("--dry-run", action="store_true", + help="Print the planned merge instead of executing it") + pa.set_defaults(func=cmd_auto_merge) + pd = sub.add_parser("doctor", help="Health-check P9 dependencies") pd.set_defaults(func=cmd_doctor) diff --git a/tests/fixtures/policy-with-auto-merge.yaml b/tests/fixtures/policy-with-auto-merge.yaml new file mode 100644 index 0000000..14f1b6b --- /dev/null +++ b/tests/fixtures/policy-with-auto-merge.yaml @@ -0,0 +1,46 @@ +version: "1.0" +profile: governed +workspace: broomva-test + +ci_watch: + enabled: true + max_concurrent_prs: 1 + isolation_tier_map: + research: none + docs: none + code_independent: worktree + code_dependent: stacked_branch + governance: blocked + +ci_heal: + enabled: true + max_attempts: 5 + stability_floor: 0.3 + classified_failure_types: [lint, format, test_flaky, codegen_drift, import_missing] + escalation_channel: + linear_team: BRO + linear_label: ci-heal-escalation + notify_hook: skills/p9/scripts/p9-escalate-notify.sh + +auto_merge: + enabled: true + require_no_requested_changes: true + require_branch_up_to_date: true + merge_method: squash + delete_branch: true + rules: + # Governance paths ALWAYS block (enforced by pass-1 in matcher) + - path_touched: CLAUDE.md + action: require_human + - path_touched: AGENTS.md + action: require_human + - path_touched: .control/policy.yaml + action: require_human + # Auto-merge classes + - branch_pattern: "docs/*" + action: auto + - branch_pattern: "research/*" + action: auto + - branch_pattern: "feat/p9-*" + action: auto + default_action: notify diff --git a/tests/test_p9_auto_merge.py b/tests/test_p9_auto_merge.py new file mode 100644 index 0000000..cd1ca08 --- /dev/null +++ b/tests/test_p9_auto_merge.py @@ -0,0 +1,266 @@ +"""Tests for the auto-merge actuator (PR A).""" + +from __future__ import annotations + +import importlib +import json +import sys +from pathlib import Path + +import pytest + + +_HERE = Path(__file__).resolve().parent +_SCRIPTS = _HERE.parent / "scripts" +_FIXTURES = _HERE / "fixtures" +sys.path.insert(0, str(_SCRIPTS)) + + +@pytest.fixture() +def p9_am(tmp_path, monkeypatch): + """Fresh p9 import with auto-merge policy enabled.""" + monkeypatch.setenv("BROOMVA_P9_HOME", str(tmp_path)) + monkeypatch.setenv("BROOMVA_P9_POLICY", str(_FIXTURES / "policy-with-auto-merge.yaml")) + if "p9" in sys.modules: + del sys.modules["p9"] + return importlib.import_module("p9") + + +# ───────────────────────────────────────────────────────────────────────────── +# Policy parser +# ───────────────────────────────────────────────────────────────────────────── +class TestPolicyParse: + def test_loads_auto_merge_block(self, p9_am): + cfg = p9_am.load_policy(_FIXTURES / "policy-with-auto-merge.yaml") + assert cfg.auto_merge.enabled is True + assert cfg.auto_merge.merge_method == "squash" + assert cfg.auto_merge.delete_branch is True + assert cfg.auto_merge.default_action == "notify" + assert len(cfg.auto_merge.rules) == 6 + + def test_missing_auto_merge_block_disables_safely(self, p9_am): + # Default policy fixture has no auto_merge block — should default disabled + cfg = p9_am.load_policy(_FIXTURES / "policy-good.yaml") + assert cfg.auto_merge.enabled is False + assert cfg.auto_merge.rules == () + + def test_invalid_action_rejected(self, p9_am, tmp_path): + bad = tmp_path / "bad.yaml" + bad.write_text( + "ci_watch:\n enabled: true\n max_concurrent_prs: 1\n" + " isolation_tier_map:\n research: none\n docs: none\n" + " code_independent: worktree\n code_dependent: stacked_branch\n" + " governance: blocked\n" + "ci_heal:\n enabled: true\n max_attempts: 5\n" + " stability_floor: 0.3\n classified_failure_types: [lint]\n" + " escalation_channel:\n linear_team: BRO\n" + " linear_label: ci-heal-escalation\n" + " notify_hook: x.sh\n" + "auto_merge:\n enabled: true\n rules:\n" + " - branch_pattern: \"x/*\"\n action: yolo\n", + encoding="utf-8", + ) + with pytest.raises(p9_am.PolicyError): + p9_am.load_policy(bad) + + def test_rule_without_branch_or_path_rejected(self, p9_am, tmp_path): + bad = tmp_path / "bad2.yaml" + bad.write_text( + "ci_watch:\n enabled: true\n max_concurrent_prs: 1\n" + " isolation_tier_map:\n research: none\n docs: none\n" + " code_independent: worktree\n code_dependent: stacked_branch\n" + " governance: blocked\n" + "ci_heal:\n enabled: true\n max_attempts: 5\n" + " stability_floor: 0.3\n classified_failure_types: [lint]\n" + " escalation_channel:\n linear_team: BRO\n" + " linear_label: ci-heal-escalation\n" + " notify_hook: x.sh\n" + "auto_merge:\n enabled: true\n rules:\n" + " - action: auto\n", + encoding="utf-8", + ) + with pytest.raises(p9_am.PolicyError): + p9_am.load_policy(bad) + + +# ───────────────────────────────────────────────────────────────────────────── +# Matcher +# ───────────────────────────────────────────────────────────────────────────── +class TestMatcher: + def test_governance_path_always_blocks(self, p9_am): + cfg = p9_am.load_policy(_FIXTURES / "policy-with-auto-merge.yaml") + # Branch matches an auto rule, but PR touches CLAUDE.md → blocks + action, reason = p9_am.match_auto_merge_action( + cfg.auto_merge, + branch="docs/some-update", + paths_touched=["docs/foo.md", "CLAUDE.md"], + ) + assert action == "require_human" + assert "CLAUDE.md" in reason + + def test_docs_branch_auto_merges(self, p9_am): + cfg = p9_am.load_policy(_FIXTURES / "policy-with-auto-merge.yaml") + action, _ = p9_am.match_auto_merge_action( + cfg.auto_merge, + branch="docs/typo-fix", + paths_touched=["README.md", "docs/foo.md"], + ) + assert action == "auto" + + def test_research_branch_auto_merges(self, p9_am): + cfg = p9_am.load_policy(_FIXTURES / "policy-with-auto-merge.yaml") + action, _ = p9_am.match_auto_merge_action( + cfg.auto_merge, + branch="research/new-entity", + paths_touched=["research/entities/concept/foo.md"], + ) + assert action == "auto" + + def test_feat_p9_branch_auto_merges(self, p9_am): + cfg = p9_am.load_policy(_FIXTURES / "policy-with-auto-merge.yaml") + action, _ = p9_am.match_auto_merge_action( + cfg.auto_merge, + branch="feat/p9-spec", + paths_touched=["docs/foo.md"], + ) + assert action == "auto" + + def test_unknown_branch_falls_to_default_notify(self, p9_am): + cfg = p9_am.load_policy(_FIXTURES / "policy-with-auto-merge.yaml") + action, reason = p9_am.match_auto_merge_action( + cfg.auto_merge, + branch="feat/some-other-thing", + paths_touched=["src/foo.ts"], + ) + assert action == "notify" + assert "default" in reason.lower() + + def test_path_rule_first_match_wins(self, p9_am): + cfg = p9_am.load_policy(_FIXTURES / "policy-with-auto-merge.yaml") + # AGENTS.md is governance-class blocked; should beat docs/* auto + action, _ = p9_am.match_auto_merge_action( + cfg.auto_merge, + branch="docs/cleanup", + paths_touched=["AGENTS.md"], + ) + assert action == "require_human" + + +# ───────────────────────────────────────────────────────────────────────────── +# Subcommand integration (with subprocess mocked) +# ───────────────────────────────────────────────────────────────────────────── +class _FakeRun: + def __init__(self, *, stdout="", stderr="", returncode=0): + self.stdout = stdout + self.stderr = stderr + self.returncode = returncode + + +def _seed_merge_ready(p9, pr: int): + for prev, curr in [ + (p9.PRState.PUSHED, p9.PRState.WATCHING), + (p9.PRState.WATCHING, p9.PRState.GREEN), + (p9.PRState.GREEN, p9.PRState.MERGE_READY), + ]: + p9.append_state_event(p9.PRStateEvent( + ts="2026-05-05T00:00:00+00:00", + pr=pr, repo="broomva/test", + from_state=prev.value, to_state=curr.value, + watcher_id="seed", + )) + + +class TestCommand: + def test_blocks_when_pr_not_merge_ready(self, p9_am, capsys): + rc = p9_am.main(["auto-merge", "999", "--repo", "broomva/test"]) + assert rc == p9_am.EXIT_DEGRADED + + def test_dry_run_for_auto_branch(self, p9_am, monkeypatch, capsys): + _seed_merge_ready(p9_am, 100) + + def fake_view(cmd, *args, **kwargs): + assert cmd[:3] == ["gh", "pr", "view"] + return _FakeRun(stdout=json.dumps( + {"branch": "docs/typo", "files": ["README.md"]} + )) + + monkeypatch.setattr(p9_am.subprocess, "run", fake_view) + + rc = p9_am.main(["auto-merge", "100", "--repo", "broomva/test", "--dry-run"]) + out = capsys.readouterr().out + assert rc == 0 + assert "would merge PR #100" in out + # Did NOT transition to MERGED in dry-run + assert p9_am.current_pr_state(100) == p9_am.PRState.MERGE_READY + + def test_blocks_governance_path(self, p9_am, monkeypatch, capsys): + _seed_merge_ready(p9_am, 200) + + def fake_view(cmd, *args, **kwargs): + return _FakeRun(stdout=json.dumps( + {"branch": "docs/cleanup", "files": ["docs/x.md", "CLAUDE.md"]} + )) + + monkeypatch.setattr(p9_am.subprocess, "run", fake_view) + rc = p9_am.main(["auto-merge", "200", "--repo", "broomva/test"]) + assert rc == p9_am.EXIT_AUTO_MERGE_BLOCKED + # Idempotent self-transition recorded with reason + rows, _ = p9_am.jsonl_read_all(p9_am.state_jsonl()) + last = [r for r in rows if r["pr"] == 200][-1] + assert last["to_state"] == "MERGE_READY" + assert last["extra"]["auto_merge"]["action"] == "require_human" + + def test_auto_executes_gh_merge(self, p9_am, monkeypatch, capsys): + _seed_merge_ready(p9_am, 300) + calls = [] + + def fake_run(cmd, *args, **kwargs): + calls.append(cmd) + if cmd[:3] == ["gh", "pr", "view"]: + return _FakeRun(stdout=json.dumps( + {"branch": "docs/something", "files": ["docs/y.md"]} + )) + if cmd[:3] == ["gh", "pr", "merge"]: + return _FakeRun(returncode=0) + return _FakeRun(returncode=1) + + monkeypatch.setattr(p9_am.subprocess, "run", fake_run) + rc = p9_am.main(["auto-merge", "300", "--repo", "broomva/test"]) + assert rc == 0 + # Real merge call happened + merge_calls = [c for c in calls if c[:3] == ["gh", "pr", "merge"]] + assert len(merge_calls) == 1 + assert "--squash" in merge_calls[0] + assert "--delete-branch" in merge_calls[0] + # State transitioned to MERGED + assert p9_am.current_pr_state(300) == p9_am.PRState.MERGED + + def test_disabled_policy_refuses(self, tmp_path, monkeypatch, capsys): + # Use the default good policy (no auto_merge block → disabled) + monkeypatch.setenv("BROOMVA_P9_HOME", str(tmp_path)) + monkeypatch.setenv("BROOMVA_P9_POLICY", str(_FIXTURES / "policy-good.yaml")) + if "p9" in sys.modules: + del sys.modules["p9"] + mod = importlib.import_module("p9") + # seed MERGE_READY anyway + _seed_merge_ready(mod, 400) + rc = mod.main(["auto-merge", "400", "--repo", "broomva/test"]) + assert rc == mod.EXIT_POLICY_ERROR + + def test_external_merge_failure_reports_clean_error(self, p9_am, monkeypatch, capsys): + _seed_merge_ready(p9_am, 500) + + def fake_run(cmd, *args, **kwargs): + if cmd[:3] == ["gh", "pr", "view"]: + return _FakeRun(stdout=json.dumps( + {"branch": "docs/whatever", "files": ["docs/z.md"]} + )) + if cmd[:3] == ["gh", "pr", "merge"]: + return _FakeRun(returncode=1) + return _FakeRun(returncode=1) + + monkeypatch.setattr(p9_am.subprocess, "run", fake_run) + rc = p9_am.main(["auto-merge", "500", "--repo", "broomva/test"]) + assert rc == p9_am.EXIT_EXTERNAL_ERROR + # State did NOT transition to MERGED (external failure must not lie) + assert p9_am.current_pr_state(500) == p9_am.PRState.MERGE_READY