diff --git a/.env.example b/.env.example index 77ca0f3a3..8ae162861 100644 --- a/.env.example +++ b/.env.example @@ -100,6 +100,11 @@ # MAX_OBS_PER_SESSION=500 # Per-session observation cap before consolidation kicks in # SUMMARIZE_CHUNK_SIZE=400 # When mem::summarize sees a session larger than this, it chunks observations and map-reduces (chunk-summarize → reduce-merge) to stay within the LLM's context window. Default 400 ≈ 50k tokens per chunk at ~110 tok/obs. Native sessions are capped by MAX_OBS_PER_SESSION; chunking primarily matters for bulk-imported jsonl sessions, which bypass that cap. # SUMMARIZE_CHUNK_CONCURRENCY=6 # Parallel chunk LLM calls during chunked summarize. Default 6 fits ~100-chunk sessions under iii's 180s function-invocation timeout at typical ~8s/call. High-throughput providers (Novita, DeepInfra, DeepSeek) commonly allow 100+ concurrent — bump this for very large imported sessions. +# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Per-session hard cap on estimated LLM tokens (compress + summarize). Once exceeded, further LLM calls for that session are blocked and compression falls back to zero-LLM synthetic output. A soft-warn event fires at 80%. Cost safety net for pathological sessions (runaway tool loops). Per-session override via the tokenCap field on POST /session/start. +# AGENTMEMORY_SYSTEM_TOKEN_CAP=1000000 # Separate cap for cron-fired / cross-session LLM work (consolidation, reflect, graph extraction) that has no active session; tracked under the "__system__" sentinel. +# AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS=7 # How long a session's token-budget row is kept after the session ends before the hourly reaper deletes it. +# AGENTMEMORY_COST_IN_PER_1M=0.14 # USD per 1M input tokens, used to display a rough per-session costEstimate from the raw token counts. +# AGENTMEMORY_COST_OUT_PER_1M=0.28 # USD per 1M output tokens for the costEstimate. # ----------------------------------------------------------------------------- # 5. Behaviour flags diff --git a/AGENTS.md b/AGENTS.md index 873ef10fc..a0b0abbe1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -117,7 +117,7 @@ Hook scripts in `src/hooks/` are standalone Node.js scripts (no iii-sdk import). ## Current Stats (v0.9.16) - 53 MCP tools (8 visible by default, `AGENTMEMORY_TOOLS=all` for all) -- 128 REST endpoints +- 129 REST endpoints - 6 MCP resources, 3 MCP prompts - 12 hooks, 15 skills - 50+ iii functions diff --git a/README.md b/README.md index c4ec2c1e0..33992d9cc 100644 --- a/README.md +++ b/README.md @@ -1294,6 +1294,18 @@ Quality vs cost tradeoff for memory work: compression is a summarization task wi Sources: [OpenRouter pricing for Sonnet 4.6](https://openrouter.ai/anthropic/claude-sonnet-4.6/pricing), [DeepSeek V4 Pro](https://openrouter.ai/deepseek/deepseek-v4-pro), [DeepSeek pricing notes](https://api-docs.deepseek.com/quick_start/pricing/). +### Per-session token budget + +Pathological sessions (long-running CC instances, runaway tool loops) can rack up unbounded background compress/summarize spend. Each session gets a running token budget with a **hard cap** (default **100k estimated tokens**) and an **80% soft warning**. + +| Knob | Default | Purpose | +|------|---------|---------| +| `AGENTMEMORY_SESSION_TOKEN_CAP` | `100000` | Hard cap per session. Once exceeded, further LLM calls for that session are blocked; compression falls back to zero-LLM synthetic output. | +| `AGENTMEMORY_SYSTEM_TOKEN_CAP` | `1000000` | Separate cap for cron-fired / cross-session LLM work (consolidation, reflect) tracked under the `__system__` sentinel. | +| `AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS` | `7` | How long budget rows persist after a session ends before the hourly reaper deletes them. | + +Per-session override: pass `tokenCap` in the body of `POST /agentmemory/session/start`. Budget state is KV-persisted and survives restarts. `agentmemory status` reports `N active, M near-cap, K exhausted`. OTEL histogram: `session.tokens_used`. + ### Multi-agent memory (`AGENT_ID` + `AGENTMEMORY_AGENT_SCOPE`) In multi-agent setups where several roles share one agentmemory server (architect / developer / reviewer / researcher / support-agent), `AGENT_ID` tags every write with the role that made it. `AGENTMEMORY_AGENT_SCOPE` controls whether recall filters by that tag. @@ -1448,6 +1460,13 @@ Create `~/.agentmemory/.env`: # LLM provider to compress the # observation — expect significant # token spend on active sessions. +# AGENTMEMORY_SESSION_TOKEN_CAP=100000 # Per-session hard cap on estimated + # LLM tokens (compress + summarize). + # Soft-warn at 80%; synthetic fallback + # after exhaustion. Override per + # session via tokenCap on /session/start. +# AGENTMEMORY_SYSTEM_TOKEN_CAP=1000000 # Cap for cron / cross-session LLM work. +# AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS=7 # AGENTMEMORY_SLOTS=false # OFF by default. Editable pinned # memory slots — persona, # user_preferences, tool_guidelines, @@ -1500,7 +1519,7 @@ Create `~/.agentmemory/.env`:

API

