diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 642c36b..785c102 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -155,8 +155,9 @@ out concurrently. The **reduce step is deliberately not TangleBrain's** — the the `delegate_many` results itself (it holds the original task context that makes for good synthesis), and offloads the stitch with an ordinary `delegate(task=…)` call when the reduction is mechanical and large. No dedicated reducer tool: the existing primitives cover it, and frontier-side synthesis is the -better default until usage proves otherwise. Still ahead: orchestration-tree observability (which also -adds the deferred non-local delegate metering) — whose data would tell us if a reducer ever earns code. +better default until usage proves otherwise. Delegated sub-calls are now **metered** (see Measurement +below) — the deferred observability landed as a by-backend breakdown; a per-parent-task tree (linking +each delegation to its top-level task across processes) is the remaining stretch. ### Measurement — per-task records (`measurement.py`) @@ -169,6 +170,16 @@ authenticated CLIs expose no usable counts), so one consistent approximate metho every tier. All measurement I/O is best-effort: logging never breaks routing, and a corrupt record never breaks the rollup. +Records carry a `kind`: `"task"` for a top-level routed request (what the spend-avoided headline +counts) or `"delegate"` for a sub-call offloaded through `run_delegate` (every delegation, including +each `delegate_many` item, is metered at that single seam). The rollup keeps delegate records **out of +the headline** — the parent task already credits the whole job, so counting the sub-calls again would +double-count the saving — and aggregates them **separately** into a by-backend breakdown (count, est +tokens, informational cloud-equiv) surfaced in `--stats` and the GUI. Concurrent appends (delegate_many +fans out across threads) are serialized by a process-level lock. Linking each delegation to its +*specific* parent task across processes (a true tree) is deferred — it needs a task-id propagated +through the orchestrator CLIs to the MCP child, which can't be verified hermetically. + ### Knob GUI — localhost panel (`gui/`) `tanglebrain-gui` serves a thin, **localhost-only** web panel (stdlib `http.server` + a single diff --git a/CHANGELOG.md b/CHANGELOG.md index ce9a1be..304c549 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Delegate observability + metering (scatter-gather roadmap #39, slice 6).** Delegated sub-calls + are now metered: every `run_delegate` execution (including each `delegate_many` item — metered at one + seam) is logged as a `kind: delegate` usage record with its served backend + estimated tokens. + `tanglebrain --stats` and the knob panel gain a **"Delegated sub-tasks" breakdown by backend** (count, + est tokens, informational cloud-equiv). Delegate records are kept **out of the "spend avoided" + headline** so a sub-call's saving is never double-counted against its parent task, and concurrent + fan-out appends are serialized by a process-level lock. Records carry a new `kind` field + (`task`/`delegate`; older records read as `task`). The per-parent-task tree (cross-process linkage) + is deferred. Closes the deferred metering noted since the measurement layer landed. + ### Internal - **Documented the synthesis/reduce pattern (scatter-gather roadmap #39, slice 4).** README + diff --git a/README.md b/README.md index e3612ab..7753f6c 100644 --- a/README.md +++ b/README.md @@ -225,10 +225,11 @@ adapters as the CLI above, so endpoints and keys live in one place. It exposes f Make a backend a delegate target by flagging its roster entry `can_delegate: true` (mirrors `can_orchestrate`). The shipped example flags the local tier, so the menu is non-empty out of the -box. **Note:** non-local delegate spend is **not metered** in this version — orchestration-tree -observability is on the [scatter-gather roadmap](https://github.com/Jason-Vaughan/TangleBrain/issues/39). -Any non-local target is opt-in and your responsibility under that provider's terms — see -[DISCLAIMER.md](DISCLAIMER.md). +box. **Delegated sub-calls are now metered**: each is logged as a `kind: delegate` usage record, and +`tanglebrain --stats` (and the knob panel) show a "Delegated sub-tasks" breakdown by backend (count, +est tokens, informational cloud-equiv). These are kept **out of** the "spend avoided" headline so a +sub-call's saving is never double-counted against its parent task. Any non-local target is opt-in and +your responsibility under that provider's terms — see [DISCLAIMER.md](DISCLAIMER.md). **Synthesising fan-out results.** The full pattern is decompose → fan out (`delegate_many`) → **reduce** → answer. TangleBrain ships the dispatch primitives but deliberately does *not* own the diff --git a/tanglebrain/config/roster.yaml b/tanglebrain/config/roster.yaml index e14c4c8..2a02c41 100644 --- a/tanglebrain/config/roster.yaml +++ b/tanglebrain/config/roster.yaml @@ -44,8 +44,8 @@ # e.g. a cheaper sub or a better-fit model. Flag any entry `can_delegate: true` and it joins the # menu the `delegate` tool advertises; the model picks a target by its `good_at` fit. A target is # invoked as a leaf (it never gets its own delegate tool — no recursion). `api` targets still obey -# the billing gate; non-local delegate spend is not metered in this version. This example is an -# openai-compat backend; uncomment + point it at one you hold. +# the billing gate; delegated sub-calls are metered separately (see `tanglebrain --stats`). This +# example is an openai-compat backend; uncomment + point it at one you hold. # # - id: cheap-remote # tier: sub diff --git a/tanglebrain/delegate.py b/tanglebrain/delegate.py index f92bb12..c185afd 100644 --- a/tanglebrain/delegate.py +++ b/tanglebrain/delegate.py @@ -24,6 +24,7 @@ import sys from concurrent.futures import ThreadPoolExecutor +from tanglebrain.measurement import record_task from tanglebrain.roster import ROSTER_ENV_VAR, Roster, RosterEntry, load_roster from tanglebrain.selector import SelectionError, build_adapter, select_local from tanglebrain.settings import Settings, load_settings @@ -231,8 +232,9 @@ def run_delegate( The selected target is built as a **leaf** (``inject_delegate=False``) — a delegate target never receives its own delegate tool, so there is no recursive delegation. ``api`` targets (reachable only via explicit ``target``) flow through the existing billing gate in - :func:`tanglebrain.selector.build_adapter`; ``cli`` targets keep their env-scrub. Non-local - delegate spend is **not metered** in this version (consistent with the existing delegate posture). + :func:`tanglebrain.selector.build_adapter`; ``cli`` targets keep their env-scrub. Each sub-call is + **metered** as a ``kind="delegate"`` usage record, kept out of the spend-avoided headline (see + :func:`tanglebrain.measurement.rollup`). Args: prompt: The self-contained sub-task to delegate. Give it everything it needs — the target @@ -265,7 +267,16 @@ def run_delegate( else: entry = select_local(roster) adapter = build_adapter(entry, inject_delegate=False) - return adapter.run(prompt, {"max_tokens": max_tokens}) + text = adapter.run(prompt, {"max_tokens": max_tokens}) + # Meter the sub-call for orchestration-tree observability. Tagged kind="delegate" so the rollup + # keeps it OUT of the spend-avoided headline (the parent task already credits the whole job) and + # in a separate by-backend breakdown. record_task never raises; the extra guard is belt-and- + # suspenders — metering must never break a delegation. + try: + record_task(path="delegate", entry=entry, prompt=prompt, response=text, kind="delegate") + except Exception: + pass + return text #: Concurrency cap to use only when :func:`os.cpu_count` can't report a core count (rare). @@ -361,7 +372,8 @@ def run_delegate_many( Partial failure never sinks the batch: each result carries a ``status`` (``ok`` / ``no_fit`` / ``error``), and results are returned **ordered by input index** even though workers finish out of - order. Non-local delegate spend is **not metered** (consistent with the rest of the delegate). + order. Each dispatched sub-call is metered (via :func:`run_delegate`) as a ``kind="delegate"`` + usage record, kept out of the spend-avoided headline. Args: tasks: A list of task descriptors, each a mapping ``{prompt, target?, task?, max_tokens?}``. diff --git a/tanglebrain/gui/static/index.html b/tanglebrain/gui/static/index.html index 3b300df..ea6a977 100644 --- a/tanglebrain/gui/static/index.html +++ b/tanglebrain/gui/static/index.html @@ -116,6 +116,18 @@

