diff --git a/.env.example b/.env.example
index 77ca0f3a3..8ae162861 100644
--- a/.env.example
+++ b/.env.example
@@ -100,6 +100,11 @@
# MAX_OBS_PER_SESSION=500 # Per-session observation cap before consolidation kicks in
# SUMMARIZE_CHUNK_SIZE=400 # When mem::summarize sees a session larger than this, it chunks observations and map-reduces (chunk-summarize → reduce-merge) to stay within the LLM's context window. Default 400 ≈ 50k tokens per chunk at ~110 tok/obs. Native sessions are capped by MAX_OBS_PER_SESSION; chunking primarily matters for bulk-imported jsonl sessions, which bypass that cap.
# SUMMARIZE_CHUNK_CONCURRENCY=6 # Parallel chunk LLM calls during chunked summarize. Default 6 fits ~100-chunk sessions under iii's 180s function-invocation timeout at typical ~8s/call. High-throughput providers (Novita, DeepInfra, DeepSeek) commonly allow 100+ concurrent — bump this for very large imported sessions.
+# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Per-session hard cap on estimated LLM tokens (compress + summarize). Once the cap is reached, further LLM calls for that session are blocked and compression falls back to zero-LLM synthetic output. A soft-warn event fires at 80%. Cost safety net for pathological sessions (runaway tool loops). Per-session override via the tokenCap field on POST /session/start.
+# AGENTMEMORY_SYSTEM_TOKEN_CAP=1000000 # Separate cap for cron-fired / cross-session LLM work (consolidation, reflect, graph extraction) that has no active session; tracked under the "__system__" sentinel.
+# AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS=7 # How long a session's token-budget row is kept after the session ends before the hourly reaper deletes it.
+# AGENTMEMORY_COST_IN_PER_1M=0.14 # USD per 1M input tokens, used to display a rough per-session costEstimate from the raw token counts.
+# AGENTMEMORY_COST_OUT_PER_1M=0.28 # USD per 1M output tokens for the costEstimate.
# -----------------------------------------------------------------------------
# 5. Behaviour flags
diff --git a/AGENTS.md b/AGENTS.md
index 873ef10fc..a0b0abbe1 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -117,7 +117,7 @@ Hook scripts in `src/hooks/` are standalone Node.js scripts (no iii-sdk import).
## Current Stats (v0.9.16)
- 53 MCP tools (8 visible by default, `AGENTMEMORY_TOOLS=all` for all)
-- 128 REST endpoints
+- 129 REST endpoints
- 6 MCP resources, 3 MCP prompts
- 12 hooks, 15 skills
- 50+ iii functions
diff --git a/README.md b/README.md
index c4ec2c1e0..33992d9cc 100644
--- a/README.md
+++ b/README.md
@@ -1294,6 +1294,18 @@ Quality vs cost tradeoff for memory work: compression is a summarization task wi
Sources: [OpenRouter pricing for Sonnet 4.6](https://openrouter.ai/anthropic/claude-sonnet-4.6/pricing), [DeepSeek V4 Pro](https://openrouter.ai/deepseek/deepseek-v4-pro), [DeepSeek pricing notes](https://api-docs.deepseek.com/quick_start/pricing/).
+### Per-session token budget
+
+Pathological sessions (long-running CC instances, runaway tool loops) can rack up unbounded background compress/summarize spend. Each session gets a running token budget with a **hard cap** (default **100k estimated tokens**) and an **80% soft warning**.
+
+| Knob | Default | Purpose |
+|------|---------|---------|
+| `AGENTMEMORY_SESSION_TOKEN_CAP` | `100000` | Hard cap per session. Once the cap is reached, further LLM calls for that session are blocked; compression falls back to zero-LLM synthetic output. |
+| `AGENTMEMORY_SYSTEM_TOKEN_CAP` | `1000000` | Separate cap for cron-fired / cross-session LLM work (consolidation, reflect, graph extraction) tracked under the `__system__` sentinel. |
+| `AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS` | `7` | How long budget rows persist after a session ends before the hourly reaper deletes them. |
+
+Per-session override: pass `tokenCap` in the body of `POST /agentmemory/session/start`. Budget state is KV-persisted and survives restarts. `agentmemory status` reports `N active, M near-cap, K exhausted`. OTEL histogram: `session.tokens_used`.
+
### Multi-agent memory (`AGENT_ID` + `AGENTMEMORY_AGENT_SCOPE`)
In multi-agent setups where several roles share one agentmemory server (architect / developer / reviewer / researcher / support-agent), `AGENT_ID` tags every write with the role that made it. `AGENTMEMORY_AGENT_SCOPE` controls whether recall filters by that tag.
@@ -1448,6 +1460,13 @@ Create `~/.agentmemory/.env`:
# LLM provider to compress the
# observation — expect significant
# token spend on active sessions.
+# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Per-session hard cap on estimated
+ # LLM tokens (compress + summarize).
+ # Soft-warn at 80%; synthetic fallback
+ # after exhaustion. Override per
+ # session via tokenCap on /session/start.
+# AGENTMEMORY_SYSTEM_TOKEN_CAP=1000000 # Cap for cron / cross-session LLM work.
+# AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS=7
# AGENTMEMORY_SLOTS=false # OFF by default. Editable pinned
# memory slots — persona,
# user_preferences, tool_guidelines,
@@ -1500,7 +1519,7 @@ Create `~/.agentmemory/.env`:
-128 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer ` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers.
+129 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer ` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers.
Key endpoints
diff --git a/plugin/skills/agentmemory-config/REFERENCE.md b/plugin/skills/agentmemory-config/REFERENCE.md
index 08c873a44..9679ad959 100644
--- a/plugin/skills/agentmemory-config/REFERENCE.md
+++ b/plugin/skills/agentmemory-config/REFERENCE.md
@@ -3,13 +3,15 @@
Generated by scanning `src/` for `AGENTMEMORY_*` usage. Do not edit the block below by hand; run `npm run skills:gen` after adding or removing a variable. Internal markers ending in two underscores are excluded.
-Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 34 recognized variables:
+Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 39 recognized variables:
- `AGENTMEMORY_AGENT_SCOPE`
- `AGENTMEMORY_ALLOW_AGENT_SDK`
- `AGENTMEMORY_AUTO_COMPRESS`
- `AGENTMEMORY_COMMIT_SHA`
- `AGENTMEMORY_COPILOT_MCP_BLOCK`
+- `AGENTMEMORY_COST_IN_PER_1M`
+- `AGENTMEMORY_COST_OUT_PER_1M`
- `AGENTMEMORY_CWD`
- `AGENTMEMORY_DEBUG`
- `AGENTMEMORY_DROP_STALE_INDEX`
@@ -30,9 +32,12 @@ Configuration is read from the environment and from `~/.agentmemory/.env` (no `e
- `AGENTMEMORY_REFLECT`
- `AGENTMEMORY_SDK_CHILD`
- `AGENTMEMORY_SECRET`
+- `AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS`
- `AGENTMEMORY_SESSION_ID`
+- `AGENTMEMORY_SESSION_TOKEN_CAP`
- `AGENTMEMORY_SLOTS`
- `AGENTMEMORY_SUPPRESS_COST_WARNING`
+- `AGENTMEMORY_SYSTEM_TOKEN_CAP`
- `AGENTMEMORY_TOOLS`
- `AGENTMEMORY_URL`
- `AGENTMEMORY_USE_DOCKER`
diff --git a/plugin/skills/agentmemory-rest-api/REFERENCE.md b/plugin/skills/agentmemory-rest-api/REFERENCE.md
index d36171def..f4bb5f954 100644
--- a/plugin/skills/agentmemory-rest-api/REFERENCE.md
+++ b/plugin/skills/agentmemory-rest-api/REFERENCE.md
@@ -5,7 +5,7 @@ Generated from `src/triggers/api.ts`. Do not edit the block below by hand; run `
The REST API is the primary surface. All paths are under `http://localhost:3111` (override with `--port`). When `AGENTMEMORY_SECRET` is set, send `Authorization: Bearer $AGENTMEMORY_SECRET`; localhost is otherwise open.
-117 registered endpoints:
+118 registered endpoints:
| Method | Path |
| --- | --- |
@@ -96,6 +96,7 @@ The REST API is the primary surface. All paths are under `http://localhost:3111`
| POST | `/agentmemory/sentinels/cancel` |
| POST | `/agentmemory/sentinels/check` |
| POST | `/agentmemory/sentinels/trigger` |
+| GET | `/agentmemory/session/budget` |
| GET | `/agentmemory/session/by-commit` |
| POST | `/agentmemory/session/commit` |
| POST | `/agentmemory/session/end` |
diff --git a/src/cli.ts b/src/cli.ts
index 48f6f09b6..a9663657c 100644
--- a/src/cli.ts
+++ b/src/cli.ts
@@ -1345,13 +1345,14 @@ async function runStatus() {
}
try {
- const [healthRes, sessionsRes, graphRes, memoriesRes, flagsRes, followupRes] = await Promise.all([
+ const [healthRes, sessionsRes, graphRes, memoriesRes, flagsRes, followupRes, budgetsRes] = await Promise.all([
apiFetch(base, "health"),
apiFetch(base, "sessions"),
apiFetch(base, "graph/stats"),
apiFetch(base, "memories?count=true"),
apiFetch(base, "config/flags"),
apiFetch(base, "diagnostics/followup"),
+ apiFetch(base, "session/budget").catch(() => null),
]);
if (typeof healthRes?.viewerPort === "number") {
@@ -1422,6 +1423,33 @@ async function runStatus() {
);
}
+ const budgetList = Array.isArray(budgetsRes?.budgets) ? budgetsRes.budgets : [];
+ const sessionBudgets = budgetList.filter(
+ (b: any) => b && b.sessionId && b.sessionId !== "__system__",
+ );
+ if (sessionBudgets.length > 0) {
+ let exhausted = 0;
+ let nearCap = 0;
+ let active = 0;
+ for (const b of sessionBudgets) {
+ const used = Number(b.tokensUsed) || 0;
+ const cap = Number(b.tokenCap) || 0;
+ const isExhausted = Boolean(b.exhaustedAt) || (cap > 0 && used >= cap);
+ if (isExhausted) {
+ exhausted++;
+ } else if (b.warnEmittedAt || (cap > 0 && used >= cap * 0.8)) {
+ nearCap++;
+ active++;
+ } else {
+ active++;
+ }
+ }
+ lines.push("");
+ lines.push(
+ `Token budgets: ${active} active, ${nearCap} near-cap, ${exhausted} exhausted`,
+ );
+ }
+
p.note(lines.join("\n"), "agentmemory");
} catch (err) {
p.log.error(err instanceof Error ? err.message : String(err));
diff --git a/src/config.ts b/src/config.ts
index f68da2e31..db6b6ebc6 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -403,6 +403,55 @@ export function getConsolidationDecayDays(): number {
return safeParseInt(getMergedEnv()["CONSOLIDATION_DECAY_DAYS"], 30);
}
+// Per-session LLM token budget. Hard cap default is 100k
+// estimated tokens per session. AGENTMEMORY_SESSION_TOKEN_CAP overrides the
+// global default; mem::session::start can override per-session.
+const SESSION_TOKEN_CAP_DEFAULT = 100_000;
+const SYSTEM_TOKEN_CAP_DEFAULT = 1_000_000;
+const SESSION_BUDGET_RETENTION_DAYS_DEFAULT = 7;
+// Soft warning fires at this fraction of the cap.
+export const SESSION_BUDGET_WARN_RATIO = 0.8;
+
+/**
+ * Hard cap on estimated LLM tokens for a single session.
+ *
+ * Reads AGENTMEMORY_SESSION_TOKEN_CAP from the merged environment, passing
+ * SESSION_TOKEN_CAP_DEFAULT to safeParseInt as the fallback. A result that is
+ * not strictly positive is replaced by the same default.
+ */
+export function getSessionTokenCap(): number {
+  const raw = getMergedEnv()["AGENTMEMORY_SESSION_TOKEN_CAP"];
+  const cap = safeParseInt(raw, SESSION_TOKEN_CAP_DEFAULT);
+  if (cap > 0) return cap;
+  return SESSION_TOKEN_CAP_DEFAULT;
+}
+
+/**
+ * Token cap for LLM work that has no active session.
+ *
+ * Cron-fired / cross-session calls (consolidation, reflect, graph extraction)
+ * bill the "__system__" sentinel against this separate, larger cap. Reads
+ * AGENTMEMORY_SYSTEM_TOKEN_CAP; a result that is not strictly positive is
+ * replaced by SYSTEM_TOKEN_CAP_DEFAULT.
+ */
+export function getSystemTokenCap(): number {
+  const raw = getMergedEnv()["AGENTMEMORY_SYSTEM_TOKEN_CAP"];
+  const cap = safeParseInt(raw, SYSTEM_TOKEN_CAP_DEFAULT);
+  if (cap > 0) return cap;
+  return SYSTEM_TOKEN_CAP_DEFAULT;
+}
+
+/**
+ * Number of days a budget row is retained before it becomes reapable.
+ *
+ * Reads AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS; a result that is not
+ * strictly positive is replaced by SESSION_BUDGET_RETENTION_DAYS_DEFAULT.
+ */
+export function getSessionBudgetRetentionDays(): number {
+  const raw = getMergedEnv()["AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS"];
+  const days = safeParseInt(raw, SESSION_BUDGET_RETENTION_DAYS_DEFAULT);
+  if (days > 0) return days;
+  return SESSION_BUDGET_RETENTION_DAYS_DEFAULT;
+}
+
+/**
+ * USD-per-1M-token rates used to turn raw token counts into a rough
+ * costEstimate at record time.
+ *
+ * Override with AGENTMEMORY_COST_IN_PER_1M / AGENTMEMORY_COST_OUT_PER_1M.
+ * A missing, non-numeric, non-finite, or negative value falls back to the
+ * built-in rate (0.14 for input, 0.28 for output).
+ */
+export function getCostRatesPer1M(): { input: number; output: number } {
+  const env = getMergedEnv();
+  // Parse one rate; anything that is not a finite, non-negative number is rejected.
+  const rateOr = (raw: string | undefined, fallback: number): number => {
+    const parsed = parseFloat(raw ?? "");
+    return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback;
+  };
+  return {
+    input: rateOr(env["AGENTMEMORY_COST_IN_PER_1M"], 0.14),
+    output: rateOr(env["AGENTMEMORY_COST_OUT_PER_1M"], 0.28),
+  };
+}
+
export function isStandaloneMcp(): boolean {
return getMergedEnv()["STANDALONE_MCP"] === "true";
}
diff --git a/src/functions/compress.ts b/src/functions/compress.ts
index 0569555e0..216647486 100644
--- a/src/functions/compress.ts
+++ b/src/functions/compress.ts
@@ -16,6 +16,8 @@ import {
import { VISION_DESCRIPTION_PROMPT } from "../prompts/vision.js";
import { getXmlTag, getXmlChildren } from "../prompts/xml.js";
import { getSearchIndex, vectorIndexAddGuarded } from "./search.js";
+import { buildSyntheticCompression } from "./compress-synthetic.js";
+import { withSession } from "../state/session-context.js";
import { CompressOutputSchema } from "../eval/schemas.js";
import { validateOutput } from "../eval/validator.js";
import { scoreCompression } from "../eval/quality.js";
@@ -75,7 +77,7 @@ export function registerCompressFunction(
observationId: string;
sessionId: string;
raw: RawObservation;
- }) => {
+ }) => withSession(data.sessionId, async () => {
const startMs = Date.now();
let imageDescription: string | undefined;
@@ -253,6 +255,43 @@ export function registerCompressFunction(
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
const latencyMs = Date.now() - startMs;
+
+ // Budget gate tripped: the session's token cap is exhausted. Don't drop
+ // the observation — fall back to zero-LLM synthetic compression so
+ // recall/search still work, exactly as if AGENTMEMORY_AUTO_COMPRESS
+ // were off, until the budget is reset or the session ends.
+ if (msg === "session_budget_exhausted") {
+ const synthetic = buildSyntheticCompression(data.raw);
+ synthetic.id = data.observationId;
+ synthetic.sessionId = data.sessionId;
+ await kv.set(
+ KV.observations(data.sessionId),
+ data.observationId,
+ synthetic,
+ );
+ try {
+ getSearchIndex().add(synthetic);
+ } catch {}
+ await vectorIndexAddGuarded(
+ synthetic.id,
+ synthetic.sessionId,
+ synthetic.title + " " + (synthetic.narrative || ""),
+ { kind: "synthetic", logId: synthetic.id },
+ ).catch(() => {});
+ if (metricsStore) {
+ await metricsStore.record("mem::compress", latencyMs, false);
+ }
+ logger.warn("Compression budget exhausted; stored synthetic", {
+ obsId: data.observationId,
+ sessionId: data.sessionId,
+ });
+ return {
+ success: true,
+ compressed: synthetic,
+ budgetExhausted: true,
+ };
+ }
+
if (metricsStore) {
await metricsStore.record("mem::compress", latencyMs, false);
}
@@ -262,6 +301,6 @@ export function registerCompressFunction(
});
return { success: false, error: "compression_failed" };
}
- },
+ }),
);
}
diff --git a/src/functions/observe.ts b/src/functions/observe.ts
index 2bf13d547..535a31379 100644
--- a/src/functions/observe.ts
+++ b/src/functions/observe.ts
@@ -272,6 +272,14 @@ export function registerObserveFunction(
? { firstPrompt: trimmedPrompt }
: {}),
});
+ // OpenCode and other plugins that skip
+ // POST /session/start create the session here; seed its token
+ // budget too so background compress/summarize spend is capped.
+ sdk.trigger({
+ function_id: "mem::session::budget::init",
+ payload: { sessionId: payload.sessionId },
+ action: TriggerAction.Void(),
+ });
}
// Per-observation LLM compression is opt-in as of 0.8.8 (#138).
diff --git a/src/functions/session-budget.ts b/src/functions/session-budget.ts
new file mode 100644
index 000000000..78bd2f33a
--- /dev/null
+++ b/src/functions/session-budget.ts
@@ -0,0 +1,331 @@
+import { TriggerAction, type ISdk } from "iii-sdk";
+import type { SessionBudget } from "../types.js";
+import { KV } from "../state/schema.js";
+import type { StateKV } from "../state/kv.js";
+import { withKeyedLock } from "../state/keyed-mutex.js";
+import { SYSTEM_SESSION } from "../state/session-context.js";
+import {
+ getSessionTokenCap,
+ getSystemTokenCap,
+ getSessionBudgetRetentionDays,
+ getCostRatesPer1M,
+ SESSION_BUDGET_WARN_RATIO,
+} from "../config.js";
+import { safeAudit } from "./audit.js";
+import { getHistograms } from "../telemetry/setup.js";
+import { logger } from "../logger.js";
+
+/**
+ * Default cap for a budget row: the system cap for the SYSTEM_SESSION
+ * sentinel, the per-session cap for every other id.
+ */
+function defaultCapFor(sessionId: string): number {
+  if (sessionId === SYSTEM_SESSION) {
+    return getSystemTokenCap();
+  }
+  return getSessionTokenCap();
+}
+
+/**
+ * Builds a fresh budget row with every counter at zero.
+ *
+ * @param sessionId Session the row belongs to.
+ * @param tokenCap Hard cap stored on the row.
+ * @param now ISO timestamp used for both createdAt and updatedAt.
+ */
+function newBudget(
+  sessionId: string,
+  tokenCap: number,
+  now: string,
+): SessionBudget {
+  const zeroedCounters = {
+    tokensUsed: 0,
+    inputTokens: 0,
+    outputTokens: 0,
+    costEstimate: 0,
+    callCount: 0,
+  };
+  return { sessionId, tokenCap, ...zeroedCounters, createdAt: now, updatedAt: now };
+}
+
+/**
+ * Fires a budget event as a Void-action trigger, best-effort.
+ *
+ * Never rejects: a failure to dispatch the event must not disturb the budget
+ * accounting that called it. Both an asynchronous rejection and a synchronous
+ * throw from sdk.trigger are absorbed.
+ *
+ * @param functionId Event function id to trigger.
+ * @param budget Budget row sent as the event payload.
+ */
+async function emitBudgetEvent(
+  sdk: ISdk,
+  functionId: string,
+  budget: SessionBudget,
+): Promise<void> {
+  try {
+    await sdk.trigger({
+      function_id: functionId,
+      payload: { budget },
+      action: TriggerAction.Void(),
+    });
+  } catch {
+    // Deliberately ignored: event delivery is advisory only.
+  }
+}
+
+/**
+ * Creates the budget row for a session if it does not exist yet.
+ *
+ * Returns the existing budget untouched if one already exists, so a re-fired
+ * session/start never resets a live counter. Rows are keyed per sessionId, so
+ * a forked session gets its own row and its own cap.
+ *
+ * @param params.tokenCap Optional per-session override. It is floored to an
+ *   integer; if the result is not a finite value of at least 1, the default
+ *   cap for the session id is used instead. (Flooring before the check
+ *   prevents a fractional cap such as 0.5 from becoming a cap of 0, which
+ *   would mark the session exhausted before its first call.)
+ * @returns The existing or newly created budget row.
+ */
+export async function initBudget(
+  kv: StateKV,
+  params: { sessionId: string; tokenCap?: number },
+): Promise<SessionBudget> {
+  const sessionId = params.sessionId;
+  // Same lock key as recordBudget so init and record never interleave.
+  return withKeyedLock(`budget:${sessionId}`, async () => {
+    const existing = await kv
+      .get<SessionBudget>(KV.sessionBudget, sessionId)
+      .catch(() => null);
+    if (existing) return existing;
+    const now = new Date().toISOString();
+    const requested =
+      typeof params.tokenCap === "number" && Number.isFinite(params.tokenCap)
+        ? Math.floor(params.tokenCap)
+        : 0;
+    const cap = requested > 0 ? requested : defaultCapFor(sessionId);
+    const budget = newBudget(sessionId, cap, now);
+    await kv.set(KV.sessionBudget, sessionId, budget);
+    return budget;
+  });
+}
+
+/**
+ * Reads a session's budget row.
+ *
+ * @returns The stored row, or null when the read fails (the error is
+ *   swallowed so callers can treat a failed read as "no budget").
+ */
+export async function getBudget(
+  kv: StateKV,
+  sessionId: string,
+): Promise<SessionBudget | null> {
+  return kv.get<SessionBudget>(KV.sessionBudget, sessionId).catch(() => null);
+}
+
+/**
+ * Read-only fast path used by the provider wrapper before each LLM call.
+ *
+ * @returns false when no budget row can be read; otherwise true when the row
+ *   is already stamped exhausted or its usage has reached the cap.
+ */
+export async function isBudgetExhausted(
+  kv: StateKV,
+  sessionId: string,
+): Promise<boolean> {
+  const b = await getBudget(kv, sessionId);
+  if (!b) return false;
+  if (b.exhaustedAt) return true;
+  return b.tokensUsed >= b.tokenCap;
+}
+
+
+/**
+ * Adds one LLM call's token usage to a session's budget row and persists it.
+ *
+ * A blank or non-string sessionId is billed to the SYSTEM_SESSION sentinel.
+ * If no row can be read, a fresh one is created with the default cap for
+ * that id. The whole read-modify-write runs under the `budget:<sessionId>`
+ * keyed lock.
+ *
+ * Threshold handling, each at most once per row:
+ * - soft warning: usage moves from below SESSION_BUDGET_WARN_RATIO of the cap
+ *   to at or above it while still under the cap;
+ * - exhaustion: usage reaches the cap.
+ * A single call that jumps from below the warn threshold straight to the cap
+ * records only the exhaustion. Each crossing stamps the row, fires the
+ * matching event (not awaited), and writes an audit entry.
+ *
+ * @param params.inputTokens Estimated input tokens; clamped to a non-negative
+ *   integer, with non-finite values counted as 0 so one bad estimate cannot
+ *   poison the running totals.
+ * @param params.outputTokens Estimated output tokens; clamped the same way.
+ * @param params.model Accepted but not read by this function.
+ * @returns The updated budget row.
+ */
+export async function recordBudget(
+  kv: StateKV,
+  sdk: ISdk,
+  params: {
+    sessionId: string;
+    inputTokens: number;
+    outputTokens: number;
+    model?: string;
+  },
+): Promise<SessionBudget> {
+  const sessionId =
+    typeof params.sessionId === "string" && params.sessionId.trim().length > 0
+      ? params.sessionId.trim()
+      : SYSTEM_SESSION;
+  // NaN, undefined, negatives and +/-Infinity all collapse to 0.
+  const toCount = (value: number): number =>
+    Number.isFinite(value) ? Math.max(0, Math.floor(value)) : 0;
+  const inTok = toCount(params.inputTokens);
+  const outTok = toCount(params.outputTokens);
+
+  return withKeyedLock(`budget:${sessionId}`, async () => {
+    const now = new Date().toISOString();
+    const existing = await kv
+      .get<SessionBudget>(KV.sessionBudget, sessionId)
+      .catch(() => null);
+    const budget =
+      existing ?? newBudget(sessionId, defaultCapFor(sessionId), now);
+    const rates = getCostRatesPer1M();
+
+    // Remember the pre-call total so threshold crossings can be detected.
+    const prevUsed = budget.tokensUsed;
+    budget.inputTokens += inTok;
+    budget.outputTokens += outTok;
+    budget.tokensUsed += inTok + outTok;
+    budget.callCount += 1;
+    budget.costEstimate +=
+      (inTok / 1_000_000) * rates.input +
+      (outTok / 1_000_000) * rates.output;
+    budget.updatedAt = now;
+
+    const warnAt = Math.floor(budget.tokenCap * SESSION_BUDGET_WARN_RATIO);
+    const crossedWarn =
+      !budget.warnEmittedAt &&
+      prevUsed < warnAt &&
+      budget.tokensUsed >= warnAt &&
+      budget.tokensUsed < budget.tokenCap;
+    const crossedExhausted =
+      !budget.exhaustedAt && budget.tokensUsed >= budget.tokenCap;
+
+    if (crossedWarn) budget.warnEmittedAt = now;
+    if (crossedExhausted) budget.exhaustedAt = now;
+
+    // Persist before any notification so the stamps are durable first.
+    await kv.set(KV.sessionBudget, sessionId, budget);
+
+    try {
+      getHistograms().sessionTokensUsed.record(budget.tokensUsed);
+    } catch {
+      // Telemetry is optional; never fail accounting because of it.
+    }
+
+    if (crossedWarn) {
+      void emitBudgetEvent(sdk, "event::mem::budget::soft-warned", budget);
+      await safeAudit(
+        kv,
+        "budget_warn",
+        "mem::session::budget::record",
+        [sessionId],
+        { tokensUsed: budget.tokensUsed, tokenCap: budget.tokenCap },
+      );
+    }
+    if (crossedExhausted) {
+      void emitBudgetEvent(sdk, "event::mem::budget::exhausted", budget);
+      await safeAudit(
+        kv,
+        "budget_exhausted",
+        "mem::session::budget::record",
+        [sessionId],
+        { tokensUsed: budget.tokensUsed, tokenCap: budget.tokenCap },
+      );
+    }
+    return budget;
+  });
+}
+
+/**
+ * Deletes budget rows that have outlived the retention window.
+ *
+ * A row is reapable when either:
+ * - its session record has an endedAt older than the cutoff, or
+ * - no session record can be read and the row's updatedAt is older than the
+ *   cutoff (orphaned row).
+ * Rows for the SYSTEM_SESSION sentinel, malformed rows, and rows whose session
+ * exists but has not ended are always kept. An unparsable timestamp compares
+ * false against the cutoff, so such rows are kept as well.
+ *
+ * @returns Counts of deleted (swept) and retained (kept) rows; a row whose
+ *   delete throws is logged and counted as kept.
+ */
+export async function reapBudgets(
+  kv: StateKV,
+): Promise<{ swept: number; kept: number }> {
+  const retentionMs = getSessionBudgetRetentionDays() * 24 * 60 * 60 * 1000;
+  const cutoff = Date.now() - retentionMs;
+  const budgets = await kv
+    .list<SessionBudget>(KV.sessionBudget)
+    .catch(() => [] as SessionBudget[]);
+
+  let swept = 0;
+  let kept = 0;
+  for (const b of budgets) {
+    if (!b || !b.sessionId || b.sessionId === SYSTEM_SESSION) {
+      kept++;
+      continue;
+    }
+    const session = await kv
+      .get<{ endedAt?: string }>(KV.sessions, b.sessionId)
+      .catch(() => null);
+
+    let reapable = false;
+    if (session?.endedAt) {
+      reapable = new Date(session.endedAt).getTime() < cutoff;
+    } else if (!session) {
+      // Orphan: fall back to the row's own last-touched time.
+      reapable = new Date(b.updatedAt).getTime() < cutoff;
+    }
+
+    if (reapable) {
+      try {
+        await kv.delete(KV.sessionBudget, b.sessionId);
+        swept++;
+      } catch (err) {
+        kept++;
+        logger.warn("session budget reap delete failed", {
+          sessionId: b.sessionId,
+          error: err instanceof Error ? err.message : String(err),
+        });
+      }
+    } else {
+      kept++;
+    }
+  }
+  return { swept, kept };
+}
+
+
+/**
+ * Minimal metering surface: a pre-call exhaustion check and a post-call
+ * usage recorder.
+ */
+export interface SessionBudgetMeter {
+  /** Resolves true when the session must not make further LLM calls. */
+  isExhausted(sessionId: string): Promise<boolean>;
+  /** Adds one call's token usage to the session's running budget. */
+  record(
+    sessionId: string,
+    inputTokens: number,
+    outputTokens: number,
+    model?: string,
+  ): Promise<void>;
+}
+
+const NOOP_METER: SessionBudgetMeter = {
+ isExhausted: async () => false,
+ record: async () => {},
+};
+
+let activeMeter: SessionBudgetMeter = NOOP_METER;
+
+/**
+ * Installs a KV-backed meter as the module's active meter and returns it.
+ *
+ * `isExhausted` delegates to isBudgetExhausted. `record` delegates to
+ * recordBudget and never rejects: a failure is logged as a warning so that
+ * budget bookkeeping cannot break the caller.
+ */
+export function initSessionBudgetMeter(
+  kv: StateKV,
+  sdk: ISdk,
+): SessionBudgetMeter {
+  const meter: SessionBudgetMeter = {
+    isExhausted(sessionId) {
+      return isBudgetExhausted(kv, sessionId);
+    },
+    async record(sessionId, inputTokens, outputTokens, model) {
+      const usage = { sessionId, inputTokens, outputTokens, model };
+      try {
+        await recordBudget(kv, sdk, usage);
+      } catch (err) {
+        const error = err instanceof Error ? err.message : String(err);
+        logger.warn("session budget record failed", { sessionId, error });
+      }
+    },
+  };
+  activeMeter = meter;
+  return activeMeter;
+}
+
+/**
+ * Returns the currently active meter: the no-op meter until
+ * initSessionBudgetMeter has been called (or after a reset).
+ */
+export function getSessionBudgetMeter(): SessionBudgetMeter {
+  const current = activeMeter;
+  return current;
+}
+
+/** Restores the no-op meter, discarding any meter installed earlier. */
+export function resetSessionBudgetMeter(): void {
+  const noop = NOOP_METER;
+  activeMeter = noop;
+}
+
+/**
+ * Registers the four budget functions on the SDK:
+ *
+ * - `mem::session::budget::init`   — create the row if absent (optional tokenCap).
+ * - `mem::session::budget::record` — add token usage to the row.
+ * - `mem::session::budget::get`    — one row when sessionId is given, else all rows.
+ * - `mem::session::budget::reap`   — delete rows past the retention window.
+ *
+ * init and record reply `{ success: false, error: "sessionId is required" }`
+ * when sessionId is missing, not a string, or blank after trimming.
+ */
+export function registerSessionBudgetFunctions(sdk: ISdk, kv: StateKV): void {
+  // Trim first, then validate, so a whitespace-only id is rejected instead of
+  // slipping through as the empty-string key (init) or being silently billed
+  // to the system session (record).
+  const cleanSessionId = (raw: unknown): string | null => {
+    if (typeof raw !== "string") return null;
+    const trimmed = raw.trim();
+    return trimmed.length > 0 ? trimmed : null;
+  };
+
+  sdk.registerFunction(
+    "mem::session::budget::init",
+    async (data: { sessionId?: string; tokenCap?: number } | undefined) => {
+      const sessionId = cleanSessionId(data?.sessionId);
+      if (sessionId === null) {
+        return { success: false, error: "sessionId is required" };
+      }
+      const budget = await initBudget(kv, {
+        sessionId,
+        tokenCap: data?.tokenCap,
+      });
+      return { success: true, budget };
+    },
+  );
+
+  sdk.registerFunction(
+    "mem::session::budget::record",
+    async (
+      data:
+        | {
+            sessionId?: string;
+            inputTokens?: number;
+            outputTokens?: number;
+            model?: string;
+          }
+        | undefined,
+    ) => {
+      const sessionId = cleanSessionId(data?.sessionId);
+      if (sessionId === null) {
+        return { success: false, error: "sessionId is required" };
+      }
+      const budget = await recordBudget(kv, sdk, {
+        sessionId,
+        inputTokens: data?.inputTokens ?? 0,
+        outputTokens: data?.outputTokens ?? 0,
+        model: data?.model,
+      });
+      return { success: true, budget };
+    },
+  );
+
+  sdk.registerFunction(
+    "mem::session::budget::get",
+    async (data: { sessionId?: string } | undefined) => {
+      // Any non-empty string selects the single-row lookup; otherwise list all.
+      if (data?.sessionId && typeof data.sessionId === "string") {
+        const budget = await getBudget(kv, data.sessionId.trim());
+        return { success: true, budget: budget ?? null };
+      }
+      const budgets = await kv
+        .list<SessionBudget>(KV.sessionBudget)
+        .catch(() => [] as SessionBudget[]);
+      return { success: true, budgets };
+    },
+  );
+
+  sdk.registerFunction("mem::session::budget::reap", async () => {
+    const result = await reapBudgets(kv);
+    // Only log when something was actually deleted, to keep the log quiet.
+    if (result.swept > 0) {
+      logger.info("Session budget reap complete", result);
+    }
+    return { success: true, ...result };
+  });
+}
diff --git a/src/functions/summarize.ts b/src/functions/summarize.ts
index 4c501ca8c..68408975f 100644
--- a/src/functions/summarize.ts
+++ b/src/functions/summarize.ts
@@ -19,6 +19,8 @@ import { validateOutput } from "../eval/validator.js";
import { scoreSummary } from "../eval/quality.js";
import type { MetricsStore } from "../eval/metrics-store.js";
import { safeAudit } from "./audit.js";
+import { withSession } from "../state/session-context.js";
+import { getSessionBudgetMeter } from "./session-budget.js";
import { logger } from "../logger.js";
// Per-chunk observation budget when a session is too large to fit in one
@@ -105,6 +107,7 @@ async function produceSummaryXml(
mode: "single" | "chunked";
chunks: number;
skipped?: number;
+ truncated?: boolean;
}> {
const chunkSize = getChunkSize();
if (compressed.length <= chunkSize) {
@@ -132,7 +135,23 @@ async function produceSummaryXml(
// so the reduce step sees partials in chronological order even when some
// were skipped.
const partialByIdx: Array = new Array(chunks.length).fill(null);
+ let truncated = false;
+ let processedChunks = 0;
+ const meter = getSessionBudgetMeter();
for (let batchStart = 0; batchStart < chunks.length; batchStart += concurrency) {
+ // Abort BEFORE dispatching the next batch when the
+ // session's token cap is exhausted. The in-flight batch already
+ // completed; we reduce over whatever partials we have and mark the
+ // summary truncated rather than burning past the cap.
+ if (batchStart > 0 && (await meter.isExhausted(sessionId))) {
+ truncated = true;
+ logger.warn("Summarize aborted mid-session: token budget exhausted", {
+ sessionId,
+ processedChunks,
+ totalChunks: chunks.length,
+ });
+ break;
+ }
const batch = chunks.slice(batchStart, batchStart + concurrency);
await Promise.all(
batch.map(async (chunk, j) => {
@@ -147,14 +166,20 @@ async function produceSummaryXml(
);
}),
);
+ processedChunks += batch.length;
}
- const skipped = partialByIdx.filter((p) => p === null).length;
+ // When truncated, only the chunks we actually attempted count toward the
+ // skip ratio — unprocessed tail chunks aren't failures.
+ const consideredChunks = truncated ? processedChunks : chunks.length;
+ const skipped = partialByIdx
+ .slice(0, consideredChunks)
+ .filter((p) => p === null).length;
const partials = partialByIdx.filter((p): p is SessionSummary => p !== null);
- if (skipped > Math.floor(chunks.length * MAX_SKIP_RATIO)) {
+ if (skipped > Math.floor(consideredChunks * MAX_SKIP_RATIO)) {
throw new Error(
- `too_many_chunks_skipped: ${skipped}/${chunks.length} chunks failed to parse after retry`,
+ `too_many_chunks_skipped: ${skipped}/${consideredChunks} chunks failed to parse after retry`,
);
}
if (skipped > 0) {
@@ -165,6 +190,23 @@ async function produceSummaryXml(
});
}
+ // When truncated, the budget is already exhausted — a reduce LLM call
+ // would be blocked by the same gate and throw away the partial work.
+ // Merge the partials deterministically (no LLM) so the truncated summary
+ // is still persisted.
+ if (truncated) {
+ if (partials.length === 0) {
+ return { response: "", mode: "chunked", chunks: processedChunks, skipped, truncated };
+ }
+ return {
+ response: synthesizeReducedXml(partials),
+ mode: "chunked",
+ chunks: processedChunks,
+ skipped,
+ truncated,
+ };
+ }
+
const reduceInput = partials.map((p) => {
const originalIdx = partialByIdx.indexOf(p);
return {
@@ -177,11 +219,67 @@ async function produceSummaryXml(
obsRangeEnd: Math.min((originalIdx + 1) * chunkSize, compressed.length),
};
});
- const response = await provider.summarize(
- REDUCE_SYSTEM,
- buildReducePrompt(reduceInput),
+ // The reduce call can be blocked if the final chunk pushed the session
+ // over its cap (no mid-loop trigger fired). Rather than lose the whole
+ // summary, fall back to the deterministic merge and mark it truncated.
+ let response: string;
+ try {
+ response = await provider.summarize(
+ REDUCE_SYSTEM,
+ buildReducePrompt(reduceInput),
+ );
+ } catch (err) {
+ if (
+ err instanceof Error &&
+ err.message === "session_budget_exhausted" &&
+ partials.length > 0
+ ) {
+ return {
+ response: synthesizeReducedXml(partials),
+ mode: "chunked",
+ chunks: chunks.length,
+ skipped,
+ truncated: true,
+ };
+ }
+ throw err;
+ }
+ return {
+ response,
+ mode: "chunked",
+ chunks: chunks.length,
+ skipped,
+ truncated,
+ };
+}
+
+// Deterministic merge of chunk partials into a parseable summary XML, used
+// only on the truncated (budget-exhausted) path where an LLM reduce call
+// is not permitted.
+function synthesizeReducedXml(partials: SessionSummary[]): string {
+  // Escape the characters that would otherwise break the XML structure.
+  // "&" must be replaced first so the entities added below are not re-escaped.
+  const esc = (s: string) =>
+    String(s ?? "")
+      .replace(/&/g, "&amp;")
+      .replace(/</g, "&lt;")
+      .replace(/>/g, "&gt;");
+  // Drop empty entries and de-duplicate while keeping first-seen order.
+  const uniq = (xs: string[]) => [...new Set(xs.filter(Boolean))];
+  // The first partial's title stands in for the whole session.
+  const title = esc(partials[0]?.title || "Session summary (partial)");
+  const narrative = esc(
+    partials
+      .map((p) => p.narrative)
+      .filter(Boolean)
+      .join(" "),
+  );
+  const decisions = uniq(partials.flatMap((p) => p.keyDecisions || []));
+  const files = uniq(partials.flatMap((p) => p.filesModified || []));
+  const concepts = uniq(partials.flatMap((p) => p.concepts || []));
+  // NOTE(review): element names below were reconstructed from the field and
+  // variable names — confirm they match what the summary XML parser reads.
+  return (
+    `<summary><title>${title}</title>` +
+    `<narrative>${narrative}</narrative>` +
+    `<decisions>${decisions.map((d) => `<decision>${esc(d)}</decision>`).join("")}</decisions>` +
+    `<files>${files.map((f) => `<file>${esc(f)}</file>`).join("")}</files>` +
+    `<concepts>${concepts.map((c) => `<concept>${esc(c)}</concept>`).join("")}</concepts></summary>`
+  );
- return { response, mode: "chunked", chunks: chunks.length, skipped };
}
// #783: many LLMs (DeepSeek, GPT variants, some Anthropic responses)
@@ -233,7 +331,8 @@ export function registerSummarizeFunction(
metricsStore?: MetricsStore,
): void {
sdk.registerFunction("mem::summarize",
- async (data: { sessionId: string } | undefined) => {
+ async (data: { sessionId: string } | undefined) =>
+ withSession(data?.sessionId, async () => {
const startMs = Date.now();
if (!data || typeof data.sessionId !== "string" || !data.sessionId.trim()) {
return { success: false, error: "sessionId is required" };
@@ -282,6 +381,7 @@ export function registerSummarizeFunction(
let response = "";
let mode = "single";
let chunks = 1;
+ let truncated = false;
for (let attempt = 1; attempt <= 2; attempt++) {
const produced = await produceSummaryXml(
provider,
@@ -292,6 +392,7 @@ export function registerSummarizeFunction(
response = produced.response;
mode = produced.mode;
chunks = produced.chunks;
+ truncated = produced.truncated ?? false;
if (!response || !response.trim()) {
logger.warn("Empty provider response on summarize", {
sessionId,
@@ -356,10 +457,13 @@ export function registerSummarizeFunction(
const qualityScore = scoreSummary(summaryForValidation);
+ if (truncated) summary.truncated = true;
+
await kv.set(KV.summaries, sessionId, summary);
await safeAudit(kv, "compress", "mem::summarize", [sessionId], {
title: summary.title,
observationCount: compressed.length,
+ ...(truncated ? { truncated: true } : {}),
});
const latencyMs = Date.now() - startMs;
@@ -393,6 +497,6 @@ export function registerSummarizeFunction(
});
return { success: false, error: msg };
}
- },
+ }),
);
}
diff --git a/src/index.ts b/src/index.ts
index 4233e8a67..d1a346c82 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -88,6 +88,7 @@ import { registerTemporalGraphFunctions } from "./functions/temporal-graph.js";
import { registerRetentionFunctions } from "./functions/retention.js";
import { registerCompressFileFunction } from "./functions/compress-file.js";
import { registerReplayFunctions } from "./functions/replay.js";
+import { registerSessionBudgetFunctions,initSessionBudgetMeter} from "./functions/session-budget.js";
import { registerApiTriggers } from "./triggers/api.js";
import { registerEventTriggers } from "./triggers/events.js";
import { registerMcpEndpoints } from "./mcp/server.js";
@@ -220,6 +221,7 @@ async function main() {
const secret = getEnvVar("AGENTMEMORY_SECRET");
const metricsStore = new MetricsStore(kv);
const dedupMap = new DedupMap();
+ initSessionBudgetMeter(kv, sdk);
const vectorIndex = embeddingProvider ? new VectorIndex() : null;
@@ -332,6 +334,7 @@ async function main() {
registerRetentionFunctions(sdk, kv);
registerCompressFileFunction(sdk, kv, provider);
registerReplayFunctions(sdk, kv);
+ registerSessionBudgetFunctions(sdk, kv);
bootLog(
`v0.6 advanced retrieval: sliding-window, query-expansion, temporal-graph, retention-scoring`,
);
@@ -518,7 +521,7 @@ async function main() {
`Ready. ${embeddingProvider ? "Triple-stream (BM25+Vector+Graph)" : "BM25+Graph"} search active.`,
);
bootLog(
- `REST API: 128 endpoints at http://localhost:${config.restPort}/agentmemory/*`,
+ `REST API: 129 endpoints at http://localhost:${config.restPort}/agentmemory/*`,
);
bootLog(
`MCP surface (opt-in via \`npx @agentmemory/mcp\`): ${getAllTools().length} tools · 6 resources · 3 prompts`,
@@ -579,6 +582,19 @@ async function main() {
}, 60 * 60 * 1000);
recentSearchesSweepTimer.unref();
+ // Reap token budgets for sessions whose endedAt + retention window has
+ // passed (plus orphaned rows). Runs hourly; trigger errors are swallowed,
+ // and the timer is unref'd so it never holds the process open.
+ const sessionBudgetReapTimer = setInterval(async () => {
+ try {
+ await sdk.trigger({
+ function_id: "mem::session::budget::reap",
+ payload: {},
+ });
+ } catch {}
+ }, 60 * 60 * 1000);
+ sessionBudgetReapTimer.unref();
+
if (isConsolidationEnabled()) {
const consolidationTimer = setInterval(async () => {
try {
diff --git a/src/providers/index.ts b/src/providers/index.ts
index 0ec3feba0..e1aeda0d6 100644
--- a/src/providers/index.ts
+++ b/src/providers/index.ts
@@ -55,7 +55,7 @@ function defaultModelFor(providerType: ProviderConfig["provider"]): string {
}
export function createProvider(config: ProviderConfig): ResilientProvider {
- return new ResilientProvider(createBaseProvider(config));
+ return new ResilientProvider(createBaseProvider(config), config.model);
}
export function createFallbackProvider(
@@ -87,9 +87,9 @@ export function createFallbackProvider(
}
if (providers.length > 1) {
- return new ResilientProvider(new FallbackChainProvider(providers));
+ return new ResilientProvider(new FallbackChainProvider(providers),config.model,);
}
- return new ResilientProvider(providers[0]);
+ return new ResilientProvider(providers[0], config.model);
}
function createBaseProvider(config: ProviderConfig): MemoryProvider {
diff --git a/src/providers/resilient.ts b/src/providers/resilient.ts
index 95ece40c9..b0dd5d314 100644
--- a/src/providers/resilient.ts
+++ b/src/providers/resilient.ts
@@ -1,34 +1,77 @@
import type { MemoryProvider, CircuitBreakerState } from "../types.js";
import { CircuitBreaker } from "./circuit-breaker.js";
+import { currentSessionId } from "../state/session-context.js";
+import { getSessionBudgetMeter } from "../functions/session-budget.js";
+
+// Rough token estimate: ceil(chars / 3), 0 for empty input; the heuristic is
+// reused from the context renderer. Per-session budgets are a cost safety
+// net, not billing-grade accounting — providers return a bare string with no
+// usage field, so we estimate from prompt + response length rather than
+// threading exact usage through every provider method.
+function estimateTokens(text: string): number {
+ if (!text) return 0;
+ return Math.ceil(text.length / 3);
+}
export class ResilientProvider implements MemoryProvider {
private breaker = new CircuitBreaker();
name: string;
- constructor(private inner: MemoryProvider) {
+ constructor(private inner: MemoryProvider, private modelName = inner.name) {
this.name = `resilient(${inner.name})`;
}
- private async call(fn: () => Promise): Promise {
+ // Shared path for compress() and summarize(). Order: circuit-breaker gate
+ // -> per-session budget gate (both throw before fn runs) -> inner call ->
+ // meter.record in a finally (0/0 tokens when fn threw; record errors ignored).
+ private async call(systemPrompt: string, userPrompt: string, fn: () => Promise): Promise {
if (!this.breaker.isAllowed) {
throw new Error("circuit_breaker_open");
}
+
+ const sessionId = currentSessionId();
+ const meter = getSessionBudgetMeter();
+ if (await meter.isExhausted(sessionId)) {
+ throw new Error("session_budget_exhausted");
+ }
+
+ const inputTokens = estimateTokens(systemPrompt) + estimateTokens(userPrompt);
+ let outputTokens = 0;
+ let succeeded = false;
try {
const result = await fn();
+ outputTokens = estimateTokens(result);
+ succeeded = true;
this.breaker.recordSuccess();
return result;
} catch (err) {
this.breaker.recordFailure();
throw err;
+ } finally {
+ // Failed calls still hit meter.record, but with 0/0 tokens: the inner
+ // provider may have aborted before consuming any, and charging a guessed
+ // input per retry would over-bill the cap. Metering errors are ignored.
+ await meter
+ .record(
+ sessionId,
+ succeeded ? inputTokens : 0,
+ succeeded ? outputTokens : 0,
+ this.modelName,
+ )
+ .catch(() => {});
}
}
async compress(systemPrompt: string, userPrompt: string): Promise {
- return this.call(() => this.inner.compress(systemPrompt, userPrompt));
+ return this.call(systemPrompt, userPrompt, () =>
+ this.inner.compress(systemPrompt, userPrompt),
+ );
}
async summarize(systemPrompt: string, userPrompt: string): Promise {
- return this.call(() => this.inner.summarize(systemPrompt, userPrompt));
+ return this.call(systemPrompt, userPrompt, () =>
+ this.inner.summarize(systemPrompt, userPrompt),
+ );
}
get circuitState(): CircuitBreakerState {
diff --git a/src/state/schema.ts b/src/state/schema.ts
index cb29d41ad..0233a2c67 100644
--- a/src/state/schema.ts
+++ b/src/state/schema.ts
@@ -72,6 +72,10 @@ export const KV = {
// #771: tracks the most recent smart-search call per session, used by
// the followup-rate diagnostic. Key = sessionId. TTL-swept hourly.
recentSearches: "mem:recent-searches",
+ // Per-session estimated LLM token budget. Key = sessionId (or the
+ // "__system__" sentinel for cron / cross-session LLM calls). Reaped by
+ // mem::session::budget::reap once a session's endedAt + retention passes.
+ sessionBudget: "mem:session-budget",
} as const;
export const STREAM = {
diff --git a/src/state/session-context.ts b/src/state/session-context.ts
new file mode 100644
index 000000000..7757d1d27
--- /dev/null
+++ b/src/state/session-context.ts
@@ -0,0 +1,20 @@
+import { AsyncLocalStorage } from "node:async_hooks";
+
+export const SYSTEM_SESSION = "__system__";
+
+interface SessionStore {
+ sessionId: string;
+}
+
+export const sessionContext = new AsyncLocalStorage();
+
+export function withSession(sessionId: string | undefined, fn: () => Promise): Promise {
+ const id = typeof sessionId === "string" && sessionId.trim().length > 0
+ ? sessionId.trim()
+ : SYSTEM_SESSION;
+ return sessionContext.run({ sessionId: id }, fn);
+}
+
+export function currentSessionId(): string {
+ return sessionContext.getStore()?.sessionId ?? SYSTEM_SESSION;
+}
diff --git a/src/telemetry/setup.ts b/src/telemetry/setup.ts
index fc55b4f55..00690d347 100644
--- a/src/telemetry/setup.ts
+++ b/src/telemetry/setup.ts
@@ -61,6 +61,7 @@ interface Histograms {
qualityScore: Histogram;
embeddingLatency: Histogram;
vectorSearchLatency: Histogram;
+ sessionTokensUsed: Histogram;
}
type Meter = {
@@ -105,6 +106,7 @@ const HISTOGRAM_NAMES: Array<[keyof Histograms, string]> = [
["qualityScore", "quality.score"],
["embeddingLatency", "embedding.latency_ms"],
["vectorSearchLatency", "vector_search.latency_ms"],
+ ["sessionTokensUsed", "session.tokens_used"],
];
// Accessors so functions outside `initMetrics`'s closure can record into
diff --git a/src/triggers/api.ts b/src/triggers/api.ts
index 7b6c2bf2b..adbbfcd12 100644
--- a/src/triggers/api.ts
+++ b/src/triggers/api.ts
@@ -1,5 +1,5 @@
import { TriggerAction, type ISdk, type ApiRequest } from "iii-sdk";
-import type { Session, CompressedObservation, HookPayload, CommitLink, SessionSummary } from "../types.js";
+import type { Session, CompressedObservation, HookPayload, CommitLink, SessionSummary, SessionBudget } from "../types.js";
import { withKeyedLock } from "../state/keyed-mutex.js";
import { KV } from "../state/schema.js";
import { StateKV } from "../state/kv.js";
@@ -594,6 +594,21 @@ export function registerApiTriggers(
...(agentId ? { agentId } : {}),
};
await kv.set(KV.sessions, sessionId, session);
+ // Seed the per-session token budget. An optional
+ // tokenCap in the request body overrides the global default for this
+ // session only; init is idempotent so a re-fired start never resets
+ // a live counter.
+ const tokenCap = parseOptionalInt(body.tokenCap);
+ sdk.trigger({
+ function_id: "mem::session::budget::init",
+ payload: {
+ sessionId,
+ ...(typeof tokenCap === "number" && tokenCap > 0
+ ? { tokenCap }
+ : {}),
+ },
+ action: TriggerAction.Void(),
+ });
const contextResult = await sdk.trigger<
{ sessionId: string; project: string },
{ context: string }
@@ -653,6 +668,33 @@ export function registerApiTriggers(
},
});
+ // Read per-session token-budget state. With ?sessionId, returns that
+ // session's budget (null if absent); without it, returns all budget rows
+ // (used by `agentmemory status` and the viewer to surface near-cap and
+ // exhausted sessions). KV read errors degrade to null / [] with a 200.
+ sdk.registerFunction("api::session::budget",
+ async (req: ApiRequest): Promise => {
+ const authErr = checkAuth(req, secret);
+ if (authErr) return authErr;
+ const sessionId = asNonEmptyString(req.query_params?.["sessionId"]);
+ if (sessionId) {
+ const budget = await kv
+ .get(KV.sessionBudget, sessionId)
+ .catch(() => null);
+ return { status_code: 200, body: { budget: budget ?? null } };
+ }
+ const budgets = await kv
+ .list(KV.sessionBudget)
+ .catch(() => [] as SessionBudget[]);
+ return { status_code: 200, body: { budgets } };
+ },
+ );
+ sdk.registerTrigger({
+ type: "http",
+ function_id: "api::session::budget",
+ config: { api_path: "/agentmemory/session/budget", http_method: "GET" },
+ });
+
sdk.registerFunction("api::summarize",
async (req: ApiRequest<{ sessionId: string }>): Promise => {
const sessionId = asNonEmptyString((req.body as Record)?.sessionId);
diff --git a/src/triggers/events.ts b/src/triggers/events.ts
index e38b58db4..a220eeac1 100644
--- a/src/triggers/events.ts
+++ b/src/triggers/events.ts
@@ -104,6 +104,35 @@ export function registerEventTriggers(sdk: ISdk, kv: StateKV): void {
config: { topic: "agentmemory.session.ended" },
});
+ // Per-session token-budget lifecycle events. mem::session::budget::record
+ // fires these when a session crosses 80% (soft warn) or hits its cap
+ // (exhausted). Each handler forwards the budget to the viewer stream group
+ // so dashboards can surface near-cap / exhausted sessions in real time.
+ const registerBudgetEvent = (
+ functionId: string,
+ eventType: string,
+ ): void => {
+ sdk.registerFunction(
+ functionId,
+ async (data: { budget?: unknown }) => {
+ await sdk.trigger({
+ function_id: "stream::send",
+ payload: {
+ stream_name: STREAM.name,
+ group_id: STREAM.viewerGroup,
+ id: `${eventType}-${Date.now()}`,
+ type: eventType,
+ data: { budget: data?.budget ?? null },
+ },
+ action: TriggerAction.Void(),
+ });
+ return { emitted: true };
+ },
+ );
+ };
+ registerBudgetEvent("event::mem::budget::soft-warned", "budget.soft_warned");
+ registerBudgetEvent("event::mem::budget::exhausted", "budget.exhausted");
+
// React to observation count changes and emit a lightweight live event for dashboards/viewer.
sdk.registerFunction(
"event::session::observation-count-changed",
diff --git a/src/types.ts b/src/types.ts
index 6797dfaf9..4f2c3e9e8 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -114,6 +114,21 @@ export interface SessionSummary {
filesModified: string[];
concepts: string[];
observationCount: number;
+ truncated?: boolean;
+}
+
+export interface SessionBudget {
+ sessionId: string;
+ tokenCap: number;
+ tokensUsed: number;
+ inputTokens: number;
+ outputTokens: number;
+ costEstimate: number;
+ callCount: number;
+ warnEmittedAt?: string;
+ exhaustedAt?: string;
+ createdAt: string;
+ updatedAt: string;
}
export type HookType =
@@ -608,7 +623,9 @@ export interface AuditEntry {
| "slot_replace"
| "slot_create"
| "slot_delete"
- | "slot_reflect";
+ | "slot_reflect"
+ | "budget_warn"
+ | "budget_exhausted";
userId?: string;
functionId: string;
targetIds: string[];
diff --git a/test/session-budget.test.ts b/test/session-budget.test.ts
new file mode 100644
index 000000000..7a695d3ad
--- /dev/null
+++ b/test/session-budget.test.ts
@@ -0,0 +1,414 @@
+import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
+
+vi.mock("../src/logger.js", () => ({
+ logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() },
+ bootLog: vi.fn(),
+}));
+
+import {
+ registerSessionBudgetFunctions,
+ initBudget,
+ recordBudget,
+ isBudgetExhausted,
+ reapBudgets,
+ getBudget,
+ initSessionBudgetMeter,
+ resetSessionBudgetMeter,
+} from "../src/functions/session-budget.js";
+import { ResilientProvider } from "../src/providers/resilient.js";
+import { withSession, SYSTEM_SESSION } from "../src/state/session-context.js";
+import { registerCompressFunction } from "../src/functions/compress.js";
+import { registerSummarizeFunction } from "../src/functions/summarize.js";
+import { KV } from "../src/state/schema.js";
+import type {
+ SessionBudget,
+ MemoryProvider,
+ RawObservation,
+ Session,
+ CompressedObservation,
+} from "../src/types.js";
+
+function mockKV() {
+ const store = new Map>();
+ return {
+ store,
+ get: async (scope: string, key: string): Promise =>
+ (store.get(scope)?.get(key) as T) ?? null,
+ set: async (scope: string, key: string, data: T): Promise => {
+ if (!store.has(scope)) store.set(scope, new Map());
+ store.get(scope)!.set(key, data);
+ return data;
+ },
+ delete: async (scope: string, key: string): Promise => {
+ store.get(scope)?.delete(key);
+ },
+ list: async (scope: string): Promise => {
+ const entries = store.get(scope);
+ return entries ? (Array.from(entries.values()) as T[]) : [];
+ },
+ update: async (
+ scope: string,
+ key: string,
+ ops: Array<{ type: string; path: string; value?: unknown }>,
+ ): Promise => {
+ const cur = (store.get(scope)?.get(key) as Record) ?? {};
+ for (const op of ops) {
+ if (op.type === "set") cur[op.path] = op.value;
+ }
+ if (!store.has(scope)) store.set(scope, new Map());
+ store.get(scope)!.set(key, cur);
+ return cur;
+ },
+ };
+}
+
+function mockSdk() {
+ const functions = new Map();
+ const triggers: Array<{ id: string; payload: unknown }> = [];
+ return {
+ functions,
+ triggers,
+ registerFunction: (idOrOpts: string | { id: string }, handler: Function) => {
+ const id = typeof idOrOpts === "string" ? idOrOpts : idOrOpts.id;
+ functions.set(id, handler);
+ },
+ registerTrigger: () => {},
+ trigger: async (
+ idOrInput: string | { function_id: string; payload: unknown },
+ data?: unknown,
+ ) => {
+ const id = typeof idOrInput === "string" ? idOrInput : idOrInput.function_id;
+ const payload = typeof idOrInput === "string" ? data : idOrInput.payload;
+ triggers.push({ id, payload });
+ const fn = functions.get(id);
+ if (!fn) return undefined; // events/streams not registered in unit ctx
+ return fn(payload);
+ },
+ };
+}
+
+function innerProvider(overrides: Partial = {}): MemoryProvider {
+ return {
+ name: "test-inner",
+ compress: vi.fn().mockResolvedValue("ok"),
+ summarize: vi.fn().mockResolvedValue("ok"),
+ ...overrides,
+ };
+}
+
+describe("session budget", () => {
+ let kv: ReturnType;
+ let sdk: ReturnType;
+
+ beforeEach(() => {
+ kv = mockKV();
+ sdk = mockSdk();
+ delete process.env.AGENTMEMORY_SESSION_TOKEN_CAP;
+ delete process.env.AGENTMEMORY_SYSTEM_TOKEN_CAP;
+ delete process.env.AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS;
+ delete process.env.SUMMARIZE_CHUNK_SIZE;
+ delete process.env.SUMMARIZE_CHUNK_CONCURRENCY;
+ });
+
+ afterEach(() => {
+ resetSessionBudgetMeter();
+ vi.clearAllMocks();
+ });
+
+ describe("recordBudget", () => {
+ it("forks fresh per session", async () => {
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 50,
+ outputTokens: 50,
+ });
+ const s1 = await getBudget(kv as never, "s1");
+ const s2 = await getBudget(kv as never, "s2");
+ expect(s1?.tokensUsed).toBe(100);
+ expect(s2).toBeNull();
+ });
+
+ it("enforces the hard cap", async () => {
+ await initBudget(kv as never, { sessionId: "s1", tokenCap: 100 });
+ const b = await recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 60,
+ outputTokens: 50,
+ });
+ expect(b.tokensUsed).toBe(110);
+ expect(b.exhaustedAt).toBeTruthy();
+ expect(await isBudgetExhausted(kv as never, "s1")).toBe(true);
+ });
+
+ it("emits a soft warning exactly once at 80%", async () => {
+ await initBudget(kv as never, { sessionId: "s1", tokenCap: 100 });
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 0,
+ outputTokens: 85,
+ });
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 0,
+ outputTokens: 5,
+ });
+ const b = await getBudget(kv as never, "s1");
+ expect(b?.warnEmittedAt).toBeTruthy();
+ expect(b?.exhaustedAt).toBeFalsy();
+ const warnEvents = sdk.triggers.filter(
+ (t) => t.id === "event::mem::budget::soft-warned",
+ );
+ expect(warnEvents).toHaveLength(1);
+ });
+
+ it("emits an exhausted event exactly once when crossing the cap", async () => {
+ await initBudget(kv as never, { sessionId: "s1", tokenCap: 100 });
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 60,
+ outputTokens: 50,
+ });
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 10,
+ outputTokens: 0,
+ });
+ const b = await getBudget(kv as never, "s1");
+ expect(b?.exhaustedAt).toBeTruthy();
+ const exhaustEvents = sdk.triggers.filter(
+ (t) => t.id === "event::mem::budget::exhausted",
+ );
+ expect(exhaustEvents).toHaveLength(1);
+ });
+
+ it("uses AGENTMEMORY_SESSION_TOKEN_CAP when init has no explicit cap", async () => {
+ process.env.AGENTMEMORY_SESSION_TOKEN_CAP = "500";
+ const budget = await initBudget(kv as never, { sessionId: "s-env" });
+ expect(budget.tokenCap).toBe(500);
+ });
+
+ it("serializes concurrent increments", async () => {
+ await initBudget(kv as never, { sessionId: "s1", tokenCap: 10_000 });
+ await Promise.all(
+ Array.from({ length: 10 }, () =>
+ recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 10,
+ outputTokens: 0,
+ }),
+ ),
+ );
+ const b = await getBudget(kv as never, "s1");
+ expect(b?.tokensUsed).toBe(100);
+ expect(b?.callCount).toBe(10);
+ });
+
+ it("tracks system-triggered calls under the sentinel scope", async () => {
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "",
+ inputTokens: 10,
+ outputTokens: 0,
+ });
+ const sentinel = await getBudget(kv as never, SYSTEM_SESSION);
+ expect(sentinel?.tokensUsed).toBe(10);
+ expect(sentinel?.tokenCap).toBe(1_000_000);
+ });
+ });
+
+ describe("ResilientProvider integration", () => {
+ it("blocks LLM calls once the session budget is exhausted", async () => {
+ initSessionBudgetMeter(kv as never, sdk as never);
+ await initBudget(kv as never, { sessionId: "s1", tokenCap: 10 });
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "s1",
+ inputTokens: 20,
+ outputTokens: 0,
+ });
+ const inner = innerProvider();
+ const provider = new ResilientProvider(inner, "model-x");
+ await expect(
+ withSession("s1", () => provider.compress("sys", "user")),
+ ).rejects.toThrow("session_budget_exhausted");
+ expect(inner.compress).not.toHaveBeenCalled();
+ });
+
+ it("records nothing for a failed call", async () => {
+ initSessionBudgetMeter(kv as never, sdk as never);
+ const inner = innerProvider({
+ compress: vi.fn().mockRejectedValue(new Error("boom")),
+ });
+ const provider = new ResilientProvider(inner, "model-x");
+ await expect(
+ withSession("s2", () => provider.compress("sys", "user")),
+ ).rejects.toThrow("boom");
+ const b = await getBudget(kv as never, "s2");
+ expect(b?.tokensUsed).toBe(0);
+ expect(b?.callCount).toBe(1);
+ });
+
+ it("records estimated tokens for a successful call", async () => {
+ initSessionBudgetMeter(kv as never, sdk as never);
+ const inner = innerProvider({
+ compress: vi.fn().mockResolvedValue("a".repeat(300)),
+ });
+ const provider = new ResilientProvider(inner, "model-x");
+ await withSession("s3", () => provider.compress("system", "prompt"));
+ const b = await getBudget(kv as never, "s3");
+ expect(b?.inputTokens).toBeGreaterThan(0);
+ expect(b?.outputTokens).toBeGreaterThan(0);
+ expect(b?.tokensUsed).toBe((b?.inputTokens ?? 0) + (b?.outputTokens ?? 0));
+ });
+ });
+
+ describe("reapBudgets", () => {
+ it("reaps ended sessions past retention, keeps active and sentinel", async () => {
+ const old = new Date(Date.now() - 30 * 86400_000).toISOString();
+ await kv.set(KV.sessions, "s_old", {
+ id: "s_old",
+ endedAt: old,
+ } as Session);
+ await kv.set(KV.sessions, "s_active", { id: "s_active" } as Session);
+ await initBudget(kv as never, { sessionId: "s_old" });
+ await initBudget(kv as never, { sessionId: "s_active" });
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: SYSTEM_SESSION,
+ inputTokens: 5,
+ outputTokens: 0,
+ });
+ // s_old ended 30 days ago, past the retention window, while its budget
+ // row is freshly created — so the session's endedAt is what reaps it.
+ const result = await reapBudgets(kv as never);
+ expect(result.swept).toBe(1);
+ expect(await getBudget(kv as never, "s_old")).toBeNull();
+ expect(await getBudget(kv as never, "s_active")).not.toBeNull();
+ expect(await getBudget(kv as never, SYSTEM_SESSION)).not.toBeNull();
+ });
+ });
+
+ describe("registered functions", () => {
+ beforeEach(() => {
+ registerSessionBudgetFunctions(sdk as never, kv as never);
+ });
+
+ it("init is idempotent", async () => {
+ const first = await sdk.trigger({
+ function_id: "mem::session::budget::init",
+ payload: { sessionId: "s1", tokenCap: 500 },
+ });
+ const second = await sdk.trigger({
+ function_id: "mem::session::budget::init",
+ payload: { sessionId: "s1", tokenCap: 999 },
+ });
+ expect((first as { budget: SessionBudget }).budget.tokenCap).toBe(500);
+ // unchanged: still 500, not reset to 999
+ expect((second as { budget: SessionBudget }).budget.tokenCap).toBe(500);
+ });
+
+ it("get returns a single budget or the full list", async () => {
+ await sdk.trigger({
+ function_id: "mem::session::budget::init",
+ payload: { sessionId: "s1" },
+ });
+ const one = (await sdk.trigger({
+ function_id: "mem::session::budget::get",
+ payload: { sessionId: "s1" },
+ })) as { budget: SessionBudget | null };
+ expect(one.budget?.sessionId).toBe("s1");
+ const all = (await sdk.trigger({
+ function_id: "mem::session::budget::get",
+ payload: {},
+ })) as { budgets: SessionBudget[] };
+ expect(all.budgets.length).toBe(1);
+ });
+ });
+
+ describe("compress synthetic fallback on exhaustion", () => {
+ it("stores synthetic compression instead of dropping the observation", async () => {
+ initSessionBudgetMeter(kv as never, sdk as never);
+ await initBudget(kv as never, { sessionId: "sx", tokenCap: 10 });
+ await recordBudget(kv as never, sdk as never, {
+ sessionId: "sx",
+ inputTokens: 50,
+ outputTokens: 0,
+ });
+ const inner = innerProvider();
+ const provider = new ResilientProvider(inner, "model-x");
+ registerCompressFunction(sdk as never, kv as never, provider);
+
+ const raw: RawObservation = {
+ id: "obs1",
+ sessionId: "sx",
+ timestamp: new Date().toISOString(),
+ hookType: "post_tool_use",
+ toolName: "Read",
+ toolInput: { file_path: "/tmp/a.ts" },
+ toolOutput: "contents",
+ raw: {},
+ };
+ const result = (await sdk.trigger({
+ function_id: "mem::compress",
+ payload: { observationId: "obs1", sessionId: "sx", raw },
+ })) as { success: boolean; budgetExhausted?: boolean };
+
+ expect(result.success).toBe(true);
+ expect(result.budgetExhausted).toBe(true);
+ expect(inner.compress).not.toHaveBeenCalled();
+ const stored = await kv.get(
+ KV.observations("sx"),
+ "obs1",
+ );
+ expect(stored?.id).toBe("obs1");
+ expect(stored?.title).toBeTruthy();
+ });
+ });
+
+ describe("summarize truncation on exhaustion", () => {
+ it("stops before the next chunk and flags the summary truncated", async () => {
+ process.env.SUMMARIZE_CHUNK_SIZE = "1";
+ process.env.SUMMARIZE_CHUNK_CONCURRENCY = "1";
+ initSessionBudgetMeter(kv as never, sdk as never);
+ await initBudget(kv as never, { sessionId: "ss", tokenCap: 10 });
+
+ await kv.set(KV.sessions, "ss", {
+ id: "ss",
+ project: "proj",
+ } as Session);
+ for (let i = 0; i < 4; i++) {
+ await kv.set(KV.observations("ss"), `o${i}`, {
+ id: `o${i}`,
+ sessionId: "ss",
+ timestamp: new Date().toISOString(),
+ type: "file_read",
+ title: `obs ${i}`,
+ facts: [],
+ narrative: "did a thing",
+ concepts: ["c"],
+ files: ["f.ts"],
+ importance: 5,
+ } as CompressedObservation);
+ }
+
+ const inner = innerProvider({
+ summarize: vi
+ .fn()
+ .mockResolvedValue(
+ "ChunkThis chunk summarizes a meaningful slice of the session work.",
+ ),
+ });
+ const provider = new ResilientProvider(inner, "model-x");
+ registerSummarizeFunction(sdk as never, kv as never, provider);
+
+ const result = (await sdk.trigger({
+ function_id: "mem::summarize",
+ payload: { sessionId: "ss" },
+ })) as { success: boolean; summary?: { truncated?: boolean } };
+
+ expect(result.success).toBe(true);
+ expect(result.summary?.truncated).toBe(true);
+ // Did not process all four chunks before aborting.
+ expect((inner.summarize as ReturnType).mock.calls.length).toBeLessThan(4);
+ const saved = await kv.get<{ truncated?: boolean }>(KV.summaries, "ss");
+ expect(saved?.truncated).toBe(true);
+ });
+ });
+});