-128 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer ` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers. +129 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer ` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers.
Key endpoints diff --git a/plugin/skills/agentmemory-config/REFERENCE.md b/plugin/skills/agentmemory-config/REFERENCE.md index 08c873a44..9679ad959 100644 --- a/plugin/skills/agentmemory-config/REFERENCE.md +++ b/plugin/skills/agentmemory-config/REFERENCE.md @@ -3,13 +3,15 @@ Generated by scanning `src/` for `AGENTMEMORY_*` usage. Do not edit the block below by hand; run `npm run skills:gen` after adding or removing a variable. Internal markers ending in two underscores are excluded. -Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 34 recognized variables: +Configuration is read from the environment and from `~/.agentmemory/.env` (no `export` prefix). 39 recognized variables: - `AGENTMEMORY_AGENT_SCOPE` - `AGENTMEMORY_ALLOW_AGENT_SDK` - `AGENTMEMORY_AUTO_COMPRESS` - `AGENTMEMORY_COMMIT_SHA` - `AGENTMEMORY_COPILOT_MCP_BLOCK` +- `AGENTMEMORY_COST_IN_PER_1M` +- `AGENTMEMORY_COST_OUT_PER_1M` - `AGENTMEMORY_CWD` - `AGENTMEMORY_DEBUG` - `AGENTMEMORY_DROP_STALE_INDEX` @@ -30,9 +32,12 @@ Configuration is read from the environment and from `~/.agentmemory/.env` (no `e - `AGENTMEMORY_REFLECT` - `AGENTMEMORY_SDK_CHILD` - `AGENTMEMORY_SECRET` +- `AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS` - `AGENTMEMORY_SESSION_ID` +- `AGENTMEMORY_SESSION_TOKEN_CAP` - `AGENTMEMORY_SLOTS` - `AGENTMEMORY_SUPPRESS_COST_WARNING` +- `AGENTMEMORY_SYSTEM_TOKEN_CAP` - `AGENTMEMORY_TOOLS` - `AGENTMEMORY_URL` - `AGENTMEMORY_USE_DOCKER` diff --git a/plugin/skills/agentmemory-rest-api/REFERENCE.md b/plugin/skills/agentmemory-rest-api/REFERENCE.md index d36171def..f4bb5f954 100644 --- a/plugin/skills/agentmemory-rest-api/REFERENCE.md +++ b/plugin/skills/agentmemory-rest-api/REFERENCE.md @@ -5,7 +5,7 @@ Generated from `src/triggers/api.ts`. Do not edit the block below by hand; run ` The REST API is the primary surface. All paths are under `http://localhost:3111` (override with `--port`). When `AGENTMEMORY_SECRET` is set, send `Authorization: Bearer $AGENTMEMORY_SECRET`; localhost is otherwise open. -117 registered endpoints: +118 registered endpoints: | Method | Path | | --- | --- | @@ -96,6 +96,7 @@ The REST API is the primary surface. All paths are under `http://localhost:3111` | POST | `/agentmemory/sentinels/cancel` | | POST | `/agentmemory/sentinels/check` | | POST | `/agentmemory/sentinels/trigger` | +| GET | `/agentmemory/session/budget` | | GET | `/agentmemory/session/by-commit` | | POST | `/agentmemory/session/commit` | | POST | `/agentmemory/session/end` | diff --git a/src/cli.ts b/src/cli.ts index 48f6f09b6..a9663657c 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -1345,13 +1345,14 @@ async function runStatus() { } try { - const [healthRes, sessionsRes, graphRes, memoriesRes, flagsRes, followupRes] = await Promise.all([ + const [healthRes, sessionsRes, graphRes, memoriesRes, flagsRes, followupRes, budgetsRes] = await Promise.all([ apiFetch(base, "health"), apiFetch(base, "sessions"), apiFetch(base, "graph/stats"), apiFetch(base, "memories?count=true"), apiFetch(base, "config/flags"), apiFetch(base, "diagnostics/followup"), + apiFetch(base, "session/budget").catch(() => null), ]); if (typeof healthRes?.viewerPort === "number") { @@ -1422,6 +1423,33 @@ async function runStatus() { ); } + const budgetList = Array.isArray(budgetsRes?.budgets) ? budgetsRes.budgets : []; + const sessionBudgets = budgetList.filter( + (b: any) => b && b.sessionId && b.sessionId !== "__system__", + ); + if (sessionBudgets.length > 0) { + let exhausted = 0; + let nearCap = 0; + let active = 0; + for (const b of sessionBudgets) { + const used = Number(b.tokensUsed) || 0; + const cap = Number(b.tokenCap) || 0; + const isExhausted = Boolean(b.exhaustedAt) || (cap > 0 && used >= cap); + if (isExhausted) { + exhausted++; + } else if (b.warnEmittedAt || (cap > 0 && used >= cap * 0.8)) { + nearCap++; + active++; + } else { + active++; + } + } + lines.push(""); + lines.push( + `Token budgets: ${active} active, ${nearCap} near-cap, ${exhausted} exhausted`, + ); + } + p.note(lines.join("\n"), "agentmemory"); } catch (err) { p.log.error(err instanceof Error ? err.message : String(err)); diff --git a/src/config.ts b/src/config.ts index f68da2e31..db6b6ebc6 100644 --- a/src/config.ts +++ b/src/config.ts @@ -403,6 +403,55 @@ export function getConsolidationDecayDays(): number { return safeParseInt(getMergedEnv()["CONSOLIDATION_DECAY_DAYS"], 30); } +// Per-session LLM token budget. Hard cap default is 100k +// estimated tokens per session. AGENTMEMORY_SESSION_TOKEN_CAP overrides the +// global default; mem::session::start can override per-session. +const SESSION_TOKEN_CAP_DEFAULT = 100_000; +const SYSTEM_TOKEN_CAP_DEFAULT = 1_000_000; +const SESSION_BUDGET_RETENTION_DAYS_DEFAULT = 7; +// Soft warning fires at this fraction of the cap. +export const SESSION_BUDGET_WARN_RATIO = 0.8; + +export function getSessionTokenCap(): number { + const n = safeParseInt( +getMergedEnv()["AGENTMEMORY_SESSION_TOKEN_CAP"], + SESSION_TOKEN_CAP_DEFAULT + ); + return n > 0 ? n : SESSION_TOKEN_CAP_DEFAULT; +} + +// Cron-fired / cross-session LLM calls (consolidation, reflect, graph +// extraction) have no active session; they bill the "__system__" sentinel +// against this separate, larger cap. +export function getSystemTokenCap(): number { + const n = safeParseInt( + getMergedEnv()["AGENTMEMORY_SYSTEM_TOKEN_CAP"], + SYSTEM_TOKEN_CAP_DEFAULT, + ); + return n > 0 ? n : SYSTEM_TOKEN_CAP_DEFAULT; +} + +export function getSessionBudgetRetentionDays(): number { + const n = safeParseInt( + getMergedEnv()["AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS"], + SESSION_BUDGET_RETENTION_DAYS_DEFAULT, + ); + return n > 0 ? n : SESSION_BUDGET_RETENTION_DAYS_DEFAULT; +} + +// USD-per-1M-token rates used to normalize raw token counts into a rough +// costEstimate at record time. Override with +// AGENTMEMORY_COST_IN_PER_1M / AGENTMEMORY_COST_OUT_PER_1M. +export function getCostRatesPer1M(): { input: number; output: number } { + const env = getMergedEnv(); + const input = parseFloat(env["AGENTMEMORY_COST_IN_PER_1M"] ?? ""); + const output = parseFloat(env["AGENTMEMORY_COST_OUT_PER_1M"] ?? ""); + return { + input: Number.isFinite(input) && input >= 0 ? input : 0.14, + output: Number.isFinite(output) && output >= 0 ? output : 0.28, + }; +} + export function isStandaloneMcp(): boolean { return getMergedEnv()["STANDALONE_MCP"] === "true"; } diff --git a/src/functions/compress.ts b/src/functions/compress.ts index 0569555e0..216647486 100644 --- a/src/functions/compress.ts +++ b/src/functions/compress.ts @@ -16,6 +16,8 @@ import { import { VISION_DESCRIPTION_PROMPT } from "../prompts/vision.js"; import { getXmlTag, getXmlChildren } from "../prompts/xml.js"; import { getSearchIndex, vectorIndexAddGuarded } from "./search.js"; +import { buildSyntheticCompression } from "./compress-synthetic.js"; +import { withSession } from "../state/session-context.js"; import { CompressOutputSchema } from "../eval/schemas.js"; import { validateOutput } from "../eval/validator.js"; import { scoreCompression } from "../eval/quality.js"; @@ -75,7 +77,7 @@ export function registerCompressFunction( observationId: string; sessionId: string; raw: RawObservation; - }) => { + }) => withSession(data.sessionId, async () => { const startMs = Date.now(); let imageDescription: string | undefined; @@ -253,6 +255,43 @@ export function registerCompressFunction( } catch (err) { const msg = err instanceof Error ? err.message : String(err); const latencyMs = Date.now() - startMs; + + // the session's token cap is exhausted. Don't drop + // the observation — fall back to zero-LLM synthetic compression so + // recall/search still work, exactly as if AGENTMEMORY_AUTO_COMPRESS + // were off, until the budget is reset or the session ends. + if (msg === "session_budget_exhausted") { + const synthetic = buildSyntheticCompression(data.raw); + synthetic.id = data.observationId; + synthetic.sessionId = data.sessionId; + await kv.set( + KV.observations(data.sessionId), + data.observationId, + synthetic, + ); + try { + getSearchIndex().add(synthetic); + } catch {} + await vectorIndexAddGuarded( + synthetic.id, + synthetic.sessionId, + synthetic.title + " " + (synthetic.narrative || ""), + { kind: "synthetic", logId: synthetic.id }, + ).catch(() => {}); + if (metricsStore) { + await metricsStore.record("mem::compress", latencyMs, false); + } + logger.warn("Compression budget exhausted; stored synthetic", { + obsId: data.observationId, + sessionId: data.sessionId, + }); + return { + success: true, + compressed: synthetic, + budgetExhausted: true, + }; + } + if (metricsStore) { await metricsStore.record("mem::compress", latencyMs, false); } @@ -262,6 +301,6 @@ export function registerCompressFunction( }); return { success: false, error: "compression_failed" }; } - }, + }), ); } diff --git a/src/functions/observe.ts b/src/functions/observe.ts index 2bf13d547..535a31379 100644 --- a/src/functions/observe.ts +++ b/src/functions/observe.ts @@ -272,6 +272,14 @@ export function registerObserveFunction( ? { firstPrompt: trimmedPrompt } : {}), }); + // OpenCode and other plugins that skip + // POST /session/start create the session here; seed its token + // budget too so background compress/summarize spend is capped. + sdk.trigger({ + function_id: "mem::session::budget::init", + payload: { sessionId: payload.sessionId }, + action: TriggerAction.Void(), + }); } // Per-observation LLM compression is opt-in as of 0.8.8 (#138). diff --git a/src/functions/session-budget.ts b/src/functions/session-budget.ts new file mode 100644 index 000000000..78bd2f33a --- /dev/null +++ b/src/functions/session-budget.ts @@ -0,0 +1,331 @@ +import { TriggerAction, type ISdk } from "iii-sdk"; +import type { SessionBudget } from "../types.js"; +import { KV } from "../state/schema.js"; +import type { StateKV } from "../state/kv.js"; +import { withKeyedLock } from "../state/keyed-mutex.js"; +import { SYSTEM_SESSION } from "../state/session-context.js"; +import { + getSessionTokenCap, + getSystemTokenCap, + getSessionBudgetRetentionDays, + getCostRatesPer1M, + SESSION_BUDGET_WARN_RATIO, +} from "../config.js"; +import { safeAudit } from "./audit.js"; +import { getHistograms } from "../telemetry/setup.js"; +import { logger } from "../logger.js"; + +function defaultCapFor(sessionId: string): number { + return sessionId === SYSTEM_SESSION ? getSystemTokenCap() : getSessionTokenCap(); +} + +function newBudget( + sessionId: string, + tokenCap: number, + now: string, +): SessionBudget { + return { + sessionId, + tokenCap, + tokensUsed: 0, + inputTokens: 0, + outputTokens: 0, + costEstimate: 0, + callCount: 0, + createdAt: now, + updatedAt: now, + }; +} + +function emitBudgetEvent( + sdk: ISdk, + functionId: string, + budget: SessionBudget, +): Promise { + return Promise.resolve( + sdk.trigger({ + function_id: functionId, + payload: { budget }, + action: TriggerAction.Void(), + }), + ).catch(() => {}); +} + +// Returns the existing budget untouched if one already exists +// so a re-fired session/start never resets a live counter. Fresh per +// sessionId — a forked session gets its own row and its own cap. +export async function initBudget( + kv: StateKV, + params: { sessionId: string; tokenCap?: number }, +): Promise { + const sessionId = params.sessionId; + return withKeyedLock(`budget:${sessionId}`, async () => { + const existing = await kv + .get(KV.sessionBudget, sessionId) + .catch(() => null); + if (existing) return existing; + const now = new Date().toISOString(); + const cap = typeof params.tokenCap === "number" && params.tokenCap > 0 + ? Math.floor(params.tokenCap) + : defaultCapFor(sessionId); + const budget = newBudget(sessionId, cap, now); + await kv.set(KV.sessionBudget, sessionId, budget); + return budget; + }); +} + +export async function getBudget( + kv: StateKV, + sessionId: string, +): Promise { + return kv.get(KV.sessionBudget, sessionId).catch(() => null); +} + +// Read-only fast path used by the provider wrapper before each LLM call. +export async function isBudgetExhausted( + kv: StateKV, + sessionId: string, +): Promise { + const b = await getBudget(kv, sessionId); + if (!b) return false; + if (b.exhaustedAt) return true; + return b.tokensUsed >= b.tokenCap; +} + + +export async function recordBudget( + kv: StateKV, + sdk: ISdk, + params: { + sessionId: string; + inputTokens: number; + outputTokens: number; + model?: string; + }, +): Promise { + const sessionId = + typeof params.sessionId === "string" && params.sessionId.trim().length > 0 + ? params.sessionId.trim() + : SYSTEM_SESSION; + const inTok = Math.max(0, Math.floor(params.inputTokens || 0)); + const outTok = Math.max(0, Math.floor(params.outputTokens || 0)); + + return withKeyedLock(`budget:${sessionId}`, async () => { + const now = new Date().toISOString(); + const existing = await kv + .get(KV.sessionBudget, sessionId) + .catch(() => null); + const budget = + existing ?? newBudget(sessionId, defaultCapFor(sessionId), now); + const rates = getCostRatesPer1M(); + + const prevUsed = budget.tokensUsed; + budget.inputTokens += inTok; + budget.outputTokens += outTok; + budget.tokensUsed += inTok + outTok; + budget.callCount += 1; + budget.costEstimate += + (inTok / 1_000_000) * rates.input + + (outTok / 1_000_000) * rates.output; + budget.updatedAt = now; + + const warnAt = Math.floor(budget.tokenCap * SESSION_BUDGET_WARN_RATIO); + const crossedWarn = + !budget.warnEmittedAt && + prevUsed < warnAt && + budget.tokensUsed >= warnAt && + budget.tokensUsed < budget.tokenCap; + const crossedExhausted = + !budget.exhaustedAt && budget.tokensUsed >= budget.tokenCap; + + if (crossedWarn) budget.warnEmittedAt = now; + if (crossedExhausted) budget.exhaustedAt = now; + + await kv.set(KV.sessionBudget, sessionId, budget); + + try { + getHistograms().sessionTokensUsed.record(budget.tokensUsed); + } catch {} + + if (crossedWarn) { + void emitBudgetEvent(sdk, "event::mem::budget::soft-warned", budget); + await safeAudit( + kv, + "budget_warn", + "mem::session::budget::record", + [sessionId], + { tokensUsed: budget.tokensUsed, tokenCap: budget.tokenCap }, + ); + } + if (crossedExhausted) { + void emitBudgetEvent(sdk, "event::mem::budget::exhausted", budget); + await safeAudit( + kv, + "budget_exhausted", + "mem::session::budget::record", + [sessionId], + { tokensUsed: budget.tokensUsed, tokenCap: budget.tokenCap }, + ); + } + return budget; + }); +} + +export async function reapBudgets( + kv: StateKV, +): Promise<{ swept: number; kept: number }> { + const retentionMs = getSessionBudgetRetentionDays() * 24 * 60 * 60 * 1000; + const cutoff = Date.now() - retentionMs; + const budgets = await kv + .list(KV.sessionBudget) + .catch(() => [] as SessionBudget[]); + + let swept = 0; + let kept = 0; + for (const b of budgets) { + if (!b || !b.sessionId || b.sessionId === SYSTEM_SESSION) { + kept++; + continue; + } + const session = await kv + .get<{ endedAt?: string }>(KV.sessions, b.sessionId) + .catch(() => null); + + let reapable = false; + if (session?.endedAt) { + reapable = new Date(session.endedAt).getTime() < cutoff; + } else if (!session) { + reapable = new Date(b.updatedAt).getTime() < cutoff; + } + + if (reapable) { + try { + await kv.delete(KV.sessionBudget, b.sessionId); + swept++; + } catch (err) { + kept++; + logger.warn("session budget reap delete failed", { + sessionId: b.sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } + } else { + kept++; + } + } + return { swept, kept }; +} + + +export interface SessionBudgetMeter { + isExhausted(sessionId: string): Promise; + record( + sessionId: string, + inputTokens: number, + outputTokens: number, + model?: string, + ): Promise; +} + +const NOOP_METER: SessionBudgetMeter = { + isExhausted: async () => false, + record: async () => {}, +}; + +let activeMeter: SessionBudgetMeter = NOOP_METER; + +export function initSessionBudgetMeter( + kv: StateKV, + sdk: ISdk, +): SessionBudgetMeter { + activeMeter = { + isExhausted: (sessionId) => isBudgetExhausted(kv, sessionId), + record: async (sessionId, inputTokens, outputTokens, model) => { + try { + await recordBudget(kv, sdk, { + sessionId, + inputTokens, + outputTokens, + model, + }); + } catch (err) { + logger.warn("session budget record failed", { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + } + }, + }; + return activeMeter; +} + +export function getSessionBudgetMeter(): SessionBudgetMeter { + return activeMeter; +} + +export function resetSessionBudgetMeter(): void { + activeMeter = NOOP_METER; +} + +export function registerSessionBudgetFunctions(sdk: ISdk, kv: StateKV): void { + sdk.registerFunction( + "mem::session::budget::init", + async (data: { sessionId?: string; tokenCap?: number } | undefined) => { + if (!data?.sessionId || typeof data.sessionId !== "string") { + return { success: false, error: "sessionId is required" }; + } + const budget = await initBudget(kv, { + sessionId: data.sessionId.trim(), + tokenCap: data.tokenCap, + }); + return { success: true, budget }; + }, + ); + + sdk.registerFunction( + "mem::session::budget::record", + async ( + data: + | { + sessionId?: string; + inputTokens?: number; + outputTokens?: number; + model?: string; + } + | undefined, + ) => { + if (!data?.sessionId || typeof data.sessionId !== "string") { + return { success: false, error: "sessionId is required" }; + } + const budget = await recordBudget(kv, sdk, { + sessionId: data.sessionId.trim(), + inputTokens: data.inputTokens ?? 0, + outputTokens: data.outputTokens ?? 0, + model: data.model, + }); + return { success: true, budget }; + }, + ); + + sdk.registerFunction( + "mem::session::budget::get", + async (data: { sessionId?: string } | undefined) => { + if (data?.sessionId && typeof data.sessionId === "string") { + const budget = await getBudget(kv, data.sessionId.trim()); + return { success: true, budget: budget ?? null }; + } + const budgets = await kv + .list(KV.sessionBudget) + .catch(() => [] as SessionBudget[]); + return { success: true, budgets }; + }, + ); + + sdk.registerFunction("mem::session::budget::reap", async () => { + const result = await reapBudgets(kv); + if (result.swept > 0) { + logger.info("Session budget reap complete", result); + } + return { success: true, ...result }; + }); +} diff --git a/src/functions/summarize.ts b/src/functions/summarize.ts index 4c501ca8c..68408975f 100644 --- a/src/functions/summarize.ts +++ b/src/functions/summarize.ts @@ -19,6 +19,8 @@ import { validateOutput } from "../eval/validator.js"; import { scoreSummary } from "../eval/quality.js"; import type { MetricsStore } from "../eval/metrics-store.js"; import { safeAudit } from "./audit.js"; +import { withSession } from "../state/session-context.js"; +import { getSessionBudgetMeter } from "./session-budget.js"; import { logger } from "../logger.js"; // Per-chunk observation budget when a session is too large to fit in one @@ -105,6 +107,7 @@ async function produceSummaryXml( mode: "single" | "chunked"; chunks: number; skipped?: number; + truncated?: boolean; }> { const chunkSize = getChunkSize(); if (compressed.length <= chunkSize) { @@ -132,7 +135,23 @@ async function produceSummaryXml( // so the reduce step sees partials in chronological order even when some // were skipped. const partialByIdx: Array = new Array(chunks.length).fill(null); + let truncated = false; + let processedChunks = 0; + const meter = getSessionBudgetMeter(); for (let batchStart = 0; batchStart < chunks.length; batchStart += concurrency) { + // BEFORE dispatching the next batch when the + // session's token cap is exhausted. The in-flight batch already + // completed; we reduce over whatever partials we have and mark the + // summary truncated rather than burning past the cap. + if (batchStart > 0 && (await meter.isExhausted(sessionId))) { + truncated = true; + logger.warn("Summarize aborted mid-session: token budget exhausted", { + sessionId, + processedChunks, + totalChunks: chunks.length, + }); + break; + } const batch = chunks.slice(batchStart, batchStart + concurrency); await Promise.all( batch.map(async (chunk, j) => { @@ -147,14 +166,20 @@ async function produceSummaryXml( ); }), ); + processedChunks += batch.length; } - const skipped = partialByIdx.filter((p) => p === null).length; + // When truncated, only the chunks we actually attempted count toward the + // skip ratio — unprocessed tail chunks aren't failures. + const consideredChunks = truncated ? processedChunks : chunks.length; + const skipped = partialByIdx + .slice(0, consideredChunks) + .filter((p) => p === null).length; const partials = partialByIdx.filter((p): p is SessionSummary => p !== null); - if (skipped > Math.floor(chunks.length * MAX_SKIP_RATIO)) { + if (skipped > Math.floor(consideredChunks * MAX_SKIP_RATIO)) { throw new Error( - `too_many_chunks_skipped: ${skipped}/${chunks.length} chunks failed to parse after retry`, + `too_many_chunks_skipped: ${skipped}/${consideredChunks} chunks failed to parse after retry`, ); } if (skipped > 0) { @@ -165,6 +190,23 @@ async function produceSummaryXml( }); } + // When truncated, the budget is already exhausted — a reduce LLM call + // would be blocked by the same gate and throw away the partial work. + // Merge the partials deterministically (no LLM) so the truncated summary + // is still persisted. + if (truncated) { + if (partials.length === 0) { + return { response: "", mode: "chunked", chunks: processedChunks, skipped, truncated }; + } + return { + response: synthesizeReducedXml(partials), + mode: "chunked", + chunks: processedChunks, + skipped, + truncated, + }; + } + const reduceInput = partials.map((p) => { const originalIdx = partialByIdx.indexOf(p); return { @@ -177,11 +219,67 @@ async function produceSummaryXml( obsRangeEnd: Math.min((originalIdx + 1) * chunkSize, compressed.length), }; }); - const response = await provider.summarize( - REDUCE_SYSTEM, - buildReducePrompt(reduceInput), + // The reduce call can be blocked if the final chunk pushed the session + // over its cap (no mid-loop trigger fired). Rather than lose the whole + // summary, fall back to the deterministic merge and mark it truncated. + let response: string; + try { + response = await provider.summarize( + REDUCE_SYSTEM, + buildReducePrompt(reduceInput), + ); + } catch (err) { + if ( + err instanceof Error && + err.message === "session_budget_exhausted" && + partials.length > 0 + ) { + return { + response: synthesizeReducedXml(partials), + mode: "chunked", + chunks: chunks.length, + skipped, + truncated: true, + }; + } + throw err; + } + return { + response, + mode: "chunked", + chunks: chunks.length, + skipped, + truncated, + }; +} + +// Deterministic merge of chunk partials into a parseable summary XML, used +// only on the truncated (budget-exhausted) path where an LLM reduce call +// is not permitted. +function synthesizeReducedXml(partials: SessionSummary[]): string { + const esc = (s: string) => + String(s ?? "") + .replace(/&/g, "&") + .replace(//g, ">"); + const uniq = (xs: string[]) => [...new Set(xs.filter(Boolean))]; + const title = esc(partials[0]?.title || "Session summary (partial)"); + const narrative = esc( + partials + .map((p) => p.narrative) + .filter(Boolean) + .join(" "), + ); + const decisions = uniq(partials.flatMap((p) => p.keyDecisions || [])); + const files = uniq(partials.flatMap((p) => p.filesModified || [])); + const concepts = uniq(partials.flatMap((p) => p.concepts || [])); + return ( + `${title}` + + `${narrative}` + + `${decisions.map((d) => `${esc(d)}`).join("")}` + + `${files.map((f) => `${esc(f)}`).join("")}` + + `${concepts.map((c) => `${esc(c)}`).join("")}` ); - return { response, mode: "chunked", chunks: chunks.length, skipped }; } // #783: many LLMs (DeepSeek, GPT variants, some Anthropic responses) @@ -233,7 +331,8 @@ export function registerSummarizeFunction( metricsStore?: MetricsStore, ): void { sdk.registerFunction("mem::summarize", - async (data: { sessionId: string } | undefined) => { + async (data: { sessionId: string } | undefined) => + withSession(data?.sessionId, async () => { const startMs = Date.now(); if (!data || typeof data.sessionId !== "string" || !data.sessionId.trim()) { return { success: false, error: "sessionId is required" }; @@ -282,6 +381,7 @@ export function registerSummarizeFunction( let response = ""; let mode = "single"; let chunks = 1; + let truncated = false; for (let attempt = 1; attempt <= 2; attempt++) { const produced = await produceSummaryXml( provider, @@ -292,6 +392,7 @@ export function registerSummarizeFunction( response = produced.response; mode = produced.mode; chunks = produced.chunks; + truncated = produced.truncated ?? false; if (!response || !response.trim()) { logger.warn("Empty provider response on summarize", { sessionId, @@ -356,10 +457,13 @@ export function registerSummarizeFunction( const qualityScore = scoreSummary(summaryForValidation); + if (truncated) summary.truncated = true; + await kv.set(KV.summaries, sessionId, summary); await safeAudit(kv, "compress", "mem::summarize", [sessionId], { title: summary.title, observationCount: compressed.length, + ...(truncated ? { truncated: true } : {}), }); const latencyMs = Date.now() - startMs; @@ -393,6 +497,6 @@ export function registerSummarizeFunction( }); return { success: false, error: msg }; } - }, + }), ); } diff --git a/src/index.ts b/src/index.ts index 4233e8a67..d1a346c82 100644 --- a/src/index.ts +++ b/src/index.ts @@ -88,6 +88,7 @@ import { registerTemporalGraphFunctions } from "./functions/temporal-graph.js"; import { registerRetentionFunctions } from "./functions/retention.js"; import { registerCompressFileFunction } from "./functions/compress-file.js"; import { registerReplayFunctions } from "./functions/replay.js"; +import { registerSessionBudgetFunctions,initSessionBudgetMeter} from "./functions/session-budget.js"; import { registerApiTriggers } from "./triggers/api.js"; import { registerEventTriggers } from "./triggers/events.js"; import { registerMcpEndpoints } from "./mcp/server.js"; @@ -220,6 +221,7 @@ async function main() { const secret = getEnvVar("AGENTMEMORY_SECRET"); const metricsStore = new MetricsStore(kv); const dedupMap = new DedupMap(); + initSessionBudgetMeter(kv, sdk); const vectorIndex = embeddingProvider ? new VectorIndex() : null; @@ -332,6 +334,7 @@ async function main() { registerRetentionFunctions(sdk, kv); registerCompressFileFunction(sdk, kv, provider); registerReplayFunctions(sdk, kv); + registerSessionBudgetFunctions(sdk, kv); bootLog( `v0.6 advanced retrieval: sliding-window, query-expansion, temporal-graph, retention-scoring`, ); @@ -518,7 +521,7 @@ async function main() { `Ready. ${embeddingProvider ? "Triple-stream (BM25+Vector+Graph)" : "BM25+Graph"} search active.`, ); bootLog( - `REST API: 128 endpoints at http://localhost:${config.restPort}/agentmemory/*`, + `REST API: 129 endpoints at http://localhost:${config.restPort}/agentmemory/*`, ); bootLog( `MCP surface (opt-in via \`npx @agentmemory/mcp\`): ${getAllTools().length} tools · 6 resources · 3 prompts`, @@ -579,6 +582,19 @@ async function main() { }, 60 * 60 * 1000); recentSearchesSweepTimer.unref(); + // reap token budgets for sessions whose endedAt + + // retention window has passed (plus orphaned rows). + // Hourly, unref'd so it never holds the process open. + const sessionBudgetReapTimer = setInterval(async () => { + try { + await sdk.trigger({ + function_id: "mem::session::budget::reap", + payload: {}, + }); + } catch {} + }, 60 * 60 * 1000); + sessionBudgetReapTimer.unref(); + if (isConsolidationEnabled()) { const consolidationTimer = setInterval(async () => { try { diff --git a/src/providers/index.ts b/src/providers/index.ts index 0ec3feba0..e1aeda0d6 100644 --- a/src/providers/index.ts +++ b/src/providers/index.ts @@ -55,7 +55,7 @@ function defaultModelFor(providerType: ProviderConfig["provider"]): string { } export function createProvider(config: ProviderConfig): ResilientProvider { - return new ResilientProvider(createBaseProvider(config)); + return new ResilientProvider(createBaseProvider(config), config.model); } export function createFallbackProvider( @@ -87,9 +87,9 @@ export function createFallbackProvider( } if (providers.length > 1) { - return new ResilientProvider(new FallbackChainProvider(providers)); + return new ResilientProvider(new FallbackChainProvider(providers),config.model,); } - return new ResilientProvider(providers[0]); + return new ResilientProvider(providers[0], config.model); } function createBaseProvider(config: ProviderConfig): MemoryProvider { diff --git a/src/providers/resilient.ts b/src/providers/resilient.ts index 95ece40c9..b0dd5d314 100644 --- a/src/providers/resilient.ts +++ b/src/providers/resilient.ts @@ -1,34 +1,77 @@ import type { MemoryProvider, CircuitBreakerState } from "../types.js"; import { CircuitBreaker } from "./circuit-breaker.js"; +import { currentSessionId } from "../state/session-context.js"; +import { getSessionBudgetMeter } from "../functions/session-budget.js"; + +// Rough token estimate (char/3) reused from the context +// renderer. Per-session budgets are a cost safety net, not billing-grade +// accounting — providers return a bare string with no usage field, so we +// estimate from prompt + response length rather than threading exact usage +// through every provider method. +function estimateTokens(text: string): number { + if (!text) return 0; + return Math.ceil(text.length / 3); +} export class ResilientProvider implements MemoryProvider { private breaker = new CircuitBreaker(); name: string; - constructor(private inner: MemoryProvider) { + constructor(private inner: MemoryProvider, private modelName = inner.name) { this.name = `resilient(${inner.name})`; } - private async call(fn: () => Promise): Promise { + // All LLM traffic funnels through here. Order: circuit-breaker gate -> + // per-session budget gate -> inner call -> record estimated tokens in a + // finally (0/0 on failure so partial calls are never double-counted). + private async call(systemPrompt: string, userPrompt: string, fn: () => Promise): Promise { if (!this.breaker.isAllowed) { throw new Error("circuit_breaker_open"); } + + const sessionId = currentSessionId(); + const meter = getSessionBudgetMeter(); + if (await meter.isExhausted(sessionId)) { + throw new Error("session_budget_exhausted"); + } + + const inputTokens = estimateTokens(systemPrompt) + estimateTokens(userPrompt); + let outputTokens = 0; + let succeeded = false; try { const result = await fn(); + outputTokens = estimateTokens(result); + succeeded = true; this.breaker.recordSuccess(); return result; } catch (err) { this.breaker.recordFailure(); throw err; + } finally { + // Failed calls record 0/0: the inner provider may have aborted before + // consuming tokens, and counting a best-guess input on every retry + // would over-bill the cap on flaky providers. + await meter + .record( + sessionId, + succeeded ? inputTokens : 0, + succeeded ? outputTokens : 0, + this.modelName, + ) + .catch(() => {}); } } async compress(systemPrompt: string, userPrompt: string): Promise { - return this.call(() => this.inner.compress(systemPrompt, userPrompt)); + return this.call(systemPrompt, userPrompt, () => + this.inner.compress(systemPrompt, userPrompt), + ); } async summarize(systemPrompt: string, userPrompt: string): Promise { - return this.call(() => this.inner.summarize(systemPrompt, userPrompt)); + return this.call(systemPrompt, userPrompt, () => + this.inner.summarize(systemPrompt, userPrompt), + ); } get circuitState(): CircuitBreakerState { diff --git a/src/state/schema.ts b/src/state/schema.ts index cb29d41ad..0233a2c67 100644 --- a/src/state/schema.ts +++ b/src/state/schema.ts @@ -72,6 +72,10 @@ export const KV = { // #771: tracks the most recent smart-search call per session, used by // the followup-rate diagnostic. Key = sessionId. TTL-swept hourly. recentSearches: "mem:recent-searches", + // estimated-token-per-session LLM token budget. Key = sessionId (or the + // "__system__" sentinel for cron / cross-session LLM calls). Reaped by + // mem::session::budget::reap once a session's endedAt + retention passes. + sessionBudget: "mem:session-budget", } as const; export const STREAM = { diff --git a/src/state/session-context.ts b/src/state/session-context.ts new file mode 100644 index 000000000..7757d1d27 --- /dev/null +++ b/src/state/session-context.ts @@ -0,0 +1,20 @@ +import { AsyncLocalStorage } from "node:async_hooks"; + +export const SYSTEM_SESSION = "__system__"; + +interface SessionStore { + sessionId: string; +} + +export const sessionContext = new AsyncLocalStorage(); + +export function withSession(sessionId: string | undefined, fn: () => Promise): Promise { + const id = typeof sessionId === "string" && sessionId.trim().length > 0 + ? sessionId.trim() + : SYSTEM_SESSION; + return sessionContext.run({ sessionId: id }, fn); +} + +export function currentSessionId(): string { + return sessionContext.getStore()?.sessionId ?? SYSTEM_SESSION; +} diff --git a/src/telemetry/setup.ts b/src/telemetry/setup.ts index fc55b4f55..00690d347 100644 --- a/src/telemetry/setup.ts +++ b/src/telemetry/setup.ts @@ -61,6 +61,7 @@ interface Histograms { qualityScore: Histogram; embeddingLatency: Histogram; vectorSearchLatency: Histogram; + sessionTokensUsed: Histogram; } type Meter = { @@ -105,6 +106,7 @@ const HISTOGRAM_NAMES: Array<[keyof Histograms, string]> = [ ["qualityScore", "quality.score"], ["embeddingLatency", "embedding.latency_ms"], ["vectorSearchLatency", "vector_search.latency_ms"], + ["sessionTokensUsed", "session.tokens_used"], ]; // Accessors so functions outside `initMetrics`'s closure can record into diff --git a/src/triggers/api.ts b/src/triggers/api.ts index 7b6c2bf2b..adbbfcd12 100644 --- a/src/triggers/api.ts +++ b/src/triggers/api.ts @@ -1,5 +1,5 @@ import { TriggerAction, type ISdk, type ApiRequest } from "iii-sdk"; -import type { Session, CompressedObservation, HookPayload, CommitLink, SessionSummary } from "../types.js"; +import type { Session, CompressedObservation, HookPayload, CommitLink, SessionSummary, SessionBudget } from "../types.js"; import { withKeyedLock } from "../state/keyed-mutex.js"; import { KV } from "../state/schema.js"; import { StateKV } from "../state/kv.js"; @@ -594,6 +594,21 @@ export function registerApiTriggers( ...(agentId ? { agentId } : {}), }; await kv.set(KV.sessions, sessionId, session); + // Seed the per-session token budget. An optional + // tokenCap in the request body overrides the global default for this + // session only; init is idempotent so a re-fired start never resets + // a live counter. + const tokenCap = parseOptionalInt(body.tokenCap); + sdk.trigger({ + function_id: "mem::session::budget::init", + payload: { + sessionId, + ...(typeof tokenCap === "number" && tokenCap > 0 + ? { tokenCap } + : {}), + }, + action: TriggerAction.Void(), + }); const contextResult = await sdk.trigger< { sessionId: string; project: string }, { context: string } @@ -653,6 +668,33 @@ export function registerApiTriggers( }, }); + // Read per-session token-budget state. With ?sessionId, + // returns that session's budget; without it, returns all budget rows + // (used by `agentmemory status` and the viewer to surface near-cap and + // exhausted sessions). + sdk.registerFunction("api::session::budget", + async (req: ApiRequest): Promise => { + const authErr = checkAuth(req, secret); + if (authErr) return authErr; + const sessionId = asNonEmptyString(req.query_params?.["sessionId"]); + if (sessionId) { + const budget = await kv + .get(KV.sessionBudget, sessionId) + .catch(() => null); + return { status_code: 200, body: { budget: budget ?? null } }; + } + const budgets = await kv + .list(KV.sessionBudget) + .catch(() => [] as SessionBudget[]); + return { status_code: 200, body: { budgets } }; + }, + ); + sdk.registerTrigger({ + type: "http", + function_id: "api::session::budget", + config: { api_path: "/agentmemory/session/budget", http_method: "GET" }, + }); + sdk.registerFunction("api::summarize", async (req: ApiRequest<{ sessionId: string }>): Promise => { const sessionId = asNonEmptyString((req.body as Record)?.sessionId); diff --git a/src/triggers/events.ts b/src/triggers/events.ts index e38b58db4..a220eeac1 100644 --- a/src/triggers/events.ts +++ b/src/triggers/events.ts @@ -104,6 +104,35 @@ export function registerEventTriggers(sdk: ISdk, kv: StateKV): void { config: { topic: "agentmemory.session.ended" }, }); + // Estimated-token-per-session budget lifecycle events. mem::session::budget::record + // fires these when a session crosses 80% (soft warn) or hits its cap + // (exhausted). Each fans the budget out to the viewer stream group so + // dashboards can surface near-cap / exhausted sessions in real time. + const registerBudgetEvent = ( + functionId: string, + eventType: string, + ): void => { + sdk.registerFunction( + functionId, + async (data: { budget?: unknown }) => { + await sdk.trigger({ + function_id: "stream::send", + payload: { + stream_name: STREAM.name, + group_id: STREAM.viewerGroup, + id: `${eventType}-${Date.now()}`, + type: eventType, + data: { budget: data?.budget ?? null }, + }, + action: TriggerAction.Void(), + }); + return { emitted: true }; + }, + ); + }; + registerBudgetEvent("event::mem::budget::soft-warned", "budget.soft_warned"); + registerBudgetEvent("event::mem::budget::exhausted", "budget.exhausted"); + // React to observation count changes and emit a lightweight live event for dashboards/viewer. sdk.registerFunction( "event::session::observation-count-changed", diff --git a/src/types.ts b/src/types.ts index 6797dfaf9..4f2c3e9e8 100644 --- a/src/types.ts +++ b/src/types.ts @@ -114,6 +114,21 @@ export interface SessionSummary { filesModified: string[]; concepts: string[]; observationCount: number; + truncated?: boolean; +} + +export interface SessionBudget { + sessionId: string; + tokenCap: number; + tokensUsed: number; + inputTokens: number; + outputTokens: number; + costEstimate: number; + callCount: number; + warnEmittedAt?: string; + exhaustedAt?: string; + createdAt: string; + updatedAt: string; } export type HookType = @@ -608,7 +623,9 @@ export interface AuditEntry { | "slot_replace" | "slot_create" | "slot_delete" - | "slot_reflect"; + | "slot_reflect" + | "budget_warn" + | "budget_exhausted"; userId?: string; functionId: string; targetIds: string[]; diff --git a/test/session-budget.test.ts b/test/session-budget.test.ts new file mode 100644 index 000000000..7a695d3ad --- /dev/null +++ b/test/session-budget.test.ts @@ -0,0 +1,414 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; + +vi.mock("../src/logger.js", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, + bootLog: vi.fn(), +})); + +import { + registerSessionBudgetFunctions, + initBudget, + recordBudget, + isBudgetExhausted, + reapBudgets, + getBudget, + initSessionBudgetMeter, + resetSessionBudgetMeter, +} from "../src/functions/session-budget.js"; +import { ResilientProvider } from "../src/providers/resilient.js"; +import { withSession, SYSTEM_SESSION } from "../src/state/session-context.js"; +import { registerCompressFunction } from "../src/functions/compress.js"; +import { registerSummarizeFunction } from "../src/functions/summarize.js"; +import { KV } from "../src/state/schema.js"; +import type { + SessionBudget, + MemoryProvider, + RawObservation, + Session, + CompressedObservation, +} from "../src/types.js"; + +function mockKV() { + const store = new Map>(); + return { + store, + get: async (scope: string, key: string): Promise => + (store.get(scope)?.get(key) as T) ?? null, + set: async (scope: string, key: string, data: T): Promise => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, data); + return data; + }, + delete: async (scope: string, key: string): Promise => { + store.get(scope)?.delete(key); + }, + list: async (scope: string): Promise => { + const entries = store.get(scope); + return entries ? (Array.from(entries.values()) as T[]) : []; + }, + update: async ( + scope: string, + key: string, + ops: Array<{ type: string; path: string; value?: unknown }>, + ): Promise => { + const cur = (store.get(scope)?.get(key) as Record) ?? {}; + for (const op of ops) { + if (op.type === "set") cur[op.path] = op.value; + } + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, cur); + return cur; + }, + }; +} + +function mockSdk() { + const functions = new Map(); + const triggers: Array<{ id: string; payload: unknown }> = []; + return { + functions, + triggers, + registerFunction: (idOrOpts: string | { id: string }, handler: Function) => { + const id = typeof idOrOpts === "string" ? idOrOpts : idOrOpts.id; + functions.set(id, handler); + }, + registerTrigger: () => {}, + trigger: async ( + idOrInput: string | { function_id: string; payload: unknown }, + data?: unknown, + ) => { + const id = typeof idOrInput === "string" ? idOrInput : idOrInput.function_id; + const payload = typeof idOrInput === "string" ? data : idOrInput.payload; + triggers.push({ id, payload }); + const fn = functions.get(id); + if (!fn) return undefined; // events/streams not registered in unit ctx + return fn(payload); + }, + }; +} + +function innerProvider(overrides: Partial = {}): MemoryProvider { + return { + name: "test-inner", + compress: vi.fn().mockResolvedValue("ok"), + summarize: vi.fn().mockResolvedValue("ok"), + ...overrides, + }; +} + +describe("session budget", () => { + let kv: ReturnType; + let sdk: ReturnType; + + beforeEach(() => { + kv = mockKV(); + sdk = mockSdk(); + delete process.env.AGENTMEMORY_SESSION_TOKEN_CAP; + delete process.env.AGENTMEMORY_SYSTEM_TOKEN_CAP; + delete process.env.AGENTMEMORY_SESSION_BUDGET_RETENTION_DAYS; + delete process.env.SUMMARIZE_CHUNK_SIZE; + delete process.env.SUMMARIZE_CHUNK_CONCURRENCY; + }); + + afterEach(() => { + resetSessionBudgetMeter(); + vi.clearAllMocks(); + }); + + describe("recordBudget", () => { + it("forks fresh per session", async () => { + await recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 50, + outputTokens: 50, + }); + const s1 = await getBudget(kv as never, "s1"); + const s2 = await getBudget(kv as never, "s2"); + expect(s1?.tokensUsed).toBe(100); + expect(s2).toBeNull(); + }); + + it("enforces the hard cap", async () => { + await initBudget(kv as never, { sessionId: "s1", tokenCap: 100 }); + const b = await recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 60, + outputTokens: 50, + }); + expect(b.tokensUsed).toBe(110); + expect(b.exhaustedAt).toBeTruthy(); + expect(await isBudgetExhausted(kv as never, "s1")).toBe(true); + }); + + it("emits a soft warning exactly once at 80%", async () => { + await initBudget(kv as never, { sessionId: "s1", tokenCap: 100 }); + await recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 0, + outputTokens: 85, + }); + await recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 0, + outputTokens: 5, + }); + const b = await getBudget(kv as never, "s1"); + expect(b?.warnEmittedAt).toBeTruthy(); + expect(b?.exhaustedAt).toBeFalsy(); + const warnEvents = sdk.triggers.filter( + (t) => t.id === "event::mem::budget::soft-warned", + ); + expect(warnEvents).toHaveLength(1); + }); + + it("emits an exhausted event exactly once when crossing the cap", async () => { + await initBudget(kv as never, { sessionId: "s1", tokenCap: 100 }); + await recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 60, + outputTokens: 50, + }); + await recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 10, + outputTokens: 0, + }); + const b = await getBudget(kv as never, "s1"); + expect(b?.exhaustedAt).toBeTruthy(); + const exhaustEvents = sdk.triggers.filter( + (t) => t.id === "event::mem::budget::exhausted", + ); + expect(exhaustEvents).toHaveLength(1); + }); + + it("uses AGENTMEMORY_SESSION_TOKEN_CAP when init has no explicit cap", async () => { + process.env.AGENTMEMORY_SESSION_TOKEN_CAP = "500"; + const budget = await initBudget(kv as never, { sessionId: "s-env" }); + expect(budget.tokenCap).toBe(500); + }); + + it("serializes concurrent increments", async () => { + await initBudget(kv as never, { sessionId: "s1", tokenCap: 10_000 }); + await Promise.all( + Array.from({ length: 10 }, () => + recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 10, + outputTokens: 0, + }), + ), + ); + const b = await getBudget(kv as never, "s1"); + expect(b?.tokensUsed).toBe(100); + expect(b?.callCount).toBe(10); + }); + + it("tracks system-triggered calls under the sentinel scope", async () => { + await recordBudget(kv as never, sdk as never, { + sessionId: "", + inputTokens: 10, + outputTokens: 0, + }); + const sentinel = await getBudget(kv as never, SYSTEM_SESSION); + expect(sentinel?.tokensUsed).toBe(10); + expect(sentinel?.tokenCap).toBe(1_000_000); + }); + }); + + describe("ResilientProvider integration", () => { + it("blocks LLM calls once the session budget is exhausted", async () => { + initSessionBudgetMeter(kv as never, sdk as never); + await initBudget(kv as never, { sessionId: "s1", tokenCap: 10 }); + await recordBudget(kv as never, sdk as never, { + sessionId: "s1", + inputTokens: 20, + outputTokens: 0, + }); + const inner = innerProvider(); + const provider = new ResilientProvider(inner, "model-x"); + await expect( + withSession("s1", () => provider.compress("sys", "user")), + ).rejects.toThrow("session_budget_exhausted"); + expect(inner.compress).not.toHaveBeenCalled(); + }); + + it("records nothing for a failed call", async () => { + initSessionBudgetMeter(kv as never, sdk as never); + const inner = innerProvider({ + compress: vi.fn().mockRejectedValue(new Error("boom")), + }); + const provider = new ResilientProvider(inner, "model-x"); + await expect( + withSession("s2", () => provider.compress("sys", "user")), + ).rejects.toThrow("boom"); + const b = await getBudget(kv as never, "s2"); + expect(b?.tokensUsed).toBe(0); + expect(b?.callCount).toBe(1); + }); + + it("records estimated tokens for a successful call", async () => { + initSessionBudgetMeter(kv as never, sdk as never); + const inner = innerProvider({ + compress: vi.fn().mockResolvedValue("a".repeat(300)), + }); + const provider = new ResilientProvider(inner, "model-x"); + await withSession("s3", () => provider.compress("system", "prompt")); + const b = await getBudget(kv as never, "s3"); + expect(b?.inputTokens).toBeGreaterThan(0); + expect(b?.outputTokens).toBeGreaterThan(0); + expect(b?.tokensUsed).toBe((b?.inputTokens ?? 0) + (b?.outputTokens ?? 0)); + }); + }); + + describe("reapBudgets", () => { + it("reaps ended sessions past retention, keeps active and sentinel", async () => { + const old = new Date(Date.now() - 30 * 86400_000).toISOString(); + await kv.set(KV.sessions, "s_old", { + id: "s_old", + endedAt: old, + } as Session); + await kv.set(KV.sessions, "s_active", { id: "s_active" } as Session); + await initBudget(kv as never, { sessionId: "s_old" }); + await initBudget(kv as never, { sessionId: "s_active" }); + await recordBudget(kv as never, sdk as never, { + sessionId: SYSTEM_SESSION, + inputTokens: 5, + outputTokens: 0, + }); + // Backdate the ended session's budget so the orphan fallback is not + // what reaps it — the session endedAt is the signal here. + const result = await reapBudgets(kv as never); + expect(result.swept).toBe(1); + expect(await getBudget(kv as never, "s_old")).toBeNull(); + expect(await getBudget(kv as never, "s_active")).not.toBeNull(); + expect(await getBudget(kv as never, SYSTEM_SESSION)).not.toBeNull(); + }); + }); + + describe("registered functions", () => { + beforeEach(() => { + registerSessionBudgetFunctions(sdk as never, kv as never); + }); + + it("init is idempotent", async () => { + const first = await sdk.trigger({ + function_id: "mem::session::budget::init", + payload: { sessionId: "s1", tokenCap: 500 }, + }); + const second = await sdk.trigger({ + function_id: "mem::session::budget::init", + payload: { sessionId: "s1", tokenCap: 999 }, + }); + expect((first as { budget: SessionBudget }).budget.tokenCap).toBe(500); + // unchanged: still 500, not reset to 999 + expect((second as { budget: SessionBudget }).budget.tokenCap).toBe(500); + }); + + it("get returns a single budget or the full list", async () => { + await sdk.trigger({ + function_id: "mem::session::budget::init", + payload: { sessionId: "s1" }, + }); + const one = (await sdk.trigger({ + function_id: "mem::session::budget::get", + payload: { sessionId: "s1" }, + })) as { budget: SessionBudget | null }; + expect(one.budget?.sessionId).toBe("s1"); + const all = (await sdk.trigger({ + function_id: "mem::session::budget::get", + payload: {}, + })) as { budgets: SessionBudget[] }; + expect(all.budgets.length).toBe(1); + }); + }); + + describe("compress synthetic fallback on exhaustion", () => { + it("stores synthetic compression instead of dropping the observation", async () => { + initSessionBudgetMeter(kv as never, sdk as never); + await initBudget(kv as never, { sessionId: "sx", tokenCap: 10 }); + await recordBudget(kv as never, sdk as never, { + sessionId: "sx", + inputTokens: 50, + outputTokens: 0, + }); + const inner = innerProvider(); + const provider = new ResilientProvider(inner, "model-x"); + registerCompressFunction(sdk as never, kv as never, provider); + + const raw: RawObservation = { + id: "obs1", + sessionId: "sx", + timestamp: new Date().toISOString(), + hookType: "post_tool_use", + toolName: "Read", + toolInput: { file_path: "/tmp/a.ts" }, + toolOutput: "contents", + raw: {}, + }; + const result = (await sdk.trigger({ + function_id: "mem::compress", + payload: { observationId: "obs1", sessionId: "sx", raw }, + })) as { success: boolean; budgetExhausted?: boolean }; + + expect(result.success).toBe(true); + expect(result.budgetExhausted).toBe(true); + expect(inner.compress).not.toHaveBeenCalled(); + const stored = await kv.get( + KV.observations("sx"), + "obs1", + ); + expect(stored?.id).toBe("obs1"); + expect(stored?.title).toBeTruthy(); + }); + }); + + describe("summarize truncation on exhaustion", () => { + it("stops before the next chunk and flags the summary truncated", async () => { + process.env.SUMMARIZE_CHUNK_SIZE = "1"; + process.env.SUMMARIZE_CHUNK_CONCURRENCY = "1"; + initSessionBudgetMeter(kv as never, sdk as never); + await initBudget(kv as never, { sessionId: "ss", tokenCap: 10 }); + + await kv.set(KV.sessions, "ss", { + id: "ss", + project: "proj", + } as Session); + for (let i = 0; i < 4; i++) { + await kv.set(KV.observations("ss"), `o${i}`, { + id: `o${i}`, + sessionId: "ss", + timestamp: new Date().toISOString(), + type: "file_read", + title: `obs ${i}`, + facts: [], + narrative: "did a thing", + concepts: ["c"], + files: ["f.ts"], + importance: 5, + } as CompressedObservation); + } + + const inner = innerProvider({ + summarize: vi + .fn() + .mockResolvedValue( + "ChunkThis chunk summarizes a meaningful slice of the session work.", + ), + }); + const provider = new ResilientProvider(inner, "model-x"); + registerSummarizeFunction(sdk as never, kv as never, provider); + + const result = (await sdk.trigger({ + function_id: "mem::summarize", + payload: { sessionId: "ss" }, + })) as { success: boolean; summary?: { truncated?: boolean } }; + + expect(result.success).toBe(true); + expect(result.summary?.truncated).toBe(true); + // Did not process all four chunks before aborting. + expect((inner.summarize as ReturnType).mock.calls.length).toBeLessThan(4); + const saved = await kv.get<{ truncated?: boolean }>(KV.summaries, "ss"); + expect(saved?.truncated).toBe(true); + }); + }); +});