diff --git a/CHANGELOG.md b/CHANGELOG.md index 68990ac..c1256f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Per-parent-task delegation tree (cross-process linkage, scatter-gather roadmap #39 stretch / + closes #52).** Each delegated sub-call is now linked back to the specific top-level task that + spawned it, across the process boundary. The CLI mints a task id per routed task; the + orchestrator-CLI adapter injects it as `TANGLEBRAIN_TASK_ID` into the orchestrator's environment + (only when the delegate tool is injected), the orchestrator forwards it to the MCP delegate child + it spawns, and `run_delegate` reads it back to stamp each delegate record's `parent_task_id`. Task + records gain a `task_id`, delegate records gain a `parent_task_id` (both written only when present, + so existing records and readers are unaffected). `tanglebrain --stats` and the rollup gain a + `by_parent` grouping — "Linked to: N parent task(s)" — with sub-calls run outside a propagated task + grouped as `unlinked`. The linkage was **manually verified live through the real claude→MCP-delegate + boundary** (the env survives the orchestrator's subprocess hop; the parent and delegate records + shared the same id) — the orchestrator-forwards-env hop is a load-bearing assumption, not a + TangleBrain-enforced guarantee, so a delegate that loses the env degrades safely to `unlinked` (never + an error). This was the deferred half of the scatter-gather epic whose entry criterion was a + live-verification spike — now done. 
+ ### Fixed - **`pyproject.toml` package metadata carried the purged "cost-tiered / flat-rate subscriptions" diff --git a/tanglebrain/adapters/cli.py b/tanglebrain/adapters/cli.py index a8ffa21..e0d3ab8 100644 --- a/tanglebrain/adapters/cli.py +++ b/tanglebrain/adapters/cli.py @@ -301,6 +301,18 @@ def run(self, prompt: str, opts: Mapping[str, object] | None = None) -> str: argv = build_argv(self._effective_cmd(), prompt) env = scrubbed_env(self.scrub_env) + # When this CLI is an orchestrator carrying the delegate tool, propagate the top-level task + # id into its environment so the MCP delegate child it spawns can stamp each sub-call's + # parent_task_id (linking the delegation tree across the process boundary). Gated on + # inject_delegate: a leaf CLI call spawns no delegate child, so there is nothing to link. + # Lazy import keeps the constant's home module (measurement) out of this adapter's import + # graph, mirroring the lazy delegate_substitutions import in _effective_cmd. + task_id = opts.get("task_id") if opts is not None else None  # opts defaults to None + if self.inject_delegate and task_id is not None: + from tanglebrain.measurement import PARENT_TASK_ID_ENV + + env[PARENT_TASK_ID_ENV] = str(task_id) + try: completed = subprocess.run( argv, diff --git a/tanglebrain/cli.py b/tanglebrain/cli.py index 4cf2463..4257d3b 100644 --- a/tanglebrain/cli.py +++ b/tanglebrain/cli.py @@ -16,6 +16,7 @@ import argparse import sys +import uuid from tanglebrain import __version__ from tanglebrain.adapters import AdapterError @@ -178,7 +179,14 @@ def run_once( AdapterError: If the adapter cannot produce text. """ roster = load_roster(roster_path) - opts = {"max_tokens": max_tokens} if max_tokens is not None else None + # Mint a task id for this routed task. It is recorded on the task and threaded through opts so + # the orchestrator-CLI adapter can propagate it to delegated sub-calls (see CliAdapter.run / + # PARENT_TASK_ID_ENV), linking the delegation tree back to this task. 
Cheap and side-effect-free + # to mint on every path; only the router path (orchestrators with the delegate tool) acts on it. + task_id = uuid.uuid4().hex + opts: dict = {"task_id": task_id} + if max_tokens is not None: + opts["max_tokens"] = max_tokens if model is not None: path, entry = "model", select_by_id(roster, model) @@ -199,7 +207,7 @@ def run_once( text = router.route(prompt, task=task, opts=opts) entry = router.last_served - record_task(path=path, entry=entry, prompt=prompt, response=text) + record_task(path=path, entry=entry, prompt=prompt, response=text, task_id=task_id) return (text, _served(path, entry)) if return_served else text diff --git a/tanglebrain/delegate.py b/tanglebrain/delegate.py index c185afd..df78d8f 100644 --- a/tanglebrain/delegate.py +++ b/tanglebrain/delegate.py @@ -24,7 +24,7 @@ import sys from concurrent.futures import ThreadPoolExecutor -from tanglebrain.measurement import record_task +from tanglebrain.measurement import PARENT_TASK_ID_ENV, record_task from tanglebrain.roster import ROSTER_ENV_VAR, Roster, RosterEntry, load_roster from tanglebrain.selector import SelectionError, build_adapter, select_local from tanglebrain.settings import Settings, load_settings @@ -270,10 +270,26 @@ def run_delegate( text = adapter.run(prompt, {"max_tokens": max_tokens}) # Meter the sub-call for orchestration-tree observability. Tagged kind="delegate" so the rollup # keeps it OUT of the spend-avoided headline (the parent task already credits the whole job) and - # in a separate by-backend breakdown. record_task never raises; the extra guard is belt-and- - # suspenders — metering must never break a delegation. + # in a separate by-backend breakdown. The parent task id, propagated from the orchestrator via + # PARENT_TASK_ID_ENV, links this sub-call to its top-level task (absent → recorded as unlinked). 
+ # + # LOAD-BEARING ASSUMPTION (verified live for claude, not asserted by any hermetic test): the + # linkage depends on the orchestrator CLI forwarding its environment to the MCP delegate child it + # spawns. That forwarding is the orchestrator's behavior, not TangleBrain's — if a CLI stops + # forwarding env, or a roster adds TANGLEBRAIN_TASK_ID to an orchestrator's invoke.scrub_env, the + # linkage silently degrades to "unlinked" (never an error — the delegation itself is unaffected). + # + # record_task never raises; the extra guard is belt-and-suspenders — metering must never break a + # delegation. try: - record_task(path="delegate", entry=entry, prompt=prompt, response=text, kind="delegate") + record_task( + path="delegate", + entry=entry, + prompt=prompt, + response=text, + kind="delegate", + parent_task_id=os.environ.get(PARENT_TASK_ID_ENV), + ) except Exception: pass return text diff --git a/tanglebrain/measurement.py b/tanglebrain/measurement.py index c83dc11..dd7da44 100644 --- a/tanglebrain/measurement.py +++ b/tanglebrain/measurement.py @@ -33,6 +33,14 @@ LOG_FILENAME = "usage.jsonl" +#: Env var carrying the top-level task id from the orchestrator down to a delegated sub-call. The +#: CLI mints a task id per routed task and the orchestrator-CLI adapter injects it into the +#: orchestrator subprocess env (only when the delegate tool is injected); the orchestrator forwards +#: its env to the MCP delegate child it spawns, where :func:`tanglebrain.delegate.run_delegate` reads +#: it back and stamps each delegate record's ``parent_task_id``. This is what links a delegated +#: sub-call to the specific top-level task that spawned it, across the process boundary. +PARENT_TASK_ID_ENV = "TANGLEBRAIN_TASK_ID" + # Serializes appends to the usage log so concurrent writers in one process (delegate_many fans # sub-tasks out across threads) can't interleave bytes mid-line. 
Per-process only; cross-process # appends rely on the OS's O_APPEND atomicity for short lines, as before. @@ -293,6 +301,8 @@ def record_task( prompt: str, response: str, kind: str = "task", + task_id: str | None = None, + parent_task_id: str | None = None, log_path: str | os.PathLike[str] | None = None, pricing: Pricing | None = None, ) -> None: @@ -311,6 +321,11 @@ def record_task( kind: ``"task"`` for a top-level routed task (the default; what the spend-avoided headline counts) or ``"delegate"`` for a delegated sub-call. Delegate records are rolled up **separately** so a sub-call's saving is never double-counted against its parent task. + task_id: For a top-level task, the id minted for this routed task (so its delegated sub-calls + can be linked back to it). Omitted from the record when ``None``. + parent_task_id: For a delegated sub-call, the id of the top-level task that spawned it (read + from :data:`PARENT_TASK_ID_ENV`). Omitted from the record when ``None`` — e.g. a delegate + invoked outside a propagated task, which rolls up as ``unlinked``. log_path: Override the usage-log path (tests inject a temp path). Defaults to :func:`default_log_path`. pricing: Override the pricing. Defaults to :func:`load_pricing`. @@ -338,6 +353,12 @@ def record_task( "spend_avoided_usd": round(avoided, 6), "pricing_ref": pricing.reference_model, } + # Optional linkage fields — only written when present, so existing records/readers that + # never set them are unaffected (a missing field reads as "no linkage"). 
+ if task_id is not None: + record["task_id"] = str(task_id) + if parent_task_id is not None: + record["parent_task_id"] = str(parent_task_id) target = Path(log_path) if log_path is not None else default_log_path() target.parent.mkdir(parents=True, exist_ok=True) with _LOG_LOCK: @@ -403,9 +424,12 @@ def rollup(records: list[dict]) -> dict: ``out_tokens_est`` (summed estimates), and ``cloud_equiv_usd`` / ``spend_avoided_usd`` (summed dollars) — all over **top-level tasks only** — plus ``delegates``, a separate sub-rollup of delegated sub-calls ``{count, by_backend: {model: {count, in_tokens_est, - out_tokens_est}}, in_tokens_est, out_tokens_est, cloud_equiv_usd}``. Delegate records are - kept out of the headline so a sub-call's saving is never double-counted against its parent - task; their cloud-equiv is informational. A record without a ``kind`` field counts as a task. + out_tokens_est}}, by_parent: {parent_task_id: {count, by_backend: {model: count}}}, + in_tokens_est, out_tokens_est, cloud_equiv_usd}``. ``by_parent`` groups each delegate under + the top-level task that spawned it (via ``parent_task_id``); delegates with no + ``parent_task_id`` are grouped under the sentinel ``"unlinked"``. Delegate records are kept + out of the headline so a sub-call's saving is never double-counted against its parent task; + their cloud-equiv is informational. A record without a ``kind`` field counts as a task. """ summary: dict = { "tasks": 0, @@ -418,6 +442,7 @@ def rollup(records: list[dict]) -> dict: delegates: dict = { "count": 0, "by_backend": {}, + "by_parent": {}, "in_tokens_est": 0, "out_tokens_est": 0, "cloud_equiv_usd": 0.0, @@ -434,6 +459,13 @@ def rollup(records: list[dict]) -> dict: backend["count"] += 1 backend["in_tokens_est"] += in_tok backend["out_tokens_est"] += out_tok + # Per-parent tree: link this sub-call to the top-level task that spawned it. A delegate + # with no parent_task_id (run outside a propagated task) groups under "unlinked". 
+ parent_id = r.get("parent_task_id") + parent_key = str(parent_id) if parent_id not in (None, "") else "unlinked" + parent = delegates["by_parent"].setdefault(parent_key, {"count": 0, "by_backend": {}}) + parent["count"] += 1 + parent["by_backend"][model] = parent["by_backend"].get(model, 0) + 1 delegates["in_tokens_est"] += in_tok delegates["out_tokens_est"] += out_tok delegates["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd")) @@ -494,6 +526,17 @@ def format_rollup(summary: dict, pricing: Pricing) -> str: f"{model} {info.get('count', 0)}" for model, info in sorted(by_backend.items()) ) lines.append(f" By backend: {backends}") + by_parent = delegates.get("by_parent") or {} + if by_parent: + linked = [k for k in by_parent if k != "unlinked"] + unlinked = (by_parent.get("unlinked") or {}).get("count", 0) + if linked: + tree = f"{len(linked)} parent task(s)" + if unlinked: + tree += f", {unlinked} unlinked" + else: + tree = f"{unlinked} unlinked" # all sub-calls ran outside a propagated task + lines.append(f" Linked to: {tree}") lines.append( f" Est. tokens: in {delegates.get('in_tokens_est', 0):,} / " f"out {delegates.get('out_tokens_est', 0):,}" diff --git a/tests/test_cli.py b/tests/test_cli.py index 16447e1..3539866 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -88,14 +88,20 @@ def test_max_tokens_threaded_through_local(self): fake_adapter.run.return_value = "x" with patch("tanglebrain.cli.build_adapter", return_value=fake_adapter): run_once("hello", local=True, max_tokens=256) - self.assertEqual(fake_adapter.run.call_args.args[1], {"max_tokens": 256}) + opts = fake_adapter.run.call_args.args[1] + # opts now always carries a minted task_id (for delegate-tree linkage), plus max_tokens here. 
+ self.assertEqual(opts.get("max_tokens"), 256) + self.assertIsInstance(opts.get("task_id"), str) - def test_no_max_tokens_passes_none_opts_local(self): + def test_no_max_tokens_still_passes_task_id_opts_local(self): fake_adapter = MagicMock() fake_adapter.run.return_value = "x" with patch("tanglebrain.cli.build_adapter", return_value=fake_adapter): run_once("hello", local=True) - self.assertIsNone(fake_adapter.run.call_args.args[1]) + opts = fake_adapter.run.call_args.args[1] + # With no max_tokens, opts is no longer None — it still carries the minted task_id. + self.assertNotIn("max_tokens", opts) + self.assertIsInstance(opts.get("task_id"), str) def test_model_routes_to_named_entry(self): # --model selects a specific roster entry (here, a sub) instead of local-first. @@ -142,6 +148,20 @@ def test_router_threads_task_hint(self): RouterCls.assert_called_once() self.assertEqual(fake_router.route.call_args.kwargs.get("task"), "code") + def test_router_threads_task_id_in_opts(self): + # The minted task_id must reach Router.route's opts — this is the seam that propagates it to + # the orchestrator (and thence to delegated sub-calls). The router path is the feature's point. + fake_router = MagicMock() + fake_router.route.return_value = "routed reply" + with patch("tanglebrain.cli.load_roster"), patch( + "tanglebrain.cli.Router", return_value=fake_router + ): + run_once("hello") + opts = fake_router.route.call_args.kwargs.get("opts") + self.assertIsInstance(opts, dict) + self.assertIsInstance(opts.get("task_id"), str) + self.assertTrue(opts["task_id"]) + def test_model_takes_precedence_over_router(self): # model is an explicit override and wins over the default router path. 
fake_adapter = MagicMock() diff --git a/tests/test_cli_adapter.py b/tests/test_cli_adapter.py index 91c5fe6..09d53ca 100644 --- a/tests/test_cli_adapter.py +++ b/tests/test_cli_adapter.py @@ -79,6 +79,40 @@ def test_run_hands_scrubbed_env_to_subprocess(self): self.assertIn("PATH", passed_env) +class TaskIdPropagationTest(unittest.TestCase): + """The orchestrator-CLI adapter injects the parent task id env for delegate-tree linkage.""" + + def _env_for(self, inject_delegate, opts): + from tanglebrain.measurement import PARENT_TASK_ID_ENV # noqa: F401 (used by caller asserts) + + adapter = CliAdapter( + cmd=["claude", "-p"], parse="claude-json", inject_delegate=inject_delegate + ) + with patch.dict(os.environ, {"PATH": "/b"}, clear=True): + with patch_run(return_value=completed(0, CLAUDE_JSON_OK)) as run: + adapter.run("hi", opts) + return run.call_args.kwargs["env"] + + def test_injects_task_id_for_orchestrator(self): + from tanglebrain.measurement import PARENT_TASK_ID_ENV + + env = self._env_for(inject_delegate=True, opts={"task_id": "task-abc"}) + self.assertEqual(env[PARENT_TASK_ID_ENV], "task-abc") + + def test_no_injection_for_leaf_cli(self): + # inject_delegate=False ⇒ no delegate child is spawned, so nothing to link — don't inject. 
+ from tanglebrain.measurement import PARENT_TASK_ID_ENV + + env = self._env_for(inject_delegate=False, opts={"task_id": "task-abc"}) + self.assertNotIn(PARENT_TASK_ID_ENV, env) + + def test_no_injection_when_task_id_absent(self): + from tanglebrain.measurement import PARENT_TASK_ID_ENV + + env = self._env_for(inject_delegate=True, opts={}) + self.assertNotIn(PARENT_TASK_ID_ENV, env) + + class BuildArgvTest(unittest.TestCase): """Prompt injection: {prompt} substitution, else append; never via the shell.""" diff --git a/tests/test_delegate.py b/tests/test_delegate.py index ef77687..4b87d9a 100644 --- a/tests/test_delegate.py +++ b/tests/test_delegate.py @@ -523,6 +523,35 @@ def test_records_delegate_kind(self): self.assertEqual(kw["prompt"], "do it") self.assertEqual(kw["response"], "result-text") + def test_reads_parent_task_id_from_env(self): + from tanglebrain.measurement import PARENT_TASK_ID_ENV + + entry = _entry("local-x", tier="local", can_delegate=True) + adapter = MagicMock() + adapter.run.return_value = "result-text" + with patch.dict(os.environ, {PARENT_TASK_ID_ENV: "task-xyz"}), patch( + "tanglebrain.delegate.load_roster", return_value=MagicMock() + ), patch("tanglebrain.delegate.select_local", return_value=entry), patch( + "tanglebrain.delegate.build_adapter", return_value=adapter + ), patch("tanglebrain.delegate.record_task") as rec: + run_delegate("do it") + self.assertEqual(rec.call_args.kwargs["parent_task_id"], "task-xyz") + + def test_parent_task_id_none_when_env_unset(self): + from tanglebrain.measurement import PARENT_TASK_ID_ENV + + entry = _entry("local-x", tier="local", can_delegate=True) + adapter = MagicMock() + adapter.run.return_value = "x" + env_without = {k: v for k, v in os.environ.items() if k != PARENT_TASK_ID_ENV} + with patch.dict(os.environ, env_without, clear=True), patch( + "tanglebrain.delegate.load_roster", return_value=MagicMock() + ), patch("tanglebrain.delegate.select_local", return_value=entry), patch( + 
"tanglebrain.delegate.build_adapter", return_value=adapter + ), patch("tanglebrain.delegate.record_task") as rec: + run_delegate("do it") + self.assertIsNone(rec.call_args.kwargs["parent_task_id"]) + def test_metering_failure_never_breaks_delegation(self): entry = _entry("local-x", tier="local") adapter = MagicMock() diff --git a/tests/test_measurement.py b/tests/test_measurement.py index 580d64f..c7d1661 100644 --- a/tests/test_measurement.py +++ b/tests/test_measurement.py @@ -371,6 +371,63 @@ def test_by_backend_aggregates_multiple(self): self.assertEqual(d["by_backend"]["local-x"]["in_tokens_est"], 30) self.assertEqual(d["by_backend"]["cheap-sub"]["count"], 1) + def test_record_writes_task_id_when_given(self): + record_task(path="router", entry=FakeEntry("claude", "sub"), prompt="hi", response="yo", + task_id="task-abc", log_path=self.log, pricing=FIXED) + rec = self._read()[0] + self.assertEqual(rec["task_id"], "task-abc") + self.assertNotIn("parent_task_id", rec) + + def test_record_writes_parent_task_id_for_delegate(self): + record_task(path="delegate", entry=FakeEntry("local-x", "local"), prompt="hi", response="yo", + kind="delegate", parent_task_id="task-abc", log_path=self.log, pricing=FIXED) + rec = self._read()[0] + self.assertEqual(rec["parent_task_id"], "task-abc") + self.assertNotIn("task_id", rec) + + def test_record_omits_linkage_fields_when_absent(self): + record_task(path="local", entry=FakeEntry("local-x", "local"), prompt="hi", response="yo", + log_path=self.log, pricing=FIXED) + rec = self._read()[0] + self.assertNotIn("task_id", rec) + self.assertNotIn("parent_task_id", rec) + + def test_rollup_groups_delegates_by_parent(self): + records = [ + {"kind": "delegate", "model": "local-x", "parent_task_id": "p1"}, + {"kind": "delegate", "model": "cheap-sub", "parent_task_id": "p1"}, + {"kind": "delegate", "model": "local-x", "parent_task_id": "p2"}, + ] + by_parent = rollup(records)["delegates"]["by_parent"] + 
self.assertEqual(by_parent["p1"]["count"], 2) + self.assertEqual(by_parent["p1"]["by_backend"], {"local-x": 1, "cheap-sub": 1}) + self.assertEqual(by_parent["p2"]["count"], 1) + self.assertNotIn("unlinked", by_parent) + + def test_rollup_unlinked_delegate_grouped_under_sentinel(self): + # A delegate with no parent_task_id (run outside a propagated task) groups under "unlinked". + by_parent = rollup([{"kind": "delegate", "model": "local-x"}])["delegates"]["by_parent"] + self.assertEqual(by_parent["unlinked"]["count"], 1) + + def test_format_shows_linked_parents(self): + s = rollup([ + {"kind": "delegate", "model": "local-x", "parent_task_id": "p1"}, + {"kind": "delegate", "model": "local-x", "parent_task_id": "p2"}, + {"kind": "delegate", "model": "local-x"}, + ]) + out = format_rollup(s, FIXED) + self.assertIn("Linked to:", out) + self.assertIn("2 parent task(s)", out) + self.assertIn("1 unlinked", out) + + def test_format_all_unlinked_reads_cleanly(self): + # When no delegate is linked, the line should read "N unlinked", not "0 parent task(s), ...". + s = rollup([{"kind": "delegate", "model": "local-x"}, + {"kind": "delegate", "model": "local-x"}]) + out = format_rollup(s, FIXED) + self.assertIn("Linked to: 2 unlinked", out) + self.assertNotIn("parent task(s)", out) + def test_format_shows_delegate_section_when_present(self): s = rollup([ {"kind": "task", "tier": "sub", "in_tokens_est": 1, "out_tokens_est": 1,