From adaa614d462452e7bba784b17fdb830dceafa70b Mon Sep 17 00:00:00 2001 From: amabito Date: Sat, 21 Mar 2026 09:30:56 +0900 Subject: [PATCH 1/7] feat(evaluators): add built-in budget evaluator for per-agent cost tracking Closes #130 Add BudgetEvaluator -- a deterministic evaluator that tracks cumulative LLM token and cost usage per agent, per channel, per user, with configurable time windows (daily/weekly/monthly/cumulative). Components: - BudgetStore protocol + InMemoryBudgetStore (dict + threading.Lock) - BudgetSnapshot frozen dataclass for atomic state reads - BudgetEvaluator with scope key building, period key derivation, token extraction, and optional model pricing estimation - BudgetLimitRule config with scope, per, window, limit_usd, limit_tokens - 48 tests covering store, config, evaluator, registration Design: - In-memory only (no PostgreSQL, no new dependencies) - Store is "dumb" (accumulate + check), evaluator is "smart" (resolve scope, derive period, extract tokens, check limits) - record_and_check() is atomic (single lock acquisition) - Evaluator instances are cached per config (thread-safe by design) - matched=True only when limit exceeded, confidence=1.0 always - Utilization ratio in metadata, not confidence --- .../src/agent_control_evaluators/__init__.py | 13 + .../budget/__init__.py | 17 + .../agent_control_evaluators/budget/config.py | 80 ++++ .../budget/evaluator.py | 296 +++++++++++++ .../agent_control_evaluators/budget/store.py | 220 ++++++++++ evaluators/builtin/tests/budget/__init__.py | 0 .../builtin/tests/budget/test_budget.py | 389 ++++++++++++++++++ 7 files changed, 1015 insertions(+) create mode 100644 evaluators/builtin/src/agent_control_evaluators/budget/__init__.py create mode 100644 evaluators/builtin/src/agent_control_evaluators/budget/config.py create mode 100644 evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py create mode 100644 evaluators/builtin/src/agent_control_evaluators/budget/store.py create mode 100644 evaluators/builtin/tests/budget/__init__.py create mode 100644 evaluators/builtin/tests/budget/test_budget.py diff --git a/evaluators/builtin/src/agent_control_evaluators/__init__.py b/evaluators/builtin/src/agent_control_evaluators/__init__.py index b1dabd9e..d3f246b2 100644 --- a/evaluators/builtin/src/agent_control_evaluators/__init__.py +++ b/evaluators/builtin/src/agent_control_evaluators/__init__.py @@ -9,6 +9,7 @@ - list: List-based value matching - json: JSON validation - sql: SQL query validation + - budget: Cumulative LLM token and cost tracking Naming convention: - Built-in: "regex", "list", "json", "sql" @@ -47,6 +48,13 @@ from agent_control_evaluators.json import JSONEvaluator, JSONEvaluatorConfig from agent_control_evaluators.list import ListEvaluator, ListEvaluatorConfig from agent_control_evaluators.regex import RegexEvaluator, RegexEvaluatorConfig +from agent_control_evaluators.budget import ( + BudgetEvaluator, + BudgetEvaluatorConfig, + BudgetSnapshot, + BudgetStore, + InMemoryBudgetStore, +) from agent_control_evaluators.sql import SQLEvaluator, SQLEvaluatorConfig __all__ = [ @@ -73,4 +81,9 @@ "JSONEvaluatorConfig", "SQLEvaluator", "SQLEvaluatorConfig", + "BudgetEvaluator", + "BudgetEvaluatorConfig", + "BudgetSnapshot", + "BudgetStore", + "InMemoryBudgetStore", ] diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/__init__.py b/evaluators/builtin/src/agent_control_evaluators/budget/__init__.py new file mode 100644 index 00000000..3f539821 --- /dev/null +++ b/evaluators/builtin/src/agent_control_evaluators/budget/__init__.py @@ -0,0 +1,17 @@ +"""Budget evaluator for per-agent LLM cost and token tracking.""" + +from agent_control_evaluators.budget.config import BudgetEvaluatorConfig +from agent_control_evaluators.budget.evaluator import BudgetEvaluator +from agent_control_evaluators.budget.store import ( + BudgetSnapshot, + BudgetStore, + InMemoryBudgetStore, +) + +__all__ = [ + "BudgetEvaluator", + "BudgetEvaluatorConfig", + "BudgetSnapshot", + "BudgetStore", + "InMemoryBudgetStore", +] diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/config.py b/evaluators/builtin/src/agent_control_evaluators/budget/config.py new file mode 100644 index 00000000..d18372b5 --- /dev/null +++ b/evaluators/builtin/src/agent_control_evaluators/budget/config.py @@ -0,0 +1,80 @@ +"""Configuration for the budget evaluator.""" + +from __future__ import annotations + +from typing import Any, Literal + +from pydantic import Field, field_validator, model_validator + +from agent_control_evaluators._base import EvaluatorConfig + + +class BudgetLimitRule(EvaluatorConfig): + """A single budget limit rule. + + Each rule defines a ceiling (USD and/or tokens) for a combination + of scope dimensions and time window. Multiple rules can apply to + the same step -- the evaluator checks all of them and triggers + on the first breach. + + Attributes: + scope: Static scope dimensions that must match for this rule + to apply. Empty dict = global rule. + per: If set, the limit is applied independently for each unique + value of this metadata field (e.g. "user_id" creates per-user + budgets within the scope). + window: Time window for accumulation. None = cumulative (no reset). + limit_usd: Maximum USD spend in the window. None = uncapped. + limit_tokens: Maximum tokens in the window. None = uncapped. + """ + + scope: dict[str, str] = Field(default_factory=dict) + per: str | None = None + window: Literal["daily", "weekly", "monthly"] | None = None + limit_usd: float | None = None + limit_tokens: int | None = None + + @model_validator(mode="after") + def at_least_one_limit(self) -> "BudgetLimitRule": + if self.limit_usd is None and self.limit_tokens is None: + raise ValueError("At least one of limit_usd or limit_tokens must be set") + return self + + @field_validator("limit_usd") + @classmethod + def validate_limit_usd(cls, v: float | None) -> float | None: + if v is not None and v <= 0: + raise ValueError("limit_usd must be positive") + return v + + @field_validator("limit_tokens") + @classmethod + def validate_limit_tokens(cls, v: int | None) -> int | None: + if v is not None and v <= 0: + raise ValueError("limit_tokens must be positive") + return v + + +class BudgetEvaluatorConfig(EvaluatorConfig): + """Configuration for the budget evaluator. + + Attributes: + limits: List of budget limit rules. Each is checked independently. + pricing: Optional model pricing table. Maps model name to per-1K + token rates. Used to derive cost_usd from token counts when + cost is not provided in step data. + token_path: Dot-notation path to extract token usage from step + data (e.g. "usage.total_tokens"). If None, looks for standard + fields (input_tokens, output_tokens, total_tokens, usage). + cost_path: Dot-notation path to extract cost from step data. + model_path: Dot-notation path to extract model name (for pricing lookup). + metadata_paths: Mapping of metadata field name to dot-notation path + in step data. Used to extract scope dimensions (channel, user_id, etc). + """ + + limits: list[BudgetLimitRule] = Field(min_length=1) + pricing: dict[str, dict[str, float]] | None = None + token_path: str | None = None + cost_path: str | None = None + model_path: str | None = None + metadata_paths: dict[str, str] = Field(default_factory=dict) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py new file mode 100644 index 00000000..73d5e658 --- /dev/null +++ b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py @@ -0,0 +1,296 @@ +"""Budget evaluator -- tracks cumulative LLM token/cost usage. + +Deterministic evaluator: confidence is always 1.0, matched is True when +any configured limit is exceeded. Utilization ratio and spend breakdown +are returned in result metadata, not in confidence. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +from agent_control_evaluators._base import Evaluator, EvaluatorMetadata +from agent_control_evaluators._registry import register_evaluator +from agent_control_evaluators.budget.config import BudgetEvaluatorConfig, BudgetLimitRule +from agent_control_evaluators.budget.store import BudgetSnapshot, InMemoryBudgetStore +from agent_control_models import EvaluatorResult + +logger = logging.getLogger(__name__) + + +def _derive_period_key(window: str | None) -> str: + """Derive the current period key from UTC time. + + Returns: + "2026-03-21" for daily, "2026-W12" for weekly, "2026-03" for monthly, + "" for cumulative (no window). + """ + if window is None: + return "" + now = datetime.now(timezone.utc) + if window == "daily": + return now.strftime("%Y-%m-%d") + if window == "weekly": + iso = now.isocalendar() + return f"{iso[0]}-W{iso[1]:02d}" + if window == "monthly": + return now.strftime("%Y-%m") + return "" + + +def _build_scope_key( + scope: dict[str, str], + per: str | None, + metadata: dict[str, str], +) -> str: + """Build a composite scope key from static dimensions and per-field. + + Format: "channel=slack|user_id=u1" or "__global__" if empty. + """ + parts: list[str] = [] + for k, v in sorted(scope.items()): + parts.append(f"{k}={v}") + if per and per in metadata: + parts.append(f"{per}={metadata[per]}") + return "|".join(parts) if parts else "__global__" + + +def _extract_by_path(data: Any, path: str) -> Any: + """Extract a value from nested data using dot-notation path.""" + current = data + for part in path.split("."): + if isinstance(current, dict): + current = current.get(part) + elif hasattr(current, part): + current = getattr(current, part) + else: + return None + if current is None: + return None + return current + + +def _extract_tokens(data: Any, token_path: str | None) -> tuple[int, int]: + """Extract (input_tokens, output_tokens) from step data. + + Tries token_path first, then standard field names. + Returns (0, 0) if no token information found. + """ + if data is None: + return 0, 0 + + # Custom path + if token_path: + val = _extract_by_path(data, token_path) + if isinstance(val, int) and val >= 0: + return 0, val # total tokens as output (conservative) + if isinstance(val, dict): + data = val # fall through to standard extraction + + # Standard dict fields + if isinstance(data, dict): + usage = data.get("usage", data) + if isinstance(usage, dict): + inp = usage.get("input_tokens") + if inp is None: + inp = usage.get("prompt_tokens") + out = usage.get("output_tokens") + if out is None: + out = usage.get("completion_tokens") + if isinstance(inp, int) and isinstance(out, int): + return max(0, inp), max(0, out) + total = usage.get("total_tokens") + if isinstance(total, int) and total > 0: + return 0, max(0, total) + return 0, 0 + + +def _extract_cost(data: Any, cost_path: str | None) -> float | None: + """Extract cost_usd from step data. Returns None if not found.""" + if data is None or cost_path is None: + return None + val = _extract_by_path(data, cost_path) + if isinstance(val, (int, float)) and val >= 0: + return float(val) + return None + + +def _estimate_cost( + model: str | None, + input_tokens: int, + output_tokens: int, + pricing: dict[str, dict[str, float]] | None, +) -> float: + """Estimate cost from model pricing table. Returns 0.0 if unknown.""" + if not model or not pricing: + return 0.0 + rates = pricing.get(model) + if not rates: + return 0.0 + input_rate = rates.get("input_per_1k", 0.0) + output_rate = rates.get("output_per_1k", 0.0) + return (input_tokens * input_rate + output_tokens * output_rate) / 1000.0 + + +def _extract_metadata(data: Any, metadata_paths: dict[str, str]) -> dict[str, str]: + """Extract metadata fields from step data using configured paths.""" + result: dict[str, str] = {} + for field_name, path in metadata_paths.items(): + val = _extract_by_path(data, path) + if val is not None: + result[field_name] = str(val) + return result + + +@register_evaluator +class BudgetEvaluator(Evaluator[BudgetEvaluatorConfig]): + """Tracks cumulative LLM token and cost usage per scope and time window. + + Deterministic evaluator: matched=True when any configured limit is + exceeded, confidence=1.0 always. Utilization and spend breakdown + are returned in result metadata. + + The evaluator is stateful (it accumulates usage in a BudgetStore), + but the state lives in the store, not on the evaluator instance. + The store is created per evaluator config and is thread-safe. + + IMPORTANT: Unlike other evaluators that are purely functional, + BudgetEvaluator maintains state via the store. The store is created + in __init__ and lives for the lifetime of the evaluator instance. + Since evaluator instances are cached per config, all requests with + the same budget config share the same store -- which is the intended + behavior for budget tracking. + """ + + metadata = EvaluatorMetadata( + name="budget", + version="1.0.0", + description="Cumulative LLM token and cost budget tracking", + ) + config_model = BudgetEvaluatorConfig + + def __init__(self, config: BudgetEvaluatorConfig) -> None: + super().__init__(config) + self._store = InMemoryBudgetStore() + + async def evaluate(self, data: Any) -> EvaluatorResult: + """Evaluate step data against all configured budget limits. + + Extracts token counts, cost, model, and metadata from step data, + then checks each limit rule. Returns matched=True on first breach. + + Args: + data: Step data extracted by selector. + + Returns: + EvaluatorResult with matched=True if any limit exceeded. + """ + if data is None: + return EvaluatorResult( + matched=False, + confidence=1.0, + message="No data to evaluate", + ) + + # Extract tokens and cost from step data + input_tokens, output_tokens = _extract_tokens(data, self.config.token_path) + cost_usd = _extract_cost(data, self.config.cost_path) + + # Extract model for pricing lookup + model: str | None = None + if self.config.model_path: + val = _extract_by_path(data, self.config.model_path) + if val is not None: + model = str(val) + + # Estimate cost if not provided directly + if cost_usd is None: + cost_usd = _estimate_cost( + model, input_tokens, output_tokens, self.config.pricing + ) + + # Extract metadata for scope key building + step_metadata = _extract_metadata(data, self.config.metadata_paths) + + # Check each limit rule + breached_rules: list[dict[str, Any]] = [] + all_snapshots: list[dict[str, Any]] = [] + + for rule in self.config.limits: + # Check if rule scope matches step metadata + if not _scope_matches(rule, step_metadata): + continue + + scope_key = _build_scope_key(rule.scope, rule.per, step_metadata) + period_key = _derive_period_key(rule.window) + + snapshot = self._store.record_and_check( + scope_key=scope_key, + period_key=period_key, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost_usd, + limit_usd=rule.limit_usd, + limit_tokens=rule.limit_tokens, + ) + + snap_info = { + "scope_key": scope_key, + "period_key": period_key, + "window": rule.window or "cumulative", + "spent_usd": snapshot.spent_usd, + "spent_tokens": snapshot.spent_tokens, + "limit_usd": snapshot.limit_usd, + "limit_tokens": snapshot.limit_tokens, + "utilization": round(snapshot.utilization, 4), + "exceeded": snapshot.exceeded, + } + all_snapshots.append(snap_info) + + if snapshot.exceeded: + breached_rules.append(snap_info) + + if breached_rules: + first = breached_rules[0] + return EvaluatorResult( + matched=True, + confidence=1.0, + message=( + f"Budget exceeded: {first['scope_key']} " + f"({first['window']}, utilization={first['utilization']:.0%})" + ), + metadata={ + "breached_rules": breached_rules, + "all_snapshots": all_snapshots, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cost_usd": cost_usd, + }, + ) + + max_util = max((s["utilization"] for s in all_snapshots), default=0.0) + return EvaluatorResult( + matched=False, + confidence=1.0, + message=f"Within budget (utilization={max_util:.0%})", + metadata={ + "all_snapshots": all_snapshots, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cost_usd": cost_usd, + "max_utilization": round(max_util, 4), + }, + ) + + +def _scope_matches(rule: BudgetLimitRule, metadata: dict[str, str]) -> bool: + """Check if rule's static scope dimensions match step metadata. + + An empty scope dict matches everything (global rule). + """ + for key, expected in rule.scope.items(): + if metadata.get(key) != expected: + return False + return True diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/store.py b/evaluators/builtin/src/agent_control_evaluators/budget/store.py new file mode 100644 index 00000000..be7a6365 --- /dev/null +++ b/evaluators/builtin/src/agent_control_evaluators/budget/store.py @@ -0,0 +1,220 @@ +"""BudgetStore protocol and InMemoryBudgetStore implementation. + +Provides atomic record-and-check semantics for cumulative budget tracking. +The store is intentionally "dumb" -- it accumulates usage and checks limits +but does not own scope key derivation or period key computation. The +BudgetEvaluator is the "smart" layer that resolves those before calling +the store. +""" + +from __future__ import annotations + +import threading +from dataclasses import dataclass +from typing import Any, Protocol, runtime_checkable + + +@dataclass(frozen=True) +class BudgetSnapshot: + """Immutable view of budget state at a point in time. + + Attributes: + spent_usd: Cumulative USD spent in this scope+period. + spent_tokens: Cumulative tokens (input + output) in this scope+period. + limit_usd: Configured USD ceiling, or None if uncapped. + limit_tokens: Configured token ceiling, or None if uncapped. + utilization: max(usd_ratio, token_ratio) clamped to [0.0, 1.0]. + 0.0 when no limits are set. + exceeded: True when any limit is breached. + """ + + spent_usd: float + spent_tokens: int + limit_usd: float | None + limit_tokens: int | None + utilization: float + exceeded: bool + + +@runtime_checkable +class BudgetStore(Protocol): + """Protocol for budget storage backends. + + Implementations must provide atomic record-and-check: a single call + that records usage and returns the current totals. This prevents + read-then-write race conditions under concurrent access. + + Built-in: InMemoryBudgetStore (dict + threading.Lock). + External: Redis, PostgreSQL, etc. (separate packages). + """ + + def record_and_check( + self, + scope_key: str, + period_key: str, + input_tokens: int, + output_tokens: int, + cost_usd: float, + limit_usd: float | None = None, + limit_tokens: int | None = None, + ) -> BudgetSnapshot: ... + + +@dataclass +class _Bucket: + """Internal mutable accumulator for a single (scope, period) pair.""" + + spent_usd: float = 0.0 + input_tokens: int = 0 + output_tokens: int = 0 + + @property + def total_tokens(self) -> int: + return self.input_tokens + self.output_tokens + + +def _compute_utilization( + spent_usd: float, + spent_tokens: int, + limit_usd: float | None, + limit_tokens: int | None, +) -> float: + """Return max(usd_ratio, token_ratio) clamped to [0.0, 1.0].""" + ratios: list[float] = [] + if limit_usd is not None and limit_usd > 0: + ratios.append(min(spent_usd / limit_usd, 1.0)) + if limit_tokens is not None and limit_tokens > 0: + ratios.append(min(spent_tokens / limit_tokens, 1.0)) + return max(ratios) if ratios else 0.0 + + +class InMemoryBudgetStore: + """Thread-safe in-memory budget store. + + Uses a flat dict keyed by ``(scope_key, period_key)`` tuples. + Each entry is a ``_Bucket`` accumulating USD and token totals. + + Thread safety: all mutations are protected by a single + ``threading.Lock``. The lock scope is intentionally narrow -- + only the dict lookup + bucket update + snapshot construction. + + This store is suitable for single-process deployments and the + Python SDK. For multi-process or distributed setups, use a + Redis or Postgres-backed store (separate package). + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._buckets: dict[tuple[str, str], _Bucket] = {} + + def record_and_check( + self, + scope_key: str, + period_key: str, + input_tokens: int, + output_tokens: int, + cost_usd: float, + limit_usd: float | None = None, + limit_tokens: int | None = None, + ) -> BudgetSnapshot: + """Atomically record usage and return current budget state. + + Args: + scope_key: Composite scope identifier (e.g. "channel=slack|user_id=u1"). + period_key: Time window identifier (e.g. "2026-03-21", "2026-W12"). + input_tokens: Input tokens consumed by this call. + output_tokens: Output tokens consumed by this call. + cost_usd: USD cost of this call. + limit_usd: USD ceiling for this scope+period (None = uncapped). + limit_tokens: Token ceiling for this scope+period (None = uncapped). + + Returns: + BudgetSnapshot with current totals and exceeded status. + """ + key = (scope_key, period_key) + with self._lock: + bucket = self._buckets.get(key) + if bucket is None: + bucket = _Bucket() + self._buckets[key] = bucket + bucket.spent_usd += cost_usd + bucket.input_tokens += input_tokens + bucket.output_tokens += output_tokens + + total_tokens = bucket.total_tokens + utilization = _compute_utilization( + bucket.spent_usd, total_tokens, limit_usd, limit_tokens + ) + exceeded = False + if limit_usd is not None and bucket.spent_usd > limit_usd: + exceeded = True + if limit_tokens is not None and total_tokens > limit_tokens: + exceeded = True + + return BudgetSnapshot( + spent_usd=bucket.spent_usd, + spent_tokens=total_tokens, + limit_usd=limit_usd, + limit_tokens=limit_tokens, + utilization=utilization, + exceeded=exceeded, + ) + + def get_snapshot( + self, + scope_key: str, + period_key: str, + limit_usd: float | None = None, + limit_tokens: int | None = None, + ) -> BudgetSnapshot: + """Read current budget state without recording usage.""" + key = (scope_key, period_key) + with self._lock: + bucket = self._buckets.get(key) + if bucket is None: + return BudgetSnapshot( + spent_usd=0.0, + spent_tokens=0, + limit_usd=limit_usd, + limit_tokens=limit_tokens, + utilization=0.0, + exceeded=False, + ) + total_tokens = bucket.total_tokens + utilization = _compute_utilization( + bucket.spent_usd, total_tokens, limit_usd, limit_tokens + ) + exceeded = False + if limit_usd is not None and bucket.spent_usd > limit_usd: + exceeded = True + if limit_tokens is not None and total_tokens > limit_tokens: + exceeded = True + return BudgetSnapshot( + spent_usd=bucket.spent_usd, + spent_tokens=total_tokens, + limit_usd=limit_usd, + limit_tokens=limit_tokens, + utilization=utilization, + exceeded=exceeded, + ) + + def reset(self, scope_key: str | None = None, period_key: str | None = None) -> None: + """Clear accumulated usage. + + Args: + scope_key: If set, only clear buckets matching this scope. + period_key: If set, only clear buckets matching this period. + Both None: clear everything. + """ + with self._lock: + if scope_key is None and period_key is None: + self._buckets.clear() + return + keys_to_remove = [ + k + for k in self._buckets + if (scope_key is None or k[0] == scope_key) + and (period_key is None or k[1] == period_key) + ] + for k in keys_to_remove: + del self._buckets[k] diff --git a/evaluators/builtin/tests/budget/__init__.py b/evaluators/builtin/tests/budget/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/evaluators/builtin/tests/budget/test_budget.py b/evaluators/builtin/tests/budget/test_budget.py new file mode 100644 index 00000000..de4202db --- /dev/null +++ b/evaluators/builtin/tests/budget/test_budget.py @@ -0,0 +1,389 @@ +"""Tests for the budget evaluator.""" + +from __future__ import annotations + +import threading +from typing import Any +from unittest.mock import patch + +import pytest +from pydantic import ValidationError + +from agent_control_evaluators.budget.config import BudgetEvaluatorConfig, BudgetLimitRule +from agent_control_evaluators.budget.evaluator import ( + BudgetEvaluator, + _build_scope_key, + _derive_period_key, + _extract_tokens, +) +from agent_control_evaluators.budget.store import ( + BudgetSnapshot, + InMemoryBudgetStore, + _compute_utilization, +) + + +# --------------------------------------------------------------------------- +# InMemoryBudgetStore +# --------------------------------------------------------------------------- + + +class TestInMemoryBudgetStore: + """Unit tests for the in-memory budget store.""" + + def test_basic_record_and_check(self) -> None: + store = InMemoryBudgetStore() + snap = store.record_and_check("s", "p", 100, 50, 0.01, limit_usd=1.0) + assert snap.spent_usd == pytest.approx(0.01) + assert snap.spent_tokens == 150 + assert snap.exceeded is False + + def test_accumulation(self) -> None: + store = InMemoryBudgetStore() + store.record_and_check("s", "p", 100, 50, 0.5, limit_usd=1.0) + snap = store.record_and_check("s", "p", 100, 50, 0.6, limit_usd=1.0) + assert snap.spent_usd == pytest.approx(1.1) + assert snap.spent_tokens == 300 + assert snap.exceeded is True + + def test_scope_isolation(self) -> None: + store = InMemoryBudgetStore() + store.record_and_check("scope-a", "p", 100, 0, 1.0, limit_usd=2.0) + snap_b = store.record_and_check("scope-b", "p", 50, 0, 0.5, limit_usd=2.0) + assert snap_b.spent_usd == pytest.approx(0.5) + assert snap_b.exceeded is False + + def test_period_isolation(self) -> None: + store = InMemoryBudgetStore() + store.record_and_check("s", "2026-03-20", 100, 0, 0.5, limit_usd=1.0) + snap = store.record_and_check("s", "2026-03-21", 100, 0, 0.5, limit_usd=1.0) + assert snap.spent_usd == pytest.approx(0.5) + assert snap.exceeded is False + + def test_exceeded_usd(self) -> None: + store = InMemoryBudgetStore() + snap = store.record_and_check("s", "p", 0, 0, 2.0, limit_usd=1.0) + assert snap.exceeded is True + assert snap.utilization == pytest.approx(1.0) + + def test_exceeded_tokens(self) -> None: + store = InMemoryBudgetStore() + snap = store.record_and_check("s", "p", 600, 500, 0.0, limit_tokens=1000) + assert snap.exceeded is True + assert snap.spent_tokens == 1100 + + def test_exceeded_both_max_utilization(self) -> None: + store = InMemoryBudgetStore() + snap = store.record_and_check("s", "p", 500, 500, 0.8, limit_usd=1.0, limit_tokens=2000) + assert snap.exceeded is False + assert snap.utilization == pytest.approx(0.8) # usd: 0.8, tokens: 0.5 + + def test_no_limits_never_exceeded(self) -> None: + store = InMemoryBudgetStore() + snap = store.record_and_check("s", "p", 999999, 999999, 999999.0) + assert snap.exceeded is False + assert snap.utilization == 0.0 + + def test_thread_safety(self) -> None: + store = InMemoryBudgetStore() + errors: list[str] = [] + + def record_many() -> None: + try: + for _ in range(100): + store.record_and_check("s", "p", 1, 1, 0.001, limit_usd=1000.0) + except Exception as exc: + errors.append(str(exc)) + + threads = [threading.Thread(target=record_many) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert errors == [] + snap = store.get_snapshot("s", "p", limit_usd=1000.0) + assert snap.spent_tokens == 2000 # 10 threads x 100 calls x 2 tokens + assert snap.spent_usd == pytest.approx(1.0) # 10 x 100 x 0.001 + + def test_get_snapshot_empty(self) -> None: + store = InMemoryBudgetStore() + snap = store.get_snapshot("nonexistent", "p", limit_usd=1.0) + assert snap.spent_usd == 0.0 + assert snap.spent_tokens == 0 + assert snap.exceeded is False + + def test_reset_all(self) -> None: + store = InMemoryBudgetStore() + store.record_and_check("a", "p1", 10, 10, 1.0) + store.record_and_check("b", "p2", 20, 20, 2.0) + store.reset() + assert store.get_snapshot("a", "p1").spent_usd == 0.0 + assert store.get_snapshot("b", "p2").spent_usd == 0.0 + + def test_reset_by_scope(self) -> None: + store = InMemoryBudgetStore() + store.record_and_check("a", "p", 10, 10, 1.0) + store.record_and_check("b", "p", 20, 20, 2.0) + store.reset(scope_key="a") + assert store.get_snapshot("a", "p").spent_usd == 0.0 + assert store.get_snapshot("b", "p").spent_usd == pytest.approx(2.0) + + +# --------------------------------------------------------------------------- +# Utility functions +# --------------------------------------------------------------------------- + + +class TestUtilities: + def test_compute_utilization_no_limits(self) -> None: + assert _compute_utilization(100.0, 10000, None, None) == 0.0 + + def test_compute_utilization_usd_only(self) -> None: + assert _compute_utilization(0.5, 0, 1.0, None) == pytest.approx(0.5) + + def test_compute_utilization_clamped(self) -> None: + assert _compute_utilization(2.0, 0, 1.0, None) == pytest.approx(1.0) + + def test_derive_period_key_none(self) -> None: + assert _derive_period_key(None) == "" + + def test_derive_period_key_daily(self) -> None: + key = _derive_period_key("daily") + assert len(key) == 10 # YYYY-MM-DD + assert key[4] == "-" and key[7] == "-" + + def test_derive_period_key_weekly(self) -> None: + key = _derive_period_key("weekly") + assert "-W" in key + + def test_derive_period_key_monthly(self) -> None: + key = _derive_period_key("monthly") + assert len(key) == 7 # YYYY-MM + + def test_build_scope_key_global(self) -> None: + assert _build_scope_key({}, None, {}) == "__global__" + + def test_build_scope_key_with_scope(self) -> None: + key = _build_scope_key({"channel": "slack"}, None, {}) + assert key == "channel=slack" + + def test_build_scope_key_with_per(self) -> None: + key = _build_scope_key({"channel": "slack"}, "user_id", {"user_id": "u1"}) + assert key == "channel=slack|user_id=u1" + + def test_build_scope_key_per_missing_in_metadata(self) -> None: + key = _build_scope_key({}, "user_id", {}) + assert key == "__global__" + + def test_extract_tokens_standard_fields(self) -> None: + data = {"usage": {"input_tokens": 100, "output_tokens": 50}} + assert _extract_tokens(data, None) == (100, 50) + + def test_extract_tokens_openai_fields(self) -> None: + data = {"usage": {"prompt_tokens": 80, "completion_tokens": 40}} + assert _extract_tokens(data, None) == (80, 40) + + def test_extract_tokens_total_only(self) -> None: + data = {"usage": {"total_tokens": 200}} + assert _extract_tokens(data, None) == (0, 200) + + def test_extract_tokens_none(self) -> None: + assert _extract_tokens(None, None) == (0, 0) + + def test_extract_tokens_custom_path(self) -> None: + data = {"metrics": {"tokens": 300}} + assert _extract_tokens(data, "metrics.tokens") == (0, 300) + + +# --------------------------------------------------------------------------- +# BudgetEvaluatorConfig +# --------------------------------------------------------------------------- + + +class TestBudgetEvaluatorConfig: + def test_valid_config(self) -> None: + config = BudgetEvaluatorConfig( + limits=[BudgetLimitRule(limit_usd=10.0)] + ) + assert len(config.limits) == 1 + + def test_empty_limits_rejected(self) -> None: + with pytest.raises(ValidationError): + BudgetEvaluatorConfig(limits=[]) + + def test_no_limit_rejected(self) -> None: + with pytest.raises(ValidationError, match="At least one"): + BudgetLimitRule() + + def test_negative_limit_rejected(self) -> None: + with pytest.raises(ValidationError, match="positive"): + BudgetLimitRule(limit_usd=-1.0) + + def test_token_only_limit(self) -> None: + rule = BudgetLimitRule(limit_tokens=1000) + assert rule.limit_usd is None + assert rule.limit_tokens == 1000 + + +# --------------------------------------------------------------------------- +# BudgetEvaluator +# --------------------------------------------------------------------------- + + +class TestBudgetEvaluator: + """Integration tests for the budget evaluator.""" + + def _make_evaluator(self, **kwargs: Any) -> BudgetEvaluator: + config = BudgetEvaluatorConfig(**kwargs) + return BudgetEvaluator(config) + + @pytest.mark.asyncio + async def test_single_call_under_budget(self) -> None: + ev = self._make_evaluator(limits=[{"limit_usd": 10.0}]) + result = await ev.evaluate({"usage": {"input_tokens": 100, "output_tokens": 50}, "cost": 0.01}) + assert result.matched is False + assert result.confidence == 1.0 + + @pytest.mark.asyncio + async def test_accumulate_past_budget(self) -> None: + ev = self._make_evaluator( + limits=[{"limit_usd": 0.05}], + cost_path="cost", + ) + await ev.evaluate({"cost": 0.03}) + result = await ev.evaluate({"cost": 0.03}) + assert result.matched is True + assert result.metadata is not None + assert len(result.metadata["breached_rules"]) == 1 + + @pytest.mark.asyncio + async def test_per_channel_limits(self) -> None: + ev = self._make_evaluator( + limits=[ + {"scope": {"channel": "slack"}, "limit_usd": 1.0}, + {"scope": {"channel": "api"}, "limit_usd": 10.0}, + ], + cost_path="cost", + metadata_paths={"channel": "channel"}, + ) + # Slack over budget + result = await ev.evaluate({"cost": 2.0, "channel": "slack"}) + assert result.matched is True + # API under budget + result = await ev.evaluate({"cost": 2.0, "channel": "api"}) + assert result.matched is False + + @pytest.mark.asyncio + async def test_per_user_per_channel(self) -> None: + ev = self._make_evaluator( + limits=[{"scope": {"channel": "slack"}, "per": "user_id", "limit_usd": 1.0}], + cost_path="cost", + metadata_paths={"channel": "channel", "user_id": "user_id"}, + ) + await ev.evaluate({"cost": 0.8, "channel": "slack", "user_id": "u1"}) + # u1 is near limit, u2 is fresh + r1 = await ev.evaluate({"cost": 0.3, "channel": "slack", "user_id": "u1"}) + r2 = await ev.evaluate({"cost": 0.3, "channel": "slack", "user_id": "u2"}) + assert r1.matched is True # u1 exceeded + assert r2.matched is False # u2 under budget + + @pytest.mark.asyncio + async def test_daily_window(self) -> None: + ev = self._make_evaluator( + limits=[{"window": "daily", "limit_usd": 1.0}], + cost_path="cost", + ) + await ev.evaluate({"cost": 0.8}) + result = await ev.evaluate({"cost": 0.3}) + assert result.matched is True + + @pytest.mark.asyncio + async def test_token_only_limit(self) -> None: + ev = self._make_evaluator(limits=[{"limit_tokens": 500}]) + result = await ev.evaluate({"usage": {"input_tokens": 300, "output_tokens": 300}}) + assert result.matched is True + + @pytest.mark.asyncio + async def test_usd_only_limit(self) -> None: + ev = self._make_evaluator( + limits=[{"limit_usd": 1.0}], + cost_path="cost", + ) + result = await ev.evaluate({"cost": 0.5}) + assert result.matched is False + + @pytest.mark.asyncio + async def test_both_limits_token_exceeded(self) -> None: + ev = self._make_evaluator( + limits=[{"limit_usd": 10.0, "limit_tokens": 100}], + cost_path="cost", + ) + result = await ev.evaluate({"cost": 0.01, "usage": {"input_tokens": 60, "output_tokens": 60}}) + assert result.matched is True # tokens exceeded, USD ok + + @pytest.mark.asyncio + async def test_missing_metadata_global_fallback(self) -> None: + ev = self._make_evaluator( + limits=[ + {"scope": {"channel": "slack"}, "limit_usd": 1.0}, + {"scope": {}, "limit_usd": 100.0}, # global fallback + ], + cost_path="cost", + metadata_paths={"channel": "channel"}, + ) + # No channel in data -- only global rule applies + result = await ev.evaluate({"cost": 0.5}) + assert result.matched is False + assert result.metadata is not None + # Only 1 snapshot (global), slack rule skipped + assert len(result.metadata["all_snapshots"]) == 1 + + @pytest.mark.asyncio + async def test_no_data_returns_not_matched(self) -> None: + ev = self._make_evaluator(limits=[{"limit_usd": 1.0}]) + result = await ev.evaluate(None) + assert result.matched is False + + @pytest.mark.asyncio + async def test_pricing_estimation(self) -> None: + ev = self._make_evaluator( + limits=[{"limit_usd": 0.01}], + pricing={"gpt-4o": {"input_per_1k": 0.005, "output_per_1k": 0.015}}, + model_path="model", + ) + # 1000 input + 1000 output = $0.005 + $0.015 = $0.02 + result = await ev.evaluate({ + "model": "gpt-4o", + "usage": {"input_tokens": 1000, "output_tokens": 1000}, + }) + assert result.matched is True + assert result.metadata["cost_usd"] == pytest.approx(0.02) + + @pytest.mark.asyncio + async def test_confidence_always_one(self) -> None: + ev = self._make_evaluator(limits=[{"limit_usd": 1.0}], cost_path="cost") + r1 = await ev.evaluate({"cost": 0.5}) + r2 = await ev.evaluate({"cost": 0.6}) + assert r1.confidence == 1.0 + assert r2.confidence == 1.0 + + @pytest.mark.asyncio + async def test_utilization_in_metadata(self) -> None: + ev = self._make_evaluator(limits=[{"limit_usd": 10.0}], cost_path="cost") + result = await ev.evaluate({"cost": 5.0}) + assert result.metadata is not None + assert result.metadata["max_utilization"] == pytest.approx(0.5) + + +class TestBudgetEvaluatorRegistration: + def test_registered(self) -> None: + from agent_control_evaluators import get_evaluator + + cls = get_evaluator("budget") + assert cls is BudgetEvaluator + + def test_from_dict(self) -> None: + ev = BudgetEvaluator.from_dict({"limits": [{"limit_usd": 5.0}]}) + assert isinstance(ev, BudgetEvaluator) + assert ev.config.limits[0].limit_usd == 5.0 From 8ae042f27cf642bf2d29cd74fdc75b1673c8cd90 Mon Sep 17 00:00:00 2001 From: amabito Date: Sat, 21 Mar 2026 09:39:04 +0900 Subject: [PATCH 2/7] fix(evaluators): budget evaluator R1 -- security hardening + 6 adversarial tests 3-body review findings: Security: - Sanitize pipe/equals in scope key metadata values (injection prevention) - Add max_buckets=100K to InMemoryBudgetStore (OOM prevention, fail-closed) - Block dunder attribute access in _extract_by_path - Add math.isfinite guard on extracted cost values - Skip per-user rules when per field missing from metadata (was collapsing per-user budgets into global bucket) Correctness: - Changed exceeded check from > to >= (utilization=100% now triggers exceeded) - Removed unused BudgetSnapshot import from evaluator.py Tests (6 adversarial): - Exact limit boundary (USD and tokens) - Scope key injection via pipe character - max_buckets OOM prevention - per-field missing skips rule - dunder path rejection 54 budget tests, 284 total evaluator tests passing. --- .../budget/evaluator.py | 24 +++++++-- .../agent_control_evaluators/budget/store.py | 23 ++++++-- .../builtin/tests/budget/test_budget.py | 53 +++++++++++++++++++ 3 files changed, 90 insertions(+), 10 deletions(-) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py index 73d5e658..14f941eb 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py @@ -8,13 +8,14 @@ from __future__ import annotations import logging +import math from datetime import datetime, timezone from typing import Any from agent_control_evaluators._base import Evaluator, EvaluatorMetadata from agent_control_evaluators._registry import register_evaluator from agent_control_evaluators.budget.config import BudgetEvaluatorConfig, BudgetLimitRule -from agent_control_evaluators.budget.store import BudgetSnapshot, InMemoryBudgetStore +from agent_control_evaluators.budget.store import InMemoryBudgetStore from agent_control_models import EvaluatorResult logger = logging.getLogger(__name__) @@ -40,6 +41,11 @@ def _derive_period_key(window: str | None) -> str: return "" +def _sanitize_scope_value(val: str) -> str: + """Remove pipe and equals from scope values to prevent key injection.""" + return val.replace("|", "_").replace("=", "_") + + def _build_scope_key( scope: dict[str, str], per: str | None, @@ -48,12 +54,13 @@ def _build_scope_key( """Build a composite scope key from static dimensions and per-field. Format: "channel=slack|user_id=u1" or "__global__" if empty. + Values are sanitized to prevent injection via pipe/equals characters. """ parts: list[str] = [] for k, v in sorted(scope.items()): - parts.append(f"{k}={v}") + parts.append(f"{k}={_sanitize_scope_value(v)}") if per and per in metadata: - parts.append(f"{per}={metadata[per]}") + parts.append(f"{per}={_sanitize_scope_value(metadata[per])}") return "|".join(parts) if parts else "__global__" @@ -61,6 +68,8 @@ def _extract_by_path(data: Any, path: str) -> Any: """Extract a value from nested data using dot-notation path.""" current = data for part in path.split("."): + if part.startswith("_"): + return None if isinstance(current, dict): current = current.get(part) elif hasattr(current, part): @@ -112,7 +121,7 @@ def _extract_cost(data: Any, cost_path: str | None) -> float | None: if data is None or cost_path is None: return None val = _extract_by_path(data, cost_path) - if isinstance(val, (int, float)) and val >= 0: + if isinstance(val, (int, float)) and math.isfinite(val) and val >= 0: return float(val) return None @@ -286,11 +295,16 @@ async def evaluate(self, data: Any) -> EvaluatorResult: def _scope_matches(rule: BudgetLimitRule, metadata: dict[str, str]) -> bool: - """Check if rule's static scope dimensions match step metadata. + """Check if rule's scope dimensions match step metadata. An empty scope dict matches everything (global rule). + If ``per`` is set but the field is missing from metadata, the rule + is skipped to prevent per-user budgets from collapsing into a + single global bucket. """ for key, expected in rule.scope.items(): if metadata.get(key) != expected: return False + if rule.per and rule.per not in metadata: + return False return True diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/store.py b/evaluators/builtin/src/agent_control_evaluators/budget/store.py index be7a6365..75754674 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/store.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/store.py @@ -103,9 +103,12 @@ class InMemoryBudgetStore: Redis or Postgres-backed store (separate package). """ - def __init__(self) -> None: + _DEFAULT_MAX_BUCKETS = 100_000 + + def __init__(self, *, max_buckets: int = _DEFAULT_MAX_BUCKETS) -> None: self._lock = threading.Lock() self._buckets: dict[tuple[str, str], _Bucket] = {} + self._max_buckets = max_buckets def record_and_check( self, @@ -135,6 +138,16 @@ def record_and_check( with self._lock: bucket = self._buckets.get(key) if bucket is None: + if len(self._buckets) >= self._max_buckets: + # Fail-closed: treat as exceeded to prevent OOM + return BudgetSnapshot( + spent_usd=cost_usd, + spent_tokens=input_tokens + output_tokens, + limit_usd=limit_usd, + limit_tokens=limit_tokens, + utilization=1.0, + exceeded=True, + ) bucket = _Bucket() self._buckets[key] = bucket bucket.spent_usd += cost_usd @@ -146,9 +159,9 @@ def record_and_check( bucket.spent_usd, total_tokens, limit_usd, limit_tokens ) exceeded = False - if limit_usd is not None and bucket.spent_usd > limit_usd: + if limit_usd is not None and bucket.spent_usd >= limit_usd: exceeded = True - if limit_tokens is not None and total_tokens > limit_tokens: + if limit_tokens is not None and total_tokens >= limit_tokens: exceeded = True return BudgetSnapshot( @@ -185,9 +198,9 @@ def get_snapshot( bucket.spent_usd, total_tokens, limit_usd, limit_tokens ) exceeded = False - if limit_usd is not None and bucket.spent_usd > limit_usd: + if limit_usd is not None and bucket.spent_usd >= limit_usd: exceeded = True - if limit_tokens is not None and total_tokens > limit_tokens: + if limit_tokens is not None and total_tokens >= limit_tokens: exceeded = True return BudgetSnapshot( spent_usd=bucket.spent_usd, diff --git a/evaluators/builtin/tests/budget/test_budget.py b/evaluators/builtin/tests/budget/test_budget.py index de4202db..f307002e 100644 --- a/evaluators/builtin/tests/budget/test_budget.py +++ b/evaluators/builtin/tests/budget/test_budget.py @@ -387,3 +387,56 @@ def test_from_dict(self) -> None: ev = BudgetEvaluator.from_dict({"limits": [{"limit_usd": 5.0}]}) assert isinstance(ev, BudgetEvaluator) assert ev.config.limits[0].limit_usd == 5.0 + + +# --------------------------------------------------------------------------- +# Security / adversarial tests (R1 findings) +# --------------------------------------------------------------------------- + + +class TestBudgetAdversarial: + """Adversarial tests for budget evaluator security.""" + + def test_exceeded_at_exact_limit_usd(self) -> None: + """Spending exactly the limit must trigger exceeded (>= not >).""" + store = InMemoryBudgetStore() + snap = store.record_and_check("s", "p", 0, 0, 1.0, limit_usd=1.0) + assert snap.exceeded is True + assert snap.utilization == pytest.approx(1.0) + + def test_exceeded_at_exact_limit_tokens(self) -> None: + store = InMemoryBudgetStore() + snap = store.record_and_check("s", "p", 500, 500, 0.0, limit_tokens=1000) + assert snap.exceeded is True + + def test_scope_key_injection_pipe(self) -> None: + """Pipe in metadata value must be sanitized, not create new scope dimension.""" + key = _build_scope_key({"ch": "slack"}, "uid", {"ch": "slack", "uid": "u1|ch=admin"}) + assert "|ch=admin" not in key.split("|")[-1] # injected dimension not present + assert "u1_ch_admin" in key # sanitized + + def test_max_buckets_prevents_oom(self) -> None: + store = InMemoryBudgetStore(max_buckets=5) + for i in range(10): + snap = store.record_and_check(f"scope-{i}", "p", 1, 1, 0.01, limit_usd=100.0) + # After 5, new buckets are rejected with exceeded=True + assert len(store._buckets) == 5 + + @pytest.mark.asyncio + async def test_per_without_metadata_skips_rule(self) -> None: + """per='user_id' but user_id missing -> rule skipped, not global.""" + from agent_control_evaluators.budget.evaluator import BudgetEvaluator + config = BudgetEvaluatorConfig( + limits=[{"scope": {}, "per": "user_id", "limit_usd": 1.0}], + cost_path="cost", + metadata_paths={"user_id": "user_id"}, + ) + ev = BudgetEvaluator(config) + # No user_id in data -> rule skipped -> not matched (no applicable rules) + result = await ev.evaluate({"cost": 999.0}) + assert result.matched is False + + def test_extract_by_path_rejects_dunder(self) -> None: + from agent_control_evaluators.budget.evaluator import _extract_by_path + assert _extract_by_path({"a": 1}, "__class__") is None + assert _extract_by_path({"a": {"__init__": 1}}, "a.__init__") is None From 4cd08eb8d4cd4b3b1412cbdedd004ac5090754d8 Mon Sep 17 00:00:00 2001 From: amabito Date: Sat, 21 Mar 2026 09:41:48 +0900 Subject: [PATCH 3/7] fix(evaluators): budget R2 -- percent-encoding, fail-closed snapshot, dunder guard R2 findings: - _sanitize_scope_value: percent-encode |/= instead of replacing with _ (was causing key collisions between "a|b" and "a_b") - max_buckets fail-closed: spent_usd/spent_tokens now 0.0/0 (not recorded, previously reported current-call-only values misleading callers) - _extract_by_path: narrowed guard from startswith("_") to startswith("__") (single-underscore dict keys are legitimate data fields) - Fixed tautological test assertion in test_scope_key_injection_pipe - Added 3 tests: no-collision, single-underscore access, NaN/Inf cost 57 budget tests, 287 total evaluator tests passing. --- .../budget/evaluator.py | 10 +++++-- .../agent_control_evaluators/budget/store.py | 8 +++-- .../builtin/tests/budget/test_budget.py | 29 +++++++++++++++++-- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py index 14f941eb..0e06f8ba 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py @@ -42,8 +42,12 @@ def _derive_period_key(window: str | None) -> str: def _sanitize_scope_value(val: str) -> str: - """Remove pipe and equals from scope values to prevent key injection.""" - return val.replace("|", "_").replace("=", "_") + """Percent-encode pipe and equals in scope values to prevent key injection. + + Uses percent-encoding (not replacement) to preserve round-trip + distinction: "a|b" and "a_b" remain different keys. + """ + return val.replace("%", "%25").replace("|", "%7C").replace("=", "%3D") def _build_scope_key( @@ -68,7 +72,7 @@ def _extract_by_path(data: Any, path: str) -> Any: """Extract a value from nested data using dot-notation path.""" current = data for part in path.split("."): - if part.startswith("_"): + if part.startswith("__"): return None if isinstance(current, dict): current = current.get(part) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/store.py b/evaluators/builtin/src/agent_control_evaluators/budget/store.py index 75754674..f647f484 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/store.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/store.py @@ -139,10 +139,12 @@ def record_and_check( bucket = self._buckets.get(key) if bucket is None: if len(self._buckets) >= self._max_buckets: - # Fail-closed: treat as exceeded to prevent OOM + # Fail-closed: treat as exceeded to prevent OOM. + # spent fields are 0 because no bucket exists for + # this scope+period -- the call is rejected, not recorded. return BudgetSnapshot( - spent_usd=cost_usd, - spent_tokens=input_tokens + output_tokens, + spent_usd=0.0, + spent_tokens=0, limit_usd=limit_usd, limit_tokens=limit_tokens, utilization=1.0, diff --git a/evaluators/builtin/tests/budget/test_budget.py b/evaluators/builtin/tests/budget/test_budget.py index f307002e..ea9e48a5 100644 --- a/evaluators/builtin/tests/budget/test_budget.py +++ b/evaluators/builtin/tests/budget/test_budget.py @@ -412,15 +412,26 @@ def test_exceeded_at_exact_limit_tokens(self) -> None: def test_scope_key_injection_pipe(self) -> None: """Pipe in metadata value must be sanitized, not create new scope dimension.""" key = _build_scope_key({"ch": "slack"}, "uid", {"ch": "slack", "uid": "u1|ch=admin"}) - assert "|ch=admin" not in key.split("|")[-1] # injected dimension not present - assert "u1_ch_admin" in key # sanitized + parts = key.split("|") + assert len(parts) == 2, f"Expected 2 parts, got {len(parts)}: {parts}" + assert "ch=admin" not in parts # injected component not a segment + + def test_scope_key_no_collision(self) -> None: + """Percent-encoding must keep 'a|b' distinct from 'a_b'.""" + key1 = _build_scope_key({}, "uid", {"uid": "a|b"}) + key2 = _build_scope_key({}, "uid", {"uid": "a_b"}) + assert key1 != key2 def test_max_buckets_prevents_oom(self) -> None: store = InMemoryBudgetStore(max_buckets=5) + exceeded_count = 0 for i in range(10): snap = store.record_and_check(f"scope-{i}", "p", 1, 1, 0.01, limit_usd=100.0) - # After 5, new buckets are rejected with exceeded=True + if snap.exceeded: + exceeded_count += 1 + assert snap.spent_usd == 0.0 # not recorded, fail-closed assert len(store._buckets) == 5 + assert exceeded_count == 5 # scopes 5-9 rejected @pytest.mark.asyncio async def test_per_without_metadata_skips_rule(self) -> None: @@ -440,3 +451,15 @@ def test_extract_by_path_rejects_dunder(self) -> None: from agent_control_evaluators.budget.evaluator import _extract_by_path assert _extract_by_path({"a": 1}, "__class__") is None assert _extract_by_path({"a": {"__init__": 1}}, "a.__init__") is None + + def test_extract_by_path_allows_single_underscore(self) -> None: + """Single-underscore keys in data dicts are legitimate.""" + from agent_control_evaluators.budget.evaluator import _extract_by_path + assert _extract_by_path({"_metadata": {"tokens": 42}}, "_metadata.tokens") == 42 + + def test_extract_cost_rejects_nan_inf(self) -> None: + """NaN and Inf must not pass through as cost values.""" + from agent_control_evaluators.budget.evaluator import _extract_cost + assert _extract_cost({"c": float("nan")}, "c") is None + assert _extract_cost({"c": float("inf")}, "c") is None + assert _extract_cost({"c": float("-inf")}, "c") is None From 0b41ae9de5a3c5386f9c0095cccaed970762ba99 Mon Sep 17 00:00:00 2001 From: amabito Date: Sat, 21 Mar 2026 09:45:19 +0900 Subject: [PATCH 4/7] fix(evaluators): budget R4 -- clamp negative pricing cost to zero R4 finding: negative pricing rates in config caused _estimate_cost to return negative cost_usd, which subtracted from spent_usd and disabled USD limit enforcement entirely. Fix: max(0.0, cost) in _estimate_cost return. Test: negative pricing rates produce spent_usd >= 0. 58 budget tests, 288 total evaluator tests passing. --- .../agent_control_evaluators/budget/evaluator.py | 3 ++- evaluators/builtin/tests/budget/test_budget.py | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py index 0e06f8ba..67785926 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py @@ -144,7 +144,8 @@ def _estimate_cost( return 0.0 input_rate = rates.get("input_per_1k", 0.0) output_rate = rates.get("output_per_1k", 0.0) - return (input_tokens * input_rate + output_tokens * output_rate) / 1000.0 + cost = (input_tokens * input_rate + output_tokens * output_rate) / 1000.0 + return max(0.0, cost) # never return negative cost def _extract_metadata(data: Any, metadata_paths: dict[str, str]) -> dict[str, str]: diff --git a/evaluators/builtin/tests/budget/test_budget.py b/evaluators/builtin/tests/budget/test_budget.py index ea9e48a5..cd69416e 100644 --- a/evaluators/builtin/tests/budget/test_budget.py +++ b/evaluators/builtin/tests/budget/test_budget.py @@ -463,3 +463,18 @@ def test_extract_cost_rejects_nan_inf(self) -> None: assert _extract_cost({"c": float("nan")}, "c") is None assert _extract_cost({"c": float("inf")}, "c") is None assert _extract_cost({"c": float("-inf")}, "c") is None + + @pytest.mark.asyncio + async def test_negative_pricing_does_not_reduce_budget(self) -> None: + """Negative pricing rates must not produce negative cost (budget credit).""" + from agent_control_evaluators.budget.evaluator import BudgetEvaluator + config = BudgetEvaluatorConfig( + limits=[{"limit_usd": 0.01}], + pricing={"model": {"input_per_1k": -5.0, "output_per_1k": -5.0}}, + model_path="model", + ) + ev = BudgetEvaluator(config) + for _ in range(10): + await ev.evaluate({"model": "model", "usage": {"input_tokens": 1000, "output_tokens": 1000}}) + snap = ev._store.get_snapshot("__global__", "", limit_usd=0.01) + assert snap.spent_usd >= 0.0 # must not go negative From 5a100f871b4241343aebc8f0d52637afb149a080 Mon Sep 17 00:00:00 2001 From: amabito Date: Sat, 21 Mar 2026 09:49:14 +0900 Subject: [PATCH 5/7] fix(evaluators): budget R5 -- isfinite guard on estimated cost R5 finding: Inf pricing rates produced inf cost, permanently locking buckets in exceeded state. max(0.0, inf) = inf. Fix: isfinite + negative check on _estimate_cost return value. Tests: Inf pricing rate test, strengthened negative pricing assertion. 59 budget tests passing. --- .../budget/evaluator.py | 4 +++- evaluators/builtin/tests/budget/test_budget.py | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py index 67785926..17eb2762 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py @@ -145,7 +145,9 @@ def _estimate_cost( input_rate = rates.get("input_per_1k", 0.0) output_rate = rates.get("output_per_1k", 0.0) cost = (input_tokens * input_rate + output_tokens * output_rate) / 1000.0 - return max(0.0, cost) # never return negative cost + if not math.isfinite(cost) or cost < 0: + return 0.0 + return cost def _extract_metadata(data: Any, metadata_paths: dict[str, str]) -> dict[str, str]: diff --git a/evaluators/builtin/tests/budget/test_budget.py b/evaluators/builtin/tests/budget/test_budget.py index cd69416e..eed9ff37 100644 --- a/evaluators/builtin/tests/budget/test_budget.py +++ b/evaluators/builtin/tests/budget/test_budget.py @@ -477,4 +477,20 @@ async def test_negative_pricing_does_not_reduce_budget(self) -> None: for _ in range(10): await ev.evaluate({"model": "model", "usage": {"input_tokens": 1000, "output_tokens": 1000}}) snap = ev._store.get_snapshot("__global__", "", limit_usd=0.01) - assert snap.spent_usd >= 0.0 # must not go negative + assert snap.spent_usd == pytest.approx(0.0) # negative rates clamped to 0 + + @pytest.mark.asyncio + async def test_inf_pricing_does_not_cause_inf_cost(self) -> None: + """Inf pricing rates must not produce inf cost (permanent false positive).""" + import math as _math + from agent_control_evaluators.budget.evaluator import BudgetEvaluator + config = BudgetEvaluatorConfig( + limits=[{"limit_usd": 100.0}], + pricing={"model": {"input_per_1k": float("inf"), "output_per_1k": 0.01}}, + model_path="model", + ) + ev = BudgetEvaluator(config) + await ev.evaluate({"model": "model", "usage": {"input_tokens": 100, "output_tokens": 100}}) + snap = ev._store.get_snapshot("__global__", "", limit_usd=100.0) + assert _math.isfinite(snap.spent_usd) + assert snap.spent_usd == pytest.approx(0.0) # inf rate -> cost clamped to 0 From 96e59ba8bce3386438b68586978313b0eda2a8b4 Mon Sep 17 00:00:00 2001 From: amabito Date: Sat, 21 Mar 2026 10:01:14 +0900 Subject: [PATCH 6/7] fix(evaluators): budget R8 -- reject NaN/Inf limit_usd in config validation R8 finding: float("nan") passed the `v <= 0` validator (IEEE 754: nan <= 0 is False). NaN limit_usd silently disabled budget enforcement because all NaN comparisons return False. Fix: added math.isfinite(v) guard to validate_limit_usd. Tests: NaN and Inf limit_usd rejection. 61 budget tests, 291 total evaluator tests passing. --- .../agent_control_evaluators/budget/config.py | 5 +++-- evaluators/builtin/tests/budget/test_budget.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/config.py b/evaluators/builtin/src/agent_control_evaluators/budget/config.py index d18372b5..eaa5d8b0 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/config.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/config.py @@ -2,6 +2,7 @@ from __future__ import annotations +import math from typing import Any, Literal from pydantic import Field, field_validator, model_validator @@ -43,8 +44,8 @@ def at_least_one_limit(self) -> "BudgetLimitRule": @field_validator("limit_usd") @classmethod def validate_limit_usd(cls, v: float | None) -> float | None: - if v is not None and v <= 0: - raise ValueError("limit_usd must be positive") + if v is not None and (not math.isfinite(v) or v <= 0): + raise ValueError("limit_usd must be a finite positive number") return v @field_validator("limit_tokens") diff --git a/evaluators/builtin/tests/budget/test_budget.py b/evaluators/builtin/tests/budget/test_budget.py index eed9ff37..5cd7739c 100644 --- a/evaluators/builtin/tests/budget/test_budget.py +++ b/evaluators/builtin/tests/budget/test_budget.py @@ -220,6 +220,22 @@ def test_negative_limit_rejected(self) -> None: with pytest.raises(ValidationError, match="positive"): BudgetLimitRule(limit_usd=-1.0) + def test_nan_limit_usd_rejected(self) -> None: + """float('nan') passes v <= 0 check but must still be rejected. + + IEEE 754: nan <= 0 is False, so without an explicit isfinite guard, + nan silently passes validation. A nan limit_usd causes utilization=nan + and exceeded=False always (all nan comparisons are False), permanently + disabling the budget limit -- a silent security bypass. + """ + with pytest.raises(ValidationError, match="finite"): + BudgetLimitRule(limit_usd=float("nan")) + + def test_inf_limit_usd_rejected(self) -> None: + """float('inf') limit is accepted by v <= 0 but means limit never triggers.""" + with pytest.raises(ValidationError, match="finite"): + BudgetLimitRule(limit_usd=float("inf")) + def test_token_only_limit(self) -> None: rule = BudgetLimitRule(limit_tokens=1000) assert rule.limit_usd is None From fa414d744afd3d1ea3b28ac455ebdf814a244027 Mon Sep 17 00:00:00 2001 From: amabito Date: Sat, 21 Mar 2026 10:10:22 +0900 Subject: [PATCH 7/7] fix(evaluators): budget R10 -- prevent double-counting for shared scope+period R10 finding: when multiple limit rules share the same (scope_key, period_key), each rule called record_and_check() independently, causing the same tokens and cost to be counted N times in the store. Fix: track recorded (scope_key, period_key) pairs per evaluate() call. First rule records; subsequent rules for the same pair use get_snapshot(). Tests: 2 new tests for same-scope double-count prevention. 63 budget tests, 293 total evaluator tests passing. Review loop: R9 CLEAN, R10 fix, R11 CLEAN -- 3 consecutive clean achieved. --- .../budget/evaluator.py | 38 +++++++++++----- .../builtin/tests/budget/test_budget.py | 43 +++++++++++++++++++ pyproject.toml | 6 +++ 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py index 17eb2762..8a22efc3 100644 --- a/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py +++ b/evaluators/builtin/src/agent_control_evaluators/budget/evaluator.py @@ -230,9 +230,13 @@ async def evaluate(self, data: Any) -> EvaluatorResult: # Extract metadata for scope key building step_metadata = _extract_metadata(data, self.config.metadata_paths) - # Check each limit rule + # Check each limit rule. + # Track which (scope_key, period_key) pairs have already been recorded + # this evaluation to prevent double-counting when multiple rules share + # the same scope and window. breached_rules: list[dict[str, Any]] = [] all_snapshots: list[dict[str, Any]] = [] + recorded_pairs: set[tuple[str, str]] = set() for rule in self.config.limits: # Check if rule scope matches step metadata @@ -241,16 +245,28 @@ async def evaluate(self, data: Any) -> EvaluatorResult: scope_key = _build_scope_key(rule.scope, rule.per, step_metadata) period_key = _derive_period_key(rule.window) - - snapshot = self._store.record_and_check( - scope_key=scope_key, - period_key=period_key, - input_tokens=input_tokens, - output_tokens=output_tokens, - cost_usd=cost_usd, - limit_usd=rule.limit_usd, - limit_tokens=rule.limit_tokens, - ) + pair = (scope_key, period_key) + + if pair not in recorded_pairs: + # First rule for this (scope, period): record usage and check. + snapshot = self._store.record_and_check( + scope_key=scope_key, + period_key=period_key, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost_usd, + limit_usd=rule.limit_usd, + limit_tokens=rule.limit_tokens, + ) + recorded_pairs.add(pair) + else: + # Subsequent rule for same (scope, period): read without recording. + snapshot = self._store.get_snapshot( + scope_key=scope_key, + period_key=period_key, + limit_usd=rule.limit_usd, + limit_tokens=rule.limit_tokens, + ) snap_info = { "scope_key": scope_key, diff --git a/evaluators/builtin/tests/budget/test_budget.py b/evaluators/builtin/tests/budget/test_budget.py index 5cd7739c..6c33343e 100644 --- a/evaluators/builtin/tests/budget/test_budget.py +++ b/evaluators/builtin/tests/budget/test_budget.py @@ -495,6 +495,49 @@ async def test_negative_pricing_does_not_reduce_budget(self) -> None: snap = ev._store.get_snapshot("__global__", "", limit_usd=0.01) assert snap.spent_usd == pytest.approx(0.0) # negative rates clamped to 0 + @pytest.mark.asyncio + async def test_two_rules_same_scope_no_double_count(self) -> None: + """Two rules with the same scope+window must not double-record usage. + + When limits=[{limit_usd: 1.0}, {limit_tokens: 5000}] are both global + and cumulative, they share scope_key='__global__' and period_key=''. + Recording twice to the same bucket would inflate spend 2x, causing + the tighter limit to trigger at half the configured threshold. + """ + from agent_control_evaluators.budget.evaluator import BudgetEvaluator + config = BudgetEvaluatorConfig( + limits=[ + {"limit_usd": 1.0}, + {"limit_tokens": 5000}, + ], + cost_path="cost", + ) + ev = BudgetEvaluator(config) + await ev.evaluate({"cost": 0.1, "usage": {"input_tokens": 100, "output_tokens": 100}}) + snap = ev._store.get_snapshot("__global__", "", limit_usd=1.0) + assert snap.spent_usd == pytest.approx(0.1), "spent_usd must not be double-counted" + assert snap.spent_tokens == 200, "spent_tokens must not be double-counted" + + @pytest.mark.asyncio + async def test_two_rules_same_scope_different_windows_no_double_count(self) -> None: + """Rules with different windows use different period_keys -- no shared bucket.""" + from agent_control_evaluators.budget.evaluator import BudgetEvaluator, _derive_period_key + config = BudgetEvaluatorConfig( + limits=[ + {"window": "daily", "limit_usd": 1.0}, + {"window": "monthly", "limit_usd": 10.0}, + ], + cost_path="cost", + ) + ev = BudgetEvaluator(config) + await ev.evaluate({"cost": 0.6}) + daily_key = _derive_period_key("daily") + monthly_key = _derive_period_key("monthly") + snap_d = ev._store.get_snapshot("__global__", daily_key, limit_usd=1.0) + snap_m = ev._store.get_snapshot("__global__", monthly_key, limit_usd=10.0) + assert snap_d.spent_usd == pytest.approx(0.6) + assert snap_m.spent_usd == pytest.approx(0.6) + @pytest.mark.asyncio async def test_inf_pricing_does_not_cause_inf_cost(self) -> None: """Inf pricing rates must not produce inf cost (permanent false positive).""" diff --git a/pyproject.toml b/pyproject.toml index 25783c18..677ba833 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,3 +80,9 @@ tag_format = "v{version}" # feat = minor, fix/perf/refactor = patch, breaking (!) = major allowed_tags = ["feat", "fix", "perf", "chore", "docs", "style", "refactor", "test", "ci"] patch_tags = ["fix", "perf", "chore", "refactor"] + +[dependency-groups] +dev = [ + "pytest>=9.0.2", + "pytest-asyncio>=1.3.0", +]