15 changes: 13 additions & 2 deletions ARCHITECTURE.md
@@ -155,8 +155,9 @@ out concurrently. The **reduce step is deliberately not TangleBrain's** — the
 the `delegate_many` results itself (it holds the original task context that makes for good synthesis),
 and offloads the stitch with an ordinary `delegate(task=…)` call when the reduction is mechanical and
 large. No dedicated reducer tool: the existing primitives cover it, and frontier-side synthesis is the
-better default until usage proves otherwise. Still ahead: orchestration-tree observability (which also
-adds the deferred non-local delegate metering) — whose data would tell us if a reducer ever earns code.
+better default until usage proves otherwise. Delegated sub-calls are now **metered** (see Measurement
+below) — the deferred observability landed as a by-backend breakdown; a per-parent-task tree (linking
+each delegation to its top-level task across processes) is the remaining stretch.

 ### Measurement — per-task records (`measurement.py`)

@@ -169,6 +170,16 @@ authenticated CLIs expose no usable counts), so one consistent approximate metho
 every tier. All measurement I/O is best-effort: logging never breaks routing, and a corrupt record
 never breaks the rollup.

+Records carry a `kind`: `"task"` for a top-level routed request (what the spend-avoided headline
+counts) or `"delegate"` for a sub-call offloaded through `run_delegate` (every delegation, including
+each `delegate_many` item, is metered at that single seam). The rollup keeps delegate records **out of
+the headline** — the parent task already credits the whole job, so counting the sub-calls again would
+double-count the saving — and aggregates them **separately** into a by-backend breakdown (count, est
+tokens, informational cloud-equiv) surfaced in `--stats` and the GUI. Concurrent appends (delegate_many
+fans out across threads) are serialized by a process-level lock. Linking each delegation to its
+*specific* parent task across processes (a true tree) is deferred — it needs a task-id propagated
+through the orchestrator CLIs to the MCP child, which can't be verified hermetically.
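
Reviewer's illustration of the double-counting argument in the paragraph above. This is a minimal sketch and not TangleBrain's actual `rollup`: the `kind` and `spend_avoided_usd` fields come from the text, while the helper name `headline_spend_avoided` and all dollar figures are invented for the example.

```python
def headline_spend_avoided(records: list[dict]) -> float:
    """Sum spend avoided over top-level tasks only (hypothetical helper)."""
    total = 0.0
    for record in records:
        # A record without a `kind` field is treated as a top-level task.
        if record.get("kind", "task") == "delegate":
            continue  # the parent task already credits the whole job
        total += float(record.get("spend_avoided_usd", 0.0))
    return round(total, 4)


# Illustrative records only; the amounts are made up.
records = [
    {"kind": "task", "spend_avoided_usd": 0.50},      # parent task
    {"kind": "delegate", "spend_avoided_usd": 0.20},  # sub-call of that parent
    {"kind": "delegate", "spend_avoided_usd": 0.25},  # another sub-call
    {"spend_avoided_usd": 0.10},                      # older record, no `kind`
]

# Summing every record would count the delegated work twice.
naive_total = round(sum(r["spend_avoided_usd"] for r in records), 4)
headline = headline_spend_avoided(records)
```

With these made-up numbers the naive sum also picks up the two delegate records, while the headline counts only the parent task and the legacy record.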

### Knob GUI — localhost panel (`gui/`)

`tanglebrain-gui` serves a thin, **localhost-only** web panel (stdlib `http.server` + a single
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+### Added
+
+- **Delegate observability + metering (scatter-gather roadmap #39, slice 6).** Delegated sub-calls
+are now metered: every `run_delegate` execution (including each `delegate_many` item — metered at one
+seam) is logged as a `kind: delegate` usage record with its served backend + estimated tokens.
+`tanglebrain --stats` and the knob panel gain a **"Delegated sub-tasks" breakdown by backend** (count,
+est tokens, informational cloud-equiv). Delegate records are kept **out of the "spend avoided"
+headline** so a sub-call's saving is never double-counted against its parent task, and concurrent
+fan-out appends are serialized by a process-level lock. Records carry a new `kind` field
+(`task`/`delegate`; older records read as `task`). The per-parent-task tree (cross-process linkage)
+is deferred. Closes the deferred metering noted since the measurement layer landed.
+
### Internal

- **Documented the synthesis/reduce pattern (scatter-gather roadmap #39, slice 4).** README +
9 changes: 5 additions & 4 deletions README.md
@@ -225,10 +225,11 @@ adapters as the CLI above, so endpoints and keys live in one place. It exposes f

 Make a backend a delegate target by flagging its roster entry `can_delegate: true` (mirrors
 `can_orchestrate`). The shipped example flags the local tier, so the menu is non-empty out of the
-box. **Note:** non-local delegate spend is **not metered** in this version — orchestration-tree
-observability is on the [scatter-gather roadmap](https://github.com/Jason-Vaughan/TangleBrain/issues/39).
-Any non-local target is opt-in and your responsibility under that provider's terms — see
-[DISCLAIMER.md](DISCLAIMER.md).
+box. **Delegated sub-calls are now metered**: each is logged as a `kind: delegate` usage record, and
+`tanglebrain --stats` (and the knob panel) show a "Delegated sub-tasks" breakdown by backend (count,
+est tokens, informational cloud-equiv). These are kept **out of** the "spend avoided" headline so a
+sub-call's saving is never double-counted against its parent task. Any non-local target is opt-in and
+your responsibility under that provider's terms — see [DISCLAIMER.md](DISCLAIMER.md).

**Synthesising fan-out results.** The full pattern is decompose → fan out (`delegate_many`) →
**reduce** → answer. TangleBrain ships the dispatch primitives but deliberately does *not* own the
4 changes: 2 additions & 2 deletions tanglebrain/config/roster.yaml
@@ -44,8 +44,8 @@
 # e.g. a cheaper sub or a better-fit model. Flag any entry `can_delegate: true` and it joins the
 # menu the `delegate` tool advertises; the model picks a target by its `good_at` fit. A target is
 # invoked as a leaf (it never gets its own delegate tool — no recursion). `api` targets still obey
-# the billing gate; non-local delegate spend is not metered in this version. This example is an
-# openai-compat backend; uncomment + point it at one you hold.
+# the billing gate; delegated sub-calls are metered separately (see `tanglebrain --stats`). This
+# example is an openai-compat backend; uncomment + point it at one you hold.
#
# - id: cheap-remote
# tier: sub
20 changes: 16 additions & 4 deletions tanglebrain/delegate.py
@@ -24,6 +24,7 @@
 import sys
 from concurrent.futures import ThreadPoolExecutor

+from tanglebrain.measurement import record_task
 from tanglebrain.roster import ROSTER_ENV_VAR, Roster, RosterEntry, load_roster
 from tanglebrain.selector import SelectionError, build_adapter, select_local
 from tanglebrain.settings import Settings, load_settings
@@ -231,8 +232,9 @@ def run_delegate(
     The selected target is built as a **leaf** (``inject_delegate=False``) — a delegate target never
     receives its own delegate tool, so there is no recursive delegation. ``api`` targets (reachable
     only via explicit ``target``) flow through the existing billing gate in
-    :func:`tanglebrain.selector.build_adapter`; ``cli`` targets keep their env-scrub. Non-local
-    delegate spend is **not metered** in this version (consistent with the existing delegate posture).
+    :func:`tanglebrain.selector.build_adapter`; ``cli`` targets keep their env-scrub. Each sub-call is
+    **metered** as a ``kind="delegate"`` usage record, kept out of the spend-avoided headline (see
+    :func:`tanglebrain.measurement.rollup`).

Args:
prompt: The self-contained sub-task to delegate. Give it everything it needs — the target
@@ -265,7 +267,16 @@ def run_delegate(
     else:
         entry = select_local(roster)
     adapter = build_adapter(entry, inject_delegate=False)
-    return adapter.run(prompt, {"max_tokens": max_tokens})
+    text = adapter.run(prompt, {"max_tokens": max_tokens})
+    # Meter the sub-call for orchestration-tree observability. Tagged kind="delegate" so the rollup
+    # keeps it OUT of the spend-avoided headline (the parent task already credits the whole job) and
+    # in a separate by-backend breakdown. record_task never raises; the extra guard is belt-and-
+    # suspenders — metering must never break a delegation.
+    try:
+        record_task(path="delegate", entry=entry, prompt=prompt, response=text, kind="delegate")
+    except Exception:
+        pass
+    return text
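
Reviewer's illustration of the "metering must never break a delegation" guard from the hunk above, reduced to a standalone sketch. The names `run_with_metering` and `broken_meter` are hypothetical and exist only for this example; the real code calls `adapter.run` and `record_task`.

```python
def run_with_metering(run, meter, prompt: str) -> str:
    """Run a sub-call, then meter it; a metering failure is swallowed."""
    text = run(prompt)  # errors from the real work still propagate
    try:
        meter(prompt, text)
    except Exception:
        pass  # metering is a best-effort side-effect
    return text


def broken_meter(prompt: str, text: str) -> None:
    """Stand-in for a metering call that fails (for example, an unwritable log)."""
    raise OSError("disk full")


# The answer still comes back even though the meter raised.
result = run_with_metering(lambda p: p.upper(), broken_meter, "summarise this")
```

Only the metering call sits inside the `try`, so a failure in the delegated work itself is still reported to the caller.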


#: Concurrency cap to use only when :func:`os.cpu_count` can't report a core count (rare).
@@ -361,7 +372,8 @@ def run_delegate_many(

     Partial failure never sinks the batch: each result carries a ``status`` (``ok`` / ``no_fit`` /
     ``error``), and results are returned **ordered by input index** even though workers finish out of
-    order. Non-local delegate spend is **not metered** (consistent with the rest of the delegate).
+    order. Each dispatched sub-call is metered (via :func:`run_delegate`) as a ``kind="delegate"``
+    usage record, kept out of the spend-avoided headline.

Args:
tasks: A list of task descriptors, each a mapping ``{prompt, target?, task?, max_tokens?}``.
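
Reviewer's illustration of the behaviour this docstring describes: results ordered by input index, with a per-item status so one failure does not sink the batch. This is a sketch under assumptions, not the body of `run_delegate_many`. The names `fan_out` and `demo_worker` are hypothetical, the `no_fit` status is left out, and the worker cap is a plain parameter here.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fan_out(tasks: list[str], worker, max_workers: int = 4) -> list[dict]:
    """Run `worker` over `tasks` concurrently; return results ordered by input index."""
    results: list = [None] * len(tasks)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember each future's input index so completion order does not matter.
        futures = {pool.submit(worker, task): index for index, task in enumerate(tasks)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                results[index] = {"index": index, "status": "ok", "text": future.result()}
            except Exception as exc:
                # A failing item is recorded and the rest of the batch carries on.
                results[index] = {"index": index, "status": "error", "error": str(exc)}
    return results


def demo_worker(task: str) -> str:
    """Toy worker: reverses the task text, and fails on the literal 'bad'."""
    if task == "bad":
        raise ValueError("cannot handle this task")
    return task[::-1]


out = fan_out(["abc", "bad", "xyz"], demo_worker)
```

Writing each result into a pre-sized list by index is one simple way to get input ordering without sorting afterwards.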
12 changes: 12 additions & 0 deletions tanglebrain/gui/static/index.html
@@ -116,6 +116,18 @@ <h2>Pricing reference</h2>
 <div class="stat"><div class="label">Est. tokens (in / out)</div><div class="value mono" style="font-size:.95rem">${(s.in_tokens_est||0).toLocaleString()} / ${(s.out_tokens_est||0).toLocaleString()}</div></div>
 </div>`;
 if (d.is_placeholder) html += `<div class="caveat">⚠ pricing: PLACEHOLDER — figures illustrative until the anchor is ratified.</div>`;
+const dg = s.delegates || {};
+if (dg.count) {
+const backends = Object.entries(dg.by_backend || {}).map(([k, v]) => `${esc(k)} ${v.count || 0}`).join(", ") || "—";
+html += `<h3 style="margin:1rem 0 .4rem">Delegated sub-tasks <span class="muted">(offloaded by orchestrators)</span></h3>
+<div class="stat-grid">
+<div class="stat"><div class="label">Sub-tasks</div><div class="value">${dg.count}</div></div>
+<div class="stat"><div class="label">By backend</div><div class="value mono" style="font-size:.95rem">${backends}</div></div>
+<div class="stat"><div class="label">Est. tokens (in / out)</div><div class="value mono" style="font-size:.95rem">${(dg.in_tokens_est||0).toLocaleString()} / ${(dg.out_tokens_est||0).toLocaleString()}</div></div>
+<div class="stat"><div class="label">Cloud-equiv</div><div class="value mono" style="font-size:.95rem">${money(dg.cloud_equiv_usd)}</div></div>
+</div>
+<div class="caveat">Informational — delegate spend is already credited within its parent task's “spend avoided”.</div>`;
+}
$("statsCard").innerHTML = html;
} catch (e) { $("statsCard").innerHTML = `<span class="err">failed to load stats</span>`; }
}
80 changes: 72 additions & 8 deletions tanglebrain/measurement.py
@@ -22,6 +22,7 @@
 import json
 import os
 import shutil
+import threading
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from pathlib import Path
@@ -32,6 +33,11 @@

 LOG_FILENAME = "usage.jsonl"

+# Serializes appends to the usage log so concurrent writers in one process (delegate_many fans
+# sub-tasks out across threads) can't interleave bytes mid-line. Per-process only; cross-process
+# appends rely on the OS's O_APPEND atomicity for short lines, as before.
+_LOG_LOCK = threading.Lock()
+
 # Chars per token for the uniform estimation heuristic. ~4 chars/token is the standard rough
 # approximation for English-ish text across modern BPE tokenizers; good enough for an *estimate*.
 _CHARS_PER_TOKEN = 4
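
Reviewer's illustration of what the `_LOG_LOCK` added in this hunk is for. This is a self-contained sketch, not the module's code: `append_record` and the record contents are hypothetical, and the log lives in a temporary directory. It appends JSON lines from many threads and checks that every line still parses.

```python
import json
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

_LOCK = threading.Lock()  # one process-level lock shared by all writer threads


def append_record(path: Path, record: dict) -> None:
    """Append one JSON line; the lock keeps whole lines from interleaving."""
    line = json.dumps(record) + "\n"
    with _LOCK:
        with path.open("a", encoding="utf-8") as fh:
            fh.write(line)


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "usage.jsonl"
    with ThreadPoolExecutor(max_workers=8) as pool:
        # 200 concurrent appends, as a fan-out of delegated sub-calls might produce.
        list(pool.map(lambda i: append_record(log, {"kind": "delegate", "n": i}), range(200)))
    parsed = [json.loads(line) for line in log.read_text(encoding="utf-8").splitlines()]
```

As the comment in the hunk notes, a `threading.Lock` only coordinates writers inside one process; it does nothing for separate processes appending to the same file.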
@@ -286,17 +292,25 @@ def record_task(
     entry: object,
     prompt: str,
     response: str,
+    kind: str = "task",
     log_path: str | os.PathLike[str] | None = None,
     pricing: Pricing | None = None,
 ) -> None:
-    """Append one usage record for a routed task. Never raises — a logging failure is dropped.
+    """Append one usage record for a routed task or a delegated sub-call. Never raises.
+
+    A logging failure is dropped — measurement is a side-effect that must never affect the returned
+    answer.

     Args:
-        path: Which execution path served the task — ``router`` | ``local`` | ``model``.
+        path: Which execution path served the work — ``router`` | ``local`` | ``model`` |
+            ``delegate``.
         entry: The served :class:`~tanglebrain.roster.RosterEntry` (read for ``tier``/``id``); may
             be ``None`` (e.g. the router didn't surface one), in which case both are ``"unknown"``.
-        prompt: The task prompt (for input-token estimation).
+        prompt: The prompt (for input-token estimation).
         response: The returned response text (for output-token estimation).
+        kind: ``"task"`` for a top-level routed task (the default; what the spend-avoided headline
+            counts) or ``"delegate"`` for a delegated sub-call. Delegate records are rolled up
+            **separately** so a sub-call's saving is never double-counted against its parent task.
log_path: Override the usage-log path (tests inject a temp path). Defaults to
:func:`default_log_path`.
pricing: Override the pricing. Defaults to :func:`load_pricing`.
@@ -314,6 +328,7 @@ def record_task(
         avoided = 0.0 if tier == "api" else equiv
         record = {
             "ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
+            "kind": str(kind),
             "path": str(path),
             "tier": str(tier),
             "model": str(model),
@@ -325,8 +340,9 @@ def record_task(
         }
         target = Path(log_path) if log_path is not None else default_log_path()
         target.parent.mkdir(parents=True, exist_ok=True)
-        with target.open("a", encoding="utf-8") as fh:
-            fh.write(json.dumps(record) + "\n")
+        with _LOG_LOCK:
+            with target.open("a", encoding="utf-8") as fh:
+                fh.write(json.dumps(record) + "\n")
     except Exception:
         # Measurement is a side-effect: a failure here must never affect the returned answer.
         return
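
Reviewer's illustration of how a record like the one built in `record_task` could be assembled. This is a sketch under stated assumptions, not the module's implementation. The field names and the `avoided = 0.0 if tier == "api" else equiv` rule are taken from the diff. The rounding-up in `estimate_tokens`, the per-1k-token rate parameters and the `build_record` helper are assumptions made for this example, since the real pricing code is not shown here.

```python
import math

CHARS_PER_TOKEN = 4  # same rough heuristic as the module constant


def estimate_tokens(text: str) -> int:
    """Rough token estimate; rounding up is an assumption of this sketch."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def build_record(kind: str, tier: str, model: str, prompt: str, response: str,
                 usd_per_1k_in: float, usd_per_1k_out: float) -> dict:
    """Build one usage record (hypothetical helper with made-up rate parameters)."""
    in_tok = estimate_tokens(prompt)
    out_tok = estimate_tokens(response)
    equiv = round(in_tok / 1000 * usd_per_1k_in + out_tok / 1000 * usd_per_1k_out, 6)
    # Work served by a paid API tier avoids no spend; any other tier avoids the cloud-equivalent.
    avoided = 0.0 if tier == "api" else equiv
    return {
        "kind": kind,
        "tier": tier,
        "model": model,
        "in_tokens_est": in_tok,
        "out_tokens_est": out_tok,
        "cloud_equiv_usd": equiv,
        "spend_avoided_usd": avoided,
    }


# Illustrative inputs: 400 and 800 characters, with invented rates of $1 / $2 per 1k tokens.
local_rec = build_record("delegate", "local", "local-a", "x" * 400, "y" * 800, 1.0, 2.0)
api_rec = build_record("task", "api", "remote-b", "x" * 400, "y" * 800, 1.0, 2.0)
```

The two records have the same cloud-equivalent cost and differ only in `spend_avoided_usd`, which is zero for the `api` tier.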
@@ -385,7 +401,11 @@ def rollup(records: list[dict]) -> dict:
     Returns:
         A dict with: ``tasks`` (int), ``by_tier`` (tier → count), ``in_tokens_est`` /
         ``out_tokens_est`` (summed estimates), and ``cloud_equiv_usd`` / ``spend_avoided_usd``
-        (summed dollars).
+        (summed dollars) — all over **top-level tasks only** — plus ``delegates``, a separate
+        sub-rollup of delegated sub-calls ``{count, by_backend: {model: {count, in_tokens_est,
+        out_tokens_est}}, in_tokens_est, out_tokens_est, cloud_equiv_usd}``. Delegate records are
+        kept out of the headline so a sub-call's saving is never double-counted against its parent
+        task; their cloud-equiv is informational. A record without a ``kind`` field counts as a task.
     """
     summary: dict = {
         "tasks": 0,
@@ -395,16 +415,40 @@ def rollup(records: list[dict]) -> dict:
         "cloud_equiv_usd": 0.0,
         "spend_avoided_usd": 0.0,
     }
+    delegates: dict = {
+        "count": 0,
+        "by_backend": {},
+        "in_tokens_est": 0,
+        "out_tokens_est": 0,
+        "cloud_equiv_usd": 0.0,
+    }
     for r in records:
+        in_tok = _as_int(r.get("in_tokens_est"))
+        out_tok = _as_int(r.get("out_tokens_est"))
+        if str(r.get("kind", "task")) == "delegate":
+            delegates["count"] += 1
+            model = str(r.get("model", "unknown"))
+            backend = delegates["by_backend"].setdefault(
+                model, {"count": 0, "in_tokens_est": 0, "out_tokens_est": 0}
+            )
+            backend["count"] += 1
+            backend["in_tokens_est"] += in_tok
+            backend["out_tokens_est"] += out_tok
+            delegates["in_tokens_est"] += in_tok
+            delegates["out_tokens_est"] += out_tok
+            delegates["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd"))
+            continue
         summary["tasks"] += 1
         tier = str(r.get("tier", "unknown"))
         summary["by_tier"][tier] = summary["by_tier"].get(tier, 0) + 1
-        summary["in_tokens_est"] += _as_int(r.get("in_tokens_est"))
-        summary["out_tokens_est"] += _as_int(r.get("out_tokens_est"))
+        summary["in_tokens_est"] += in_tok
+        summary["out_tokens_est"] += out_tok
         summary["cloud_equiv_usd"] += _as_float(r.get("cloud_equiv_usd"))
         summary["spend_avoided_usd"] += _as_float(r.get("spend_avoided_usd"))
     summary["cloud_equiv_usd"] = round(summary["cloud_equiv_usd"], 4)
     summary["spend_avoided_usd"] = round(summary["spend_avoided_usd"], 4)
+    delegates["cloud_equiv_usd"] = round(delegates["cloud_equiv_usd"], 4)
+    summary["delegates"] = delegates
     return summary
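
Reviewer's illustration of the by-backend grouping this hunk adds, pulled out into a standalone sketch so the `setdefault` bucketing is easy to follow. The function name `delegates_by_backend` and the sample records are hypothetical. The real `rollup` also coerces values through `_as_int` and `_as_float`, which are not shown in this diff, so this sketch uses plain `int()` instead.

```python
def delegates_by_backend(records: list[dict]) -> dict:
    """Group delegate records by served model (hypothetical standalone helper)."""
    breakdown: dict = {}
    for record in records:
        # Records with no `kind` are top-level tasks and are skipped here.
        if str(record.get("kind", "task")) != "delegate":
            continue
        model = str(record.get("model", "unknown"))
        bucket = breakdown.setdefault(
            model, {"count": 0, "in_tokens_est": 0, "out_tokens_est": 0}
        )
        bucket["count"] += 1
        bucket["in_tokens_est"] += int(record.get("in_tokens_est", 0))
        bucket["out_tokens_est"] += int(record.get("out_tokens_est", 0))
    return breakdown


# Illustrative records; model names and token counts are made up.
sample = [
    {"kind": "delegate", "model": "local-a", "in_tokens_est": 100, "out_tokens_est": 40},
    {"kind": "delegate", "model": "local-a", "in_tokens_est": 50, "out_tokens_est": 10},
    {"kind": "delegate", "in_tokens_est": 7, "out_tokens_est": 3},  # no model recorded
    {"kind": "task", "model": "local-a", "in_tokens_est": 999, "out_tokens_est": 999},
    {"model": "local-a", "in_tokens_est": 1, "out_tokens_est": 1},  # legacy record
]

breakdown = delegates_by_backend(sample)
```

The `task` record and the legacy record without `kind` do not appear in the breakdown, and the delegate record without a `model` lands in an `unknown` bucket.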


@@ -438,4 +482,24 @@ def format_rollup(summary: dict, pricing: Pricing) -> str:
" ⚠ pricing: PLACEHOLDER — figures are illustrative; set real rates in "
"config/pricing.yaml and flip placeholder to false."
)
+
+    delegates = summary.get("delegates") or {}
+    if delegates.get("count"):
+        lines.append("")
+        lines.append(" Delegated sub-tasks (offloaded by orchestrators)")
+        lines.append(f" Count: {delegates.get('count', 0)}")
+        by_backend = delegates.get("by_backend") or {}
+        if by_backend:
+            backends = ", ".join(
+                f"{model} {info.get('count', 0)}" for model, info in sorted(by_backend.items())
+            )
+            lines.append(f" By backend: {backends}")
+        lines.append(
+            f" Est. tokens: in {delegates.get('in_tokens_est', 0):,} / "
+            f"out {delegates.get('out_tokens_est', 0):,}"
+        )
+        lines.append(
+            f" Cloud-equiv: ${delegates.get('cloud_equiv_usd', 0.0):,.2f} "
+            "(informational — already credited within parent tasks)"
+        )
     return "\n".join(lines)