From f30fa84c31bf69f7107b2d1d5c6f0dabe5e7970f Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Jun 2026 17:35:58 +0200 Subject: [PATCH 1/2] feat: add provenance artifact reuse canary --- __init__.py | 94 ++- .../artifact_dedup_canary.py | 323 +++++++++ contextpilot/trace_validation/runner.py | 243 ++++++- docs/guides/hermes-monitor.md | 24 +- docs/guides/trace-validation.md | 21 +- tests/test_artifact_dedup_canary.py | 624 ++++++++++++++++++ tests/test_hermes_plugin_patch.py | 69 ++ 7 files changed, 1387 insertions(+), 11 deletions(-) create mode 100644 contextpilot/hermes_opportunities/artifact_dedup_canary.py create mode 100644 tests/test_artifact_dedup_canary.py diff --git a/__init__.py b/__init__.py index f535d7f..aea0440 100644 --- a/__init__.py +++ b/__init__.py @@ -391,6 +391,66 @@ def _apply_prompt_dedup_canary_to_api_messages( return result +# Telemetry class for the runtime artifact-dedup path. The analyzer module's +# ARTIFACT_DEDUP_CLASS is its own internal enum; the runtime path reports this +# stable, provenance-flavored class string in its telemetry/stats. +_ARTIFACT_DEDUP_RUNTIME_CLASS = "same_payload_exact_artifact_body" + + +def _apply_artifact_dedup_canary_to_api_messages( + api_messages: List[Dict[str, Any]], *, salt: str = "contextpilot-runtime-artifact-dedup-v1" +): + """Apply the default-off artifact-dedup canary to runtime API messages. + + This is a narrow adapter from Hermes/OpenAI-style messages to the analyzer + package's in-memory _LLMContent carrier. Only ``role=tool`` (mapped to + ``tool_result``) and ``role=assistant`` (mapped to ``assistant_context``) + messages are passed as mutable artifact bodies; user/system/skill content is + never scanned or rewritten. It mutates api_messages only when + CONTEXTPILOT_ARTIFACT_DEDUP_MODE=canary and the canary module replaces a + later exact-duplicate artifact body with a strictly shorter reference. 
+ """ + try: + from contextpilot.hermes_opportunities.models import _LLMContent + from contextpilot.hermes_opportunities.artifact_dedup_canary import ( + apply_artifact_dedup_canary, + ) + except Exception as e: # noqa: BLE001 - canary must never break requests + logger.debug("[ContextPilot] artifact dedup canary unavailable: %s", e) + return None + + llm_items = [] + message_indexes = [] + for idx, msg in enumerate(api_messages): + if not isinstance(msg, dict): + continue + role = msg.get("role") + if role == "tool": + block_type = "tool_result" + elif role == "assistant": + block_type = "assistant_context" + else: + continue + content = msg.get("content") + if not isinstance(content, str): + continue + llm_items.append(_LLMContent(block_type=block_type, content=content)) + message_indexes.append(idx) + + if not llm_items: + return None + + result = apply_artifact_dedup_canary( + llm_items, + salt=salt, + min_block_chars=40, + ) + if result and result.mutated: + for item, idx in zip(llm_items, message_indexes): + api_messages[idx]["content"] = item.content + return result + + def _reorder_docs(docs: List[str], alpha: float = 0.001) -> List[str]: global _intercept_index if len(docs) < 2: @@ -812,6 +872,16 @@ def _tool_chars(msgs): else 0 ) + # Step 5b: Optional artifact-dedup canary (default off). The second + # runtime mutation path, limited to exact-duplicate tool_result / + # assistant_context artifact bodies (provenance-aware reference). 
+ artifact_dedup_result = _apply_artifact_dedup_canary_to_api_messages(api_messages) + artifact_dedup_chars_saved = ( + artifact_dedup_result.chars_saved + if artifact_dedup_result is not None and artifact_dedup_result.mutated + else 0 + ) + # Step 6: Block-level dedup sys_content = None for msg in api_messages: @@ -825,7 +895,12 @@ def _tool_chars(msgs): {"messages": api_messages}, system_content=sys_content, ) - turn_chars_saved = doc_chars_saved + dedup_result.chars_saved + prompt_dedup_chars_saved + turn_chars_saved = ( + doc_chars_saved + + dedup_result.chars_saved + + prompt_dedup_chars_saved + + artifact_dedup_chars_saved + ) self._total_chars_saved += turn_chars_saved # Actual before/after of the full LLM-bound payload (chars). These are @@ -889,6 +964,15 @@ def _tool_chars(msgs): if prompt_dedup_result is not None and prompt_dedup_result.mutated else 0 ), "prompt_dedup_chars_saved": prompt_dedup_chars_saved, + "artifact_dedup_mode": ( + artifact_dedup_result.mode if artifact_dedup_result is not None else "off" + ), + "artifact_dedup_class": _ARTIFACT_DEDUP_RUNTIME_CLASS, + "artifact_dedup_blocks_replaced": ( + artifact_dedup_result.blocks_replaced + if artifact_dedup_result is not None and artifact_dedup_result.mutated else 0 + ), + "artifact_dedup_chars_saved": artifact_dedup_chars_saved, "blocks_deduped": dedup_result.blocks_deduped, "blocks_total": dedup_result.blocks_total, "docs_deduped": self._total_docs_deduped, @@ -916,6 +1000,14 @@ def _tool_chars(msgs): prompt_dedup_result.blocks_replaced if prompt_dedup_result is not None and prompt_dedup_result.mutated else 0 ), + "artifact_dedup_mode": ( + artifact_dedup_result.mode if artifact_dedup_result is not None else "off" + ), + "artifact_dedup_chars_saved": artifact_dedup_chars_saved, + "artifact_dedup_blocks_replaced": ( + artifact_dedup_result.blocks_replaced + if artifact_dedup_result is not None and artifact_dedup_result.mutated else 0 + ), "blocks_deduped": dedup_result.blocks_deduped, 
"blocks_total": dedup_result.blocks_total, "docs_deduped": self._total_docs_deduped, diff --git a/contextpilot/hermes_opportunities/artifact_dedup_canary.py b/contextpilot/hermes_opportunities/artifact_dedup_canary.py new file mode 100644 index 0000000..11a0c5d --- /dev/null +++ b/contextpilot/hermes_opportunities/artifact_dedup_canary.py @@ -0,0 +1,323 @@ +"""Default-OFF provenance-aware tool-artifact reuse canary. + +This is the *second* runtime mutation path in ContextPilot and, like +:mod:`.prompt_dedup_canary`, it is default-OFF and narrowly scoped. Where the +prompt-dedup canary rewrites duplicate ``skill_prompt`` *lines*, this canary +dedups whole **artifact bodies** carried by ``tool_result`` and +``assistant_context`` items: it keeps the FIRST full artifact body verbatim and +replaces a later EXACT duplicate body -- regardless of which of the two mutable +artifact block types it appears in (provenance-aware) -- with a deterministic, +strictly shorter reference string that records the canonical body's provenance +(type + salted hash). + +Risk gate (all conditions must hold before a single character is changed): + +* Mode must be ``canary``. The mode is read from + ``CONTEXTPILOT_ARTIFACT_DEDUP_MODE`` (``off`` | ``shadow`` | ``canary``) and + defaults to ``off``. ``off`` and ``shadow`` never mutate the payload. +* The escape-hatch env ``CONTEXTPILOT_ARTIFACT_DEDUP_DISABLE`` (any truthy + value) forces ``off`` regardless of the mode var -- an immediate kill switch. +* Only ``tool_result`` / ``assistant_context`` artifact bodies are mutable. + ``system_prompt`` / ``user_prompt`` / ``skill_prompt`` / ``unknown`` (and any + other non-artifact content) are protected and never scanned or rewritten. +* Only EXACT duplicate full bodies are eligible. 
The first occurrence (within + the mutable artifact types) is the canonical body and is always kept verbatim; + only later exact occurrences are replaced, and only when the deterministic + reference string is strictly shorter than the body it replaces (never grows + the payload). +* A reference may only point at an EARLIER canonical full body in the same + payload; :func:`dangling_artifact_references` exposes a check used by the + validation gate to reject a dangling reference. + +The reference string carries only a low-cardinality artifact-type enum and a +salted body hash -- never raw artifact content. Telemetry is metadata-only: +mode/class enums and integer counters; no artifact text and no realized-savings +claim unless an actual mutation occurred. +""" +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import Iterable + +from .models import _LLMContent +from .privacy import _assert_no_forbidden_keys, _salted_hash + +# Environment controls. ``off`` is the default and the safe state. +ARTIFACT_DEDUP_MODE_ENV = "CONTEXTPILOT_ARTIFACT_DEDUP_MODE" +ARTIFACT_DEDUP_DISABLE_ENV = "CONTEXTPILOT_ARTIFACT_DEDUP_DISABLE" +ARTIFACT_DEDUP_MODES = ("off", "shadow", "canary") +DEFAULT_ARTIFACT_DEDUP_MODE = "off" + +# The only block types whose artifact bodies this canary may dedup. Both are +# mutable; the duplicate may span them (provenance-aware, cross-type). +MUTABLE_ARTIFACT_BLOCK_TYPES = ("tool_result", "assistant_context") + +# The only duplicate class this canary acts on: an exact-duplicate full artifact +# body across the mutable artifact types. +ARTIFACT_DEDUP_CLASS = "same_payload_exact_artifact_body" + +# Deterministic placeholder left in place of a later duplicate body. ``<type>`` +# is the CANONICAL (first) body's provenance and ``<hash>`` its salted +# fingerprint -- both low-cardinality, never raw artifact content. 
+ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE = ( + "[ContextPilot artifact dedup: duplicate artifact body omitted; " + "ref=<type>:<hash>]" +) + +# Fixed head of the reference string (everything before the first placeholder), +# used to recognize a reference line without re-rendering it. +_REF_HEAD = ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE.split("<type>", 1)[0] + + +@dataclass +class ArtifactDedupCanaryResult: + """Metadata-only outcome of an artifact-dedup canary pass. No raw text, ever. + + ``chars_saved`` / ``blocks_replaced`` are REALIZED figures and are non-zero + only when ``mode == 'canary'`` and an actual replacement occurred. The + ``candidate_*`` fields are advisory (what a canary *would* replace) and are + populated in ``shadow`` mode for visibility without mutating anything. + """ + + mode: str # off | shadow | canary + artifact_dedup_class: str # always ARTIFACT_DEDUP_CLASS + mutated: bool # True only if a real replacement happened + item_count: int # mutable artifact items scanned + candidate_group_count: int # eligible exact-duplicate body groups + candidate_chars: int # advisory chars later occurrences occupy + blocks_replaced: int # REALIZED replacements (canary only) + chars_saved: int # REALIZED chars saved (canary only) + notes: list[str] = field(default_factory=list) + + +def _truthy(value: str | None) -> bool: + return bool(value) and value.strip().lower() not in ("", "0", "false", "no", "off") + + +def resolve_artifact_dedup_mode(env: dict | None = None) -> str: + """Resolve the active artifact-dedup mode, defaulting to the safe ``off``. + + Unknown values fall back to ``off``. The escape-hatch disable variable, when + truthy, forces ``off`` regardless of the mode variable. 
+ """ + source = os.environ if env is None else env + if _truthy(source.get(ARTIFACT_DEDUP_DISABLE_ENV)): + return "off" + raw = ( + source.get(ARTIFACT_DEDUP_MODE_ENV) or DEFAULT_ARTIFACT_DEDUP_MODE + ).strip().lower() + return raw if raw in ARTIFACT_DEDUP_MODES else DEFAULT_ARTIFACT_DEDUP_MODE + + +def _artifact_reference_string(canonical_type: str, body_hash: str) -> str: + """Render the reference that points at a canonical artifact body. + + Carries only the canonical provenance enum and the salted hash -- never the + artifact body itself. + """ + return ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE.replace( + "<type>", canonical_type + ).replace("<hash>", body_hash) + + +def _parse_artifact_reference(line: str) -> str | None: + """Return the salted hash a reference line encodes, or ``None`` if not one.""" + if not (line.startswith(_REF_HEAD) and "ref=" in line and line.endswith("]")): + return None + after = line.rsplit("ref=", 1)[1][:-1] # strip the trailing ']' + _type, sep, body_hash = after.partition(":") + if not sep or not body_hash: + return None + return body_hash + + +def _scan_artifacts( + contents: list[_LLMContent], *, salt: str, min_block_chars: int +) -> tuple[dict[str, dict], int]: + """Fingerprint mutable artifact bodies in order. + + Returns ``(agg, item_count)`` where ``agg`` maps a body hash to + ``{canonical_type, char_length, occ}`` (``canonical_type`` is the FIRST + occurrence's provenance) and ``item_count`` is the number of mutable + artifact items seen. + """ + agg: dict[str, dict] = {} + item_count = 0 + for item in contents: + if item.block_type not in MUTABLE_ARTIFACT_BLOCK_TYPES: + continue + item_count += 1 + body = item.content + if len(body) < min_block_chars: + continue + # A reference left by an earlier pass is not itself a canonical body. 
+ if _parse_artifact_reference(body) is not None: + continue + h = _salted_hash(body, salt) + entry = agg.get(h) + if entry is None: + agg[h] = { + "canonical_type": item.block_type, + "char_length": len(body), + "occ": 1, + } + else: + entry["occ"] += 1 + return agg, item_count + + +def _eligible_groups(agg: dict[str, dict]) -> tuple[int, int]: + """Advisory measurement of duplicate body groups that would actually shrink. + + Returns ``(candidate_group_count, candidate_chars)`` where ``candidate_chars`` + is the chars the later (replaceable) occurrences currently occupy. + """ + group_count = 0 + candidate_chars = 0 + for h, entry in agg.items(): + if entry["occ"] < 2: + continue # not a duplicate -> nothing to replace + ref = _artifact_reference_string(entry["canonical_type"], h) + if len(ref) >= entry["char_length"]: + continue # replacement would grow the payload -> skip + group_count += 1 + candidate_chars += (entry["occ"] - 1) * entry["char_length"] + return group_count, candidate_chars + + +def apply_artifact_dedup_canary( + contents: Iterable[_LLMContent], + *, + salt: str, + min_block_chars: int, + mode: str | None = None, + env: dict | None = None, +) -> ArtifactDedupCanaryResult: + """Run the artifact-dedup canary over LLM-bound content. + + ``contents`` are the in-memory ``_LLMContent`` items bound for the LLM. In + ``canary`` mode this MUTATES the ``content`` of eligible mutable artifact + items in place (keeping the first canonical body, replacing later exact + duplicates with a deterministic, strictly shorter reference). In ``off`` and + ``shadow`` modes nothing is mutated. + + ``mode`` overrides the resolved environment mode (used by tests); otherwise + the mode comes from :func:`resolve_artifact_dedup_mode`. 
+ """ + items = list(contents) + resolved = mode if mode is not None else resolve_artifact_dedup_mode(env) + if resolved not in ARTIFACT_DEDUP_MODES: + resolved = DEFAULT_ARTIFACT_DEDUP_MODE + + if resolved == "off": + # Safe default: no scan, no candidates, no savings. + return ArtifactDedupCanaryResult( + mode="off", + artifact_dedup_class=ARTIFACT_DEDUP_CLASS, + mutated=False, + item_count=0, + candidate_group_count=0, + candidate_chars=0, + blocks_replaced=0, + chars_saved=0, + notes=["artifact-dedup canary off (default): payload unchanged"], + ) + + agg, item_count = _scan_artifacts(items, salt=salt, min_block_chars=min_block_chars) + candidate_group_count, candidate_chars = _eligible_groups(agg) + + if resolved == "shadow": + # Measure what a canary would replace, but never touch the payload. + return ArtifactDedupCanaryResult( + mode="shadow", + artifact_dedup_class=ARTIFACT_DEDUP_CLASS, + mutated=False, + item_count=item_count, + candidate_group_count=candidate_group_count, + candidate_chars=candidate_chars, + blocks_replaced=0, + chars_saved=0, + notes=["artifact-dedup canary shadow: candidates measured, payload unchanged"], + ) + + # --- canary: the ONLY branch that mutates LLM-bound payload --------------- + blocks_replaced = 0 + chars_saved = 0 + # hash -> canonical provenance type of the first (kept) occurrence. + canonical: dict[str, str] = {} + for item in items: + if item.block_type not in MUTABLE_ARTIFACT_BLOCK_TYPES: + continue # protected content is never touched + body = item.content + if len(body) < min_block_chars: + continue + if _parse_artifact_reference(body) is not None: + continue + h = _salted_hash(body, salt) + if h not in canonical: + canonical[h] = item.block_type # keep the first canonical body verbatim + continue + # Later exact duplicate: reference the EARLIER canonical body's provenance. 
+ ref = _artifact_reference_string(canonical[h], h) + if len(ref) < len(body): # only when it actually shrinks the payload + item.content = ref + blocks_replaced += 1 + chars_saved += len(body) - len(ref) + + return ArtifactDedupCanaryResult( + mode="canary", + artifact_dedup_class=ARTIFACT_DEDUP_CLASS, + mutated=blocks_replaced > 0, + item_count=item_count, + candidate_group_count=candidate_group_count, + candidate_chars=candidate_chars, + blocks_replaced=blocks_replaced, + chars_saved=chars_saved, + notes=["artifact-dedup canary active: exact duplicate artifact bodies only"], + ) + + +def dangling_artifact_references( + contents: Iterable[_LLMContent], *, salt: str +) -> list[int]: + """Return indices of artifact references that do not resolve to an earlier body. + + A reference is valid only if an EARLIER mutable artifact item carries the + full canonical body whose salted hash matches the reference. A reference with + no such earlier canonical body (or one that only appears later) is dangling. + """ + seen_full: set[str] = set() # hashes of earlier full canonical artifact bodies + dangling: list[int] = [] + for idx, item in enumerate(contents): + body = item.content + ref_hash = _parse_artifact_reference(body) + if ref_hash is not None: + if ref_hash not in seen_full: + dangling.append(idx) + continue + if item.block_type in MUTABLE_ARTIFACT_BLOCK_TYPES: + seen_full.add(_salted_hash(body, salt)) + return dangling + + +def build_artifact_canary_telemetry_record(result: ArtifactDedupCanaryResult) -> dict: + """Build a metadata-only telemetry record for an artifact-dedup canary pass. + + The aggregate ``chars_saved`` counter gains the artifact-dedup contribution + ONLY when a real mutation occurred (canary). ``off``/``shadow`` contribute 0 + to the total while still reporting the separated ``artifact_dedup_*`` fields. + Contains only mode/class enums and integer counters -- never artifact text. 
+ """ + realized = result.chars_saved if result.mutated else 0 + record = { + "artifact_dedup_mode": result.mode, + "artifact_dedup_class": result.artifact_dedup_class, + "artifact_dedup_blocks_replaced": result.blocks_replaced if result.mutated else 0, + # Separated field: always present, mirrors the realized artifact-dedup save. + "artifact_dedup_chars_saved": realized, + # Aggregate total: includes artifact dedup only when a mutation occurred. + "chars_saved": realized, + } + _assert_no_forbidden_keys(record) + return record diff --git a/contextpilot/trace_validation/runner.py b/contextpilot/trace_validation/runner.py index 20c8582..8707138 100644 --- a/contextpilot/trace_validation/runner.py +++ b/contextpilot/trace_validation/runner.py @@ -39,6 +39,14 @@ apply_prompt_dedup_canary, resolve_prompt_dedup_mode, ) +from contextpilot.hermes_opportunities.artifact_dedup_canary import ( + MUTABLE_ARTIFACT_BLOCK_TYPES, + ArtifactDedupCanaryResult, + _parse_artifact_reference, + apply_artifact_dedup_canary, + dangling_artifact_references, + resolve_artifact_dedup_mode, +) from contextpilot.hermes_opportunities.tokenizer import resolve_tokenizer from .builder import DEFAULT_SALT @@ -355,6 +363,214 @@ def render_markdown(report: ValidationReport) -> str: return "\n".join(lines) + "\n" +# --------------------------------------------------------------------------- +# Artifact-dedup canary validation (provenance-aware tool-artifact reuse) +# --------------------------------------------------------------------------- + +# Stable invariant identifiers for the artifact-dedup gate. Mirrors the prompt +# gate but swaps in artifact-scope and reference-resolvability checks. 
+ARTIFACT_INVARIANT_NAMES = [ + "message_count_preserved", + "order_and_roles_preserved", + "protected_content_preserved", + "artifact_mutation_scope_allowed", + "artifact_reference_resolvable", + "savings_accounting_consistent", +] + + +def optimize_artifact_case( + messages: list[dict], *, mode: str, salt: str, min_block_chars: int +) -> tuple[list[dict], ArtifactDedupCanaryResult]: + """Run the artifact-dedup canary over a case's messages in the given mode. + + Returns ``(out_messages, result)``. The canary mutates only mutable artifact + bodies in place; ``out_messages`` mirrors the input role/block_type/order + with the (possibly) rewritten content so the caller can diff payloads. + """ + contents = [_LLMContent(m["block_type"], m["content"]) for m in messages] + result = apply_artifact_dedup_canary( + contents, salt=salt, min_block_chars=min_block_chars, mode=mode + ) + out = [ + {"role": m["role"], "block_type": m["block_type"], "content": c.content} + for m, c in zip(messages, contents) + ] + return out, result + + +def _artifact_mutation_scope_ok(base: dict, cand: dict) -> bool: + """A single message changed only within the allowed (artifact-only) scope.""" + if base["content"] == cand["content"]: + return True + # Only mutable artifact bodies may ever change. + if base["block_type"] not in MUTABLE_ARTIFACT_BLOCK_TYPES: + return False + # A changed body must become a reference placeholder strictly shorter than + # the body it replaced -- never new free text and never a growth. + if _parse_artifact_reference(cand["content"]) is None: + return False + return len(cand["content"]) < len(base["content"]) + + +def check_artifact_invariants( + baseline: list[dict], + candidate: list[dict], + result: ArtifactDedupCanaryResult, + *, + salt: str, +) -> tuple[dict[str, bool], int]: + """Check accuracy-preservation invariants for an artifact-dedup pass. 
+ + Returns ``(invariant -> passed, realized_chars_saved)`` where + ``realized_chars_saved`` is the ACTUAL summed before/after character delta of + the processed payload (not an opportunity count). + """ + inv: dict[str, bool] = {} + + inv["message_count_preserved"] = len(baseline) == len(candidate) + + if inv["message_count_preserved"]: + inv["order_and_roles_preserved"] = all( + b["role"] == c["role"] and b["block_type"] == c["block_type"] + for b, c in zip(baseline, candidate) + ) + inv["protected_content_preserved"] = all( + b["content"] == c["content"] + for b, c in zip(baseline, candidate) + if b["block_type"] not in MUTABLE_ARTIFACT_BLOCK_TYPES + ) + inv["artifact_mutation_scope_allowed"] = all( + _artifact_mutation_scope_ok(b, c) for b, c in zip(baseline, candidate) + ) + cand_contents = [ + _LLMContent(c["block_type"], c["content"]) for c in candidate + ] + inv["artifact_reference_resolvable"] = ( + dangling_artifact_references(cand_contents, salt=salt) == [] + ) + realized = sum( + len(b["content"]) - len(c["content"]) + for b, c in zip(baseline, candidate) + ) + else: + # Count mismatch makes positional comparison meaningless; fail the rest. 
+ inv["order_and_roles_preserved"] = False + inv["protected_content_preserved"] = False + inv["artifact_mutation_scope_allowed"] = False + inv["artifact_reference_resolvable"] = False + realized = 0 + + inv["savings_accounting_consistent"] = ( + realized >= 0 + and realized == result.chars_saved + and (realized > 0) == bool(result.mutated) + and (result.blocks_replaced > 0) == bool(result.mutated) + ) + return inv, realized + + +def run_artifact_validation( + cases: list[dict], + *, + baseline_mode: str = "off", + candidate_mode: str, + salt: str, + min_block_chars: int = DEFAULT_MIN_BLOCK_CHARS, + date: str, + tokenizer_spec: object | None = None, + optimize_fn: Callable[..., tuple[list[dict], ArtifactDedupCanaryResult]] | None = None, +) -> ValidationReport: + """Validate every case under baseline vs candidate for the artifact canary.""" + optimize_fn = optimize_fn or optimize_artifact_case + tokenizer = resolve_tokenizer(tokenizer_spec) + tok_status = "available" if tokenizer is not None else "unavailable" + + case_results: list[ValidationCaseResult] = [] + total_blocks = 0 + total_chars = 0 + total_actual_saved = 0 if tokenizer is not None else None + + for case in cases: + msgs = _messages(case) + baseline_msgs, _ = optimize_fn( + list(msgs), mode=baseline_mode, salt=salt, min_block_chars=min_block_chars + ) + candidate_msgs, result = optimize_fn( + list(msgs), mode=candidate_mode, salt=salt, min_block_chars=min_block_chars + ) + + inv, realized = check_artifact_invariants( + baseline_msgs, candidate_msgs, result, salt=salt + ) + failed = [name for name, ok in inv.items() if not ok] + + at_before = at_after = at_saved = None + if tokenizer is not None: + at_before = sum(tokenizer.count(m["content"]) for m in baseline_msgs) + at_after = sum(tokenizer.count(m["content"]) for m in candidate_msgs) + at_saved = at_before - at_after + total_actual_saved += at_saved + + artifact_items = sum( + 1 for m in msgs if m["block_type"] in MUTABLE_ARTIFACT_BLOCK_TYPES + ) + 
total_blocks += result.blocks_replaced if result.mutated else 0 + total_chars += realized + + case_results.append( + ValidationCaseResult( + case_id=str(case.get("case_id", "")), + source=case.get("source"), + message_count=len(msgs), + skill_item_count=artifact_items, + mutated=bool(result.mutated), + blocks_replaced=result.blocks_replaced if result.mutated else 0, + chars_saved=realized, + invariants=inv, + passed=not failed, + failed_invariants=failed, + actual_tokens_before=at_before, + actual_tokens_after=at_after, + actual_tokens_saved=at_saved, + ) + ) + + passed_cases = sum(1 for c in case_results if c.passed) + failed_cases = len(case_results) - passed_cases + notes = [ + "baseline runs the artifact canary in 'off' mode and must leave the " + "payload byte-identical; the candidate mode is the change under test", + "chars_saved is the REALIZED processed-payload before/after char delta, " + "not an opportunity count", + ] + if tokenizer is None: + notes.append( + "actual-token savings unavailable (no exact tokenizer backend configured); " + "no actual-token fields are reported" + ) + + return ValidationReport( + schema_version=VALIDATION_SET_SCHEMA_VERSION, + generated_date=date, + salt_fingerprint=_salt_fingerprint(salt), + baseline_mode=baseline_mode, + candidate_mode=candidate_mode, + case_count=len(case_results), + passed=failed_cases == 0, + passed_cases=passed_cases, + failed_cases=failed_cases, + total_blocks_replaced=total_blocks, + total_chars_saved=total_chars, + tokenizer_status=tok_status, + tokenizer_backend=tokenizer.name if tokenizer is not None else None, + total_actual_tokens_saved=total_actual_saved, + invariant_names=list(ARTIFACT_INVARIANT_NAMES), + cases=case_results, + notes=notes, + ) + + def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( description=( @@ -364,12 +580,21 @@ def main(argv: list[str] | None = None) -> int: ) ) parser.add_argument("corpus", type=Path, help="path to the JSONL validation 
corpus") + parser.add_argument( + "--gate", + choices=["prompt", "artifact"], + default="prompt", + help=( + "which validation gate to run: 'prompt' for skill-prompt dedup " + "or 'artifact' for provenance-aware tool/artifact reuse (default: prompt)" + ), + ) parser.add_argument( "--candidate-mode", default=None, help=( - "prompt-dedup mode to validate (off|shadow|canary). Defaults to the " - "resolved CONTEXTPILOT_PROMPT_DEDUP_MODE environment value." + "dedup mode to validate (off|shadow|canary). Defaults to the " + "resolved CONTEXTPILOT_*_DEDUP_MODE env for the selected gate." ), ) parser.add_argument( @@ -398,14 +623,16 @@ def main(argv: list[str] | None = None) -> int: if not args.corpus.exists(): raise SystemExit(f"validation corpus not found: {args.corpus}") - candidate_mode = ( - args.candidate_mode - if args.candidate_mode is not None - else resolve_prompt_dedup_mode() - ) + if args.candidate_mode is not None: + candidate_mode = args.candidate_mode + elif args.gate == "artifact": + candidate_mode = resolve_artifact_dedup_mode() + else: + candidate_mode = resolve_prompt_dedup_mode() cases = load_cases(args.corpus) - report = run_validation( + run_fn = run_artifact_validation if args.gate == "artifact" else run_validation + report = run_fn( cases, baseline_mode=args.baseline_mode, candidate_mode=candidate_mode, diff --git a/docs/guides/hermes-monitor.md b/docs/guides/hermes-monitor.md index b7d6c53..9afffee 100644 --- a/docs/guides/hermes-monitor.md +++ b/docs/guides/hermes-monitor.md @@ -321,7 +321,29 @@ gate below before changing ContextPilot config or code. A defensive guard in `write_report` refuses to emit any forbidden raw-content key, so the reports are safe to ship from an unattended cron job. -## Accuracy gate +## Default-off canaries + +Prompt and artifact reuse are opt-in canaries. Ordinary installs keep both off +unless an operator explicitly enables them and restarts Hermes. 
+ +```bash +# Skill-prompt exact duplicate canary (lowest-risk prompt class) +export CONTEXTPILOT_PROMPT_DEDUP_MODE=canary + +# Provenance-aware tool/artifact exact duplicate canary +export CONTEXTPILOT_ARTIFACT_DEDUP_MODE=canary + +# Emergency kill switches +export CONTEXTPILOT_PROMPT_DEDUP_DISABLE=1 +export CONTEXTPILOT_ARTIFACT_DEDUP_DISABLE=1 +``` + +The artifact canary only keeps the first full `tool_result`/`assistant_context` +artifact body and replaces later exact duplicates with a shorter ContextPilot +reference. It does not summarize, semantically compress, or drop user/system +content. + +## Safety gates This monitor reports processed-payload savings, exact tokenizer token deltas when recorded, and operational signals. Before shipping ContextPilot changes, run a fixed golden eval set and require: diff --git a/docs/guides/trace-validation.md b/docs/guides/trace-validation.md index bc76c57..4699143 100644 --- a/docs/guides/trace-validation.md +++ b/docs/guides/trace-validation.md @@ -51,6 +51,17 @@ Validate the current prompt-dedup canary candidate: ```bash python scripts/run_trace_validation.py \ ~/contextpilot/validation_sets/validation_set_YYYY-MM-DD.jsonl \ + --gate prompt \ + --candidate-mode canary \ + --format markdown +``` + +Validate the provenance-aware artifact/tool-context reuse canary candidate: + +```bash +python scripts/run_trace_validation.py \ + ~/contextpilot/validation_sets/validation_set_YYYY-MM-DD.jsonl \ + --gate artifact \ --candidate-mode canary \ --format markdown ``` @@ -88,11 +99,19 @@ non-zero on any failed invariant: - realized savings accounting matches the actual processed-payload before/after character delta. 
-For the current canary, the only allowed mutation scope is +For the current prompt canary, the only allowed mutation scope is `same_type_skill_prompt_only`: later exact duplicate `skill_prompt` lines may be replaced with a deterministic ContextPilot reference if and only if the reference is shorter and the line is not safety-denylisted. +For the artifact/tool-context reuse canary, the only allowed mutation scope is +`same_payload_exact_artifact_body`: later exact duplicate `tool_result` or +`assistant_context` artifact bodies may be replaced with a deterministic +ContextPilot artifact reference if and only if the first full canonical body +appears earlier in the same payload, the reference is shorter, and all +non-artifact content remains byte-identical. The artifact gate adds an explicit +reference-resolution invariant so dangling references fail the run. + ## When this is required Run this gate before merging or enabling any change that can affect accuracy, diff --git a/tests/test_artifact_dedup_canary.py b/tests/test_artifact_dedup_canary.py new file mode 100644 index 0000000..2a6006f --- /dev/null +++ b/tests/test_artifact_dedup_canary.py @@ -0,0 +1,624 @@ +"""RED-phase tests for the provenance-aware tool-artifact reuse canary. + +This canary is the second (and, like the prompt-dedup canary, default-OFF) +runtime mutation path. Where the prompt-dedup canary rewrites *skill_prompt* +lines, this one dedups whole **artifact bodies** carried by ``tool_result`` and +``assistant_context`` items: it keeps the FIRST full artifact body verbatim and +replaces a later EXACT duplicate body (regardless of which of the two artifact +block types it appears in -- provenance-aware) with a deterministic, strictly +shorter reference string that points back at the canonical body via a salted +hash. + +The safety contract these tests pin: + +1. 
``off`` (default) and ``shadow`` never mutate the payload; only ``canary`` + replaces a later exact-duplicate artifact body, always keeping the first one. +2. Only ``tool_result`` / ``assistant_context`` artifact bodies are mutable; + ``system_prompt`` / ``user_prompt`` / ``skill_prompt`` (and any other + non-artifact content) are protected and survive byte-identical. +3. A reference is valid only if it resolves to an EARLIER canonical full body in + the same payload; the runner/validation gate fails a dangling reference or a + protected-content mutation. +4. Telemetry / the runner report carry REALIZED ``chars_saved`` only when an + actual mutation happened, and never any raw artifact content. + +Production is implemented in ``contextpilot.hermes_opportunities.artifact_dedup_canary``. +Fixtures are synthetic only. +""" +import json + +from contextpilot.hermes_opportunities.artifact_dedup_canary import ( + ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE, + ARTIFACT_DEDUP_CLASS, + ARTIFACT_DEDUP_DISABLE_ENV, + ARTIFACT_DEDUP_MODE_ENV, + MUTABLE_ARTIFACT_BLOCK_TYPES, + ArtifactDedupCanaryResult, + apply_artifact_dedup_canary, + build_artifact_canary_telemetry_record, + dangling_artifact_references, + resolve_artifact_dedup_mode, + _artifact_reference_string, +) +from contextpilot.hermes_opportunities.models import _LLMContent +from contextpilot.hermes_opportunities.privacy import _salted_hash +from contextpilot.trace_validation.runner import ( + ARTIFACT_INVARIANT_NAMES, + assert_report_privacy_safe, + render_markdown, + report_to_dict, + run_artifact_validation, +) + +SALT = "test-artifact-salt" +MIN = 40 + +# A synthetic artifact body comfortably longer than the reference placeholder so +# a replacement actually shrinks the payload. Free of secrets/raw user text. +LONG_ARTIFACT = ( + "Synthetic tool artifact body: rows=128 cols=8 checksum=alpha-bravo-charlie " + "delta-echo. 
Summary of the synthetic computation produced purely for this " + "test fixture and nothing else, padded to comfortably exceed the reference." +) +# A non-artifact protected block (system narration); duplicated to prove the +# canary never touches it. +SYS_BLOCK = ( + "Synthetic system narration describing the assistant persona and the general " + "tone it should adopt across replies in this fixture." +) +# Just over min_block_chars but shorter than any reference placeholder, so a +# replacement would GROW the payload and must be skipped. +SHORT_ARTIFACT = "Short synthetic artifact body just over forty chars." + + +def _ref(body: str, *, canonical_type: str = "tool_result") -> str: + """The deterministic reference string a canary would emit for ``body``.""" + return _artifact_reference_string(canonical_type, _salted_hash(body, SALT)) + + +def _ref_len(canonical_type: str = "tool_result") -> int: + return len(_ref(LONG_ARTIFACT, canonical_type=canonical_type)) + + +# --------------------------------------------------------------------------- +# Mode resolution + escape hatch (default OFF) +# --------------------------------------------------------------------------- + + +def test_mode_defaults_to_off(monkeypatch): + monkeypatch.delenv(ARTIFACT_DEDUP_MODE_ENV, raising=False) + monkeypatch.delenv(ARTIFACT_DEDUP_DISABLE_ENV, raising=False) + assert resolve_artifact_dedup_mode() == "off" + + +def test_mode_reads_env_values(): + assert resolve_artifact_dedup_mode({ARTIFACT_DEDUP_MODE_ENV: "shadow"}) == "shadow" + assert resolve_artifact_dedup_mode({ARTIFACT_DEDUP_MODE_ENV: "CANARY"}) == "canary" + # Unknown / garbage values fall back to the safe default. 
+ assert resolve_artifact_dedup_mode({ARTIFACT_DEDUP_MODE_ENV: "aggressive"}) == "off" + + +def test_disable_env_is_a_kill_switch(): + env = {ARTIFACT_DEDUP_MODE_ENV: "canary", ARTIFACT_DEDUP_DISABLE_ENV: "1"} + assert resolve_artifact_dedup_mode(env) == "off" + + +# --------------------------------------------------------------------------- +# (1) off (default) and shadow never mutate +# --------------------------------------------------------------------------- + + +def test_default_off_does_not_change_payload(monkeypatch): + monkeypatch.delenv(ARTIFACT_DEDUP_MODE_ENV, raising=False) + monkeypatch.delenv(ARTIFACT_DEDUP_DISABLE_ENV, raising=False) + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + before = [c.content for c in contents] + result = apply_artifact_dedup_canary(contents, salt=SALT, min_block_chars=MIN) + assert result.mode == "off" + assert result.mutated is False + assert result.blocks_replaced == 0 + assert result.chars_saved == 0 + assert result.item_count == 0 # off does not even scan + assert [c.content for c in contents] == before # byte-identical + + +def test_disable_env_blocks_mutation_even_with_canary_set(monkeypatch): + monkeypatch.setenv(ARTIFACT_DEDUP_MODE_ENV, "canary") + monkeypatch.setenv(ARTIFACT_DEDUP_DISABLE_ENV, "true") + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + before = [c.content for c in contents] + result = apply_artifact_dedup_canary(contents, salt=SALT, min_block_chars=MIN) + assert result.mode == "off" + assert [c.content for c in contents] == before + + +def test_shadow_measures_duplicates_without_mutating(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + before = [c.content for c in contents] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="shadow" + ) + assert [c.content for c in contents] == 
before # never mutated + assert result.mode == "shadow" + assert result.mutated is False + assert result.blocks_replaced == 0 + assert result.chars_saved == 0 + # Advisory: one eligible duplicate group; later occurrence chars measured. + assert result.candidate_group_count == 1 + assert result.candidate_chars == len(LONG_ARTIFACT) + + +# --------------------------------------------------------------------------- +# (1) canary keeps the first full body, replaces later exact duplicates +# --------------------------------------------------------------------------- + + +def test_canary_keeps_first_full_and_replaces_later_duplicate(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + assert contents[0].content == LONG_ARTIFACT # first kept verbatim + assert contents[1].content == _ref(LONG_ARTIFACT) # later replaced + assert result.mode == "canary" + assert result.mutated is True + assert result.artifact_dedup_class == ARTIFACT_DEDUP_CLASS + assert result.blocks_replaced == 1 + assert result.chars_saved == len(LONG_ARTIFACT) - _ref_len() + + +def test_canary_three_occurrences_keeps_first_replaces_two(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("assistant_context", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + assert contents[0].content == LONG_ARTIFACT # canonical kept + assert contents[1].content == _ref(LONG_ARTIFACT) # later dup replaced + assert contents[2].content == _ref(LONG_ARTIFACT) + assert result.blocks_replaced == 2 + assert result.chars_saved == 2 * (len(LONG_ARTIFACT) - _ref_len()) + + +def test_canary_dedups_across_artifact_types_provenance_canonical_is_first(): + # tool_result first, assistant_context second: the duplicate spans both + # 
artifact types. The first (tool_result) is canonical; the reference left in + # the assistant_context records that canonical provenance. + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("assistant_context", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + assert contents[0].content == LONG_ARTIFACT + # Reference records the canonical (first) provenance == tool_result. + assert contents[1].content == _ref(LONG_ARTIFACT, canonical_type="tool_result") + assert "tool_result" in contents[1].content + assert "assistant_context" not in contents[1].content + assert result.blocks_replaced == 1 + + +def test_canary_reference_carries_no_raw_artifact_body(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + apply_artifact_dedup_canary(contents, salt=SALT, min_block_chars=MIN, mode="canary") + ref_line = contents[1].content + # Only a low-cardinality provenance enum + salted hash, never the body. 
+ assert "tool_result" in ref_line + assert _salted_hash(LONG_ARTIFACT, SALT) in ref_line + assert LONG_ARTIFACT not in ref_line + assert len(ref_line) < len(LONG_ARTIFACT) # always shrinks + + +def test_canary_never_grows_payload_for_short_duplicate(): + assert len(SHORT_ARTIFACT) >= MIN + assert len(SHORT_ARTIFACT) < _ref_len() # reference would be longer + contents = [ + _LLMContent("tool_result", SHORT_ARTIFACT), + _LLMContent("tool_result", SHORT_ARTIFACT), + ] + before = [c.content for c in contents] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + assert [c.content for c in contents] == before # left alone (would grow) + assert result.blocks_replaced == 0 + assert result.chars_saved == 0 + + +# --------------------------------------------------------------------------- +# (2) only tool_result / assistant_context are mutable; others protected +# --------------------------------------------------------------------------- + + +def test_mutable_set_is_exactly_the_two_artifact_types(): + assert set(MUTABLE_ARTIFACT_BLOCK_TYPES) == {"tool_result", "assistant_context"} + + +def test_canary_leaves_non_artifact_block_types_untouched(): + # Duplicate bodies in every PROTECTED block type must survive byte-identical + # and must not even be scanned as artifact candidates. 
+ contents = [ + _LLMContent("system_prompt", SYS_BLOCK), + _LLMContent("system_prompt", SYS_BLOCK), + _LLMContent("user_prompt", LONG_ARTIFACT), + _LLMContent("user_prompt", LONG_ARTIFACT), + _LLMContent("skill_prompt", LONG_ARTIFACT), + _LLMContent("skill_prompt", LONG_ARTIFACT), + _LLMContent("unknown", LONG_ARTIFACT), + _LLMContent("unknown", LONG_ARTIFACT), + ] + before = [c.content for c in contents] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + assert [c.content for c in contents] == before + assert result.blocks_replaced == 0 + assert result.item_count == 0 # no artifact items present to scan + + +def test_canary_does_not_dedup_artifact_against_protected_duplicate(): + # The same body appears in a protected user_prompt AND a tool_result. The + # artifact occurrence has no EARLIER artifact canonical, so nothing is + # replaced (a reference may only point at an artifact canonical body). + contents = [ + _LLMContent("user_prompt", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + before = [c.content for c in contents] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + assert [c.content for c in contents] == before + assert result.blocks_replaced == 0 + + +def test_assistant_context_is_a_mutable_artifact(): + contents = [ + _LLMContent("assistant_context", LONG_ARTIFACT), + _LLMContent("assistant_context", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + assert contents[0].content == LONG_ARTIFACT + assert contents[1].content == _ref(LONG_ARTIFACT, canonical_type="assistant_context") + assert result.blocks_replaced == 1 + + +# --------------------------------------------------------------------------- +# (3) reference resolution: must point at an earlier canonical full body +# --------------------------------------------------------------------------- + + +def 
test_canary_output_has_no_dangling_references(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("assistant_context", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + apply_artifact_dedup_canary(contents, salt=SALT, min_block_chars=MIN, mode="canary") + # Every reference the canary emitted resolves to an earlier canonical body. + assert dangling_artifact_references(contents, salt=SALT) == [] + + +def test_resolution_accepts_reference_after_its_canonical_body(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), # canonical full body + _LLMContent("tool_result", _ref(LONG_ARTIFACT)), # resolves to index 0 + ] + assert dangling_artifact_references(contents, salt=SALT) == [] + + +def test_resolution_flags_reference_with_no_earlier_canonical(): + # A reference whose canonical body never appears earlier is dangling. + contents = [ + _LLMContent("tool_result", _ref(LONG_ARTIFACT)), + ] + assert dangling_artifact_references(contents, salt=SALT) == [0] + + +def test_resolution_flags_reference_before_its_canonical_body(): + # The canonical body exists, but only AFTER the reference -> still dangling. 
+ contents = [ + _LLMContent("tool_result", _ref(LONG_ARTIFACT)), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + assert dangling_artifact_references(contents, salt=SALT) == [0] + + +# --------------------------------------------------------------------------- +# (4) telemetry: realized chars_saved only on actual mutation, no raw content +# --------------------------------------------------------------------------- + + +def test_telemetry_records_no_savings_when_off(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="off" + ) + record = build_artifact_canary_telemetry_record(result) + assert record["artifact_dedup_mode"] == "off" + assert record["artifact_dedup_blocks_replaced"] == 0 + assert record["artifact_dedup_chars_saved"] == 0 + assert record["chars_saved"] == 0 + + +def test_telemetry_records_no_savings_in_shadow(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="shadow" + ) + record = build_artifact_canary_telemetry_record(result) + assert record["artifact_dedup_mode"] == "shadow" + # Shadow contributes nothing to the realized chars_saved total. 
+ assert record["artifact_dedup_chars_saved"] == 0 + assert record["chars_saved"] == 0 + + +def test_telemetry_records_realized_savings_in_canary(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("assistant_context", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + record = build_artifact_canary_telemetry_record(result) + expected = 2 * (len(LONG_ARTIFACT) - _ref_len()) + assert record["artifact_dedup_mode"] == "canary" + assert record["artifact_dedup_class"] == ARTIFACT_DEDUP_CLASS + assert record["artifact_dedup_blocks_replaced"] == 2 + assert record["artifact_dedup_chars_saved"] == expected + assert record["chars_saved"] == expected + + +def test_telemetry_is_metadata_only_no_artifact_text(): + contents = [ + _LLMContent("tool_result", LONG_ARTIFACT), + _LLMContent("tool_result", LONG_ARTIFACT), + ] + result = apply_artifact_dedup_canary( + contents, salt=SALT, min_block_chars=MIN, mode="canary" + ) + record = build_artifact_canary_telemetry_record(result) + blob = json.dumps(record) + assert LONG_ARTIFACT not in blob + assert set(record) == { + "artifact_dedup_mode", + "artifact_dedup_class", + "artifact_dedup_blocks_replaced", + "artifact_dedup_chars_saved", + "chars_saved", + } + for key in ( + "artifact_dedup_blocks_replaced", + "artifact_dedup_chars_saved", + "chars_saved", + ): + assert isinstance(record[key], int) + + +# --------------------------------------------------------------------------- +# (3)+(4) runner/validation gate over synthetic artifact cases +# --------------------------------------------------------------------------- + + +def _artifact_cases() -> list[dict]: + """Two synthetic cases, each with an exact-duplicate artifact body.""" + return [ + { + "case_id": "syn-art-1", + "source": "synthetic", + "messages": [ + {"role": "system", "block_type": "system_prompt", "content": SYS_BLOCK}, + 
{"role": "tool", "block_type": "tool_result", "content": LONG_ARTIFACT}, + { + "role": "user", + "block_type": "user_prompt", + "content": "Synthetic user question referencing the artifact above.", + }, + # exact duplicate of the earlier tool_result -> replaced by ref. + {"role": "tool", "block_type": "tool_result", "content": LONG_ARTIFACT}, + ], + }, + { + "case_id": "syn-art-2", + "source": "synthetic", + "messages": [ + { + "role": "assistant", + "block_type": "assistant_context", + "content": LONG_ARTIFACT, + }, + # cross-type duplicate -> canonical is the assistant_context above. + {"role": "tool", "block_type": "tool_result", "content": LONG_ARTIFACT}, + ], + }, + ] + + +def test_artifact_runner_passes_on_synthetic_duplicate_artifacts(): + report = run_artifact_validation( + _artifact_cases(), + baseline_mode="off", + candidate_mode="canary", + salt=SALT, + min_block_chars=MIN, + date="2026-06-15", + ) + assert report.passed is True + assert report.failed_cases == 0 + assert report.invariant_names == ARTIFACT_INVARIANT_NAMES + assert report.total_blocks_replaced == 2 # one duplicate per case + assert report.total_chars_saved > 0 + assert any(c.mutated for c in report.cases) + # No tokenizer backend configured by default. 
+ assert report.tokenizer_status == "unavailable" + assert report.total_actual_tokens_saved is None + + +def test_artifact_runner_shadow_passes_without_realized_savings(): + report = run_artifact_validation( + _artifact_cases(), + baseline_mode="off", + candidate_mode="shadow", + salt=SALT, + min_block_chars=MIN, + date="2026-06-15", + ) + assert report.passed is True + assert report.total_blocks_replaced == 0 + assert report.total_chars_saved == 0 + assert all(not c.mutated for c in report.cases) + + +def test_artifact_runner_report_is_privacy_safe(): + report = run_artifact_validation( + _artifact_cases(), + baseline_mode="off", + candidate_mode="canary", + salt=SALT, + min_block_chars=MIN, + date="2026-06-15", + ) + report_dict = report_to_dict(report) + raw_needles = [ + LONG_ARTIFACT, + SYS_BLOCK, + "Synthetic user question referencing the artifact above.", + ] + assert_report_privacy_safe(report_dict, raw_needles) + blob = json.dumps(report_dict, ensure_ascii=False) + md = render_markdown(report) + for needle in raw_needles: + assert needle not in blob + assert needle not in md + + +def test_artifact_runner_fails_on_dangling_reference(): + # A candidate that replaces the FIRST (canonical) artifact body with a + # reference leaves a dangling reference: nothing earlier resolves it. 
+ def bad_dangling(messages, *, mode, salt, min_block_chars): + out = [dict(m) for m in messages] + if mode == "bad": + for m in out: + if m["block_type"] in MUTABLE_ARTIFACT_BLOCK_TYPES: + m["content"] = _artifact_reference_string( + m["block_type"], _salted_hash("no-such-canonical", salt) + ) + break + result = ArtifactDedupCanaryResult( + mode="canary", + artifact_dedup_class=ARTIFACT_DEDUP_CLASS, + mutated=True, + item_count=2, + candidate_group_count=1, + candidate_chars=len(LONG_ARTIFACT), + blocks_replaced=1, + chars_saved=len(LONG_ARTIFACT) - _ref_len(), + ) + return out, result + result = ArtifactDedupCanaryResult( + mode="off", + artifact_dedup_class=ARTIFACT_DEDUP_CLASS, + mutated=False, + item_count=0, + candidate_group_count=0, + candidate_chars=0, + blocks_replaced=0, + chars_saved=0, + ) + return out, result + + report = run_artifact_validation( + _artifact_cases()[:1], + baseline_mode="off", + candidate_mode="bad", + salt=SALT, + min_block_chars=MIN, + date="2026-06-15", + optimize_fn=bad_dangling, + ) + assert report.passed is False + assert report.failed_cases == 1 + assert "artifact_reference_resolvable" in report.cases[0].failed_invariants + + +def test_artifact_runner_fails_on_protected_mutation(): + # A candidate that rewrites a protected (non-artifact) user_prompt must fail + # the protected-content and mutation-scope invariants. 
+ def bad_protected(messages, *, mode, salt, min_block_chars): + out = [dict(m) for m in messages] + if mode == "bad": + for m in out: + if m["block_type"] == "user_prompt": + m["content"] = "[dropped]" + break + result = ArtifactDedupCanaryResult( + mode="canary", + artifact_dedup_class=ARTIFACT_DEDUP_CLASS, + mutated=True, + item_count=2, + candidate_group_count=0, + candidate_chars=0, + blocks_replaced=1, + chars_saved=1, + ) + return out, result + result = ArtifactDedupCanaryResult( + mode="off", + artifact_dedup_class=ARTIFACT_DEDUP_CLASS, + mutated=False, + item_count=0, + candidate_group_count=0, + candidate_chars=0, + blocks_replaced=0, + chars_saved=0, + ) + return out, result + + report = run_artifact_validation( + _artifact_cases()[:1], + baseline_mode="off", + candidate_mode="bad", + salt=SALT, + min_block_chars=MIN, + date="2026-06-15", + optimize_fn=bad_protected, + ) + assert report.passed is False + failed = report.cases[0].failed_invariants + assert "protected_content_preserved" in failed + assert "artifact_mutation_scope_allowed" in failed + + +def test_reference_template_is_low_cardinality_placeholder_only(): + # The template carries only / placeholders -- never content. 
+ assert "" in ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE + assert "" in ARTIFACT_DEDUP_CANARY_REFERENCE_TEMPLATE diff --git a/tests/test_hermes_plugin_patch.py b/tests/test_hermes_plugin_patch.py index 6909f47..ef7eacc 100644 --- a/tests/test_hermes_plugin_patch.py +++ b/tests/test_hermes_plugin_patch.py @@ -477,3 +477,72 @@ def test_prompt_dedup_canary_does_not_replace_cross_type_or_denylisted_runtime(m assert out[1]["content"] == ordinary_system assert stats["prompt_dedup_chars_saved"] == 0 assert stats["prompt_dedup_blocks_replaced"] == 0 + + + +def test_artifact_dedup_canary_default_off_does_not_mutate_runtime(monkeypatch, tmp_path): + module, _ = _load_plugin_module(monkeypatch) + monkeypatch.setattr(module, "_check_reorder", lambda: False) + monkeypatch.setattr(module, "_CONTEXTPILOT_AVAILABLE", False) + monkeypatch.setattr(module, "dedup_chat_completions", _zero_dedup) + monkeypatch.delenv("CONTEXTPILOT_ARTIFACT_DEDUP_MODE", raising=False) + telemetry = tmp_path / "telemetry.jsonl" + monkeypatch.setenv("CONTEXTPILOT_TELEMETRY_FILE", str(telemetry)) + + repeated = "pytest terminal output line showing repeated failure details\n" * 12 + messages = [ + {"role": "tool", "tool_call_id": "call_1", "content": repeated}, + {"role": "tool", "tool_call_id": "call_2", "content": repeated}, + ] + + engine = module.ContextPilotEngine() + out, stats = engine.optimize_api_messages(messages) + + assert out[0]["content"] == repeated + assert out[1]["content"] == repeated + assert stats["artifact_dedup_mode"] == "off" + assert stats["artifact_dedup_chars_saved"] == 0 + assert not telemetry.exists() + + +def test_artifact_dedup_canary_mutates_repeated_tool_artifacts_runtime(monkeypatch, tmp_path): + import json + + module, _ = _load_plugin_module(monkeypatch) + monkeypatch.setattr(module, "_check_reorder", lambda: False) + monkeypatch.setattr(module, "_CONTEXTPILOT_AVAILABLE", False) + monkeypatch.setattr(module, "dedup_chat_completions", _zero_dedup) + 
monkeypatch.setenv("CONTEXTPILOT_ARTIFACT_DEDUP_MODE", "canary") + telemetry = tmp_path / "telemetry.jsonl" + monkeypatch.setenv("CONTEXTPILOT_TELEMETRY_FILE", str(telemetry)) + + repeated = "pytest terminal output line showing repeated failure details\n" * 12 + user_same = repeated + messages = [ + {"role": "tool", "tool_call_id": "call_1", "content": repeated}, + {"role": "assistant", "content": "ordinary assistant response stays untouched"}, + {"role": "tool", "tool_call_id": "call_2", "content": repeated}, + {"role": "user", "content": user_same}, + ] + + engine = module.ContextPilotEngine() + out, stats = engine.optimize_api_messages(messages) + + assert out[0]["content"] == repeated # canonical full copy kept + assert out[1]["content"] == "ordinary assistant response stays untouched" + assert "ContextPilot artifact dedup: duplicate" in out[2]["content"] + assert repeated not in out[2]["content"] + assert out[3]["content"] == user_same # protected same text is untouched + assert stats["artifact_dedup_mode"] == "canary" + assert stats["artifact_dedup_blocks_replaced"] == 1 + assert stats["artifact_dedup_chars_saved"] > 0 + assert stats["chars_saved"] == stats["artifact_dedup_chars_saved"] + + record = json.loads(telemetry.read_text(encoding="utf-8").splitlines()[0]) + assert record["artifact_dedup_mode"] == "canary" + assert record["artifact_dedup_class"] == "same_payload_exact_artifact_body" + assert record["artifact_dedup_blocks_replaced"] == 1 + assert record["artifact_dedup_chars_saved"] == stats["artifact_dedup_chars_saved"] + raw = telemetry.read_text(encoding="utf-8") + assert repeated not in raw + assert "pytest terminal output" not in raw From 5abd5780d94737d0cd5f336248f4511f53b6c4ec Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Jun 2026 17:45:41 +0200 Subject: [PATCH 2/2] fix: load artifact canary without heavy package import --- __init__.py | 89 ++++++++++++++++++++++++++----- tests/test_hermes_plugin_patch.py | 51 ++++++++++++++++++ 2 files 
changed, 126 insertions(+), 14 deletions(-) diff --git a/__init__.py b/__init__.py index aea0440..4095489 100644 --- a/__init__.py +++ b/__init__.py @@ -53,6 +53,11 @@ def _load_submodule(name: str, file_path: Path): _hermes_sanitizer_patched = False _bootstrap_attempted = False +# Cache for the directly-loaded hermes_opportunities canary modules. ``None`` +# means "not yet attempted"; ``False`` means "attempted and unavailable"; a dict +# means "loaded". +_canary_modules: Any = None + def _import_contextpilot_submodules(): global dedup_chat_completions @@ -324,6 +329,68 @@ def _measure_actual_tokens( } +def _load_canary_modules(): + """Load the hermes_opportunities canary modules without importing the + ``contextpilot`` package ``__init__``. + + ``from contextpilot.hermes_opportunities.* import ...`` would first execute + ``contextpilot/__init__.py``, which pulls in the pipeline / live-index stack + (numpy/scipy). Those are unavailable in the Hermes/plugin runtime, so the + package import fails and both canaries silently fall back to "off". Instead + we load the four pure-Python modules + (``models``/``privacy``/``prompt_dedup_canary``/``artifact_dedup_canary``) + directly from their files under a lightweight private package + (``_contextpilot_canary``) so their relative imports (``from .models``, + ``from .privacy``) resolve without touching the heavy package ``__init__``. + + Returns a dict with ``models``/``prompt_dedup_canary``/``artifact_dedup_canary`` + module objects, or ``None`` when the files cannot be loaded. 
+ """ + global _canary_modules + if _canary_modules is not None: + return _canary_modules or None + + try: + pkg_name = "_contextpilot_canary" + ho_dir = _REPO_ROOT / "contextpilot" / "hermes_opportunities" + + pkg = sys.modules.get(pkg_name) + if pkg is None: + pkg_spec = _ilu.spec_from_loader(pkg_name, loader=None, is_package=True) + pkg = _ilu.module_from_spec(pkg_spec) + pkg.__path__ = [str(ho_dir)] + sys.modules[pkg_name] = pkg + + def _load(sub: str): + full = f"{pkg_name}.{sub}" + cached = sys.modules.get(full) + if cached is not None: + return cached + spec = _ilu.spec_from_file_location(full, str(ho_dir / f"{sub}.py")) + if spec is None or spec.loader is None: + raise ImportError(f"Cannot load {full}") + mod = _ilu.module_from_spec(spec) + # Register before exec so the canary modules' relative imports + # (``from .models``/``from .privacy``) resolve to these entries. + sys.modules[full] = mod + spec.loader.exec_module(mod) + return mod + + # Dependencies first: the canary modules import from these. + _load("models") + _load("privacy") + _canary_modules = { + "models": sys.modules[f"{pkg_name}.models"], + "prompt_dedup_canary": _load("prompt_dedup_canary"), + "artifact_dedup_canary": _load("artifact_dedup_canary"), + } + return _canary_modules + except Exception as e: # noqa: BLE001 - canary must never break requests + _canary_modules = False + logger.debug("[ContextPilot] canary modules unavailable: %s", e) + return None + + def _classify_prompt_content_for_canary(text: str) -> str: """Conservatively classify runtime system text for prompt-dedup canary. @@ -355,14 +422,11 @@ def _apply_prompt_dedup_canary_to_api_messages( same_type_skill_prompt_only duplicate. User/assistant/tool and ordinary system content are never passed as writable skill_prompt items. 
""" - try: - from contextpilot.hermes_opportunities.models import _LLMContent - from contextpilot.hermes_opportunities.prompt_dedup_canary import ( - apply_prompt_dedup_canary, - ) - except Exception as e: # noqa: BLE001 - canary must never break requests - logger.debug("[ContextPilot] prompt dedup canary unavailable: %s", e) + mods = _load_canary_modules() + if mods is None: return None + _LLMContent = mods["models"]._LLMContent + apply_prompt_dedup_canary = mods["prompt_dedup_canary"].apply_prompt_dedup_canary llm_items = [] message_indexes = [] @@ -410,14 +474,11 @@ def _apply_artifact_dedup_canary_to_api_messages( CONTEXTPILOT_ARTIFACT_DEDUP_MODE=canary and the canary module replaces a later exact-duplicate artifact body with a strictly shorter reference. """ - try: - from contextpilot.hermes_opportunities.models import _LLMContent - from contextpilot.hermes_opportunities.artifact_dedup_canary import ( - apply_artifact_dedup_canary, - ) - except Exception as e: # noqa: BLE001 - canary must never break requests - logger.debug("[ContextPilot] artifact dedup canary unavailable: %s", e) + mods = _load_canary_modules() + if mods is None: return None + _LLMContent = mods["models"]._LLMContent + apply_artifact_dedup_canary = mods["artifact_dedup_canary"].apply_artifact_dedup_canary llm_items = [] message_indexes = [] diff --git a/tests/test_hermes_plugin_patch.py b/tests/test_hermes_plugin_patch.py index ef7eacc..b627afd 100644 --- a/tests/test_hermes_plugin_patch.py +++ b/tests/test_hermes_plugin_patch.py @@ -546,3 +546,54 @@ def test_artifact_dedup_canary_mutates_repeated_tool_artifacts_runtime(monkeypat raw = telemetry.read_text(encoding="utf-8") assert repeated not in raw assert "pytest terminal output" not in raw + + +def test_artifact_dedup_canary_runs_when_contextpilot_package_init_unimportable( + monkeypatch, tmp_path +): + """Regression: the canary must load via direct-file loading even when the + ``contextpilot`` package ``__init__`` cannot be imported (e.g. 
scipy missing + in the Hermes/plugin runtime). Previously the apply helpers imported + ``contextpilot.hermes_opportunities.*`` directly, which executed the heavy + package ``__init__`` and silently fell back to ``artifact_dedup_mode=off``. + """ + import builtins + + module, _ = _load_plugin_module(monkeypatch) + monkeypatch.setattr(module, "_check_reorder", lambda: False) + monkeypatch.setattr(module, "_CONTEXTPILOT_AVAILABLE", False) + monkeypatch.setattr(module, "dedup_chat_completions", _zero_dedup) + monkeypatch.setenv("CONTEXTPILOT_ARTIFACT_DEDUP_MODE", "canary") + telemetry = tmp_path / "telemetry.jsonl" + monkeypatch.setenv("CONTEXTPILOT_TELEMETRY_FILE", str(telemetry)) + + # Force a fresh load attempt and simulate the unimportable package. + monkeypatch.setattr(module, "_canary_modules", None) + for mod_name in list(sys.modules): + if mod_name == "contextpilot" or mod_name.startswith("contextpilot."): + monkeypatch.delitem(sys.modules, mod_name, raising=False) + + real_import = builtins.__import__ + + def _poisoned_import(name, *args, **kwargs): + if name == "contextpilot" or name.startswith("contextpilot."): + raise ImportError("simulated: contextpilot package __init__ (scipy) unavailable") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", _poisoned_import) + + repeated = "pytest terminal output line showing repeated failure details\n" * 12 + messages = [ + {"role": "tool", "tool_call_id": "call_1", "content": repeated}, + {"role": "tool", "tool_call_id": "call_2", "content": repeated}, + ] + + engine = module.ContextPilotEngine() + out, stats = engine.optimize_api_messages(messages) + + assert out[0]["content"] == repeated # canonical full copy kept + assert "ContextPilot artifact dedup: duplicate" in out[1]["content"] + assert repeated not in out[1]["content"] + assert stats["artifact_dedup_mode"] == "canary" + assert stats["artifact_dedup_blocks_replaced"] == 1 + assert stats["artifact_dedup_chars_saved"] > 0