diff --git a/README.md b/README.md
index 82dd0d4..d3cd330 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,7 @@ agentspec generate agent.yaml --framework langgraph
- [x] **Scan** an existing codebase and auto-generate the manifest
- [x] **Evaluate** agent quality against JSONL datasets with CI pass/fail gates
- [x] **Deploy** to Kubernetes — operator injects sidecar, exposes `/health/ready` and `/gap`
+- [x] **Track** token usage per model — in-process metering, no external infrastructure
- [x] **Export** to A2A / AgentCard format
- [ ] Visual dashboard for fleet-wide agent observability (coming soon)
- [ ] Native OpenTelemetry trace export (coming soon)
@@ -39,7 +40,7 @@ agentspec generate agent.yaml --framework langgraph
- **`agent.yaml`** is the single source of truth — the SDK reads it at runtime, the CLI validates and audits it, the operator deploys it
-- **Sidecar** is injected automatically by the operator and exposes live `/health/ready`, `/gap`, and `/explore` endpoints without touching agent code
+- **Sidecar** is injected automatically by the operator and exposes live `/health/ready`, `/gap`, `/explore`, and `/usage` endpoints without touching agent code
- **CLI** wraps the SDK for local development — validate, audit, generate, scan, evaluate
- **MCP Server** bridges the sidecar to Claude Code and VS Code for in-editor introspection
@@ -126,6 +127,13 @@ npm install @agentspec/sdk
[medium] MEM-04 — Vector store namespace isolated
```
+**Token usage** (`kubectl get agentobservations`):
+```
+NAME PHASE GRADE SCORE TOKENS CHECKED
+budget-assistant Healthy B 82 12,450 30s ago
+gymcoach Healthy A 95 3,200 15s ago
+```
+
---
## Manifest
diff --git a/docs/concepts/operating-modes.md b/docs/concepts/operating-modes.md
index 2190a8e..d895db9 100644
--- a/docs/concepts/operating-modes.md
+++ b/docs/concepts/operating-modes.md
@@ -15,7 +15,7 @@ source of confusion when working with VS Code, MCP, and the CLI.
| **When to use** | Local dev, or cluster agent via per-agent port-forward | K8s cluster with AgentSpec Operator deployed |
| **URL target** | `http://localhost:4001` (direct or port-forwarded per agent) | Operator service URL (one URL for all agents) |
| **Data freshness** | **Live** — computed fresh on each request | **Stored** — last heartbeat (up to `RATE_LIMIT_SECONDS` stale) |
-| **Endpoints** | `GET /gap`, `GET /proof`, `GET /health/ready`, `GET /explore` | `GET /api/v1/agents/{name}/gap`, `/proof`, `/health` |
+| **Endpoints** | `GET /gap`, `GET /proof`, `GET /health/ready`, `GET /explore`, `GET /usage` | `GET /api/v1/agents/{name}/gap`, `/proof`, `/health`, `/usage` |
| **Auth** | None (port-forward is already a trust boundary) | `X-Admin-Key` header |
| **VS Code config** | `agentspec.sidecarUrl` | `agentspec.cluster.controlPlaneUrl` + `agentspec.cluster.adminKey` |
@@ -53,6 +53,7 @@ All endpoints return **live** data computed at request time:
- `GET /proof` — compliance proof records
- `GET /health/ready` — live health checks
- `GET /explore` — runtime capabilities
+- `GET /usage` — aggregated token usage from the audit ring
### VS Code configuration
@@ -116,6 +117,7 @@ All endpoints return **stored** data from the last heartbeat push:
- `GET /api/v1/agents/{name}/gap` — last known gap report
- `GET /api/v1/agents/{name}/proof` — proof records
- `GET /api/v1/agents/{name}/health` — last health check result
+- `GET /api/v1/agents/{name}/usage` — last token usage snapshot
### VS Code configuration
diff --git a/docs/concepts/runtime-introspection.md b/docs/concepts/runtime-introspection.md
index 3ff78f8..0faa8ba 100644
--- a/docs/concepts/runtime-introspection.md
+++ b/docs/concepts/runtime-introspection.md
@@ -191,6 +191,79 @@ These categories only appear in runtime `HealthReport`s (not in CLI pre-flight o
| `service` | `AgentSpecReporter` | TCP connectivity for `spec.requires.services` entries |
| `model` | `AgentSpecReporter` | Provider API endpoint reachable (resolves `$env:` at runtime) |
+## Token Usage Tracking
+
+`AgentSpecReporter` includes a built-in `UsageLedger` that aggregates LLM token counts in-process. No external infrastructure required — token counts flow through the existing heartbeat push.
+
+### Recording usage
+
+After each LLM call, record the token counts:
+
+```typescript
+reporter.usage.record('openai/gpt-4o', promptTokens, completionTokens)
+```
+
+For LangGraph agents, `instrument_call_model` records automatically when a `ledger` is provided:
+
+```python
+from agentspec_langgraph import instrument_call_model, UsageLedger
+
+ledger = UsageLedger()
+call_model = instrument_call_model(
+ original_call_model,
+ reporter=reporter,
+ model_id="groq/llama-3.3-70b-versatile",
+ ledger=ledger,
+)
+```
+
+### How it flows
+
+```
+LLM response → UsageLedger.record() → heartbeat push → CRD status → VS Code
+ sidecar GET /usage ─────────→ VS Code
+```
+
+Each heartbeat ships a **window snapshot** (e.g., last 30s of usage), then resets the counters. The control plane stores each window with the heartbeat row.
+
+### Querying usage
+
+**Sidecar mode** (live, from audit ring):
+```
+GET /usage
+```
+
+**Operator mode** (stored, from last heartbeat):
+```
+GET /api/v1/agents/{name}/usage
+```
+
+Response:
+```json
+{
+ "windowStartedAt": "2026-03-31T12:00:00.000Z",
+ "models": [
+ { "modelId": "openai/gpt-4o", "totalTokens": 1250, "callCount": 8 }
+ ],
+ "totalTokens": 1250,
+ "totalCalls": 8
+}
+```
+
+### CRD visibility
+
+Token usage appears in the `AgentObservation` CRD status and in `kubectl` output:
+
+```bash
+kubectl get agentobservations
+# NAME PHASE GRADE SCORE TOKENS CHECKED
+# gymcoach Healthy A 92 1250 2m ago
+```
+
+### VS Code
+
+The agent detail panel shows a **Token Usage** section with total tokens, call count, and a per-model breakdown table.
+
## Caching and Refresh
`AgentSpecReporter` caches the last `HealthReport` to avoid hammering external APIs on every request to `/agentspec/health`.
diff --git a/packages/control-plane/api/agents.py b/packages/control-plane/api/agents.py
index e9272e6..bb0cdf6 100644
--- a/packages/control-plane/api/agents.py
+++ b/packages/control-plane/api/agents.py
@@ -3,6 +3,7 @@
GET /api/v1/agents/{name}/health — last known HealthReport for an agent
GET /api/v1/agents/{name}/gap — last known GapReport for an agent
GET /api/v1/agents/{name}/proof — last known proof records for an agent
+GET /api/v1/agents/{name}/usage — last known token usage for an agent
All endpoints require the X-Admin-Key header (verify_admin_key dependency).
The {name} path parameter is validated against the k8s resource name pattern.
@@ -10,15 +11,17 @@
from __future__ import annotations
import logging
+from typing import Any, Optional, Type
from fastapi import APIRouter, Depends, HTTPException, Path
+from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from auth.keys import verify_admin_key
from db.base import get_session
from db.models import Agent, Heartbeat
-from schemas import AgentSummary, StoredHealthReport, StoredGapReport, StoredProofRecords
+from schemas import AgentSummary, StoredHealthReport, StoredGapReport, StoredProofRecords, StoredUsageReport
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -56,6 +59,32 @@ async def _get_latest_heartbeat(
return agent, latest
+async def _get_validated_field(
+ name: str,
+ session: AsyncSession,
+ field: str,
+ schema: Type[BaseModel],
+ raw_data: Optional[dict[str, Any]] = None,
+) -> dict:
+ """Fetch a field from the latest heartbeat, validate through a Pydantic model, return dict.
+
+    When raw_data is provided it is used instead of getattr(latest, field).
+    This lets callers pre-shape the stored value before validation (e.g. wrapping the proof list in {"records": [...]}); receivedAt is merged by this helper in either case.
+ """
+ _, latest = await _get_latest_heartbeat(name, session)
+ data = raw_data if raw_data is not None else getattr(latest, field, None)
+ if data is None:
+ data = {"receivedAt": latest.received_at.isoformat()}
+ else:
+ data = {**data, "receivedAt": latest.received_at.isoformat()}
+ try:
+ report = schema.model_validate(data)
+ except Exception:
+ logger.error("Stored %s data for '%s' failed schema validation", field, name)
+ raise HTTPException(status_code=500, detail=f"Stored {field} data is corrupt")
+ return report.model_dump()
+
+
@router.get(
"/agents",
response_model=list[AgentSummary],
@@ -107,15 +136,7 @@ async def get_agent_gap(
session: AsyncSession = Depends(get_session),
) -> dict:
"""Return the last known GapReport for a remote agent (from its latest heartbeat)."""
- _, latest = await _get_latest_heartbeat(name, session)
- try:
- report = StoredGapReport.model_validate(
- {**(latest.gap or {}), "receivedAt": latest.received_at.isoformat()}
- )
- except Exception:
- logger.error("Stored gap data for '%s' failed schema validation", name)
- raise HTTPException(status_code=500, detail="Stored gap data is corrupt")
- return report.model_dump()
+ return await _get_validated_field(name, session, "gap", StoredGapReport)
@router.get(
@@ -128,11 +149,19 @@ async def get_agent_proof(
) -> dict:
"""Return the last known proof records for a remote agent (from its latest heartbeat)."""
_, latest = await _get_latest_heartbeat(name, session)
- try:
- stored = StoredProofRecords.model_validate(
- {"records": latest.proof or [], "receivedAt": latest.received_at.isoformat()}
- )
- except Exception:
- logger.error("Stored proof data for '%s' failed schema validation", name)
- raise HTTPException(status_code=500, detail="Stored proof data is corrupt")
- return stored.model_dump()
+ return await _get_validated_field(
+ name, session, "proof", StoredProofRecords,
+ raw_data={"records": latest.proof or []},
+ )
+
+
+@router.get(
+ "/agents/{name}/usage",
+ dependencies=[Depends(verify_admin_key)],
+)
+async def get_agent_usage(
+ name: str = _K8S_NAME_PATH,
+ session: AsyncSession = Depends(get_session),
+) -> dict:
+ """Return the last known token usage for a remote agent (from its latest heartbeat)."""
+ return await _get_validated_field(name, session, "usage", StoredUsageReport)
diff --git a/packages/control-plane/api/heartbeat.py b/packages/control-plane/api/heartbeat.py
index fd50449..5661439 100644
--- a/packages/control-plane/api/heartbeat.py
+++ b/packages/control-plane/api/heartbeat.py
@@ -117,7 +117,7 @@ async def heartbeat(
_check_rate_limit(agent_id)
# 6. Derive phase / grade / score
- status_patch = build_status_patch(data.health, data.gap)
+ status_patch = build_status_patch(data.health, data.gap, data.usage)
now = datetime.now(timezone.utc)
# 7. Persist heartbeat
@@ -127,6 +127,7 @@ async def heartbeat(
health=data.health,
gap=data.gap,
proof=data.proof,
+ usage=data.usage,
)
session.add(hb)
diff --git a/packages/control-plane/db/models.py b/packages/control-plane/db/models.py
index 087a287..29198cb 100644
--- a/packages/control-plane/db/models.py
+++ b/packages/control-plane/db/models.py
@@ -43,5 +43,6 @@ class Heartbeat(Base):
health: Mapped[dict] = mapped_column(JSON, nullable=False)
gap: Mapped[dict] = mapped_column(JSON, nullable=False)
proof: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
+ usage: Mapped[dict | None] = mapped_column(JSON, nullable=True)
agent: Mapped[Agent] = relationship(back_populates="heartbeats")
diff --git a/packages/control-plane/k8s/upsert.py b/packages/control-plane/k8s/upsert.py
index 85e3e81..fe79d8b 100644
--- a/packages/control-plane/k8s/upsert.py
+++ b/packages/control-plane/k8s/upsert.py
@@ -19,7 +19,7 @@
NAMESPACE = "agentspec-remote"
-def build_status_patch(health: dict[str, Any], gap: dict[str, Any]) -> dict[str, Any]:
+def build_status_patch(health: dict[str, Any], gap: dict[str, Any], usage: dict[str, Any] | None = None) -> dict[str, Any]:
"""
Derive AgentObservation .status from heartbeat data.
@@ -49,13 +49,16 @@ def build_status_patch(health: dict[str, Any], gap: dict[str, Any]) -> dict[str,
else:
grade = "F"
- return {
+ patch: dict[str, Any] = {
"phase": phase,
"grade": grade,
"score": score,
"health": health,
"gap": gap,
}
+ if usage is not None:
+ patch["usage"] = usage
+ return patch
async def upsert_agent_observation(
diff --git a/packages/control-plane/schemas.py b/packages/control-plane/schemas.py
index 9363629..48b5bf7 100644
--- a/packages/control-plane/schemas.py
+++ b/packages/control-plane/schemas.py
@@ -39,6 +39,7 @@ class HeartbeatRequest(BaseModel):
health: dict[str, Any]
gap: dict[str, Any]
proof: list[dict[str, Any]] = Field(default_factory=list)
+ usage: Optional[dict[str, Any]] = None
# ── Stored health report (GET /agents/{name}/health response) ─────────────────
@@ -95,6 +96,20 @@ class StoredProofRecords(BaseModel):
receivedAt: Optional[str] = None
+# ── Stored usage report (GET /agents/{name}/usage response) ─────────────────
+
+class StoredUsageReport(BaseModel):
+ """Schema for the usage snapshot stored in heartbeat rows.
+
+ Strips unknown fields (extra='ignore') — same pattern as StoredHealthReport.
+ """
+ windowStartedAt: Optional[str] = None
+ models: list[dict[str, Any]] = Field(default_factory=list)
+ totalTokens: int = 0
+ totalCalls: int = 0
+ receivedAt: Optional[str] = None
+
+
# ── Agent summary (list endpoint) ─────────────────────────────────────────────
class AgentSummary(BaseModel):
diff --git a/packages/control-plane/tests/test_agents.py b/packages/control-plane/tests/test_agents.py
index b364160..c60835f 100644
--- a/packages/control-plane/tests/test_agents.py
+++ b/packages/control-plane/tests/test_agents.py
@@ -1,6 +1,7 @@
"""
Tests for GET /api/v1/agents, GET /api/v1/agents/{name}/health,
-GET /api/v1/agents/{name}/gap, and GET /api/v1/agents/{name}/proof.
+GET /api/v1/agents/{name}/gap, GET /api/v1/agents/{name}/proof,
+and GET /api/v1/agents/{name}/usage.
All endpoints require X-Admin-Key authentication.
"""
from __future__ import annotations
@@ -277,3 +278,74 @@ async def test_get_proof_strips_unknown_fields(registered):
record = resp.json()["records"][0]
assert "internal_secret" not in record
assert record["ruleId"] == "SEC-LLM-06"
+
+
+# ── GET /agents/{name}/usage ────────────────────────────────────────────────
+
+@pytest.mark.asyncio
+async def test_get_usage_unknown_agent_returns_404(client):
+ ac, _ = client
+ resp = await ac.get("/api/v1/agents/does-not-exist/usage", headers=ADMIN_HEADERS)
+ assert resp.status_code == 404
+
+
+@pytest.mark.asyncio
+async def test_get_usage_no_heartbeat_yet_returns_404(client):
+ ac, _ = client
+ await ac.post(
+ "/api/v1/register",
+ json={"agentName": "silent-agent", "runtime": "local"},
+ headers=ADMIN_HEADERS,
+ )
+ resp = await ac.get("/api/v1/agents/silent-agent/usage", headers=ADMIN_HEADERS)
+ assert resp.status_code == 404
+
+
+@pytest.mark.asyncio
+async def test_get_usage_returns_stored_usage(registered):
+ ac, mock_upsert, agent_id, api_key = registered
+ headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
+ usage = {
+ "windowStartedAt": "2026-03-31T12:00:00.000Z",
+ "models": [{"modelId": "openai/gpt-4o", "totalTokens": 500, "callCount": 3}],
+ "totalTokens": 500,
+ "totalCalls": 3,
+ }
+ await ac.post(
+ "/api/v1/heartbeat",
+ content=json.dumps({"health": make_health(), "gap": make_gap(), "usage": usage}),
+ headers=headers,
+ )
+ resp = await ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS)
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["totalTokens"] == 500
+ assert data["totalCalls"] == 3
+ assert data["models"][0]["modelId"] == "openai/gpt-4o"
+ assert "receivedAt" in data
+
+
+@pytest.mark.asyncio
+async def test_get_usage_returns_empty_when_no_usage_in_heartbeat(registered):
+ """When heartbeat had no usage field, GET /usage returns zeroed-out defaults."""
+ ac, mock_upsert, agent_id, api_key = registered
+ headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
+ await ac.post(
+ "/api/v1/heartbeat",
+ content=json.dumps({"health": make_health(), "gap": make_gap()}),
+ headers=headers,
+ )
+ resp = await ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS)
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["totalTokens"] == 0
+ assert data["totalCalls"] == 0
+ assert data["models"] == []
+ assert "receivedAt" in data
+
+
+@pytest.mark.asyncio
+async def test_get_usage_requires_auth(client):
+ ac, _ = client
+ resp = await ac.get("/api/v1/agents/some-agent/usage")
+ assert resp.status_code in (401, 403)
diff --git a/packages/control-plane/tests/test_heartbeat.py b/packages/control-plane/tests/test_heartbeat.py
index 3497c2d..256e821 100644
--- a/packages/control-plane/tests/test_heartbeat.py
+++ b/packages/control-plane/tests/test_heartbeat.py
@@ -225,3 +225,49 @@ async def test_heartbeat_rate_limit_returns_429(registered, monkeypatch):
second = await ac.post("/api/v1/heartbeat", content=payload, headers=headers)
assert second.status_code == 429
+
+
+# ── Token usage in heartbeat ────────────────────────────────────────────────
+
+@pytest.mark.asyncio
+async def test_heartbeat_with_usage_stores_correctly(registered):
+ ac, _, agent_id, api_key = registered
+ usage = {
+ "windowStartedAt": "2026-03-31T12:00:00.000Z",
+ "models": [{"modelId": "openai/gpt-4o", "totalTokens": 500, "callCount": 3}],
+ "totalTokens": 500,
+ "totalCalls": 3,
+ }
+ resp = await ac.post(
+ "/api/v1/heartbeat",
+ content=json.dumps({"health": make_health(), "gap": make_gap(), "usage": usage}),
+ headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
+ )
+ assert resp.status_code == 204
+
+ # Verify stored via GET /agents/{name}/usage
+ usage_resp = await ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS)
+ assert usage_resp.status_code == 200
+ data = usage_resp.json()
+ assert data["totalTokens"] == 500
+ assert data["totalCalls"] == 3
+ assert len(data["models"]) == 1
+
+
+@pytest.mark.asyncio
+async def test_heartbeat_without_usage_succeeds(registered):
+ ac, _, agent_id, api_key = registered
+ resp = await ac.post(
+ "/api/v1/heartbeat",
+ content=json.dumps({"health": make_health(), "gap": make_gap()}),
+ headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
+ )
+ assert resp.status_code == 204
+
+ # GET /agents/{name}/usage returns empty usage
+ usage_resp = await ac.get("/api/v1/agents/test-bedrock/usage", headers=ADMIN_HEADERS)
+ assert usage_resp.status_code == 200
+ data = usage_resp.json()
+ assert data["totalTokens"] == 0
+ assert data["totalCalls"] == 0
+ assert data["models"] == []
diff --git a/packages/control-plane/tests/test_k8s_upsert.py b/packages/control-plane/tests/test_k8s_upsert.py
index 8d3f414..97840cf 100644
--- a/packages/control-plane/tests/test_k8s_upsert.py
+++ b/packages/control-plane/tests/test_k8s_upsert.py
@@ -161,6 +161,24 @@ def test_build_status_patch_score_clamped_below_zero():
assert patch["grade"] == "F"
+# ── Usage in status patch ────────────────────────────────────────────────────
+
+def test_build_status_patch_includes_usage_when_provided():
+ usage = {"totalTokens": 500, "totalCalls": 3, "models": [{"modelId": "openai/gpt-4o", "totalTokens": 500, "callCount": 3}]}
+ patch = build_status_patch(make_health("ready"), make_gap(80), usage=usage)
+ assert patch["usage"] == usage
+
+
+def test_build_status_patch_omits_usage_when_none():
+ patch = build_status_patch(make_health("ready"), make_gap(80), usage=None)
+ assert "usage" not in patch
+
+
+def test_build_status_patch_omits_usage_by_default():
+ patch = build_status_patch(make_health("ready"), make_gap(80))
+ assert "usage" not in patch
+
+
@pytest.mark.asyncio
async def test_upsert_idempotent_called_twice():
mock_client = AsyncMock()
diff --git a/packages/operator/crds/agentobservation.yaml b/packages/operator/crds/agentobservation.yaml
index 9a1cbb4..81c6526 100644
--- a/packages/operator/crds/agentobservation.yaml
+++ b/packages/operator/crds/agentobservation.yaml
@@ -46,6 +46,10 @@ spec:
type: string
description: "Data source: agent-sdk (live) or manifest-static (fallback)"
jsonPath: .status.source
+ - name: Tokens
+ type: integer
+ description: "Total tokens in last heartbeat window"
+ jsonPath: .status.usage.totalTokens
- name: Checked
type: date
description: "Time of last successful probe"
@@ -133,6 +137,35 @@ spec:
type: integer
skipped:
type: integer
+ usage:
+ type: object
+ description: "Token usage from the last heartbeat window"
+ properties:
+ totalTokens:
+ type: integer
+ description: "Total tokens consumed in the last heartbeat window"
+ totalCalls:
+ type: integer
+ description: "Total model calls in the last heartbeat window"
+ models:
+ type: array
+ description: "Per-model token usage breakdown"
+ items:
+ type: object
+ properties:
+ modelId:
+ type: string
+ description: "Model identifier (e.g. openai/gpt-4o)"
+ totalTokens:
+ type: integer
+ description: "Total tokens for this model"
+ callCount:
+ type: integer
+ description: "Number of calls to this model"
+ windowStartedAt:
+ type: string
+ format: date-time
+ description: "Start of the usage aggregation window"
conditions:
type: array
description: "Standard Kubernetes conditions"
diff --git a/packages/sdk-langgraph/agentspec_langgraph/__init__.py b/packages/sdk-langgraph/agentspec_langgraph/__init__.py
index d1d319c..d34c4e8 100644
--- a/packages/sdk-langgraph/agentspec_langgraph/__init__.py
+++ b/packages/sdk-langgraph/agentspec_langgraph/__init__.py
@@ -64,6 +64,7 @@
from .model_instrumentation import instrument_call_model
from .sidecar_client import SidecarClient
from .tool_node import AgentSpecToolNode
+from .usage_ledger import UsageLedger
__version__ = "0.1.0"
@@ -80,4 +81,5 @@
"ModelCallEvent",
"GuardrailEvent",
"MemoryWriteEvent",
+ "UsageLedger",
]
diff --git a/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py b/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py
index 31a8263..d5606c7 100644
--- a/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py
+++ b/packages/sdk-langgraph/agentspec_langgraph/model_instrumentation.py
@@ -28,6 +28,7 @@
from typing import Any, Callable, Optional
from .events import ModelCallEvent
+from .usage_ledger import UsageLedger
def instrument_call_model(
@@ -35,6 +36,7 @@ def instrument_call_model(
reporter: Optional[Any] = None,
model_id: str = "unknown/unknown",
max_events: int = 10_000,
+ ledger: Optional[UsageLedger] = None,
) -> Callable[..., Any]:
"""
Wrap a LangGraph call_model node function with AgentSpec instrumentation.
@@ -43,6 +45,7 @@ def instrument_call_model(
- Times each LLM call
- Extracts token usage from AIMessage.usage_metadata or response_metadata
- Emits a ModelCallEvent to the reporter
+ - Records token usage in the UsageLedger (if provided)
Parameters
----------
@@ -56,6 +59,9 @@ def instrument_call_model(
max_events:
Maximum number of call events to retain in memory (default: 10,000).
Older events are dropped when the limit is reached.
+ ledger:
+ Optional UsageLedger for aggregating token counts across calls.
+ When provided, every model call accumulates into the ledger.
Returns
-------
@@ -86,6 +92,12 @@ def wrapped(*args: Any, **kwargs: Any) -> Any:
except Exception:
pass # Reporter errors never break the agent
+ if ledger is not None:
+ try:
+ ledger.record(model_id, prompt_tokens or 0, completion_tokens or 0)
+ except Exception:
+ pass # Ledger errors never break the agent
+
return result
# Expose call log on the wrapped function for testing / introspection
diff --git a/packages/sdk-langgraph/agentspec_langgraph/usage_ledger.py b/packages/sdk-langgraph/agentspec_langgraph/usage_ledger.py
new file mode 100644
index 0000000..b5296e1
--- /dev/null
+++ b/packages/sdk-langgraph/agentspec_langgraph/usage_ledger.py
@@ -0,0 +1,98 @@
+"""
+UsageLedger — in-process token usage counter (Python mirror of TypeScript UsageLedger).
+
+Framework-agnostic, zero-dependency accumulator for LLM token counts.
+The agent code calls record() after each LLM call; the reporter drains
+the ledger via snapshot(reset=True) on each heartbeat push.
+
+Thread-safe: uses threading.Lock to protect concurrent record() calls
+(common in LangGraph agents using ThreadPoolExecutor for tool calls).
+
+Usage:
+ ledger = UsageLedger()
+ ledger.record("openai/gpt-4o", prompt_tokens=100, completion_tokens=50)
+ snapshot = ledger.snapshot(reset=True) # returns dict, resets counters
+"""
+
+from __future__ import annotations
+
+import threading
+from datetime import datetime, timezone
+from typing import Any
+
+
+# ── Private helpers ───────────────────────────────────────────────────────────
+
+
+def _create_empty_entry(model_id: str) -> dict[str, Any]:
+ return {
+ "modelId": model_id,
+ "promptTokens": 0,
+ "completionTokens": 0,
+ "totalTokens": 0,
+ "callCount": 0,
+ }
+
+
+def _build_snapshot(
+ counters: dict[str, dict[str, Any]],
+ window_start: datetime,
+) -> dict[str, Any]:
+ # Shallow-copy each entry to avoid exposing mutable internal state
+ models = [dict(entry) for entry in counters.values()]
+ total_tokens = sum(m["totalTokens"] for m in models)
+ total_calls = sum(m["callCount"] for m in models)
+ return {
+ "windowStartedAt": window_start.isoformat(),
+ "models": models,
+ "totalTokens": total_tokens,
+ "totalCalls": total_calls,
+ }
+
+
+# ── Public class ──────────────────────────────────────────────────────────────
+
+
+class UsageLedger:
+ """In-process token usage accumulator.
+
+ Thread-safe. O(1) record(), snapshot returns a dict matching the
+ TypeScript UsageSnapshot shape.
+ """
+
+ def __init__(self) -> None:
+ self._lock = threading.Lock()
+ self._counters: dict[str, dict[str, Any]] = {}
+ self._window_start = datetime.now(timezone.utc)
+
+ def record(
+ self,
+ model_id: str,
+ prompt_tokens: int = 0,
+ completion_tokens: int = 0,
+ ) -> None:
+ """Record token usage for a single LLM call."""
+ p = max(0, prompt_tokens)
+ c = max(0, completion_tokens)
+ with self._lock:
+ entry = self._counters.get(model_id)
+ if entry is None:
+ entry = _create_empty_entry(model_id)
+ self._counters[model_id] = entry
+ entry["promptTokens"] += p
+ entry["completionTokens"] += c
+ entry["totalTokens"] += p + c
+ entry["callCount"] += 1
+
+ def snapshot(self, reset: bool = True) -> dict[str, Any]:
+ """Return a frozen snapshot of the current usage window.
+
+ Args:
+ reset: When True (default), clears all counters and starts a new window.
+ """
+ with self._lock:
+ snap = _build_snapshot(self._counters, self._window_start)
+ if reset:
+ self._counters = {}
+ self._window_start = datetime.now(timezone.utc)
+ return snap
diff --git a/packages/sdk-langgraph/tests/test_model_instrumentation.py b/packages/sdk-langgraph/tests/test_model_instrumentation.py
index fc2eabc..4994cd9 100644
--- a/packages/sdk-langgraph/tests/test_model_instrumentation.py
+++ b/packages/sdk-langgraph/tests/test_model_instrumentation.py
@@ -184,3 +184,56 @@ def raw_call_model(state):
result = call_model({"messages": []})
assert result == "raw response"
assert reporter.model_calls[0].token_count == 0
+
+
+def test_ledger_error_does_not_propagate(reporter):
+ """Ledger errors must not break the agent — same pattern as reporter errors."""
+ from unittest.mock import MagicMock
+ from agentspec_langgraph.usage_ledger import UsageLedger
+
+ broken_ledger = MagicMock(spec=UsageLedger)
+ broken_ledger.record.side_effect = RuntimeError("ledger boom")
+
+ def raw_call_model(state):
+ msg = MockAIMessage()
+ msg.usage_metadata = {"total_tokens": 100, "input_tokens": 60, "output_tokens": 40}
+ return {"messages": [msg]}
+
+ call_model = instrument_call_model(
+ raw_call_model, reporter=reporter, model_id="openai/gpt-4o", ledger=broken_ledger,
+ )
+ result = call_model({"messages": []})
+
+ # Function returns normally despite ledger error
+ assert "messages" in result
+ assert len(result["messages"]) == 1
+ # Reporter still recorded
+ assert len(reporter.model_calls) == 1
+ # Ledger was attempted
+ broken_ledger.record.assert_called_once()
+
+
+def test_ledger_accumulates_usage(reporter):
+ """When a UsageLedger is provided, instrument_call_model records usage into it."""
+ from agentspec_langgraph.usage_ledger import UsageLedger
+
+ ledger = UsageLedger()
+
+ def raw_call_model(state):
+ msg = MockAIMessage()
+ msg.usage_metadata = {"total_tokens": 342, "input_tokens": 200, "output_tokens": 142}
+ return {"messages": [msg]}
+
+ call_model = instrument_call_model(
+ raw_call_model,
+ reporter=reporter,
+ model_id="openai/gpt-4o",
+ ledger=ledger,
+ )
+ call_model({"messages": []})
+ call_model({"messages": []})
+
+ snap = ledger.snapshot(reset=False)
+ assert snap["totalTokens"] == 684
+ assert snap["totalCalls"] == 2
+ assert snap["models"][0]["modelId"] == "openai/gpt-4o"
diff --git a/packages/sdk-langgraph/tests/test_usage_ledger.py b/packages/sdk-langgraph/tests/test_usage_ledger.py
new file mode 100644
index 0000000..dfe730d
--- /dev/null
+++ b/packages/sdk-langgraph/tests/test_usage_ledger.py
@@ -0,0 +1,101 @@
+"""Tests for UsageLedger — in-process token usage counter."""
+
+from datetime import datetime, timezone
+
+import pytest
+
+from agentspec_langgraph.usage_ledger import UsageLedger
+
+
+class TestRecord:
+ def test_accumulates_tokens_for_same_model(self):
+ ledger = UsageLedger()
+ ledger.record("openai/gpt-4o", prompt_tokens=100, completion_tokens=50)
+ ledger.record("openai/gpt-4o", prompt_tokens=200, completion_tokens=80)
+
+ snap = ledger.snapshot(reset=False)
+ assert len(snap["models"]) == 1
+ m = snap["models"][0]
+ assert m["modelId"] == "openai/gpt-4o"
+ assert m["promptTokens"] == 300
+ assert m["completionTokens"] == 130
+ assert m["totalTokens"] == 430
+ assert m["callCount"] == 2
+
+ def test_tracks_multiple_models_independently(self):
+ ledger = UsageLedger()
+ ledger.record("openai/gpt-4o", 100, 50)
+ ledger.record("anthropic/claude-sonnet-4-6", 200, 80)
+ ledger.record("openai/gpt-4o", 50, 20)
+
+ snap = ledger.snapshot(reset=False)
+ assert len(snap["models"]) == 2
+
+ by_id = {m["modelId"]: m for m in snap["models"]}
+ assert by_id["openai/gpt-4o"]["totalTokens"] == 220
+ assert by_id["anthropic/claude-sonnet-4-6"]["totalTokens"] == 280
+
+
+class TestSnapshot:
+ def test_returns_correct_aggregate_totals(self):
+ ledger = UsageLedger()
+ ledger.record("openai/gpt-4o", 100, 50)
+ ledger.record("anthropic/claude-sonnet-4-6", 200, 80)
+
+ snap = ledger.snapshot(reset=False)
+ assert snap["totalTokens"] == 430
+ assert snap["totalCalls"] == 2
+
+ def test_resets_counters_when_reset_true(self):
+ ledger = UsageLedger()
+ ledger.record("openai/gpt-4o", 100, 50)
+
+ first = ledger.snapshot(reset=True)
+ assert first["totalTokens"] == 150
+
+ second = ledger.snapshot(reset=False)
+ assert second["models"] == []
+ assert second["totalTokens"] == 0
+ assert second["totalCalls"] == 0
+
+ def test_does_not_reset_when_reset_false(self):
+ ledger = UsageLedger()
+ ledger.record("openai/gpt-4o", 100, 50)
+ ledger.snapshot(reset=False)
+
+ snap = ledger.snapshot(reset=False)
+ assert snap["totalTokens"] == 150
+
+ def test_window_started_at_resets_after_snapshot_true(self):
+ ledger = UsageLedger()
+ snap1 = ledger.snapshot(reset=False)
+ window1 = snap1["windowStartedAt"]
+
+ ledger.record("openai/gpt-4o", 10, 5)
+ ledger.snapshot(reset=True)
+
+ snap2 = ledger.snapshot(reset=False)
+ assert datetime.fromisoformat(snap2["windowStartedAt"]) >= datetime.fromisoformat(window1)
+
+ def test_empty_snapshot(self):
+ ledger = UsageLedger()
+ snap = ledger.snapshot(reset=False)
+ assert snap["models"] == []
+ assert snap["totalTokens"] == 0
+ assert snap["totalCalls"] == 0
+ assert "windowStartedAt" in snap
+
+ def test_defaults_reset_to_true(self):
+ ledger = UsageLedger()
+ ledger.record("openai/gpt-4o", 100, 50)
+ ledger.snapshot() # default reset=True
+
+ snap = ledger.snapshot(reset=False)
+ assert snap["totalTokens"] == 0
+
+ def test_window_started_at_is_valid_iso(self):
+ ledger = UsageLedger()
+ snap = ledger.snapshot(reset=False)
+ # Should not raise
+ dt = datetime.fromisoformat(snap["windowStartedAt"])
+ assert dt.tzinfo is not None # timezone-aware
diff --git a/packages/sdk/src/__tests__/push.test.ts b/packages/sdk/src/__tests__/push.test.ts
index 2c21d95..7019f94 100644
--- a/packages/sdk/src/__tests__/push.test.ts
+++ b/packages/sdk/src/__tests__/push.test.ts
@@ -157,7 +157,7 @@ describe('AgentSpecReporter — push mode', () => {
reporter.stopPushMode()
})
- it('5. body contains { health, gap } with correct agentName', async () => {
+ it('5. body contains { health, gap, usage } with correct agentName', async () => {
const { AgentSpecReporter } = await import('../agent/reporter.js')
const reporter = new AgentSpecReporter(testManifest)
@@ -170,9 +170,10 @@ describe('AgentSpecReporter — push mode', () => {
await vi.advanceTimersByTimeAsync(500)
const [, init] = fetchMock.mock.calls[0] as [string, RequestInit]
- const body = JSON.parse(init.body as string) as { health: HealthReport; gap: AuditReport }
+ const body = JSON.parse(init.body as string) as { health: HealthReport; gap: AuditReport; usage: unknown }
expect(body.health.agentName).toBe('push-test-agent')
expect(body.gap.agentName).toBe('push-test-agent')
+ expect(body).toHaveProperty('usage')
reporter.stopPushMode()
})
@@ -321,4 +322,99 @@ describe('AgentSpecReporter — push mode', () => {
expect(reporter.isPushModeActive()).toBe(false)
})
+
+ // ── Token usage in heartbeat ───────────────────────────────────────────────
+
+ it('13. heartbeat payload includes usage field', async () => {
+ const { AgentSpecReporter } = await import('../agent/reporter.js')
+ const reporter = new AgentSpecReporter(testManifest)
+
+ reporter.usage.record('openai/gpt-4o', 100, 50)
+
+ reporter.startPushMode({
+ controlPlaneUrl: 'https://cp.example.com',
+ apiKey: 'test-key-abc',
+ intervalSeconds: 30,
+ })
+
+ await vi.advanceTimersByTimeAsync(500)
+
+ const [, init] = fetchMock.mock.calls[0] as [string, RequestInit]
+ const body = JSON.parse(init.body as string)
+ expect(body).toHaveProperty('usage')
+ expect(body.usage.totalTokens).toBe(150)
+ expect(body.usage.totalCalls).toBe(1)
+ expect(body.usage.models).toHaveLength(1)
+ expect(body.usage.models[0].modelId).toBe('openai/gpt-4o')
+ reporter.stopPushMode()
+ })
+
+ it('14. heartbeat includes empty usage when no recordUsage() calls', async () => {
+ const { AgentSpecReporter } = await import('../agent/reporter.js')
+ const reporter = new AgentSpecReporter(testManifest)
+
+ reporter.startPushMode({
+ controlPlaneUrl: 'https://cp.example.com',
+ apiKey: 'test-key-abc',
+ intervalSeconds: 30,
+ })
+
+ await vi.advanceTimersByTimeAsync(500)
+
+ const [, init] = fetchMock.mock.calls[0] as [string, RequestInit]
+ const body = JSON.parse(init.body as string)
+ expect(body.usage).toEqual({
+ windowStartedAt: expect.any(String),
+ models: [],
+ totalTokens: 0,
+ totalCalls: 0,
+ })
+ reporter.stopPushMode()
+ })
+
+ it('15. usage ledger resets after heartbeat fires', async () => {
+ const { AgentSpecReporter } = await import('../agent/reporter.js')
+ const reporter = new AgentSpecReporter(testManifest)
+
+ reporter.usage.record('openai/gpt-4o', 100, 50)
+
+ reporter.startPushMode({
+ controlPlaneUrl: 'https://cp.example.com',
+ apiKey: 'test-key-abc',
+ intervalSeconds: 5,
+ })
+
+ // First heartbeat fires and drains the ledger
+ await vi.advanceTimersByTimeAsync(500)
+
+ // Record nothing between first and second heartbeat
+ await vi.advanceTimersByTimeAsync(5000)
+
+ // Second heartbeat should have empty usage
+ const [, init] = fetchMock.mock.calls[1] as [string, RequestInit]
+ const body = JSON.parse(init.body as string)
+ expect(body.usage.totalTokens).toBe(0)
+ expect(body.usage.totalCalls).toBe(0)
+ reporter.stopPushMode()
+ })
+
+ it('16. reporter.usage is a UsageLedger instance', async () => {
+ const { AgentSpecReporter } = await import('../agent/reporter.js')
+ const { UsageLedger } = await import('../agent/usage-ledger.js')
+ const reporter = new AgentSpecReporter(testManifest)
+
+ expect(reporter.usage).toBeInstanceOf(UsageLedger)
+ })
+
+ it('17. reporter.usage.record() accumulates correctly', async () => {
+ const { AgentSpecReporter } = await import('../agent/reporter.js')
+ const reporter = new AgentSpecReporter(testManifest)
+
+ reporter.usage.record('openai/gpt-4o', 100, 50)
+ reporter.usage.record('openai/gpt-4o', 200, 80)
+
+ const snap = reporter.usage.snapshot(false)
+ expect(snap.totalTokens).toBe(430)
+ expect(snap.totalCalls).toBe(2)
+ })
})
diff --git a/packages/sdk/src/__tests__/usage-ledger.test.ts b/packages/sdk/src/__tests__/usage-ledger.test.ts
new file mode 100644
index 0000000..4132100
--- /dev/null
+++ b/packages/sdk/src/__tests__/usage-ledger.test.ts
@@ -0,0 +1,130 @@
+import { describe, it, expect, beforeEach } from 'vitest'
+import { UsageLedger } from '../agent/usage-ledger.js'
+import type { UsageSnapshot, ModelUsageEntry } from '../agent/usage-ledger.js'
+
+describe('UsageLedger', () => {
+ let ledger: UsageLedger
+
+ beforeEach(() => {
+ ledger = new UsageLedger()
+ })
+
+ // ── record() ─────────────────────────────────────────────────────────────────
+
+ it('accumulates tokens for the same model across multiple calls', () => {
+ ledger.record('openai/gpt-4o', 100, 50)
+ ledger.record('openai/gpt-4o', 200, 80)
+
+ const snap = ledger.snapshot(false)
+ expect(snap.models).toHaveLength(1)
+ expect(snap.models[0]).toEqual({
+ modelId: 'openai/gpt-4o',
+ promptTokens: 300,
+ completionTokens: 130,
+ totalTokens: 430,
+ callCount: 2,
+ })
+ })
+
+ it('tracks multiple models independently', () => {
+ ledger.record('openai/gpt-4o', 100, 50)
+ ledger.record('anthropic/claude-sonnet-4-6', 200, 80)
+ ledger.record('openai/gpt-4o', 50, 20)
+
+ const snap = ledger.snapshot(false)
+ expect(snap.models).toHaveLength(2)
+
+ const gpt = snap.models.find((m) => m.modelId === 'openai/gpt-4o')
+ const claude = snap.models.find((m) => m.modelId === 'anthropic/claude-sonnet-4-6')
+
+ expect(gpt).toEqual({
+ modelId: 'openai/gpt-4o',
+ promptTokens: 150,
+ completionTokens: 70,
+ totalTokens: 220,
+ callCount: 2,
+ })
+ expect(claude).toEqual({
+ modelId: 'anthropic/claude-sonnet-4-6',
+ promptTokens: 200,
+ completionTokens: 80,
+ totalTokens: 280,
+ callCount: 1,
+ })
+ })
+
+ // ── snapshot() ────────────────────────────────────────────────────────────────
+
+ it('returns correct aggregate totals', () => {
+ ledger.record('openai/gpt-4o', 100, 50)
+ ledger.record('anthropic/claude-sonnet-4-6', 200, 80)
+
+ const snap = ledger.snapshot(false)
+ expect(snap.totalTokens).toBe(430)
+ expect(snap.totalCalls).toBe(2)
+ })
+
+ it('resets counters when snapshot(true) is called', () => {
+ ledger.record('openai/gpt-4o', 100, 50)
+
+ const first = ledger.snapshot(true)
+ expect(first.totalTokens).toBe(150)
+ expect(first.totalCalls).toBe(1)
+
+ const second = ledger.snapshot(false)
+ expect(second.models).toHaveLength(0)
+ expect(second.totalTokens).toBe(0)
+ expect(second.totalCalls).toBe(0)
+ })
+
+ it('does NOT reset counters when snapshot(false) is called', () => {
+ ledger.record('openai/gpt-4o', 100, 50)
+
+ ledger.snapshot(false)
+
+ const snap = ledger.snapshot(false)
+ expect(snap.totalTokens).toBe(150)
+ expect(snap.totalCalls).toBe(1)
+ })
+
+ it('resets windowStartedAt after snapshot(true)', () => {
+ const snap1 = ledger.snapshot(false)
+ const window1 = snap1.windowStartedAt
+
+ // Force a reset
+ ledger.record('openai/gpt-4o', 10, 5)
+ ledger.snapshot(true)
+
+ // New window should have a >= timestamp
+ const snap2 = ledger.snapshot(false)
+ expect(new Date(snap2.windowStartedAt).getTime())
+ .toBeGreaterThanOrEqual(new Date(window1).getTime())
+ })
+
+ it('returns empty snapshot when no calls have been recorded', () => {
+ const snap = ledger.snapshot(false)
+
+ expect(snap).toEqual({
+ windowStartedAt: expect.any(String),
+ models: [],
+ totalTokens: 0,
+ totalCalls: 0,
+ })
+ })
+
+ it('defaults reset to true', () => {
+ ledger.record('openai/gpt-4o', 100, 50)
+
+ // Default parameter — should reset
+ ledger.snapshot()
+
+ const snap = ledger.snapshot(false)
+ expect(snap.totalTokens).toBe(0)
+ })
+
+ it('windowStartedAt is a valid ISO-8601 string', () => {
+ const snap = ledger.snapshot(false)
+ expect(() => new Date(snap.windowStartedAt)).not.toThrow()
+ expect(new Date(snap.windowStartedAt).toISOString()).toBe(snap.windowStartedAt)
+ })
+})
diff --git a/packages/sdk/src/agent/reporter.ts b/packages/sdk/src/agent/reporter.ts
index aa67d76..b3ad18f 100644
--- a/packages/sdk/src/agent/reporter.ts
+++ b/packages/sdk/src/agent/reporter.ts
@@ -22,6 +22,7 @@ import { runAudit } from '../audit/index.js'
import type { AgentSpecManifest } from '../schema/manifest.schema.js'
import type { HealthReport } from '../health/index.js'
import type { PushModeOptions } from './push.js'
+import { UsageLedger } from './usage-ledger.js'
export interface ReporterOptions {
/**
@@ -52,6 +53,9 @@ export class AgentSpecReporter {
private stopped = false
  private readonly registeredTools = new Set<string>()
+ /** In-process token usage accumulator. */
+ readonly usage = new UsageLedger()
+
constructor(
private readonly manifest: AgentSpecManifest,
private readonly opts: ReporterOptions = {},
@@ -172,14 +176,15 @@ export class AgentSpecReporter {
try {
const health = await this.getReport()
const gap = runAudit(this.manifest)
+ const usage = this.usage.snapshot(true)
// Build payload, cap at 64 KB by trimming checks
const trimmedChecks = [...health.checks]
- let body = JSON.stringify({ health, gap })
+ let body = JSON.stringify({ health, gap, usage })
if (body.length > 65_536) {
while (trimmedChecks.length > 0) {
trimmedChecks.pop()
- body = JSON.stringify({ health: { ...health, checks: trimmedChecks }, gap })
+ body = JSON.stringify({ health: { ...health, checks: trimmedChecks }, gap, usage })
if (body.length <= 65_536) break
}
}
diff --git a/packages/sdk/src/agent/usage-ledger.ts b/packages/sdk/src/agent/usage-ledger.ts
new file mode 100644
index 0000000..1701393
--- /dev/null
+++ b/packages/sdk/src/agent/usage-ledger.ts
@@ -0,0 +1,85 @@
+/** In-process token usage counter — aggregates LLM call tokens between heartbeat pushes. */
+
+// ── Types (exported) ─────────────────────────────────────────────────────────
+
+export interface ModelUsageEntry {
+ modelId: string
+ promptTokens: number
+ completionTokens: number
+ totalTokens: number
+ callCount: number
+}
+
+export interface UsageSnapshot {
+ windowStartedAt: string
+ models: ModelUsageEntry[]
+ totalTokens: number
+ totalCalls: number
+}
+
+// ── Private helpers ──────────────────────────────────────────────────────────
+
+function createEmptyEntry(modelId: string): ModelUsageEntry {
+ return { modelId, promptTokens: 0, completionTokens: 0, totalTokens: 0, callCount: 0 }
+}
+
+function buildSnapshot(
+  counters: Map<string, ModelUsageEntry>,
+ windowStart: Date,
+): UsageSnapshot {
+ // Shallow-copy each entry to avoid exposing mutable internal state
+ const models = [...counters.values()].map((e) => ({ ...e }))
+ let totalTokens = 0
+ let totalCalls = 0
+ for (const m of models) {
+ totalTokens += m.totalTokens
+ totalCalls += m.callCount
+ }
+ return {
+ windowStartedAt: windowStart.toISOString(),
+ models,
+ totalTokens,
+ totalCalls,
+ }
+}
+
+// ── Public class ─────────────────────────────────────────────────────────────
+
+export class UsageLedger {
+  private counters = new Map<string, ModelUsageEntry>()
+ private windowStart = new Date()
+
+ /**
+ * Record token usage for a single LLM call.
+ * Call this once per LLM invocation with the model ID and token counts.
+ */
+ record(modelId: string, promptTokens: number, completionTokens: number): void {
+ const p = Math.max(0, promptTokens)
+ const c = Math.max(0, completionTokens)
+ let entry = this.counters.get(modelId)
+ if (!entry) {
+ entry = createEmptyEntry(modelId)
+ this.counters.set(modelId, entry)
+ }
+ entry.promptTokens += p
+ entry.completionTokens += c
+ entry.totalTokens += p + c
+ entry.callCount += 1
+ }
+
+ /**
+ * Return a frozen snapshot of the current usage window.
+ *
+ * @param reset - When true (default), clears all counters and starts a new
+ * window. Use reset=true in the heartbeat push so each window is shipped
+ * exactly once. Use reset=false for inspection without draining.
+ */
+ snapshot(reset = true): UsageSnapshot {
+ const snap = buildSnapshot(this.counters, this.windowStart)
+ if (reset) {
+ this.counters = new Map()
+ this.windowStart = new Date()
+ }
+ return snap
+ }
+}
diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts
index 848c5e9..151e6bb 100644
--- a/packages/sdk/src/index.ts
+++ b/packages/sdk/src/index.ts
@@ -106,6 +106,12 @@ export {
export { type PushModeOptions } from './agent/push.js'
+export {
+ UsageLedger,
+ type UsageSnapshot,
+ type ModelUsageEntry,
+} from './agent/usage-ledger.js'
+
export { agentSpecFastifyPlugin } from './agent/adapters/fastify.js'
export { agentSpecExpressRouter } from './agent/adapters/express.js'
diff --git a/packages/sidecar/src/__tests__/usage.test.ts b/packages/sidecar/src/__tests__/usage.test.ts
new file mode 100644
index 0000000..97f543c
--- /dev/null
+++ b/packages/sidecar/src/__tests__/usage.test.ts
@@ -0,0 +1,117 @@
+/**
+ * Tests for GET /usage — token usage aggregation from the audit ring.
+ */
+
+import { describe, it, expect, beforeEach } from 'vitest'
+import type { FastifyInstance } from 'fastify'
+import { buildControlPlaneApp } from '../control-plane/index.js'
+import { AuditRing } from '../audit-ring.js'
+import { testManifest } from './fixtures.js'
+
+describe('GET /usage', () => {
+ let app: FastifyInstance
+ let ring: AuditRing
+
+ beforeEach(async () => {
+ ring = new AuditRing()
+ app = await buildControlPlaneApp(testManifest, ring, { opaUrl: null })
+ })
+
+ it('returns 200 with empty totals when ring has no model calls', async () => {
+ const res = await app.inject({ method: 'GET', url: '/usage' })
+ expect(res.statusCode).toBe(200)
+
+ const body = res.json()
+ expect(body.totalTokens).toBe(0)
+ expect(body.totalCalls).toBe(0)
+ expect(body.models).toEqual([])
+ expect(body.sampleSize).toBe(0)
+ })
+
+ it('sums tokens across multiple audit entries', async () => {
+ ring.push({
+ requestId: 'r1',
+ timestamp: new Date().toISOString(),
+ method: 'POST',
+ path: '/chat',
+ modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 100 }],
+ })
+ ring.push({
+ requestId: 'r2',
+ timestamp: new Date().toISOString(),
+ method: 'POST',
+ path: '/chat',
+ modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 250 }],
+ })
+
+ const res = await app.inject({ method: 'GET', url: '/usage' })
+ const body = res.json()
+
+ expect(body.totalTokens).toBe(350)
+ expect(body.totalCalls).toBe(2)
+ expect(body.models).toHaveLength(1)
+ expect(body.models[0]).toEqual({
+ modelId: 'openai/gpt-4o',
+ totalTokens: 350,
+ callCount: 2,
+ })
+ expect(body.sampleSize).toBe(2)
+ })
+
+ it('groups by modelId correctly', async () => {
+ ring.push({
+ requestId: 'r1',
+ timestamp: new Date().toISOString(),
+ method: 'POST',
+ path: '/chat',
+ modelCalls: [
+ { modelId: 'openai/gpt-4o', tokenCount: 100 },
+ { modelId: 'anthropic/claude-sonnet-4-6', tokenCount: 200 },
+ ],
+ })
+ ring.push({
+ requestId: 'r2',
+ timestamp: new Date().toISOString(),
+ method: 'POST',
+ path: '/chat',
+ modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 50 }],
+ })
+
+ const res = await app.inject({ method: 'GET', url: '/usage' })
+ const body = res.json()
+
+ expect(body.totalTokens).toBe(350)
+ expect(body.totalCalls).toBe(3)
+ expect(body.models).toHaveLength(2)
+
+ const gpt = body.models.find((m: { modelId: string }) => m.modelId === 'openai/gpt-4o')
+ const claude = body.models.find((m: { modelId: string }) => m.modelId === 'anthropic/claude-sonnet-4-6')
+
+ expect(gpt).toEqual({ modelId: 'openai/gpt-4o', totalTokens: 150, callCount: 2 })
+ expect(claude).toEqual({ modelId: 'anthropic/claude-sonnet-4-6', totalTokens: 200, callCount: 1 })
+ })
+
+ it('safely skips entries without modelCalls', async () => {
+ ring.push({
+ requestId: 'r1',
+ timestamp: new Date().toISOString(),
+ method: 'GET',
+ path: '/health',
+ // no modelCalls
+ })
+ ring.push({
+ requestId: 'r2',
+ timestamp: new Date().toISOString(),
+ method: 'POST',
+ path: '/chat',
+ modelCalls: [{ modelId: 'openai/gpt-4o', tokenCount: 500 }],
+ })
+
+ const res = await app.inject({ method: 'GET', url: '/usage' })
+ const body = res.json()
+
+ expect(body.totalTokens).toBe(500)
+ expect(body.totalCalls).toBe(1)
+ expect(body.sampleSize).toBe(2)
+ })
+})
diff --git a/packages/sidecar/src/control-plane/index.ts b/packages/sidecar/src/control-plane/index.ts
index 07591d3..ea4d4c4 100644
--- a/packages/sidecar/src/control-plane/index.ts
+++ b/packages/sidecar/src/control-plane/index.ts
@@ -11,6 +11,7 @@ import { buildEvalRoutes } from './eval.js'
import { buildGapRoutes } from './gap.js'
import { buildEventsRoutes } from './events.js'
import { buildProofRoutes, ProofStore } from './proof.js'
+import { buildUsageRoutes } from './usage.js'
export interface ControlPlaneOptions {
logger?: boolean
@@ -39,6 +40,7 @@ export async function buildControlPlaneApp(
await buildGapRoutes(app, manifest, auditRing)
await buildEventsRoutes(app, manifest, auditRing, { opaUrl: opts.opaUrl })
await buildProofRoutes(app, opts.proofStore ?? new ProofStore())
+ await buildUsageRoutes(app, auditRing)
return app
}
diff --git a/packages/sidecar/src/control-plane/usage.ts b/packages/sidecar/src/control-plane/usage.ts
new file mode 100644
index 0000000..ff96740
--- /dev/null
+++ b/packages/sidecar/src/control-plane/usage.ts
@@ -0,0 +1,67 @@
+/**
+ * GET /usage — aggregate token usage from the audit ring.
+ *
+ * Scans all entries in the ring buffer, sums modelCalls by modelId,
+ * and returns a UsageResponse with per-model and aggregate totals.
+ */
+
+import type { FastifyInstance } from 'fastify'
+import type { AuditRing, AuditEntry } from '../audit-ring.js'
+
+// ── Types ────────────────────────────────────────────────────────────────────
+
+interface UsageByModel {
+ modelId: string
+ totalTokens: number
+ callCount: number
+}
+
+interface UsageResponse {
+ models: UsageByModel[]
+ totalTokens: number
+ totalCalls: number
+ sampleSize: number
+}
+
+// ── Private helpers ──────────────────────────────────────────────────────────
+
+function aggregateFromRing(entries: AuditEntry[]): UsageResponse {
+  const byModel = new Map<string, UsageByModel>()
+ let totalTokens = 0
+ let totalCalls = 0
+
+ for (const entry of entries) {
+ if (!entry.modelCalls) continue
+ for (const call of entry.modelCalls) {
+ let model = byModel.get(call.modelId)
+ if (!model) {
+ model = { modelId: call.modelId, totalTokens: 0, callCount: 0 }
+ byModel.set(call.modelId, model)
+ }
+ model.totalTokens += call.tokenCount
+ model.callCount += 1
+ totalTokens += call.tokenCount
+ totalCalls += 1
+ }
+ }
+
+ return {
+ models: [...byModel.values()],
+ totalTokens,
+ totalCalls,
+ sampleSize: entries.length,
+ }
+}
+
+// ── Route builder ────────────────────────────────────────────────────────────
+
+export async function buildUsageRoutes(
+ app: FastifyInstance,
+ auditRing: AuditRing,
+): Promise<void> {
+ app.get('/usage', async (_request, reply) => {
+ const entries = auditRing.getAll()
+ const usage = aggregateFromRing(entries)
+ return reply.code(200).send(usage)
+ })
+}