From 93375508c330343b02a4cd3fec7b9202644a9d67 Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Thu, 28 May 2026 15:24:59 -0400 Subject: [PATCH 1/3] Separate structured rollout evidence --- rollout-friction/scripts/analyze_rollouts.py | 251 ++++++++++++++++-- .../scripts/validate_analyze_rollouts.py | 103 +++++++ 2 files changed, 331 insertions(+), 23 deletions(-) diff --git a/rollout-friction/scripts/analyze_rollouts.py b/rollout-friction/scripts/analyze_rollouts.py index 1f41904..79198f0 100644 --- a/rollout-friction/scripts/analyze_rollouts.py +++ b/rollout-friction/scripts/analyze_rollouts.py @@ -17,6 +17,7 @@ import re import sys from collections import Counter +from datetime import datetime, timezone from pathlib import Path from typing import Any, Iterable, NamedTuple @@ -70,6 +71,13 @@ r"(?:GraphQL|rate limit|No runs found|mergeable UNKNOWN|blocked|Auto Review)", re.I, ) +INVESTIGATION_NOISE_RE = re.compile( + r"\b(analyze_rollouts\.py|validate_analyze_rollouts\.py|rollout-friction)\b[^\n]{0,240}" + r"\b(GraphQL|rate limit|error|failed|blocked|timeout|grep|pattern|signal)\b|" + r"\bgrep\b[^\n]{0,160}\b(GraphQL|rate limit|error|failed|blocked|timeout)\b|" + r"\bassistant\b[^\n]{0,160}\b(discussion|summary|mentioned|investigation)\b", + re.I, +) INTERESTING_JSON_KEYS = ( "type", "message", @@ -83,6 +91,10 @@ "exit_code", "status", "command", + "error_reason", + "capture_incomplete", + "cleanup", + "wait", "mergeable", "mergeable_state", "statusCheckRollup", @@ -98,6 +110,24 @@ "error=", "command=", ) +STRUCTURED_PAYLOAD_KEYS = { + "status", + "error_reason", + "capture_incomplete", + "cleanup", + "wait", + "exit_code", + "error", +} +NESTED_JSON_STRING_KEYS = { + "output", + "aggregated_output", + "stdout", + "stderr", + "formatted_output", + "content", + "message", +} class Signal: @@ -139,16 +169,18 @@ def __init__( class Hit: - __slots__ = ("file", "line", "snippet") + __slots__ = ("file", "line", "snippet", "structured") file: Path line: int snippet: str + structured: bool - def __init__(self, file: Path, line: int, snippet: str) -> None: + def __init__(self, file: Path, line: int, snippet: str, structured: bool = False) -> None: self.file = file self.line = line self.snippet = snippet + self.structured = structured class Finding: @@ -169,10 +201,15 @@ def add(self, hit: Hit) -> None: def count(self) -> int: return len(self.hits) + @property + def structured_count(self) -> int: + return sum(1 for hit in self.hits if hit.structured) + class Fragment(NamedTuple): text: str summary: bool + structured: bool = False SIGNALS: list[Signal] = [ @@ -311,13 +348,33 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--max-files", type=int, default=DEFAULT_MAX_FILES) parser.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES) parser.add_argument("--context-chars", type=int, default=DEFAULT_CONTEXT_CHARS) + parser.add_argument("--since", help="Only scan timestamped JSON records at or after this ISO timestamp.") + parser.add_argument("--until", help="Only scan timestamped JSON records before or at this ISO timestamp.") + parser.add_argument("--after-file", type=Path, help="Apply --after-line only to this file path.") + parser.add_argument("--after-line", type=int, help="Only scan records or lines after this line/record number.") + parser.add_argument( + "--suppress-investigation-noise", + action="store_true", + help="Skip conservative self-referential analyzer/grep discussion lines while preserving raw helper payloads.", + ) parser.add_argument("--json", action="store_true", help="Emit JSON instead of a readable report.") args = parser.parse_args() if not args.paths and not args.root: parser.error("provide at least one trace path or an explicit --root") + if args.after_line is not None and args.after_line < 0: + parser.error("--after-line must be non-negative") + args.since_ts = parse_timestamp_arg(args.since, "--since") if args.since else None + args.until_ts = parse_timestamp_arg(args.until, "--until") if args.until else None return args +def parse_timestamp_arg(value: str, flag: str) -> float: + parsed = parse_timestamp(value) + if parsed is None: + raise SystemExit(f"error: {flag} must be an ISO timestamp, got {value!r}") + return parsed + + def iter_candidate_files(paths: list[Path], max_files: int) -> list[Path]: explicit_files: list[Path] = [] discovered_files: list[Path] = [] @@ -382,23 +439,110 @@ def line_text_from_json(value: Any) -> str: return json.dumps(value, sort_keys=True, default=str) -def json_fragments(value: Any) -> Iterable[Fragment]: +def json_fragments(value: Any, structured_context: bool = False) -> Iterable[Fragment]: if isinstance(value, dict): + structured = structured_context or is_structured_payload(value) summary = line_text_from_json(value) for key, child in value.items(): + if isinstance(child, str) and key in NESTED_JSON_STRING_KEYS: + nested = parse_nested_json_object(child) + if nested is not None and contains_structured_payload(nested): + yield from json_fragments(nested, True) + continue if key in INTERESTING_JSON_KEYS and not isinstance(child, dict | list): - yield Fragment(f"{key}={child}", False) + yield Fragment(f"{key}={child}", False, structured) continue - yield from json_fragments(child) + yield from json_fragments(child, structured) if summary != json.dumps(value, sort_keys=True, default=str): - yield Fragment(summary, True) + yield Fragment(summary, True, structured) elif isinstance(value, list): for child in value: - yield from json_fragments(child) + yield from json_fragments(child, structured_context) elif isinstance(value, str): - yield Fragment(value, False) + yield Fragment(value, False, structured_context) elif value is not None: - yield Fragment(json.dumps(value, sort_keys=True, default=str), False) + yield Fragment(json.dumps(value, sort_keys=True, default=str), False, structured_context) + + +def is_structured_payload(value: dict[str, Any]) -> bool: + return bool(STRUCTURED_PAYLOAD_KEYS & set(value.keys())) + + +def contains_structured_payload(value: Any) -> bool: + if isinstance(value, dict): + if is_structured_payload(value): + return True + return any(contains_structured_payload(child) for child in value.values()) + if isinstance(value, list): + return any(contains_structured_payload(child) for child in value) + return False + + +def parse_nested_json_object(text: str) -> Any | None: + stripped = text.strip() + if not (stripped.startswith("{") or stripped.startswith("[")): + return None + try: + parsed = json.loads(stripped) + except json.JSONDecodeError: + return None + if isinstance(parsed, dict | list): + return parsed + return None + + +def parse_timestamp(value: Any) -> float | None: + if not isinstance(value, str) or not value.strip(): + return None + text = value.strip() + if text.endswith("Z"): + text = text[:-1] + "+00:00" + try: + parsed = datetime.fromisoformat(text) + except ValueError: + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.timestamp() + + +def record_timestamp(value: Any) -> float | None: + if isinstance(value, dict): + for key in ("timestamp", "time", "createdAt", "created_at", "updatedAt", "updated_at"): + parsed = parse_timestamp(value.get(key)) + if parsed is not None: + return parsed + for child in value.values(): + parsed = record_timestamp(child) + if parsed is not None: + return parsed + elif isinstance(value, list): + for child in value: + parsed = record_timestamp(child) + if parsed is not None: + return parsed + return None + + +def in_time_window(value: Any, since_ts: float | None, until_ts: float | None) -> bool: + if since_ts is None and until_ts is None: + return True + timestamp = record_timestamp(value) + if timestamp is None: + return False + if since_ts is not None and timestamp < since_ts: + return False + if until_ts is not None and timestamp > until_ts: + return False + return True + + +def after_checkpoint(path: Path, line_no: int, after_file: Path | None, after_line: int | None) -> bool: + if after_line is None: + return True + if after_file is not None and path.resolve() != after_file.expanduser().resolve(): + return True + return line_no > after_line def top_level_json_records(value: Any) -> Iterable[tuple[int, Any]]: @@ -409,12 +553,19 @@ def top_level_json_records(value: Any) -> Iterable[tuple[int, Any]]: yield 1, value -def iter_lines(path: Path, max_bytes: int) -> Iterable[tuple[int, str, bool]]: +def iter_lines( + path: Path, + max_bytes: int, + since_ts: float | None = None, + until_ts: float | None = None, + after_file: Path | None = None, + after_line: int | None = None, +) -> Iterable[tuple[int, str, bool, bool]]: try: with path.open("rb") as handle: data = handle.read(max_bytes + 1) except OSError as exc: - yield 0, f"scanner_io_error unable to read file: {exc}", False + yield 0, f"scanner_io_error unable to read file: {exc}", False, False return if len(data) > max_bytes: @@ -427,32 +578,56 @@ def iter_lines(path: Path, max_bytes: int) -> Iterable[tuple[int, str, bool]]: pass else: for idx, record in top_level_json_records(parsed): + if not after_checkpoint(path, idx, after_file, after_line): + continue + if not in_time_window(record, since_ts, until_ts): + continue for fragment in json_fragments(record): - yield idx, fragment.text, fragment.summary + yield idx, fragment.text, fragment.summary, fragment.structured return for idx, raw in enumerate(text.splitlines(), start=1): raw = raw.strip() if not raw: continue + if not after_checkpoint(path, idx, after_file, after_line): + continue if raw.startswith("{"): try: - for fragment in json_fragments(json.loads(raw)): - yield idx, fragment.text, fragment.summary + parsed = json.loads(raw) + if not in_time_window(parsed, since_ts, until_ts): + continue + for fragment in json_fragments(parsed): + yield idx, fragment.text, fragment.summary, fragment.structured continue except json.JSONDecodeError: pass - yield idx, raw, False - - -def scan(files: list[Path], max_bytes: int, context_chars: int) -> dict[str, Finding]: + if since_ts is not None or until_ts is not None: + continue + yield idx, raw, False, False + + +def scan( + files: list[Path], + max_bytes: int, + context_chars: int, + since_ts: float | None = None, + until_ts: float | None = None, + after_file: Path | None = None, + after_line: int | None = None, + suppress_investigation_noise: bool = False, +) -> dict[str, Finding]: findings: dict[str, Finding] = {signal.name: Finding(signal) for signal in SIGNALS} seen_hits: set[tuple[Path, int, str, str, int]] = set() for path in files: line_signal_texts: dict[tuple[Path, int, str], set[str]] = {} - for line_no, text, is_summary in iter_lines(path, max_bytes): + for line_no, text, is_summary, is_structured in iter_lines( + path, max_bytes, since_ts, until_ts, after_file, after_line + ): if is_meta_echo(text): continue + if suppress_investigation_noise and not is_structured and is_investigation_noise(text): + continue for signal in SIGNALS: canonical_text = canonical_hit_text(text) for occurrence, _match in enumerate(signal.pattern.finditer(text)): @@ -466,7 +641,12 @@ def scan(files: list[Path], max_bytes: int, context_chars: int) -> dict[str, Fin seen_hits.add(hit_key) line_texts.add(canonical_text) findings[signal.name].add( - Hit(file=path, line=line_no, snippet=redacted(text, context_chars)) + Hit( + file=path, + line=line_no, + snippet=redacted(text, context_chars), + structured=is_structured, + ) ) return { name: finding @@ -496,6 +676,11 @@ def is_meta_echo(text: str) -> bool: return bool(META_ECHO_RE.search(normalized)) +def is_investigation_noise(text: str) -> bool: + normalized = canonical_hit_text(" ".join(text.strip().split())) + return bool(INVESTIGATION_NOISE_RE.search(normalized)) + + def stable_file_id(path: Path) -> str: return hashlib.sha256(str(path).encode("utf-8", errors="replace")).hexdigest()[:12] @@ -507,12 +692,19 @@ def finding_to_json(finding: Finding) -> dict[str, Any]: "severity": finding.signal.severity, "category": finding.signal.category, "count": finding.count, + "structured_payload_count": finding.structured_count, + "broad_context_count": finding.count - finding.structured_count, "files": [ {"id": stable_file_id(Path(path)), "hits": count} for path, count in finding.files.most_common() ], "evidence": [ - {"file_id": stable_file_id(hit.file), "line": hit.line, "snippet": hit.snippet} + { + "file_id": stable_file_id(hit.file), + "line": hit.line, + "snippet": hit.snippet, + "evidence_type": "structured_payload" if hit.structured else "broad_context", + } for hit in examples ], "likely_cause": finding.signal.likely_cause, @@ -542,12 +734,16 @@ def emit_text(files: list[Path], findings: dict[str, Finding]) -> None: for finding in sorted(findings.values(), key=lambda f: (severity_rank(f.signal.severity), -f.count)): print() print(f"[{finding.signal.severity}] {finding.signal.name} ({finding.count} hit(s))") + if finding.structured_count: + print(f"structured_payload_hits: {finding.structured_count}") + print(f"broad_context_hits: {finding.count - finding.structured_count}") print(f"category: {finding.signal.category}") print(f"recommended_destination: {finding.signal.destination}") print(f"likely_cause: {finding.signal.likely_cause}") print("evidence:") for hit in finding.hits[:3]: - print(f"- file_id={stable_file_id(hit.file)} line={hit.line}: {hit.snippet}") + evidence_type = "structured_payload" if hit.structured else "broad_context" + print(f"- file_id={stable_file_id(hit.file)} line={hit.line} type={evidence_type}: {hit.snippet}") def severity_rank(severity: str) -> int: @@ -558,7 +754,16 @@ def main() -> int: args = parse_args() paths = args.paths or [args.root] files = iter_candidate_files(paths, args.max_files) - findings = scan(files, args.max_bytes, args.context_chars) + findings = scan( + files, + args.max_bytes, + args.context_chars, + since_ts=args.since_ts, + until_ts=args.until_ts, + after_file=args.after_file, + after_line=args.after_line, + suppress_investigation_noise=args.suppress_investigation_noise, + ) if args.json: emit_json(files, findings) else: diff --git a/rollout-friction/scripts/validate_analyze_rollouts.py b/rollout-friction/scripts/validate_analyze_rollouts.py index 4423381..b869dcc 100644 --- a/rollout-friction/scripts/validate_analyze_rollouts.py +++ b/rollout-friction/scripts/validate_analyze_rollouts.py @@ -351,6 +351,106 @@ def test_real_trace_evidence_survives_meta_echo_filter() -> None: raise AssertionError(f"real trace evidence should still report {expected}") +def test_structured_payload_counts_are_separate_from_broad_context() -> None: + module = load_module() + with tempfile.TemporaryDirectory() as tmp: + trace = write_trace( + Path(tmp), + [ + { + "timestamp": "2026-05-28T18:00:00Z", + "type": "function_call_output", + "payload": { + "output": json.dumps( + { + "status": "error", + "error_reason": "GraphQL secondary rate limit", + } + ) + }, + }, + {"message": "GraphQL secondary rate limit mentioned in discussion"}, + ], + ) + findings = module.scan([trace], max_bytes=100_000, context_chars=240) + + finding = findings.get("github_graphql_rate_limit") + if finding is None: + raise AssertionError("structured nested helper payload should produce GraphQL finding") + if finding.structured_count != 1: + raise AssertionError(f"expected one structured payload hit, got {finding.structured_count}") + payload = module.finding_to_json(finding) + if payload["structured_payload_count"] != 1 or payload["broad_context_count"] != 1: + raise AssertionError(f"structured and broad counts should be separate: {payload}") + if payload["evidence"][0]["evidence_type"] != "structured_payload": + raise AssertionError(f"structured evidence should be labeled: {payload}") + + +def test_since_and_after_line_bound_scan_records() -> None: + module = load_module() + with tempfile.TemporaryDirectory() as tmp: + trace = write_trace( + Path(tmp), + [ + {"timestamp": "2026-05-28T17:00:00Z", "message": "GraphQL rate limit before checkpoint"}, + {"timestamp": "2026-05-28T18:00:00Z", "message": "GraphQL rate limit after checkpoint"}, + {"timestamp": "2026-05-28T19:00:00Z", "message": "No runs found for workflow CodeQL"}, + ], + ) + files = module.iter_candidate_files([trace], max_files=10) + findings = module.scan( + files, + max_bytes=100_000, + context_chars=240, + since_ts=module.parse_timestamp("2026-05-28T18:30:00Z"), + ) + after_line_findings = module.scan( + files, + max_bytes=100_000, + context_chars=240, + after_file=trace, + after_line=1, + ) + + if "github_graphql_rate_limit" in findings: + raise AssertionError("--since should filter older GraphQL records") + if "github_workflow_wait_miss" not in findings: + raise AssertionError("--since should keep fresh records in the window") + graph = after_line_findings.get("github_graphql_rate_limit") + if graph is None or graph.count != 1: + raise AssertionError("--after-line should keep only records after the checkpoint") + + +def test_investigation_noise_suppression_preserves_structured_payloads() -> None: + module = load_module() + with tempfile.TemporaryDirectory() as tmp: + trace = write_trace( + Path(tmp), + [ + {"message": "grep GraphQL rate limit in rollout-friction/scripts/analyze_rollouts.py"}, + { + "payload": { + "output": json.dumps( + {"status": "error", "error_reason": "GraphQL rate limit from helper"} + ) + } + }, + ], + ) + findings = module.scan( + [trace], + max_bytes=100_000, + context_chars=240, + suppress_investigation_noise=True, + ) + + finding = findings.get("github_graphql_rate_limit") + if finding is None: + raise AssertionError("structured helper payload should survive noise suppression") + if finding.count != 1 or finding.structured_count != 1: + raise AssertionError("investigation noise should be suppressed without hiding structured payloads") + + def main() -> int: test_github_wait_and_rollup_signals() test_json_object_summary_preserves_multi_field_signals() @@ -367,6 +467,9 @@ def main() -> int: test_scanner_io_errors_do_not_count_as_command_failures() test_meta_echoes_do_not_create_findings() test_real_trace_evidence_survives_meta_echo_filter() + test_structured_payload_counts_are_separate_from_broad_context() + test_since_and_after_line_bound_scan_records() + test_investigation_noise_suppression_preserves_structured_payloads() print("ok validate-analyze-rollouts") return 0 From 9d793d8f42b60f7b32bf49387cdf073c053063cf Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Thu, 28 May 2026 15:55:41 -0400 Subject: [PATCH 2/3] Tighten structured rollout evidence detection --- rollout-friction/scripts/analyze_rollouts.py | 1 - .../scripts/validate_analyze_rollouts.py | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/rollout-friction/scripts/analyze_rollouts.py b/rollout-friction/scripts/analyze_rollouts.py index 79198f0..6650838 100644 --- a/rollout-friction/scripts/analyze_rollouts.py +++ b/rollout-friction/scripts/analyze_rollouts.py @@ -111,7 +111,6 @@ "command=", ) STRUCTURED_PAYLOAD_KEYS = { - "status", "error_reason", "capture_incomplete", "cleanup", diff --git a/rollout-friction/scripts/validate_analyze_rollouts.py b/rollout-friction/scripts/validate_analyze_rollouts.py index b869dcc..9bae63d 100644 --- a/rollout-friction/scripts/validate_analyze_rollouts.py +++ b/rollout-friction/scripts/validate_analyze_rollouts.py @@ -451,6 +451,22 @@ def test_investigation_noise_suppression_preserves_structured_payloads() -> None raise AssertionError("investigation noise should be suppressed without hiding structured payloads") +def test_status_wrapper_does_not_make_discussion_structured() -> None: + module = load_module() + fragments = list( + module.json_fragments( + { + "status": "ok", + "message": "grep GraphQL rate limit in rollout-friction/scripts/analyze_rollouts.py", + } + ) + ) + + for fragment in fragments: + if "GraphQL" in fragment.text and fragment.structured: + raise AssertionError(f"status-only wrapper should not mark discussion as structured: {fragments}") + + def main() -> int: test_github_wait_and_rollup_signals() test_json_object_summary_preserves_multi_field_signals() @@ -470,6 +486,7 @@ def main() -> int: test_structured_payload_counts_are_separate_from_broad_context() test_since_and_after_line_bound_scan_records() test_investigation_noise_suppression_preserves_structured_payloads() + test_status_wrapper_does_not_make_discussion_structured() print("ok validate-analyze-rollouts") return 0 From 7798e0fcb4abb29c48729589fabf1ed12e19d322 Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Thu, 28 May 2026 16:34:29 -0400 Subject: [PATCH 3/3] Preserve structured rollout context --- rollout-friction/scripts/analyze_rollouts.py | 8 +++- .../scripts/validate_analyze_rollouts.py | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/rollout-friction/scripts/analyze_rollouts.py b/rollout-friction/scripts/analyze_rollouts.py index 6650838..51288fa 100644 --- a/rollout-friction/scripts/analyze_rollouts.py +++ b/rollout-friction/scripts/analyze_rollouts.py @@ -445,8 +445,12 @@ def json_fragments(value: Any, structured_context: bool = False) -> Iterable[Fra for key, child in value.items(): if isinstance(child, str) and key in NESTED_JSON_STRING_KEYS: nested = parse_nested_json_object(child) - if nested is not None and contains_structured_payload(nested): - yield from json_fragments(nested, True) + if nested is not None: + if structured: + for fragment in json_fragments(nested, True): + yield Fragment(fragment.text, fragment.summary, True) + elif key in NESTED_JSON_STRING_KEYS and contains_structured_payload(nested): + yield from json_fragments(nested, True) continue if key in INTERESTING_JSON_KEYS and not isinstance(child, dict | list): yield Fragment(f"{key}={child}", False, structured) diff --git a/rollout-friction/scripts/validate_analyze_rollouts.py b/rollout-friction/scripts/validate_analyze_rollouts.py index 9bae63d..4c1a357 100644 --- a/rollout-friction/scripts/validate_analyze_rollouts.py +++ b/rollout-friction/scripts/validate_analyze_rollouts.py @@ -451,6 +451,44 @@ def test_investigation_noise_suppression_preserves_structured_payloads() -> None raise AssertionError("investigation noise should be suppressed without hiding structured payloads") +def test_nested_wrapped_helper_payload_retains_structured_context() -> None: + module = load_module() + with tempfile.TemporaryDirectory() as tmp: + trace = write_trace( + Path(tmp), + [ + { + "timestamp": "2026-05-28T18:00:00Z", + "type": "function_call_output", + "status": "error", + "payload": { + "error_reason": "GraphQL secondary rate limit", + "reason": json.dumps( + {"status": "error", "error_reason": "GraphQL secondary rate limit"} + ) + }, + } + ], + ) + findings = module.scan( + [trace], + max_bytes=100_000, + context_chars=240, + suppress_investigation_noise=True, + ) + + finding = findings.get("github_graphql_rate_limit") + if finding is None: + raise AssertionError("nested wrapped helper payload should still produce a finding") + if finding.structured_count < 1: + raise AssertionError(f"nested wrapped helper payload should stay structured, got {finding.structured_count}") + payload = module.finding_to_json(finding) + if payload["structured_payload_count"] < 1 or payload["broad_context_count"] != 0: + raise AssertionError(f"nested wrapped helper payload should not become broad context: {payload}") + if payload["evidence"][0]["evidence_type"] != "structured_payload": + raise AssertionError(f"nested wrapped helper payload should be labeled structured: {payload}") + + def test_status_wrapper_does_not_make_discussion_structured() -> None: module = load_module() fragments = list( @@ -486,6 +524,7 @@ def main() -> int: test_structured_payload_counts_are_separate_from_broad_context() test_since_and_after_line_bound_scan_records() test_investigation_noise_suppression_preserves_structured_payloads() + test_nested_wrapped_helper_payload_retains_structured_context() test_status_wrapper_does_not_make_discussion_structured() print("ok validate-analyze-rollouts") return 0