Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 158 additions & 24 deletions __init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,113 @@ def _write_telemetry(record: Dict[str, Any]) -> None:
logger.debug("[ContextPilot] telemetry write skipped: %s", e)


def _iter_message_text(messages: List[Dict[str, Any]]):
"""Yield text fragments from an LLM-bound payload for in-memory measurement.

Used only to *size* the payload (chars / exact tokens). Fragments are never
stored or emitted -- callers consume them immediately to produce integer
counts, then discard them.
"""
for msg in messages:
if not isinstance(msg, dict):
continue
content = msg.get("content")
if isinstance(content, str):
yield content
elif isinstance(content, list):
for block in content:
if isinstance(block, str):
yield block
elif isinstance(block, dict):
text = block.get("text")
if isinstance(text, str):
yield text
inner = block.get("content")
if isinstance(inner, str):
yield inner


def _payload_chars(messages: List[Dict[str, Any]]) -> int:
    """Return the summed length of every text fragment in *messages*.

    Metadata-only measure: fragments produced by ``_iter_message_text`` are
    reduced to their lengths immediately and never retained.
    """
    fragment_lengths = map(len, _iter_message_text(messages))
    return sum(fragment_lengths)


# Sentinel so the (possibly None) tokenizer is resolved at most once per process.
_exact_tokenizer_cache: Any = "unset"


def _get_exact_tokenizer():
"""Return a callable ``(text) -> int`` for EXACT token counting, or None.

Optional and best-effort: an exact tokenizer is used only when a backend is
installed and not disabled. This never raises and never installs anything;
when no backend is available the caller records an ``unavailable`` status
rather than emitting a fake (chars/4) token count.

Backend selection via ``CONTEXTPILOT_EXACT_TOKENIZER`` = ``off`` (default)
| ``tiktoken``. It is opt-in so merely having a tokenizer library installed
never creates a misleading provider/tokenizer mismatch. The separate
disable environment flag also returns ``None`` immediately.
"""

global _exact_tokenizer_cache
if _exact_tokenizer_cache != "unset":
return _exact_tokenizer_cache
_exact_tokenizer_cache = None
if os.environ.get("CONTEXTPILOT_DISABLE_EXACT_TOKENIZER") == "1":
return None
backend = os.environ.get("CONTEXTPILOT_EXACT_TOKENIZER", "off").lower()
if backend in ("off", "none", "disabled", "auto"):
return None
if backend == "tiktoken":
try:
import tiktoken # optional dependency; never a hard requirement

encoding_name = os.environ.get(
"CONTEXTPILOT_TIKTOKEN_ENCODING", "cl100k_base"
)
enc = tiktoken.get_encoding(encoding_name)

def _count(text: str, _enc=enc) -> int:
return len(_enc.encode(text, disallowed_special=()))

_count._backend = f"tiktoken:{encoding_name}" # type: ignore[attr-defined]
_exact_tokenizer_cache = _count
except Exception as e: # noqa: BLE001 - tokenizer is strictly optional
logger.debug("[ContextPilot] exact tokenizer unavailable: %s", e)
_exact_tokenizer_cache = None
return _exact_tokenizer_cache