Pricing reference

Est. tokens (in / out)
${(s.in_tokens_est||0).toLocaleString()} / ${(s.out_tokens_est||0).toLocaleString()}
`; if (d.is_placeholder) html += `
⚠ pricing: PLACEHOLDER — figures illustrative until the anchor is ratified.
`; + const dg = s.delegates || {}; + if (dg.count) { + const backends = Object.entries(dg.by_backend || {}).map(([k, v]) => `${esc(k)} ${v.count || 0}`).join(", ") || "—"; + html += `

Delegated sub-tasks (offloaded by orchestrators)

+
+
Sub-tasks
${dg.count}
+
By backend
${backends}
+
Est. tokens (in / out)
${(dg.in_tokens_est||0).toLocaleString()} / ${(dg.out_tokens_est||0).toLocaleString()}
+
Cloud-equiv
${money(dg.cloud_equiv_usd)}
+
+
Informational — delegate spend is already credited within its parent task's “spend avoided”.
`; + } $("statsCard").innerHTML = html; } catch (e) { $("statsCard").innerHTML = `failed to load stats`; } } diff --git a/tanglebrain/measurement.py b/tanglebrain/measurement.py index d500756..c83dc11 100644 --- a/tanglebrain/measurement.py +++ b/tanglebrain/measurement.py @@ -22,6 +22,7 @@ import json import os import shutil +import threading from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path @@ -32,6 +33,11 @@ LOG_FILENAME = "usage.jsonl" +# Serializes appends to the usage log so concurrent writers in one process (delegate_many fans +# sub-tasks out across threads) can't interleave bytes mid-line. Per-process only; cross-process +# appends rely on the OS's O_APPEND atomicity for short lines, as before. +_LOG_LOCK = threading.Lock() + # Chars per token for the uniform estimation heuristic. ~4 chars/token is the standard rough # approximation for English-ish text across modern BPE tokenizers; good enough for an *estimate*. _CHARS_PER_TOKEN = 4 @@ -286,17 +292,25 @@ def record_task( entry: object, prompt: str, response: str, + kind: str = "task", log_path: str | os.PathLike[str] | None = None, pricing: Pricing | None = None, ) -> None: - """Append one usage record for a routed task. Never raises — a logging failure is dropped. + """Append one usage record for a routed task or a delegated sub-call. Never raises. + + A logging failure is dropped — measurement is a side-effect that must never affect the returned + answer. Args: - path: Which execution path served the task — ``router`` | ``local`` | ``model``. + path: Which execution path served the work — ``router`` | ``local`` | ``model`` | + ``delegate``. entry: The served :class:`~tanglebrain.roster.RosterEntry` (read for ``tier``/``id``); may be ``None`` (e.g. the router didn't surface one), in which case both are ``"unknown"``. - prompt: The task prompt (for input-token estimation). + prompt: The prompt (for input-token estimation). response: The returned response text (for output-token estimation). + kind: ``"task"`` for a top-level routed task (the default; what the spend-avoided headline + counts) or ``"delegate"`` for a delegated sub-call. Delegate records are rolled up + **separately** so a sub-call's saving is never double-counted against its parent task. log_path: Override the usage-log path (tests inject a temp path). Defaults to :func:`default_log_path`. pricing: Override the pricing. Defaults to :func:`load_pricing`. @@ -314,6 +328,7 @@ def record_task( avoided = 0.0 if tier == "api" else equiv record = { "ts": datetime.now(timezone.utc).isoformat(timespec="seconds"), + "kind": str(kind), "path": str(path), "tier": str(tier), "model": str(model), @@ -325,8 +340,9 @@ def record_task( } target = Path(log_path) if log_path is not None else default_log_path() target.parent.mkdir(parents=True, exist_ok=True) - with target.open("a", encoding="utf-8") as fh: - fh.write(json.dumps(record) + "\n") + with _LOG_LOCK: + with target.open("a", encoding="utf-8") as fh: + fh.write(json.dumps(record) + "\n") except Exception: # Measurement is a side-effect: a failure here must never affect the returned answer. return @@ -385,7 +401,11 @@ def rollup(records: list[dict]) -> dict: Returns: A dict with: ``tasks`` (int), ``by_tier`` (tier → count), ``in_tokens_est`` / ``out_tokens_est`` (summed estimates), and ``cloud_equiv_usd`` / ``spend_avoided_usd`` - (summed dollars). + (summed dollars) — all over **top-level tasks only** — plus ``delegates``, a separate + sub-rollup of delegated sub-calls ``{count, by_backend: {model: {count, in_tokens_est, + out_tokens_est}}, in_tokens_est, out_tokens_est, cloud_equiv_usd}``. Delegate records are + kept out of the headline so a sub-call's saving is never double-counted against its parent + task; their cloud-equiv is informational. A record without a ``kind`` field counts as a task. """ summary: dict = { "tasks": 0, @@ -395,16 +415,40 @@ def rollup(records: list[dict]) -> dict: "cloud_equiv_usd": 0.0, "spend_avoided_usd": 0.0, } + delegates: dict = { + "count": 0, + "by_backend": {}, + "in_tokens_est": 0, + "out_tokens_est": 0, + "cloud_equiv_usd": 0.0, + } for r in records: + in_tok = _as_int(r.get("in_tokens_est")) + out_tok = _as_int(r.get("out_tokens_est")) + if str(r.get("kind", "task")) == "delegate": + delegates["count"] += 1 + model = str(r.get("model", "unknown")) + backend = delegates["by_backend"].setdefault( + model, {"count": 0, "in_tokens_est": 0, "out_tokens_est": 0} + ) + backend["count"] += 1 + backend["in_tokens_est"] += in_tok + backend["out_tokens_est"] += out_tok + delegates["in_tokens_est"] += in_tok + delegates["out_tokens_est"] += out_tok + delegates["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd")) + continue summary["tasks"] += 1 tier = str(r.get("tier", "unknown")) summary["by_tier"][tier] = summary["by_tier"].get(tier, 0) + 1 - summary["in_tokens_est"] += _as_int(r.get("in_tokens_est")) - summary["out_tokens_est"] += _as_int(r.get("out_tokens_est")) + summary["in_tokens_est"] += in_tok + summary["out_tokens_est"] += out_tok summary["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd")) summary["spend_avoided_usd"] += _as_float(r.get("spend_avoided_usd")) summary["cloud_equiv_usd"] = round(summary["cloud_equiv_usd"], 4) summary["spend_avoided_usd"] = round(summary["spend_avoided_usd"], 4) + delegates["cloud_equiv_usd"] = round(delegates["cloud_equiv_usd"], 4) + summary["delegates"] = delegates return summary @@ -438,4 +482,24 @@ def format_rollup(summary: dict, pricing: Pricing) -> str: " ⚠ pricing: PLACEHOLDER — figures are illustrative; set real rates in " "config/pricing.yaml and flip placeholder to false." ) + + delegates = summary.get("delegates") or {} + if delegates.get("count"): + lines.append("") + lines.append(" Delegated sub-tasks (offloaded by orchestrators)") + lines.append(f" Count: {delegates.get('count', 0)}") + by_backend = delegates.get("by_backend") or {} + if by_backend: + backends = ", ".join( + f"{model} {info.get('count', 0)}" for model, info in sorted(by_backend.items()) + ) + lines.append(f" By backend: {backends}") + lines.append( + f" Est. tokens: in {delegates.get('in_tokens_est', 0):,} / " + f"out {delegates.get('out_tokens_est', 0):,}" + ) + lines.append( + f" Cloud-equiv: ${delegates.get('cloud_equiv_usd', 0.0):,.2f} " + "(informational — already credited within parent tasks)" + ) return "\n".join(lines) diff --git a/tests/test_delegate.py b/tests/test_delegate.py index 9f5aee7..ef77687 100644 --- a/tests/test_delegate.py +++ b/tests/test_delegate.py @@ -72,6 +72,12 @@ def test_substitutions_cover_both_tokens(self): class RunLocalDelegateTest(unittest.TestCase): + def setUp(self): + # run_delegate now meters via record_task; stub it so these tests never touch the real log. + patcher = patch("tanglebrain.delegate.record_task") + patcher.start() + self.addCleanup(patcher.stop) + def test_returns_adapter_text(self): adapter = MagicMock() adapter.run.return_value = "grunt result" @@ -167,6 +173,11 @@ def test_adapter_error_propagates(self): class RunDelegateTargetTest(unittest.TestCase): """The generalized delegate: ``run_delegate(target=...)`` resolution, opt-in, and no-recursion.""" + def setUp(self): + patcher = patch("tanglebrain.delegate.record_task") + patcher.start() + self.addCleanup(patcher.stop) + def test_target_none_uses_local_as_a_leaf(self): # target=None must still route to the local tier AND build it as a leaf (inject_delegate # False) — a delegate target never gets its own delegate tool, so no recursive delegation. @@ -280,6 +291,11 @@ def test_available_capabilities_sorted_unique_excludes_api(self): class RunDelegateCapabilityTest(unittest.TestCase): """run_delegate routing by task, plus target>task precedence.""" + def setUp(self): + patcher = patch("tanglebrain.delegate.record_task") + patcher.start() + self.addCleanup(patcher.stop) + def test_task_routes_to_selected_leaf(self): entry = _entry("sub-a", tier="sub", good_at=["code"]) roster = _roster_of(entry) @@ -485,5 +501,40 @@ def submit(self, fn, *args): self.assertEqual(captured["max_workers"], 3) +class DelegateMeteringTest(unittest.TestCase): + """run_delegate meters each sub-call as a kind='delegate' usage record (observability).""" + + def test_records_delegate_kind(self): + entry = _entry("local-x", tier="local", can_delegate=True) + adapter = MagicMock() + adapter.run.return_value = "result-text" + with patch("tanglebrain.delegate.load_roster", return_value=MagicMock()), patch( + "tanglebrain.delegate.select_local", return_value=entry + ), patch("tanglebrain.delegate.build_adapter", return_value=adapter), patch( + "tanglebrain.delegate.record_task" + ) as rec: + out = run_delegate("do it") + self.assertEqual(out, "result-text") + rec.assert_called_once() + kw = rec.call_args.kwargs + self.assertEqual(kw["kind"], "delegate") + self.assertEqual(kw["path"], "delegate") + self.assertIs(kw["entry"], entry) + self.assertEqual(kw["prompt"], "do it") + self.assertEqual(kw["response"], "result-text") + + def test_metering_failure_never_breaks_delegation(self): + entry = _entry("local-x", tier="local") + adapter = MagicMock() + adapter.run.return_value = "answer" + with patch("tanglebrain.delegate.load_roster", return_value=MagicMock()), patch( + "tanglebrain.delegate.select_local", return_value=entry + ), patch("tanglebrain.delegate.build_adapter", return_value=adapter), patch( + "tanglebrain.delegate.record_task", side_effect=RuntimeError("log boom") + ): + out = run_delegate("q") + self.assertEqual(out, "answer") # metering error swallowed, answer still returned + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_gui.py b/tests/test_gui.py index 2cb4be9..4c8020a 100644 --- a/tests/test_gui.py +++ b/tests/test_gui.py @@ -109,6 +109,21 @@ def test_rolls_up_records(self): self.assertAlmostEqual(out["summary"]["spend_avoided_usd"], 1.5) self.assertIn("is_placeholder", out) + def test_includes_delegate_breakdown(self): + recs = [ + {"kind": "task", "tier": "sub", "in_tokens_est": 5, "out_tokens_est": 5, + "cloud_equiv_usd": 0.5, "spend_avoided_usd": 0.5}, + {"kind": "delegate", "model": "local-x", "in_tokens_est": 40, "out_tokens_est": 60, + "cloud_equiv_usd": 2.0}, + ] + with patch("tanglebrain.gui.views.read_records", return_value=recs): + out = views.view_stats() + # Headline stays task-only; delegates surface separately for the panel's fan-out breakdown. + self.assertEqual(out["summary"]["tasks"], 1) + delegates = out["summary"]["delegates"] + self.assertEqual(delegates["count"], 1) + self.assertEqual(delegates["by_backend"]["local-x"]["count"], 1) + class RunPromptTest(unittest.TestCase): def test_happy_path_reports_served(self): diff --git a/tests/test_live.py b/tests/test_live.py index 0e5c7ca..a42acb0 100644 --- a/tests/test_live.py +++ b/tests/test_live.py @@ -110,6 +110,21 @@ def test_fan_out_runs_concurrently_and_returns_ordered_results(self): self.assertTrue(all(r["status"] == "ok" for r in results), results) self.assertTrue(all(r["text"].strip() for r in results)) + def test_fan_out_writes_delegate_usage_records(self): + # Observability: the fan-out meters each sub-call as a kind='delegate' usage record. Pin a + # temp state dir so the real ~/.cache log is untouched. + import tempfile + from tanglebrain.measurement import read_records, rollup + + with tempfile.TemporaryDirectory() as tmp: + with patch.dict(os.environ, {"TANGLEBRAIN_STATE_DIR": tmp}, clear=False): + run_delegate_many( + [{"prompt": "Reply with exactly: a"}, {"prompt": "Reply with exactly: b"}] + ) + summary = rollup(read_records()) + self.assertEqual(summary["delegates"]["count"], 2) + self.assertEqual(summary["tasks"], 0) # delegates are not headline tasks + @unittest.skipUnless(LIVE, "set TANGLEBRAIN_LIVE=1 to run the live orchestrated-delegation test") class LiveDelegateInjectionTest(unittest.TestCase): diff --git a/tests/test_measurement.py b/tests/test_measurement.py index 4c9b807..580d64f 100644 --- a/tests/test_measurement.py +++ b/tests/test_measurement.py @@ -309,5 +309,99 @@ def test_honors_state_dir_env(self): self.assertEqual(default_log_path(), Path("/tmp/tb-test/usage.jsonl")) +class DelegateObservabilityTest(unittest.TestCase): + """kind='delegate' records: written, kept out of the headline, rolled up separately, thread-safe.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.log = str(Path(self.tmp) / "usage.jsonl") + + def _read(self): + return read_records(self.log) + + def test_record_defaults_to_task_kind(self): + record_task(path="router", entry=FakeEntry("claude", "sub"), + prompt="hi", response="yo", log_path=self.log, pricing=FIXED) + self.assertEqual(self._read()[0]["kind"], "task") + + def test_record_delegate_kind(self): + record_task(path="delegate", entry=FakeEntry("local-x", "local"), + prompt="hi", response="yo", kind="delegate", log_path=self.log, pricing=FIXED) + self.assertEqual(self._read()[0]["kind"], "delegate") + + def test_rollup_excludes_delegates_from_headline(self): + records = [ + {"kind": "task", "tier": "sub", "in_tokens_est": 10, "out_tokens_est": 10, + "cloud_equiv_usd": 1.0, "spend_avoided_usd": 1.0}, + {"kind": "delegate", "model": "local-x", "in_tokens_est": 100, "out_tokens_est": 200, + "cloud_equiv_usd": 5.0, "spend_avoided_usd": 5.0}, + ] + s = rollup(records) + # Headline counts the one task only — delegate tokens/spend must NOT inflate it. + self.assertEqual(s["tasks"], 1) + self.assertEqual(s["by_tier"], {"sub": 1}) + self.assertEqual(s["in_tokens_est"], 10) + self.assertEqual(s["out_tokens_est"], 10) + self.assertAlmostEqual(s["spend_avoided_usd"], 1.0) + # Delegate sub-rollup is separate + informational. + d = s["delegates"] + self.assertEqual(d["count"], 1) + self.assertEqual( + d["by_backend"], + {"local-x": {"count": 1, "in_tokens_est": 100, "out_tokens_est": 200}}, + ) + self.assertEqual(d["in_tokens_est"], 100) + self.assertEqual(d["out_tokens_est"], 200) + self.assertAlmostEqual(d["cloud_equiv_usd"], 5.0) + + def test_kindless_record_counts_as_task(self): + s = rollup([{"tier": "local", "in_tokens_est": 4, "spend_avoided_usd": 0.2}]) + self.assertEqual(s["tasks"], 1) + self.assertEqual(s["delegates"]["count"], 0) + + def test_by_backend_aggregates_multiple(self): + records = [ + {"kind": "delegate", "model": "local-x", "in_tokens_est": 10, "out_tokens_est": 5}, + {"kind": "delegate", "model": "local-x", "in_tokens_est": 20, "out_tokens_est": 5}, + {"kind": "delegate", "model": "cheap-sub", "in_tokens_est": 1, "out_tokens_est": 1}, + ] + d = rollup(records)["delegates"] + self.assertEqual(d["count"], 3) + self.assertEqual(d["by_backend"]["local-x"]["count"], 2) + self.assertEqual(d["by_backend"]["local-x"]["in_tokens_est"], 30) + self.assertEqual(d["by_backend"]["cheap-sub"]["count"], 1) + + def test_format_shows_delegate_section_when_present(self): + s = rollup([ + {"kind": "task", "tier": "sub", "in_tokens_est": 1, "out_tokens_est": 1, + "cloud_equiv_usd": 0.1, "spend_avoided_usd": 0.1}, + {"kind": "delegate", "model": "local-x", "in_tokens_est": 50, "out_tokens_est": 50, + "cloud_equiv_usd": 2.0}, + ]) + out = format_rollup(s, FIXED) + self.assertIn("Delegated sub-tasks", out) + self.assertIn("local-x", out) + + def test_format_omits_delegate_section_when_absent(self): + s = rollup([{"kind": "task", "tier": "sub", "spend_avoided_usd": 0.1}]) + self.assertNotIn("Delegated sub-tasks", format_rollup(s, FIXED)) + + def test_concurrent_appends_are_serialized(self): + import threading + + def worker(n): + record_task(path="delegate", entry=FakeEntry(f"m{n}", "local"), + prompt="p", response="r", kind="delegate", log_path=self.log, pricing=FIXED) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + recs = self._read() + self.assertEqual(len(recs), 20) # 20 well-formed lines — no interleaved/corrupted writes + self.assertTrue(all(r.get("kind") == "delegate" for r in recs)) + + if __name__ == "__main__": unittest.main()