From 7c5a2aa7a83b35dd00a5cca8bfead0841503ae04 Mon Sep 17 00:00:00 2001 From: zhangjianan Date: Mon, 18 May 2026 10:48:47 +0800 Subject: [PATCH] feat(memory+diagnostics): surface compaction events + memory stats Compaction telemetry was already collected (CompactionCounters, 10 atomics behind /v1/diagnostics/memory) but invisible to both users and operators: the agent silently called memory.compact() each iteration with no event, the Diagnostics page only listed worktrees/runs, and the SlidingWindowMemory backend (the default `JARVIS_MEMORY_MODE=window`) had no stats provider at all. This makes the existing telemetry actionable end-to-end. Backend (harness-core / harness-memory) - New memory_event task-local channel mirroring plan/progress; carries CompactionInfo { source, turns_kept, turns_dropped, summary_chars, working_context_chars, model_input_tokens_est }. Emits are silent no-ops outside the agent loop, keeping memory unit tests trivial. - AgentEvent::MemoryCompacted variant; Agent::run_stream scopes the channel around each build_request call and yields drained events before the next LLM iteration. - SummarizingMemory::compact now classifies its outcome into one of CompactionSource {NoOp, WindowDropped, CacheMemory, CacheStore, FreshLlm, PtlRoundOne, PtlRoundTwo, SummaryUnavailable} and emits one CompactionInfo per call. Priority: PTL > summary source > SummaryUnavailable > NoOp. - SlidingWindowMemory gains a minimal SlidingCounters { compactions_total, window_dropped } so the default deployment also serves /v1/diagnostics/memory instead of 503. - jarvis-cli renders an inline one-liner for non-NoOp events. Frontend (jarvis-web) - Settings -> System -> Diagnostics now has a Memory panel polling /v1/diagnostics/memory every 10s; renders backend / compactions / cache hit rate / LLM failure rate / circuit opens-skips / PTL fallback for the summarizing backend, and a trimmed view (backend + compactions + window-dropped) for the sliding backend so no misleading zeros show up. - New memorySlice (per-conversation ring buffer, cap 20) + memoryFrames handler. `reset` clears markers across conversations to mirror planFrames. - New CompactionMarker component renders an inline dashed-border card in the chat transcript at the right message boundary; NoOp emits are filtered out (counters still record them). SummaryUnavailable and PTL fallbacks render with a warn tone (amber). - Full zh-CN + en i18n: diagnosticsMemoryHeading, MemoryUnavailable, and 7 compactionSourceXxx keys. Quality gates - cargo clippy --workspace --all-targets --exclude jarvis-desktop -D warnings: clean - cargo test --workspace --exclude jarvis-desktop: passed - harness-memory: 47/47 (5 new event tests + 1 sliding counter test) - harness-core: agent.rs run_stream_emits_memory_compacted + 4 memory_event::tests - pnpm typecheck: clean; vitest 346/346 (9 new across memorySlice / memoryFrames) - Browser preview: zh-CN labels resolve; populated panel renders 6 rows for summarizing (80% cache hit rate over the sample payload); sliding shape renders 2 rows; warn-tone marker renders amber dashed border + tinted pill. Co-Authored-By: Claude Opus 4.7 --- apps/jarvis-cli/src/runner.rs | 29 +++ .../src/components/Chat/CompactionMarker.tsx | 107 +++++++++ .../src/components/Chat/MessageList.tsx | 174 +++++++++----- .../Settings/sections/DiagnosticsSection.tsx | 117 ++++++++++ apps/jarvis-web/src/services/diagnostics.ts | 38 +++ apps/jarvis-web/src/services/frames/index.ts | 2 + .../src/services/frames/memoryFrames.test.ts | 93 ++++++++ .../src/services/frames/memoryFrames.ts | 45 ++++ apps/jarvis-web/src/store/appStore.ts | 3 + .../src/store/slices/memorySlice.test.ts | 105 +++++++++ .../src/store/slices/memorySlice.ts | 81 +++++++ apps/jarvis-web/src/styles.css | 92 ++++++++ apps/jarvis-web/src/utils/i18n.ts | 20 ++ apps/jarvis/src/serve.rs | 9 +- crates/harness-core/src/agent.rs | 79 ++++++- crates/harness-core/src/lib.rs | 5 + crates/harness-core/src/memory_event.rs | 221 ++++++++++++++++++ crates/harness-memory/src/sliding.rs | 191 +++++++++++++-- crates/harness-memory/src/summarizing.rs | 188 +++++++++++++-- 19 files changed, 1498 insertions(+), 101 deletions(-) create mode 100644 apps/jarvis-web/src/components/Chat/CompactionMarker.tsx create mode 100644 apps/jarvis-web/src/services/frames/memoryFrames.test.ts create mode 100644 apps/jarvis-web/src/services/frames/memoryFrames.ts create mode 100644 apps/jarvis-web/src/store/slices/memorySlice.test.ts create mode 100644 apps/jarvis-web/src/store/slices/memorySlice.ts create mode 100644 crates/harness-core/src/memory_event.rs diff --git a/apps/jarvis-cli/src/runner.rs b/apps/jarvis-cli/src/runner.rs index 7e38e7a..e83d5ca 100644 --- a/apps/jarvis-cli/src/runner.rs +++ b/apps/jarvis-cli/src/runner.rs @@ -579,6 +579,35 @@ async fn run_one_turn( println!(" {marker} {}", item.title); } } + AgentEvent::MemoryCompacted { info } => { + // NoOp emits every iteration and would drown + // out the rest of the stream; the diagnostics + // panel still picks it up via the counter + // snapshot, so we suppress it in the inline + // CLI feed and only render when work happened. + if !matches!(info.source, harness_core::CompactionSource::NoOp) { + if delta_open { println!(); delta_open = false; } + let label = match info.source { + harness_core::CompactionSource::WindowDropped => "window pruned", + harness_core::CompactionSource::CacheMemory => "summary cache hit", + harness_core::CompactionSource::CacheStore => "summary cache hit (store)", + harness_core::CompactionSource::FreshLlm => "fresh summary", + harness_core::CompactionSource::PtlRoundOne => "fallback prune (round 1)", + harness_core::CompactionSource::PtlRoundTwo => "fallback prune (round 2)", + harness_core::CompactionSource::SummaryUnavailable => "summary unavailable", + harness_core::CompactionSource::NoOp => "no-op", + }; + println!( + "{} {} {}", + dim("⊟"), + dim(&format!("compacted: {label}")), + dim(&format!( + "({} turn(s) dropped, ~{} input tokens)", + info.turns_dropped, info.model_input_tokens_est + )), + ); + } + } AgentEvent::Usage { .. } => { // Surfaced via /policy / future usage badge; // skip the noise inline. diff --git a/apps/jarvis-web/src/components/Chat/CompactionMarker.tsx b/apps/jarvis-web/src/components/Chat/CompactionMarker.tsx new file mode 100644 index 0000000..8886293 --- /dev/null +++ b/apps/jarvis-web/src/components/Chat/CompactionMarker.tsx @@ -0,0 +1,107 @@ +// Inline marker rendered in the chat transcript when the memory +// backend compacted earlier turns out of the next request. +// +// Mirrors the visual weight of `` so it doesn't +// look like a real assistant turn — the user's eye should skim +// over it unless they want to investigate. The disclosure shows a +// few quick metrics (chars, estimated tokens) that map back to the +// 10-counter view in Settings → Diagnostics → Memory. +// +// Source priority: +// - PtlRound{One,Two}: a budget escape hatch fired +// - Cache{Memory,Store}: cheap path — we paid for this once +// - FreshLlm: real summary call this iteration +// - SummaryUnavailable: circuit open / LLM error — degraded +// - WindowDropped: SlidingWindowMemory dropped without LLM +// - NoOp: filtered out by MessageList; never reaches this component + +import { useState } from "react"; +import type { CompactionMarker as CompactionMarkerData } from "../../store/slices/memorySlice"; +import { t } from "../../utils/i18n"; + +interface Props { + marker: CompactionMarkerData; +} + +function sourceLabel(source: string): string { + switch (source) { + case "cache_memory": + return t("compactionSourceCacheMemory") || "cache hit"; + case "cache_store": + return t("compactionSourceCacheStore") || "cache hit (store)"; + case "fresh_llm": + return t("compactionSourceFreshLlm") || "fresh summary"; + case "ptl_round_one": + return t("compactionSourcePtlRoundOne") || "fallback prune"; + case "ptl_round_two": + return t("compactionSourcePtlRoundTwo") || "fallback prune (round 2)"; + case "window_dropped": + return t("compactionSourceWindowDropped") || "window pruned"; + case "summary_unavailable": + return t("compactionSourceSummaryUnavailable") || "summary unavailable"; + default: + return source; + } +} + +function sourceTone(source: string): "info" | "warn" { + return source === "summary_unavailable" || + source === "ptl_round_one" || + source === "ptl_round_two" + ? "warn" + : "info"; +} + +export function CompactionMarker({ marker }: Props) { + const [open, setOpen] = useState(false); + const headline = + marker.turnsDropped > 0 + ? `${marker.turnsDropped} ${marker.turnsDropped === 1 ? "turn" : "turns"} summarized` + : `compaction · ${sourceLabel(marker.source)}`; + const tone = sourceTone(marker.source); + return ( +
+ + {open ? ( +
+
+
turns kept
+
{marker.turnsKept}
+
+
+
turns dropped
+
{marker.turnsDropped}
+
+ {typeof marker.summaryChars === "number" ? ( +
+
summary chars
+
{marker.summaryChars}
+
+ ) : null} + {typeof marker.workingContextChars === "number" ? ( +
+
working ctx chars
+
{marker.workingContextChars}
+
+ ) : null} +
+
est input tokens
+
~{marker.modelInputTokensEst}
+
+
+ ) : null} +
+ ); +} diff --git a/apps/jarvis-web/src/components/Chat/MessageList.tsx b/apps/jarvis-web/src/components/Chat/MessageList.tsx index 6d0e0aa..dfdb043 100644 --- a/apps/jarvis-web/src/components/Chat/MessageList.tsx +++ b/apps/jarvis-web/src/components/Chat/MessageList.tsx @@ -13,7 +13,7 @@ // `MIN_GROUP_SIZE` — small enough to be useful, large enough that // brief lookups don't get hidden behind an extra click. -import { useMemo } from "react"; +import { Fragment, useMemo, type ReactNode } from "react"; import { useAppStore } from "../../store/appStore"; import { useStickToBottom } from "../../hooks/useStickToBottom"; import { UserBubble } from "./UserBubble"; @@ -22,10 +22,12 @@ import { AgentLoadingFooter } from "./AgentLoadingFooter"; import { WelcomeScreen } from "./WelcomeScreen"; import { EmptyConvoHint } from "./EmptyConvoHint"; import { CollapsedToolGroup } from "./CollapsedToolGroup"; +import { CompactionMarker } from "./CompactionMarker"; import { MarkdownView } from "./MarkdownView"; import { isReadOnlyTool } from "./toolStepSummary"; import { t } from "../../utils/i18n"; import type { UiMessage, ToolBlockEntry } from "../../store/types"; +import type { CompactionMarker as CompactionMarkerData } from "../../store/slices/memorySlice"; const MIN_GROUP_SIZE = 3; @@ -98,6 +100,7 @@ export function MessageList() { const toolBlocks = useAppStore((s) => s.toolBlocks); const activeId = useAppStore((s) => s.activeId); const emptyHint = useAppStore((s) => s.emptyHintIdShort); + const markerMap = useAppStore((s) => s.compactionMarkers); const { ref } = useStickToBottom({ activeId }); const groups = useMemo( @@ -105,73 +108,130 @@ export function MessageList() { [messages, toolBlocks], ); + // Markers indexed by the message count at receive time. We render + // a marker *before* the group whose head sits at or past + // `marker.turnIndex` so a "compaction happened, then the LLM + // responded" pair reads top-to-bottom as: marker → assistant + // response. + const markers: CompactionMarkerData[] = useMemo(() => { + const key = activeId ?? "__active__"; + const list = markerMap[key] ?? []; + // NoOp emits every iteration; filter them out of the chat + // surface — the diagnostics panel still picks them up via + // /v1/diagnostics/memory counters. + return list.filter((m) => m.source !== "no_op"); + }, [activeId, markerMap]); + + // Group offsets: index into `messages` where each group starts. + // Lets us interleave compaction markers at the right boundary. + const groupOffsets = useMemo(() => { + const offs: number[] = []; + let acc = 0; + for (const g of groups) { + offs.push(acc); + acc += g.kind === "folded" ? g.messages.length : 1; + } + return offs; + }, [groups]); + return (
{messages.length === 0 && !emptyHint && } {messages.length === 0 && emptyHint && } {groups.map((g, gi) => { + const offset = groupOffsets[gi]; + const nextOffset = + gi + 1 < groupOffsets.length ? groupOffsets[gi + 1] : messages.length; + // Markers whose `turnIndex` lands at-or-before the next + // group's start are rendered before this group, so the + // compaction appears in the transcript where it actually + // happened. `gi === 0` also picks up the leading "before + // anything else" bucket. + const precedingMarkers = markers.filter( + (m) => + (gi === 0 ? m.turnIndex <= offset : m.turnIndex > offset && m.turnIndex <= nextOffset), + ); + const marker = (m: CompactionMarkerData) => ( + + ); + let body: ReactNode; if (g.kind === "folded") { const head = g.messages[0]; - return ; - } - const m = g.message; - if (m.kind === "user") { - return ( - - ); - } - if (m.kind === "assistant") { - // Coalesce consecutive assistant messages from the same - // user turn into a single visual bubble. The agent loop - // can fire multiple `assistant_message` events per turn - // (one per iteration: think → tool calls → reflect → - // tool calls → final reply); we keep them as separate - // UiMessages in the data model for clean per-iteration - // tool-call attribution but render them stacked under one - // avatar + name header so the user doesn't see "Jarvis, - // Jarvis, Jarvis" repeating down the page. - // - // Continuation here is computed against the prior *group*, - // not the prior raw message: a folded read-only run - // immediately followed by a final reply still wants the - // reply to read as a continuation of the same Jarvis turn. - const prev = groups[gi - 1]; - const continuation = - prev != null && - (prev.kind === "folded" || - (prev.kind === "single" && prev.message.kind === "assistant")); - return ( - - ); - } - if (m.kind === "system") { - return ( -
-
?
-
-
{t("system")}
-
- + body = ; + } else { + const m = g.message; + if (m.kind === "user") { + body = ( + + ); + } else if (m.kind === "assistant") { + // Coalesce consecutive assistant messages from the same + // user turn into a single visual bubble. The agent loop + // can fire multiple `assistant_message` events per turn + // (one per iteration: think → tool calls → reflect → + // tool calls → final reply); we keep them as separate + // UiMessages in the data model for clean per-iteration + // tool-call attribution but render them stacked under one + // avatar + name header so the user doesn't see "Jarvis, + // Jarvis, Jarvis" repeating down the page. + // + // Continuation here is computed against the prior *group*, + // not the prior raw message: a folded read-only run + // immediately followed by a final reply still wants the + // reply to read as a continuation of the same Jarvis turn. + const prev = groups[gi - 1]; + const continuation = + prev != null && + (prev.kind === "folded" || + (prev.kind === "single" && prev.message.kind === "assistant")); + body = ( + + ); + } else if (m.kind === "system") { + body = ( +
+
?
+
+
{t("system")}
+
+ +
-
- ); + ); + } else { + body = null; + } } - return null; + return ( + + {precedingMarkers.map(marker)} + {body} + + ); })} + {/* Trailing markers — emits that arrived after every existing + * message. Common case: a compaction-on-empty conversation + * (no messages yet) or events queued mid-stream just before + * the next `assistant_message` lands. */} + {markers + .filter((m) => m.turnIndex >= messages.length) + .map((m) => ( + + ))} {/* Pinned to the bottom of the scroller. Self-hides when no * turn is in flight — covers the silent gaps between LLM * iterations and during long tool execution that the diff --git a/apps/jarvis-web/src/components/Settings/sections/DiagnosticsSection.tsx b/apps/jarvis-web/src/components/Settings/sections/DiagnosticsSection.tsx index f064fef..c90b739 100644 --- a/apps/jarvis-web/src/components/Settings/sections/DiagnosticsSection.tsx +++ b/apps/jarvis-web/src/components/Settings/sections/DiagnosticsSection.tsx @@ -12,10 +12,12 @@ import { t } from "../../../utils/i18n"; import type { RequirementRun } from "../../../types/frames"; import { cleanupOrphanWorktrees, + getMemoryStats, listFailedRuns, listOrphanWorktrees, listStuckRuns, type CleanupReport, + type MemoryStats, type OrphanWorktree, type StuckRun, } from "../../../services/diagnostics"; @@ -54,6 +56,8 @@ export function DiagnosticsSection({ embedded }: { embedded?: boolean } = {}) { const [orphans, setOrphans] = useState(null); const [stuck, setStuck] = useState(null); const [failed, setFailed] = useState(null); + const [memory, setMemory] = useState(null); + const [memoryUnavailable, setMemoryUnavailable] = useState(false); const [unavailable, setUnavailable] = useState(false); const [busy, setBusy] = useState(false); const [report, setReport] = useState(null); @@ -78,6 +82,14 @@ export function DiagnosticsSection({ embedded }: { embedded?: boolean } = {}) { setStuck(stuckRows); const failedRows = await listFailedRuns(); setFailed(failedRows); + const mem = await getMemoryStats(); + if (mem === null) { + setMemoryUnavailable(true); + setMemory(null); + } else { + setMemoryUnavailable(false); + setMemory(mem); + } } catch (e) { setError(String(e)); } finally { @@ -87,6 +99,14 @@ export function DiagnosticsSection({ embedded }: { embedded?: boolean } = {}) { useEffect(() => { void refresh(); + // Memory counters move continuously while the agent runs — + // poll every 10s so operators see the rates update without + // mashing the refresh button. The other diagnostics blocks + // are cheap enough to recompute on the same tick. + const id = window.setInterval(() => { + void refresh(); + }, 10_000); + return () => window.clearInterval(id); }, []); const cleanup = async () => { @@ -212,6 +232,103 @@ export function DiagnosticsSection({ embedded }: { embedded?: boolean } = {}) { ))} )} + +

+ {tx("diagnosticsMemoryHeading", "Memory")} +

+
); } + +function MemoryPanel({ + stats, + unavailable, +}: { + stats: MemoryStats | null; + unavailable: boolean; +}) { + if (unavailable || !stats) { + return ( +

+ {tx( + "diagnosticsMemoryUnavailable", + "Memory stats provider not configured (set JARVIS_MEMORY_TOKENS).", + )} +

+ ); + } + // Both backends fill backend + compactions_total. Everything + // else is summarizing-only; skip the row when undefined so the + // sliding deployment doesn't render misleading "0 / 0". + const num = (v: number | undefined) => (typeof v === "number" ? v : 0); + const hitsMem = stats.cache_hits_memory; + const hitsStore = stats.cache_hits_store; + const needSummary = stats.summary_required; + const hasSummarizingFields = + hitsMem !== undefined || + hitsStore !== undefined || + needSummary !== undefined || + stats.llm_calls !== undefined; + const hitsCombined = + hitsMem !== undefined || hitsStore !== undefined + ? num(hitsMem) + num(hitsStore) + : null; + const hitRate = + hitsCombined !== null && needSummary !== undefined && needSummary > 0 + ? (hitsCombined / needSummary) * 100 + : null; + const failureRate = + stats.llm_calls !== undefined && stats.llm_calls > 0 + ? (num(stats.llm_failures) / stats.llm_calls) * 100 + : null; + const fmtPct = (v: number | null) => + v === null ? "—" : `${v.toFixed(0)}%`; + return ( +
    +
  • + backend + {stats.backend} +
  • +
  • + compactions + + {num(stats.compactions_total)} total + {hasSummarizingFields + ? ` · ${num(needSummary)} needed summary` + : stats.window_dropped !== undefined + ? ` · ${stats.window_dropped} window-dropped` + : null} + +
  • + {hasSummarizingFields ? ( + <> +
  • + cache hit rate + + {fmtPct(hitRate)} ({num(hitsMem)} mem + {num(hitsStore)} store) + +
  • +
  • + llm + + {num(stats.llm_calls)} calls · {fmtPct(failureRate)} failure rate + +
  • +
  • + circuit + + {num(stats.circuit_opens)} opens / {num(stats.circuit_skips)} skips + +
  • +
  • + PTL fallback + + round1 {num(stats.ptl_round_one)} / round2 {num(stats.ptl_round_two)} + +
  • + + ) : null} +
+ ); +} diff --git a/apps/jarvis-web/src/services/diagnostics.ts b/apps/jarvis-web/src/services/diagnostics.ts index 7969220..2050ac8 100644 --- a/apps/jarvis-web/src/services/diagnostics.ts +++ b/apps/jarvis-web/src/services/diagnostics.ts @@ -98,6 +98,44 @@ export async function listRecentRuns(limit = 50): Promise { + const r = await fetch(apiUrl("/v1/diagnostics/memory")); + if (r.status === 503) return null; + if (!r.ok) throw new Error(`memory stats: ${r.status}`); + return (await r.json()) as MemoryStats; +} + /// Fetch a single run by id. Used by the auto-mode dashboard's /// run-detail drawer. Returns `null` on 404 / 503 / network error /// so the drawer can render a friendly empty state. diff --git a/apps/jarvis-web/src/services/frames/index.ts b/apps/jarvis-web/src/services/frames/index.ts index fe0f490..8442279 100644 --- a/apps/jarvis-web/src/services/frames/index.ts +++ b/apps/jarvis-web/src/services/frames/index.ts @@ -15,6 +15,7 @@ import { hitlFrameHandlers } from "./hitlFrames"; import { lifecycleFrameHandlers } from "./lifecycleFrames"; import { domainFrameHandlers } from "./domainFrames"; import { fallbackFrameHandlers } from "./fallbackFrames"; +import { memoryFrameHandlers } from "./memoryFrames"; export const frameHandlers: Map void> = new Map( Object.entries({ @@ -26,6 +27,7 @@ export const frameHandlers: Map void> = new Map( ...hitlFrameHandlers, ...lifecycleFrameHandlers, ...fallbackFrameHandlers, + ...memoryFrameHandlers, ...domainFrameHandlers, }), ); diff --git a/apps/jarvis-web/src/services/frames/memoryFrames.test.ts b/apps/jarvis-web/src/services/frames/memoryFrames.test.ts new file mode 100644 index 0000000..af54f63 --- /dev/null +++ b/apps/jarvis-web/src/services/frames/memoryFrames.test.ts @@ -0,0 +1,93 @@ +// Frame handler dispatches MemoryCompacted events into the active +// conversation's slot. Tests the wire-shape → store-action path. + +import { describe, expect, it, beforeEach } from "vitest"; +import { useAppStore } from "../../store/appStore"; +import { memoryFrameHandlers } from "./memoryFrames"; + +beforeEach(() => { + useAppStore.getState().clearCompactionMarkers(null); +}); + +describe("memory_compacted handler", () => { + it("posts the CompactionInfo under the active conversation id", () => { + useAppStore.setState({ activeId: "convo-1", messages: [] }); + memoryFrameHandlers["memory_compacted"]({ + type: "memory_compacted", + info: { + source: "fresh_llm", + turns_kept: 2, + turns_dropped: 4, + summary_chars: 180, + working_context_chars: 90, + model_input_tokens_est: 1234, + }, + }); + const markers = useAppStore.getState().compactionMarkers["convo-1"]; + expect(markers).toHaveLength(1); + expect(markers[0]).toMatchObject({ + source: "fresh_llm", + turnsKept: 2, + turnsDropped: 4, + summaryChars: 180, + workingContextChars: 90, + modelInputTokensEst: 1234, + }); + }); + + it("uses the __active__ fallback key when no conversation is active", () => { + useAppStore.setState({ activeId: null, messages: [] }); + memoryFrameHandlers["memory_compacted"]({ + type: "memory_compacted", + info: { source: "cache_memory", turns_kept: 0, turns_dropped: 0, model_input_tokens_est: 0 }, + }); + expect(useAppStore.getState().compactionMarkers["__active__"]).toHaveLength(1); + }); + + it("defaults missing numeric fields to 0 and source to no_op", () => { + useAppStore.setState({ activeId: "c", messages: [] }); + memoryFrameHandlers["memory_compacted"]({ type: "memory_compacted", info: {} }); + const m = useAppStore.getState().compactionMarkers["c"][0]; + expect(m.source).toBe("no_op"); + expect(m.turnsKept).toBe(0); + expect(m.turnsDropped).toBe(0); + expect(m.modelInputTokensEst).toBe(0); + expect(m.summaryChars).toBeUndefined(); + }); + + it("reset clears every conversation's markers", () => { + const s = useAppStore.getState(); + s.pushCompactionMarker("c1", { + turnIndex: 0, + source: "fresh_llm", + turnsKept: 0, + turnsDropped: 1, + modelInputTokensEst: 1, + }); + s.pushCompactionMarker("c2", { + turnIndex: 0, + source: "cache_memory", + turnsKept: 0, + turnsDropped: 1, + modelInputTokensEst: 1, + }); + memoryFrameHandlers["reset"]({ type: "reset" }); + expect(useAppStore.getState().compactionMarkers).toEqual({}); + }); + + it("records turnIndex from the current message count", () => { + useAppStore.setState({ + activeId: "c", + messages: [ + { uid: "u1", kind: "user", content: "hi", userOrdinal: 1 }, + { uid: "a1", kind: "assistant", content: "hello", reasoning: "", toolCallIds: [], finalised: true }, + ] as never, + }); + memoryFrameHandlers["memory_compacted"]({ + type: "memory_compacted", + info: { source: "fresh_llm", turns_kept: 1, turns_dropped: 1, model_input_tokens_est: 5 }, + }); + const m = useAppStore.getState().compactionMarkers["c"][0]; + expect(m.turnIndex).toBe(2); + }); +}); diff --git a/apps/jarvis-web/src/services/frames/memoryFrames.ts b/apps/jarvis-web/src/services/frames/memoryFrames.ts new file mode 100644 index 0000000..2ead0d6 --- /dev/null +++ b/apps/jarvis-web/src/services/frames/memoryFrames.ts @@ -0,0 +1,45 @@ +// Memory compaction frames. The agent emits one +// `AgentEvent::MemoryCompacted` per iteration carrying a +// `CompactionInfo` describing what the memory backend did: +// cache hit / fresh summary / fallback prune / no-op. +// +// We post the marker into the active conversation's slot so +// `` can render an inline transcript card. NoOp +// emits are recorded too — the chat renderer filters them +// out, but a future diagnostics view may chart them. + +import { appStore } from "../../store/appStore"; + +const ACTIVE_KEY = "__active__"; + +export const memoryFrameHandlers: Record void> = { + memory_compacted: (ev) => { + const info = ev?.info ?? {}; + const state = appStore.getState(); + const conversationId = state.activeId ?? ACTIVE_KEY; + const messages = state.messages ?? []; + state.pushCompactionMarker(conversationId, { + turnIndex: messages.length, + source: typeof info.source === "string" ? info.source : "no_op", + turnsKept: typeof info.turns_kept === "number" ? info.turns_kept : 0, + turnsDropped: typeof info.turns_dropped === "number" ? info.turns_dropped : 0, + summaryChars: + typeof info.summary_chars === "number" ? info.summary_chars : undefined, + workingContextChars: + typeof info.working_context_chars === "number" + ? info.working_context_chars + : undefined, + modelInputTokensEst: + typeof info.model_input_tokens_est === "number" + ? info.model_input_tokens_est + : 0, + }); + }, + // `reset` is broadcast by the lifecycle layer when the user + // clears the current conversation or switches to a fresh one. + // Mirror what planFrames does — purge the markers so a new + // session doesn't inherit the previous one's compaction trail. + reset: () => { + appStore.getState().clearCompactionMarkers(null); + }, +}; diff --git a/apps/jarvis-web/src/store/appStore.ts b/apps/jarvis-web/src/store/appStore.ts index def7f7c..44d77c9 100644 --- a/apps/jarvis-web/src/store/appStore.ts +++ b/apps/jarvis-web/src/store/appStore.ts @@ -24,6 +24,7 @@ import { createCoreSlice, type CoreSlice } from "./slices/coreSlice"; import { createFallbackSlice, type FallbackSlice } from "./slices/fallbackSlice"; import { createHitlSlice, type HitlSlice } from "./slices/hitlSlice"; import { createLifecycleSlice, type LifecycleSlice } from "./slices/lifecycleSlice"; +import { createMemorySlice, type MemorySlice } from "./slices/memorySlice"; import { createPlanSlice, type PlanSlice } from "./slices/planSlice"; import { createSubAgentSlice, type SubAgentSlice } from "./slices/subAgentSlice"; import { createToolSlice, type ToolSlice } from "./slices/toolSlice"; @@ -40,6 +41,7 @@ export type FullState = ChatSlice & SubAgentSlice & LifecycleSlice & FallbackSlice + & MemorySlice & CoreSlice; export const useAppStore = create()((...a) => ({ @@ -51,6 +53,7 @@ export const useAppStore = create()((...a) => ({ ...createSubAgentSlice(...a), ...createLifecycleSlice(...a), ...createFallbackSlice(...a), + ...createMemorySlice(...a), ...createCoreSlice(...a), })); diff --git a/apps/jarvis-web/src/store/slices/memorySlice.test.ts b/apps/jarvis-web/src/store/slices/memorySlice.test.ts new file mode 100644 index 0000000..dd7ca1e --- /dev/null +++ b/apps/jarvis-web/src/store/slices/memorySlice.test.ts @@ -0,0 +1,105 @@ +// Tests for the per-conversation compaction-marker slice. Validates +// the append + trim + clear behaviour the chat surface relies on. + +import { describe, expect, it, beforeEach } from "vitest"; +import { useAppStore } from "../appStore"; + +beforeEach(() => { + useAppStore.getState().clearCompactionMarkers(null); +}); + +describe("memorySlice", () => { + it("appends a marker under the supplied conversation id", () => { + useAppStore.getState().pushCompactionMarker("c1", { + turnIndex: 4, + source: "fresh_llm", + turnsKept: 2, + turnsDropped: 3, + summaryChars: 180, + modelInputTokensEst: 1234, + }); + const markers = useAppStore.getState().compactionMarkers["c1"]; + expect(markers).toHaveLength(1); + expect(markers[0].source).toBe("fresh_llm"); + expect(markers[0].seq).toBe(1); + }); + + it("assigns monotonic seq numbers per conversation", () => { + const s = useAppStore.getState(); + s.pushCompactionMarker("c1", { + turnIndex: 0, + source: "fresh_llm", + turnsKept: 0, + turnsDropped: 1, + modelInputTokensEst: 10, + }); + s.pushCompactionMarker("c1", { + turnIndex: 1, + source: "cache_memory", + turnsKept: 1, + turnsDropped: 0, + modelInputTokensEst: 12, + }); + const markers = useAppStore.getState().compactionMarkers["c1"]; + expect(markers.map((m) => m.seq)).toEqual([1, 2]); + }); + + it("trims oldest entries past MAX_MARKERS_PER_CONVERSATION (20)", () => { + const s = useAppStore.getState(); + for (let i = 0; i < 25; i++) { + s.pushCompactionMarker("c1", { + turnIndex: i, + source: "no_op", + turnsKept: 0, + turnsDropped: 0, + modelInputTokensEst: i, + }); + } + const markers = useAppStore.getState().compactionMarkers["c1"]; + expect(markers).toHaveLength(20); + // First survivor is the 6th push (i=5) since the first 5 got trimmed. + expect(markers[0].modelInputTokensEst).toBe(5); + expect(markers[19].modelInputTokensEst).toBe(24); + }); + + it("clears markers for a single conversation", () => { + const s = useAppStore.getState(); + s.pushCompactionMarker("c1", { + turnIndex: 0, + source: "fresh_llm", + turnsKept: 0, + turnsDropped: 1, + modelInputTokensEst: 1, + }); + s.pushCompactionMarker("c2", { + turnIndex: 0, + source: "fresh_llm", + turnsKept: 0, + turnsDropped: 1, + modelInputTokensEst: 1, + }); + s.clearCompactionMarkers("c1"); + expect(useAppStore.getState().compactionMarkers["c1"]).toBeUndefined(); + expect(useAppStore.getState().compactionMarkers["c2"]).toHaveLength(1); + }); + + it("clears every conversation when passed null", () => { + const s = useAppStore.getState(); + s.pushCompactionMarker("c1", { + turnIndex: 0, + source: "fresh_llm", + turnsKept: 0, + turnsDropped: 1, + modelInputTokensEst: 1, + }); + s.pushCompactionMarker("c2", { + turnIndex: 0, + source: "fresh_llm", + turnsKept: 0, + turnsDropped: 1, + modelInputTokensEst: 1, + }); + s.clearCompactionMarkers(null); + expect(useAppStore.getState().compactionMarkers).toEqual({}); + }); +}); diff --git a/apps/jarvis-web/src/store/slices/memorySlice.ts b/apps/jarvis-web/src/store/slices/memorySlice.ts new file mode 100644 index 0000000..6763eb7 --- /dev/null +++ b/apps/jarvis-web/src/store/slices/memorySlice.ts @@ -0,0 +1,81 @@ +// Memory compaction markers. The agent emits one +// `AgentEvent::MemoryCompacted` per iteration; the chat surface +// renders an inline marker in the transcript when something +// non-trivial happened (cache hit, fresh summary, fallback prune). +// NoOp emits are still recorded so a future "diagnostics across +// time" view can chart them, but the chat renderer filters them out. + +import type { StateCreator } from "zustand"; +import type { FullState } from "../appStore"; + +export type CompactionSource = + | "no_op" + | "window_dropped" + | "cache_memory" + | "cache_store" + | "fresh_llm" + | "ptl_round_one" + | "ptl_round_two" + | "summary_unavailable" + // Forward-compat: unknown values from a newer backend. + | (string & {}); + +export interface CompactionMarker { + /// Where the compaction sat in the conversation when the event + /// arrived. We use `messages.length` at receive time so the + /// marker renders just before the *next* message. + turnIndex: number; + source: CompactionSource; + turnsKept: number; + turnsDropped: number; + summaryChars?: number; + workingContextChars?: number; + modelInputTokensEst: number; + /// Monotonically increasing within a conversation so React keys + /// stay stable even when the same turn boundary fires twice. + seq: number; +} + +const MAX_MARKERS_PER_CONVERSATION = 20; + +export interface MemorySlice { + /// Compaction markers per conversation. Active-conversation + /// fallback under the magic key `"__active__"` so the unscoped + /// frame dispatch path can still post markers when no id is + /// resolvable. + compactionMarkers: Record; + + /// Append one marker to the indicated conversation. Trims to + /// MAX_MARKERS_PER_CONVERSATION oldest-first. + pushCompactionMarker: (conversationId: string, m: Omit) => void; + + /// Clear markers for a single conversation (used on `reset`). + /// Pass `null` to wipe every conversation. + clearCompactionMarkers: (conversationId: string | null) => void; +} + +export const createMemorySlice: StateCreator = (set) => ({ + compactionMarkers: {}, + pushCompactionMarker: (conversationId, marker) => + set((s) => { + const existing = s.compactionMarkers[conversationId] ?? []; + const seq = existing.length > 0 ? existing[existing.length - 1].seq + 1 : 1; + const next = existing.concat({ ...marker, seq }); + const trimmed = + next.length > MAX_MARKERS_PER_CONVERSATION + ? next.slice(next.length - MAX_MARKERS_PER_CONVERSATION) + : next; + return { + compactionMarkers: { ...s.compactionMarkers, [conversationId]: trimmed }, + }; + }), + clearCompactionMarkers: (conversationId) => + set((s) => { + if (conversationId === null) { + return { compactionMarkers: {} }; + } + const next = { ...s.compactionMarkers }; + delete next[conversationId]; + return { compactionMarkers: next }; + }), +}); diff --git a/apps/jarvis-web/src/styles.css b/apps/jarvis-web/src/styles.css index 4974001..95fd8ac 100644 --- a/apps/jarvis-web/src/styles.css +++ b/apps/jarvis-web/src/styles.css @@ -23547,3 +23547,95 @@ select.settings-input { gap: 8px; margin-top: 4px; } + +/* ========== Compaction marker — inline transcript card emitted + * by `AgentEvent::MemoryCompacted`. Designed to read as a quiet, + * non-message-like rail so users skim past it; the disclosure + * opens detailed counters for operators investigating. ========== */ +.compaction-marker { + margin: 8px 0; + padding: 0; + background: transparent; + border: 1px dashed var(--border); + border-radius: 6px; + color: var(--text-muted); +} + +.compaction-marker-row { + display: flex; + align-items: center; + gap: 8px; + width: 100%; + padding: 4px 10px; + background: transparent; + border: 0; + color: inherit; + font: inherit; + font-size: var(--fs-12); + text-align: left; + cursor: pointer; + line-height: 1.5; +} + +.compaction-marker-row:hover { + background: var(--tool-header-hover); + color: var(--text); +} + +.compaction-marker-icon { + flex: 0 0 auto; + width: 14px; + text-align: center; + color: var(--text-muted); +} + +.compaction-marker-summary { + flex: 1 1 auto; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.compaction-marker-source { + flex: 0 0 auto; + padding: 1px 8px; + font-size: var(--fs-11); + border-radius: 999px; + border: 1px solid var(--border); + background: var(--panel); + color: var(--text-soft); +} + +.compaction-marker[data-tone="warn"] { + border-color: rgba(210, 150, 60, 0.5); +} + +.compaction-marker[data-tone="warn"] .compaction-marker-source { + color: rgba(210, 150, 60, 1); + border-color: rgba(210, 150, 60, 0.5); + background: rgba(210, 150, 60, 0.08); +} + +.compaction-marker-detail { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(140px, 1fr)); + gap: 4px 18px; + padding: 0 10px 8px; + margin: 0; + font-size: var(--fs-11); +} + +.compaction-marker-detail > div { + display: flex; + flex-direction: column; +} + +.compaction-marker-detail dt { + color: var(--text-muted); +} + +.compaction-marker-detail dd { + margin: 0; + color: var(--text-soft); + font-variant-numeric: tabular-nums; +} diff --git a/apps/jarvis-web/src/utils/i18n.ts b/apps/jarvis-web/src/utils/i18n.ts index 837dc23..7377cbc 100644 --- a/apps/jarvis-web/src/utils/i18n.ts +++ b/apps/jarvis-web/src/utils/i18n.ts @@ -1591,6 +1591,16 @@ Expected workflow: diagnosticsNoStuck: "No stuck runs.", diagnosticsNoFailed: "No recent failures.", diagnosticsRunStoreUnavailable: "Run store not configured.", + diagnosticsMemoryHeading: "Memory", + diagnosticsMemoryUnavailable: + "Memory stats provider not configured (set JARVIS_MEMORY_TOKENS).", + compactionSourceCacheMemory: "cache hit", + compactionSourceCacheStore: "cache hit (store)", + compactionSourceFreshLlm: "fresh summary", + compactionSourcePtlRoundOne: "fallback prune", + compactionSourcePtlRoundTwo: "fallback prune (round 2)", + compactionSourceWindowDropped: "window pruned", + compactionSourceSummaryUnavailable: "summary unavailable", detailAssigneeLabel: "Handler", detailAssigneeUnassigned: "Jarvis default", verifyRunLabel: "Verification commands (one per line)", @@ -3905,6 +3915,16 @@ ${blockedRows} diagnosticsNoStuck: "没有卡住的 run。", diagnosticsNoFailed: "没有最近失败。", diagnosticsRunStoreUnavailable: "未配置 run 存储。", + diagnosticsMemoryHeading: "记忆", + diagnosticsMemoryUnavailable: + "未启用记忆指标(设置 JARVIS_MEMORY_TOKENS)。", + compactionSourceCacheMemory: "缓存命中", + compactionSourceCacheStore: "缓存命中(持久层)", + compactionSourceFreshLlm: "新建摘要", + compactionSourcePtlRoundOne: "预算回退", + compactionSourcePtlRoundTwo: "预算回退(第 2 轮)", + compactionSourceWindowDropped: "窗口裁剪", + compactionSourceSummaryUnavailable: "摘要不可用", detailAssigneeLabel: "处理方", detailAssigneeUnassigned: "Jarvis 默认", verifyRunLabel: "验证命令(每行一条)", diff --git a/apps/jarvis/src/serve.rs b/apps/jarvis/src/serve.rs index 7a594e8..322de47 100644 --- a/apps/jarvis/src/serve.rs +++ b/apps/jarvis/src/serve.rs @@ -2125,7 +2125,7 @@ fn build_memory( // reflects what the model actually counts. Cheap to ask once per // memory backend; the estimator is `Arc`-shared internally. let estimator = llm.estimator(); - let mut stats: Option> = None; + let stats: Option>; let mem: Arc = match mode.as_str() { "summary" => { let summary_model = pick_string_opt("JARVIS_MEMORY_MODEL", cfg.memory.model.as_deref()) @@ -2160,7 +2160,12 @@ fn build_memory( } "window" => { info!(memory_tokens = budget, "sliding-window memory enabled"); - Arc::new(SlidingWindowMemory::new(budget).with_estimator(estimator)) + let sw = SlidingWindowMemory::new(budget).with_estimator(estimator); + // Expose the lightweight (compactions_total / window_dropped) + // counters so `/v1/diagnostics/memory` returns something other + // than 503 on the default `mode=window` deployment. + stats = Some(sw.counters() as Arc); + Arc::new(sw) } other => { anyhow::bail!("memory.mode=`{other}` is not recognised; use `window` or `summary`"); diff --git a/crates/harness-core/src/agent.rs b/crates/harness-core/src/agent.rs index b8ccd58..72aa8dc 100644 --- a/crates/harness-core/src/agent.rs +++ b/crates/harness-core/src/agent.rs @@ -269,6 +269,18 @@ pub enum AgentEvent { /// [`crate::plan::emit`]. UIs typically render this as a /// checklist that updates in place. PlanUpdate { items: Vec }, + /// The memory backend ran a compaction round before the next + /// LLM iteration. Carries the [`CompactionInfo`] describing + /// what happened (cache hit, fresh summary, fallback prune, + /// no-op) and a few quick metrics (`turns_dropped`, + /// `summary_chars`, `model_input_tokens_est`). UIs typically + /// render an inline marker in the chat transcript when + /// `source != NoOp`. Emitted by memory backends via + /// [`crate::memory_event::emit`] and forwarded by the agent + /// loop after each `build_request` call. + MemoryCompacted { + info: crate::memory_event::CompactionInfo, + }, /// One frame from a running subagent. Emitted while a /// `subagent.` tool is executing — the subagent itself /// publishes via [`crate::subagent::emit`] and the agent loop @@ -581,7 +593,22 @@ impl Agent { ); for iter in 1..=agent.config.max_iterations { - let req = match agent.build_request(&conversation).await { + // Scope a per-iteration compaction-info channel around + // `build_request`. Memory backends emit one record per + // `compact()` call via `crate::memory_event::emit`; + // drained immediately after build_request returns so + // events land in the stream before the next LLM call. + let (mem_tx, mut mem_rx) = + tokio::sync::mpsc::unbounded_channel::(); + let build_result = crate::memory_event::with_compaction_channel( + mem_tx, + agent.build_request(&conversation), + ) + .await; + while let Ok(info) = mem_rx.try_recv() { + yield AgentEvent::MemoryCompacted { info }; + } + let req = match build_result { Ok(r) => r, Err(e) => { yield AgentEvent::Error { message: e.to_string() }; @@ -1994,6 +2021,56 @@ mod tests { assert_eq!(req.parallel_tool_calls, None); } + /// Stub Memory that emits a `CompactionInfo` per `compact()` + /// call. Used to confirm the agent's per-iteration channel scope + /// + drain delivers the event downstream. + struct EmittingMemory { + source: crate::memory_event::CompactionSource, + } + + #[async_trait::async_trait] + impl crate::Memory for EmittingMemory { + async fn compact( + &self, + messages: &[Message], + ) -> std::result::Result, BoxError> { + crate::memory_event::emit(crate::memory_event::CompactionInfo { + source: self.source, + turns_kept: messages.len(), + turns_dropped: 0, + summary_chars: Some(42), + working_context_chars: None, + model_input_tokens_est: 100, + }); + Ok(messages.to_vec()) + } + } + + #[tokio::test] + async fn run_stream_emits_memory_compacted_when_compactor_fires() { + use crate::memory_event::CompactionSource; + use futures::StreamExt; + + let cfg = AgentConfig::new("test-model") + .with_memory(Arc::new(EmittingMemory { + source: CompactionSource::CacheMemory, + }) as Arc); + let agent = Arc::new(Agent::new(ScriptedLlm::new("noop") as _, cfg)); + let mut stream = agent.run_stream(Conversation::new()); + let mut got: Vec = vec![]; + while let Some(ev) = stream.next().await { + match ev { + AgentEvent::MemoryCompacted { info } => got.push(info.source), + AgentEvent::Done { .. } => break, + _ => {} + } + } + assert!( + got.iter().any(|s| matches!(s, CompactionSource::CacheMemory)), + "expected at least one CacheMemory event, got: {got:?}", + ); + } + #[test] fn ensure_system_prompt_inserts_when_missing() { let mut conv = Conversation::new(); diff --git a/crates/harness-core/src/lib.rs b/crates/harness-core/src/lib.rs index f97b886..7cac140 100644 --- a/crates/harness-core/src/lib.rs +++ b/crates/harness-core/src/lib.rs @@ -14,6 +14,7 @@ pub mod fallback_event; pub mod hitl; pub mod llm; pub mod memory; +pub mod memory_event; pub mod message; pub mod mode_signal; pub mod permission; @@ -57,6 +58,10 @@ pub use memory::{ cache_breakpoint_indices, default_estimator, estimate_tokens, estimate_total_tokens, CharRatioEstimator, JsonAwareEstimator, Memory, MemoryStatsProvider, TokenEstimator, }; +pub use memory_event::{ + emit as emit_memory_compaction, is_active as memory_compaction_active, + with_compaction_channel, CompactionInfo, CompactionSource, +}; pub use message::{CacheHint, Message, ToolCall}; pub use mode_signal::{ emit as emit_mode_signal, is_active as mode_signal_active, with_mode_signal, diff --git a/crates/harness-core/src/memory_event.rs b/crates/harness-core/src/memory_event.rs new file mode 100644 index 0000000..8f3fbdf --- /dev/null +++ b/crates/harness-core/src/memory_event.rs @@ -0,0 +1,221 @@ +//! Memory compaction signal — typed "what just happened during +//! compaction" stream. +//! +//! Mirrors the [`crate::plan`] / [`crate::progress`] task-local +//! channel pattern. The agent loop calls +//! [`crate::memory::Memory::compact`] silently before every LLM +//! iteration; without this channel the only way a transport learns +//! that turns were summarised or cache-hit is to compare message +//! counts on either side and guess. Wired up properly, the +//! summariser publishes one [`CompactionInfo`] per `compact()` call +//! describing what happened and why. +//! +//! Wire model: +//! +//! - The agent loop installs an +//! [`mpsc::UnboundedSender`] in a `tokio::task_local` +//! for the duration of each `build_request` call, scoped via +//! [`with_compaction_channel`]. +//! - Memory backends call [`emit`] from inside `compact()` at the +//! point where they decide what to send (cache hit / fresh +//! summary / fallback prune / no-op). +//! - The agent drains the receiver after `build_request` returns +//! and forwards each info as `AgentEvent::MemoryCompacted` on both +//! the streaming and blocking paths. +//! +//! Outside an agent invocation (or before the channel is installed) +//! emits are silent no-ops, which keeps `harness-memory`'s unit +//! tests trivial. + +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; + +/// Why the compactor picked the slice it returned. +/// +/// Each variant is rendered as its own front-end label, so adding a +/// new one is a wire-compat extension — older clients that don't +/// recognise a value should treat it as "compaction happened, no +/// specific source". +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CompactionSource { + /// The full input slice fits within the budget; nothing was + /// dropped or summarised. Emitted so the front end can show + /// the diagnostic panel a steady stream of "compactor was + /// consulted" pings even when no work was needed. + NoOp, + /// `SlidingWindowMemory` dropped older turns to fit the + /// budget. No LLM call involved. + WindowDropped, + /// `SummarizingMemory` hit the in-memory single-slot cache + /// (Tier 1). + CacheMemory, + /// `SummarizingMemory` hit the persistent `ConversationStore` + /// cache (Tier 2). The summary was loaded from disk / SQL. + CacheStore, + /// `SummarizingMemory` called the LLM to produce a fresh + /// summary (Tier 3). This is the expensive path. + FreshLlm, + /// `SummarizingMemory` ran the PTL fallback round one (drop + /// the oldest ~20 % of non-critical turns). Reached when the + /// summariser's output was over-budget or the circuit was + /// open. + PtlRoundOne, + /// `SummarizingMemory` ran the PTL fallback round two (hard + /// prune to the latest turn). Reached when round one's + /// output was still over-budget. + PtlRoundTwo, + /// `SummarizingMemory` wanted to produce a summary but the + /// circuit breaker was open or the upstream LLM returned + /// `Err`. The slice still has the dropped prefix elided — + /// just with a `[N earlier turn(s) omitted — summary + /// unavailable]` placeholder in place of a real summary. + /// UIs render this as a degraded state ("summariser + /// offline, history pruned without a summary") so operators + /// know to investigate, vs treating it as a normal cache + /// hit. + SummaryUnavailable, +} + +/// Snapshot of one compaction round — what changed and why. +/// +/// All fields are best-effort: `turns_*` come from +/// `crates/harness-memory::turns::split_into_turns` so they reflect +/// the same notion of "turn" the summariser uses internally +/// (User + every following Assistant/Tool until the next User). +/// `*_chars` are character counts from the rendered string the +/// backend chose, not token counts. `model_input_tokens_est` is the +/// total estimate across the *returned* slice using whatever +/// estimator the backend was constructed with — typically +/// `CharRatioEstimator` or a provider tokeniser plugged in via +/// `LlmProvider::estimator`. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CompactionInfo { + /// Why this slice was chosen. See [`CompactionSource`]. + pub source: CompactionSource, + /// Turn count of the slice the backend returned. + pub turns_kept: usize, + /// `turns_in - turns_kept`. Zero when [`CompactionSource::NoOp`]. + pub turns_dropped: usize, + /// Length of the synthetic summary `System` message inserted + /// after the dropped prefix, in chars. `None` when no summary + /// was inserted (no-op, plain window-drop, PTL fallback). + #[serde(skip_serializing_if = "Option::is_none", default)] + pub summary_chars: Option, + /// Length of the rendered `=== working context ===` block + /// appended to the kept tail, in chars. `None` when no + /// working-context snapshot was active. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub working_context_chars: Option, + /// Estimated total tokens for the slice the backend returned. + /// Lets the UI show "we shipped ~3 200 tokens after + /// compaction" without re-running the estimator front-end side. + pub model_input_tokens_est: usize, +} + +tokio::task_local! { + /// Per-iteration compaction-info sender, scoped via + /// [`with_compaction_channel`]. Absent outside an agent loop. + static COMPACTION_TX: mpsc::UnboundedSender; +} + +/// Publish a compaction-info record. No-op when no listener is +/// installed (memory backends invoked outside the agent loop — +/// e.g. unit tests). +pub fn emit(info: CompactionInfo) { + let _ = COMPACTION_TX.try_with(|tx| { + let _ = tx.send(info); + }); +} + +/// Whether a compaction-info sender is installed for the current +/// task. Used by tests to assert the channel is wired correctly. +pub fn is_active() -> bool { + COMPACTION_TX.try_with(|_| ()).is_ok() +} + +/// Run `fut` with `tx` installed as the active sender. The agent +/// loop uses this to scope a sender around a single `build_request` +/// call so emits during compaction land in the right per-iteration +/// receiver. +pub async fn with_compaction_channel(tx: mpsc::UnboundedSender, fut: F) -> R +where + F: std::future::Future, +{ + COMPACTION_TX.scope(tx, fut).await +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_info(source: CompactionSource) -> CompactionInfo { + CompactionInfo { + source, + turns_kept: 2, + turns_dropped: 5, + summary_chars: Some(180), + working_context_chars: Some(90), + model_input_tokens_est: 1234, + } + } + + #[tokio::test] + async fn emit_inside_scope_reaches_receiver() { + let (tx, mut rx) = mpsc::unbounded_channel(); + with_compaction_channel(tx, async { + assert!(is_active()); + emit(sample_info(CompactionSource::CacheMemory)); + }) + .await; + let got = rx.try_recv().unwrap(); + assert_eq!(got.source, CompactionSource::CacheMemory); + assert_eq!(got.turns_kept, 2); + assert_eq!(got.turns_dropped, 5); + } + + #[tokio::test] + async fn emit_outside_scope_is_noop() { + assert!(!is_active()); + emit(sample_info(CompactionSource::FreshLlm)); + // The point: this didn't panic. + } + + #[test] + fn source_serialises_snake_case() { + let cases = [ + (CompactionSource::NoOp, "no_op"), + (CompactionSource::WindowDropped, "window_dropped"), + (CompactionSource::CacheMemory, "cache_memory"), + (CompactionSource::CacheStore, "cache_store"), + (CompactionSource::FreshLlm, "fresh_llm"), + (CompactionSource::PtlRoundOne, "ptl_round_one"), + (CompactionSource::PtlRoundTwo, "ptl_round_two"), + (CompactionSource::SummaryUnavailable, "summary_unavailable"), + ]; + for (source, expected) in cases { + let info = sample_info(source); + let json = serde_json::to_string(&info).unwrap(); + assert!( + json.contains(&format!("\"source\":\"{expected}\"")), + "expected `{expected}` in: {json}", + ); + } + } + + #[test] + fn info_skips_optionals_when_none() { + let info = CompactionInfo { + source: CompactionSource::NoOp, + turns_kept: 3, + turns_dropped: 0, + summary_chars: None, + working_context_chars: None, + model_input_tokens_est: 42, + }; + let json = serde_json::to_string(&info).unwrap(); + assert!(!json.contains("summary_chars"), "got: {json}"); + assert!(!json.contains("working_context_chars"), "got: {json}"); + assert!(json.contains("\"model_input_tokens_est\":42"), "got: {json}"); + } +} diff --git a/crates/harness-memory/src/sliding.rs b/crates/harness-memory/src/sliding.rs index f5d95cf..6a68f88 100644 --- a/crates/harness-memory/src/sliding.rs +++ b/crates/harness-memory/src/sliding.rs @@ -14,21 +14,61 @@ //! if it alone exceeds the budget — sending no recent context would be //! strictly worse than slightly overrunning. +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use async_trait::async_trait; use harness_core::{ - cache_breakpoint_indices, default_estimator, BoxError, Memory, Message, TokenEstimator, + cache_breakpoint_indices, default_estimator, emit_memory_compaction, BoxError, CompactionInfo, + CompactionSource, Memory, MemoryStatsProvider, Message, TokenEstimator, }; use tracing::debug; use crate::turns::{select_recent_turns, select_recent_turns_with_breakpoint, split_into_turns}; +/// Lightweight telemetry surface for [`SlidingWindowMemory`]. +/// +/// `SummarizingMemory` ships a richer 10-counter struct (cache +/// hits, circuit breaker, PTL), but the sliding backend has neither +/// LLM nor cache — it just drops oldest turns. The counters track +/// what *can* happen here so operators on the default `mode=window` +/// deployment see something other than "not configured" in the +/// Memory diagnostics panel. +#[derive(Debug, Default)] +pub struct SlidingCounters { + /// Total `compact()` calls. Same semantic as + /// `CompactionCounters::compactions_total` so the diagnostics + /// panel can use the same column unconditionally. + pub compactions_total: AtomicU64, + /// Calls that dropped at least one turn (`turns_dropped > 0`). + pub window_dropped: AtomicU64, +} + +impl SlidingCounters { + fn inc(c: &AtomicU64) { + c.fetch_add(1, Ordering::Relaxed); + } + fn load(c: &AtomicU64) -> u64 { + c.load(Ordering::Relaxed) + } +} + +impl MemoryStatsProvider for SlidingCounters { + fn snapshot(&self) -> serde_json::Value { + serde_json::json!({ + "backend": "sliding", + "compactions_total": Self::load(&self.compactions_total), + "window_dropped": Self::load(&self.window_dropped), + }) + } +} + /// Drop oldest turns until the estimated token count fits `max_tokens`. pub struct SlidingWindowMemory { max_tokens: usize, insert_marker: bool, estimator: Arc, + counters: Arc, } impl SlidingWindowMemory { @@ -37,6 +77,7 @@ impl SlidingWindowMemory { max_tokens, insert_marker: true, estimator: default_estimator(), + counters: Arc::new(SlidingCounters::default()), } } @@ -56,17 +97,30 @@ impl SlidingWindowMemory { self.estimator = estimator; self } + + /// Clone-able handle to the telemetry counters. Composition + /// roots stash this on `AppState::memory_stats` so the + /// `GET /v1/diagnostics/memory` endpoint can render a snapshot + /// even on the default `mode=window` backend. + pub fn counters(&self) -> Arc { + self.counters.clone() + } } #[async_trait] impl Memory for SlidingWindowMemory { async fn compact(&self, messages: &[Message]) -> Result, BoxError> { - Ok(compact( + SlidingCounters::inc(&self.counters.compactions_total); + let (out, dropped) = compact( messages, self.max_tokens, self.insert_marker, self.estimator.as_ref(), - )) + ); + if dropped > 0 { + SlidingCounters::inc(&self.counters.window_dropped); + } + Ok(out) } } @@ -75,7 +129,7 @@ fn compact( max_tokens: usize, insert_marker: bool, estimator: &dyn TokenEstimator, -) -> Vec { +) -> (Vec, usize) { let (system_idxs, turns) = split_into_turns(messages); let system_tokens: usize = system_idxs @@ -106,10 +160,11 @@ fn compact( }) }; - let dropped_turns = turns.len() - kept.len(); + let kept_len = kept.len(); + let dropped_turns = turns.len() - kept_len; debug!( total_turns = turns.len(), - kept_turns = kept.len(), + kept_turns = kept_len, dropped_turns, "compact (sliding)", ); @@ -140,8 +195,29 @@ fn compact( // nothing. Outside an agent run (e.g. memory unit tests without // `with_working_context`) the snapshot is also `None`, so this // is a no-op in tests that don't opt in. + let len_before_wc = out.len(); append_working_context(&mut out); - out + let working_context_chars = match out.get(len_before_wc) { + Some(Message::System { content, .. }) if content.starts_with("=== working context ===") => { + Some(content.chars().count()) + } + _ => None, + }; + + emit_memory_compaction(CompactionInfo { + source: if dropped_turns > 0 { + CompactionSource::WindowDropped + } else { + CompactionSource::NoOp + }, + turns_kept: kept_len, + turns_dropped: dropped_turns, + summary_chars: None, + working_context_chars, + model_input_tokens_est: estimator.estimate_messages(&out), + }); + + (out, dropped_turns) } /// Append the current `WorkingContext` snapshot as a trailing @@ -194,7 +270,7 @@ mod tests { #[test] fn under_budget_returns_everything() { let msgs = vec![system("you are jarvis"), user("hi"), assistant("hello")]; - let out = compact(&msgs, 10_000, true, &CharRatioEstimator); + let (out, _) = compact(&msgs, 10_000, true, &CharRatioEstimator); assert_eq!(out.len(), msgs.len()); } @@ -213,7 +289,7 @@ mod tests { let budget = tokens(&msgs[0..1]) + tokens(&msgs[3..5]) // turn 2 + tokens(&msgs[5..7]); // turn 3 - let out = compact(&msgs, budget, true, &CharRatioEstimator); + let (out, _) = compact(&msgs, budget, true, &CharRatioEstimator); // System + marker + turn 2 + turn 3 assert!(out @@ -237,7 +313,7 @@ mod tests { fn always_keeps_latest_turn_even_if_oversized() { let big = "x".repeat(10_000); let msgs = vec![system("sys"), user(&big), assistant(&big)]; - let out = compact(&msgs, 10, true, &CharRatioEstimator); + let (out, _) = compact(&msgs, 10, true, &CharRatioEstimator); assert!(out .iter() .any(|m| matches!(m, Message::User { content, .. } if content.starts_with("xxxx")))); @@ -256,7 +332,7 @@ mod tests { ]; // Budget that only fits the recent turn (5 messages from index 3..7). let budget = tokens(&msgs[0..1]) + tokens(&msgs[3..7]); - let out = compact(&msgs, budget, false, &CharRatioEstimator); + let (out, _) = compact(&msgs, budget, false, &CharRatioEstimator); // The Tool reply must be in there together with the Assistant // tool-call that produced it — both kept or both dropped. @@ -284,7 +360,7 @@ mod tests { user("c"), assistant("d"), ]; - let out = compact(&msgs, 10_000, true, &CharRatioEstimator); + let (out, _) = compact(&msgs, 10_000, true, &CharRatioEstimator); assert!(!out .iter() .any(|m| matches!(m, Message::System { content, .. } if content.contains("omitted")))); @@ -300,7 +376,7 @@ mod tests { assistant("reply 2"), ]; let budget = tokens(&msgs[0..1]) + tokens(&msgs[3..5]); - let out = compact(&msgs, budget, false, &CharRatioEstimator); + let (out, _) = compact(&msgs, budget, false, &CharRatioEstimator); assert!(!out .iter() .any(|m| matches!(m, Message::System { content, .. } if content.contains("omitted")))); @@ -330,8 +406,8 @@ mod tests { assistant("r"), ]; let budget = tokens(&drops_one[0..1]) + tokens(&drops_one[3..5]); - let out1 = compact(&drops_one, budget, true, &CharRatioEstimator); - let out2 = compact(&drops_three, budget, true, &CharRatioEstimator); + let (out1, _) = compact(&drops_one, budget, true, &CharRatioEstimator); + let (out2, _) = compact(&drops_three, budget, true, &CharRatioEstimator); let marker1 = out1 .iter() @@ -369,7 +445,7 @@ mod tests { assistant("a3"), ]; let budget = tokens(&msgs[0..1]) + tokens(&msgs[3..7]); - let out = compact(&msgs, budget, false, &CharRatioEstimator); + let (out, _) = compact(&msgs, budget, false, &CharRatioEstimator); // Find indices of the user messages we kept. let positions: Vec<&str> = out @@ -404,7 +480,7 @@ mod tests { ]; // Budget that fits sys + only TWO of the four turns at once. let budget = tokens(&msgs[0..1]) + tokens(&msgs[1..3]) + tokens(&msgs[7..9]); - let out = compact(&msgs, budget, false, &CharRatioEstimator); + let (out, _) = compact(&msgs, budget, false, &CharRatioEstimator); // Turn 1 (cached prefix) survives even though it's the oldest. assert!( @@ -443,7 +519,7 @@ mod tests { assistant("done"), ]; let budget = tokens(&msgs[0..1]) + tokens(&msgs[3..7]); - let out = compact(&msgs, budget, false, &CharRatioEstimator); + let (out, _) = compact(&msgs, budget, false, &CharRatioEstimator); let has_call = out .iter() @@ -474,7 +550,7 @@ mod tests { ]; // Budget too tight to fit the cached prefix. let budget = 50; - let out = compact(&msgs, budget, false, &CharRatioEstimator); + let (out, _) = compact(&msgs, budget, false, &CharRatioEstimator); // Recent turn should still be there. assert!(out .iter() @@ -555,6 +631,83 @@ mod tests { .await; } + #[tokio::test] + async fn counters_track_compactions_and_window_drops() { + use harness_core::MemoryStatsProvider; + // Tight budget — sized so the second conversation must drop. + let big = vec![ + system("sys"), + user("turn 1 long enough for the estimator"), + assistant("reply 1 also reasonably long here"), + user("turn 2 another long user message"), + assistant("reply 2 with extra padding text"), + user("turn 3 most recent here"), + assistant("reply 3 short"), + ]; + let budget = tokens(&big[0..1]) + tokens(&big[5..7]); // sys + last turn + let mem = SlidingWindowMemory::new(budget); + let counters = mem.counters(); + // First run: tiny conversation, fits → no drops. + let small = vec![system("sys"), user("hi"), assistant("hello")]; + let _ = mem.compact(&small).await.unwrap(); + // Second run: must drop earlier turns to fit `budget`. + let _ = mem.compact(&big).await.unwrap(); + assert_eq!(counters.compactions_total.load(Ordering::Relaxed), 2); + assert_eq!( + counters.window_dropped.load(Ordering::Relaxed), + 1, + "the second call should have window-dropped", + ); + let snap = counters.snapshot(); + assert_eq!(snap["backend"], "sliding"); + assert_eq!(snap["compactions_total"], 2); + assert_eq!(snap["window_dropped"], 1); + } + + #[tokio::test] + async fn emits_window_dropped_event_when_turns_pruned() { + use harness_core::{with_compaction_channel, CompactionSource}; + use tokio::sync::mpsc; + let msgs = vec![ + system("sys"), + user("turn 1 user"), + assistant("turn 1 reply"), + user("turn 2 user"), + assistant("turn 2 reply"), + user("turn 3 user"), + assistant("turn 3 reply"), + ]; + let budget = tokens(&msgs[0..1]) + tokens(&msgs[3..5]) + tokens(&msgs[5..7]); + let (tx, mut rx) = mpsc::unbounded_channel(); + with_compaction_channel(tx, async { + let mem = SlidingWindowMemory::new(budget); + let _ = mem.compact(&msgs).await.unwrap(); + }) + .await; + let info = rx.try_recv().expect("expected an emitted CompactionInfo"); + assert_eq!(info.source, CompactionSource::WindowDropped); + assert!(info.turns_dropped >= 1, "expected at least one turn dropped"); + assert!(info.turns_kept >= 1, "should keep recency invariant"); + assert!(info.summary_chars.is_none()); + assert!(info.model_input_tokens_est > 0); + } + + #[tokio::test] + async fn emits_no_op_event_when_under_budget() { + use harness_core::{with_compaction_channel, CompactionSource}; + use tokio::sync::mpsc; + let msgs = vec![system("sys"), user("hi"), assistant("hello")]; + let (tx, mut rx) = mpsc::unbounded_channel(); + with_compaction_channel(tx, async { + let mem = SlidingWindowMemory::new(10_000); + let _ = mem.compact(&msgs).await.unwrap(); + }) + .await; + let info = rx.try_recv().expect("expected an emitted CompactionInfo"); + assert_eq!(info.source, CompactionSource::NoOp); + assert_eq!(info.turns_dropped, 0); + } + #[tokio::test] async fn custom_estimator_halves_effective_budget() { let msgs = vec![ diff --git a/crates/harness-memory/src/summarizing.rs b/crates/harness-memory/src/summarizing.rs index a80f798..4a8426a 100644 --- a/crates/harness-memory/src/summarizing.rs +++ b/crates/harness-memory/src/summarizing.rs @@ -46,9 +46,9 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use harness_core::{ - cache_breakpoint_indices, default_estimator, BoxError, ChatRequest, Conversation, - ConversationStore, Error as CoreError, LlmProvider, Memory, MemoryStatsProvider, Message, - TokenEstimator, + cache_breakpoint_indices, default_estimator, emit_memory_compaction, BoxError, ChatRequest, + CompactionInfo, CompactionSource, Conversation, ConversationStore, Error as CoreError, + LlmProvider, Memory, MemoryStatsProvider, Message, TokenEstimator, }; use tracing::{debug, warn}; @@ -355,49 +355,55 @@ impl Memory for SummarizingMemory { // would otherwise burn quota on every compaction. The // circuit auto-resets on time-out, so we recover without // operator intervention. - let summary = if dropped_msgs.is_empty() { - None + let (summary, summary_source) = if dropped_msgs.is_empty() { + (None, None) } else { self.counters.inc(&self.counters.summary_required); if self.circuit_open() { self.counters.inc(&self.counters.circuit_skips); debug!(dropped = dropped_count, "summary circuit open; skipping LLM call"); - None + (None, None) } else { match self.summarise(&dropped_msgs).await { - Ok(s) => { + Ok((s, src)) => { self.record_summary_success(); - Some(s) + (Some(s), Some(src)) } Err(e) => { self.counters.inc(&self.counters.llm_failures); self.record_summary_failure(); warn!(error = %e, dropped = dropped_count, "summary failed; falling back to placeholder note"); - None + (None, None) } } } }; + let kept_turn_count = kept.len(); let mut out: Vec = Vec::with_capacity(system_idxs.len() + kept.iter().map(|t| t.len()).sum::() + 1); for &i in &system_idxs { out.push(messages[i].clone()); } - if let Some(s) = summary { + let summary_chars: Option = if let Some(s) = summary { + let chars = s.chars().count(); out.push(Message::system(format!( "Earlier conversation summary ({dropped_count} turn(s) compressed):\n{s}" ))); - } else if dropped_count > 0 { - // Surfacing the gap explicitly is better than silent - // truncation; keeps the model from getting confused - // about why the conversation seems to start mid-thought. - out.push(Message::system(format!( - "[{dropped_count} earlier turn(s) omitted — summary unavailable]" - ))); - } + Some(chars) + } else { + if dropped_count > 0 { + // Surfacing the gap explicitly is better than silent + // truncation; keeps the model from getting confused + // about why the conversation seems to start mid-thought. + out.push(Message::system(format!( + "[{dropped_count} earlier turn(s) omitted — summary unavailable]" + ))); + } + None + }; for turn in kept { for &i in turn { out.push(messages[i].clone()); @@ -406,7 +412,16 @@ impl Memory for SummarizingMemory { // Append the agent's working-context snapshot. Same helper // as `SlidingWindowMemory` so the two backends produce the // same trailing block. + let len_before_wc = out.len(); crate::sliding::append_working_context(&mut out); + let working_context_chars = match out.get(len_before_wc) { + Some(Message::System { content, .. }) + if content.starts_with("=== working context ===") => + { + Some(content.chars().count()) + } + _ => None, + }; // PTL safety net: if the summary itself ran long or the // working-context block tipped us over, drop oldest turns // until the estimate fits. Never returns `Err` — the entire @@ -422,6 +437,33 @@ impl Memory for SummarizingMemory { self.counters.inc(&self.counters.ptl_round_two); } } + + // Decide the canonical source for the emitted event. PTL + // outcomes win because they represent a budget escape hatch + // *after* whatever the summariser produced — the user wants + // to know the safety net fired. Otherwise the summariser's + // self-reported source (cache memory / store / fresh LLM) + // wins. When dropped > 0 but summary is None (circuit open + // or LLM error), we surface SummaryUnavailable so the UI + // can show a degraded-state marker. No drops + no summary = + // NoOp. + let source = match outcome { + PtlOutcome::RoundTwo => CompactionSource::PtlRoundTwo, + PtlOutcome::RoundOne => CompactionSource::PtlRoundOne, + PtlOutcome::None => match (summary_source, dropped_count) { + (Some(src), _) => src, + (None, n) if n > 0 => CompactionSource::SummaryUnavailable, + _ => CompactionSource::NoOp, + }, + }; + emit_memory_compaction(CompactionInfo { + source, + turns_kept: kept_turn_count, + turns_dropped: dropped_count, + summary_chars, + working_context_chars, + model_input_tokens_est: estimator.estimate_messages(&out), + }); Ok(out) } } @@ -488,14 +530,14 @@ impl SummarizingMemory { } } - async fn summarise(&self, dropped: &[Message]) -> Result { + async fn summarise(&self, dropped: &[Message]) -> Result<(String, CompactionSource), BoxError> { let fp = fingerprint(dropped); // Tier 1: in-memory single-slot cache. if let Some(text) = self.cache_lookup(&fp) { self.counters.inc(&self.counters.cache_hits_memory); debug!(fingerprint = %fp, "summary cache hit (memory)"); - return Ok(text); + return Ok((text, CompactionSource::CacheMemory)); } // Tier 2: durable store, when configured. @@ -506,7 +548,7 @@ impl SummarizingMemory { self.counters.inc(&self.counters.cache_hits_store); debug!(fingerprint = %fp, "summary cache hit (store)"); self.cache_set(&fp, &text); - return Ok(text); + return Ok((text, CompactionSource::CacheStore)); } else { warn!( fingerprint = %fp, @@ -625,7 +667,7 @@ impl SummarizingMemory { warn!(error = %e, fingerprint = %fp, "summary store save failed"); } } - Ok(text) + Ok((text, CompactionSource::FreshLlm)) } fn cache_lookup(&self, fingerprint: &str) -> Option { @@ -1536,6 +1578,108 @@ mod tests { assert_eq!(counters.circuit_skips.load(Ordering::Relaxed), 2); } + #[tokio::test] + async fn events_emit_fresh_then_cache_memory() { + use harness_core::{with_compaction_channel, CompactionSource}; + let llm = FakeLlm::new("ALPHA AND BETA HAPPENED"); + let mem = SummarizingMemory::new(llm.clone(), "test-model", 256); + let msgs = vec![ + system("sys"), + user("turn 1"), + assistant("reply 1"), + user("turn 2"), + assistant("reply 2"), + user("turn 3 most recent"), + assistant("reply 3"), + ]; + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + with_compaction_channel(tx, async { + // Fresh: LLM gets called and emits FreshLlm. + let _ = mem.compact(&msgs).await.unwrap(); + // Same dropped prefix: in-memory cache hits, emits CacheMemory. + let _ = mem.compact(&msgs).await.unwrap(); + }) + .await; + let first = rx.try_recv().expect("first compaction event"); + let second = rx.try_recv().expect("second compaction event"); + assert_eq!(first.source, CompactionSource::FreshLlm); + assert_eq!(second.source, CompactionSource::CacheMemory); + assert!(first.summary_chars.is_some()); + assert!(second.summary_chars.is_some()); + assert!(first.turns_dropped >= 1); + } + + #[tokio::test] + async fn events_emit_summary_unavailable_when_circuit_open() { + use harness_core::{with_compaction_channel, CompactionSource}; + let calls = Arc::new(AtomicUsize::new(0)); + let llm = Arc::new(CountingFailingLlmCb { + calls: calls.clone(), + }); + let mem = SummarizingMemory::new(llm, "test-model", 64); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + with_compaction_channel(tx, async { + // Trip the breaker (3 consecutive failures), then run one + // more — the next call should see the circuit open and + // fall through with `SummaryUnavailable`. + for i in 0..5 { + let msgs = vec![ + system("sys"), + user(&format!("old-{i}")), + assistant("old reply"), + user("recent"), + assistant("recent reply"), + ]; + let _ = mem.compact(&msgs).await.unwrap(); + } + }) + .await; + // Collect everything emitted. There should be at least one + // SummaryUnavailable event in the trailing two iterations. + let mut sources = Vec::new(); + while let Ok(info) = rx.try_recv() { + sources.push(info.source); + } + assert!( + sources + .iter() + .any(|s| matches!(s, CompactionSource::SummaryUnavailable)), + "expected SummaryUnavailable in the emitted sources, got: {sources:?}" + ); + } + + #[tokio::test] + async fn events_emit_ptl_when_summary_overruns_budget() { + use harness_core::{with_compaction_channel, CompactionSource}; + let llm = FakeLlm::new("X".repeat(2000)); + let mem = SummarizingMemory::new(llm, "test-model", 300); + let msgs = vec![ + system("sys"), + user("turn 1"), + assistant("reply 1 with some longer text"), + user("turn 2"), + assistant("reply 2 with some longer text"), + user("turn 3"), + assistant("reply 3 with some longer text"), + user("turn 4 most recent"), + assistant("reply 4"), + ]; + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + with_compaction_channel(tx, async { + let _ = mem.compact(&msgs).await.unwrap(); + }) + .await; + let info = rx.try_recv().expect("expected compaction event"); + assert!( + matches!( + info.source, + CompactionSource::PtlRoundOne | CompactionSource::PtlRoundTwo + ), + "PTL fallback should be the source when summary overruns; got {:?}", + info.source + ); + } + #[tokio::test] async fn counters_track_ptl_rounds() { let llm = FakeLlm::new("X".repeat(2000));