From ba8b314b1ba2d7a2c745fa6b26433e168b1b0dc6 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 13 Jun 2026 03:40:42 +0200 Subject: [PATCH] feat: add actual-token telemetry scaffold and prompt duplicate shadow --- __init__.py | 182 +++++++++++++++--- contextpilot/hermes_opportunities/__init__.py | 10 + contextpilot/hermes_opportunities/cli.py | 9 + .../hermes_opportunities/detection.py | 121 ++++++++++++ contextpilot/hermes_opportunities/models.py | 58 ++++++ contextpilot/hermes_opportunities/report.py | 52 +++++ docs/guides/hermes-monitor.md | 43 ++++- docs/guides/hermes.md | 30 +-- scripts/contextpilot_savings.py | 77 +++++++- tests/test_contextpilot_savings.py | 70 ++++++- ...est_hermes_context_opportunity_analyzer.py | 106 ++++++++++ tests/test_hermes_plugin_patch.py | 80 ++++++++ 12 files changed, 784 insertions(+), 54 deletions(-) diff --git a/__init__.py b/__init__.py index eac1f08..02bec5c 100644 --- a/__init__.py +++ b/__init__.py @@ -217,6 +217,113 @@ def _write_telemetry(record: Dict[str, Any]) -> None: logger.debug("[ContextPilot] telemetry write skipped: %s", e) +def _iter_message_text(messages: List[Dict[str, Any]]): + """Yield text fragments from an LLM-bound payload for in-memory measurement. + + Used only to *size* the payload (chars / exact tokens). Fragments are never + stored or emitted -- callers consume them immediately to produce integer + counts, then discard them. 
+ """ + for msg in messages: + if not isinstance(msg, dict): + continue + content = msg.get("content") + if isinstance(content, str): + yield content + elif isinstance(content, list): + for block in content: + if isinstance(block, str): + yield block + elif isinstance(block, dict): + text = block.get("text") + if isinstance(text, str): + yield text + inner = block.get("content") + if isinstance(inner, str): + yield inner + + +def _payload_chars(messages: List[Dict[str, Any]]) -> int: + """Total character count of an LLM-bound payload (metadata-only measure).""" + return sum(len(frag) for frag in _iter_message_text(messages)) + + +# Sentinel so the (possibly None) tokenizer is resolved at most once per process. +_exact_tokenizer_cache: Any = "unset" + + +def _get_exact_tokenizer(): + """Return a callable ``(text) -> int`` for EXACT token counting, or None. + + Optional and best-effort: an exact tokenizer is used only when a backend is + installed and not disabled. This never raises and never installs anything; + when no backend is available the caller records an ``unavailable`` status + rather than emitting a fake (chars/4) token count. + + Backend selection via ``CONTEXTPILOT_EXACT_TOKENIZER`` = ``off`` (default) + | ``tiktoken``. It is opt-in so merely having a tokenizer library installed + never creates a misleading provider/tokenizer mismatch. The separate + disable environment flag also returns ``None`` immediately. 
+ """ + + global _exact_tokenizer_cache + if _exact_tokenizer_cache != "unset": + return _exact_tokenizer_cache + _exact_tokenizer_cache = None + if os.environ.get("CONTEXTPILOT_DISABLE_EXACT_TOKENIZER") == "1": + return None + backend = os.environ.get("CONTEXTPILOT_EXACT_TOKENIZER", "off").lower() + if backend in ("off", "none", "disabled", "auto"): + return None + if backend == "tiktoken": + try: + import tiktoken # optional dependency; never a hard requirement + + encoding_name = os.environ.get( + "CONTEXTPILOT_TIKTOKEN_ENCODING", "cl100k_base" + ) + enc = tiktoken.get_encoding(encoding_name) + + def _count(text: str, _enc=enc) -> int: + return len(_enc.encode(text, disallowed_special=())) + + _count._backend = f"tiktoken:{encoding_name}" # type: ignore[attr-defined] + _exact_tokenizer_cache = _count + except Exception as e: # noqa: BLE001 - tokenizer is strictly optional + logger.debug("[ContextPilot] exact tokenizer unavailable: %s", e) + _exact_tokenizer_cache = None + return _exact_tokenizer_cache + + +def _measure_actual_tokens( + original_messages: List[Dict[str, Any]], + optimized_messages: List[Dict[str, Any]], +) -> Dict[str, Any]: + """Metadata-only EXACT before/after token measurement of the payload. + + Returns a dict carrying ``actual_token_status`` of ``available`` or + ``unavailable``. When unavailable (no exact tokenizer backend), it emits NO + token numbers -- callers must not substitute a chars/4 estimate for these + fields. Raw text is counted in-memory only and never stored. 
+ """ + counter = _get_exact_tokenizer() + if counter is None: + return {"actual_token_status": "unavailable"} + try: + before = sum(counter(frag) for frag in _iter_message_text(original_messages)) + after = sum(counter(frag) for frag in _iter_message_text(optimized_messages)) + except Exception as e: # noqa: BLE001 - a measurement must never break optimization + logger.debug("[ContextPilot] exact token measurement failed: %s", e) + return {"actual_token_status": "unavailable"} + return { + "actual_token_status": "available", + "actual_tokenizer_backend": getattr(counter, "_backend", "unknown"), + "actual_tokens_before": before, + "actual_tokens_after": after, + "actual_tokens_saved": before - after, + } + + def _reorder_docs(docs: List[str], alpha: float = 0.001) -> List[str]: global _intercept_index if len(docs) < 2: @@ -645,44 +752,72 @@ def _tool_chars(msgs): turn_chars_saved = doc_chars_saved + dedup_result.chars_saved self._total_chars_saved += turn_chars_saved + # Actual before/after of the full LLM-bound payload (chars). These are + # measured directly from the original input vs the optimized output, so + # they reflect the realized processed-payload delta -- not a duplicate + # opportunity count. Cheap (string length only); always computed. 
+ payload_chars_before = _payload_chars(original_messages) + payload_chars_after = _payload_chars(api_messages) + payload_chars_saved = payload_chars_before - payload_chars_after + # Step 6: Cache for next turn self._cached_messages = copy.deepcopy(api_messages) self._cached_original_messages = original_messages if turn_chars_saved > 0: logger.info( - "[ContextPilot] Turn %d: saved %d chars (~%d tokens) | cumulative: %d chars (~%d tokens)", + "[ContextPilot] Turn %d: saved %d chars by processing | cumulative: %d chars", self._optimize_count, turn_chars_saved, - turn_chars_saved // 4, self._total_chars_saved, - self._total_chars_saved // 4, ) # Metadata-only telemetry so the monitor does not depend solely on # gateway log lines. No content, prompts, or tool payloads here. - _write_telemetry( - { - "ts": time.time(), - "type": "turn", - "session_hash": ( - _hash_text(str(self._session_id)) - if self._session_id is not None else None - ), - "turn": self._optimize_count, - "chars_saved": turn_chars_saved, - "tokens_saved": turn_chars_saved // 4, - "doc_chars_saved": doc_chars_saved, - "block_chars_saved": dedup_result.chars_saved, - "blocks_deduped": dedup_result.blocks_deduped, - "blocks_total": dedup_result.blocks_total, - "docs_deduped": self._total_docs_deduped, - "system_blocks_matched": dedup_result.system_blocks_matched, - "cumulative_chars_saved": self._total_chars_saved, - } + # + # Token fields are deliberately separated by provenance: + # * ``tokens_saved`` is the LEGACY DERIVED estimate (chars/4); the + # ``tokens_saved_method`` tag makes that explicit so it is never + # mistaken for a tokenizer/API measurement. + # * ``actual_tokens_*`` come from an EXACT tokenizer and are present + # only when ``actual_token_status == "available"``. When no exact + # tokenizer backend is configured the status is ``unavailable`` + # and no token numbers are emitted (no fake counts). 
+ telemetry_record = { + "ts": time.time(), + "type": "turn", + "session_hash": ( + _hash_text(str(self._session_id)) + if self._session_id is not None else None + ), + "turn": self._optimize_count, + # Actual processed-payload char delta (doc + block dedup). + "chars_saved": turn_chars_saved, + # Actual before/after of the full LLM-bound payload (chars). + "payload_chars_before": payload_chars_before, + "payload_chars_after": payload_chars_after, + "payload_chars_saved": payload_chars_saved, + # Legacy DERIVED token estimate (chars/4) -- NOT exact tokens. + "tokens_saved": turn_chars_saved // 4, + "tokens_saved_method": "estimated_chars_div_4", + "doc_chars_saved": doc_chars_saved, + "block_chars_saved": dedup_result.chars_saved, + "blocks_deduped": dedup_result.blocks_deduped, + "blocks_total": dedup_result.blocks_total, + "docs_deduped": self._total_docs_deduped, + "system_blocks_matched": dedup_result.system_blocks_matched, + "cumulative_chars_saved": self._total_chars_saved, + } + # Optional EXACT token measurement (only computed on a saving turn). 
+ telemetry_record.update( + _measure_actual_tokens(original_messages, api_messages) ) + _write_telemetry(telemetry_record) return api_messages, { "chars_saved": turn_chars_saved, + "payload_chars_before": payload_chars_before, + "payload_chars_after": payload_chars_after, + "payload_chars_saved": payload_chars_saved, "doc_chars_saved": doc_chars_saved, "block_chars_saved": dedup_result.chars_saved, "blocks_deduped": dedup_result.blocks_deduped, @@ -720,11 +855,10 @@ def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> Non self._compressor.on_session_end(session_id, messages) if self._total_chars_saved > 0: logger.info( - "[ContextPilot] Session %s: %d turns, %d chars saved (~%d tokens)", + "[ContextPilot] Session %s: %d turns, %d chars saved by processing", session_id, self._optimize_count, self._total_chars_saved, - self._total_chars_saved // 4, ) def on_session_reset(self) -> None: diff --git a/contextpilot/hermes_opportunities/__init__.py b/contextpilot/hermes_opportunities/__init__.py index 4333e98..3f66f00 100644 --- a/contextpilot/hermes_opportunities/__init__.py +++ b/contextpilot/hermes_opportunities/__init__.py @@ -45,6 +45,7 @@ from .detection import ( analyze_llm_bound_blocks, detect_exact_duplicate_tool_outputs, + detect_prompt_duplicate_blocks, detect_repeated_blocks, summarize_tool_sizes, ) @@ -55,6 +56,7 @@ DEFAULT_MIN_BLOCK_REPEAT, DEFAULT_TOP_N, EST_CHARS_PER_TOKEN, + PROMPT_DUPLICATE_BLOCK_TYPES, ArtifactKindStat, ArtifactSourceCount, BlockTypeStat, @@ -64,6 +66,9 @@ OpportunityReport, ParentAggregationArtifacts, ParentAggregationGroup, + PromptDuplicateBlock, + PromptDuplicateShadow, + PromptDuplicateTypeCount, RepeatedBlock, RouterCandidateBlock, RouterLabelCount, @@ -100,6 +105,7 @@ "DEFAULT_MIN_ARTIFACT_CHARS", "EST_CHARS_PER_TOKEN", "BLOCK_TYPES", + "PROMPT_DUPLICATE_BLOCK_TYPES", "ROUTER_LABELS", "ARTIFACT_KINDS", "PARENT_AGGREGATION_SOURCE_TYPES", @@ -113,6 +119,9 @@ "ToolSizeStat", "HeavySession", 
"TelemetryCoverage", + "PromptDuplicateBlock", + "PromptDuplicateTypeCount", + "PromptDuplicateShadow", "RouterLabelCount", "RouterReasonCount", "RouterCandidateBlock", @@ -134,6 +143,7 @@ "detect_repeated_blocks", "summarize_tool_sizes", "analyze_llm_bound_blocks", + "detect_prompt_duplicate_blocks", # routing (shadow) "classify_router_label", "analyze_worker_routing_shadow", diff --git a/contextpilot/hermes_opportunities/cli.py b/contextpilot/hermes_opportunities/cli.py index 0cab47e..e2d1d8c 100644 --- a/contextpilot/hermes_opportunities/cli.py +++ b/contextpilot/hermes_opportunities/cli.py @@ -80,6 +80,14 @@ def main(argv: list[str] | None = None) -> int: parser.add_argument( "--min-artifact-chars", type=int, default=DEFAULT_MIN_ARTIFACT_CHARS ) + parser.add_argument( + "--disable-prompt-duplicate-shadow", + action="store_true", + help=( + "skip the advisory system/skill prompt duplicate-block scan " + "(enabled by default; advisory only, never rewrites/dedups prompts)" + ), + ) args = parser.parse_args(argv) if not args.state_db.exists(): @@ -125,6 +133,7 @@ def main(argv: list[str] | None = None) -> int: top_n=args.top_n, worker_routing_shadow=not args.disable_worker_routing_shadow, parent_aggregation_shadow=not args.disable_parent_aggregation, + prompt_duplicate_shadow=not args.disable_prompt_duplicate_shadow, min_artifact_chars=args.min_artifact_chars, ) json_path, md_path = write_report(report, args.out_dir) diff --git a/contextpilot/hermes_opportunities/detection.py b/contextpilot/hermes_opportunities/detection.py index 5201c3f..0ca2433 100644 --- a/contextpilot/hermes_opportunities/detection.py +++ b/contextpilot/hermes_opportunities/detection.py @@ -10,9 +10,13 @@ from typing import Iterable from .models import ( + PROMPT_DUPLICATE_BLOCK_TYPES, BlockTypeStat, CrossTypeBlockGroup, DuplicateToolOutput, + PromptDuplicateBlock, + PromptDuplicateShadow, + PromptDuplicateTypeCount, RepeatedBlock, ToolSizeStat, TypeCount, @@ -147,6 +151,123 @@ def 
summarize_tool_sizes( return stats[:top_n] +def detect_prompt_duplicate_blocks( + contents: Iterable[_LLMContent], + *, + salt: str, + min_block_chars: int, + top_n: int, + enabled: bool = True, +) -> PromptDuplicateShadow: + """Advisory scan for EXACT duplicate blocks in system/skill prompt text. + + Restricted to ``system_prompt`` / ``skill_prompt`` items. Counts every block + instance (intra- and inter-prompt) so a block literally present multiple + times in the static prompt payload is detected. A "duplicate" is any block + fingerprint observed 2+ times. + + SHADOW/ADVISORY ONLY: output is salted hashes + counters + block-type enums; + char figures are ACTUAL duplicated chars and the token figure is an ADVISORY + chars/4 estimate. This never rewrites or dedups prompts and must never be + reported as a realized saving. + """ + scanned = list(PROMPT_DUPLICATE_BLOCK_TYPES) + if not enabled: + return PromptDuplicateShadow( + enabled=False, + item_count=0, + scanned_block_types=scanned, + duplicate_group_count=0, + total_duplicate_occurrences=0, + total_chars_duplicated=0, + advisory_est_duplicate_tokens_chars_div_4=0, + by_block_type=[], + top_duplicate_blocks=[], + notes=["prompt-duplicate shadow disabled"], + ) + + # block_hash -> {char_length, types: {block_type: occ}} + agg: dict[str, dict] = {} + item_counts: dict[str, int] = {} + for item in contents: + bt = item.block_type + if bt not in PROMPT_DUPLICATE_BLOCK_TYPES: + continue + item_counts[bt] = item_counts.get(bt, 0) + 1 + # Count every fingerprintable line (no intra-item dedup) so repeated + # blocks within a single prompt are surfaced too. 
+ for line in item.content.splitlines(): + block = line.strip() + if len(block) < min_block_chars: + continue + h = _salted_hash(block, salt) + entry = agg.get(h) + if entry is None: + agg[h] = {"char_length": len(block), "types": {bt: 1}} + else: + entry["types"][bt] = entry["types"].get(bt, 0) + 1 + + dup_blocks: list[PromptDuplicateBlock] = [] + per_type: dict[str, dict] = {} + total_chars_dup = 0 + total_dup_occ = 0 + for h, entry in agg.items(): + types = entry["types"] + occ = sum(types.values()) + if occ < 2: + continue # not a duplicate + char_len = entry["char_length"] + chars_dup = (occ - 1) * char_len + total_chars_dup += chars_dup + total_dup_occ += occ + dup_blocks.append( + PromptDuplicateBlock( + block_hash=h, + block_types=sorted(types.keys()), + occurrences=occ, + char_length=char_len, + chars_duplicated=chars_dup, + advisory_est_duplicate_tokens_chars_div_4=_est_tokens(chars_dup), + ) + ) + for bt, type_occ in types.items(): + t = per_type.setdefault( + bt, {"blocks": 0, "occ": 0, "chars_dup": 0} + ) + t["blocks"] += 1 + t["occ"] += type_occ + # Attribute duplicated chars within this type (occ-1 of the in-type + # instances are duplicates); cross-type-only blocks contribute 0 here. 
+ t["chars_dup"] += max(type_occ - 1, 0) * char_len + + by_block_type = [ + PromptDuplicateTypeCount( + block_type=bt, + duplicate_block_count=per_type.get(bt, {}).get("blocks", 0), + occurrence_count=per_type.get(bt, {}).get("occ", 0), + chars_duplicated=per_type.get(bt, {}).get("chars_dup", 0), + ) + for bt in scanned + ] + + dup_blocks.sort(key=lambda b: b.chars_duplicated, reverse=True) + notes: list[str] = [] + if not item_counts: + notes.append("no system/skill prompt items observed in the selected window") + return PromptDuplicateShadow( + enabled=True, + item_count=sum(item_counts.values()), + scanned_block_types=scanned, + duplicate_group_count=len(dup_blocks), + total_duplicate_occurrences=total_dup_occ, + total_chars_duplicated=total_chars_dup, + advisory_est_duplicate_tokens_chars_div_4=_est_tokens(total_chars_dup), + by_block_type=by_block_type, + top_duplicate_blocks=dup_blocks[:top_n], + notes=notes, + ) + + def _iter_blocks(content: str, min_block_chars: int) -> Iterable[str]: """Yield the distinct fingerprintable lines of one item (deduped in-item).""" seen: set[str] = set() diff --git a/contextpilot/hermes_opportunities/models.py b/contextpilot/hermes_opportunities/models.py index 67d4fd1..8c739e2 100644 --- a/contextpilot/hermes_opportunities/models.py +++ b/contextpilot/hermes_opportunities/models.py @@ -126,6 +126,62 @@ class TelemetryCoverage: malformed_records_skipped: int +# --------------------------------------------------------------------------- +# Prompt duplicate — SHADOW MODE structures (system/skill prompts only) +# --------------------------------------------------------------------------- + +# Block types this advisory section is allowed to scan. Static prompt text only; +# never user/assistant/tool message bodies. +PROMPT_DUPLICATE_BLOCK_TYPES = ("system_prompt", "skill_prompt") + + +@dataclass +class PromptDuplicateBlock: + """One exact block fingerprint seen 2+ times in system/skill prompt text. 
+ + Salted hash + counters only -- never the block text. Char figures are ACTUAL + duplicated characters; the token figure is an ADVISORY chars/4 estimate. + """ + + block_hash: str + block_types: list[str] # which prompt types this block spans + occurrences: int + char_length: int + chars_duplicated: int # ACTUAL: (occurrences - 1) * char_length + advisory_est_duplicate_tokens_chars_div_4: int # ADVISORY only, NOT actual tokens + + +@dataclass +class PromptDuplicateTypeCount: + """Per-prompt-type occurrence rollup for duplicate blocks.""" + + block_type: str # system_prompt | skill_prompt + duplicate_block_count: int # distinct duplicate fingerprints touching this type + occurrence_count: int # total occurrences within this type + chars_duplicated: int # ACTUAL duplicated chars attributable within this type + + +@dataclass +class PromptDuplicateShadow: + """Advisory report of exact duplicate blocks in system/skill prompts. + + SHADOW/ADVISORY ONLY: this measures static prompt duplication; it never + rewrites, dedups, or otherwise mutates prompts, and its char/token figures + must never be reported as realized savings. 
+ """ + + enabled: bool + item_count: int # system/skill prompt items scanned + scanned_block_types: list[str] + duplicate_group_count: int + total_duplicate_occurrences: int + total_chars_duplicated: int # ACTUAL duplicated chars (advisory, not realized) + advisory_est_duplicate_tokens_chars_div_4: int # ADVISORY chars/4, NOT actual tokens + by_block_type: list[PromptDuplicateTypeCount] + top_duplicate_blocks: list[PromptDuplicateBlock] + notes: list[str] = field(default_factory=list) + + # --------------------------------------------------------------------------- # Worker Context Routing — SHADOW MODE structures (P0 data collection only) # --------------------------------------------------------------------------- @@ -276,6 +332,8 @@ class OpportunityReport: llm_block_types: list[BlockTypeStat] cross_type_block_groups: list[CrossTypeBlockGroup] cross_type_wasted_tokens: int + # Prompt duplicate shadow (system/skill prompts only; advisory, never realized). + prompt_duplicates: PromptDuplicateShadow # Worker Context Routing shadow mode (P0 data collection; never prunes). worker_routing: WorkerRoutingShadow # Parent Aggregation Artifacts shadow mode (P0 telemetry; never dedups). 
diff --git a/contextpilot/hermes_opportunities/report.py b/contextpilot/hermes_opportunities/report.py index 381d7cc..32b399d 100644 --- a/contextpilot/hermes_opportunities/report.py +++ b/contextpilot/hermes_opportunities/report.py @@ -18,6 +18,7 @@ from .detection import ( analyze_llm_bound_blocks, detect_exact_duplicate_tool_outputs, + detect_prompt_duplicate_blocks, detect_repeated_blocks, summarize_tool_sizes, ) @@ -53,6 +54,7 @@ def build_report( top_n: int = DEFAULT_TOP_N, worker_routing_shadow: bool = True, parent_aggregation_shadow: bool = True, + prompt_duplicate_shadow: bool = True, min_artifact_chars: int = DEFAULT_MIN_ARTIFACT_CHARS, ) -> OpportunityReport: dups = detect_exact_duplicate_tool_outputs(tool_messages, salt=salt, top_n=top_n) @@ -76,6 +78,14 @@ def build_report( top_n=top_n, ) + prompt_duplicates = detect_prompt_duplicate_blocks( + llm_contents, + salt=salt, + min_block_chars=min_block_chars, + top_n=top_n, + enabled=prompt_duplicate_shadow, + ) + worker_routing = analyze_worker_routing_shadow( llm_contents, salt=salt, @@ -106,6 +116,7 @@ def build_report( "llm-bound scan covers only content sent to the LLM: system/skill prompts, active user/assistant/tool messages", "worker-routing section is SHADOW MODE P0: it labels blocks for a future router but never drops/summarizes context", "parent-aggregation section is SHADOW MODE P0 telemetry: it groups exact artifact bodies but never dedups/replaces context", + "prompt-duplicate section is ADVISORY ONLY (system/skill prompts): it counts exact duplicate prompt blocks but never rewrites/dedups prompts; its chars/tokens are NOT realized savings", ] if all_sessions: notes.append("all-sessions mode: time window ignored; scanned all non-archived sessions/active messages") @@ -135,6 +146,7 @@ def build_report( llm_block_types=block_type_stats, cross_type_block_groups=cross_groups, cross_type_wasted_tokens=cross_wasted, + prompt_duplicates=prompt_duplicates, worker_routing=worker_routing, 
parent_aggregation=parent_aggregation, notes=notes, @@ -170,6 +182,11 @@ def write_report(report: OpportunityReport, out_dir: Path) -> tuple[Path, Path]: f"- LLM-bound items scanned: {report.llm_bound_item_count}", f"- Cross-type repeated blocks: {len(report.cross_type_block_groups)} " f"(~{report.cross_type_wasted_tokens} wasted tokens)", + f"- Prompt duplicates (system/skill, advisory): " + f"{report.prompt_duplicates.duplicate_group_count} groups, " + f"{report.prompt_duplicates.total_chars_duplicated} chars duplicated " + f"(~{report.prompt_duplicates.advisory_est_duplicate_tokens_chars_div_4} " + f"advisory chars/4 tokens) — NOT realized savings", f"- Telemetry: {t.events} events, {t.chars_saved} chars saved by processing; " f"derived chars/4 tokens={t.tokens_saved}, ratio={t.coverage_ratio_pct}%", f"- Worker routing (shadow): {report.worker_routing.classified_block_count} blocks " @@ -196,6 +213,41 @@ def write_report(report: OpportunityReport, out_dir: Path) -> tuple[Path, Path]: f"chars={g.char_length} ~wasted={g.est_wasted_tokens} tokens" ) md.append("") + pd = report.prompt_duplicates + md.append("## Prompt duplicate blocks — system/skill (advisory only)") + if not pd.enabled: + md.append("- disabled") + else: + md.append( + f"- Scanned prompt types: {', '.join(pd.scanned_block_types)} " + f"(items: {pd.item_count})" + ) + md.append( + f"- Duplicate groups: {pd.duplicate_group_count} " + f"(occurrences: {pd.total_duplicate_occurrences})" + ) + md.append( + f"- Chars duplicated (actual): {pd.total_chars_duplicated} " + f"(~{pd.advisory_est_duplicate_tokens_chars_div_4} advisory chars/4 tokens, " + f"NOT actual tokens, NOT a realized saving)" + ) + md.append("") + md.append("### Occurrences by prompt type") + for tc in pd.by_block_type: + md.append( + f"- {tc.block_type}: dup_blocks={tc.duplicate_block_count} " + f"occ={tc.occurrence_count} chars_duplicated={tc.chars_duplicated}" + ) + md.append("") + md.append("### Top duplicate prompt blocks (hashed)") + 
for b in pd.top_duplicate_blocks: + md.append( + f"- `{b.block_hash}` types=[{', '.join(b.block_types)}] " + f"x{b.occurrences} chars={b.char_length} " + f"chars_duplicated={b.chars_duplicated} " + f"(~{b.advisory_est_duplicate_tokens_chars_div_4} advisory chars/4 tokens)" + ) + md.append("") md.append("## Top exact-duplicate tool outputs") for d in report.exact_duplicate_groups: md.append( diff --git a/docs/guides/hermes-monitor.md b/docs/guides/hermes-monitor.md index 1536bf7..a5bdf1d 100644 --- a/docs/guides/hermes-monitor.md +++ b/docs/guides/hermes-monitor.md @@ -55,10 +55,14 @@ Then read the generated Markdown report for today and send a short Chinese summa ## Quick savings summary (lightweight) -If you just want to answer "how many tokens did ContextPilot save?", use the -lightweight `scripts/contextpilot_savings.py` command instead of this monitor or -the analyzer below. It reads **only** the metadata-only telemetry file, imports -no Hermes internals, and prints a one-screen summary: +If you just want a lightweight realized-savings summary, use the +`scripts/contextpilot_savings.py` command instead of this monitor or the analyzer +below. It reads **only** the metadata-only telemetry file, imports no Hermes +internals, and prints a one-screen summary. Character savings are measured from +ContextPilot's actual before/after processed payload; exact tokenizer tokens are +shown only when telemetry recorded an explicitly configured exact tokenizer +backend. The legacy chars/4 counter is labelled as derived; tokenizer measurement +is off by default to avoid provider/tokenizer mismatches. ```bash python scripts/contextpilot_savings.py # last 24h @@ -68,10 +72,10 @@ python scripts/contextpilot_savings.py --format json python ~/.hermes/plugins/ContextPilot/scripts/contextpilot_savings.py ``` -It reports events, chars saved, telemetry tokens saved, the window, and average -tokens per event. 
This is the right tool for ordinary users; the monitor in this -guide (which also reads `state.db` metadata) and the content-aware analyzer below -are for deeper investigation. +It reports events, processed-payload chars saved, exact tokenizer tokens when +available, and the legacy derived chars/4 counter. This is the right tool for +ordinary users; the monitor in this guide (which also reads `state.db` metadata) +and the content-aware analyzer below are for deeper investigation. ### Ask Hermes for savings @@ -100,7 +104,9 @@ It surfaces concrete token-reduction opportunities: - repeated line/block fingerprints (shared boilerplate across outputs), - large tool outputs grouped by `tool_name`, - heavy sessions by input-token / tool-call / message counts (hashed ids), -- ContextPilot telemetry coverage and savings ratios, +- **Prompt duplicate shadow telemetry** for exact system/skill prompt template + repeats (advisory only; no prompt rewriting), +- ContextPilot telemetry coverage and processed-payload savings counters, - **Worker Context Routing shadow labels** for future router training/eval, - **Parent Aggregation Artifact telemetry** (exact duplicate worker/parent artifacts grouped by hash) for future parent-aggregation dedup eval. @@ -129,6 +135,23 @@ aggregated. The report then shows: prompt *and* a tool result *and* a user prompt). Reported only as a hash plus per-type counters — never the raw text. +### Prompt duplicate shadow mode + +The analyzer includes a dedicated **Prompt duplicate blocks — system/skill** +section for the static-template opportunity found in Hermes workloads. It scans +only `system_prompt` and `skill_prompt` blocks, groups **EXACT** duplicate block +fingerprints, and reports: + +- duplicate group count and duplicate occurrence count, +- actual duplicated characters observed in prompt assembly, +- a derived chars/4 advisory token counter labelled as advisory, +- per-type counters and top salted hashes. 
+ +This section is **advisory only**. It never rewrites, summarizes, deduplicates, +or replaces prompt text, and its counters are not realized savings. Use it to +prioritize a future prompt-assembly A/B where before/after payloads are measured +with an exact tokenizer/API usage comparison. + ### Worker Context Routing shadow mode The analyzer now includes a **Worker Context Routing — shadow mode** section by @@ -219,7 +242,7 @@ safe to ship from an unattended cron job. ## Accuracy gate -This monitor only measures token/cost savings and operational signals. Before shipping ContextPilot changes, run a fixed golden eval set and require: +This monitor reports processed-payload savings, exact tokenizer token deltas when recorded, and operational signals. Before shipping ContextPilot changes, run a fixed golden eval set and require: - no task-success regression, - no drop in context recall beyond the chosen threshold, diff --git a/docs/guides/hermes.md b/docs/guides/hermes.md index 9bddf1c..e55d8ad 100644 --- a/docs/guides/hermes.md +++ b/docs/guides/hermes.md @@ -64,10 +64,18 @@ print(engine.get_status()) # {'engine': 'contextpilot', 'contextpilot_chars_saved': 18420, ...} ``` -## See token savings - -Once ContextPilot has run for a bit, you can see how many tokens it saved with a -single command from the ContextPilot repo or plugin directory: +## See processed-payload savings + +Once ContextPilot has run for a bit, you can see realized savings from the +metadata-only telemetry with a single command from the ContextPilot repo or +plugin directory. Character savings are always measured from the actual +before/after LLM-bound payload after ContextPilot processing. Exact tokenizer +savings are shown only when telemetry recorded an exact tokenizer backend; the +legacy chars/4 counter is labelled as derived. 
To record tokenizer-based deltas, +configure an exact matching tokenizer explicitly, for example +`CONTEXTPILOT_EXACT_TOKENIZER=tiktoken` with +`CONTEXTPILOT_TIKTOKEN_ENCODING=cl100k_base` (the default; set it to the encoding that matches your model); it is off by default to avoid +provider/tokenizer mismatches. ```bash python scripts/contextpilot_savings.py @@ -76,12 +84,12 @@ python ~/.hermes/plugins/ContextPilot/scripts/contextpilot_savings.py ``` ``` -ContextPilot token savings (last 24h) - Events: 117 - Chars saved: 6,147,074 - Telemetry tokens saved: 1,536,728 - Avg tokens/event: 13,134 - Telemetry file: /root/.hermes/contextpilot/telemetry.jsonl +ContextPilot savings (last 24h) + Events: 117 + Chars saved: 6,147,074 + Est. tokens saved (chars/4, derived): 1,536,728 + Actual tokens saved (tokenizer): unavailable (no exact tokenizer backend recorded) + Telemetry file: /root/.hermes/contextpilot/telemetry.jsonl ``` Useful options: @@ -142,6 +150,6 @@ ContextPilot runs *before* the threshold-based compressor, reducing how often th **Plugin not discovered after install.** Check `~/.hermes/plugins/ContextPilot/plugin.yaml` exists and contains `type: context_engine`. Run `hermes plugins list` to confirm. -**No token savings logged.** Dedup only fires when the LLM reads the same file content more than once in a session. On first reads, content is indexed but not deduplicated. +**No savings logged.** Dedup only fires when the LLM reads the same file content more than once in a session. On first reads, content is indexed but not deduplicated. **`ModuleNotFoundError: No module named 'numpy'`.** Reorder requires numpy. If unavailable, ContextPilot silently falls back to dedup-only mode. 
diff --git a/scripts/contextpilot_savings.py b/scripts/contextpilot_savings.py index 089ddbb..060cf2f 100644 --- a/scripts/contextpilot_savings.py +++ b/scripts/contextpilot_savings.py @@ -51,8 +51,21 @@ def summarize_telemetry( "window_start_iso": None, "events": 0, "chars_saved": 0, + # ``tokens_saved`` is a LEGACY DERIVED estimate (chars/4), NOT a real + # tokenizer/API count. ``tokens_saved_method`` makes that explicit so it + # is never mistaken for actual tokens. "tokens_saved": 0, + "tokens_saved_method": "estimated_chars_div_4", "avg_tokens_per_event": None, + # EXACT tokenizer measurements, surfaced separately and only populated + # from records that carry ``actual_token_status == "available"``. No + # fake/derived numbers are ever written into these fields. + "actual_token_status": "unavailable", + "actual_token_events": 0, + "actual_tokens_before": 0, + "actual_tokens_after": 0, + "actual_tokens_saved": 0, + "actual_tokenizer_backends": [], "skipped_lines": 0, } @@ -71,6 +84,11 @@ def summarize_telemetry( chars = 0 tokens = 0 skipped = 0 + actual_events = 0 + actual_before = 0 + actual_after = 0 + actual_saved = 0 + actual_backends: set[str] = set() with telemetry_path.open("r", encoding="utf-8", errors="replace") as f: for line in f: line = line.strip() @@ -107,12 +125,37 @@ def summarize_telemetry( else int(cs) // 4 ) + # EXACT tokenizer measurement, only when the writer marked it as + # available. Anything else (missing/unavailable) is left out -- we + # never substitute the chars/4 estimate into the actual-token totals. 
+ if record.get("actual_token_status") == "available": + ats = record.get("actual_tokens_saved") + if isinstance(ats, (int, float)): + actual_events += 1 + actual_saved += int(ats) + atb = record.get("actual_tokens_before") + if isinstance(atb, (int, float)): + actual_before += int(atb) + ata = record.get("actual_tokens_after") + if isinstance(ata, (int, float)): + actual_after += int(ata) + backend = record.get("actual_tokenizer_backend") + if isinstance(backend, str) and backend: + actual_backends.add(backend) + result["events"] = events result["chars_saved"] = chars result["tokens_saved"] = tokens result["skipped_lines"] = skipped if events > 0: result["avg_tokens_per_event"] = round(tokens / events, 1) + if actual_events > 0: + result["actual_token_status"] = "available" + result["actual_token_events"] = actual_events + result["actual_tokens_before"] = actual_before + result["actual_tokens_after"] = actual_after + result["actual_tokens_saved"] = actual_saved + result["actual_tokenizer_backends"] = sorted(actual_backends) return result @@ -154,16 +197,33 @@ def render_text(summary: Dict[str, Any]) -> str: ) lines = [ - f"ContextPilot token savings ({window})", - f" Events: {summary['events']}", - f" Chars saved: {summary['chars_saved']:,}", - f" Telemetry tokens saved: {summary['tokens_saved']:,}", + f"ContextPilot savings ({window})", + f" Events: {summary['events']}", + f" Chars saved: {summary['chars_saved']:,}", + # Make provenance unmistakable: this is a chars/4 estimate, not real tokens. + f" Est. tokens saved (chars/4, derived): {summary['tokens_saved']:,}", ] if summary["avg_tokens_per_event"] is not None: lines.append( - f" Avg tokens/event: {summary['avg_tokens_per_event']:,}" + f" Avg est. tokens/event: {summary['avg_tokens_per_event']:,}" + ) + # Actual tokenizer tokens are shown ONLY when the telemetry recorded them + # from an exact tokenizer backend; otherwise we say so rather than fake it. 
+ if summary["actual_token_status"] == "available": + backends = ", ".join(summary["actual_tokenizer_backends"]) or "unknown" + lines.append( + f" Actual tokens saved (tokenizer): {summary['actual_tokens_saved']:,}" + ) + lines.append( + f" backend: {backends} | status: available | " + f"events: {summary['actual_token_events']}" + ) + else: + lines.append( + " Actual tokens saved (tokenizer): unavailable " + "(no exact tokenizer backend recorded)" ) - lines.append(f" Telemetry file: {path}") + lines.append(f" Telemetry file: {path}") if summary["skipped_lines"]: lines.append( f" (skipped {summary['skipped_lines']} malformed telemetry line(s))" @@ -173,7 +233,10 @@ def render_text(summary: Dict[str, Any]) -> str: def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( - description="Show how many tokens ContextPilot saved (metadata-only).", + description=( + "Show ContextPilot processed-payload savings (metadata-only); " + "exact tokenizer tokens are shown only when telemetry recorded them." + ), ) parser.add_argument( "--telemetry-file", diff --git a/tests/test_contextpilot_savings.py b/tests/test_contextpilot_savings.py index 2256e1c..f0d49fc 100644 --- a/tests/test_contextpilot_savings.py +++ b/tests/test_contextpilot_savings.py @@ -131,12 +131,25 @@ def test_json_output_schema_and_no_raw_content(tmp_path, capsys): "events", "chars_saved", "tokens_saved", + "tokens_saved_method", "avg_tokens_per_event", + "actual_token_status", + "actual_token_events", + "actual_tokens_before", + "actual_tokens_after", + "actual_tokens_saved", + "actual_tokenizer_backends", "skipped_lines", } assert set(data.keys()) == expected_keys assert data["events"] == 1 assert data["tokens_saved"] == 100 + # Legacy counter is explicitly flagged as a chars/4 estimate. + assert data["tokens_saved_method"] == "estimated_chars_div_4" + # This record had no exact-tokenizer fields, so actual tokens stay empty. 
+ assert data["actual_token_status"] == "unavailable" + assert data["actual_tokens_saved"] == 0 + assert data["actual_tokenizer_backends"] == [] assert "SECRET CONVERSATION TEXT" not in out assert "SECRET SYSTEM PROMPT" not in out @@ -151,11 +164,64 @@ def test_text_output_renders_savings(tmp_path, capsys): rc = savings.main(["--telemetry-file", str(tel), "--since-hours", "24"]) assert rc == 0 out = capsys.readouterr().out - assert "ContextPilot token savings (last 24h)" in out - assert "Telemetry tokens saved" in out + assert "ContextPilot savings (last 24h)" in out + # The legacy token figure must be labelled as a derived chars/4 estimate, + # never presented as actual tokenizer/API tokens. + assert "Est. tokens saved (chars/4, derived)" in out + assert "Telemetry tokens saved" not in out + # With no actual-token telemetry, say so plainly instead of faking a number. + assert "Actual tokens saved (tokenizer): unavailable" in out assert str(tel) in out +def test_actual_tokenizer_tokens_surfaced_separately(tmp_path, capsys): + """Exact tokenizer fields are aggregated and shown apart from the chars/4 estimate.""" + tel = tmp_path / "telemetry.jsonl" + now = time.time() + _write_jsonl( + tel, + [ + { + "ts": now, + "type": "turn", + "chars_saved": 400, + "tokens_saved": 100, + "actual_token_status": "available", + "actual_tokenizer_backend": "tiktoken:cl100k_base", + "actual_tokens_before": 90, + "actual_tokens_after": 30, + "actual_tokens_saved": 60, + }, + # A record with no exact tokenizer must not pollute the actual totals. + { + "ts": now, + "type": "turn", + "chars_saved": 200, + "tokens_saved": 50, + "actual_token_status": "unavailable", + }, + ], + ) + summary = savings.summarize_telemetry(tel, since_hours=None) + # Derived (legacy) totals still count every saving event. 
+ assert summary["events"] == 2 + assert summary["tokens_saved"] == 150 + assert summary["tokens_saved_method"] == "estimated_chars_div_4" + # Actual tokens come only from the "available" record -- no chars/4 fallback. + assert summary["actual_token_status"] == "available" + assert summary["actual_token_events"] == 1 + assert summary["actual_tokens_before"] == 90 + assert summary["actual_tokens_after"] == 30 + assert summary["actual_tokens_saved"] == 60 + assert summary["actual_tokenizer_backends"] == ["tiktoken:cl100k_base"] + + text = savings.render_text(summary) + assert "Est. tokens saved (chars/4, derived): 150" in text + assert "Actual tokens saved (tokenizer): 60" in text + assert "tiktoken:cl100k_base" in text + assert "status: available" in text + + def test_no_events_in_window_message(tmp_path, capsys): tel = tmp_path / "telemetry.jsonl" _write_jsonl( diff --git a/tests/test_hermes_context_opportunity_analyzer.py b/tests/test_hermes_context_opportunity_analyzer.py index fc997fc..4fe3acb 100644 --- a/tests/test_hermes_context_opportunity_analyzer.py +++ b/tests/test_hermes_context_opportunity_analyzer.py @@ -939,3 +939,109 @@ def test_worker_routing_intact_alongside_parent_aggregation(tmp_path): assert report.worker_routing.est_drop_candidate_tokens > 0 # And parent aggregation independently sees the same body as a duplicate. assert report.parent_aggregation.duplicate_group_count == 1 + + +# --------------------------------------------------------------------------- +# Prompt duplicate shadow (system/skill prompts only; advisory only) +# --------------------------------------------------------------------------- + + +def test_prompt_duplicate_shadow_detects_system_skill_duplicates(): + line = "This is a sufficiently long duplicated instruction line here." + sys_unique = "A completely unique system instruction line that is long." + skill_unique = "Skill body unique line that is also clearly long enough." 
+ contents = [ + analyzer._LLMContent( + block_type="system_prompt", content=f"{line}\n{sys_unique}\n{line}" + ), + analyzer._LLMContent(block_type="skill_prompt", content=f"{line}\n{skill_unique}"), + # Non-prompt duplicates must be ignored by this prompt-only section. + analyzer._LLMContent(block_type="tool_result", content=f"{line}\n{line}"), + analyzer._LLMContent(block_type="user_prompt", content=f"{line}\n{line}"), + ] + shadow = analyzer.detect_prompt_duplicate_blocks( + contents, salt="s", min_block_chars=40, top_n=20 + ) + assert shadow.enabled + assert shadow.item_count == 2 # only system + skill items scanned + assert shadow.scanned_block_types == ["system_prompt", "skill_prompt"] + # `line` appears 2x (system) + 1x (skill) = 3 across prompt types only. + assert shadow.duplicate_group_count == 1 + grp = shadow.top_duplicate_blocks[0] + assert grp.occurrences == 3 + assert grp.block_types == ["skill_prompt", "system_prompt"] + assert grp.chars_duplicated == (3 - 1) * len(line) + assert shadow.total_chars_duplicated == grp.chars_duplicated + # Advisory token figure is exactly chars/4, never an actual token count. + assert ( + shadow.advisory_est_duplicate_tokens_chars_div_4 + == shadow.total_chars_duplicated // 4 + ) + assert ( + grp.advisory_est_duplicate_tokens_chars_div_4 == grp.chars_duplicated // 4 + ) + # Occurrences are broken out per prompt type. 
+ types = {tc.block_type: tc for tc in shadow.by_block_type} + assert set(types) == {"system_prompt", "skill_prompt"} + assert types["system_prompt"].occurrence_count == 2 + assert types["skill_prompt"].occurrence_count == 1 + + +def test_prompt_duplicate_shadow_in_report_no_leak_and_advisory(tmp_path): + db = tmp_path / "state.db" + secret_line = "SECRET-PROMPT-LINE-THAT-REPEATS-AND-IS-PLENTY-LONG" + other_line = "some other distinct system instruction text here now" + sys_prompt = f"{secret_line}\n{other_line}\n{secret_line}" + _make_db( + db, + [("tool", "irrelevant tool output", "Bash")], + sessions=[("raw-session-id", "discord", None, 1, 1, 100, 10, 1, sys_prompt)], + ) + report = _analyze(db, tmp_path) + pd = report.prompt_duplicates + assert pd.enabled + assert pd.duplicate_group_count == 1 + assert pd.total_chars_duplicated == len(secret_line) + # Advisory figures are NOT folded into realized telemetry savings. + assert report.telemetry.chars_saved == 0 + assert pd.total_chars_duplicated > 0 + + json_path, md_path = analyzer.write_report(report, tmp_path / "out") + md_text = md_path.read_text(encoding="utf-8") + blob = json_path.read_text(encoding="utf-8") + md_text + # Raw prompt text must never appear in the report. + assert secret_line not in blob + assert other_line not in blob + # Section is present and clearly labelled advisory / not-realized. 
+ assert "Prompt duplicate blocks" in md_text + assert "advisory" in md_text.lower() + assert "NOT a realized saving" in md_text or "NOT realized savings" in md_text + + +def test_prompt_duplicate_shadow_can_be_disabled(tmp_path): + db = tmp_path / "state.db" + _make_db(db, [("tool", "out", "Bash")]) + tool_messages = analyzer.load_tool_messages(db, since_hours=WIDE_WINDOW) + llm = analyzer.load_llm_bound_content(db, since_hours=WIDE_WINDOW) + heavy = analyzer.load_heavy_sessions( + db, since_hours=WIDE_WINDOW, salt="s", top_n=20 + ) + tel = analyzer.parse_telemetry( + tmp_path / "none.jsonl", since_hours=WIDE_WINDOW, total_input_tokens=0 + ) + report = analyzer.build_report( + date="2100-01-01", + since_hours=24, + salt="s", + tool_messages=tool_messages, + heavy_sessions=heavy, + telemetry=tel, + llm_contents=llm, + prompt_duplicate_shadow=False, + ) + assert report.prompt_duplicates.enabled is False + _, md_path = analyzer.write_report(report, tmp_path / "out") + # Section still renders, marked disabled; report writing stays healthy. 
+ md_text = md_path.read_text(encoding="utf-8") + assert "Prompt duplicate blocks" in md_text + assert "disabled" in md_text diff --git a/tests/test_hermes_plugin_patch.py b/tests/test_hermes_plugin_patch.py index 74bcd19..d328d01 100644 --- a/tests/test_hermes_plugin_patch.py +++ b/tests/test_hermes_plugin_patch.py @@ -242,6 +242,86 @@ def test_optimize_writes_metadata_only_telemetry_line(monkeypatch, tmp_path): assert forbidden.isdisjoint(record.keys()) +def test_telemetry_records_payload_chars_and_derived_token_method(monkeypatch, tmp_path): + """Before/after payload chars are actual; the chars/4 counter is labelled derived.""" + import json + + module, _ = _load_plugin_module(monkeypatch) + monkeypatch.setattr(module, "_check_reorder", lambda: False) + monkeypatch.setattr(module, "_CONTEXTPILOT_AVAILABLE", False) + monkeypatch.setattr(module, "dedup_chat_completions", _saving_dedup) + # Force the exact tokenizer OFF so this case is deterministic everywhere. + monkeypatch.setenv("CONTEXTPILOT_DISABLE_EXACT_TOKENIZER", "1") + + telemetry = tmp_path / "telemetry.jsonl" + monkeypatch.setenv("CONTEXTPILOT_TELEMETRY_FILE", str(telemetry)) + + engine = module.ContextPilotEngine() + messages = [ + {"role": "user", "content": "read file"}, + {"role": "tool", "tool_call_id": "call_1", "content": "FULL TOOL RESULT"}, + ] + _out, stats = engine.optimize_api_messages(messages) + + record = json.loads(telemetry.read_text(encoding="utf-8").splitlines()[0]) + + # Actual processed-payload before/after char measurement. + assert record["payload_chars_before"] > record["payload_chars_after"] + assert ( + record["payload_chars_saved"] + == record["payload_chars_before"] - record["payload_chars_after"] + ) + # The legacy token counter is explicitly tagged as a derived chars/4 estimate. 
+ assert record["tokens_saved"] == record["chars_saved"] // 4 + assert record["tokens_saved_method"] == "estimated_chars_div_4" + # No exact tokenizer -> a clear status and NO fabricated token numbers. + assert record["actual_token_status"] == "unavailable" + assert "actual_tokens_before" not in record + assert "actual_tokens_after" not in record + assert "actual_tokens_saved" not in record + # Returned stats expose the same payload-char measurement. + assert stats["payload_chars_saved"] == record["payload_chars_saved"] + + +def test_telemetry_records_exact_tokens_when_backend_available(monkeypatch, tmp_path): + """When an exact tokenizer backend is present, actual token fields are emitted.""" + import json + + module, _ = _load_plugin_module(monkeypatch) + monkeypatch.setattr(module, "_check_reorder", lambda: False) + monkeypatch.setattr(module, "_CONTEXTPILOT_AVAILABLE", False) + monkeypatch.setattr(module, "dedup_chat_completions", _saving_dedup) + + # Inject a deterministic fake exact tokenizer (1 token per 3 chars). 
+ def fake_counter(text): + return len(text) // 3 + + fake_counter._backend = "fake:test-encoding" + monkeypatch.setattr(module, "_get_exact_tokenizer", lambda: fake_counter) + + telemetry = tmp_path / "telemetry.jsonl" + monkeypatch.setenv("CONTEXTPILOT_TELEMETRY_FILE", str(telemetry)) + + engine = module.ContextPilotEngine() + messages = [ + {"role": "user", "content": "read file"}, + {"role": "tool", "tool_call_id": "call_1", "content": "FULL TOOL RESULT"}, + ] + engine.optimize_api_messages(messages) + + record = json.loads(telemetry.read_text(encoding="utf-8").splitlines()[0]) + + assert record["actual_token_status"] == "available" + assert record["actual_tokenizer_backend"] == "fake:test-encoding" + assert record["actual_tokens_before"] >= record["actual_tokens_after"] + assert ( + record["actual_tokens_saved"] + == record["actual_tokens_before"] - record["actual_tokens_after"] + ) + # Actual tokens are distinct from the legacy chars/4 estimate. + assert "tokens_saved_method" in record + + def test_optimize_telemetry_skipped_when_nothing_saved(monkeypatch, tmp_path): module, _ = _load_plugin_module(monkeypatch) monkeypatch.setattr(module, "_check_reorder", lambda: False)