def _measure_actual_tokens(
    original_messages: List[Dict[str, Any]],
    optimized_messages: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Count EXACT tokens in the payload before and after optimization.

    Metadata-only: text fragments are tokenized in memory and reduced to
    integer totals; no raw text is kept.

    Returns:
        A dict whose ``actual_token_status`` is ``"available"`` or
        ``"unavailable"``. The ``actual_tokenizer_backend`` and
        ``actual_tokens_before`` / ``_after`` / ``_saved`` keys are present
        only in the ``available`` case. When no exact tokenizer is configured,
        or counting raises, only the ``unavailable`` status is returned --
        callers must not fill the gap with a chars/4 estimate.
    """
    count_tokens = _get_exact_tokenizer()
    if count_tokens is not None:
        try:
            tokens_before = sum(
                map(count_tokens, _iter_message_text(original_messages))
            )
            tokens_after = sum(
                map(count_tokens, _iter_message_text(optimized_messages))
            )
        except Exception as e:  # noqa: BLE001 - a measurement must never break optimization
            logger.debug("[ContextPilot] exact token measurement failed: %s", e)
        else:
            return {
                "actual_token_status": "available",
                "actual_tokenizer_backend": getattr(
                    count_tokens, "_backend", "unknown"
                ),
                "actual_tokens_before": tokens_before,
                "actual_tokens_after": tokens_after,
                "actual_tokens_saved": tokens_before - tokens_after,
            }
    # Reached when no tokenizer is configured or counting failed above.
    return {"actual_token_status": "unavailable"}


def _reorder_docs(docs: List[str], alpha: float = 0.001) -> List[str]:
global _intercept_index
if len(docs) < 2:
Expand Down Expand Up @@ -645,44 +752,72 @@ def _tool_chars(msgs):
turn_chars_saved = doc_chars_saved + dedup_result.chars_saved
self._total_chars_saved += turn_chars_saved

# Actual before/after of the full LLM-bound payload (chars). These are
# measured directly from the original input vs the optimized output, so
# they reflect the realized processed-payload delta -- not a duplicate
# opportunity count. Cheap (string length only); always computed.
payload_chars_before = _payload_chars(original_messages)
payload_chars_after = _payload_chars(api_messages)
payload_chars_saved = payload_chars_before - payload_chars_after

# Step 6: Cache for next turn
self._cached_messages = copy.deepcopy(api_messages)
self._cached_original_messages = original_messages

if turn_chars_saved > 0:
logger.info(
"[ContextPilot] Turn %d: saved %d chars (~%d tokens) | cumulative: %d chars (~%d tokens)",
"[ContextPilot] Turn %d: saved %d chars by processing | cumulative: %d chars",
self._optimize_count,
turn_chars_saved,
turn_chars_saved // 4,
self._total_chars_saved,
self._total_chars_saved // 4,
)
# Metadata-only telemetry so the monitor does not depend solely on
# gateway log lines. No content, prompts, or tool payloads here.
_write_telemetry(
{
"ts": time.time(),
"type": "turn",
"session_hash": (
_hash_text(str(self._session_id))
if self._session_id is not None else None
),
"turn": self._optimize_count,
"chars_saved": turn_chars_saved,
"tokens_saved": turn_chars_saved // 4,
"doc_chars_saved": doc_chars_saved,
"block_chars_saved": dedup_result.chars_saved,
"blocks_deduped": dedup_result.blocks_deduped,
"blocks_total": dedup_result.blocks_total,
"docs_deduped": self._total_docs_deduped,
"system_blocks_matched": dedup_result.system_blocks_matched,
"cumulative_chars_saved": self._total_chars_saved,
}
#
# Token fields are deliberately separated by provenance:
# * ``tokens_saved`` is the LEGACY DERIVED estimate (chars/4); the
# ``tokens_saved_method`` tag makes that explicit so it is never
# mistaken for a tokenizer/API measurement.
# * ``actual_tokens_*`` come from an EXACT tokenizer and are present
# only when ``actual_token_status == "available"``. When no exact
# tokenizer backend is configured the status is ``unavailable``
# and no token numbers are emitted (no fake counts).
telemetry_record = {
"ts": time.time(),
"type": "turn",
"session_hash": (
_hash_text(str(self._session_id))
if self._session_id is not None else None
),
"turn": self._optimize_count,
# Actual processed-payload char delta (doc + block dedup).
"chars_saved": turn_chars_saved,
# Actual before/after of the full LLM-bound payload (chars).
"payload_chars_before": payload_chars_before,
"payload_chars_after": payload_chars_after,
"payload_chars_saved": payload_chars_saved,
# Legacy DERIVED token estimate (chars/4) -- NOT exact tokens.
"tokens_saved": turn_chars_saved // 4,
"tokens_saved_method": "estimated_chars_div_4",
"doc_chars_saved": doc_chars_saved,
"block_chars_saved": dedup_result.chars_saved,
"blocks_deduped": dedup_result.blocks_deduped,
"blocks_total": dedup_result.blocks_total,
"docs_deduped": self._total_docs_deduped,
"system_blocks_matched": dedup_result.system_blocks_matched,
"cumulative_chars_saved": self._total_chars_saved,
}
# Optional EXACT token measurement (only computed on a saving turn).
telemetry_record.update(
_measure_actual_tokens(original_messages, api_messages)
)
_write_telemetry(telemetry_record)

return api_messages, {
"chars_saved": turn_chars_saved,
"payload_chars_before": payload_chars_before,
"payload_chars_after": payload_chars_after,
"payload_chars_saved": payload_chars_saved,
"doc_chars_saved": doc_chars_saved,
"block_chars_saved": dedup_result.chars_saved,
"blocks_deduped": dedup_result.blocks_deduped,
Expand Down Expand Up @@ -720,11 +855,10 @@ def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> Non
self._compressor.on_session_end(session_id, messages)
if self._total_chars_saved > 0:
logger.info(
"[ContextPilot] Session %s: %d turns, %d chars saved (~%d tokens)",
"[ContextPilot] Session %s: %d turns, %d chars saved by processing",
session_id,
self._optimize_count,
self._total_chars_saved,
self._total_chars_saved // 4,
)

def on_session_reset(self) -> None:
Expand Down
10 changes: 10 additions & 0 deletions contextpilot/hermes_opportunities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from .detection import (
analyze_llm_bound_blocks,
detect_exact_duplicate_tool_outputs,
detect_prompt_duplicate_blocks,
detect_repeated_blocks,
summarize_tool_sizes,
)
Expand All @@ -55,6 +56,7 @@
DEFAULT_MIN_BLOCK_REPEAT,
DEFAULT_TOP_N,
EST_CHARS_PER_TOKEN,
PROMPT_DUPLICATE_BLOCK_TYPES,
ArtifactKindStat,
ArtifactSourceCount,
BlockTypeStat,
Expand All @@ -64,6 +66,9 @@
OpportunityReport,
ParentAggregationArtifacts,
ParentAggregationGroup,
PromptDuplicateBlock,
PromptDuplicateShadow,
PromptDuplicateTypeCount,
RepeatedBlock,
RouterCandidateBlock,
RouterLabelCount,
Expand Down Expand Up @@ -100,6 +105,7 @@
"DEFAULT_MIN_ARTIFACT_CHARS",
"EST_CHARS_PER_TOKEN",
"BLOCK_TYPES",
"PROMPT_DUPLICATE_BLOCK_TYPES",
"ROUTER_LABELS",
"ARTIFACT_KINDS",
"PARENT_AGGREGATION_SOURCE_TYPES",
Expand All @@ -113,6 +119,9 @@
"ToolSizeStat",
"HeavySession",
"TelemetryCoverage",
"PromptDuplicateBlock",
"PromptDuplicateTypeCount",
"PromptDuplicateShadow",
"RouterLabelCount",
"RouterReasonCount",
"RouterCandidateBlock",
Expand All @@ -134,6 +143,7 @@
"detect_repeated_blocks",
"summarize_tool_sizes",
"analyze_llm_bound_blocks",
"detect_prompt_duplicate_blocks",
# routing (shadow)
"classify_router_label",
"analyze_worker_routing_shadow",
Expand Down
9 changes: 9 additions & 0 deletions contextpilot/hermes_opportunities/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ def main(argv: list[str] | None = None) -> int:
parser.add_argument(
"--min-artifact-chars", type=int, default=DEFAULT_MIN_ARTIFACT_CHARS
)
parser.add_argument(
"--disable-prompt-duplicate-shadow",
action="store_true",
help=(
"skip the advisory system/skill prompt duplicate-block scan "
"(enabled by default; advisory only, never rewrites/dedups prompts)"
),
)
args = parser.parse_args(argv)

if not args.state_db.exists():
Expand Down Expand Up @@ -125,6 +133,7 @@ def main(argv: list[str] | None = None) -> int:
top_n=args.top_n,
worker_routing_shadow=not args.disable_worker_routing_shadow,
parent_aggregation_shadow=not args.disable_parent_aggregation,
prompt_duplicate_shadow=not args.disable_prompt_duplicate_shadow,
min_artifact_chars=args.min_artifact_chars,
)
json_path, md_path = write_report(report, args.out_dir)
Expand Down
Loading