diff --git a/src/wardline/core/run.py b/src/wardline/core/run.py index 1dceb69a..9f2dc06a 100644 --- a/src/wardline/core/run.py +++ b/src/wardline/core/run.py @@ -9,7 +9,7 @@ from __future__ import annotations import hashlib -from dataclasses import dataclass, replace +from dataclasses import dataclass, field, replace from datetime import date from pathlib import Path from typing import TYPE_CHECKING @@ -45,7 +45,7 @@ def _fp(*parts: str) -> str: @dataclass(frozen=True, slots=True) class ScanSummary: total: int # every finding (defects + facts/metrics) - active: int # non-suppressed DEFECTs — the gate population + active: int # non-suppressed DEFECTs in the emitted findings baselined: int waived: int judged: int @@ -66,6 +66,10 @@ class ScanResult: # this exact run instead of re-deriving. Never serialised over MCP. context: AnalysisContext | None scanned_paths: tuple[str, ...] = () + # Unsuppressed findings used by fail-on gates. Repository-controlled baseline, + # waiver, and judged files annotate emitted findings, but must not be able to + # hide defects from CI gates that run on untrusted pull-request content. + gate_findings: list[Finding] = field(default_factory=list) @dataclass(frozen=True, slots=True) @@ -186,6 +190,9 @@ def run_scan( waivers = WaiverSet(parse_waivers(cfg.waivers)) judged = load_judged(root / ".wardline" / "judged.yaml") findings = apply_suppressions(raw, baseline, waivers, today=date.today(), judged=judged) + # Keep a separate gate population that applies only operator-supplied scan + # scoping (for example --new-since), not repository-controlled suppressions. + gate_findings = list(raw) if new_since is not None: changed_files = get_changed_files_since(new_since, root) @@ -195,18 +202,22 @@ def run_scan( else: affected = set() - new_findings = [] - for f in findings: - if f.kind is Kind.DEFECT and f.suppressed is SuppressionState.ACTIVE: - is_new = (f.location.path in changed_files) or (f.qualname is not None and f.qualname in affected) - if not is_new: - f = replace( - f, - suppressed=SuppressionState.BASELINED, - suppression_reason=f"delta: unchanged since {new_since}", - ) - new_findings.append(f) - findings = new_findings + def apply_delta_scope(candidates: list[Finding]) -> list[Finding]: + scoped = [] + for f in candidates: + if f.kind is Kind.DEFECT and f.suppressed is SuppressionState.ACTIVE: + is_new = (f.location.path in changed_files) or (f.qualname is not None and f.qualname in affected) + if not is_new: + f = replace( + f, + suppressed=SuppressionState.BASELINED, + suppression_reason=f"delta: unchanged since {new_since}", + ) + scoped.append(f) + return scoped + + findings = apply_delta_scope(findings) + gate_findings = apply_delta_scope(gate_findings) defects = [f for f in findings if f.kind is Kind.DEFECT] summary = ScanSummary( @@ -227,6 +238,7 @@ def run_scan( path.relative_to(resolved_root).as_posix() if path.is_relative_to(resolved_root) else path.as_posix() for path in files ), + gate_findings=gate_findings, ) @@ -234,5 +246,6 @@ def gate_decision(result: ScanResult, fail_on: Severity | None) -> GateDecision: """Translate a scan into a pass/fail verdict. A trip is data, not an error.""" if fail_on is None: return GateDecision(tripped=False, fail_on=None, exit_class=0) - tripped = gate_trips(result.findings, fail_on) + gate_findings = result.gate_findings or result.findings + tripped = gate_trips(gate_findings, fail_on) return GateDecision(tripped=tripped, fail_on=fail_on.value, exit_class=1 if tripped else 0) diff --git a/tests/unit/cli/test_cli.py b/tests/unit/cli/test_cli.py index efc6328e..09782238 100644 --- a/tests/unit/cli/test_cli.py +++ b/tests/unit/cli/test_cli.py @@ -343,7 +343,7 @@ def test_scan_fail_on_inert_without_flag(tmp_path) -> None: assert res.exit_code == 0, res.output # no --fail-on -> never gates -def test_scan_baseline_suppresses_and_clears_gate(tmp_path) -> None: +def test_scan_baseline_annotates_but_does_not_clear_gate(tmp_path) -> None: proj = tmp_path / "proj" proj.mkdir() _write(proj, "svc.py", _LEAKY) @@ -359,9 +359,10 @@ def test_scan_baseline_suppresses_and_clears_gate(tmp_path) -> None: "version: 1\nentries:\n - fingerprint: " + fp + "\n rule_id: PY-WL-101\n path: svc.py\n message: m\n", encoding="utf-8", ) - # Second scan: the defect is baselined -> annotated + gate clears. + # Second scan: the defect is baselined for reporting, but fail-on still trips + # because repository-controlled suppressions must not bypass the CI gate. res = CliRunner().invoke(scan, [str(proj), "--output", str(out), "--fail-on", "ERROR"]) - assert res.exit_code == 0, res.output + assert res.exit_code == 1, res.output findings2 = [_json.loads(ln) for ln in out.read_text().splitlines() if ln.strip()] leak = next(f for f in findings2 if f["rule_id"] == "PY-WL-101") assert leak["suppressed"] == "baselined" # annotate-and-keep @@ -466,10 +467,11 @@ def test_baseline_create_writes_file_and_suppresses_next_scan(tmp_path) -> None: doc = _yaml.safe_load(bl.read_text()) assert doc["version"] == 1 and len(doc["entries"]) >= 1 assert "baselined" in res.output - # Next scan: the captured defect is now baselined, gate clears. + # Next scan: the captured defect is now baselined for reporting, but the + # untrusted repository baseline must not clear the fail-on gate. out = tmp_path / "f.jsonl" res2 = runner.invoke(scan, [str(proj), "--output", str(out), "--fail-on", "ERROR"]) - assert res2.exit_code == 0, res2.output + assert res2.exit_code == 1, res2.output def test_baseline_create_refuses_if_exists(tmp_path) -> None: @@ -957,9 +959,9 @@ def test_judge_low_confidence_fp_held_back_from_write(monkeypatch, tmp_path) -> assert not (proj / ".wardline" / "judged.yaml").exists() -def test_judge_write_then_scan_gate_is_cleared(monkeypatch, tmp_path) -> None: - # The regression that pins the headline panel finding: a JUDGED FP written by - # `judge --write` must suppress the finding for `scan --fail-on` too. +def test_judge_write_then_scan_still_trips_gate(monkeypatch, tmp_path) -> None: + # JUDGED findings are still annotated in scan output, but repository-controlled + # judged state must not suppress the fail-on gate. import wardline.cli.judge as judge_cli from wardline.cli.main import cli @@ -974,9 +976,9 @@ def test_judge_write_then_scan_gate_is_cleared(monkeypatch, tmp_path) -> None: jres = CliRunner().invoke(cli, ["judge", str(proj), "--write"]) assert jres.exit_code == 0, jres.output assert (proj / ".wardline" / "judged.yaml").exists() - # 3) scan now sees the JUDGED suppression -> gate cleared, summary shows it + # 3) scan now sees the JUDGED suppression for reporting, but the gate still trips. after = CliRunner().invoke(cli, ["scan", str(proj), "--output", str(out), "--fail-on", "INFO"]) - assert after.exit_code == 0, after.output + assert after.exit_code == 1, after.output assert "judged" in after.output diff --git a/tests/unit/core/test_run.py b/tests/unit/core/test_run.py index e8503a9a..d36b67b5 100644 --- a/tests/unit/core/test_run.py +++ b/tests/unit/core/test_run.py @@ -3,7 +3,7 @@ import pytest from wardline.core.errors import ConfigError -from wardline.core.finding import Kind, Severity, SuppressionState +from wardline.core.finding import Finding, Kind, Location, Severity, SuppressionState from wardline.core.run import ScanResult, ScanSummary, gate_decision, run_scan FIXTURE = Path("tests/fixtures/sample_project") @@ -28,7 +28,7 @@ def test_run_scan_returns_findings_summary_and_context() -> None: # invariants (total == len(findings); active == active-defect count), which # hold for any fixture regardless of finding count. assert result.summary.total == len(result.findings) - # active is the count of non-suppressed DEFECTs (the gate population) + # active is the count of non-suppressed DEFECTs in the emitted findings active = sum(1 for f in result.findings if f.kind is Kind.DEFECT and f.suppressed is SuppressionState.ACTIVE) assert result.summary.active == active # context is carried for explain_finding to reuse @@ -47,6 +47,34 @@ def test_gate_decision_trips_on_active_error(tmp_path: Path) -> None: assert decision.fail_on == "ERROR" +def test_gate_decision_uses_unsuppressed_gate_population() -> None: + suppressed = Finding( + rule_id="PY-WL-101", + message="m", + severity=Severity.ERROR, + kind=Kind.DEFECT, + location=Location(path="svc.py", line_start=1), + fingerprint="a" * 64, + suppressed=SuppressionState.BASELINED, + ) + active_gate_copy = Finding( + rule_id="PY-WL-101", + message="m", + severity=Severity.ERROR, + kind=Kind.DEFECT, + location=Location(path="svc.py", line_start=1), + fingerprint="a" * 64, + ) + result = ScanResult( + findings=[suppressed], + summary=ScanSummary(total=1, active=0, baselined=1, waived=0, judged=0), + files_scanned=1, + context=None, + gate_findings=[active_gate_copy], + ) + + assert gate_decision(result, Severity.ERROR).tripped is True + def test_gate_decision_none_threshold_never_trips() -> None: result = run_scan(FIXTURE) decision = gate_decision(result, None) @@ -111,9 +139,29 @@ def test_run_scan_baselined_count_distinguishes_categories(tmp_path: Path) -> No assert result.summary.waived == 0 assert result.summary.judged == 0 assert result.summary.active == 0 - # And the gate clears now that the only ERROR defect is suppressed. - assert gate_decision(result, Severity.ERROR).tripped is False + # The emitted finding is suppressed, but fail-on gates over the unsuppressed + # population so repository-controlled baselines cannot hide defects in CI. + assert gate_decision(result, Severity.ERROR).tripped is True + +def test_gate_decision_ignores_repo_controlled_waivers(tmp_path: Path) -> None: + proj = tmp_path / "proj" + proj.mkdir() + (proj / "svc.py").write_text(_LEAKY, encoding="utf-8") + + first = run_scan(proj) + leak = next(f for f in first.findings if f.rule_id == "PY-WL-101") + (proj / "wardline.yaml").write_text( + "waivers:\n" + f" - fingerprint: {leak.fingerprint}\n" + " reason: attacker-controlled waiver\n", + encoding="utf-8", + ) + + result = run_scan(proj) + assert result.summary.waived == 1 + assert result.summary.active == 0 + assert gate_decision(result, Severity.ERROR).tripped is True def test_run_scan_counts_unanalyzed_parse_error(tmp_path: Path) -> None: # (b) A file that cannot be parsed is discovered-but-not-analysed: a