Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- **Per-parent-task delegation tree (cross-process linkage, scatter-gather roadmap #39 stretch /
closes #52).** Each delegated sub-call is now linked back to the specific top-level task that
spawned it, across the process boundary. The CLI mints a task id per routed task; the
orchestrator-CLI adapter injects it as `TANGLEBRAIN_TASK_ID` into the orchestrator's environment
(only when the delegate tool is injected), the orchestrator forwards it to the MCP delegate child
it spawns, and `run_delegate` reads it back to stamp each delegate record's `parent_task_id`. Task
records gain a `task_id`, delegate records gain a `parent_task_id` (both written only when present,
so existing records and readers are unaffected). `tanglebrain --stats` and the rollup gain a
`by_parent` grouping — "Linked to: N parent task(s)" — with sub-calls run outside a propagated task
grouped as `unlinked`. The linkage was **manually verified live through the real claude→MCP-delegate
boundary** (the env survives the orchestrator's subprocess hop; the parent and delegate records
shared the same id) — the orchestrator-forwards-env hop is a load-bearing assumption, not a
TangleBrain-enforced guarantee, so a delegate that loses the env degrades safely to `unlinked` (never
an error). This was the deferred half of the scatter-gather epic whose entry criterion was a
live-verification spike — now done.

### Fixed

- **`pyproject.toml` package metadata carried the purged "cost-tiered / flat-rate subscriptions"
Expand Down
12 changes: 12 additions & 0 deletions tanglebrain/adapters/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ def run(self, prompt: str, opts: Mapping[str, object] | None = None) -> str:
argv = build_argv(self._effective_cmd(), prompt)
env = scrubbed_env(self.scrub_env)

# When this CLI is an orchestrator carrying the delegate tool, propagate the top-level task
# id into its environment so the MCP delegate child it spawns can stamp each sub-call's
# parent_task_id (linking the delegation tree across the process boundary). Gated on
# inject_delegate: a leaf CLI call spawns no delegate child, so there is nothing to link.
# Lazy import keeps the constant's home module (measurement) out of this adapter's import
# graph, mirroring the lazy delegate_substitutions import in _effective_cmd.
task_id = opts.get("task_id")
if self.inject_delegate and task_id is not None:
from tanglebrain.measurement import PARENT_TASK_ID_ENV

env[PARENT_TASK_ID_ENV] = str(task_id)

try:
completed = subprocess.run(
argv,
Expand Down
12 changes: 10 additions & 2 deletions tanglebrain/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import argparse
import sys
import uuid

from tanglebrain import __version__
from tanglebrain.adapters import AdapterError
Expand Down Expand Up @@ -178,7 +179,14 @@ def run_once(
AdapterError: If the adapter cannot produce text.
"""
roster = load_roster(roster_path)
opts = {"max_tokens": max_tokens} if max_tokens is not None else None
# Mint a task id for this routed task. It is recorded on the task and threaded through opts so
# the orchestrator-CLI adapter can propagate it to delegated sub-calls (see CliAdapter.run /
# PARENT_TASK_ID_ENV), linking the delegation tree back to this task. Cheap and side-effect-free
# to mint on every path; only the router path (orchestrators with the delegate tool) acts on it.
task_id = uuid.uuid4().hex
opts: dict = {"task_id": task_id}
if max_tokens is not None:
opts["max_tokens"] = max_tokens

if model is not None:
path, entry = "model", select_by_id(roster, model)
Expand All @@ -199,7 +207,7 @@ def run_once(
text = router.route(prompt, task=task, opts=opts)
entry = router.last_served

record_task(path=path, entry=entry, prompt=prompt, response=text)
record_task(path=path, entry=entry, prompt=prompt, response=text, task_id=task_id)
return (text, _served(path, entry)) if return_served else text


Expand Down
24 changes: 20 additions & 4 deletions tanglebrain/delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import sys
from concurrent.futures import ThreadPoolExecutor

from tanglebrain.measurement import record_task
from tanglebrain.measurement import PARENT_TASK_ID_ENV, record_task
from tanglebrain.roster import ROSTER_ENV_VAR, Roster, RosterEntry, load_roster
from tanglebrain.selector import SelectionError, build_adapter, select_local
from tanglebrain.settings import Settings, load_settings
Expand Down Expand Up @@ -270,10 +270,26 @@ def run_delegate(
text = adapter.run(prompt, {"max_tokens": max_tokens})
# Meter the sub-call for orchestration-tree observability. Tagged kind="delegate" so the rollup
# keeps it OUT of the spend-avoided headline (the parent task already credits the whole job) and
# in a separate by-backend breakdown. record_task never raises; the extra guard is belt-and-
# suspenders — metering must never break a delegation.
# in a separate by-backend breakdown. The parent task id, propagated from the orchestrator via
# PARENT_TASK_ID_ENV, links this sub-call to its top-level task (absent → recorded as unlinked).
#
# LOAD-BEARING ASSUMPTION (verified live for claude, not asserted by any hermetic test): the
# linkage depends on the orchestrator CLI forwarding its environment to the MCP delegate child it
# spawns. That forwarding is the orchestrator's behavior, not TangleBrain's — if a CLI stops
# forwarding env, or a roster adds TANGLEBRAIN_TASK_ID to an orchestrator's invoke.scrub_env, the
# linkage silently degrades to "unlinked" (never an error — the delegation itself is unaffected).
#
# record_task never raises; the extra guard is belt-and-suspenders — metering must never break a
# delegation.
try:
record_task(path="delegate", entry=entry, prompt=prompt, response=text, kind="delegate")
record_task(
path="delegate",
entry=entry,
prompt=prompt,
response=text,
kind="delegate",
parent_task_id=os.environ.get(PARENT_TASK_ID_ENV),
)
except Exception:
pass
return text
Expand Down
49 changes: 46 additions & 3 deletions tanglebrain/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@

LOG_FILENAME = "usage.jsonl"

#: Env var carrying the top-level task id from the orchestrator down to a delegated sub-call. The
#: CLI mints a task id per routed task and the orchestrator-CLI adapter injects it into the
#: orchestrator subprocess env (only when the delegate tool is injected); the orchestrator forwards
#: its env to the MCP delegate child it spawns, where :func:`tanglebrain.delegate.run_delegate` reads
#: it back and stamps each delegate record's ``parent_task_id``. This is what links a delegated
#: sub-call to the specific top-level task that spawned it, across the process boundary.
PARENT_TASK_ID_ENV = "TANGLEBRAIN_TASK_ID"

# Serializes appends to the usage log so concurrent writers in one process (delegate_many fans
# sub-tasks out across threads) can't interleave bytes mid-line. Per-process only; cross-process
# appends rely on the OS's O_APPEND atomicity for short lines, as before.
Expand Down Expand Up @@ -293,6 +301,8 @@ def record_task(
prompt: str,
response: str,
kind: str = "task",
task_id: str | None = None,
parent_task_id: str | None = None,
log_path: str | os.PathLike[str] | None = None,
pricing: Pricing | None = None,
) -> None:
Expand All @@ -311,6 +321,11 @@ def record_task(
kind: ``"task"`` for a top-level routed task (the default; what the spend-avoided headline
counts) or ``"delegate"`` for a delegated sub-call. Delegate records are rolled up
**separately** so a sub-call's saving is never double-counted against its parent task.
task_id: For a top-level task, the id minted for this routed task (so its delegated sub-calls
can be linked back to it). Omitted from the record when ``None``.
parent_task_id: For a delegated sub-call, the id of the top-level task that spawned it (read
from :data:`PARENT_TASK_ID_ENV`). Omitted from the record when ``None`` — e.g. a delegate
invoked outside a propagated task, which rolls up as ``unlinked``.
log_path: Override the usage-log path (tests inject a temp path). Defaults to
:func:`default_log_path`.
pricing: Override the pricing. Defaults to :func:`load_pricing`.
Expand Down Expand Up @@ -338,6 +353,12 @@ def record_task(
"spend_avoided_usd": round(avoided, 6),
"pricing_ref": pricing.reference_model,
}
# Optional linkage fields — only written when present, so existing records/readers that
# never set them are unaffected (a missing field reads as "no linkage").
if task_id is not None:
record["task_id"] = str(task_id)
if parent_task_id is not None:
record["parent_task_id"] = str(parent_task_id)
target = Path(log_path) if log_path is not None else default_log_path()
target.parent.mkdir(parents=True, exist_ok=True)
with _LOG_LOCK:
Expand Down Expand Up @@ -403,9 +424,12 @@ def rollup(records: list[dict]) -> dict:
``out_tokens_est`` (summed estimates), and ``cloud_equiv_usd`` / ``spend_avoided_usd``
(summed dollars) — all over **top-level tasks only** — plus ``delegates``, a separate
sub-rollup of delegated sub-calls ``{count, by_backend: {model: {count, in_tokens_est,
out_tokens_est}}, in_tokens_est, out_tokens_est, cloud_equiv_usd}``. Delegate records are
kept out of the headline so a sub-call's saving is never double-counted against its parent
task; their cloud-equiv is informational. A record without a ``kind`` field counts as a task.
out_tokens_est}}, by_parent: {parent_task_id: {count, by_backend: {model: count}}},
in_tokens_est, out_tokens_est, cloud_equiv_usd}``. ``by_parent`` groups each delegate under
the top-level task that spawned it (via ``parent_task_id``); delegates with no
``parent_task_id`` are grouped under the sentinel ``"unlinked"``. Delegate records are kept
out of the headline so a sub-call's saving is never double-counted against its parent task;
their cloud-equiv is informational. A record without a ``kind`` field counts as a task.
"""
summary: dict = {
"tasks": 0,
Expand All @@ -418,6 +442,7 @@ def rollup(records: list[dict]) -> dict:
delegates: dict = {
"count": 0,
"by_backend": {},
"by_parent": {},
"in_tokens_est": 0,
"out_tokens_est": 0,
"cloud_equiv_usd": 0.0,
Expand All @@ -434,6 +459,13 @@ def rollup(records: list[dict]) -> dict:
backend["count"] += 1
backend["in_tokens_est"] += in_tok
backend["out_tokens_est"] += out_tok
# Per-parent tree: link this sub-call to the top-level task that spawned it. A delegate
# with no parent_task_id (run outside a propagated task) groups under "unlinked".
parent_id = r.get("parent_task_id")
parent_key = str(parent_id) if parent_id not in (None, "") else "unlinked"
parent = delegates["by_parent"].setdefault(parent_key, {"count": 0, "by_backend": {}})
parent["count"] += 1
parent["by_backend"][model] = parent["by_backend"].get(model, 0) + 1
delegates["in_tokens_est"] += in_tok
delegates["out_tokens_est"] += out_tok
delegates["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd"))
Expand Down Expand Up @@ -494,6 +526,17 @@ def format_rollup(summary: dict, pricing: Pricing) -> str:
f"{model} {info.get('count', 0)}" for model, info in sorted(by_backend.items())
)
lines.append(f" By backend: {backends}")
by_parent = delegates.get("by_parent") or {}
if by_parent:
linked = [k for k in by_parent if k != "unlinked"]
unlinked = (by_parent.get("unlinked") or {}).get("count", 0)
if linked:
tree = f"{len(linked)} parent task(s)"
if unlinked:
tree += f", {unlinked} unlinked"
else:
tree = f"{unlinked} unlinked" # all sub-calls ran outside a propagated task
lines.append(f" Linked to: {tree}")
lines.append(
f" Est. tokens: in {delegates.get('in_tokens_est', 0):,} / "
f"out {delegates.get('out_tokens_est', 0):,}"
Expand Down
26 changes: 23 additions & 3 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,20 @@ def test_max_tokens_threaded_through_local(self):
fake_adapter.run.return_value = "x"
with patch("tanglebrain.cli.build_adapter", return_value=fake_adapter):
run_once("hello", local=True, max_tokens=256)
self.assertEqual(fake_adapter.run.call_args.args[1], {"max_tokens": 256})
opts = fake_adapter.run.call_args.args[1]
# opts now always carries a minted task_id (for delegate-tree linkage), plus max_tokens here.
self.assertEqual(opts.get("max_tokens"), 256)
self.assertIsInstance(opts.get("task_id"), str)

def test_no_max_tokens_passes_none_opts_local(self):
def test_no_max_tokens_still_passes_task_id_opts_local(self):
fake_adapter = MagicMock()
fake_adapter.run.return_value = "x"
with patch("tanglebrain.cli.build_adapter", return_value=fake_adapter):
run_once("hello", local=True)
self.assertIsNone(fake_adapter.run.call_args.args[1])
opts = fake_adapter.run.call_args.args[1]
# With no max_tokens, opts is no longer None — it still carries the minted task_id.
self.assertNotIn("max_tokens", opts)
self.assertIsInstance(opts.get("task_id"), str)

def test_model_routes_to_named_entry(self):
# --model selects a specific roster entry (here, a sub) instead of local-first.
Expand Down Expand Up @@ -142,6 +148,20 @@ def test_router_threads_task_hint(self):
RouterCls.assert_called_once()
self.assertEqual(fake_router.route.call_args.kwargs.get("task"), "code")

def test_router_threads_task_id_in_opts(self):
# The minted task_id must reach Router.route's opts — this is the seam that propagates it to
# the orchestrator (and thence to delegated sub-calls). The router path is the feature's point.
fake_router = MagicMock()
fake_router.route.return_value = "routed reply"
with patch("tanglebrain.cli.load_roster"), patch(
"tanglebrain.cli.Router", return_value=fake_router
):
run_once("hello")
opts = fake_router.route.call_args.kwargs.get("opts")
self.assertIsInstance(opts, dict)
self.assertIsInstance(opts.get("task_id"), str)
self.assertTrue(opts["task_id"])

def test_model_takes_precedence_over_router(self):
# model is an explicit override and wins over the default router path.
fake_adapter = MagicMock()
Expand Down
34 changes: 34 additions & 0 deletions tests/test_cli_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,40 @@ def test_run_hands_scrubbed_env_to_subprocess(self):
self.assertIn("PATH", passed_env)


class TaskIdPropagationTest(unittest.TestCase):
"""The orchestrator-CLI adapter injects the parent task id env for delegate-tree linkage."""

def _env_for(self, inject_delegate, opts):
from tanglebrain.measurement import PARENT_TASK_ID_ENV # noqa: F401 (used by caller asserts)

adapter = CliAdapter(
cmd=["claude", "-p"], parse="claude-json", inject_delegate=inject_delegate
)
with patch.dict(os.environ, {"PATH": "/b"}, clear=True):
with patch_run(return_value=completed(0, CLAUDE_JSON_OK)) as run:
adapter.run("hi", opts)
return run.call_args.kwargs["env"]

def test_injects_task_id_for_orchestrator(self):
from tanglebrain.measurement import PARENT_TASK_ID_ENV

env = self._env_for(inject_delegate=True, opts={"task_id": "task-abc"})
self.assertEqual(env[PARENT_TASK_ID_ENV], "task-abc")

def test_no_injection_for_leaf_cli(self):
# inject_delegate=False ⇒ no delegate child is spawned, so nothing to link — don't inject.
from tanglebrain.measurement import PARENT_TASK_ID_ENV

env = self._env_for(inject_delegate=False, opts={"task_id": "task-abc"})
self.assertNotIn(PARENT_TASK_ID_ENV, env)

def test_no_injection_when_task_id_absent(self):
from tanglebrain.measurement import PARENT_TASK_ID_ENV

env = self._env_for(inject_delegate=True, opts={})
self.assertNotIn(PARENT_TASK_ID_ENV, env)


class BuildArgvTest(unittest.TestCase):
"""Prompt injection: {prompt} substitution, else append; never via the shell."""

Expand Down
29 changes: 29 additions & 0 deletions tests/test_delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,35 @@ def test_records_delegate_kind(self):
self.assertEqual(kw["prompt"], "do it")
self.assertEqual(kw["response"], "result-text")

def test_reads_parent_task_id_from_env(self):
from tanglebrain.measurement import PARENT_TASK_ID_ENV

entry = _entry("local-x", tier="local", can_delegate=True)
adapter = MagicMock()
adapter.run.return_value = "result-text"
with patch.dict(os.environ, {PARENT_TASK_ID_ENV: "task-xyz"}), patch(
"tanglebrain.delegate.load_roster", return_value=MagicMock()
), patch("tanglebrain.delegate.select_local", return_value=entry), patch(
"tanglebrain.delegate.build_adapter", return_value=adapter
), patch("tanglebrain.delegate.record_task") as rec:
run_delegate("do it")
self.assertEqual(rec.call_args.kwargs["parent_task_id"], "task-xyz")

def test_parent_task_id_none_when_env_unset(self):
from tanglebrain.measurement import PARENT_TASK_ID_ENV

entry = _entry("local-x", tier="local", can_delegate=True)
adapter = MagicMock()
adapter.run.return_value = "x"
env_without = {k: v for k, v in os.environ.items() if k != PARENT_TASK_ID_ENV}
with patch.dict(os.environ, env_without, clear=True), patch(
"tanglebrain.delegate.load_roster", return_value=MagicMock()
), patch("tanglebrain.delegate.select_local", return_value=entry), patch(
"tanglebrain.delegate.build_adapter", return_value=adapter
), patch("tanglebrain.delegate.record_task") as rec:
run_delegate("do it")
self.assertIsNone(rec.call_args.kwargs["parent_task_id"])

def test_metering_failure_never_breaks_delegation(self):
entry = _entry("local-x", tier="local")
adapter = MagicMock()
Expand Down
Loading
Loading