From 874670b4aeb8cd8fa0f228af67b314457434fbff Mon Sep 17 00:00:00 2001
From: Jason-Vaughan <95194903+Jason-Vaughan@users.noreply.github.com>
Date: Thu, 18 Jun 2026 13:53:30 -0700
Subject: [PATCH] Add delegate observability + metering (#39 slice 6)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Meter delegated sub-calls and surface them, without corrupting the
spend-avoided headline. The deferred non-local delegate metering.
- measurement: records gain a `kind` field (task|delegate; old records
read as task). `rollup` keeps delegate records OUT of the headline
(no double-count — the parent task already credits the whole job) and
aggregates them separately into `summary["delegates"]` (count, by
backend, est tokens, informational cloud-equiv). `format_rollup` shows
a "Delegated sub-tasks" section. Process-level lock serializes the
log append (delegate_many fans out across threads).
- delegate: `run_delegate` meters each sub-call as kind="delegate" at a
single seam (delegate_many inherits it); a metering error never breaks
the delegation.
- GUI: stats card shows the delegate breakdown.
- README/ARCHITECTURE/CHANGELOG/roster comment + docstrings updated
(the old "not metered" notes corrected). +13 hermetic tests (380 pass)
+ a gated live metering check. Independent Critic: SHIP (3 doc-parity
fixes applied).
The per-parent-task cross-process tree is deferred.
---
ARCHITECTURE.md | 15 ++++-
CHANGELOG.md | 12 ++++
README.md | 9 +--
tanglebrain/config/roster.yaml | 4 +-
tanglebrain/delegate.py | 20 +++++--
tanglebrain/gui/static/index.html | 12 ++++
tanglebrain/measurement.py | 80 +++++++++++++++++++++++---
tests/test_delegate.py | 51 +++++++++++++++++
tests/test_gui.py | 15 +++++
tests/test_live.py | 15 +++++
tests/test_measurement.py | 94 +++++++++++++++++++++++++++++++
11 files changed, 307 insertions(+), 20 deletions(-)
diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index 642c36b..785c102 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -155,8 +155,9 @@ out concurrently. The **reduce step is deliberately not TangleBrain's** — the
the `delegate_many` results itself (it holds the original task context that makes for good synthesis),
and offloads the stitch with an ordinary `delegate(task=…)` call when the reduction is mechanical and
large. No dedicated reducer tool: the existing primitives cover it, and frontier-side synthesis is the
-better default until usage proves otherwise. Still ahead: orchestration-tree observability (which also
-adds the deferred non-local delegate metering) — whose data would tell us if a reducer ever earns code.
+better default until usage proves otherwise. Delegated sub-calls are now **metered** (see Measurement
+below) — the deferred observability landed as a by-backend breakdown; a per-parent-task tree (linking
+each delegation to its top-level task across processes) is the remaining stretch.
### Measurement — per-task records (`measurement.py`)
@@ -169,6 +170,16 @@ authenticated CLIs expose no usable counts), so one consistent approximate metho
every tier. All measurement I/O is best-effort: logging never breaks routing, and a corrupt record
never breaks the rollup.
+Records carry a `kind`: `"task"` for a top-level routed request (what the spend-avoided headline
+counts) or `"delegate"` for a sub-call offloaded through `run_delegate` (every delegation, including
+each `delegate_many` item, is metered at that single seam). The rollup keeps delegate records **out of
+the headline** — the parent task already credits the whole job, so counting the sub-calls again would
+double-count the saving — and aggregates them **separately** into a by-backend breakdown (count, est
+tokens, informational cloud-equiv) surfaced in `--stats` and the GUI. Concurrent appends (delegate_many
+fans out across threads) are serialized by a process-level lock. Linking each delegation to its
+*specific* parent task across processes (a true tree) is deferred — it needs a task-id propagated
+through the orchestrator CLIs to the MCP child, which can't be verified hermetically.
+
### Knob GUI — localhost panel (`gui/`)
`tanglebrain-gui` serves a thin, **localhost-only** web panel (stdlib `http.server` + a single
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ce9a1be..304c549 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
+### Added
+
+- **Delegate observability + metering (scatter-gather roadmap #39, slice 6).** Delegated sub-calls
+ are now metered: every `run_delegate` execution (including each `delegate_many` item — metered at one
+ seam) is logged as a `kind: delegate` usage record with its served backend + estimated tokens.
+ `tanglebrain --stats` and the knob panel gain a **"Delegated sub-tasks" breakdown by backend** (count,
+ est tokens, informational cloud-equiv). Delegate records are kept **out of the "spend avoided"
+ headline** so a sub-call's saving is never double-counted against its parent task, and concurrent
+ fan-out appends are serialized by a process-level lock. Records carry a new `kind` field
+ (`task`/`delegate`; older records read as `task`). The per-parent-task tree (cross-process linkage)
+ is deferred. Closes the deferred metering noted since the measurement layer landed.
+
### Internal
- **Documented the synthesis/reduce pattern (scatter-gather roadmap #39, slice 4).** README +
diff --git a/README.md b/README.md
index e3612ab..7753f6c 100644
--- a/README.md
+++ b/README.md
@@ -225,10 +225,11 @@ adapters as the CLI above, so endpoints and keys live in one place. It exposes f
Make a backend a delegate target by flagging its roster entry `can_delegate: true` (mirrors
`can_orchestrate`). The shipped example flags the local tier, so the menu is non-empty out of the
-box. **Note:** non-local delegate spend is **not metered** in this version — orchestration-tree
-observability is on the [scatter-gather roadmap](https://github.com/Jason-Vaughan/TangleBrain/issues/39).
-Any non-local target is opt-in and your responsibility under that provider's terms — see
-[DISCLAIMER.md](DISCLAIMER.md).
+box. **Delegated sub-calls are now metered**: each is logged as a `kind: delegate` usage record, and
+`tanglebrain --stats` (and the knob panel) show a "Delegated sub-tasks" breakdown by backend (count,
+est tokens, informational cloud-equiv). These are kept **out of** the "spend avoided" headline so a
+sub-call's saving is never double-counted against its parent task. Any non-local target is opt-in and
+your responsibility under that provider's terms — see [DISCLAIMER.md](DISCLAIMER.md).
**Synthesising fan-out results.** The full pattern is decompose → fan out (`delegate_many`) →
**reduce** → answer. TangleBrain ships the dispatch primitives but deliberately does *not* own the
diff --git a/tanglebrain/config/roster.yaml b/tanglebrain/config/roster.yaml
index e14c4c8..2a02c41 100644
--- a/tanglebrain/config/roster.yaml
+++ b/tanglebrain/config/roster.yaml
@@ -44,8 +44,8 @@
# e.g. a cheaper sub or a better-fit model. Flag any entry `can_delegate: true` and it joins the
# menu the `delegate` tool advertises; the model picks a target by its `good_at` fit. A target is
# invoked as a leaf (it never gets its own delegate tool — no recursion). `api` targets still obey
-# the billing gate; non-local delegate spend is not metered in this version. This example is an
-# openai-compat backend; uncomment + point it at one you hold.
+# the billing gate; delegated sub-calls are metered separately (see `tanglebrain --stats`). This
+# example is an openai-compat backend; uncomment + point it at one you hold.
#
# - id: cheap-remote
# tier: sub
diff --git a/tanglebrain/delegate.py b/tanglebrain/delegate.py
index f92bb12..c185afd 100644
--- a/tanglebrain/delegate.py
+++ b/tanglebrain/delegate.py
@@ -24,6 +24,7 @@
import sys
from concurrent.futures import ThreadPoolExecutor
+from tanglebrain.measurement import record_task
from tanglebrain.roster import ROSTER_ENV_VAR, Roster, RosterEntry, load_roster
from tanglebrain.selector import SelectionError, build_adapter, select_local
from tanglebrain.settings import Settings, load_settings
@@ -231,8 +232,9 @@ def run_delegate(
The selected target is built as a **leaf** (``inject_delegate=False``) — a delegate target never
receives its own delegate tool, so there is no recursive delegation. ``api`` targets (reachable
only via explicit ``target``) flow through the existing billing gate in
- :func:`tanglebrain.selector.build_adapter`; ``cli`` targets keep their env-scrub. Non-local
- delegate spend is **not metered** in this version (consistent with the existing delegate posture).
+ :func:`tanglebrain.selector.build_adapter`; ``cli`` targets keep their env-scrub. Each sub-call is
+ **metered** as a ``kind="delegate"`` usage record, kept out of the spend-avoided headline (see
+ :func:`tanglebrain.measurement.rollup`).
Args:
prompt: The self-contained sub-task to delegate. Give it everything it needs — the target
@@ -265,7 +267,16 @@ def run_delegate(
else:
entry = select_local(roster)
adapter = build_adapter(entry, inject_delegate=False)
- return adapter.run(prompt, {"max_tokens": max_tokens})
+ text = adapter.run(prompt, {"max_tokens": max_tokens})
+ # Meter the sub-call for orchestration-tree observability. Tagged kind="delegate" so the rollup
+ # keeps it OUT of the spend-avoided headline (the parent task already credits the whole job) and
+ # in a separate by-backend breakdown. record_task never raises; the extra guard is belt-and-
+ # suspenders — metering must never break a delegation.
+ try:
+ record_task(path="delegate", entry=entry, prompt=prompt, response=text, kind="delegate")
+ except Exception:
+ pass
+ return text
#: Concurrency cap to use only when :func:`os.cpu_count` can't report a core count (rare).
@@ -361,7 +372,8 @@ def run_delegate_many(
Partial failure never sinks the batch: each result carries a ``status`` (``ok`` / ``no_fit`` /
``error``), and results are returned **ordered by input index** even though workers finish out of
- order. Non-local delegate spend is **not metered** (consistent with the rest of the delegate).
+ order. Each dispatched sub-call is metered (via :func:`run_delegate`) as a ``kind="delegate"``
+ usage record, kept out of the spend-avoided headline.
Args:
tasks: A list of task descriptors, each a mapping ``{prompt, target?, task?, max_tokens?}``.
diff --git a/tanglebrain/gui/static/index.html b/tanglebrain/gui/static/index.html
index 3b300df..ea6a977 100644
--- a/tanglebrain/gui/static/index.html
+++ b/tanglebrain/gui/static/index.html
@@ -116,6 +116,18 @@
Pricing reference
Est. tokens (in / out)
${(s.in_tokens_est||0).toLocaleString()} / ${(s.out_tokens_est||0).toLocaleString()}
`;
if (d.is_placeholder) html += `⚠ pricing: PLACEHOLDER — figures illustrative until the anchor is ratified.
`;
+ const dg = s.delegates || {};
+ if (dg.count) {
+ const backends = Object.entries(dg.by_backend || {}).map(([k, v]) => `${esc(k)} ${v.count || 0}`).join(", ") || "—";
+ html += `Delegated sub-tasks (offloaded by orchestrators)
+
+
+
+
Est. tokens (in / out)
${(dg.in_tokens_est||0).toLocaleString()} / ${(dg.out_tokens_est||0).toLocaleString()}
+
Cloud-equiv
${money(dg.cloud_equiv_usd)}
+
+ Informational — delegate spend is already credited within its parent task's “spend avoided”.
`;
+ }
$("statsCard").innerHTML = html;
} catch (e) { $("statsCard").innerHTML = `failed to load stats`; }
}
diff --git a/tanglebrain/measurement.py b/tanglebrain/measurement.py
index d500756..c83dc11 100644
--- a/tanglebrain/measurement.py
+++ b/tanglebrain/measurement.py
@@ -22,6 +22,7 @@
import json
import os
import shutil
+import threading
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
@@ -32,6 +33,11 @@
LOG_FILENAME = "usage.jsonl"
+# Serializes appends to the usage log so concurrent writers in one process (delegate_many fans
+# sub-tasks out across threads) can't interleave bytes mid-line. Per-process only; cross-process
+# appends rely on the OS's O_APPEND atomicity for short lines, as before.
+_LOG_LOCK = threading.Lock()
+
# Chars per token for the uniform estimation heuristic. ~4 chars/token is the standard rough
# approximation for English-ish text across modern BPE tokenizers; good enough for an *estimate*.
_CHARS_PER_TOKEN = 4
@@ -286,17 +292,25 @@ def record_task(
entry: object,
prompt: str,
response: str,
+ kind: str = "task",
log_path: str | os.PathLike[str] | None = None,
pricing: Pricing | None = None,
) -> None:
- """Append one usage record for a routed task. Never raises — a logging failure is dropped.
+ """Append one usage record for a routed task or a delegated sub-call. Never raises.
+
+ A logging failure is dropped — measurement is a side-effect that must never affect the returned
+ answer.
Args:
- path: Which execution path served the task — ``router`` | ``local`` | ``model``.
+ path: Which execution path served the work — ``router`` | ``local`` | ``model`` |
+ ``delegate``.
entry: The served :class:`~tanglebrain.roster.RosterEntry` (read for ``tier``/``id``); may
be ``None`` (e.g. the router didn't surface one), in which case both are ``"unknown"``.
- prompt: The task prompt (for input-token estimation).
+ prompt: The prompt (for input-token estimation).
response: The returned response text (for output-token estimation).
+ kind: ``"task"`` for a top-level routed task (the default; what the spend-avoided headline
+ counts) or ``"delegate"`` for a delegated sub-call. Delegate records are rolled up
+ **separately** so a sub-call's saving is never double-counted against its parent task.
log_path: Override the usage-log path (tests inject a temp path). Defaults to
:func:`default_log_path`.
pricing: Override the pricing. Defaults to :func:`load_pricing`.
@@ -314,6 +328,7 @@ def record_task(
avoided = 0.0 if tier == "api" else equiv
record = {
"ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
+ "kind": str(kind),
"path": str(path),
"tier": str(tier),
"model": str(model),
@@ -325,8 +340,9 @@ def record_task(
}
target = Path(log_path) if log_path is not None else default_log_path()
target.parent.mkdir(parents=True, exist_ok=True)
- with target.open("a", encoding="utf-8") as fh:
- fh.write(json.dumps(record) + "\n")
+ with _LOG_LOCK:
+ with target.open("a", encoding="utf-8") as fh:
+ fh.write(json.dumps(record) + "\n")
except Exception:
# Measurement is a side-effect: a failure here must never affect the returned answer.
return
@@ -385,7 +401,11 @@ def rollup(records: list[dict]) -> dict:
Returns:
A dict with: ``tasks`` (int), ``by_tier`` (tier → count), ``in_tokens_est`` /
``out_tokens_est`` (summed estimates), and ``cloud_equiv_usd`` / ``spend_avoided_usd``
- (summed dollars).
+ (summed dollars) — all over **top-level tasks only** — plus ``delegates``, a separate
+ sub-rollup of delegated sub-calls ``{count, by_backend: {model: {count, in_tokens_est,
+ out_tokens_est}}, in_tokens_est, out_tokens_est, cloud_equiv_usd}``. Delegate records are
+ kept out of the headline so a sub-call's saving is never double-counted against its parent
+ task; their cloud-equiv is informational. A record without a ``kind`` field counts as a task.
"""
summary: dict = {
"tasks": 0,
@@ -395,16 +415,40 @@ def rollup(records: list[dict]) -> dict:
"cloud_equiv_usd": 0.0,
"spend_avoided_usd": 0.0,
}
+ delegates: dict = {
+ "count": 0,
+ "by_backend": {},
+ "in_tokens_est": 0,
+ "out_tokens_est": 0,
+ "cloud_equiv_usd": 0.0,
+ }
for r in records:
+ in_tok = _as_int(r.get("in_tokens_est"))
+ out_tok = _as_int(r.get("out_tokens_est"))
+ if str(r.get("kind", "task")) == "delegate":
+ delegates["count"] += 1
+ model = str(r.get("model", "unknown"))
+ backend = delegates["by_backend"].setdefault(
+ model, {"count": 0, "in_tokens_est": 0, "out_tokens_est": 0}
+ )
+ backend["count"] += 1
+ backend["in_tokens_est"] += in_tok
+ backend["out_tokens_est"] += out_tok
+ delegates["in_tokens_est"] += in_tok
+ delegates["out_tokens_est"] += out_tok
+ delegates["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd"))
+ continue
summary["tasks"] += 1
tier = str(r.get("tier", "unknown"))
summary["by_tier"][tier] = summary["by_tier"].get(tier, 0) + 1
- summary["in_tokens_est"] += _as_int(r.get("in_tokens_est"))
- summary["out_tokens_est"] += _as_int(r.get("out_tokens_est"))
+ summary["in_tokens_est"] += in_tok
+ summary["out_tokens_est"] += out_tok
summary["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd"))
summary["spend_avoided_usd"] += _as_float(r.get("spend_avoided_usd"))
summary["cloud_equiv_usd"] = round(summary["cloud_equiv_usd"], 4)
summary["spend_avoided_usd"] = round(summary["spend_avoided_usd"], 4)
+ delegates["cloud_equiv_usd"] = round(delegates["cloud_equiv_usd"], 4)
+ summary["delegates"] = delegates
return summary
@@ -438,4 +482,24 @@ def format_rollup(summary: dict, pricing: Pricing) -> str:
" ⚠ pricing: PLACEHOLDER — figures are illustrative; set real rates in "
"config/pricing.yaml and flip placeholder to false."
)
+
+ delegates = summary.get("delegates") or {}
+ if delegates.get("count"):
+ lines.append("")
+ lines.append(" Delegated sub-tasks (offloaded by orchestrators)")
+ lines.append(f" Count: {delegates.get('count', 0)}")
+ by_backend = delegates.get("by_backend") or {}
+ if by_backend:
+ backends = ", ".join(
+ f"{model} {info.get('count', 0)}" for model, info in sorted(by_backend.items())
+ )
+ lines.append(f" By backend: {backends}")
+ lines.append(
+ f" Est. tokens: in {delegates.get('in_tokens_est', 0):,} / "
+ f"out {delegates.get('out_tokens_est', 0):,}"
+ )
+ lines.append(
+ f" Cloud-equiv: ${delegates.get('cloud_equiv_usd', 0.0):,.2f} "
+ "(informational — already credited within parent tasks)"
+ )
return "\n".join(lines)
diff --git a/tests/test_delegate.py b/tests/test_delegate.py
index 9f5aee7..ef77687 100644
--- a/tests/test_delegate.py
+++ b/tests/test_delegate.py
@@ -72,6 +72,12 @@ def test_substitutions_cover_both_tokens(self):
class RunLocalDelegateTest(unittest.TestCase):
+ def setUp(self):
+ # run_delegate now meters via record_task; stub it so these tests never touch the real log.
+ patcher = patch("tanglebrain.delegate.record_task")
+ patcher.start()
+ self.addCleanup(patcher.stop)
+
def test_returns_adapter_text(self):
adapter = MagicMock()
adapter.run.return_value = "grunt result"
@@ -167,6 +173,11 @@ def test_adapter_error_propagates(self):
class RunDelegateTargetTest(unittest.TestCase):
"""The generalized delegate: ``run_delegate(target=...)`` resolution, opt-in, and no-recursion."""
+ def setUp(self):
+ patcher = patch("tanglebrain.delegate.record_task")
+ patcher.start()
+ self.addCleanup(patcher.stop)
+
def test_target_none_uses_local_as_a_leaf(self):
# target=None must still route to the local tier AND build it as a leaf (inject_delegate
# False) — a delegate target never gets its own delegate tool, so no recursive delegation.
@@ -280,6 +291,11 @@ def test_available_capabilities_sorted_unique_excludes_api(self):
class RunDelegateCapabilityTest(unittest.TestCase):
"""run_delegate routing by task, plus target>task precedence."""
+ def setUp(self):
+ patcher = patch("tanglebrain.delegate.record_task")
+ patcher.start()
+ self.addCleanup(patcher.stop)
+
def test_task_routes_to_selected_leaf(self):
entry = _entry("sub-a", tier="sub", good_at=["code"])
roster = _roster_of(entry)
@@ -485,5 +501,40 @@ def submit(self, fn, *args):
self.assertEqual(captured["max_workers"], 3)
+class DelegateMeteringTest(unittest.TestCase):
+ """run_delegate meters each sub-call as a kind='delegate' usage record (observability)."""
+
+ def test_records_delegate_kind(self):
+ entry = _entry("local-x", tier="local", can_delegate=True)
+ adapter = MagicMock()
+ adapter.run.return_value = "result-text"
+ with patch("tanglebrain.delegate.load_roster", return_value=MagicMock()), patch(
+ "tanglebrain.delegate.select_local", return_value=entry
+ ), patch("tanglebrain.delegate.build_adapter", return_value=adapter), patch(
+ "tanglebrain.delegate.record_task"
+ ) as rec:
+ out = run_delegate("do it")
+ self.assertEqual(out, "result-text")
+ rec.assert_called_once()
+ kw = rec.call_args.kwargs
+ self.assertEqual(kw["kind"], "delegate")
+ self.assertEqual(kw["path"], "delegate")
+ self.assertIs(kw["entry"], entry)
+ self.assertEqual(kw["prompt"], "do it")
+ self.assertEqual(kw["response"], "result-text")
+
+ def test_metering_failure_never_breaks_delegation(self):
+ entry = _entry("local-x", tier="local")
+ adapter = MagicMock()
+ adapter.run.return_value = "answer"
+ with patch("tanglebrain.delegate.load_roster", return_value=MagicMock()), patch(
+ "tanglebrain.delegate.select_local", return_value=entry
+ ), patch("tanglebrain.delegate.build_adapter", return_value=adapter), patch(
+ "tanglebrain.delegate.record_task", side_effect=RuntimeError("log boom")
+ ):
+ out = run_delegate("q")
+ self.assertEqual(out, "answer") # metering error swallowed, answer still returned
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_gui.py b/tests/test_gui.py
index 2cb4be9..4c8020a 100644
--- a/tests/test_gui.py
+++ b/tests/test_gui.py
@@ -109,6 +109,21 @@ def test_rolls_up_records(self):
self.assertAlmostEqual(out["summary"]["spend_avoided_usd"], 1.5)
self.assertIn("is_placeholder", out)
+ def test_includes_delegate_breakdown(self):
+ recs = [
+ {"kind": "task", "tier": "sub", "in_tokens_est": 5, "out_tokens_est": 5,
+ "cloud_equiv_usd": 0.5, "spend_avoided_usd": 0.5},
+ {"kind": "delegate", "model": "local-x", "in_tokens_est": 40, "out_tokens_est": 60,
+ "cloud_equiv_usd": 2.0},
+ ]
+ with patch("tanglebrain.gui.views.read_records", return_value=recs):
+ out = views.view_stats()
+ # Headline stays task-only; delegates surface separately for the panel's fan-out breakdown.
+ self.assertEqual(out["summary"]["tasks"], 1)
+ delegates = out["summary"]["delegates"]
+ self.assertEqual(delegates["count"], 1)
+ self.assertEqual(delegates["by_backend"]["local-x"]["count"], 1)
+
class RunPromptTest(unittest.TestCase):
def test_happy_path_reports_served(self):
diff --git a/tests/test_live.py b/tests/test_live.py
index 0e5c7ca..a42acb0 100644
--- a/tests/test_live.py
+++ b/tests/test_live.py
@@ -110,6 +110,21 @@ def test_fan_out_runs_concurrently_and_returns_ordered_results(self):
self.assertTrue(all(r["status"] == "ok" for r in results), results)
self.assertTrue(all(r["text"].strip() for r in results))
+ def test_fan_out_writes_delegate_usage_records(self):
+ # Observability: the fan-out meters each sub-call as a kind='delegate' usage record. Pin a
+ # temp state dir so the real ~/.cache log is untouched.
+ import tempfile
+ from tanglebrain.measurement import read_records, rollup
+
+ with tempfile.TemporaryDirectory() as tmp:
+ with patch.dict(os.environ, {"TANGLEBRAIN_STATE_DIR": tmp}, clear=False):
+ run_delegate_many(
+ [{"prompt": "Reply with exactly: a"}, {"prompt": "Reply with exactly: b"}]
+ )
+ summary = rollup(read_records())
+ self.assertEqual(summary["delegates"]["count"], 2)
+ self.assertEqual(summary["tasks"], 0) # delegates are not headline tasks
+
@unittest.skipUnless(LIVE, "set TANGLEBRAIN_LIVE=1 to run the live orchestrated-delegation test")
class LiveDelegateInjectionTest(unittest.TestCase):
diff --git a/tests/test_measurement.py b/tests/test_measurement.py
index 4c9b807..580d64f 100644
--- a/tests/test_measurement.py
+++ b/tests/test_measurement.py
@@ -309,5 +309,99 @@ def test_honors_state_dir_env(self):
self.assertEqual(default_log_path(), Path("/tmp/tb-test/usage.jsonl"))
+class DelegateObservabilityTest(unittest.TestCase):
+ """kind='delegate' records: written, kept out of the headline, rolled up separately, thread-safe."""
+
+ def setUp(self):
+ self.tmp = tempfile.mkdtemp()
+ self.log = str(Path(self.tmp) / "usage.jsonl")
+
+ def _read(self):
+ return read_records(self.log)
+
+ def test_record_defaults_to_task_kind(self):
+ record_task(path="router", entry=FakeEntry("claude", "sub"),
+ prompt="hi", response="yo", log_path=self.log, pricing=FIXED)
+ self.assertEqual(self._read()[0]["kind"], "task")
+
+ def test_record_delegate_kind(self):
+ record_task(path="delegate", entry=FakeEntry("local-x", "local"),
+ prompt="hi", response="yo", kind="delegate", log_path=self.log, pricing=FIXED)
+ self.assertEqual(self._read()[0]["kind"], "delegate")
+
+ def test_rollup_excludes_delegates_from_headline(self):
+ records = [
+ {"kind": "task", "tier": "sub", "in_tokens_est": 10, "out_tokens_est": 10,
+ "cloud_equiv_usd": 1.0, "spend_avoided_usd": 1.0},
+ {"kind": "delegate", "model": "local-x", "in_tokens_est": 100, "out_tokens_est": 200,
+ "cloud_equiv_usd": 5.0, "spend_avoided_usd": 5.0},
+ ]
+ s = rollup(records)
+ # Headline counts the one task only — delegate tokens/spend must NOT inflate it.
+ self.assertEqual(s["tasks"], 1)
+ self.assertEqual(s["by_tier"], {"sub": 1})
+ self.assertEqual(s["in_tokens_est"], 10)
+ self.assertEqual(s["out_tokens_est"], 10)
+ self.assertAlmostEqual(s["spend_avoided_usd"], 1.0)
+ # Delegate sub-rollup is separate + informational.
+ d = s["delegates"]
+ self.assertEqual(d["count"], 1)
+ self.assertEqual(
+ d["by_backend"],
+ {"local-x": {"count": 1, "in_tokens_est": 100, "out_tokens_est": 200}},
+ )
+ self.assertEqual(d["in_tokens_est"], 100)
+ self.assertEqual(d["out_tokens_est"], 200)
+ self.assertAlmostEqual(d["cloud_equiv_usd"], 5.0)
+
+ def test_kindless_record_counts_as_task(self):
+ s = rollup([{"tier": "local", "in_tokens_est": 4, "spend_avoided_usd": 0.2}])
+ self.assertEqual(s["tasks"], 1)
+ self.assertEqual(s["delegates"]["count"], 0)
+
+ def test_by_backend_aggregates_multiple(self):
+ records = [
+ {"kind": "delegate", "model": "local-x", "in_tokens_est": 10, "out_tokens_est": 5},
+ {"kind": "delegate", "model": "local-x", "in_tokens_est": 20, "out_tokens_est": 5},
+ {"kind": "delegate", "model": "cheap-sub", "in_tokens_est": 1, "out_tokens_est": 1},
+ ]
+ d = rollup(records)["delegates"]
+ self.assertEqual(d["count"], 3)
+ self.assertEqual(d["by_backend"]["local-x"]["count"], 2)
+ self.assertEqual(d["by_backend"]["local-x"]["in_tokens_est"], 30)
+ self.assertEqual(d["by_backend"]["cheap-sub"]["count"], 1)
+
+ def test_format_shows_delegate_section_when_present(self):
+ s = rollup([
+ {"kind": "task", "tier": "sub", "in_tokens_est": 1, "out_tokens_est": 1,
+ "cloud_equiv_usd": 0.1, "spend_avoided_usd": 0.1},
+ {"kind": "delegate", "model": "local-x", "in_tokens_est": 50, "out_tokens_est": 50,
+ "cloud_equiv_usd": 2.0},
+ ])
+ out = format_rollup(s, FIXED)
+ self.assertIn("Delegated sub-tasks", out)
+ self.assertIn("local-x", out)
+
+ def test_format_omits_delegate_section_when_absent(self):
+ s = rollup([{"kind": "task", "tier": "sub", "spend_avoided_usd": 0.1}])
+ self.assertNotIn("Delegated sub-tasks", format_rollup(s, FIXED))
+
+ def test_concurrent_appends_are_serialized(self):
+ import threading
+
+ def worker(n):
+ record_task(path="delegate", entry=FakeEntry(f"m{n}", "local"),
+ prompt="p", response="r", kind="delegate", log_path=self.log, pricing=FIXED)
+
+ threads = [threading.Thread(target=worker, args=(i,)) for i in range(20)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+ recs = self._read()
+ self.assertEqual(len(recs), 20) # 20 well-formed lines — no interleaved/corrupted writes
+ self.assertTrue(all(r.get("kind") == "delegate" for r in recs))
+
+
if __name__ == "__main__":
unittest.main()