Skip to content

Commit 2247a48

Browse files
Merge pull request #2 from nanookclaw/feat/contract-trend-analyzer
feat(audit): ContractTrendAnalyzer for cross-session pass-rate regression detection
2 parents 4fa4bd4 + 0d37fd9 commit 2247a48

2 files changed

Lines changed: 479 additions & 0 deletions

File tree

gateframe/audit/trend.py

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
"""Cross-session contract reliability trend analysis for gateframe.
2+
3+
Provides :class:`ContractTrendAnalyzer` which reads an existing JSONL audit
4+
log produced by :class:`~gateframe.audit.exporters.JsonFileExporter`, groups
5+
entries by ``workflow_id``, and computes per-contract OLS pass-rate slopes to
6+
detect reliability regressions across workflow runs.
7+
8+
Usage::
9+
10+
from pathlib import Path
11+
from gateframe.audit.trend import ContractTrendAnalyzer
12+
13+
analyzer = ContractTrendAnalyzer(Path("audit.jsonl"), window=20)
14+
report = analyzer.analyze()
15+
if report.any_regression:
16+
print("Contracts degrading:", [r.contract_name for r in report.regressions])
17+
18+
CLI equivalent::
19+
20+
gateframe trend audit.jsonl --window 20
21+
22+
The *window* parameter specifies the number of most-recent **workflow runs**
23+
(i.e. distinct ``workflow_id`` groups) to include in the trend calculation.
24+
Entries without a ``workflow_id`` are ignored.
25+
26+
Reference: PDR in Production v2.5 — DOI 10.5281/zenodo.19362461
27+
"""
28+
29+
from __future__ import annotations
30+
31+
import json
32+
import statistics
33+
from collections import defaultdict
34+
from dataclasses import dataclass, field
35+
from pathlib import Path
36+
37+
# ── Data models ───────────────────────────────────────────────────────────────
38+
39+
40+
@dataclass
41+
class WorkflowRunSummary:
42+
"""Pass-rate statistics for a single workflow run.
43+
44+
A "workflow run" is the set of all audit entries that share the same
45+
``workflow_id``.
46+
47+
Attributes:
48+
workflow_id: The workflow identifier.
49+
contract_name: Name of the :class:`~gateframe.core.contract.ValidationContract`.
50+
total: Total number of validation events in this run.
51+
passed: Number of events where ``passed == True``.
52+
pass_rate: ``passed / total`` (``NaN`` when ``total == 0``).
53+
"""
54+
55+
workflow_id: str
56+
contract_name: str
57+
total: int
58+
passed: int
59+
60+
@property
61+
def pass_rate(self) -> float:
62+
return self.passed / self.total if self.total > 0 else float("nan")
63+
64+
65+
@dataclass
66+
class ContractTrend:
67+
"""OLS trend for a single contract across ordered workflow runs.
68+
69+
Attributes:
70+
contract_name: Name of the contract.
71+
run_summaries: Ordered (oldest first) per-run statistics.
72+
slope: OLS slope of ``pass_rate`` over run index (positive = improving).
73+
direction: ``"improving"``, ``"worsening"``, or ``"stable"``.
74+
regressed: ``True`` when the slope is below ``-regression_threshold``.
75+
"""
76+
77+
contract_name: str
78+
run_summaries: list[WorkflowRunSummary]
79+
slope: float
80+
direction: str
81+
regressed: bool
82+
83+
84+
@dataclass
85+
class TrendReport:
86+
"""Aggregated trend report across all contracts in the audit log.
87+
88+
Attributes:
89+
contract_trends: Per-contract trend results, sorted by ``contract_name``.
90+
any_regression: ``True`` if at least one contract is degrading.
91+
regressions: Subset of *contract_trends* where ``regressed == True``.
92+
window: Number of workflow runs included in the analysis.
93+
regression_threshold: Threshold used to classify a slope as a regression.
94+
"""
95+
96+
contract_trends: list[ContractTrend]
97+
any_regression: bool
98+
regressions: list[ContractTrend] = field(default_factory=list)
99+
window: int = 20
100+
regression_threshold: float = 0.02
101+
102+
103+
# ── Internal helpers ──────────────────────────────────────────────────────────
104+
105+
106+
def _ols_slope(values: list[float]) -> float:
107+
"""OLS slope of *values* over index 0 … n-1. Returns 0.0 for < 2 points."""
108+
n = len(values)
109+
if n < 2:
110+
return 0.0
111+
xs = list(range(n))
112+
slope, _ = statistics.linear_regression(xs, values)
113+
return slope
114+
115+
116+
def _direction(slope: float, threshold: float = 0.001) -> str:
117+
if slope > threshold:
118+
return "improving"
119+
if slope < -threshold:
120+
return "worsening"
121+
return "stable"
122+
123+
124+
def _is_valid_float(v: float) -> bool:
125+
import math
126+
return not (math.isnan(v) or math.isinf(v))
127+
128+
129+
# ── Public API ────────────────────────────────────────────────────────────────
130+
131+
132+
class ContractTrendAnalyzer:
133+
"""Reads a gateframe JSONL audit log and computes per-contract trend slopes.
134+
135+
Only audit entries that carry a ``workflow_id`` are considered. Entries are
136+
grouped by ``workflow_id``; the groups are ordered by the **earliest
137+
timestamp** seen within each group (oldest-first), so the trend slope
138+
represents change over time.
139+
140+
Args:
141+
audit_log_path: Path to the JSONL file written by
142+
:class:`~gateframe.audit.exporters.JsonFileExporter`.
143+
window: How many most-recent workflow-ID groups to include.
144+
regression_threshold: Minimum downward slope that constitutes a
145+
regression. Defaults to 0.02 (2 percentage-point drop per run).
146+
147+
Example::
148+
149+
analyzer = ContractTrendAnalyzer(Path("audit.jsonl"), window=20)
150+
report = analyzer.analyze()
151+
for ct in report.contract_trends:
152+
print(f"{ct.contract_name}: {ct.direction} (slope={ct.slope:.4f})")
153+
"""
154+
155+
def __init__(
156+
self,
157+
audit_log_path: Path | str,
158+
*,
159+
window: int = 20,
160+
regression_threshold: float = 0.02,
161+
) -> None:
162+
self._path = Path(audit_log_path)
163+
self._window = window
164+
self._regression_threshold = regression_threshold
165+
166+
# ── private ──────────────────────────────────────────────────────────────
167+
168+
def _read_entries(self) -> list[dict]:
169+
"""Parse the JSONL file; skip malformed lines."""
170+
entries: list[dict] = []
171+
try:
172+
with self._path.open(encoding="utf-8") as fh:
173+
for line in fh:
174+
line = line.strip()
175+
if not line:
176+
continue
177+
try:
178+
entries.append(json.loads(line))
179+
except json.JSONDecodeError:
180+
continue
181+
except FileNotFoundError:
182+
pass
183+
return entries
184+
185+
def _build_run_summaries(
186+
self, entries: list[dict]
187+
) -> dict[str, list[WorkflowRunSummary]]:
188+
"""Group entries by workflow_id, compute pass rates per contract.
189+
190+
Returns a dict mapping contract_name → list of WorkflowRunSummary,
191+
ordered oldest-first by first-seen timestamp within each workflow run.
192+
"""
193+
# workflow_id → contract_name → {total, passed}
194+
# Also track earliest timestamp per workflow_id for ordering.
195+
wf_contract: dict[str, dict[str, dict]] = defaultdict(
196+
lambda: defaultdict(lambda: {"total": 0, "passed": 0})
197+
)
198+
wf_first_ts: dict[str, str] = {}
199+
200+
for entry in entries:
201+
wf_id = entry.get("workflow_id")
202+
if not wf_id:
203+
continue
204+
contract = entry.get("contract_name", "unknown")
205+
passed = bool(entry.get("passed", False))
206+
ts = entry.get("timestamp", "")
207+
208+
if wf_id not in wf_first_ts or ts < wf_first_ts[wf_id]:
209+
wf_first_ts[wf_id] = ts
210+
211+
bucket = wf_contract[wf_id][contract]
212+
bucket["total"] += 1
213+
if passed:
214+
bucket["passed"] += 1
215+
216+
# Sort workflow runs by first-seen timestamp → oldest first
217+
ordered_wf_ids = sorted(wf_first_ts, key=lambda w: wf_first_ts[w])
218+
219+
# Apply window: keep only the most-recent *window* runs
220+
if len(ordered_wf_ids) > self._window:
221+
ordered_wf_ids = ordered_wf_ids[-self._window :]
222+
223+
# Invert: contract_name → list[WorkflowRunSummary] (time-ordered)
224+
contract_runs: dict[str, list[WorkflowRunSummary]] = defaultdict(list)
225+
for wf_id in ordered_wf_ids:
226+
for contract, counts in wf_contract[wf_id].items():
227+
contract_runs[contract].append(
228+
WorkflowRunSummary(
229+
workflow_id=wf_id,
230+
contract_name=contract,
231+
total=counts["total"],
232+
passed=counts["passed"],
233+
)
234+
)
235+
236+
return dict(contract_runs)
237+
238+
# ── public ───────────────────────────────────────────────────────────────
239+
240+
def analyze(self) -> TrendReport:
241+
"""Run the trend analysis and return a :class:`TrendReport`.
242+
243+
Returns:
244+
A :class:`TrendReport` summarising per-contract slopes and
245+
regression flags. If the audit log is empty or contains no
246+
entries with ``workflow_id``, the report will have no
247+
``contract_trends``.
248+
"""
249+
entries = self._read_entries()
250+
contract_runs = self._build_run_summaries(entries)
251+
252+
contract_trends: list[ContractTrend] = []
253+
for contract_name, run_summaries in sorted(contract_runs.items()):
254+
pass_rates = [
255+
s.pass_rate for s in run_summaries if _is_valid_float(s.pass_rate)
256+
]
257+
slope = _ols_slope(pass_rates)
258+
direction = _direction(slope)
259+
regressed = slope < -self._regression_threshold
260+
261+
contract_trends.append(
262+
ContractTrend(
263+
contract_name=contract_name,
264+
run_summaries=run_summaries,
265+
slope=slope,
266+
direction=direction,
267+
regressed=regressed,
268+
)
269+
)
270+
271+
regressions = [ct for ct in contract_trends if ct.regressed]
272+
return TrendReport(
273+
contract_trends=contract_trends,
274+
any_regression=bool(regressions),
275+
regressions=regressions,
276+
window=self._window,
277+
regression_threshold=self._regression_threshold,
278+
)

0 commit comments

Comments
 (0)