diff --git a/README.md b/README.md index 82dd0d4..d3cd330 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ agentspec generate agent.yaml --framework langgraph - [x] **Scan** an existing codebase and auto-generate the manifest - [x] **Evaluate** agent quality against JSONL datasets with CI pass/fail gates - [x] **Deploy** to Kubernetes — operator injects sidecar, exposes `/health/ready` and `/gap` +- [x] **Track** token usage per model — in-process metering, no external infrastructure - [x] **Export** to A2A / AgentCard format - [ ] Visual dashboard for fleet-wide agent observability (coming soon) - [ ] Native OpenTelemetry trace export (coming soon) @@ -39,7 +40,7 @@ agentspec generate agent.yaml --framework langgraph AgentSpec Architecture - **`agent.yaml`** is the single source of truth — the SDK reads it at runtime, the CLI validates and audits it, the operator deploys it -- **Sidecar** is injected automatically by the operator and exposes live `/health/ready`, `/gap`, and `/explore` endpoints without touching agent code +- **Sidecar** is injected automatically by the operator and exposes live `/health/ready`, `/gap`, `/explore`, and `/usage` endpoints without touching agent code - **CLI** wraps the SDK for local development — validate, audit, generate, scan, evaluate - **MCP Server** bridges the sidecar to Claude Code and VS Code for in-editor introspection @@ -126,6 +127,13 @@ npm install @agentspec/sdk [medium] MEM-04 — Vector store namespace isolated ``` +**Token usage** (`kubectl get agentobservations`): +``` +NAME PHASE GRADE SCORE TOKENS CHECKED +budget-assistant Healthy B 82 12450 30s ago +gymcoach Healthy A 95 3200 15s ago +``` + --- ## Manifest diff --git a/docs/concepts/operating-modes.md b/docs/concepts/operating-modes.md index 2190a8e..d895db9 100644 --- a/docs/concepts/operating-modes.md +++ b/docs/concepts/operating-modes.md @@ -15,7 +15,7 @@ source of confusion when working with VS Code, MCP, and the CLI. 
| **When to use** | Local dev, or cluster agent via per-agent port-forward | K8s cluster with AgentSpec Operator deployed | | **URL target** | `http://localhost:4001` (direct or port-forwarded per agent) | Operator service URL (one URL for all agents) | | **Data freshness** | **Live** — computed fresh on each request | **Stored** — last heartbeat (up to `RATE_LIMIT_SECONDS` stale) | -| **Endpoints** | `GET /gap`, `GET /proof`, `GET /health/ready`, `GET /explore` | `GET /api/v1/agents/{name}/gap`, `/proof`, `/health` | +| **Endpoints** | `GET /gap`, `GET /proof`, `GET /health/ready`, `GET /explore`, `GET /usage` | `GET /api/v1/agents/{name}/gap`, `/proof`, `/health`, `/usage` | | **Auth** | None (port-forward is already a trust boundary) | `X-Admin-Key` header | | **VS Code config** | `agentspec.sidecarUrl` | `agentspec.cluster.controlPlaneUrl` + `agentspec.cluster.adminKey` | @@ -53,6 +53,7 @@ All endpoints return **live** data computed at request time: - `GET /proof` — compliance proof records - `GET /health/ready` — live health checks - `GET /explore` — runtime capabilities +- `GET /usage` — aggregated token usage from the audit ring ### VS Code configuration @@ -116,6 +117,7 @@ All endpoints return **stored** data from the last heartbeat push: - `GET /api/v1/agents/{name}/gap` — last known gap report - `GET /api/v1/agents/{name}/proof` — proof records - `GET /api/v1/agents/{name}/health` — last health check result +- `GET /api/v1/agents/{name}/usage` — last token usage snapshot ### VS Code configuration diff --git a/docs/concepts/runtime-introspection.md b/docs/concepts/runtime-introspection.md index 3ff78f8..0faa8ba 100644 --- a/docs/concepts/runtime-introspection.md +++ b/docs/concepts/runtime-introspection.md @@ -191,6 +191,79 @@ These categories only appear in runtime `HealthReport`s (not in CLI pre-flight o | `service` | `AgentSpecReporter` | TCP connectivity for `spec.requires.services` entries | | `model` | `AgentSpecReporter` | Provider API endpoint 
reachable (resolves `$env:` at runtime) | +## Token Usage Tracking + +`AgentSpecReporter` includes a built-in `UsageLedger` that aggregates LLM token counts in-process. No external infrastructure required — token counts flow through the existing heartbeat push. + +### Recording usage + +After each LLM call, record the token counts: + +```typescript +reporter.usage.record('openai/gpt-4o', promptTokens, completionTokens) +``` + +For LangGraph agents, `instrument_call_model` records automatically when a `ledger` is provided: + +```python +from agentspec_langgraph import instrument_call_model, UsageLedger + +ledger = UsageLedger() +call_model = instrument_call_model( + original_call_model, + reporter=reporter, + model_id="groq/llama-3.3-70b-versatile", + ledger=ledger, +) +``` + +### How it flows + +``` +LLM response → UsageLedger.record() → heartbeat push → CRD status → VS Code + sidecar GET /usage ─────────→ VS Code +``` + +Each heartbeat ships a **window snapshot** (e.g., last 30s of usage), then resets the counters. The control plane stores each window with the heartbeat row. + +### Querying usage + +**Sidecar mode** (live, from audit ring): +``` +GET /usage +``` + +**Operator mode** (stored, from last heartbeat): +``` +GET /api/v1/agents/{name}/usage +``` + +Response: +```json +{ + "windowStartedAt": "2026-03-31T12:00:00.000Z", + "models": [ + { "modelId": "openai/gpt-4o", "totalTokens": 1250, "callCount": 8 } + ], + "totalTokens": 1250, + "totalCalls": 8 +} +``` + +### CRD visibility + +Token usage appears in the `AgentObservation` CRD status and in `kubectl` output: + +```bash +kubectl get agentobservations +# NAME PHASE GRADE SCORE TOKENS CHECKED +# gymcoach Healthy A 92 1250 2m ago +``` + +### VS Code + +The agent detail panel shows a **Token Usage** section with total tokens, call count, and a per-model breakdown table. 
+ ## Caching and Refresh `AgentSpecReporter` caches the last `HealthReport` to avoid hammering external APIs on every request to `/agentspec/health`. diff --git a/packages/control-plane/api/agents.py b/packages/control-plane/api/agents.py index e9272e6..bb0cdf6 100644 --- a/packages/control-plane/api/agents.py +++ b/packages/control-plane/api/agents.py @@ -3,6 +3,7 @@ GET /api/v1/agents/{name}/health — last known HealthReport for an agent GET /api/v1/agents/{name}/gap — last known GapReport for an agent GET /api/v1/agents/{name}/proof — last known proof records for an agent +GET /api/v1/agents/{name}/usage — last known token usage for an agent All endpoints require the X-Admin-Key header (verify_admin_key dependency). The {name} path parameter is validated against the k8s resource name pattern. @@ -10,15 +11,17 @@ from __future__ import annotations import logging +from typing import Any, Optional, Type from fastapi import APIRouter, Depends, HTTPException, Path +from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from auth.keys import verify_admin_key from db.base import get_session from db.models import Agent, Heartbeat -from schemas import AgentSummary, StoredHealthReport, StoredGapReport, StoredProofRecords +from schemas import AgentSummary, StoredHealthReport, StoredGapReport, StoredProofRecords, StoredUsageReport logger = logging.getLogger(__name__) router = APIRouter() @@ -56,6 +59,32 @@ async def _get_latest_heartbeat( return agent, latest +async def _get_validated_field( + name: str, + session: AsyncSession, + field: str, + schema: Type[BaseModel], + raw_data: Optional[dict[str, Any]] = None, +) -> dict: + """Fetch a field from the latest heartbeat, validate through a Pydantic model, return dict. + + When raw_data is provided it is used instead of getattr(latest, field). + This handles the common pattern of merging extra fields (e.g. receivedAt) before validation. 
+ """ + _, latest = await _get_latest_heartbeat(name, session) + data = raw_data if raw_data is not None else getattr(latest, field, None) + if data is None: + data = {"receivedAt": latest.received_at.isoformat()} + else: + data = {**data, "receivedAt": latest.received_at.isoformat()} + try: + report = schema.model_validate(data) + except Exception: + logger.error("Stored %s data for '%s' failed schema validation", field, name) + raise HTTPException(status_code=500, detail=f"Stored {field} data is corrupt") + return report.model_dump() + + @router.get( "/agents", response_model=list[AgentSummary], @@ -107,15 +136,7 @@ async def get_agent_gap( session: AsyncSession = Depends(get_session), ) -> dict: """Return the last known GapReport for a remote agent (from its latest heartbeat).""" - _, latest = await _get_latest_heartbeat(name, session) - try: - report = StoredGapReport.model_validate( - {**(latest.gap or {}), "receivedAt": latest.received_at.isoformat()} - ) - except Exception: - logger.error("Stored gap data for '%s' failed schema validation", name) - raise HTTPException(status_code=500, detail="Stored gap data is corrupt") - return report.model_dump() + return await _get_validated_field(name, session, "gap", StoredGapReport) @router.get( @@ -128,11 +149,19 @@ async def get_agent_proof( ) -> dict: """Return the last known proof records for a remote agent (from its latest heartbeat).""" _, latest = await _get_latest_heartbeat(name, session) - try: - stored = StoredProofRecords.model_validate( - {"records": latest.proof or [], "receivedAt": latest.received_at.isoformat()} - ) - except Exception: - logger.error("Stored proof data for '%s' failed schema validation", name) - raise HTTPException(status_code=500, detail="Stored proof data is corrupt") - return stored.model_dump() + return await _get_validated_field( + name, session, "proof", StoredProofRecords, + raw_data={"records": latest.proof or []}, + ) + + +@router.get( + "/agents/{name}/usage", + 
dependencies=[Depends(verify_admin_key)], +) +async def get_agent_usage( + name: str = _K8S_NAME_PATH, + session: AsyncSession = Depends(get_session), +) -> dict: + """Return the last known token usage for a remote agent (from its latest heartbeat).""" + return await _get_validated_field(name, session, "usage", StoredUsageReport) diff --git a/packages/control-plane/api/heartbeat.py b/packages/control-plane/api/heartbeat.py index fd50449..5661439 100644 --- a/packages/control-plane/api/heartbeat.py +++ b/packages/control-plane/api/heartbeat.py @@ -117,7 +117,7 @@ async def heartbeat( _check_rate_limit(agent_id) # 6. Derive phase / grade / score - status_patch = build_status_patch(data.health, data.gap) + status_patch = build_status_patch(data.health, data.gap, data.usage) now = datetime.now(timezone.utc) # 7. Persist heartbeat @@ -127,6 +127,7 @@ async def heartbeat( health=data.health, gap=data.gap, proof=data.proof, + usage=data.usage, ) session.add(hb) diff --git a/packages/control-plane/db/models.py b/packages/control-plane/db/models.py index 087a287..29198cb 100644 --- a/packages/control-plane/db/models.py +++ b/packages/control-plane/db/models.py @@ -43,5 +43,6 @@ class Heartbeat(Base): health: Mapped[dict] = mapped_column(JSON, nullable=False) gap: Mapped[dict] = mapped_column(JSON, nullable=False) proof: Mapped[list] = mapped_column(JSON, nullable=False, default=list) + usage: Mapped[dict | None] = mapped_column(JSON, nullable=True) agent: Mapped[Agent] = relationship(back_populates="heartbeats") diff --git a/packages/control-plane/k8s/upsert.py b/packages/control-plane/k8s/upsert.py index 85e3e81..fe79d8b 100644 --- a/packages/control-plane/k8s/upsert.py +++ b/packages/control-plane/k8s/upsert.py @@ -19,7 +19,7 @@ NAMESPACE = "agentspec-remote" -def build_status_patch(health: dict[str, Any], gap: dict[str, Any]) -> dict[str, Any]: +def build_status_patch(health: dict[str, Any], gap: dict[str, Any], usage: dict[str, Any] | None = None) -> dict[str, Any]: """ 
Derive AgentObservation .status from heartbeat data. @@ -49,13 +49,16 @@ def build_status_patch(health: dict[str, Any], gap: dict[str, Any]) -> dict[str, else: grade = "F" - return { + patch: dict[str, Any] = { "phase": phase, "grade": grade, "score": score, "health": health, "gap": gap, } + if usage is not None: + patch["usage"] = usage + return patch async def upsert_agent_observation( diff --git a/packages/control-plane/schemas.py b/packages/control-plane/schemas.py index 9363629..48b5bf7 100644 --- a/packages/control-plane/schemas.py +++ b/packages/control-plane/schemas.py @@ -39,6 +39,7 @@ class HeartbeatRequest(BaseModel): health: dict[str, Any] gap: dict[str, Any] proof: list[dict[str, Any]] = Field(default_factory=list) + usage: Optional[dict[str, Any]] = None # ── Stored health report (GET /agents/{name}/health response) ───────────────── @@ -95,6 +96,20 @@ class StoredProofRecords(BaseModel): receivedAt: Optional[str] = None +# ── Stored usage report (GET /agents/{name}/usage response) ───────────────── + +class StoredUsageReport(BaseModel): + """Schema for the usage snapshot stored in heartbeat rows. + + Strips unknown fields (extra='ignore') — same pattern as StoredHealthReport. + """ + windowStartedAt: Optional[str] = None + models: list[dict[str, Any]] = Field(default_factory=list) + totalTokens: int = 0 + totalCalls: int = 0 + receivedAt: Optional[str] = None + + # ── Agent summary (list endpoint) ───────────────────────────────────────────── class AgentSummary(BaseModel): diff --git a/packages/control-plane/tests/test_agents.py b/packages/control-plane/tests/test_agents.py index b364160..c60835f 100644 --- a/packages/control-plane/tests/test_agents.py +++ b/packages/control-plane/tests/test_agents.py @@ -1,6 +1,7 @@ """ Tests for GET /api/v1/agents, GET /api/v1/agents/{name}/health, -GET /api/v1/agents/{name}/gap, and GET /api/v1/agents/{name}/proof. 
+GET /api/v1/agents/{name}/gap, GET /api/v1/agents/{name}/proof, +and GET /api/v1/agents/{name}/usage. All endpoints require X-Admin-Key authentication. """ from __future__ import annotations @@ -277,3 +278,74 @@ async def test_get_proof_strips_unknown_fields(registered): record = resp.json()["records"][0] assert "internal_secret" not in record assert record["ruleId"] == "SEC-LLM-06" + + +# ── GET /agents/{name}/usage ──────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_get_usage_unknown_agent_returns_404(client): + ac, _ = client + resp = await ac.get("/api/v1/agents/does-not-exist/usage", headers=ADMIN_HEADERS) + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_get_usage_no_heartbeat_yet_returns_404(client): + ac, _ = client + await ac.post( + "/api/v1/register", + json={"agentName": "silent-agent", "runtime": "local"}, + headers=ADMIN_HEADERS, + ) + resp = await ac.get("/api/v1/agents/silent-agent/usage", headers=ADMIN_HEADERS) + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_get_usage_returns_stored_usage(registered): + ac, mock_upsert, agent_id, api_key = registered + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + usage = { + "windowStartedAt": "2026-03-31T12:00:00.000Z", + "models": [{"modelId": "openai/gpt-4o", "totalTokens": 500, "callCount": 3}], + "totalTokens": 500, + "totalCalls": 3, + } + await ac.post( + "/api/v1/heartbeat", + content=json.dumps({"health": make_health(), "gap": make_gap(), "usage": usage}), + headers=headers, + ) + resp = await ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS) + assert resp.status_code == 200 + data = resp.json() + assert data["totalTokens"] == 500 + assert data["totalCalls"] == 3 + assert data["models"][0]["modelId"] == "openai/gpt-4o" + assert "receivedAt" in data + + +@pytest.mark.asyncio +async def test_get_usage_returns_empty_when_no_usage_in_heartbeat(registered): + 
"""When heartbeat had no usage field, GET /usage returns zeroed-out defaults.""" + ac, mock_upsert, agent_id, api_key = registered + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + await ac.post( + "/api/v1/heartbeat", + content=json.dumps({"health": make_health(), "gap": make_gap()}), + headers=headers, + ) + resp = await ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS) + assert resp.status_code == 200 + data = resp.json() + assert data["totalTokens"] == 0 + assert data["totalCalls"] == 0 + assert data["models"] == [] + assert "receivedAt" in data + + +@pytest.mark.asyncio +async def test_get_usage_requires_auth(client): + ac, _ = client + resp = await ac.get("/api/v1/agents/some-agent/usage") + assert resp.status_code in (401, 403) diff --git a/packages/control-plane/tests/test_heartbeat.py b/packages/control-plane/tests/test_heartbeat.py index 3497c2d..256e821 100644 --- a/packages/control-plane/tests/test_heartbeat.py +++ b/packages/control-plane/tests/test_heartbeat.py @@ -225,3 +225,49 @@ async def test_heartbeat_rate_limit_returns_429(registered, monkeypatch): second = await ac.post("/api/v1/heartbeat", content=payload, headers=headers) assert second.status_code == 429 + + +# ── Token usage in heartbeat ──────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_heartbeat_with_usage_stores_correctly(registered): + ac, _, agent_id, api_key = registered + usage = { + "windowStartedAt": "2026-03-31T12:00:00.000Z", + "models": [{"modelId": "openai/gpt-4o", "totalTokens": 500, "callCount": 3}], + "totalTokens": 500, + "totalCalls": 3, + } + resp = await ac.post( + "/api/v1/heartbeat", + content=json.dumps({"health": make_health(), "gap": make_gap(), "usage": usage}), + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + ) + assert resp.status_code == 204 + + # Verify stored via GET /agents/{name}/usage + usage_resp = await 
ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS) + assert usage_resp.status_code == 200 + data = usage_resp.json() + assert data["totalTokens"] == 500 + assert data["totalCalls"] == 3 + assert len(data["models"]) == 1 + + +@pytest.mark.asyncio +async def test_heartbeat_without_usage_succeeds(registered): + ac, _, agent_id, api_key = registered + resp = await ac.post( + "/api/v1/heartbeat", + content=json.dumps({"health": make_health(), "gap": make_gap()}), + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + ) + assert resp.status_code == 204 + + # GET /agents/{name}/usage returns empty usage + usage_resp = await ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS) + assert usage_resp.status_code == 200 + data = usage_resp.json() + assert data["totalTokens"] == 0 + assert data["totalCalls"] == 0 + assert data["models"] == [] diff --git a/packages/control-plane/tests/test_k8s_upsert.py b/packages/control-plane/tests/test_k8s_upsert.py index 8d3f414..97840cf 100644 --- a/packages/control-plane/tests/test_k8s_upsert.py +++ b/packages/control-plane/tests/test_k8s_upsert.py @@ -161,6 +161,24 @@ def test_build_status_patch_score_clamped_below_zero(): assert patch["grade"] == "F" +# ── Usage in status patch ──────────────────────────────────────────────────── + +def test_build_status_patch_includes_usage_when_provided(): + usage = {"totalTokens": 500, "totalCalls": 3, "models": [{"modelId": "openai/gpt-4o", "totalTokens": 500, "callCount": 3}]} + patch = build_status_patch(make_health("ready"), make_gap(80), usage=usage) + assert patch["usage"] == usage + + +def test_build_status_patch_omits_usage_when_none(): + patch = build_status_patch(make_health("ready"), make_gap(80), usage=None) + assert "usage" not in patch + + +def test_build_status_patch_omits_usage_by_default(): + patch = build_status_patch(make_health("ready"), make_gap(80)) + assert "usage" not in patch + + @pytest.mark.asyncio async def 
test_upsert_idempotent_called_twice(): mock_client = AsyncMock() diff --git a/packages/operator/crds/agentobservation.yaml b/packages/operator/crds/agentobservation.yaml index 9a1cbb4..81c6526 100644 --- a/packages/operator/crds/agentobservation.yaml +++ b/packages/operator/crds/agentobservation.yaml @@ -46,6 +46,10 @@ spec: type: string description: "Data source: agent-sdk (live) or manifest-static (fallback)" jsonPath: .status.source + - name: Tokens + type: integer + description: "Total tokens in last heartbeat window" + jsonPath: .status.usage.totalTokens - name: Checked type: date description: "Time of last successful probe" @@ -133,6 +137,35 @@ spec: type: integer skipped: type: integer + usage: + type: object + description: "Token usage from the last heartbeat window" + properties: + totalTokens: + type: integer + description: "Total tokens consumed in the last heartbeat window" + totalCalls: + type: integer + description: "Total model calls in the last heartbeat window" + models: + type: array + description: "Per-model token usage breakdown" + items: + type: object + properties: + modelId: + type: string + description: "Model identifier (e.g. 
openai/gpt-4o)" + totalTokens: + type: integer + description: "Total tokens for this model" + callCount: + type: integer + description: "Number of calls to this model" + windowStartedAt: + type: string + format: date-time + description: "Start of the usage aggregation window" conditions: type: array description: "Standard Kubernetes conditions" diff --git a/packages/sdk-langgraph/agentspec_langgraph/__init__.py b/packages/sdk-langgraph/agentspec_langgraph/__init__.py index d1d319c..d34c4e8 100644 --- a/packages/sdk-langgraph/agentspec_langgraph/__init__.py +++ b/packages/sdk-langgraph/agentspec_langgraph/__init__.py @@ -64,6 +64,7 @@ from .model_instrumentation import instrument_call_model from .sidecar_client import SidecarClient from .tool_node import AgentSpecToolNode +from .usage_ledger import UsageLedger __version__ = "0.1.0" @@ -80,4 +81,5 @@ "ModelCallEvent", "GuardrailEvent", "MemoryWriteEvent", + "UsageLedger", ] diff --git a/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py b/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py index 31a8263..d5606c7 100644 --- a/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py +++ b/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py @@ -28,6 +28,7 @@ from typing import Any, Callable, Optional from .events import ModelCallEvent +from .usage_ledger import UsageLedger def instrument_call_model( @@ -35,6 +36,7 @@ def instrument_call_model( reporter: Optional[Any] = None, model_id: str = "unknown/unknown", max_events: int = 10_000, + ledger: Optional[UsageLedger] = None, ) -> Callable[..., Any]: """ Wrap a LangGraph call_model node function with AgentSpec instrumentation. 
@@ -43,6 +45,7 @@ def instrument_call_model( - Times each LLM call - Extracts token usage from AIMessage.usage_metadata or response_metadata - Emits a ModelCallEvent to the reporter + - Records token usage in the UsageLedger (if provided) Parameters ---------- @@ -56,6 +59,9 @@ def instrument_call_model( max_events: Maximum number of call events to retain in memory (default: 10,000). Older events are dropped when the limit is reached. + ledger: + Optional UsageLedger for aggregating token counts across calls. + When provided, every model call accumulates into the ledger. Returns ------- @@ -86,6 +92,12 @@ def wrapped(*args: Any, **kwargs: Any) -> Any: except Exception: pass # Reporter errors never break the agent + if ledger is not None: + try: + ledger.record(model_id, prompt_tokens or 0, completion_tokens or 0) + except Exception: + pass # Ledger errors never break the agent + return result # Expose call log on the wrapped function for testing / introspection diff --git a/packages/sdk-langgraph/agentspec_langgraph/usage_ledger.py b/packages/sdk-langgraph/agentspec_langgraph/usage_ledger.py new file mode 100644 index 0000000..b5296e1 --- /dev/null +++ b/packages/sdk-langgraph/agentspec_langgraph/usage_ledger.py @@ -0,0 +1,98 @@ +""" +UsageLedger — in-process token usage counter (Python mirror of TypeScript UsageLedger). + +Framework-agnostic, zero-dependency accumulator for LLM token counts. +The agent code calls record() after each LLM call; the reporter drains +the ledger via snapshot(reset=True) on each heartbeat push. + +Thread-safe: uses threading.Lock to protect concurrent record() calls +(common in LangGraph agents using ThreadPoolExecutor for tool calls). 
+ +Usage: + ledger = UsageLedger() + ledger.record("openai/gpt-4o", prompt_tokens=100, completion_tokens=50) + snapshot = ledger.snapshot(reset=True) # returns dict, resets counters +""" + +from __future__ import annotations + +import threading +from datetime import datetime, timezone +from typing import Any + + +# ── Private helpers ─────────────────────────────────────────────────────────── + + +def _create_empty_entry(model_id: str) -> dict[str, Any]: + return { + "modelId": model_id, + "promptTokens": 0, + "completionTokens": 0, + "totalTokens": 0, + "callCount": 0, + } + + +def _build_snapshot( + counters: dict[str, dict[str, Any]], + window_start: datetime, +) -> dict[str, Any]: + # Shallow-copy each entry to avoid exposing mutable internal state + models = [dict(entry) for entry in counters.values()] + total_tokens = sum(m["totalTokens"] for m in models) + total_calls = sum(m["callCount"] for m in models) + return { + "windowStartedAt": window_start.isoformat(), + "models": models, + "totalTokens": total_tokens, + "totalCalls": total_calls, + } + + +# ── Public class ────────────────────────────────────────────────────────────── + + +class UsageLedger: + """In-process token usage accumulator. + + Thread-safe. O(1) record(), snapshot returns a dict matching the + TypeScript UsageSnapshot shape. 
+ """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._counters: dict[str, dict[str, Any]] = {} + self._window_start = datetime.now(timezone.utc) + + def record( + self, + model_id: str, + prompt_tokens: int = 0, + completion_tokens: int = 0, + ) -> None: + """Record token usage for a single LLM call.""" + p = max(0, prompt_tokens) + c = max(0, completion_tokens) + with self._lock: + entry = self._counters.get(model_id) + if entry is None: + entry = _create_empty_entry(model_id) + self._counters[model_id] = entry + entry["promptTokens"] += p + entry["completionTokens"] += c + entry["totalTokens"] += p + c + entry["callCount"] += 1 + + def snapshot(self, reset: bool = True) -> dict[str, Any]: + """Return a frozen snapshot of the current usage window. + + Args: + reset: When True (default), clears all counters and starts a new window. + """ + with self._lock: + snap = _build_snapshot(self._counters, self._window_start) + if reset: + self._counters = {} + self._window_start = datetime.now(timezone.utc) + return snap diff --git a/packages/sdk-langgraph/tests/test_model_instrumentation.py b/packages/sdk-langgraph/tests/test_model_instrumentation.py index fc2eabc..4994cd9 100644 --- a/packages/sdk-langgraph/tests/test_model_instrumentation.py +++ b/packages/sdk-langgraph/tests/test_model_instrumentation.py @@ -184,3 +184,56 @@ def raw_call_model(state): result = call_model({"messages": []}) assert result == "raw response" assert reporter.model_calls[0].token_count == 0 + + +def test_ledger_error_does_not_propagate(reporter): + """Ledger errors must not break the agent — same pattern as reporter errors.""" + from unittest.mock import MagicMock + from agentspec_langgraph.usage_ledger import UsageLedger + + broken_ledger = MagicMock(spec=UsageLedger) + broken_ledger.record.side_effect = RuntimeError("ledger boom") + + def raw_call_model(state): + msg = MockAIMessage() + msg.usage_metadata = {"total_tokens": 100, "input_tokens": 60, "output_tokens": 
40} + return {"messages": [msg]} + + call_model = instrument_call_model( + raw_call_model, reporter=reporter, model_id="openai/gpt-4o", ledger=broken_ledger, + ) + result = call_model({"messages": []}) + + # Function returns normally despite ledger error + assert "messages" in result + assert len(result["messages"]) == 1 + # Reporter still recorded + assert len(reporter.model_calls) == 1 + # Ledger was attempted + broken_ledger.record.assert_called_once() + + +def test_ledger_accumulates_usage(reporter): + """When a UsageLedger is provided, instrument_call_model records usage into it.""" + from agentspec_langgraph.usage_ledger import UsageLedger + + ledger = UsageLedger() + + def raw_call_model(state): + msg = MockAIMessage() + msg.usage_metadata = {"total_tokens": 342, "input_tokens": 200, "output_tokens": 142} + return {"messages": [msg]} + + call_model = instrument_call_model( + raw_call_model, + reporter=reporter, + model_id="openai/gpt-4o", + ledger=ledger, + ) + call_model({"messages": []}) + call_model({"messages": []}) + + snap = ledger.snapshot(reset=False) + assert snap["totalTokens"] == 684 + assert snap["totalCalls"] == 2 + assert snap["models"][0]["modelId"] == "openai/gpt-4o" diff --git a/packages/sdk-langgraph/tests/test_usage_ledger.py b/packages/sdk-langgraph/tests/test_usage_ledger.py new file mode 100644 index 0000000..dfe730d --- /dev/null +++ b/packages/sdk-langgraph/tests/test_usage_ledger.py @@ -0,0 +1,101 @@ +"""Tests for UsageLedger — in-process token usage counter.""" + +from datetime import datetime, timezone + +import pytest + +from agentspec_langgraph.usage_ledger import UsageLedger + + +class TestRecord: + def test_accumulates_tokens_for_same_model(self): + ledger = UsageLedger() + ledger.record("openai/gpt-4o", prompt_tokens=100, completion_tokens=50) + ledger.record("openai/gpt-4o", prompt_tokens=200, completion_tokens=80) + + snap = ledger.snapshot(reset=False) + assert len(snap["models"]) == 1 + m = snap["models"][0] + assert 
m["modelId"] == "openai/gpt-4o" + assert m["promptTokens"] == 300 + assert m["completionTokens"] == 130 + assert m["totalTokens"] == 430 + assert m["callCount"] == 2 + + def test_tracks_multiple_models_independently(self): + ledger = UsageLedger() + ledger.record("openai/gpt-4o", 100, 50) + ledger.record("anthropic/claude-sonnet-4-6", 200, 80) + ledger.record("openai/gpt-4o", 50, 20) + + snap = ledger.snapshot(reset=False) + assert len(snap["models"]) == 2 + + by_id = {m["modelId"]: m for m in snap["models"]} + assert by_id["openai/gpt-4o"]["totalTokens"] == 220 + assert by_id["anthropic/claude-sonnet-4-6"]["totalTokens"] == 280 + + +class TestSnapshot: + def test_returns_correct_aggregate_totals(self): + ledger = UsageLedger() + ledger.record("openai/gpt-4o", 100, 50) + ledger.record("anthropic/claude-sonnet-4-6", 200, 80) + + snap = ledger.snapshot(reset=False) + assert snap["totalTokens"] == 430 + assert snap["totalCalls"] == 2 + + def test_resets_counters_when_reset_true(self): + ledger = UsageLedger() + ledger.record("openai/gpt-4o", 100, 50) + + first = ledger.snapshot(reset=True) + assert first["totalTokens"] == 150 + + second = ledger.snapshot(reset=False) + assert second["models"] == [] + assert second["totalTokens"] == 0 + assert second["totalCalls"] == 0 + + def test_does_not_reset_when_reset_false(self): + ledger = UsageLedger() + ledger.record("openai/gpt-4o", 100, 50) + ledger.snapshot(reset=False) + + snap = ledger.snapshot(reset=False) + assert snap["totalTokens"] == 150 + + def test_window_started_at_resets_after_snapshot_true(self): + ledger = UsageLedger() + snap1 = ledger.snapshot(reset=False) + window1 = snap1["windowStartedAt"] + + ledger.record("openai/gpt-4o", 10, 5) + ledger.snapshot(reset=True) + + snap2 = ledger.snapshot(reset=False) + assert datetime.fromisoformat(snap2["windowStartedAt"]) >= datetime.fromisoformat(window1) + + def test_empty_snapshot(self): + ledger = UsageLedger() + snap = ledger.snapshot(reset=False) + assert 
snap["models"] == [] + assert snap["totalTokens"] == 0 + assert snap["totalCalls"] == 0 + assert "windowStartedAt" in snap + + def test_defaults_reset_to_true(self): + ledger = UsageLedger() + ledger.record("openai/gpt-4o", 100, 50) + ledger.snapshot() # default reset=True + + snap = ledger.snapshot(reset=False) + assert snap["totalTokens"] == 0 + + def test_window_started_at_is_valid_iso(self): + ledger = UsageLedger() + snap = ledger.snapshot(reset=False) + # Should not raise + dt = datetime.fromisoformat(snap["windowStartedAt"]) + assert dt.tzinfo is not None # timezone-aware diff --git a/packages/sdk/src/__tests__/push.test.ts b/packages/sdk/src/__tests__/push.test.ts index 2c21d95..7019f94 100644 --- a/packages/sdk/src/__tests__/push.test.ts +++ b/packages/sdk/src/__tests__/push.test.ts @@ -157,7 +157,7 @@ describe('AgentSpecReporter — push mode', () => { reporter.stopPushMode() }) - it('5. body contains { health, gap } with correct agentName', async () => { + it('5. body contains { health, gap, usage } with correct agentName', async () => { const { AgentSpecReporter } = await import('../agent/reporter.js') const reporter = new AgentSpecReporter(testManifest) @@ -170,9 +170,10 @@ describe('AgentSpecReporter — push mode', () => { await vi.advanceTimersByTimeAsync(500) const [, init] = fetchMock.mock.calls[0] as [string, RequestInit] - const body = JSON.parse(init.body as string) as { health: HealthReport; gap: AuditReport } + const body = JSON.parse(init.body as string) as { health: HealthReport; gap: AuditReport; usage: unknown } expect(body.health.agentName).toBe('push-test-agent') expect(body.gap.agentName).toBe('push-test-agent') + expect(body).toHaveProperty('usage') reporter.stopPushMode() }) @@ -321,4 +322,99 @@ describe('AgentSpecReporter — push mode', () => { expect(reporter.isPushModeActive()).toBe(false) }) + + // ── Token usage in heartbeat ─────────────────────────────────────────────── + + it('13. 
heartbeat payload includes usage field', async () => { + const { AgentSpecReporter } = await import('../agent/reporter.js') + const reporter = new AgentSpecReporter(testManifest) + + reporter.usage.record('openai/gpt-4o', 100, 50) + + reporter.startPushMode({ + controlPlaneUrl: 'https://cp.example.com', + apiKey: 'test-key-abc', + intervalSeconds: 30, + }) + + await vi.advanceTimersByTimeAsync(500) + + const [, init] = fetchMock.mock.calls[0] as [string, RequestInit] + const body = JSON.parse(init.body as string) + expect(body).toHaveProperty('usage') + expect(body.usage.totalTokens).toBe(150) + expect(body.usage.totalCalls).toBe(1) + expect(body.usage.models).toHaveLength(1) + expect(body.usage.models[0].modelId).toBe('openai/gpt-4o') + reporter.stopPushMode() + }) + + it('14. heartbeat includes empty usage when no recordUsage() calls', async () => { + const { AgentSpecReporter } = await import('../agent/reporter.js') + const reporter = new AgentSpecReporter(testManifest) + + reporter.startPushMode({ + controlPlaneUrl: 'https://cp.example.com', + apiKey: 'test-key-abc', + intervalSeconds: 30, + }) + + await vi.advanceTimersByTimeAsync(500) + + const [, init] = fetchMock.mock.calls[0] as [string, RequestInit] + const body = JSON.parse(init.body as string) + expect(body.usage).toEqual({ + windowStartedAt: expect.any(String), + models: [], + totalTokens: 0, + totalCalls: 0, + }) + reporter.stopPushMode() + }) + + it('15. 
usage ledger resets after heartbeat fires', async () => { + const { AgentSpecReporter } = await import('../agent/reporter.js') + const reporter = new AgentSpecReporter(testManifest) + + reporter.usage.record('openai/gpt-4o', 100, 50) + + reporter.startPushMode({ + controlPlaneUrl: 'https://cp.example.com', + apiKey: 'test-key-abc', + intervalSeconds: 5, + }) + + // First heartbeat fires and drains the ledger + await vi.advanceTimersByTimeAsync(500) + + // Record nothing between first and second heartbeat + await vi.advanceTimersByTimeAsync(5000) + + // Second heartbeat should have empty usage + const [, init] = fetchMock.mock.calls[1] as [string, RequestInit] + const body = JSON.parse(init.body as string) + expect(body.usage.totalTokens).toBe(0) + expect(body.usage.totalCalls).toBe(0) + reporter.stopPushMode() + }) + + it('16. reporter.usage is a UsageLedger instance', async () => { + const { AgentSpecReporter } = await import('../agent/reporter.js') + const { UsageLedger } = await import('../agent/usage-ledger.js') + const reporter = new AgentSpecReporter(testManifest) + + expect(reporter.usage).toBeInstanceOf(UsageLedger) + }) + + it('17. 
reporter.usage.record() accumulates correctly', async () => { + const { AgentSpecReporter } = await import('../agent/reporter.js') + const reporter = new AgentSpecReporter(testManifest) + + reporter.usage.record('openai/gpt-4o', 100, 50) + reporter.usage.record('openai/gpt-4o', 200, 80) + + const snap = reporter.usage.snapshot(false) + expect(snap.totalTokens).toBe(430) + expect(snap.totalCalls).toBe(2) + }) }) diff --git a/packages/sdk/src/__tests__/usage-ledger.test.ts b/packages/sdk/src/__tests__/usage-ledger.test.ts new file mode 100644 index 0000000..4132100 --- /dev/null +++ b/packages/sdk/src/__tests__/usage-ledger.test.ts @@ -0,0 +1,130 @@ +import { describe, it, expect, beforeEach } from 'vitest' +import { UsageLedger } from '../agent/usage-ledger.js' +import type { UsageSnapshot, ModelUsageEntry } from '../agent/usage-ledger.js' + +describe('UsageLedger', () => { + let ledger: UsageLedger + + beforeEach(() => { + ledger = new UsageLedger() + }) + + // ── record() ───────────────────────────────────────────────────────────────── + + it('accumulates tokens for the same model across multiple calls', () => { + ledger.record('openai/gpt-4o', 100, 50) + ledger.record('openai/gpt-4o', 200, 80) + + const snap = ledger.snapshot(false) + expect(snap.models).toHaveLength(1) + expect(snap.models[0]).toEqual({ + modelId: 'openai/gpt-4o', + promptTokens: 300, + completionTokens: 130, + totalTokens: 430, + callCount: 2, + }) + }) + + it('tracks multiple models independently', () => { + ledger.record('openai/gpt-4o', 100, 50) + ledger.record('anthropic/claude-sonnet-4-6', 200, 80) + ledger.record('openai/gpt-4o', 50, 20) + + const snap = ledger.snapshot(false) + expect(snap.models).toHaveLength(2) + + const gpt = snap.models.find((m) => m.modelId === 'openai/gpt-4o') + const claude = snap.models.find((m) => m.modelId === 'anthropic/claude-sonnet-4-6') + + expect(gpt).toEqual({ + modelId: 'openai/gpt-4o', + promptTokens: 150, + completionTokens: 70, + totalTokens: 220, + 
callCount: 2, + }) + expect(claude).toEqual({ + modelId: 'anthropic/claude-sonnet-4-6', + promptTokens: 200, + completionTokens: 80, + totalTokens: 280, + callCount: 1, + }) + }) + + // ── snapshot() ──────────────────────────────────────────────────────────────── + + it('returns correct aggregate totals', () => { + ledger.record('openai/gpt-4o', 100, 50) + ledger.record('anthropic/claude-sonnet-4-6', 200, 80) + + const snap = ledger.snapshot(false) + expect(snap.totalTokens).toBe(430) + expect(snap.totalCalls).toBe(2) + }) + + it('resets counters when snapshot(true) is called', () => { + ledger.record('openai/gpt-4o', 100, 50) + + const first = ledger.snapshot(true) + expect(first.totalTokens).toBe(150) + expect(first.totalCalls).toBe(1) + + const second = ledger.snapshot(false) + expect(second.models).toHaveLength(0) + expect(second.totalTokens).toBe(0) + expect(second.totalCalls).toBe(0) + }) + + it('does NOT reset counters when snapshot(false) is called', () => { + ledger.record('openai/gpt-4o', 100, 50) + + ledger.snapshot(false) + + const snap = ledger.snapshot(false) + expect(snap.totalTokens).toBe(150) + expect(snap.totalCalls).toBe(1) + }) + + it('resets windowStartedAt after snapshot(true)', () => { + const snap1 = ledger.snapshot(false) + const window1 = snap1.windowStartedAt + + // Force a reset + ledger.record('openai/gpt-4o', 10, 5) + ledger.snapshot(true) + + // New window should have a >= timestamp + const snap2 = ledger.snapshot(false) + expect(new Date(snap2.windowStartedAt).getTime()) + .toBeGreaterThanOrEqual(new Date(window1).getTime()) + }) + + it('returns empty snapshot when no calls have been recorded', () => { + const snap = ledger.snapshot(false) + + expect(snap).toEqual({ + windowStartedAt: expect.any(String), + models: [], + totalTokens: 0, + totalCalls: 0, + }) + }) + + it('defaults reset to true', () => { + ledger.record('openai/gpt-4o', 100, 50) + + // Default parameter — should reset + ledger.snapshot() + + const snap = 
ledger.snapshot(false) + expect(snap.totalTokens).toBe(0) + }) + + it('windowStartedAt is a valid ISO-8601 string', () => { + const snap = ledger.snapshot(false) + expect(() => new Date(snap.windowStartedAt)).not.toThrow() + expect(new Date(snap.windowStartedAt).toISOString()).toBe(snap.windowStartedAt) + }) +}) diff --git a/packages/sdk/src/agent/reporter.ts b/packages/sdk/src/agent/reporter.ts index aa67d76..b3ad18f 100644 --- a/packages/sdk/src/agent/reporter.ts +++ b/packages/sdk/src/agent/reporter.ts @@ -22,6 +22,7 @@ import { runAudit } from '../audit/index.js' import type { AgentSpecManifest } from '../schema/manifest.schema.js' import type { HealthReport } from '../health/index.js' import type { PushModeOptions } from './push.js' +import { UsageLedger } from './usage-ledger.js' export interface ReporterOptions { /** @@ -52,6 +53,9 @@ export class AgentSpecReporter { private stopped = false private readonly registeredTools = new Set() + /** In-process token usage accumulator. */ + readonly usage = new UsageLedger() + constructor( private readonly manifest: AgentSpecManifest, private readonly opts: ReporterOptions = {}, @@ -172,14 +176,15 @@ export class AgentSpecReporter { try { const health = await this.getReport() const gap = runAudit(this.manifest) + const usage = this.usage.snapshot(true) // Build payload, cap at 64 KB by trimming checks const trimmedChecks = [...health.checks] - let body = JSON.stringify({ health, gap }) + let body = JSON.stringify({ health, gap, usage }) if (body.length > 65_536) { while (trimmedChecks.length > 0) { trimmedChecks.pop() - body = JSON.stringify({ health: { ...health, checks: trimmedChecks }, gap }) + body = JSON.stringify({ health: { ...health, checks: trimmedChecks }, gap, usage }) if (body.length <= 65_536) break } } diff --git a/packages/sdk/src/agent/usage-ledger.ts b/packages/sdk/src/agent/usage-ledger.ts new file mode 100644 index 0000000..1701393 --- /dev/null +++ b/packages/sdk/src/agent/usage-ledger.ts @@ -0,0 
/** In-process token usage counter — aggregates LLM call tokens between heartbeat pushes. */

// ── Types (exported) ─────────────────────────────────────────────────────────

/** Running totals for one model within the current usage window. */
export interface ModelUsageEntry {
  modelId: string
  promptTokens: number
  completionTokens: number
  /** Always `promptTokens + completionTokens`. */
  totalTokens: number
  /** Number of `record()` calls attributed to this model. */
  callCount: number
}

/** Point-in-time view of a usage window, as returned by `UsageLedger.snapshot()`. */
export interface UsageSnapshot {
  /** ISO-8601 timestamp (from `Date.prototype.toISOString`) of when the window began. */
  windowStartedAt: string
  models: ModelUsageEntry[]
  totalTokens: number
  totalCalls: number
}

// ── Private helpers ──────────────────────────────────────────────────────────

/** Build a zeroed entry for a model that has not been seen in this window. */
function createEmptyEntry(modelId: string): ModelUsageEntry {
  return { modelId, promptTokens: 0, completionTokens: 0, totalTokens: 0, callCount: 0 }
}

/**
 * Normalise a caller-supplied token count to a finite, non-negative number.
 *
 * `Math.max(0, NaN)` is `NaN` and `Math.max(0, Infinity)` is `Infinity`, so a
 * plain clamp would let one bad value poison every later sum (and
 * `JSON.stringify` would then emit `null` for the totals). Anything that is
 * not a finite number therefore counts as 0 tokens.
 */
function toTokenCount(value: number): number {
  return Number.isFinite(value) ? Math.max(0, value) : 0
}

/** Copy the live counters into a detached snapshot and compute aggregate totals. */
function buildSnapshot(
  counters: ReadonlyMap<string, ModelUsageEntry>,
  windowStart: Date,
): UsageSnapshot {
  // Shallow-copy each entry to avoid exposing mutable internal state
  const models = [...counters.values()].map((entry) => ({ ...entry }))
  let totalTokens = 0
  let totalCalls = 0
  for (const model of models) {
    totalTokens += model.totalTokens
    totalCalls += model.callCount
  }
  return {
    windowStartedAt: windowStart.toISOString(),
    models,
    totalTokens,
    totalCalls,
  }
}

// ── Public class ─────────────────────────────────────────────────────────────

export class UsageLedger {
  private counters = new Map<string, ModelUsageEntry>()
  private windowStart = new Date()

  /**
   * Record token usage for a single LLM call.
   * Call this once per LLM invocation with the model ID and token counts.
   *
   * Negative, `NaN` and infinite counts are treated as 0 tokens; the call is
   * still counted in `callCount`.
   *
   * @param modelId - Key under which usage is accumulated (e.g. `openai/gpt-4o`).
   * @param promptTokens - Tokens consumed by the prompt.
   * @param completionTokens - Tokens produced by the completion.
   */
  record(modelId: string, promptTokens: number, completionTokens: number): void {
    const prompt = toTokenCount(promptTokens)
    const completion = toTokenCount(completionTokens)

    let entry = this.counters.get(modelId)
    if (!entry) {
      entry = createEmptyEntry(modelId)
      this.counters.set(modelId, entry)
    }

    entry.promptTokens += prompt
    entry.completionTokens += completion
    entry.totalTokens += prompt + completion
    entry.callCount += 1
  }

  /**
   * Return a snapshot of the current usage window.
   *
   * The snapshot is a detached copy: mutating it does not affect the ledger,
   * and later `record()` calls do not change a snapshot already returned.
   *
   * @param reset - When true (default), clears all counters and starts a new
   *   window. Use reset=true in the heartbeat push so each window is shipped
   *   exactly once. Use reset=false for inspection without draining.
   * @returns Per-model entries plus aggregate totals for the window.
   */
  snapshot(reset = true): UsageSnapshot {
    const snap = buildSnapshot(this.counters, this.windowStart)
    if (reset) {
      this.counters = new Map<string, ModelUsageEntry>()
      this.windowStart = new Date()
    }
    return snap
  }
}
+ */ + +import { describe, it, expect, beforeEach } from 'vitest' +import type { FastifyInstance } from 'fastify' +import { buildControlPlaneApp } from '../control-plane/index.js' +import { AuditRing } from '../audit-ring.js' +import { testManifest } from './fixtures.js' + +describe('GET /usage', () => { + let app: FastifyInstance + let ring: AuditRing + + beforeEach(async () => { + ring = new AuditRing() + app = await buildControlPlaneApp(testManifest, ring, { opaUrl: null }) + }) + + it('returns 200 with empty totals when ring has no model calls', async () => { + const res = await app.inject({ method: 'GET', url: '/usage' }) + expect(res.statusCode).toBe(200) + + const body = res.json() + expect(body.totalTokens).toBe(0) + expect(body.totalCalls).toBe(0) + expect(body.models).toEqual([]) + expect(body.sampleSize).toBe(0) + }) + + it('sums tokens across multiple audit entries', async () => { + ring.push({ + requestId: 'r1', + timestamp: new Date().toISOString(), + method: 'POST', + path: '/chat', + modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 100 }], + }) + ring.push({ + requestId: 'r2', + timestamp: new Date().toISOString(), + method: 'POST', + path: '/chat', + modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 250 }], + }) + + const res = await app.inject({ method: 'GET', url: '/usage' }) + const body = res.json() + + expect(body.totalTokens).toBe(350) + expect(body.totalCalls).toBe(2) + expect(body.models).toHaveLength(1) + expect(body.models[0]).toEqual({ + modelId: 'openai/gpt-4o', + totalTokens: 350, + callCount: 2, + }) + expect(body.sampleSize).toBe(2) + }) + + it('groups by modelId correctly', async () => { + ring.push({ + requestId: 'r1', + timestamp: new Date().toISOString(), + method: 'POST', + path: '/chat', + modelCalls: [ + { modelId: 'openai/gpt-4o', tokenCount: 100 }, + { modelId: 'anthropic/claude-sonnet-4-6', tokenCount: 200 }, + ], + }) + ring.push({ + requestId: 'r2', + timestamp: new Date().toISOString(), + method: 'POST', + path: 
'/chat', + modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 50 }], + }) + + const res = await app.inject({ method: 'GET', url: '/usage' }) + const body = res.json() + + expect(body.totalTokens).toBe(350) + expect(body.totalCalls).toBe(3) + expect(body.models).toHaveLength(2) + + const gpt = body.models.find((m: { modelId: string }) => m.modelId === 'openai/gpt-4o') + const claude = body.models.find((m: { modelId: string }) => m.modelId === 'anthropic/claude-sonnet-4-6') + + expect(gpt).toEqual({ modelId: 'openai/gpt-4o', totalTokens: 150, callCount: 2 }) + expect(claude).toEqual({ modelId: 'anthropic/claude-sonnet-4-6', totalTokens: 200, callCount: 1 }) + }) + + it('safely skips entries without modelCalls', async () => { + ring.push({ + requestId: 'r1', + timestamp: new Date().toISOString(), + method: 'GET', + path: '/health', + // no modelCalls + }) + ring.push({ + requestId: 'r2', + timestamp: new Date().toISOString(), + method: 'POST', + path: '/chat', + modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 500 }], + }) + + const res = await app.inject({ method: 'GET', url: '/usage' }) + const body = res.json() + + expect(body.totalTokens).toBe(500) + expect(body.totalCalls).toBe(1) + expect(body.sampleSize).toBe(2) + }) +}) diff --git a/packages/sidecar/src/control-plane/index.ts b/packages/sidecar/src/control-plane/index.ts index 07591d3..ea4d4c4 100644 --- a/packages/sidecar/src/control-plane/index.ts +++ b/packages/sidecar/src/control-plane/index.ts @@ -11,6 +11,7 @@ import { buildEvalRoutes } from './eval.js' import { buildGapRoutes } from './gap.js' import { buildEventsRoutes } from './events.js' import { buildProofRoutes, ProofStore } from './proof.js' +import { buildUsageRoutes } from './usage.js' export interface ControlPlaneOptions { logger?: boolean @@ -39,6 +40,7 @@ export async function buildControlPlaneApp( await buildGapRoutes(app, manifest, auditRing) await buildEventsRoutes(app, manifest, auditRing, { opaUrl: opts.opaUrl }) await 
/**
 * GET /usage — aggregate token usage from the audit ring.
 *
 * Scans all entries in the ring buffer, sums modelCalls by modelId,
 * and returns a UsageResponse with per-model and aggregate totals.
 */

import type { FastifyInstance } from 'fastify'
import type { AuditRing, AuditEntry } from '../audit-ring.js'

// ── Types ────────────────────────────────────────────────────────────────────

/** Token and call totals for a single model. */
interface UsageByModel {
  modelId: string
  totalTokens: number
  callCount: number
}

/** Response body of GET /usage. */
interface UsageResponse {
  models: UsageByModel[]
  totalTokens: number
  totalCalls: number
  /** Number of audit entries scanned, including entries without model calls. */
  sampleSize: number
}

// ── Private helpers ──────────────────────────────────────────────────────────

/**
 * Sum model-call token counts across audit entries, grouped by modelId.
 *
 * Entries without `modelCalls` are skipped but still count towards
 * `sampleSize`. A token count that is not a finite number contributes 0
 * tokens (the call itself is still counted) so one malformed entry cannot
 * turn the totals into NaN; negative counts are clamped to 0.
 * NOTE(review): the declared type of `tokenCount` lives in audit-ring and is
 * not visible here — confirm whether it can actually be absent.
 *
 * @param entries - Audit entries to scan.
 * @returns Per-model totals in first-seen order, plus aggregate totals.
 */
function aggregateFromRing(entries: AuditEntry[]): UsageResponse {
  const byModel = new Map<string, UsageByModel>()
  let totalTokens = 0
  let totalCalls = 0

  for (const entry of entries) {
    if (!entry.modelCalls) continue
    for (const call of entry.modelCalls) {
      const tokens = Number.isFinite(call.tokenCount) ? Math.max(0, call.tokenCount) : 0

      let model = byModel.get(call.modelId)
      if (!model) {
        model = { modelId: call.modelId, totalTokens: 0, callCount: 0 }
        byModel.set(call.modelId, model)
      }

      model.totalTokens += tokens
      model.callCount += 1
      totalTokens += tokens
      totalCalls += 1
    }
  }

  return {
    models: [...byModel.values()],
    totalTokens,
    totalCalls,
    sampleSize: entries.length,
  }
}

// ── Route builder ────────────────────────────────────────────────────────────

/**
 * Register `GET /usage` on the given Fastify app.
 *
 * Each request reads every entry currently held by the audit ring and
 * responds 200 with the aggregated usage.
 *
 * @param app - Fastify instance to register the route on.
 * @param auditRing - Ring buffer whose entries are aggregated per request.
 */
export async function buildUsageRoutes(
  app: FastifyInstance,
  auditRing: AuditRing,
): Promise<void> {
  app.get('/usage', async (_request, reply) => {
    const usage = aggregateFromRing(auditRing.getAll())
    return reply.code(200).send(usage)
  })
}