From 0762841693b4cdc2649ef418f85a6802cd86a435 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Jun 2026 20:01:03 +0200 Subject: [PATCH 1/5] feat: add lightweight provenance profiler --- contextpilot/__init__.py | 131 ++++++++++-------- contextpilot/hermes_opportunities/__init__.py | 9 ++ contextpilot/hermes_opportunities/models.py | 46 ++++++ .../hermes_opportunities/provenance.py | 65 +++++++++ contextpilot/hermes_opportunities/report.py | 16 ++- tests/test_analyzer_lightweight_provenance.py | 130 +++++++++++++++++ 6 files changed, 339 insertions(+), 58 deletions(-) create mode 100644 contextpilot/hermes_opportunities/provenance.py create mode 100644 tests/test_analyzer_lightweight_provenance.py diff --git a/contextpilot/__init__.py b/contextpilot/__init__.py index a2523d7..3052c6d 100644 --- a/contextpilot/__init__.py +++ b/contextpilot/__init__.py @@ -17,70 +17,87 @@ >>> results = pipeline.run(queries=["What is AI?"]) See docs/reference/api.md for detailed documentation. -""" - -from .pipeline import ( - RAGPipeline, - RetrieverConfig, - OptimizerConfig, - InferenceConfig, - PipelineConfig, -) - -from .context_index import ( - ContextIndex, - IndexResult, -) - -from .context_ordering import ( - IntraContextOrderer, -) -from .server.live_index import ContextPilot - -from .dedup import ( - dedup_chat_completions, - dedup_responses_api, - DedupResult, -) - -from .api import optimize, optimize_batch +Imports are lazy (PEP 562): the heavy RAG stack (``pipeline`` -> ``context_index`` +-> ``scipy``) is only pulled in when one of its names is first accessed. This +keeps lightweight, dependency-free consumers -- such as the standalone token +monitor / provenance profiler in :mod:`contextpilot.hermes_opportunities` -- +importable inside minimal environments where SciPy and friends are absent. +""" +from __future__ import annotations -from .retriever import ( - BM25Retriever, - FAISSRetriever, - FAISS_AVAILABLE, - Mem0Retriever, - create_mem0_corpus_map, - MEM0_AVAILABLE, -) +import importlib +from typing import TYPE_CHECKING __version__ = "0.4.1" -__all__ = [ +# Map each public name to the submodule that defines it. Submodules are imported +# on first attribute access, so importing ``contextpilot`` (or any lightweight +# subpackage like ``hermes_opportunities``) never eagerly drags in SciPy/NumPy. +_LAZY_EXPORTS = { # High-level pipeline API - "RAGPipeline", - "RetrieverConfig", - "OptimizerConfig", - "InferenceConfig", - "PipelineConfig", + "RAGPipeline": ".pipeline", + "RetrieverConfig": ".pipeline", + "OptimizerConfig": ".pipeline", + "InferenceConfig": ".pipeline", + "PipelineConfig": ".pipeline", # Core components - "ContextIndex", - "IndexResult", - "IntraContextOrderer", - "ContextPilot", + "ContextIndex": ".context_index", + "IndexResult": ".context_index", + "IntraContextOrderer": ".context_ordering", + "ContextPilot": ".server.live_index", # Deduplication - "dedup_chat_completions", - "dedup_responses_api", - "DedupResult", + "dedup_chat_completions": ".dedup", + "dedup_responses_api": ".dedup", + "DedupResult": ".dedup", # Convenience functions - "optimize", - "optimize_batch", + "optimize": ".api", + "optimize_batch": ".api", # Retrievers - "BM25Retriever", - "FAISSRetriever", - "FAISS_AVAILABLE", - "Mem0Retriever", - "create_mem0_corpus_map", - "MEM0_AVAILABLE", -] + "BM25Retriever": ".retriever", + "FAISSRetriever": ".retriever", + "FAISS_AVAILABLE": ".retriever", + "Mem0Retriever": ".retriever", + "create_mem0_corpus_map": ".retriever", + "MEM0_AVAILABLE": ".retriever", +} + +__all__ = list(_LAZY_EXPORTS) + + +def __getattr__(name: str): + """Lazily resolve a public name to its (heavy) submodule on first access.""" + module_name = _LAZY_EXPORTS.get(name) + if module_name is None: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + module = importlib.import_module(module_name, __name__) + value = getattr(module, name) + globals()[name] = value # cache so subsequent lookups skip the import machinery + return value + + +def __dir__(): + return sorted(list(globals()) + __all__) + + +if TYPE_CHECKING: # pragma: no cover - import-time hints for type checkers only + from .api import optimize, optimize_batch + from .context_index import ContextIndex, IndexResult + from .context_ordering import IntraContextOrderer + from .dedup import DedupResult, dedup_chat_completions, dedup_responses_api + from .pipeline import ( + InferenceConfig, + OptimizerConfig, + PipelineConfig, + RAGPipeline, + RetrieverConfig, + ) + from .retriever import ( + FAISS_AVAILABLE, + MEM0_AVAILABLE, + BM25Retriever, + FAISSRetriever, + Mem0Retriever, + create_mem0_corpus_map, + ) + from .server.live_index import ContextPilot diff --git a/contextpilot/hermes_opportunities/__init__.py b/contextpilot/hermes_opportunities/__init__.py index 561c98a..9ca912f 100644 --- a/contextpilot/hermes_opportunities/__init__.py +++ b/contextpilot/hermes_opportunities/__init__.py @@ -78,6 +78,8 @@ PromptDuplicateBlock, PromptDuplicateShadow, PromptDuplicateTypeCount, + ProvenanceProfile, + ProvenanceSourceStat, RepeatedBlock, RouterCandidateBlock, RouterLabelCount, @@ -85,11 +87,13 @@ TelemetryCoverage, ToolSizeStat, TypeCount, + UNKNOWN_SOURCE, WorkerRoutingShadow, _est_tokens, _LLMContent, _ToolMessage, ) +from .provenance import build_provenance_profile from .privacy import ( FORBIDDEN_OUTPUT_KEYS, _assert_no_forbidden_keys, @@ -163,7 +167,12 @@ "ParentAggregationGroup", "ArtifactKindStat", "ParentAggregationArtifacts", + "ProvenanceSourceStat", + "ProvenanceProfile", "OpportunityReport", + # provenance profile (token-monitor view) + "UNKNOWN_SOURCE", + "build_provenance_profile", # loaders "load_tool_messages", "load_llm_bound_content", diff --git a/contextpilot/hermes_opportunities/models.py b/contextpilot/hermes_opportunities/models.py index 2a2ed48..532cce4 100644 --- a/contextpilot/hermes_opportunities/models.py +++ b/contextpilot/hermes_opportunities/models.py @@ -116,6 +116,50 @@ class HeavySession: api_call_count: int +# Source label used when a Hermes session carries no recorded provenance source. +# Keeps the provenance profile a low-cardinality enum view rather than leaking +# a null/raw value. +UNKNOWN_SOURCE = "unknown" + + +@dataclass +class ProvenanceSourceStat: + """Token-monitor rollup for one provenance source. + + Privacy-safe by construction: a low-cardinality source label plus numeric + aggregates only -- never raw session ids/hashes, prompts, content, or + reasoning. This is the per-source ``by_source`` row of + :class:`ProvenanceProfile`. + """ + + source: str # low-cardinality provenance label (e.g. "discord") + session_count: int + input_tokens: int + output_tokens: int + message_count: int + tool_call_count: int + api_call_count: int + total_tokens: int # input_tokens + output_tokens + + +@dataclass +class ProvenanceProfile: + """Privacy-safe per-source token-usage profile (the token-monitor view). + + Aggregates :class:`HeavySession` rows by their provenance ``source`` into + numeric counters. Emits only low-cardinality source enums and integer + aggregates -- no session hashes/ids, prompts, content, or reasoning ever + appear in this structure. + """ + + source_count: int + session_count: int + input_tokens: int + output_tokens: int + total_tokens: int + by_source: list[ProvenanceSourceStat] + + @dataclass class TelemetryCoverage: events: int @@ -398,6 +442,8 @@ class OpportunityReport: repeated_blocks: list[RepeatedBlock] large_tool_outputs_by_tool: list[ToolSizeStat] heavy_sessions: list[HeavySession] + # Token-monitor/provenance-profiler rollup (source enums + numeric aggregates only). + provenance_profile: ProvenanceProfile telemetry: TelemetryCoverage # LLM-bound block analysis (system/skill prompts, prompts, tool results). llm_bound_item_count: int diff --git a/contextpilot/hermes_opportunities/provenance.py b/contextpilot/hermes_opportunities/provenance.py new file mode 100644 index 0000000..4cf5dad --- /dev/null +++ b/contextpilot/hermes_opportunities/provenance.py @@ -0,0 +1,65 @@ +"""Privacy-safe provenance profiling -- the standalone token-monitor view. + +``build_provenance_profile`` rolls a list of :class:`HeavySession` rows up by +their provenance ``source`` into numeric token/usage aggregates. It is the +lightweight "where are my tokens going?" report: no SciPy/RAG machinery, no +content, prompts, reasoning, or raw session ids/hashes -- only low-cardinality +source labels and integer counters. +""" +from __future__ import annotations + +from collections.abc import Iterable + +from .models import ( + UNKNOWN_SOURCE, + HeavySession, + ProvenanceProfile, + ProvenanceSourceStat, +) + + +def build_provenance_profile( + heavy_sessions: Iterable[HeavySession], +) -> ProvenanceProfile: + """Aggregate heavy sessions into a per-source token-usage profile. + + Sessions with no recorded ``source`` are folded into the ``"unknown"`` + bucket so the output stays a low-cardinality enum view. ``by_source`` rows + are returned sorted by descending total tokens (then source name) for stable, + human-meaningful ordering. + """ + buckets: dict[str, ProvenanceSourceStat] = {} + for session in heavy_sessions: + source = session.source or UNKNOWN_SOURCE + stat = buckets.get(source) + if stat is None: + stat = ProvenanceSourceStat( + source=source, + session_count=0, + input_tokens=0, + output_tokens=0, + message_count=0, + tool_call_count=0, + api_call_count=0, + total_tokens=0, + ) + buckets[source] = stat + stat.session_count += 1 + stat.input_tokens += session.input_tokens + stat.output_tokens += session.output_tokens + stat.message_count += session.message_count + stat.tool_call_count += session.tool_call_count + stat.api_call_count += session.api_call_count + stat.total_tokens += session.input_tokens + session.output_tokens + + by_source = sorted( + buckets.values(), key=lambda s: (-s.total_tokens, s.source) + ) + return ProvenanceProfile( + source_count=len(by_source), + session_count=sum(s.session_count for s in by_source), + input_tokens=sum(s.input_tokens for s in by_source), + output_tokens=sum(s.output_tokens for s in by_source), + total_tokens=sum(s.total_tokens for s in by_source), + by_source=by_source, + ) diff --git a/contextpilot/hermes_opportunities/report.py b/contextpilot/hermes_opportunities/report.py index 5fd1b99..af856f8 100644 --- a/contextpilot/hermes_opportunities/report.py +++ b/contextpilot/hermes_opportunities/report.py @@ -36,6 +36,7 @@ _ToolMessage, ) from .privacy import _assert_no_forbidden_keys, _salt_fingerprint +from .provenance import build_provenance_profile from .routing import analyze_worker_routing_shadow from .tokenizer import TokenizerBackend @@ -126,6 +127,7 @@ def build_report( top_n=top_n, enabled=parent_aggregation_shadow, ) + provenance_profile = build_provenance_profile(heavy_sessions) total_chars = sum(len(m.content) for m in tool_messages) dup_wasted = sum(d.est_wasted_tokens for d in dups) @@ -166,6 +168,7 @@ def build_report( repeated_blocks=blocks, large_tool_outputs_by_tool=sizes, heavy_sessions=heavy_sessions, + provenance_profile=provenance_profile, telemetry=telemetry, llm_bound_item_count=len(llm_contents), llm_block_types=block_type_stats, @@ -223,9 +226,20 @@ def write_report(report: OpportunityReport, out_dir: Path) -> tuple[Path, Path]: f"- Parent aggregation (shadow): {report.parent_aggregation.duplicate_group_count} " f"duplicate artifact groups, " f"~{report.parent_aggregation.est_duplicate_tokens} advisory duplicate tokens", + f"- Provenance profile: {report.provenance_profile.session_count} sessions across " + f"{report.provenance_profile.source_count} sources, " + f"{report.provenance_profile.total_tokens} actual input+output tokens", "", - "## LLM-bound redundancy by block type", + "## Token profile by source", ] + for src in report.provenance_profile.by_source: + md.append( + f"- {src.source}: sessions={src.session_count} input={src.input_tokens} " + f"output={src.output_tokens} total={src.total_tokens} " + f"messages={src.message_count} tools={src.tool_call_count} api_calls={src.api_call_count}" + ) + md.append("") + md.append("## LLM-bound redundancy by block type") for bt in report.llm_block_types: md.append( f"- {bt.block_type}: items={bt.item_count} blocks={bt.block_count} " diff --git a/tests/test_analyzer_lightweight_provenance.py b/tests/test_analyzer_lightweight_provenance.py new file mode 100644 index 0000000..296bda9 --- /dev/null +++ b/tests/test_analyzer_lightweight_provenance.py @@ -0,0 +1,130 @@ +"""RED-phase tests for the lightweight provenance profiler / token monitor. + +These tests pin two requirements for using ContextPilot purely as a standalone +token monitor / profiler against a Hermes state DB: + +1. ``scripts/analyze_hermes_context_opportunities.py`` must import and expose its + public API even when SciPy is not installed. SciPy is a heavy, optional + ContextPilot dependency that ``contextpilot/__init__`` pulls in transitively + (``contextpilot.pipeline`` -> ``contextpilot.context_index`` -> + ``scipy.cluster.hierarchy``). The analyzer only reads Hermes' SQLite state + DB and never needs the RAG pipeline, so a missing SciPy must not break it. + This reproduces the real ``ModuleNotFoundError: No module named 'scipy'`` + observed when running the analyzer inside the Hermes venv. + +2. The analyzer must offer a privacy-safe provenance profile that aggregates + token usage per source (the token-monitor view) using numeric aggregates and + low-cardinality source enums only -- never raw content, prompts, reasoning, + or raw session ids/hashes. + +Both tests fail today (RED): importing the script triggers +``contextpilot/__init__``, which eagerly imports ``contextpilot.pipeline`` and +hence SciPy, so the script cannot even load when SciPy is absent. +""" +from __future__ import annotations + +import dataclasses +import importlib.util +import sys +from pathlib import Path + +MODULE_PATH = ( + Path(__file__).resolve().parents[1] + / "scripts" + / "analyze_hermes_context_opportunities.py" +) + + +class _BlockScipyFinder: + """Meta-path finder that makes ``scipy`` look uninstalled. + + Raising ``ModuleNotFoundError`` from ``find_spec`` reproduces exactly what + the Hermes venv does at import time, regardless of whether SciPy happens to + be installed in the test environment. + """ + + def find_spec(self, fullname, path=None, target=None): + if fullname == "scipy" or fullname.startswith("scipy."): + raise ModuleNotFoundError( + f"No module named 'scipy' (blocked for test: {fullname})" + ) + return None + + +def _purge(prefixes): + for name in list(sys.modules): + if any(name == p or name.startswith(p + ".") for p in prefixes): + del sys.modules[name] + + +_PURGE_PREFIXES = ["scipy", "contextpilot", "analyze_hermes_context_opportunities"] + + +def _load_analyzer_without_scipy(): + """Load the analyzer script by file path with ``scipy`` forced absent.""" + finder = _BlockScipyFinder() + saved_modules = { + name: module + for name, module in sys.modules.items() + if any(name == p or name.startswith(p + ".") for p in _PURGE_PREFIXES) + } + sys.meta_path.insert(0, finder) + _purge(_PURGE_PREFIXES) + try: + spec = importlib.util.spec_from_file_location( + "analyze_hermes_context_opportunities", MODULE_PATH + ) + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + finally: + try: + sys.meta_path.remove(finder) + except ValueError: + pass + # Restore the import cache exactly as it was so this import-isolation + # test cannot perturb later multiprocessing/pickling tests that depend + # on module object identity. + _purge(_PURGE_PREFIXES) + sys.modules.update(saved_modules) + + +def test_analyzer_imports_without_scipy(): + module = _load_analyzer_without_scipy() + # The token-monitor entry points must all be reachable without SciPy. + assert callable(module.main) + assert callable(module.load_tool_messages) + assert callable(module.load_heavy_sessions) + assert callable(module.build_report) + + +def test_provenance_profile_is_privacy_safe(): + module = _load_analyzer_without_scipy() + build_provenance_profile = getattr(module, "build_provenance_profile", None) + assert callable(build_provenance_profile), ( + "analyzer must expose build_provenance_profile() for the token-monitor view" + ) + HeavySession = module.HeavySession + sessions = [ + HeavySession("hash-a", "discord", 1000, 200, 6, 4, 3), + HeavySession("hash-b", "discord", 500, 100, 4, 2, 2), + HeavySession("hash-c", "slack", 300, 50, 3, 1, 1), + ] + profile = build_provenance_profile(sessions) + + by_source = {e.source: e for e in profile.by_source} + assert set(by_source) == {"discord", "slack"} + assert by_source["discord"].input_tokens == 1500 + assert by_source["discord"].output_tokens == 300 + assert by_source["discord"].session_count == 2 + assert by_source["slack"].input_tokens == 300 + assert by_source["slack"].session_count == 1 + + # Provenance output is numeric aggregates + low-cardinality source enums only; + # no raw content/prompts/reasoning and no raw session ids/hashes may leak. + data = dataclasses.asdict(profile) + module._assert_no_forbidden_keys(data) + blob = repr(data) + for raw_hash in ("hash-a", "hash-b", "hash-c"): + assert raw_hash not in blob From 7671f502b0137607edccdb58143369842b89b588 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 18 Jun 2026 15:19:00 +0200 Subject: [PATCH 2/5] feat: dedup provenance fenced artifact blocks --- .../artifact_dedup_canary.py | 144 ++++++++++++++++-- contextpilot/trace_validation/runner.py | 32 +++- tests/test_artifact_dedup_canary.py | 89 +++++++++++ 3 files changed, 250 insertions(+), 15 deletions(-) diff --git a/contextpilot/hermes_opportunities/artifact_dedup_canary.py b/contextpilot/hermes_opportunities/artifact_dedup_canary.py index 11a0c5d..3588b13 100644 --- a/contextpilot/hermes_opportunities/artifact_dedup_canary.py +++ b/contextpilot/hermes_opportunities/artifact_dedup_canary.py @@ -132,6 +132,60 @@ def _parse_artifact_reference(line: str) -> str | None: return body_hash +def _segment_fenced_blocks(body: str) -> list[tuple[str, str]]: + """Split ``body`` into reversible prose/fence segments. + + Only closed triple-backtick fences are marked as ``"fence"``. Unterminated + fences are deliberately treated as prose so the canary never guesses a block + boundary. Concatenating the segment text always reproduces ``body`` exactly. + """ + segments: list[tuple[str, str]] = [] + pos = 0 + n = len(body) + while pos < n: + start = body.find("```", pos) + if start == -1: + if pos < n: + segments.append(("prose", body[pos:])) + break + close = body.find("```", start + 3) + if close == -1: + if pos < n: + segments.append(("prose", body[pos:])) + break + if start > pos: + segments.append(("prose", body[pos:start])) + end = close + 3 + segments.append(("fence", body[start:end])) + pos = end + return segments + + +def _scan_fenced_block_candidates( + contents: list[_LLMContent], *, salt: str, min_block_chars: int +) -> tuple[int, int]: + """Advisory duplicate count for exact fenced sub-artifacts.""" + agg: dict[str, dict] = {} + for item in contents: + if item.block_type not in MUTABLE_ARTIFACT_BLOCK_TYPES: + continue + # Whole-body references are not canonical sources for sub-blocks. + if _parse_artifact_reference(item.content) is not None: + continue + for kind, text in _segment_fenced_blocks(item.content): + if kind != "fence" or len(text) < min_block_chars: + continue + if _parse_artifact_reference(text) is not None: + continue + h = _salted_hash(text, salt) + entry = agg.get(h) + if entry is None: + agg[h] = {"canonical_type": f"{item.block_type}#block", "char_length": len(text), "occ": 1} + else: + entry["occ"] += 1 + return _eligible_groups(agg) + + def _scan_artifacts( contents: list[_LLMContent], *, salt: str, min_block_chars: int ) -> tuple[dict[str, dict], int]: @@ -226,6 +280,11 @@ def apply_artifact_dedup_canary( agg, item_count = _scan_artifacts(items, salt=salt, min_block_chars=min_block_chars) candidate_group_count, candidate_chars = _eligible_groups(agg) + block_group_count, block_candidate_chars = _scan_fenced_block_candidates( + items, salt=salt, min_block_chars=min_block_chars + ) + candidate_group_count += block_group_count + candidate_chars += block_candidate_chars if resolved == "shadow": # Measure what a canary would replace, but never touch the payload. @@ -246,6 +305,8 @@ def apply_artifact_dedup_canary( chars_saved = 0 # hash -> canonical provenance type of the first (kept) occurrence. canonical: dict[str, str] = {} + # hash -> canonical provenance type for exact fenced sub-artifacts. + block_canonical: dict[str, str] = {} for item in items: if item.block_type not in MUTABLE_ARTIFACT_BLOCK_TYPES: continue # protected content is never touched @@ -255,15 +316,54 @@ def apply_artifact_dedup_canary( if _parse_artifact_reference(body) is not None: continue h = _salted_hash(body, salt) - if h not in canonical: - canonical[h] = item.block_type # keep the first canonical body verbatim + already_has_whole_canonical = h in canonical + if already_has_whole_canonical: + # Later exact duplicate whole body: reference the EARLIER canonical + # body's provenance and do not also scan sub-blocks (no double count). + ref = _artifact_reference_string(canonical[h], h) + if len(ref) < len(body): # only when it actually shrinks the payload + item.content = ref + blocks_replaced += 1 + chars_saved += len(body) - len(ref) + continue + + # If the whole body is not replaced, opportunistically dedup exact + # duplicate fenced sub-artifacts within/across mutable artifact bodies. + segments = _segment_fenced_blocks(body) + if not any(kind == "fence" for kind, _text in segments): + if not already_has_whole_canonical: + canonical[h] = item.block_type # keep the first canonical body verbatim continue - # Later exact duplicate: reference the EARLIER canonical body's provenance. - ref = _artifact_reference_string(canonical[h], h) - if len(ref) < len(body): # only when it actually shrinks the payload - item.content = ref - blocks_replaced += 1 - chars_saved += len(body) - len(ref) + new_segments: list[str] = [] + changed = False + for kind, text in segments: + if kind != "fence" or len(text) < min_block_chars: + new_segments.append(text) + continue + if _parse_artifact_reference(text) is not None: + new_segments.append(text) + continue + bh = _salted_hash(text, salt) + if bh not in block_canonical: + block_canonical[bh] = f"{item.block_type}#block" + new_segments.append(text) + continue + ref = _artifact_reference_string(block_canonical[bh], bh) + if len(ref) < len(text): + new_segments.append(ref) + blocks_replaced += 1 + chars_saved += len(text) - len(ref) + changed = True + else: + new_segments.append(text) + if changed: + item.content = "".join(new_segments) + # Register only the post-mutation whole body as canonical. Registering + # the original pre-mutation hash would let a later whole-body + # reference point to a body no longer present in the payload. + canonical[_salted_hash(item.content, salt)] = item.block_type + elif not already_has_whole_canonical: + canonical[h] = item.block_type # keep the first canonical body verbatim return ArtifactDedupCanaryResult( mode="canary", @@ -287,7 +387,7 @@ def dangling_artifact_references( full canonical body whose salted hash matches the reference. A reference with no such earlier canonical body (or one that only appears later) is dangling. """ - seen_full: set[str] = set() # hashes of earlier full canonical artifact bodies + seen_full: set[str] = set() # hashes of earlier full canonical artifact bodies/blocks dangling: list[int] = [] for idx, item in enumerate(contents): body = item.content @@ -296,8 +396,30 @@ def dangling_artifact_references( if ref_hash not in seen_full: dangling.append(idx) continue - if item.block_type in MUTABLE_ARTIFACT_BLOCK_TYPES: - seen_full.add(_salted_hash(body, salt)) + if item.block_type not in MUTABLE_ARTIFACT_BLOCK_TYPES: + continue + + # Whole artifact body can satisfy whole-body references. + seen_full.add(_salted_hash(body, salt)) + + # Within a body, ordering matters: an earlier fenced block can satisfy a + # later reference segment in the same body, but a later block cannot. + for kind, text in _segment_fenced_blocks(body): + if kind != "fence": + # References may also appear as standalone prose lines after a + # fenced block replacement. Embedded prose around the line stays + # protected; only exact standalone reference lines are accepted. + for line in text.splitlines(): + seg_ref = _parse_artifact_reference(line.strip()) + if seg_ref is not None and seg_ref not in seen_full: + dangling.append(idx) + continue + seg_ref = _parse_artifact_reference(text) + if seg_ref is not None: + if seg_ref not in seen_full: + dangling.append(idx) + continue + seen_full.add(_salted_hash(text, salt)) return dangling diff --git a/contextpilot/trace_validation/runner.py b/contextpilot/trace_validation/runner.py index 8707138..42aa940 100644 --- a/contextpilot/trace_validation/runner.py +++ b/contextpilot/trace_validation/runner.py @@ -43,6 +43,7 @@ MUTABLE_ARTIFACT_BLOCK_TYPES, ArtifactDedupCanaryResult, _parse_artifact_reference, + _segment_fenced_blocks, apply_artifact_dedup_canary, dangling_artifact_references, resolve_artifact_dedup_mode, @@ -406,11 +407,34 @@ def _artifact_mutation_scope_ok(base: dict, cand: dict) -> bool: # Only mutable artifact bodies may ever change. if base["block_type"] not in MUTABLE_ARTIFACT_BLOCK_TYPES: return False - # A changed body must become a reference placeholder strictly shorter than - # the body it replaced -- never new free text and never a growth. - if _parse_artifact_reference(cand["content"]) is None: + if len(cand["content"]) >= len(base["content"]): return False - return len(cand["content"]) < len(base["content"]) + + # Whole-body replacement remains valid. + if _parse_artifact_reference(cand["content"]) is not None: + return True + + # Fenced sub-artifact replacement: prose must be byte-identical and only a + # whole fenced segment may be swapped for one strictly shorter reference line. + pos = 0 + changed = False + for kind, text in _segment_fenced_blocks(base["content"]): + if kind != "fence": + if not cand["content"].startswith(text, pos): + return False + pos += len(text) + continue + if cand["content"].startswith(text, pos): + pos += len(text) + continue + newline = cand["content"].find("\n", pos) + end = len(cand["content"]) if newline == -1 else newline + ref = cand["content"][pos:end] + if _parse_artifact_reference(ref) is None or len(ref) >= len(text): + return False + pos = end + changed = True + return changed and pos == len(cand["content"]) def check_artifact_invariants( diff --git a/tests/test_artifact_dedup_canary.py b/tests/test_artifact_dedup_canary.py index 2a6006f..755596a 100644 --- a/tests/test_artifact_dedup_canary.py +++ b/tests/test_artifact_dedup_canary.py @@ -39,6 +39,7 @@ dangling_artifact_references, resolve_artifact_dedup_mode, _artifact_reference_string, + _segment_fenced_blocks, ) from contextpilot.hermes_opportunities.models import _LLMContent from contextpilot.hermes_opportunities.privacy import _salted_hash @@ -69,6 +70,20 @@ # Just over min_block_chars but shorter than any reference placeholder, so a # replacement would GROW the payload and must be skipped. SHORT_ARTIFACT = "Short synthetic artifact body just over forty chars." +LONG_FENCE_BLOCK = ( + "```log\n" + "synthetic provenance artifact line 001: worker output checksum=alpha\n" + "synthetic provenance artifact line 002: worker output checksum=bravo\n" + "synthetic provenance artifact line 003: worker output checksum=charlie\n" + "```" +) +FENCED_PARENT_ARTIFACT = ( + "Parent aggregation summary before first artifact.\n" + f"{LONG_FENCE_BLOCK}\n" + "Short prose between artifacts must survive byte-identical.\n" + f"{LONG_FENCE_BLOCK}\n" + "Parent aggregation summary after duplicate artifact." +) def _ref(body: str, *, canonical_type: str = "tool_result") -> str: @@ -80,6 +95,10 @@ def _ref_len(canonical_type: str = "tool_result") -> int: return len(_ref(LONG_ARTIFACT, canonical_type=canonical_type)) +def _block_ref(block: str, *, canonical_type: str = "tool_result#block") -> str: + return _artifact_reference_string(canonical_type, _salted_hash(block, SALT)) + + # --------------------------------------------------------------------------- # Mode resolution + escape hatch (default OFF) # --------------------------------------------------------------------------- @@ -214,6 +233,76 @@ def test_canary_dedups_across_artifact_types_provenance_canonical_is_first(): assert result.blocks_replaced == 1 +def test_segment_fenced_blocks_round_trips_and_marks_closed_fences(): + segments = _segment_fenced_blocks(FENCED_PARENT_ARTIFACT) + assert "".join(text for _kind, text in segments) == FENCED_PARENT_ARTIFACT + assert [kind for kind, _text in segments].count("fence") == 2 + + +def test_canary_replaces_later_exact_duplicate_fenced_block_inside_artifact_body(): + contents = [_LLMContent("assistant_context", FENCED_PARENT_ARTIFACT)] + + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + + assert contents[0].content.count(LONG_FENCE_BLOCK) == 1 + expected_ref = _block_ref(LONG_FENCE_BLOCK, canonical_type="assistant_context#block") + assert expected_ref in contents[0].content + assert "Short prose between artifacts must survive byte-identical." in contents[0].content + assert result.blocks_replaced == 1 + assert result.chars_saved == len(LONG_FENCE_BLOCK) - len(expected_ref) + assert dangling_artifact_references(contents, salt=SALT) == [] + + +def test_canary_replaces_duplicate_fenced_block_across_artifact_types(): + first = f"tool output wrapper\n{LONG_FENCE_BLOCK}\nend" + second = f"assistant rollup wrapper\n{LONG_FENCE_BLOCK}\nend" + contents = [ + _LLMContent("tool_result", first), + _LLMContent("assistant_context", second), + ] + + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + + assert contents[0].content == first + assert LONG_FENCE_BLOCK not in contents[1].content + assert _block_ref(LONG_FENCE_BLOCK, canonical_type="tool_result#block") in contents[1].content + assert result.blocks_replaced == 1 + + +def test_whole_body_canonical_is_not_registered_before_internal_block_rewrite(): + contents = [ + _LLMContent("assistant_context", FENCED_PARENT_ARTIFACT), + _LLMContent("assistant_context", FENCED_PARENT_ARTIFACT), + ] + + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + + assert result.blocks_replaced >= 2 + # A later reference must never point at the pre-mutation whole-body hash after + # the first body was internally rewritten; every emitted reference resolves + # to an earlier full fenced block/body still present in the payload. + assert dangling_artifact_references(contents, salt=SALT) == [] + + +def test_unterminated_fence_is_treated_as_prose_and_not_mutated(): + body = "prefix\n```log\n" + ("unterminated synthetic artifact line\n" * 8) + contents = [_LLMContent("tool_result", body + body)] + before = contents[0].content + + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + + assert contents[0].content == before + assert result.blocks_replaced == 0 + + def test_canary_reference_carries_no_raw_artifact_body(): contents = [ _LLMContent("tool_result", LONG_ARTIFACT), From 30418c8ecc5376c7ea6ce5fa154e3de6dbbcfcc7 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 19 Jun 2026 00:24:55 +0200 Subject: [PATCH 3/5] feat: add declared source-span artifact backrefs --- __init__.py | 29 +++ .../artifact_dedup_canary.py | 166 +++++++++++++++++- contextpilot/trace_validation/runner.py | 78 ++++++-- tests/test_artifact_dedup_canary.py | 143 +++++++++++++++ 4 files changed, 400 insertions(+), 16 deletions(-) diff --git a/__init__.py b/__init__.py index 4095489..3180876 100644 --- a/__init__.py +++ b/__init__.py @@ -478,6 +478,7 @@ def _apply_artifact_dedup_canary_to_api_messages( if mods is None: return None _LLMContent = mods["models"]._LLMContent + ArtifactSpanLink = mods["artifact_dedup_canary"].ArtifactSpanLink apply_artifact_dedup_canary = mods["artifact_dedup_canary"].apply_artifact_dedup_canary llm_items = [] @@ -501,10 +502,38 @@ def _apply_artifact_dedup_canary_to_api_messages( if not llm_items: return None + llm_index_by_message_index = {msg_idx: llm_idx for llm_idx, msg_idx in enumerate(message_indexes)} + span_links = [] + for msg_idx, msg in enumerate(api_messages): + raw_links = msg.get("contextpilot_span_links") if isinstance(msg, dict) else None + if isinstance(msg, dict): + msg.pop("contextpilot_span_links", None) + if not isinstance(raw_links, list): + continue + for raw in raw_links: + if not isinstance(raw, dict): + continue + try: + src_msg = int(raw["source_message_index"]) + tgt_msg = int(raw.get("target_message_index", msg_idx)) + span_links.append( + ArtifactSpanLink( + source_index=llm_index_by_message_index[src_msg], + source_start=int(raw["source_start"]), + source_end=int(raw["source_end"]), + target_index=llm_index_by_message_index[tgt_msg], + target_start=int(raw["target_start"]), + target_end=int(raw["target_end"]), + ) + ) + except (KeyError, TypeError, ValueError, IndexError): + continue + result = apply_artifact_dedup_canary( llm_items, salt=salt, min_block_chars=40, + span_links=span_links, ) if result and result.mutated: for item, idx in zip(llm_items, message_indexes): diff --git a/contextpilot/hermes_opportunities/artifact_dedup_canary.py b/contextpilot/hermes_opportunities/artifact_dedup_canary.py index 3588b13..fca203f 100644 --- a/contextpilot/hermes_opportunities/artifact_dedup_canary.py +++ b/contextpilot/hermes_opportunities/artifact_dedup_canary.py @@ -56,6 +56,7 @@ # The only duplicate class this canary acts on: an exact-duplicate full artifact # body across the mutable artifact types. ARTIFACT_DEDUP_CLASS = "same_payload_exact_artifact_body" +ARTIFACT_SPAN_PROVENANCE_CLASS = "declared_source_span_backref" # Deterministic placeholder left in place of a later duplicate body. ```` # is the CANONICAL (first) body's provenance and ```` its salted @@ -70,6 +71,23 @@ _REF_HEAD = ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE.split("", 1)[0] +@dataclass +class ArtifactSpanLink: + """Declared source-span provenance edge, in Python string offsets. + + The canary treats this as untrusted metadata: it rewrites only when the + declared target slice byte-equals the earlier source slice and all scope / + line-alignment / never-grow gates pass. + """ + + source_index: int + source_start: int + source_end: int + target_index: int + target_start: int + target_end: int + + @dataclass class ArtifactDedupCanaryResult: """Metadata-only outcome of an artifact-dedup canary pass. No raw text, ever. @@ -88,6 +106,10 @@ class ArtifactDedupCanaryResult: candidate_chars: int # advisory chars later occurrences occupy blocks_replaced: int # REALIZED replacements (canary only) chars_saved: int # REALIZED chars saved (canary only) + span_candidate_count: int = 0 # advisory declared span replacements + span_candidate_chars: int = 0 # advisory chars later declared spans occupy + span_blocks_replaced: int = 0 # REALIZED source-span replacements + span_chars_saved: int = 0 # REALIZED source-span chars saved notes: list[str] = field(default_factory=list) @@ -186,6 +208,104 @@ def _scan_fenced_block_candidates( return _eligible_groups(agg) +def _in_range(text: str, start: int, end: int) -> bool: + return 0 <= start < end <= len(text) + + +def _line_aligned(text: str, start: int, end: int) -> bool: + """Require standalone line spans so emitted refs are standalone lines.""" + return ( + _in_range(text, start, end) + and (start == 0 or text[start - 1] == "\n") + and (end == len(text) or text[end] == "\n") + ) + + +def _valid_span_link( + items: list[_LLMContent], link: ArtifactSpanLink, *, min_block_chars: int, salt: str +) -> tuple[str, str, str] | None: + """Return ``(target_text, ref, source_hash)`` if a declared link is safe.""" + if not (0 <= link.source_index < len(items) and 0 <= link.target_index < len(items)): + return None + if link.source_index >= link.target_index: + return None + source = items[link.source_index] + target = items[link.target_index] + if source.block_type != "tool_result" or target.block_type != "assistant_context": + return None + if not _line_aligned(source.content, link.source_start, link.source_end): + return None + if not _line_aligned(target.content, link.target_start, link.target_end): + return None + source_text = source.content[link.source_start:link.source_end] + target_text = target.content[link.target_start:link.target_end] + if len(target_text) < min_block_chars or target_text != source_text: + return None + h = _salted_hash(source_text, salt) + ref = _artifact_reference_string(f"{source.block_type}#span", h) + if len(ref) >= len(target_text): + return None + return target_text, ref, h + + +def _scan_span_candidates( + items: list[_LLMContent], span_links: Iterable[ArtifactSpanLink], *, salt: str, min_block_chars: int +) -> tuple[int, int]: + count = 0 + chars = 0 + seen_targets: set[tuple[int, int, int]] = set() + for link in span_links: + valid = _valid_span_link(items, link, min_block_chars=min_block_chars, salt=salt) + key = (link.target_index, link.target_start, link.target_end) + if valid is None or key in seen_targets: + continue + seen_targets.add(key) + target_text, _ref, _h = valid + count += 1 + chars += len(target_text) + return count, chars + + +def _apply_span_links( + items: list[_LLMContent], span_links: Iterable[ArtifactSpanLink], *, salt: str, min_block_chars: int +) -> tuple[int, int, set[int], set[int]]: + """Apply safe declared source-span replacements right-to-left per target.""" + by_target: dict[int, dict[tuple[int, int], tuple[int, int, str, int, int]]] = {} + for link in span_links: + valid = _valid_span_link(items, link, min_block_chars=min_block_chars, salt=salt) + if valid is None: + continue + target_text, ref, _h = valid + by_target.setdefault(link.target_index, {})[(link.target_start, link.target_end)] = ( + link.target_start, link.target_end, ref, len(target_text), link.source_index + ) + + blocks = 0 + saved = 0 + mutated_targets: set[int] = set() + preserved_sources: set[int] = set() + for target_index, replacement_map in by_target.items(): + # First-cut validation scope certifies one line-aligned source-span swap + # per target body. Keep multi-span targets in shadow/advisory until the + # gate can prove several replacements in one message. + if len(replacement_map) != 1: + continue + replacements = list(replacement_map.values()) + replacements.sort(key=lambda r: r[0], reverse=True) + ordered = sorted(replacements, key=lambda r: r[0]) + if any(a[1] > b[0] for a, b in zip(ordered, ordered[1:])): + continue + body = items[target_index].content + for start, end, ref, old_len, source_index in replacements: + body = body[:start] + ref + body[end:] + blocks += 1 + saved += old_len - len(ref) + preserved_sources.add(source_index) + items[target_index].content = body + mutated_targets.add(target_index) + return blocks, saved, mutated_targets, preserved_sources + + def _scan_artifacts( contents: list[_LLMContent], *, salt: str, min_block_chars: int ) -> tuple[dict[str, dict], int]: @@ -247,6 +367,7 @@ def apply_artifact_dedup_canary( min_block_chars: int, mode: str | None = None, env: dict | None = None, + span_links: Iterable[ArtifactSpanLink] | None = None, ) -> ArtifactDedupCanaryResult: """Run the artifact-dedup canary over LLM-bound content. @@ -260,6 +381,7 @@ def apply_artifact_dedup_canary( the mode comes from :func:`resolve_artifact_dedup_mode`. """ items = list(contents) + links = list(span_links or []) resolved = mode if mode is not None else resolve_artifact_dedup_mode(env) if resolved not in ARTIFACT_DEDUP_MODES: resolved = DEFAULT_ARTIFACT_DEDUP_MODE @@ -283,8 +405,11 @@ def apply_artifact_dedup_canary( block_group_count, block_candidate_chars = _scan_fenced_block_candidates( items, salt=salt, min_block_chars=min_block_chars ) - candidate_group_count += block_group_count - candidate_chars += block_candidate_chars + span_candidate_count, span_candidate_chars = _scan_span_candidates( + items, links, salt=salt, min_block_chars=min_block_chars + ) + candidate_group_count += block_group_count + span_candidate_count + candidate_chars += block_candidate_chars + span_candidate_chars if resolved == "shadow": # Measure what a canary would replace, but never touch the payload. @@ -297,17 +422,28 @@ def apply_artifact_dedup_canary( candidate_chars=candidate_chars, blocks_replaced=0, chars_saved=0, + span_candidate_count=span_candidate_count, + span_candidate_chars=span_candidate_chars, notes=["artifact-dedup canary shadow: candidates measured, payload unchanged"], ) # --- canary: the ONLY branch that mutates LLM-bound payload --------------- blocks_replaced = 0 chars_saved = 0 + span_blocks_replaced, span_chars_saved, span_mutated_targets, span_preserved_sources = _apply_span_links( + items, links, salt=salt, min_block_chars=min_block_chars + ) + blocks_replaced += span_blocks_replaced + chars_saved += span_chars_saved # hash -> canonical provenance type of the first (kept) occurrence. canonical: dict[str, str] = {} # hash -> canonical provenance type for exact fenced sub-artifacts. block_canonical: dict[str, str] = {} - for item in items: + for idx, item in enumerate(items): + if idx in span_mutated_targets or idx in span_preserved_sources: + if item.block_type in MUTABLE_ARTIFACT_BLOCK_TYPES: + canonical[_salted_hash(item.content, salt)] = item.block_type + continue if item.block_type not in MUTABLE_ARTIFACT_BLOCK_TYPES: continue # protected content is never touched body = item.content @@ -374,12 +510,16 @@ def apply_artifact_dedup_canary( candidate_chars=candidate_chars, blocks_replaced=blocks_replaced, chars_saved=chars_saved, + span_candidate_count=span_candidate_count, + span_candidate_chars=span_candidate_chars, + span_blocks_replaced=span_blocks_replaced, + span_chars_saved=span_chars_saved, notes=["artifact-dedup canary active: exact duplicate artifact bodies only"], ) def dangling_artifact_references( - contents: Iterable[_LLMContent], *, salt: str + contents: Iterable[_LLMContent], *, salt: str, span_links: Iterable[ArtifactSpanLink] | None = None ) -> list[int]: """Return indices of artifact references that do not resolve to an earlier body. @@ -387,9 +527,13 @@ def dangling_artifact_references( full canonical body whose salted hash matches the reference. A reference with no such earlier canonical body (or one that only appears later) is dangling. """ - seen_full: set[str] = set() # hashes of earlier full canonical artifact bodies/blocks + seen_full: set[str] = set() # hashes of earlier full canonical artifact bodies/blocks/spans dangling: list[int] = [] - for idx, item in enumerate(contents): + items = list(contents) + span_by_source: dict[int, list[ArtifactSpanLink]] = {} + for link in span_links or []: + span_by_source.setdefault(link.source_index, []).append(link) + for idx, item in enumerate(items): body = item.content ref_hash = _parse_artifact_reference(body) if ref_hash is not None: @@ -401,6 +545,14 @@ def dangling_artifact_references( # Whole artifact body can satisfy whole-body references. seen_full.add(_salted_hash(body, salt)) + # Declared source spans can satisfy later #span references, but only from + # their earlier source item and only when the declared source slice is + # still byte-identical in the payload. + for link in span_by_source.get(idx, []): + if link.source_index >= link.target_index: + continue + if _line_aligned(body, link.source_start, link.source_end): + seen_full.add(_salted_hash(body[link.source_start:link.source_end], salt)) # Within a body, ordering matters: an earlier fenced block can satisfy a # later reference segment in the same body, but a later block cannot. @@ -436,6 +588,8 @@ def build_artifact_canary_telemetry_record(result: ArtifactDedupCanaryResult) -> "artifact_dedup_mode": result.mode, "artifact_dedup_class": result.artifact_dedup_class, "artifact_dedup_blocks_replaced": result.blocks_replaced if result.mutated else 0, + "artifact_span_blocks_replaced": result.span_blocks_replaced if result.mutated else 0, + "artifact_span_chars_saved": result.span_chars_saved if result.mutated else 0, # Separated field: always present, mirrors the realized artifact-dedup save. "artifact_dedup_chars_saved": realized, # Aggregate total: includes artifact dedup only when a mutation occurred. diff --git a/contextpilot/trace_validation/runner.py b/contextpilot/trace_validation/runner.py index 42aa940..a34fe7e 100644 --- a/contextpilot/trace_validation/runner.py +++ b/contextpilot/trace_validation/runner.py @@ -42,6 +42,7 @@ from contextpilot.hermes_opportunities.artifact_dedup_canary import ( MUTABLE_ARTIFACT_BLOCK_TYPES, ArtifactDedupCanaryResult, + ArtifactSpanLink, _parse_artifact_reference, _segment_fenced_blocks, apply_artifact_dedup_canary, @@ -380,8 +381,27 @@ def render_markdown(report: ValidationReport) -> str: ] +def _span_links(case: dict) -> list[ArtifactSpanLink]: + links = [] + for raw in case.get("span_links") or []: + try: + links.append( + ArtifactSpanLink( + source_index=int(raw["source_index"]), + source_start=int(raw["source_start"]), + source_end=int(raw["source_end"]), + target_index=int(raw["target_index"]), + target_start=int(raw["target_start"]), + target_end=int(raw["target_end"]), + ) + ) + except (KeyError, TypeError, ValueError): + continue + return links + + def optimize_artifact_case( - messages: list[dict], *, mode: str, salt: str, min_block_chars: int + messages: list[dict], *, mode: str, salt: str, min_block_chars: int, span_links: list[ArtifactSpanLink] | None = None ) -> tuple[list[dict], ArtifactDedupCanaryResult]: """Run the artifact-dedup canary over a case's messages in the given mode. @@ -391,7 +411,7 @@ def optimize_artifact_case( """ contents = [_LLMContent(m["block_type"], m["content"]) for m in messages] result = apply_artifact_dedup_canary( - contents, salt=salt, min_block_chars=min_block_chars, mode=mode + contents, salt=salt, min_block_chars=min_block_chars, mode=mode, span_links=span_links ) out = [ {"role": m["role"], "block_type": m["block_type"], "content": c.content} @@ -414,6 +434,34 @@ def _artifact_mutation_scope_ok(base: dict, cand: dict) -> bool: if _parse_artifact_reference(cand["content"]) is not None: return True + # Declared source-span replacement: a byte-identical line-aligned span may be + # swapped for one standalone strictly shorter reference while surrounding + # prose remains byte-identical. + base_text = base["content"] + cand_text = cand["content"] + prefix = 0 + while prefix < len(base_text) and prefix < len(cand_text) and base_text[prefix] == cand_text[prefix]: + prefix += 1 + suffix = 0 + while ( + suffix < len(base_text) - prefix + and suffix < len(cand_text) - prefix + and base_text[len(base_text) - 1 - suffix] == cand_text[len(cand_text) - 1 - suffix] + ): + suffix += 1 + base_end = len(base_text) - suffix + cand_end = len(cand_text) - suffix + old_mid = base_text[prefix:base_end] + new_mid = cand_text[prefix:cand_end] + if ( + old_mid + and _parse_artifact_reference(new_mid.strip()) is not None + and len(new_mid.strip()) < len(old_mid) + and (prefix == 0 or base_text[prefix - 1] == "\n") + and (base_end == len(base_text) or base_text[base_end] == "\n") + ): + return True + # Fenced sub-artifact replacement: prose must be byte-identical and only a # whole fenced segment may be swapped for one strictly shorter reference line. pos = 0 @@ -443,6 +491,7 @@ def check_artifact_invariants( result: ArtifactDedupCanaryResult, *, salt: str, + span_links: list[ArtifactSpanLink] | None = None, ) -> tuple[dict[str, bool], int]: """Check accuracy-preservation invariants for an artifact-dedup pass. @@ -471,7 +520,7 @@ def check_artifact_invariants( _LLMContent(c["block_type"], c["content"]) for c in candidate ] inv["artifact_reference_resolvable"] = ( - dangling_artifact_references(cand_contents, salt=salt) == [] + dangling_artifact_references(cand_contents, salt=salt, span_links=span_links) == [] ) realized = sum( len(b["content"]) - len(c["content"]) @@ -517,15 +566,24 @@ def run_artifact_validation( for case in cases: msgs = _messages(case) - baseline_msgs, _ = optimize_fn( - list(msgs), mode=baseline_mode, salt=salt, min_block_chars=min_block_chars - ) - candidate_msgs, result = optimize_fn( - list(msgs), mode=candidate_mode, salt=salt, min_block_chars=min_block_chars - ) + span_links = _span_links(case) + if span_links: + baseline_msgs, _ = optimize_fn( + list(msgs), mode=baseline_mode, salt=salt, min_block_chars=min_block_chars, span_links=span_links + ) + candidate_msgs, result = optimize_fn( + list(msgs), mode=candidate_mode, salt=salt, min_block_chars=min_block_chars, span_links=span_links + ) + else: + baseline_msgs, _ = optimize_fn( + list(msgs), mode=baseline_mode, salt=salt, min_block_chars=min_block_chars + ) + candidate_msgs, result = optimize_fn( + list(msgs), mode=candidate_mode, salt=salt, min_block_chars=min_block_chars + ) inv, realized = check_artifact_invariants( - baseline_msgs, candidate_msgs, result, salt=salt + baseline_msgs, candidate_msgs, result, salt=salt, span_links=span_links ) failed = [name for name, ok in inv.items() if not ok] diff --git a/tests/test_artifact_dedup_canary.py b/tests/test_artifact_dedup_canary.py index 755596a..803f3e0 100644 --- a/tests/test_artifact_dedup_canary.py +++ b/tests/test_artifact_dedup_canary.py @@ -34,6 +34,7 @@ ARTIFACT_DEDUP_MODE_ENV, MUTABLE_ARTIFACT_BLOCK_TYPES, ArtifactDedupCanaryResult, + ArtifactSpanLink, apply_artifact_dedup_canary, build_artifact_canary_telemetry_record, dangling_artifact_references, @@ -84,6 +85,21 @@ f"{LONG_FENCE_BLOCK}\n" "Parent aggregation summary after duplicate artifact." ) +SOURCE_SPAN_BLOCK = ( + "worker-span-line-001 provenance payload alpha bravo charlie\n" + "worker-span-line-002 provenance payload delta echo foxtrot\n" + "worker-span-line-003 provenance payload golf hotel india" +) +SOURCE_SPAN_TOOL = ( + "tool preamble stays canonical\n" + f"{SOURCE_SPAN_BLOCK}\n" + "tool epilogue stays canonical" +) +SOURCE_SPAN_PARENT = ( + "parent summary before copied worker span\n" + f"{SOURCE_SPAN_BLOCK}\n" + "parent summary after copied worker span" +) def _ref(body: str, *, canonical_type: str = "tool_result") -> str: @@ -99,6 +115,23 @@ def _block_ref(block: str, *, canonical_type: str = "tool_result#block") -> str: return _artifact_reference_string(canonical_type, _salted_hash(block, SALT)) +def _source_span_link() -> ArtifactSpanLink: + src_start = SOURCE_SPAN_TOOL.index(SOURCE_SPAN_BLOCK) + tgt_start = SOURCE_SPAN_PARENT.index(SOURCE_SPAN_BLOCK) + return ArtifactSpanLink( + source_index=0, + source_start=src_start, + source_end=src_start + len(SOURCE_SPAN_BLOCK), + target_index=1, + target_start=tgt_start, + target_end=tgt_start + len(SOURCE_SPAN_BLOCK), + ) + + +def _span_ref(span: str, *, canonical_type: str = "tool_result#span") -> str: + return _artifact_reference_string(canonical_type, _salted_hash(span, SALT)) + + # --------------------------------------------------------------------------- # Mode resolution + escape hatch (default OFF) # --------------------------------------------------------------------------- @@ -303,6 +336,112 @@ def test_unterminated_fence_is_treated_as_prose_and_not_mutated(): assert result.blocks_replaced == 0 +# --------------------------------------------------------------------------- +# Level 2: declared source-span provenance (metadata-driven, not discovery) +# --------------------------------------------------------------------------- + + +def test_source_span_canary_replaces_declared_parent_span_only(): + contents = [ + _LLMContent("tool_result", SOURCE_SPAN_TOOL), + _LLMContent("assistant_context", SOURCE_SPAN_PARENT), + ] + link = _source_span_link() + + result = apply_artifact_dedup_canary( + contents, + salt=SALT, + min_block_chars=MIN, + mode="canary", + span_links=[link], + ) + + expected_ref = _span_ref(SOURCE_SPAN_BLOCK) + assert contents[0].content == SOURCE_SPAN_TOOL + assert contents[1].content == SOURCE_SPAN_PARENT.replace(SOURCE_SPAN_BLOCK, expected_ref) + assert result.span_blocks_replaced == 1 + assert result.span_chars_saved == len(SOURCE_SPAN_BLOCK) - len(expected_ref) + assert result.blocks_replaced == 1 + assert result.chars_saved == result.span_chars_saved + assert dangling_artifact_references(contents, salt=SALT, span_links=[link]) == [] + + +def test_source_span_shadow_measures_without_mutating(): + contents = [ + _LLMContent("tool_result", SOURCE_SPAN_TOOL), + _LLMContent("assistant_context", SOURCE_SPAN_PARENT), + ] + before = [c.content for c in contents] + + result = apply_artifact_dedup_canary( + contents, + salt=SALT, + min_block_chars=MIN, + mode="shadow", + span_links=[_source_span_link()], + ) + + assert [c.content for c in contents] == before + assert result.span_blocks_replaced == 0 + assert result.span_candidate_count == 1 + assert result.span_candidate_chars == len(SOURCE_SPAN_BLOCK) + + +def test_source_span_mismatch_or_forward_link_is_not_mutated(): + mismatch_parent = SOURCE_SPAN_PARENT.replace("alpha", "ALPHA", 1) + contents = [ + _LLMContent("tool_result", SOURCE_SPAN_TOOL), + _LLMContent("assistant_context", mismatch_parent), + ] + result = apply_artifact_dedup_canary( + contents, + salt=SALT, + min_block_chars=MIN, + mode="canary", + span_links=[_source_span_link()], + ) + assert contents[1].content == mismatch_parent + assert result.span_blocks_replaced == 0 + + forward = ArtifactSpanLink(1, 0, len(SOURCE_SPAN_BLOCK), 0, 0, len(SOURCE_SPAN_BLOCK)) + before = [c.content for c in contents] + result = apply_artifact_dedup_canary( + contents, + salt=SALT, + min_block_chars=MIN, + mode="canary", + span_links=[forward], + ) + assert [c.content for c in contents] == before + assert result.span_blocks_replaced == 0 + + +def test_source_span_rejects_protected_or_inline_scope(): + inline_parent = SOURCE_SPAN_PARENT.replace("\n" + SOURCE_SPAN_BLOCK + "\n", SOURCE_SPAN_BLOCK) + contents = [ + _LLMContent("tool_result", SOURCE_SPAN_TOOL), + _LLMContent("assistant_context", inline_parent), + _LLMContent("user_prompt", SOURCE_SPAN_PARENT), + ] + inline_start = inline_parent.index(SOURCE_SPAN_BLOCK) + links = [ + ArtifactSpanLink(0, SOURCE_SPAN_TOOL.index(SOURCE_SPAN_BLOCK), SOURCE_SPAN_TOOL.index(SOURCE_SPAN_BLOCK) + len(SOURCE_SPAN_BLOCK), 1, inline_start, inline_start + len(SOURCE_SPAN_BLOCK)), + ArtifactSpanLink(0, SOURCE_SPAN_TOOL.index(SOURCE_SPAN_BLOCK), SOURCE_SPAN_TOOL.index(SOURCE_SPAN_BLOCK) + len(SOURCE_SPAN_BLOCK), 2, SOURCE_SPAN_PARENT.index(SOURCE_SPAN_BLOCK), SOURCE_SPAN_PARENT.index(SOURCE_SPAN_BLOCK) + len(SOURCE_SPAN_BLOCK)), + ] + before = [c.content for c in contents] + + result = apply_artifact_dedup_canary( + contents, + salt=SALT, + min_block_chars=MIN, + mode="canary", + span_links=links, + ) + + assert [c.content for c in contents] == before + assert result.span_blocks_replaced == 0 + + def test_canary_reference_carries_no_raw_artifact_body(): contents = [ _LLMContent("tool_result", LONG_ARTIFACT), @@ -502,11 +641,15 @@ def test_telemetry_is_metadata_only_no_artifact_text(): "artifact_dedup_mode", "artifact_dedup_class", "artifact_dedup_blocks_replaced", + "artifact_span_blocks_replaced", + "artifact_span_chars_saved", "artifact_dedup_chars_saved", "chars_saved", } for key in ( "artifact_dedup_blocks_replaced", + "artifact_span_blocks_replaced", + "artifact_span_chars_saved", "artifact_dedup_chars_saved", "chars_saved", ): From 9f40523eabb9c69ea33d046289868fad53b0de3b Mon Sep 17 00:00:00 2001 From: root Date: Fri, 19 Jun 2026 00:38:12 +0200 Subject: [PATCH 4/5] fix: validate source-span refs by declared offsets --- contextpilot/trace_validation/runner.py | 34 ++++++++++++++++++-- tests/test_artifact_dedup_canary.py | 41 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/contextpilot/trace_validation/runner.py b/contextpilot/trace_validation/runner.py index a34fe7e..c70b677 100644 --- a/contextpilot/trace_validation/runner.py +++ b/contextpilot/trace_validation/runner.py @@ -45,6 +45,7 @@ ArtifactSpanLink, _parse_artifact_reference, _segment_fenced_blocks, + _line_aligned, apply_artifact_dedup_canary, dangling_artifact_references, resolve_artifact_dedup_mode, @@ -420,7 +421,13 @@ def optimize_artifact_case( return out, result -def _artifact_mutation_scope_ok(base: dict, cand: dict) -> bool: +def _artifact_mutation_scope_ok( + idx: int, + base: dict, + cand: dict, + *, + span_links: list[ArtifactSpanLink] | None = None, +) -> bool: """A single message changed only within the allowed (artifact-only) scope.""" if base["content"] == cand["content"]: return True @@ -434,6 +441,28 @@ def _artifact_mutation_scope_ok(base: dict, cand: dict) -> bool: if _parse_artifact_reference(cand["content"]) is not None: return True + # Declared source-span replacement: validate against the declared target + # offsets instead of maximal prefix/suffix inference. Prefix/suffix inference + # can accidentally consume a trailing ']' from the replacement reference when + # the original copied span also ends with ']', causing a false gate failure. + for link in span_links or []: + if link.target_index != idx: + continue + if not _line_aligned(base["content"], link.target_start, link.target_end): + continue + prefix_text = base["content"][: link.target_start] + suffix_text = base["content"][link.target_end :] + if not (cand["content"].startswith(prefix_text) and cand["content"].endswith(suffix_text)): + continue + new_mid = cand["content"][len(prefix_text) : len(cand["content"]) - len(suffix_text)] + old_mid = base["content"][link.target_start : link.target_end] + if ( + old_mid + and _parse_artifact_reference(new_mid) is not None + and len(new_mid) < len(old_mid) + ): + return True + # Declared source-span replacement: a byte-identical line-aligned span may be # swapped for one standalone strictly shorter reference while surrounding # prose remains byte-identical. @@ -514,7 +543,8 @@ def check_artifact_invariants( if b["block_type"] not in MUTABLE_ARTIFACT_BLOCK_TYPES ) inv["artifact_mutation_scope_allowed"] = all( - _artifact_mutation_scope_ok(b, c) for b, c in zip(baseline, candidate) + _artifact_mutation_scope_ok(i, b, c, span_links=span_links) + for i, (b, c) in enumerate(zip(baseline, candidate)) ) cand_contents = [ _LLMContent(c["block_type"], c["content"]) for c in candidate diff --git a/tests/test_artifact_dedup_canary.py b/tests/test_artifact_dedup_canary.py index 803f3e0..a9b4792 100644 --- a/tests/test_artifact_dedup_canary.py +++ b/tests/test_artifact_dedup_canary.py @@ -730,6 +730,47 @@ def test_artifact_runner_shadow_passes_without_realized_savings(): assert all(not c.mutated for c in report.cases) +def test_artifact_runner_accepts_source_span_that_ends_with_reference_suffix_char(): + source_span = ( + "worker copied JSON-ish payload line 001 alpha bravo charlie delta echo\n" + "worker copied JSON-ish payload line 002 foxtrot golf hotel india juliet\n" + "worker copied JSON-ish payload line 003 kilo lima mike november oscar]" + ) + tool = "tool wrapper before\n" + source_span + "\ntool wrapper after" + parent = "parent summary before\n" + source_span + "\nparent summary after" + link = ArtifactSpanLink( + source_index=0, + source_start=tool.index(source_span), + source_end=tool.index(source_span) + len(source_span), + target_index=1, + target_start=parent.index(source_span), + target_end=parent.index(source_span) + len(source_span), + ) + case = { + "case_id": "syn-art-span-bracket", + "source": "synthetic", + "span_links": [link.__dict__], + "messages": [ + {"role": "tool", "block_type": "tool_result", "content": tool}, + {"role": "assistant", "block_type": "assistant_context", "content": parent}, + ], + } + + report = run_artifact_validation( + [case], + baseline_mode="off", + candidate_mode="canary", + salt=SALT, + min_block_chars=MIN, + date="2026-06-15", + ) + + assert report.passed is True + assert report.failed_cases == 0 + assert report.total_blocks_replaced == 1 + assert report.total_chars_saved > 0 + + def test_artifact_runner_report_is_privacy_safe(): report = run_artifact_validation( _artifact_cases(), From 1c04dae9b03bbbb8eed03a87133ba7dbf532cfac Mon Sep 17 00:00:00 2001 From: root Date: Fri, 19 Jun 2026 01:14:47 +0200 Subject: [PATCH 5/5] test: add synthetic artifact precision eval --- ...tifact_precision_synthetic_2026-06-19.json | 172 ++++++++++++++ scripts/evaluate_artifact_precision.py | 223 ++++++++++++++++++ tests/test_artifact_precision_eval.py | 47 ++++ 3 files changed, 442 insertions(+) create mode 100644 evals/artifact_precision_synthetic_2026-06-19.json create mode 100644 scripts/evaluate_artifact_precision.py create mode 100644 tests/test_artifact_precision_eval.py diff --git a/evals/artifact_precision_synthetic_2026-06-19.json b/evals/artifact_precision_synthetic_2026-06-19.json new file mode 100644 index 0000000..73ec064 --- /dev/null +++ b/evals/artifact_precision_synthetic_2026-06-19.json @@ -0,0 +1,172 @@ +{ + "schema_version": 1, + "generated_at": "2026-06-19T01:12:09+0200", + "corpus": "synthetic_labeled_artifact_precision_v1", + "claim_scope": "synthetic exact/provenance gate self-consistency; not field/model/product precision", + "case_count": 16, + "synthetic_event_tp": 7, + "synthetic_event_fp": 0, + "synthetic_event_fn": 0, + "synthetic_negative_case_tn": 10, + "synthetic_negative_case_fpr": 0.0, + "synthetic_event_precision": 1.0, + "synthetic_event_recall": 1.0, + "synthetic_case_accuracy": 1.0, + "predicted_replacements": 7, + "expected_replacements": 7, + "synthetic_realized_chars_saved": 2158, + "mode_gate_checks": { + "off_no_mutation": true, + "shadow_no_mutation": true, + "disable_env_no_mutation": true + }, + "validation_gate_checks": { + "forged_reference_detected": true + }, + "rows": [ + { + "name": "whole_tool_exact_duplicate", + "expected_replacements": 1, + "actual_replacements": 1, + "pass": true, + "chars_saved": 293, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "whole_cross_type_exact_duplicate", + "expected_replacements": 1, + "actual_replacements": 1, + "pass": true, + "chars_saved": 293, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "whole_near_duplicate_not_mutated", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "fenced_internal_duplicate", + "expected_replacements": 1, + "actual_replacements": 1, + "pass": true, + "chars_saved": 358, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "two_fenced_duplicates", + "expected_replacements": 2, + "actual_replacements": 2, + "pass": true, + "chars_saved": 716, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "protected_user_system_duplicates", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "short_duplicate_never_grow", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "unterminated_fence_not_mutated", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "copied_plain_span_without_declared_link", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "protected_duplicate_tool_vs_user", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "protected_duplicate_tool_vs_system", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "declared_source_span_exact", + "expected_replacements": 1, + "actual_replacements": 1, + "pass": true, + "chars_saved": 249, + "span_replacements": 1, + "dangling": [] + }, + { + "name": "declared_span_content_differs", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "forward_span_link_rejected", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "oob_span_link_rejected", + "expected_replacements": 0, + "actual_replacements": 0, + "pass": true, + "chars_saved": 0, + "span_replacements": 0, + "dangling": [] + }, + { + "name": "duplicate_span_declaration_counts_once", + "expected_replacements": 1, + "actual_replacements": 1, + "pass": true, + "chars_saved": 249, + "span_replacements": 1, + "dangling": [] + } + ] +} diff --git a/scripts/evaluate_artifact_precision.py b/scripts/evaluate_artifact_precision.py new file mode 100644 index 0000000..be68213 --- /dev/null +++ b/scripts/evaluate_artifact_precision.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +"""Run a reproducible synthetic precision/recall check for artifact canary rewrites. + +This is intentionally *not* a product/model precision benchmark. It is a small, +hand-labeled synthetic self-consistency suite that checks the current exact +whole-body, fenced-block, and declared source-span rewrite gates against planted +positive/negative cases. Top-level metric names are prefixed with ``synthetic`` +so they are not confused with field precision on real traces. +""" +from __future__ import annotations + +import argparse +import json +import os +import pathlib +import sys +import time +from dataclasses import dataclass +from typing import Iterable + +REPO_ROOT = pathlib.Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from contextpilot.hermes_opportunities.artifact_dedup_canary import ( + ARTIFACT_DEDUP_DISABLE_ENV, + ARTIFACT_DEDUP_MODE_ENV, + ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE, + ArtifactSpanLink, + apply_artifact_dedup_canary, + dangling_artifact_references, +) +from contextpilot.hermes_opportunities.models import _LLMContent + +SALT = "precision-eval-salt" +MIN = 40 +ART = ("Synthetic artifact payload alpha bravo charlie delta echo foxtrot. " * 6).strip() +SHORT = "Short duplicate body just above forty chars." +FENCE = ( + "```log\n" + + ("synthetic repeated fenced worker output alpha bravo charlie\n" * 8).rstrip("\n") + + "\n```" +) +SPAN = ("declared source span line alpha bravo charlie\n" * 8).rstrip("\n") + + +@dataclass(frozen=True) +class LabeledCase: + name: str + expected_replacements: int + items: list[_LLMContent] + span_links: list[ArtifactSpanLink] + + +def _content(block_type: str, text: str) -> _LLMContent: + return _LLMContent(block_type, text) + + +def _clone(items: Iterable[_LLMContent]) -> list[_LLMContent]: + return [_LLMContent(item.block_type, item.content) for item in items] + + +def _span_case(*, exact: bool = True, forward: bool = False, oob: bool = False) -> tuple[list[_LLMContent], list[ArtifactSpanLink]]: + src = "tool pre\n" + SPAN + "\ntool post\n" + target_span = SPAN if exact else SPAN.replace("charlie", "changed", 1) + tgt = "parent pre\n" + target_span + "\nparent post\n" + source_start = src.index(SPAN) + source_end = source_start + len(SPAN) + target_start = tgt.index(target_span) + target_end = target_start + len(target_span) + items = [_content("tool_result", src), _content("assistant_context", tgt)] + if forward: + link = ArtifactSpanLink(1, target_start, target_end, 0, source_start, source_end) + elif oob: + link = ArtifactSpanLink(0, source_start, len(src) + 100, 1, target_start, target_end) + else: + link = ArtifactSpanLink(0, source_start, source_end, 1, target_start, target_end) + return items, [link] + + +def build_cases() -> list[LabeledCase]: + cases: list[LabeledCase] = [ + LabeledCase("whole_tool_exact_duplicate", 1, [_content("tool_result", ART), _content("tool_result", ART)], []), + LabeledCase("whole_cross_type_exact_duplicate", 1, [_content("tool_result", ART), _content("assistant_context", ART)], []), + LabeledCase("whole_near_duplicate_not_mutated", 0, [_content("tool_result", ART), _content("tool_result", ART + " changed")], []), + LabeledCase("fenced_internal_duplicate", 1, [_content("assistant_context", "before\n" + FENCE + "\nmiddle\n" + FENCE + "\nafter")], []), + LabeledCase("two_fenced_duplicates", 2, [_content("assistant_context", "a\n" + FENCE + "\nb\n" + FENCE + "\nc\n" + FENCE + "\nd")], []), + LabeledCase("protected_user_system_duplicates", 0, [_content("user_ctx", ART), _content("system_ctx", ART)], []), + LabeledCase("short_duplicate_never_grow", 0, [_content("tool_result", SHORT), _content("tool_result", SHORT)], []), + LabeledCase("unterminated_fence_not_mutated", 0, [_content("assistant_context", ("prefix\n```log\n" + ("unterminated line alpha bravo charlie\n" * 8)) * 2)], []), + LabeledCase("copied_plain_span_without_declared_link", 0, [_content("tool_result", "tool\n" + SPAN + "\nend"), _content("assistant_context", "parent\n" + SPAN + "\nend")], []), + LabeledCase("protected_duplicate_tool_vs_user", 0, [_content("tool_result", ART), _content("user_ctx", ART)], []), + LabeledCase("protected_duplicate_tool_vs_system", 0, [_content("tool_result", ART), _content("system_ctx", ART)], []), + ] + items, links = _span_case(exact=True) + cases.append(LabeledCase("declared_source_span_exact", 1, items, links)) + items, links = _span_case(exact=False) + cases.append(LabeledCase("declared_span_content_differs", 0, items, links)) + items, links = _span_case(exact=True, forward=True) + cases.append(LabeledCase("forward_span_link_rejected", 0, items, links)) + items, links = _span_case(exact=True, oob=True) + cases.append(LabeledCase("oob_span_link_rejected", 0, items, links)) + items, links = _span_case(exact=True) + # Duplicate declaration for the same target is deduplicated to one event. + cases.append(LabeledCase("duplicate_span_declaration_counts_once", 1, items, links + links)) + return cases + + +def _mode_gate_checks() -> dict[str, bool]: + base = [_content("tool_result", ART), _content("tool_result", ART)] + off_items = _clone(base) + off = apply_artifact_dedup_canary(off_items, salt=SALT, min_block_chars=MIN, mode="off") + shadow_items = _clone(base) + shadow = apply_artifact_dedup_canary(shadow_items, salt=SALT, min_block_chars=MIN, mode="shadow") + disable_items = _clone(base) + old_mode = os.environ.get(ARTIFACT_DEDUP_MODE_ENV) + old_disable = os.environ.get(ARTIFACT_DEDUP_DISABLE_ENV) + try: + os.environ[ARTIFACT_DEDUP_MODE_ENV] = "canary" + os.environ[ARTIFACT_DEDUP_DISABLE_ENV] = "1" + disabled = apply_artifact_dedup_canary(disable_items, salt=SALT, min_block_chars=MIN) + finally: + if old_mode is None: + os.environ.pop(ARTIFACT_DEDUP_MODE_ENV, None) + else: + os.environ[ARTIFACT_DEDUP_MODE_ENV] = old_mode + if old_disable is None: + os.environ.pop(ARTIFACT_DEDUP_DISABLE_ENV, None) + else: + os.environ[ARTIFACT_DEDUP_DISABLE_ENV] = old_disable + return { + "off_no_mutation": not off.mutated and [i.content for i in off_items] == [i.content for i in base], + "shadow_no_mutation": not shadow.mutated and [i.content for i in shadow_items] == [i.content for i in base], + "disable_env_no_mutation": not disabled.mutated and [i.content for i in disable_items] == [i.content for i in base], + } + + +def _validation_gate_checks() -> dict[str, bool]: + forged_ref = ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE.replace("", "tool_result").replace("", "deadbeef") + forged = [_content("assistant_context", forged_ref)] + return { + "forged_reference_detected": dangling_artifact_references(forged, salt=SALT) == [0], + } + + +def build_report() -> dict: + rows = [] + tp = fp = fn = tn = 0 + expected_total = predicted_total = chars_saved = 0 + for case in build_cases(): + items = _clone(case.items) + result = apply_artifact_dedup_canary( + items, + salt=SALT, + min_block_chars=MIN, + mode="canary", + span_links=case.span_links, + ) + actual = result.blocks_replaced + expected = case.expected_replacements + dangling = dangling_artifact_references(items, salt=SALT, span_links=case.span_links) + case_pass = actual == expected and not dangling + this_tp = min(actual, expected) + this_fp = max(0, actual - expected) + this_fn = max(0, expected - actual) + this_tn = 1 if actual == 0 and expected == 0 else 0 + tp += this_tp + fp += this_fp + fn += this_fn + tn += this_tn + expected_total += expected + predicted_total += actual + chars_saved += result.chars_saved + rows.append( + { + "name": case.name, + "expected_replacements": expected, + "actual_replacements": actual, + "pass": case_pass, + "chars_saved": result.chars_saved, + "span_replacements": result.span_blocks_replaced, + "dangling": dangling, + } + ) + + return { + "schema_version": 1, + "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"), + "corpus": "synthetic_labeled_artifact_precision_v1", + "claim_scope": "synthetic exact/provenance gate self-consistency; not field/model/product precision", + "case_count": len(rows), + "synthetic_event_tp": tp, + "synthetic_event_fp": fp, + "synthetic_event_fn": fn, + "synthetic_negative_case_tn": tn, + "synthetic_negative_case_fpr": fp / (fp + tn) if fp + tn else 0.0, + "synthetic_event_precision": tp / (tp + fp) if tp + fp else 1.0, + "synthetic_event_recall": tp / (tp + fn) if tp + fn else 1.0, + "synthetic_case_accuracy": sum(1 for row in rows if row["pass"]) / len(rows), + "predicted_replacements": predicted_total, + "expected_replacements": expected_total, + "synthetic_realized_chars_saved": chars_saved, + "mode_gate_checks": _mode_gate_checks(), + "validation_gate_checks": _validation_gate_checks(), + "rows": rows, + } + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--output", type=pathlib.Path, help="Optional JSON output path") + args = parser.parse_args() + report = build_report() + text = json.dumps(report, indent=2, ensure_ascii=False) + if args.output: + args.output.parent.mkdir(parents=True, exist_ok=True) + args.output.write_text(text + "\n") + print(text) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_artifact_precision_eval.py b/tests/test_artifact_precision_eval.py new file mode 100644 index 0000000..b651e3c --- /dev/null +++ b/tests/test_artifact_precision_eval.py @@ -0,0 +1,47 @@ +import json +import subprocess +import sys +from pathlib import Path + +from scripts.evaluate_artifact_precision import build_report + + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def test_synthetic_artifact_precision_report_is_namespaced_and_reproducible(): + report = build_report() + + assert report["corpus"] == "synthetic_labeled_artifact_precision_v1" + assert "precision" not in report + assert "recall" not in report + assert report["synthetic_event_precision"] == 1.0 + assert report["synthetic_event_recall"] == 1.0 + assert report["synthetic_case_accuracy"] == 1.0 + assert report["case_count"] >= 15 + assert report["synthetic_event_tp"] == report["expected_replacements"] + assert report["synthetic_event_fp"] == 0 + assert report["synthetic_event_fn"] == 0 + assert report["mode_gate_checks"] == { + "off_no_mutation": True, + "shadow_no_mutation": True, + "disable_env_no_mutation": True, + } + assert report["synthetic_negative_case_fpr"] == 0.0 + assert report["validation_gate_checks"]["forged_reference_detected"] is True + + +def test_synthetic_artifact_precision_cli_writes_json(tmp_path): + out = tmp_path / "precision.json" + completed = subprocess.run( + [sys.executable, "scripts/evaluate_artifact_precision.py", "--output", str(out)], + cwd=REPO_ROOT, + text=True, + capture_output=True, + check=True, + ) + + stdout_report = json.loads(completed.stdout) + file_report = json.loads(out.read_text()) + assert file_report == stdout_report + assert stdout_report["synthetic_event_precision"] == 1.0