diff --git a/contextpilot/hermes_opportunities/__init__.py b/contextpilot/hermes_opportunities/__init__.py index 3f66f00..9b2591c 100644 --- a/contextpilot/hermes_opportunities/__init__.py +++ b/contextpilot/hermes_opportunities/__init__.py @@ -36,6 +36,7 @@ classify_artifact_kind, ) from .cli import main +from .dedup_ab import simulate_prompt_dedup_ab from .db import ( load_heavy_sessions, load_llm_bound_content, @@ -56,6 +57,8 @@ DEFAULT_MIN_BLOCK_REPEAT, DEFAULT_TOP_N, EST_CHARS_PER_TOKEN, + PROMPT_DEDUP_AB_CLASSES, + PROMPT_DEDUP_AB_REFERENCE_TEMPLATE, PROMPT_DUPLICATE_BLOCK_TYPES, ArtifactKindStat, ArtifactSourceCount, @@ -66,6 +69,8 @@ OpportunityReport, ParentAggregationArtifacts, ParentAggregationGroup, + PromptDedupABClass, + PromptDedupABSimulation, PromptDuplicateBlock, PromptDuplicateShadow, PromptDuplicateTypeCount, @@ -95,6 +100,7 @@ classify_router_label, ) from .telemetry import parse_telemetry +from .tokenizer import TokenizerBackend, resolve_tokenizer __all__ = [ # tunables / enums @@ -119,6 +125,8 @@ "ToolSizeStat", "HeavySession", "TelemetryCoverage", + "PromptDedupABClass", + "PromptDedupABSimulation", "PromptDuplicateBlock", "PromptDuplicateTypeCount", "PromptDuplicateShadow", @@ -144,6 +152,9 @@ "summarize_tool_sizes", "analyze_llm_bound_blocks", "detect_prompt_duplicate_blocks", + "simulate_prompt_dedup_ab", + "TokenizerBackend", + "resolve_tokenizer", # routing (shadow) "classify_router_label", "analyze_worker_routing_shadow", diff --git a/contextpilot/hermes_opportunities/cli.py b/contextpilot/hermes_opportunities/cli.py index e2d1d8c..1a84f00 100644 --- a/contextpilot/hermes_opportunities/cli.py +++ b/contextpilot/hermes_opportunities/cli.py @@ -27,6 +27,7 @@ ) from .report import build_report, write_report from .telemetry import parse_telemetry +from .tokenizer import resolve_tokenizer def main(argv: list[str] | None = None) -> int: @@ -88,6 +89,24 @@ def main(argv: list[str] | None = None) -> int: "(enabled by default; advisory only, 
never rewrites/dedups prompts)" ), ) + parser.add_argument( + "--disable-prompt-dedup-ab", + action="store_true", + help=( + "skip the offline prompt-dedup A/B simulation section " + "(enabled by default; offline simulation only, never mutates prompts; " + "this is the evidence gate before any canary replace)" + ), + ) + parser.add_argument( + "--prompt-dedup-tokenizer", + default=None, + help=( + "opt-in exact tokenizer backend for the prompt-dedup A/B simulation, " + "e.g. 'tiktoken:cl100k_base' (off by default; without it the A/B " + "section reports tokenizer_status=unavailable and no actual-token fields)" + ), + ) args = parser.parse_args(argv) if not args.state_db.exists(): @@ -96,6 +115,9 @@ def main(argv: list[str] | None = None) -> int: # Harden for unattended cron use: never dump a traceback (which would echo # the DB path / SQL); emit only the exception class name and a non-zero code. try: + # Opt-in tokenizer; off by default -> A/B simulation reports actual tokens + # as unavailable rather than fabricating chars/4 figures. 
+ dedup_ab_tokenizer = resolve_tokenizer(args.prompt_dedup_tokenizer) tool_messages = load_tool_messages( args.state_db, since_hours=args.since_hours, all_sessions=args.all_sessions ) @@ -134,6 +156,8 @@ def main(argv: list[str] | None = None) -> int: worker_routing_shadow=not args.disable_worker_routing_shadow, parent_aggregation_shadow=not args.disable_parent_aggregation, prompt_duplicate_shadow=not args.disable_prompt_duplicate_shadow, + prompt_dedup_ab=not args.disable_prompt_dedup_ab, + prompt_dedup_ab_tokenizer=dedup_ab_tokenizer, min_artifact_chars=args.min_artifact_chars, ) json_path, md_path = write_report(report, args.out_dir) diff --git a/contextpilot/hermes_opportunities/dedup_ab.py b/contextpilot/hermes_opportunities/dedup_ab.py new file mode 100644 index 0000000..768eca7 --- /dev/null +++ b/contextpilot/hermes_opportunities/dedup_ab.py @@ -0,0 +1,229 @@ +"""Prompt dedup A/B simulation harness (OFFLINE simulation + measurement only). + +This is the evidence gate to evaluate *before* any canary prompt replacement. +It scans ONLY ``system_prompt`` / ``skill_prompt`` LLM-bound blocks, fingerprints +exact duplicate blocks, and simulates -- in accounting only -- keeping the first +occurrence of each duplicate while replacing every later occurrence with a +deterministic reference placeholder. + +Hard guarantees: + +* It never mutates the DB, runtime state, or any emitted prompt; it produces no + side effects beyond the privacy-safe report dataclasses below. +* It emits salted hashes / counters / low-cardinality enums only -- never raw + prompt text and never the reference placeholder filled with real content. +* Char and token deltas are SIMULATED candidate figures, explicitly NOT realized + savings. ContextPilot performs no canonicalization or replacement at runtime. 
+* Exact token figures appear only when an explicitly configured tokenizer backend + is available (opt-in, off by default); otherwise the status is ``unavailable`` + and no actual-token fields are populated. +""" +from __future__ import annotations + +from typing import Iterable + +from .models import ( + PROMPT_DEDUP_AB_CLASSES, + PROMPT_DEDUP_AB_REFERENCE_TEMPLATE, + PROMPT_DUPLICATE_BLOCK_TYPES, + PromptDedupABClass, + PromptDedupABSimulation, + _LLMContent, +) +from .privacy import _salted_hash +from .tokenizer import TokenizerBackend + +# Per-class risk label + advisory note. The skill-only class is the lowest-risk +# first canary candidate; the other classes are reported but flagged higher risk. +_CLASS_META = { + "same_type_skill_prompt_only": ( + "low", + "first canary candidate: exact duplicate blocks within skill prompts only", + ), + "same_type_system_prompt_only": ( + "high", + "higher risk: exact duplicate blocks within system prompts only", + ), + "cross_type_system_skill": ( + "high", + "higher risk: exact duplicate blocks shared across system and skill prompts", + ), +} + + +def _classify_group(types: dict[str, int]) -> str | None: + """Map a duplicate group's prompt-type spread to a candidate class.""" + present = set(types) + if present == {"skill_prompt"}: + return "same_type_skill_prompt_only" + if present == {"system_prompt"}: + return "same_type_system_prompt_only" + if present == {"system_prompt", "skill_prompt"}: + return "cross_type_system_skill" + return None # only system/skill are scanned; anything else is ignored + + +def _canonical_type(types: dict[str, int]) -> str: + """Deterministically pick the canonical prompt type for the reference string. + + Dominant by occurrence count; ties broken by sorted type name so the choice + is stable across runs and inputs. 
+ """ + return sorted(types.items(), key=lambda kv: (-kv[1], kv[0]))[0][0] + + +def _reference_string(canonical_type: str, block_hash: str) -> str: + return PROMPT_DEDUP_AB_REFERENCE_TEMPLATE.replace("<type>", canonical_type).replace( + "<hash>", block_hash + ) + + +def simulate_prompt_dedup_ab( + contents: Iterable[_LLMContent], + *, + salt: str, + min_block_chars: int, + tokenizer: TokenizerBackend | None = None, + enabled: bool = True, +) -> PromptDedupABSimulation: + """Simulate prompt-dedup replacement over system/skill prompt blocks. + + Restricted to ``system_prompt`` / ``skill_prompt`` items. Every fingerprintable + line is counted (intra- and inter-prompt), and any fingerprint seen 2+ times + is a duplicate group. Each group is assigned to exactly one candidate class and + simulated independently: the first occurrence is kept full, every later + occurrence is replaced (in accounting only) by the deterministic reference + string ``[Prompt duplicate omitted in simulation; canonical=<type>:<hash>]``. + + Returns a privacy-safe :class:`PromptDedupABSimulation` -- hashes, counters, + and enums only. No DB/runtime/payload is touched. 
+ """ + scanned = list(PROMPT_DUPLICATE_BLOCK_TYPES) + tok_status = "available" if tokenizer is not None else "unavailable" + tok_backend = tokenizer.name if tokenizer is not None else None + + if not enabled: + return PromptDedupABSimulation( + enabled=False, + item_count=0, + scanned_block_types=scanned, + tokenizer_status="unavailable", + tokenizer_backend=None, + reference_string_template=PROMPT_DEDUP_AB_REFERENCE_TEMPLATE, + classes=[], + notes=["prompt-dedup A/B simulation disabled"], + ) + + # block_hash -> {char_length, text (in-memory only), types: {block_type: occ}} + agg: dict[str, dict] = {} + item_count = 0 + for item in contents: + bt = item.block_type + if bt not in PROMPT_DUPLICATE_BLOCK_TYPES: + continue + item_count += 1 + for line in item.content.splitlines(): + block = line.strip() + if len(block) < min_block_chars: + continue + h = _salted_hash(block, salt) + entry = agg.get(h) + if entry is None: + # ``text`` is held in-memory only for exact token counting; it is + # never written to the report (no dataclass field carries it). + agg[h] = {"char_length": len(block), "text": block, "types": {bt: 1}} + else: + entry["types"][bt] = entry["types"].get(bt, 0) + 1 + + # Per-class running totals. + acc: dict[str, dict] = { + cls: { + "groups": 0, + "repl_occ": 0, + "chars_before": 0, + "chars_after": 0, + "tok_before": 0, + "tok_after": 0, + } + for cls in PROMPT_DEDUP_AB_CLASSES + } + + for h, entry in agg.items(): + types = entry["types"] + occ = sum(types.values()) + if occ < 2: + continue # not a duplicate -> no replacement candidate + cls = _classify_group(types) + if cls is None: + continue + char_len = entry["char_length"] + ref = _reference_string(_canonical_type(types), h) + ref_len = len(ref) + + a = acc[cls] + a["groups"] += 1 + a["repl_occ"] += occ - 1 + a["chars_before"] += occ * char_len + # Keep first occurrence full; later occurrences become the reference str. 
+ a["chars_after"] += char_len + (occ - 1) * ref_len + if tokenizer is not None: + tb = tokenizer.count(entry["text"]) + tr = tokenizer.count(ref) + a["tok_before"] += occ * tb + a["tok_after"] += tb + (occ - 1) * tr + + classes: list[PromptDedupABClass] = [] + for cls in PROMPT_DEDUP_AB_CLASSES: + a = acc[cls] + risk, note = _CLASS_META[cls] + if tokenizer is not None: + tok_before = a["tok_before"] + tok_after = a["tok_after"] + tok_delta = tok_before - tok_after + else: + tok_before = tok_after = tok_delta = None + classes.append( + PromptDedupABClass( + candidate_class=cls, + risk_label=risk, + candidate_group_count=a["groups"], + replacement_occurrence_count=a["repl_occ"], + chars_before=a["chars_before"], + chars_after_simulated=a["chars_after"], + chars_delta_simulated=a["chars_before"] - a["chars_after"], + tokenizer_status=tok_status, + actual_tokens_before=tok_before, + actual_tokens_after=tok_after, + actual_tokens_delta=tok_delta, + note=note, + ) + ) + + notes = [ + "OFFLINE SIMULATION + MEASUREMENT ONLY: no DB/runtime/prompt is mutated; " + "ContextPilot performs no replacement or canonicalization", + "char/token deltas are SIMULATED candidate figures, NOT realized savings", + "this A/B evidence is the gate to evaluate before any canary prompt replacement", + "same_type_skill_prompt_only is the lowest-risk first canary candidate; " + "system-only and cross-type classes are higher risk", + "chars_delta_simulated is signed: negative means a short duplicate would grow " + "if replaced by the reference placeholder", + ] + if tokenizer is None: + notes.append( + "actual-token measurement unavailable (no exact tokenizer backend configured); " + "no actual-token fields are reported" + ) + if item_count == 0: + notes.append("no system/skill prompt items observed in the selected window") + + return PromptDedupABSimulation( + enabled=True, + item_count=item_count, + scanned_block_types=scanned, + tokenizer_status=tok_status, + tokenizer_backend=tok_backend, + 
reference_string_template=PROMPT_DEDUP_AB_REFERENCE_TEMPLATE, + classes=classes, + notes=notes, + ) diff --git a/contextpilot/hermes_opportunities/models.py b/contextpilot/hermes_opportunities/models.py index 8c739e2..2a2ed48 100644 --- a/contextpilot/hermes_opportunities/models.py +++ b/contextpilot/hermes_opportunities/models.py @@ -182,6 +182,78 @@ class PromptDuplicateShadow: notes: list[str] = field(default_factory=list) +# --------------------------------------------------------------------------- +# Prompt Dedup A/B — OFFLINE SIMULATION structures (system/skill prompts only) +# --------------------------------------------------------------------------- + +# The reference placeholder a *simulated* replacement would leave in place of a +# later duplicate occurrence. Used for accounting only -- ContextPilot never +# emits this string into a real payload. ``<type>`` / ``<hash>`` are filled with +# the canonical prompt type and the salted block fingerprint. +PROMPT_DEDUP_AB_REFERENCE_TEMPLATE = ( + "[Prompt duplicate omitted in simulation; canonical=<type>:<hash>]" +) + +# Safe candidate classes, simulated separately. The skill-only class is the +# lowest-risk first canary candidate; the others are reported but higher risk. +PROMPT_DEDUP_AB_CLASSES = ( + "same_type_skill_prompt_only", + "same_type_system_prompt_only", + "cross_type_system_skill", +) + + +@dataclass +class PromptDedupABClass: + """Simulated A/B accounting for one candidate class. + + All figures are OFFLINE SIMULATION over static system/skill prompt text and + are NOT realized savings -- ContextPilot performs no replacement at runtime. + ``chars_delta_simulated`` is signed: positive means the simulated reference + replacement would shrink the payload, negative means it would grow it (a + short duplicate replaced by a longer placeholder). 
+ + Actual-token fields are populated ONLY when an exact tokenizer backend is + configured; otherwise they are ``None`` and ``tokenizer_status`` is + ``"unavailable"`` -- never a fabricated chars/4 figure. + """ + + candidate_class: str + risk_label: str # "low" (canary candidate) | "high" + candidate_group_count: int # distinct exact-duplicate block groups + replacement_occurrence_count: int # occurrences beyond the first, summed + chars_before: int # chars of all candidate occurrences + chars_after_simulated: int # first kept full, later -> reference str + chars_delta_simulated: int # chars_before - chars_after_simulated + tokenizer_status: str # "available" | "unavailable" + actual_tokens_before: int | None # only when tokenizer available + actual_tokens_after: int | None # only when tokenizer available + actual_tokens_delta: int | None # only when tokenizer available + note: str + + +@dataclass +class PromptDedupABSimulation: + """Offline A/B simulation harness for prompt dedup (system/skill prompts). + + OFFLINE SIMULATION + MEASUREMENT ONLY. This is the evidence gate to evaluate + *before* any canary replacement: it scans only ``system_prompt`` / + ``skill_prompt`` LLM-bound blocks, keeps the first occurrence of every exact + duplicate and replaces only later occurrences in a *simulated* accounting. It + never mutates the DB, runtime, or emitted prompts, and its char/token deltas + are NOT realized savings. 
+ """ + + enabled: bool + item_count: int # system/skill prompt items scanned + scanned_block_types: list[str] + tokenizer_status: str # "available" | "unavailable" + tokenizer_backend: str | None # backend name when available, else None + reference_string_template: str + classes: list[PromptDedupABClass] + notes: list[str] = field(default_factory=list) + + # --------------------------------------------------------------------------- # Worker Context Routing — SHADOW MODE structures (P0 data collection only) # --------------------------------------------------------------------------- @@ -334,6 +406,8 @@ class OpportunityReport: cross_type_wasted_tokens: int # Prompt duplicate shadow (system/skill prompts only; advisory, never realized). prompt_duplicates: PromptDuplicateShadow + # Prompt dedup A/B simulation (system/skill prompts only; offline, never realized). + prompt_dedup_ab: PromptDedupABSimulation # Worker Context Routing shadow mode (P0 data collection; never prunes). worker_routing: WorkerRoutingShadow # Parent Aggregation Artifacts shadow mode (P0 telemetry; never dedups). 
diff --git a/contextpilot/hermes_opportunities/report.py b/contextpilot/hermes_opportunities/report.py index 32b399d..5fd1b99 100644 --- a/contextpilot/hermes_opportunities/report.py +++ b/contextpilot/hermes_opportunities/report.py @@ -15,6 +15,7 @@ DEFAULT_MIN_ARTIFACT_CHARS, analyze_parent_aggregation_artifacts, ) +from .dedup_ab import simulate_prompt_dedup_ab from .detection import ( analyze_llm_bound_blocks, detect_exact_duplicate_tool_outputs, @@ -36,6 +37,19 @@ ) from .privacy import _assert_no_forbidden_keys, _salt_fingerprint from .routing import analyze_worker_routing_shadow +from .tokenizer import TokenizerBackend + + +def _dedup_ab_summary(ab) -> str: + """One-line rollup of the prompt-dedup A/B simulation for the summary block.""" + if not ab.enabled: + return "disabled" + groups = sum(c.candidate_group_count for c in ab.classes) + chars_delta = sum(c.chars_delta_simulated for c in ab.classes) + return ( + f"{groups} candidate groups, {chars_delta} simulated chars delta, " + f"tokenizer {ab.tokenizer_status}" + ) def build_report( @@ -55,6 +69,8 @@ def build_report( worker_routing_shadow: bool = True, parent_aggregation_shadow: bool = True, prompt_duplicate_shadow: bool = True, + prompt_dedup_ab: bool = True, + prompt_dedup_ab_tokenizer: TokenizerBackend | None = None, min_artifact_chars: int = DEFAULT_MIN_ARTIFACT_CHARS, ) -> OpportunityReport: dups = detect_exact_duplicate_tool_outputs(tool_messages, salt=salt, top_n=top_n) @@ -86,6 +102,14 @@ def build_report( enabled=prompt_duplicate_shadow, ) + prompt_dedup_ab_sim = simulate_prompt_dedup_ab( + llm_contents, + salt=salt, + min_block_chars=min_block_chars, + tokenizer=prompt_dedup_ab_tokenizer, + enabled=prompt_dedup_ab, + ) + worker_routing = analyze_worker_routing_shadow( llm_contents, salt=salt, @@ -117,6 +141,7 @@ def build_report( "worker-routing section is SHADOW MODE P0: it labels blocks for a future router but never drops/summarizes context", "parent-aggregation section is SHADOW MODE P0 
telemetry: it groups exact artifact bodies but never dedups/replaces context", "prompt-duplicate section is ADVISORY ONLY (system/skill prompts): it counts exact duplicate prompt blocks but never rewrites/dedups prompts; its chars/tokens are NOT realized savings", + "prompt-dedup A/B section is OFFLINE SIMULATION ONLY (system/skill prompts): it simulates keeping the first occurrence and replacing later duplicate occurrences to measure candidate savings; it performs NO runtime replacement/canonicalization and its deltas are NOT realized savings; it is the evidence gate before any canary replace", ] if all_sessions: notes.append("all-sessions mode: time window ignored; scanned all non-archived sessions/active messages") @@ -147,6 +172,7 @@ def build_report( cross_type_block_groups=cross_groups, cross_type_wasted_tokens=cross_wasted, prompt_duplicates=prompt_duplicates, + prompt_dedup_ab=prompt_dedup_ab_sim, worker_routing=worker_routing, parent_aggregation=parent_aggregation, notes=notes, @@ -187,6 +213,8 @@ def write_report(report: OpportunityReport, out_dir: Path) -> tuple[Path, Path]: f"{report.prompt_duplicates.total_chars_duplicated} chars duplicated " f"(~{report.prompt_duplicates.advisory_est_duplicate_tokens_chars_div_4} " f"advisory chars/4 tokens) — NOT realized savings", + f"- Prompt dedup A/B (simulation): " + f"{_dedup_ab_summary(report.prompt_dedup_ab)} — NOT realized savings", f"- Telemetry: {t.events} events, {t.chars_saved} chars saved by processing; " f"derived chars/4 tokens={t.tokens_saved}, ratio={t.coverage_ratio_pct}%", f"- Worker routing (shadow): {report.worker_routing.classified_block_count} blocks " @@ -248,6 +276,47 @@ def write_report(report: OpportunityReport, out_dir: Path) -> tuple[Path, Path]: f"(~{b.advisory_est_duplicate_tokens_chars_div_4} advisory chars/4 tokens)" ) md.append("") + ab = report.prompt_dedup_ab + md.append("## Prompt dedup A/B simulation — system/skill (offline only)") + if not ab.enabled: + md.append("- disabled") 
+ else: + md.append( + f"- Scanned prompt types: {', '.join(ab.scanned_block_types)} " + f"(items: {ab.item_count})" + ) + backend = ab.tokenizer_backend or "none" + md.append( + f"- Tokenizer: status={ab.tokenizer_status} backend={backend} " + f"(actual tokens shown only when an exact backend is configured)" + ) + md.append(f"- Simulated reference string: `{ab.reference_string_template}`") + md.append( + "- OFFLINE SIMULATION ONLY — no runtime replacement/canonicalization; " + "deltas below are candidate figures, NOT realized savings" + ) + md.append("") + md.append("### Candidate classes (simulated separately)") + for c in ab.classes: + line = ( + f"- {c.candidate_class} [risk={c.risk_label}]: " + f"groups={c.candidate_group_count} " + f"replacements={c.replacement_occurrence_count} " + f"chars_before={c.chars_before} " + f"chars_after_simulated={c.chars_after_simulated} " + f"chars_delta_simulated={c.chars_delta_simulated}" + ) + if c.tokenizer_status == "available": + line += ( + f" actual_tokens_before={c.actual_tokens_before} " + f"actual_tokens_after={c.actual_tokens_after} " + f"actual_tokens_delta={c.actual_tokens_delta}" + ) + else: + line += " actual_tokens=unavailable" + md.append(line) + md.append(f" - {c.note}") + md.append("") md.append("## Top exact-duplicate tool outputs") for d in report.exact_duplicate_groups: md.append( diff --git a/contextpilot/hermes_opportunities/tokenizer.py b/contextpilot/hermes_opportunities/tokenizer.py new file mode 100644 index 0000000..e3e226d --- /dev/null +++ b/contextpilot/hermes_opportunities/tokenizer.py @@ -0,0 +1,74 @@ +"""Optional, opt-in exact tokenizer helper for offline simulation only. + +Mirrors the philosophy of the actual-token prompt shadow (#53): exact token +counts are surfaced ONLY when an explicitly configured tokenizer backend is +available. By default this module resolves to ``None`` (status ``unavailable``), +and callers must never fabricate token figures in that case. 
+ +This helper runs in-memory over block text purely to produce integer counts; it +never emits or persists any text. It is used by the prompt-dedup A/B *simulation* +harness, which measures candidate savings offline and never mutates runtime +payloads. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable + + +@dataclass +class TokenizerBackend: + """A resolved exact tokenizer. + + ``name`` is a low-cardinality backend identifier safe to emit (e.g. + ``"tiktoken:cl100k_base"``); ``count`` maps a string to its exact token + count. Counting happens in-memory only -- the text is never emitted. + """ + + name: str + count: Callable[[str], int] + + +def resolve_tokenizer(spec: object | None) -> TokenizerBackend | None: + """Resolve an explicitly-configured exact tokenizer backend, or ``None``. + + Off by default: ``spec=None`` (the default everywhere) returns ``None`` so + the A/B harness reports ``tokenizer_status=unavailable`` and emits no actual + token fields. ``spec`` may be: + + * ``None`` -> not configured; returns ``None``. + * a :class:`TokenizerBackend` -> used directly (test/dependency injection). + * a string ``"tiktoken:<encoding>"`` -> best-effort load of a tiktoken + encoding. If tiktoken (or the encoding) is unavailable, returns ``None`` + rather than guessing; the caller then reports ``unavailable``. + + Any backend that cannot be resolved exactly yields ``None`` -- we never + substitute a chars/4 estimate behind an "actual tokens" label. 
+ """ + if spec is None: + return None + if isinstance(spec, TokenizerBackend): + return spec + if isinstance(spec, str): + return _resolve_named(spec) + return None + + +def _resolve_named(spec: str) -> TokenizerBackend | None: + spec = spec.strip() + if not spec: + return None + if spec.startswith("tiktoken:"): + encoding = spec.split(":", 1)[1].strip() or "cl100k_base" + try: + import tiktoken # type: ignore + + enc = tiktoken.get_encoding(encoding) + except Exception: # noqa: BLE001 - missing dep/encoding -> unavailable, never fake + return None + return TokenizerBackend( + name=f"tiktoken:{encoding}", + count=lambda text: len(enc.encode_ordinary(text)), # special-token literals count as plain text + ) + # Unknown backend spec: stay unavailable rather than fabricate counts. + return None diff --git a/docs/guides/hermes-monitor.md b/docs/guides/hermes-monitor.md index a5bdf1d..8d62093 100644 --- a/docs/guides/hermes-monitor.md +++ b/docs/guides/hermes-monitor.md @@ -152,6 +152,32 @@ or replaces prompt text, and its counters are not realized savings. Use it to prioritize a future prompt-assembly A/B where before/after payloads are measured with an exact tokenizer/API usage comparison. +### Prompt dedup A/B simulation + +The analyzer also includes a **Prompt dedup A/B simulation — system/skill** +section. This is the evidence gate before any canary replacement. It still does +not mutate runtime payloads: it keeps prompt text in memory, groups exact +duplicate `system_prompt` / `skill_prompt` blocks, and simulates the accounting +for keeping the first occurrence while replacing only later occurrences with a +deterministic reference placeholder. + +The simulation reports candidate classes separately: + +- `same_type_skill_prompt_only` — lowest-risk first canary candidate, +- `same_type_system_prompt_only` — higher risk, +- `cross_type_system_skill` — higher risk because it crosses prompt hierarchy. 
+ +For each class the report includes group counts, replacement occurrence counts, +`chars_before`, `chars_after_simulated`, and signed `chars_delta_simulated`. +When you pass an explicitly configured tokenizer backend, for example +`--prompt-dedup-tokenizer tiktoken:cl100k_base`, it also reports actual tokenizer +before/after/delta fields for the simulation. Without that opt-in backend, +`tokenizer_status=unavailable` and no fake actual-token numbers are emitted. + +Use `--disable-prompt-dedup-ab` to omit this section. Even when enabled, all +figures are **simulation-only**, **not realized savings**, and no prompt text is +rewritten, summarized, deduplicated, or emitted. + ### Worker Context Routing shadow mode The analyzer now includes a **Worker Context Routing — shadow mode** section by diff --git a/tests/test_hermes_context_opportunity_analyzer.py b/tests/test_hermes_context_opportunity_analyzer.py index 4fe3acb..81cae4e 100644 --- a/tests/test_hermes_context_opportunity_analyzer.py +++ b/tests/test_hermes_context_opportunity_analyzer.py @@ -1045,3 +1045,123 @@ def test_prompt_duplicate_shadow_can_be_disabled(tmp_path): md_text = md_path.read_text(encoding="utf-8") assert "Prompt duplicate blocks" in md_text assert "disabled" in md_text + + +# --------------------------------------------------------------------------- +# Prompt dedup A/B simulation (offline only; no replacement) +# --------------------------------------------------------------------------- + + +def test_prompt_dedup_ab_simulates_candidate_classes_without_tokenizer(): + skill_line = "Skill duplicate instruction line long enough for hashing." + sys_line = "System duplicate instruction line long enough for hashing." + cross_line = "Cross prompt duplicate instruction line long enough for hashing." 
+ contents = [ + analyzer._LLMContent( + block_type="skill_prompt", + content=f"{skill_line}\n{skill_line}\n{cross_line}", + ), + analyzer._LLMContent( + block_type="system_prompt", + content=f"{sys_line}\n{sys_line}\n{cross_line}", + ), + analyzer._LLMContent( + block_type="tool_result", + content=f"{skill_line}\n{skill_line}", + ), + ] + sim = analyzer.simulate_prompt_dedup_ab( + contents, salt="s", min_block_chars=40, tokenizer=None + ) + assert sim.enabled + assert sim.item_count == 2 + assert sim.tokenizer_status == "unavailable" + classes = {c.candidate_class: c for c in sim.classes} + assert classes["same_type_skill_prompt_only"].candidate_group_count == 1 + assert classes["same_type_skill_prompt_only"].replacement_occurrence_count == 1 + assert classes["same_type_system_prompt_only"].candidate_group_count == 1 + assert classes["cross_type_system_skill"].candidate_group_count == 1 + for cls in classes.values(): + assert cls.actual_tokens_before is None + assert cls.actual_tokens_after is None + assert cls.actual_tokens_delta is None + # Tool duplicates with the same text are ignored by this prompt-only harness. + assert all("tool" not in c.candidate_class for c in sim.classes) + + +def test_prompt_dedup_ab_uses_injected_tokenizer_only_when_available(): + line = "Skill duplicate instruction line long enough for tokenizer counting." 
+ fake = analyzer.TokenizerBackend( + name="fake:chars", + count=lambda text: len(text), + ) + sim = analyzer.simulate_prompt_dedup_ab( + [ + analyzer._LLMContent( + block_type="skill_prompt", content=f"{line}\n{line}\n{line}" + ) + ], + salt="s", + min_block_chars=40, + tokenizer=fake, + ) + assert sim.tokenizer_status == "available" + assert sim.tokenizer_backend == "fake:chars" + cls = {c.candidate_class: c for c in sim.classes}["same_type_skill_prompt_only"] + assert cls.actual_tokens_before == 3 * len(line) + assert cls.actual_tokens_after is not None + assert cls.actual_tokens_delta == cls.actual_tokens_before - cls.actual_tokens_after + + +def test_prompt_dedup_ab_report_no_leak_and_not_realized(tmp_path): + db = tmp_path / "state.db" + secret_line = "SECRET-PROMPT-AB-LINE-THAT-REPEATS-AND-IS-LONG-ENOUGH" + sys_prompt = f"{secret_line}\n{secret_line}" + _make_db( + db, + [("tool", "irrelevant tool output", "Bash")], + sessions=[("raw-session-id", "discord", None, 1, 1, 100, 10, 1, sys_prompt)], + ) + report = _analyze(db, tmp_path) + ab = report.prompt_dedup_ab + assert ab.enabled + cls = {c.candidate_class: c for c in ab.classes}["same_type_system_prompt_only"] + assert cls.candidate_group_count == 1 + assert cls.replacement_occurrence_count == 1 + # A/B simulation is not realized telemetry savings. 
+ assert report.telemetry.chars_saved == 0 + + json_path, md_path = analyzer.write_report(report, tmp_path / "out") + blob = json_path.read_text(encoding="utf-8") + md_path.read_text(encoding="utf-8") + assert secret_line not in blob + assert "Prompt dedup A/B simulation" in blob + assert "OFFLINE SIMULATION ONLY" in blob + assert "NOT realized savings" in blob + + +def test_prompt_dedup_ab_can_be_disabled(tmp_path): + db = tmp_path / "state.db" + _make_db(db, [("tool", "out", "Bash")]) + tool_messages = analyzer.load_tool_messages(db, since_hours=WIDE_WINDOW) + llm = analyzer.load_llm_bound_content(db, since_hours=WIDE_WINDOW) + heavy = analyzer.load_heavy_sessions( + db, since_hours=WIDE_WINDOW, salt="s", top_n=20 + ) + tel = analyzer.parse_telemetry( + tmp_path / "none.jsonl", since_hours=WIDE_WINDOW, total_input_tokens=0 + ) + report = analyzer.build_report( + date="2100-01-01", + since_hours=24, + salt="s", + tool_messages=tool_messages, + heavy_sessions=heavy, + telemetry=tel, + llm_contents=llm, + prompt_dedup_ab=False, + ) + assert report.prompt_dedup_ab.enabled is False + _, md_path = analyzer.write_report(report, tmp_path / "out") + md_text = md_path.read_text(encoding="utf-8") + assert "Prompt dedup A/B simulation" in md_text + assert "disabled" in md_text