From cad047a2f7774e3e9eeb85c5918d410d60e5038a Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sat, 20 Jun 2026 17:51:54 +0200 Subject: [PATCH 01/21] =?UTF-8?q?fix(daemon):=20hibernate=20when=20idle=20?= =?UTF-8?q?=E2=80=94=20ignore=20open=20connections=20and=20watchdog=20stat?= =?UTF-8?q?us=20probes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The sleep/consolidation pipeline defers whenever _interrupt_check reports recent activity. Two independent signals wrongly marked the daemon "active" on nearly every tick, so it never completed a cycle, never hibernated, and the wake-hook re-ran every 30s — a sustained ~200% CPU churn on any long-lived deployment: 1. _interrupt_check returned True whenever mcp_socket.active_connections > 0. Long-lived MCP clients hold their socket open permanently -> always True. 2. Even after removing (1), last_activity_ts was refreshed for EVERY inbound socket line — including the watchdog's own {"type": "status"} liveness probe sent every 7-30s (daemon/_watchdog.py::_probe_status_roundtrip). So the 30s-activity window never elapsed. Fix: _interrupt_check keys off last_activity_ts recency only, and SocketServer refreshes last_activity_ts only for dispatched JSON-RPC method calls (real recall/capture traffic), never for control-plane messages. A busy burst still defers consolidation; a 30s lull now lets the cycle finish and the daemon hibernate. Adds tests/test_socket_activity_tracking.py locking in that a status probe does not count as activity while a real method call does. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/daemon/__init__.py | 9 +- src/iai_mcp/socket_server.py | 14 ++- tests/test_socket_activity_tracking.py | 134 +++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 3 deletions(-) create mode 100644 tests/test_socket_activity_tracking.py diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index 51a7cff..f053929 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -1282,8 +1282,13 @@ def _run_wake_sequence(): _prev_lifecycle_state[0] = current if current is _LifecycleState.SLEEP: def _interrupt_check() -> bool: - if mcp_socket.active_connections > 0: - return True + # Defer the sleep pipeline only on RECENT ACTIVITY, not on + # open connections: long-lived Claude sessions keep sockets + # open permanently, so `active_connections > 0` was True at + # nearly every tick -> the cycle never completed -> no + # HIBERNATION -> the wake-hook re-ran every 30s (the 221% CPU + # churn). last_activity_ts is refreshed on each request, so a + # busy burst still defers; a 30s lull lets the cycle finish. elapsed = ( time.monotonic() - mcp_socket.last_activity_ts ) diff --git a/src/iai_mcp/socket_server.py b/src/iai_mcp/socket_server.py index 31e72ae..2d21108 100644 --- a/src/iai_mcp/socket_server.py +++ b/src/iai_mcp/socket_server.py @@ -88,7 +88,14 @@ async def handle( line = await reader.readline() if not line: break - self.last_activity_ts = time.monotonic() + # NB: last_activity_ts is intentionally NOT updated for every + # inbound line. It must track REAL memory traffic (recall/capture) + # only — never control-plane messages, in particular the watchdog's + # periodic {"type":"status"} liveness probe (every 7-30s). Counting + # those probes as activity kept _interrupt_check (daemon) perpetually + # True, so the sleep cycle never completed and never hibernated -> + # the 221% CPU churn. It is set below, only for dispatched JSON-RPC + # method calls. req_id: Any = None try: req = json.loads(line) @@ -143,6 +150,11 @@ async def handle( continue method = req["method"] params = req.get("params") or {} + # Real memory traffic: mark activity only for dispatched JSON-RPC + # method calls so background consolidation defers while the user is + # actively recalling/capturing. Control/status probes never reach + # here, so they no longer keep the daemon awake. + self.last_activity_ts = time.monotonic() try: from iai_mcp.core import dispatch result = await asyncio.to_thread( diff --git a/tests/test_socket_activity_tracking.py b/tests/test_socket_activity_tracking.py new file mode 100644 index 0000000..9bde5d3 --- /dev/null +++ b/tests/test_socket_activity_tracking.py @@ -0,0 +1,134 @@ +"""Regression: the socket server must track *real* memory traffic only. + +`SocketServer.last_activity_ts` feeds the daemon's `_interrupt_check`: the sleep +/ consolidation pipeline defers whenever activity is recent (< 30s). The watchdog +probes daemon liveness with a `{"type": "status"}` control message every 7-30s +(`daemon/_watchdog.py::_probe_status_roundtrip`). When *every* inbound line — +including that probe — refreshed `last_activity_ts`, `_interrupt_check` was +perpetually True, so the cycle never completed, the daemon never hibernated, and +the wake-hook re-ran every tick (a ~200% CPU churn on any long-lived deployment). + +The fix: refresh `last_activity_ts` only for dispatched JSON-RPC method calls +(recall/capture/etc.), never for control-plane messages. These tests lock that in. +""" +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path + +import pytest + + +@pytest.fixture +def short_socket_paths(tmp_path, monkeypatch): + from iai_mcp import concurrency, daemon_state + + sock_dir = Path(f"/tmp/iai-srvact-{os.getpid()}-{id(tmp_path)}") + sock_dir.mkdir(parents=True, exist_ok=True) + sock_path = sock_dir / "d.sock" + state_path = tmp_path / ".daemon-state.json" + + monkeypatch.setattr(concurrency, "SOCKET_PATH", sock_path) + monkeypatch.setattr(daemon_state, "STATE_PATH", state_path) + store_root = tmp_path / "store_root" + store_root.mkdir(parents=True, exist_ok=True) + monkeypatch.setenv("IAI_MCP_STORE", str(store_root)) + + try: + yield sock_path + finally: + try: + if sock_path.exists(): + sock_path.unlink() + except OSError: + pass + try: + sock_dir.rmdir() + except OSError: + pass + + +async def _send_line(sock_path: Path, payload: dict, *, timeout: float = 10.0) -> dict: + reader, writer = await asyncio.wait_for( + asyncio.open_unix_connection(path=str(sock_path)), timeout=timeout, + ) + try: + writer.write((json.dumps(payload) + "\n").encode("utf-8")) + await writer.drain() + line = await asyncio.wait_for(reader.readline(), timeout=timeout) + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: # noqa: BLE001 + pass + if not line: + raise AssertionError(f"daemon closed without reply (payload={payload})") + return json.loads(line.decode("utf-8")) + + +async def _serve(sock_path: Path, store, coro_fn): + from iai_mcp.socket_server import SocketServer + + srv = SocketServer(store, idle_secs=99999) + server_task = asyncio.create_task(srv.serve(socket_path=sock_path)) + for _ in range(250): + if sock_path.exists(): + break + await asyncio.sleep(0.01) + if not sock_path.exists(): + srv.shutdown_event.set() + raise AssertionError("socket never bound") + try: + return await coro_fn(srv) + finally: + srv.shutdown_event.set() + try: + await asyncio.wait_for(server_task, timeout=5) + except Exception: # noqa: BLE001 + pass + + +def test_status_probe_does_not_refresh_last_activity(short_socket_paths): + sock_path = short_socket_paths + from iai_mcp.store import MemoryStore + + store = MemoryStore() + + async def _runner(srv): + # Sentinel baseline so any refresh is detectable. + srv.last_activity_ts = 0.0 + resp = await _send_line(sock_path, {"type": "status"}) + # The control message is answered (round-trip), proving it was received... + assert resp is not None, resp + # ...yet it must NOT have been counted as memory activity: the watchdog's + # periodic probe would otherwise keep the daemon awake forever. + assert srv.last_activity_ts == 0.0, ( + "status liveness probe wrongly refreshed last_activity_ts" + ) + + asyncio.run(_serve(sock_path, store, _runner)) + + +def test_jsonrpc_method_refreshes_last_activity(short_socket_paths): + sock_path = short_socket_paths + from iai_mcp.store import MemoryStore + + store = MemoryStore() + + async def _runner(srv): + srv.last_activity_ts = 0.0 + resp = await _send_line( + sock_path, + {"jsonrpc": "2.0", "id": 1, "method": "session_start_payload", "params": {}}, + ) + assert "result" in resp, resp + # Real recall/capture traffic MUST refresh the activity clock so the sleep + # pipeline correctly defers while the user is actively using memory. + assert srv.last_activity_ts > 0.0, ( + "dispatched JSON-RPC method did not refresh last_activity_ts" + ) + + asyncio.run(_serve(sock_path, store, _runner)) From 0b1fac2145d781de0be4395153b55c3ca162dff9 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 08:24:07 +0200 Subject: [PATCH 02/21] fix(daemon): single-flight build_runtime_graph to end the WAKE recompute storm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At WAKE several background subsystems (boot preload, sigma identity audit, foraging weak-bridge detection, hippea cascade) each call build_runtime_graph concurrently in their own asyncio.to_thread workers. On a cache miss each one independently ran the full, GIL-bound community detection (mosaic). Three+ at once contended for the GIL, starved the asyncio event loop, and the liveness watchdog's socket probe timed out -> SIGKILL -> launchd relaunch -> loop. Wrap build_runtime_graph in a single-flight gate keyed on the cache key: the first caller (leader) computes and saves the on-disk cache; concurrent callers (followers) wait on an Event and then reload the freshly-saved cache via the existing cheap path. No mutable MemoryGraph is shared between callers (each rebuilds its own shell + single-slot sync hook), and recall is independent of the community assignment, so a slightly-stale shared result is harmless. Followers re-contend in a bounded loop rather than recomputing unconditionally: if the leader fails before saving, the cache key shifts mid-burst, or the wait times out, the woken followers loop back and exactly one becomes the next leader while the rest wait again — degrading those edge cases to sequential single-flight (one compute at a time) instead of an N-way concurrent re-storm. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/retrieve.py | 85 +++++++++++- .../test_build_runtime_graph_single_flight.py | 131 ++++++++++++++++++ 2 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 tests/test_build_runtime_graph_single_flight.py diff --git a/src/iai_mcp/retrieve.py b/src/iai_mcp/retrieve.py index d5bce19..89f3f2c 100644 --- a/src/iai_mcp/retrieve.py +++ b/src/iai_mcp/retrieve.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import threading import time from datetime import datetime, timedelta, timezone from itertools import combinations @@ -33,6 +34,32 @@ _STALE_REASON_SUFFIX: str = " · stale" +# --- build_runtime_graph single-flight (WAKE CPU-storm fix) ----------------- +# At daemon WAKE several background subsystems (boot preload, sigma identity +# audit, foraging weak-bridge detection, hippea cascade warming) each call +# build_runtime_graph concurrently. On a cache MISS each one independently runs +# the full O(n^2) community detection (mosaic), GIL-bound, in its own to_thread +# worker. Three+ of those at once contend for the GIL, starve the asyncio event +# loop, and the liveness watchdog's socket probe times out -> SIGKILL -> relaunch +# loop. This single-flight gate collapses the concurrent burst into ONE compute: +# the first caller (leader) computes and saves the on-disk cache; concurrent +# callers (followers) wait on its Event and then re-load the freshly-saved cache +# via the cheap path. No mutable graph object is shared between callers (each +# rebuilds its own MemoryGraph shell + sync hook), and recall is independent of +# the community assignment, so a slightly-stale shared result is harmless. +# +# Followers RE-CONTEND in a bounded loop rather than recomputing unconditionally: +# if the leader fails before saving (e.g. detect_communities raises), or the cache +# key shifts mid-burst, or the leader overruns the wait timeout, the woken +# followers loop back, and exactly ONE of them becomes the next leader while the +# rest wait again. That degrades those edge cases to *sequential* single-flight +# (one compute at a time) instead of an N-way concurrent re-storm. +_BRG_INFLIGHT_LOCK = threading.Lock() +_BRG_INFLIGHT: dict[str, threading.Event] = {} +_BRG_WAIT_TIMEOUT_SEC: float = 120.0 +_BRG_MAX_ATTEMPTS: int = 4 + + def recall( store: MemoryStore, cue_embedding: list[float], @@ -450,6 +477,63 @@ def _hook(op: str, record) -> None: def build_runtime_graph(store: MemoryStore): + """Single-flight wrapper around the real graph build. + + On a cache HIT this is cheap and runs directly. On a cache MISS it + serialises concurrent callers so the expensive community detection runs + exactly once per cache generation: the leader computes + saves the cache, + followers wait and then reload it cheaply. See the _BRG_* notes above. + """ + from iai_mcp import runtime_graph_cache as _rgc + + cached = None + for _attempt in range(_BRG_MAX_ATTEMPTS): + try: + cached = _rgc.try_load(store) + except Exception: # noqa: BLE001 -- never let cache I/O break the build + cached = None + + # Cache HIT: no contention risk, run directly (the impl reloads cheaply). + if cached is not None and cached[0] is not None: + return _build_runtime_graph_impl(store, cached) + + # Cache MISS: single-flight on the cache key so a WAKE burst of callers + # does not all recompute the full mosaic concurrently. + try: + keystr = repr(_rgc._cache_key(store)) + except Exception: # noqa: BLE001 -- if we can't key it, just compute + return _build_runtime_graph_impl(store, cached) + + with _BRG_INFLIGHT_LOCK: + event = _BRG_INFLIGHT.get(keystr) + is_leader = event is None + if is_leader: + event = threading.Event() + _BRG_INFLIGHT[keystr] = event + + if is_leader: + # Leader: compute (the impl saves the cache), then release followers. + try: + return _build_runtime_graph_impl(store, cached) + finally: + with _BRG_INFLIGHT_LOCK: + # Only drop our own slot (a key shift could have replaced it). + if _BRG_INFLIGHT.get(keystr) is event: + _BRG_INFLIGHT.pop(keystr, None) + event.set() + + # Follower: wait for the leader, then loop. Next iteration's try_load + # HITS if the leader saved; if the leader failed / the key shifted / + # the wait timed out, we re-contend and one follower becomes the next + # leader (sequential single-flight — never an N-way concurrent re-storm). + event.wait(timeout=_BRG_WAIT_TIMEOUT_SEC) + + # Attempts exhausted (e.g. the leader keeps failing): compute directly as a + # last resort. Bounded, and still correct. + return _build_runtime_graph_impl(store, cached) + + +def _build_runtime_graph_impl(store: MemoryStore, cached): from iai_mcp.community import detect_communities from iai_mcp.graph import MemoryGraph from iai_mcp.richclub import rich_club_nodes @@ -457,7 +541,6 @@ def build_runtime_graph(store: MemoryStore): graph = MemoryGraph() - cached = runtime_graph_cache.try_load(store) assignment = None rich_club = None cached_node_payload: dict[str, dict] | None = None diff --git a/tests/test_build_runtime_graph_single_flight.py b/tests/test_build_runtime_graph_single_flight.py new file mode 100644 index 0000000..b7869da --- /dev/null +++ b/tests/test_build_runtime_graph_single_flight.py @@ -0,0 +1,131 @@ +"""Regression test for the build_runtime_graph WAKE CPU-storm fix. + +At daemon WAKE several background subsystems (boot preload, sigma identity +audit, foraging, hippea cascade) call retrieve.build_runtime_graph concurrently. +Before the fix, each one independently ran the full (GIL-bound) community +detection on a cache miss, so 3+ concurrent runs contended for the GIL, starved +the asyncio event loop, and the liveness watchdog SIGKILLed the daemon. + +The single-flight gate must collapse a concurrent burst into ONE compute (the +leader), with the others reusing the freshly-saved cache — while never sharing a +mutable MemoryGraph object between callers. +""" +from __future__ import annotations + +import threading +import time +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +import iai_mcp.community as community +import iai_mcp.retrieve as retrieve +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _vec(seed: int) -> list[float]: + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return (v / np.linalg.norm(v)).tolist() + + +def _rec(seed: int) -> MemoryRecord: + now = datetime.now(timezone.utc) + return MemoryRecord( + id=uuid4(), tier="episodic", literal_surface="rec", aaak_index="", + embedding=_vec(seed), community_id=None, centrality=0.0, detail_level=2, + pinned=False, stability=0.0, difficulty=0.0, last_reviewed=None, + never_decay=False, never_merge=False, provenance=[], created_at=now, + updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def test_concurrent_build_runtime_graph_runs_detect_once(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + store.insert(_rec(i)) + + n_threads = 4 + calls: list[int] = [] + calls_lock = threading.Lock() + start = threading.Barrier(n_threads) + + orig_detect = community.detect_communities + + def slow_detect(*args, **kwargs): + with calls_lock: + calls.append(1) + # Hold long enough that the other callers pile up behind the leader. + time.sleep(0.6) + return orig_detect(*args, **kwargs) + + # build_runtime_graph_impl does `from iai_mcp.community import detect_communities` + # at call time, so patching the module attribute is seen. + monkeypatch.setattr(community, "detect_communities", slow_detect) + + results: list[tuple] = [] + results_lock = threading.Lock() + + def worker(): + start.wait() + graph, assignment, _rc = retrieve.build_runtime_graph(store) + with results_lock: + results.append((assignment, id(graph))) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=60) + + assert len(results) == n_threads, "some build_runtime_graph callers hung" + # Single-flight: the expensive community detection ran exactly once even + # though all four callers raced into a cache miss simultaneously. + assert len(calls) == 1, ( + f"expected detect_communities to run once (single-flight); " + f"ran {len(calls)} times" + ) + # Every caller got a valid assignment... + assert all(a is not None for a, _ in results) + # ...and its OWN MemoryGraph object — the mutable graph (which receives the + # store sync hook) must never be shared between concurrent callers. + graph_ids = {gid for _, gid in results} + assert len(graph_ids) == n_threads, ( + f"callers shared a MemoryGraph object ({len(graph_ids)} distinct of " + f"{n_threads}); the single-flight must memoise only immutable products" + ) + + +def test_build_runtime_graph_hit_path_skips_detect(tmp_path, monkeypatch): + """A warm cache must short-circuit without running community detection.""" + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + store.insert(_rec(i)) + + # Prime the cache. + retrieve.build_runtime_graph(store) + + calls: list[int] = [] + orig_detect = community.detect_communities + + def counting_detect(*args, **kwargs): + calls.append(1) + return orig_detect(*args, **kwargs) + + monkeypatch.setattr(community, "detect_communities", counting_detect) + + # Second call on the same generation: cache HIT, no recompute. + graph, assignment, _rc = retrieve.build_runtime_graph(store) + assert assignment is not None + assert calls == [], "warm-cache build_runtime_graph must not recompute communities" From f0498404ae1f0ad8d24dbe017522dae4547af621 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 08:24:07 +0200 Subject: [PATCH 03/21] fix(graph-cache): widen staleness window so capture keeps the cache warm The cache key buckets on records//WINDOW and edges//WINDOW and try_load requires an exact match. With WINDOW=10 a normal day of capture (+~150 records, +~1300 edges) crossed ~130 buckets, so the on-disk graph cache MISSED on essentially every WAKE and the full community detection was recomputed each time. Edges churn fastest, so they are the binding term. WINDOW=250 keeps the cache valid across a normal day, so the common WAKE is now a cheap cache HIT. The independent age/dirty fuse in consult_overlay (25h / dirty>50) remains the real freshness backstop, and the single-flight gate makes the rare genuine miss harmless. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/runtime_graph_cache.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/iai_mcp/runtime_graph_cache.py b/src/iai_mcp/runtime_graph_cache.py index 8c40fa3..add150a 100644 --- a/src/iai_mcp/runtime_graph_cache.py +++ b/src/iai_mcp/runtime_graph_cache.py @@ -29,7 +29,16 @@ CACHE_VERSION: str = "62-02-v5" -_STALENESS_WINDOW: int = 10 +# Cache-key staleness bucket. The key buckets on records//WINDOW and +# edges//WINDOW; try_load requires an exact key match. With WINDOW=10 a normal +# day of capture (+~150 records, +~1300 edges) crossed ~130 buckets, so the +# on-disk graph cache MISSED on essentially every WAKE and the full community +# detection (mosaic) was recomputed each time — the WAKE CPU storm. Edges churn +# fastest, so they are the binding term. WINDOW=250 keeps the cache valid across +# a normal day so most WAKEs are cheap cache HITs; the independent age/dirty fuse +# in consult_overlay (25h / dirty>50) remains the real freshness backstop, and +# build_runtime_graph's single-flight gate makes the rare genuine miss harmless. +_STALENESS_WINDOW: int = 250 LEGACY_CACHE_VERSION_PLAINTEXT: str = "06-02-v1" _CACHE_AAD: bytes = b"runtime-graph-cache:v3" From 16b4ed4f1e2cdabc7010b89c7ad07ac626172e82 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 08:24:18 +0200 Subject: [PATCH 04/21] fix(daemon): drop boot-preload double-save that nulled the cache payload _boot_preload called build_runtime_graph (which already persists the cache, with the full node_payload, on a miss) and then called save(..., node_payload= None, ...) again, overwriting the good cache with a payload-less one. That forced a pandas re-read of every record on the next cache hit. Just warm the cache via build_runtime_graph and drop the redundant second save. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/daemon/__init__.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index f053929..92f454f 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -932,11 +932,13 @@ def _capture_handler(record: dict) -> None: async def _boot_preload() -> None: try: from iai_mcp import retrieve as _retrieve_preload - _g, _a, _rc = await asyncio.to_thread( - _retrieve_preload.build_runtime_graph, store, - ) + # build_runtime_graph already persists the cache internally + # (with the full node_payload) on a miss. The previous extra + # save(..., node_payload=None, ...) here overwrote that good + # cache with a payload-less one (forcing a pandas re-read on + # the next hit) — so we just warm the cache and drop it. await asyncio.to_thread( - _rgc_mod.save, store, _a, _rc, None, 0, + _retrieve_preload.build_runtime_graph, store, ) except Exception as _exc: # noqa: BLE001 -- preload MUST NOT crash daemon log.debug("boot_preload failed: %s", _exc, exc_info=True) From d1678f09f64e3c8c240320490d9606730aeaf8f7 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 08:24:18 +0200 Subject: [PATCH 05/21] fix(daemon): signal wake before kickstart so the daemon serves on wake MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The daemon only enters WAKE at boot if ~/.iai-mcp/wake.signal exists, but nothing ever created it — WakeHandler only consumed it. The CLI start/install path (and the operator's capture hook) brought the daemon up with a plain launchctl kickstart, so it re-read its persisted HIBERNATION state and hibernate-exited within a tick, closing the socket before it ever served recall. Add WakeHandler.signal_wake() (symmetric to consume_wake_signal) and create the signal before the kickstart in daemon install/start, so the booting daemon transitions HIBERNATION -> WAKE and serves its socket. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/cli/_daemon.py | 18 ++++++++++++++++++ src/iai_mcp/wake_handler.py | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/src/iai_mcp/cli/_daemon.py b/src/iai_mcp/cli/_daemon.py index f9de4d6..609eca5 100644 --- a/src/iai_mcp/cli/_daemon.py +++ b/src/iai_mcp/cli/_daemon.py @@ -29,6 +29,22 @@ def _stop_escalation_bound() -> float: return STOP_TERM_TIMEOUT_S +def _signal_daemon_wake() -> None: + """Create the wake signal before a kickstart so the booting daemon WAKEs. + + Without it the daemon boots, re-reads its persisted HIBERNATION state and + hibernate-exits within a tick, closing the socket before it ever serves + recall. Best-effort: a failure here just falls back to the old behaviour. + """ + try: + from iai_mcp.wake_handler import WakeHandler + + root = os.environ.get("IAI_MCP_STORE") or os.path.expanduser("~/.iai-mcp") + WakeHandler(Path(root) / "wake.signal").signal_wake() + except Exception: # noqa: BLE001 -- never let the wake signal break daemon start + pass + + def _stop_poll_interval() -> float: raw = os.environ.get("IAI_DAEMON_STOP_POLL_S") if raw: @@ -158,6 +174,7 @@ def cmd_daemon_install(args: argparse.Namespace) -> int: f"{result.stderr.strip()}", file=sys.stderr, ) + _signal_daemon_wake() _cli.subprocess.run( ["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"], check=False, capture_output=True, @@ -255,6 +272,7 @@ def cmd_daemon_start(args: argparse.Namespace) -> int: ["launchctl", "bootstrap", f"gui/{uid}", str(target)], check=False, capture_output=True, ) + _signal_daemon_wake() _cli.subprocess.run( ["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"], check=False, capture_output=True, diff --git a/src/iai_mcp/wake_handler.py b/src/iai_mcp/wake_handler.py index 0395f04..2ff4695 100644 --- a/src/iai_mcp/wake_handler.py +++ b/src/iai_mcp/wake_handler.py @@ -20,6 +20,24 @@ def consume_wake_signal(self) -> bool: return False return True + def signal_wake(self) -> bool: + """Create the wake signal so the next daemon boot enters WAKE. + + Symmetric counterpart to ``consume_wake_signal``. Whoever brings the + daemon up from a hibernated (process-exited) state — the CLI + start/install path, or the per-turn capture hook — must create this + signal *before* the kickstart, so the booting daemon transitions + HIBERNATION -> WAKE and serves its socket, instead of re-reading the + persisted HIBERNATION state and immediately hibernate-exiting (which + closes the socket and leaves recall unserved). Idempotent. + """ + try: + self._wake_signal_path.parent.mkdir(parents=True, exist_ok=True) + self._wake_signal_path.touch() + except OSError: + return False + return True + def has_pending_wake(self) -> bool: try: return self._wake_signal_path.is_file() From 093b3a639cbcaa4fbbb216f29649f6fde50553e2 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 11:34:53 +0200 Subject: [PATCH 06/21] fix(graph): exclude tombstoned records from the runtime graph build_runtime_graph added every record (and every edge) to the graph, including soft-deleted / deduped / erased records (tombstoned_at IS NOT NULL). That polluted communities, centrality, rich_club and the sigma topology audit with dead nodes, and -- worse -- desynced the node count from store.active_records_count() (the payload-cache validity anchor), so after any tombstoning (e.g. migrate --dedupe-episodic) the cache was permanently invalid and every WAKE did a full rebuild on an over-large graph. Skip tombstoned rows in the node loop (matching active_records_count: tombstoned_at IS NULL), skip edges whose endpoints are not live nodes (add_edge does setdefault on both endpoints, so it would re-create dead nodes), and drop the cached assignment/rich_club when the live node set changed so they are recomputed on the fresh graph instead of referencing dead nodes. On a real store this took the graph from 9733 nodes to 3612, rich_club from 974 to 362, and restored payload-cache hits across builds. --- src/iai_mcp/retrieve.py | 34 +++++++- tests/test_graph_excludes_tombstoned.py | 102 ++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 2 deletions(-) create mode 100644 tests/test_graph_excludes_tombstoned.py diff --git a/src/iai_mcp/retrieve.py b/src/iai_mcp/retrieve.py index 89f3f2c..8b46a3c 100644 --- a/src/iai_mcp/retrieve.py +++ b/src/iai_mcp/retrieve.py @@ -555,6 +555,19 @@ def _build_runtime_graph_impl(store: MemoryStore, cached): and len(cached_node_payload) == records_count ) + if cached_node_payload is not None and len(cached_node_payload) > records_count: + # The cache holds MORE nodes than are live: records were tombstoned + # (dedup/erasure) since it was built, so the cached assignment/rich_club + # were computed over now-dead nodes -- drop them and recompute on the + # fresh live graph (rebuilt from the records table below, which already + # excludes tombstoned rows). Pure GROWTH (cache has FEWER nodes than live) + # must NOT drop them: the node set is still rebuilt fresh from the table + # (so new records are present and drift/parity stay correct), but + # detect_communities is not re-run, so a single insert is absorbed without + # an O(n^2) recompute -- the staleness-window contract. + assignment = None + rich_club = None + if use_cached_payload: for nid, payload in cached_node_payload.items(): graph.add_node( @@ -580,6 +593,16 @@ def _build_runtime_graph_impl(store: MemoryStore, cached): for _, row in df.iterrows(): if int(row.get("embedding_pending") or 0) != 0: continue + # Exclude tombstoned (soft-deleted / deduped / erased) records from the + # runtime graph: they must not pollute communities, centrality, rich_club + # or the sigma topology audit, and including them keeps the node count out + # of sync with store.active_records_count() (the cache-validity anchor at + # line ~552), permanently invalidating the cache -> a full rebuild every + # wake. Matches active_records_count(): tombstoned_at IS NULL. Guard the + # pandas NaN case (NaN != NaN) so an all-live column is not skipped. + _tomb = row.get("tombstoned_at") + if _tomb is not None and not (isinstance(_tomb, float) and _tomb != _tomb) and str(_tomb).strip(): + continue rid = UUID(row["id"]) _comm_raw = row["community_id"] if _comm_raw is not None and not isinstance(_comm_raw, str): @@ -662,9 +685,16 @@ def _build_runtime_graph_impl(store: MemoryStore, cached): edges_df = store.db.open_table("edges").to_pandas() for _, row in edges_df.iterrows(): + # Skip edges whose endpoints are not live nodes: graph.add_edge() does + # setdefault() on both endpoints, so an edge referencing a tombstoned record + # would re-create it as a payload-less node and undo the tombstone filter + # above (re-bloating the graph and the sigma audit). + src_s, dst_s = row["src"], row["dst"] + if not graph.has_node(src_s) or not graph.has_node(dst_s): + continue graph.add_edge( - UUID(row["src"]), - UUID(row["dst"]), + UUID(src_s), + UUID(dst_s), weight=float(row["weight"]), edge_type=row["edge_type"], ) diff --git a/tests/test_graph_excludes_tombstoned.py b/tests/test_graph_excludes_tombstoned.py new file mode 100644 index 0000000..9dd6134 --- /dev/null +++ b/tests/test_graph_excludes_tombstoned.py @@ -0,0 +1,102 @@ +"""Regression tests for excluding tombstoned records from the runtime graph. + +build_runtime_graph used to add every record (and every edge) to the graph, +including soft-deleted / deduped / erased records (tombstoned_at IS NOT NULL). +That (a) polluted communities / centrality / rich_club / the sigma topology +audit with dead nodes, and (b) desynced the node count from +store.active_records_count() -- the cache-validity anchor -- so the payload +cache was permanently invalid and every wake did a full rebuild. + +These tests pin: tombstoned records (and edges touching them) are excluded, the +live node count equals active_records_count(), and assignment/rich_club are +recomputed on the fresh live graph rather than reused from a stale-node cache. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +import iai_mcp.retrieve as retrieve +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _vec(seed: int) -> list[float]: + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return (v / np.linalg.norm(v)).tolist() + + +def _rec(seed: int): + now = datetime.now(timezone.utc) + rid = uuid4() + return rid, MemoryRecord( + id=rid, tier="episodic", literal_surface=f"rec-{seed}", aaak_index="", + embedding=_vec(seed), community_id=None, centrality=0.0, detail_level=2, + pinned=False, stability=0.0, difficulty=0.0, last_reviewed=None, + never_decay=False, never_merge=False, provenance=[], created_at=now, + updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def _tombstone(store: MemoryStore, rid) -> None: + now = datetime.now(timezone.utc).isoformat() + with store.db._conn_lock: + store.db._conn.execute( + "UPDATE records SET tombstoned_at = ? WHERE id = ?", + (now, str(rid)), + ) + + +def test_build_runtime_graph_excludes_tombstoned(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + live_ids = [] + for i in range(6): + rid, rec = _rec(i) + store.insert(rec) + live_ids.append(rid) + dead_ids = [] + for i in range(6, 10): + rid, rec = _rec(i) + store.insert(rec) + _tombstone(store, rid) + dead_ids.append(rid) + + assert store.active_records_count() == 6 + + graph, assignment, rich_club = retrieve.build_runtime_graph(store) + + nodes = {str(n) for n in graph.nodes()} + assert len(nodes) == 6, f"expected 6 live nodes, got {len(nodes)}" + for rid in dead_ids: + assert str(rid) not in nodes, f"tombstoned {rid} leaked into the graph" + for rid in live_ids: + assert str(rid) in nodes + # rich_club is a fraction of the LIVE graph, never references dead nodes + assert all(str(r) not in {str(d) for d in dead_ids} for r in (rich_club or [])) + + +def test_node_count_matches_active_records_count(tmp_path, monkeypatch): + """The cache-validity anchor: graph node count must equal active count, so the + payload cache validates on the next build instead of rebuilding forever.""" + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + _rid, rec = _rec(i) + store.insert(rec) + rid, rec = _rec(99) + store.insert(rec) + _tombstone(store, rid) + + graph, _assignment, _rc = retrieve.build_runtime_graph(store) + assert len({str(n) for n in graph.nodes()}) == store.active_records_count() == 8 From 5d14f43f0c254dea71cf44dba566bf86532b8be8 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 11:34:53 +0200 Subject: [PATCH 07/21] fix(daemon): a store count failure must not be read as empty _store_is_empty() caught (OSError, ValueError, KeyError, RuntimeError) and returned True. All Hippo store errors (HippoIntegrityError, HippoLockHeldError, ConsolidationPendingError, HippoDecryptError) subclass RuntimeError, and count_rows() raises HippoIntegrityError when the shared sqlite connection is left in an error state by a concurrent heavy reader. Returning True there parks the whole lifecycle tick (no idle-check, no drain) on a store that actually has records. Treat the unknown case as NOT empty so the tick proceeds; a truly empty store just does a little harmless no-op work. --- src/iai_mcp/daemon/__init__.py | 11 ++++-- tests/test_store_is_empty.py | 64 ++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 tests/test_store_is_empty.py diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index 92f454f..da7def1 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -228,8 +228,15 @@ def _store_is_empty(store: MemoryStore) -> bool: try: return store.db.open_table("records").count_rows() == 0 except (OSError, ValueError, KeyError, RuntimeError) as exc: - log.debug("store empty check failed, assuming empty: %s", exc) - return True + # Unknown != empty. A transient count failure (e.g. the shared sqlite + # connection left in an error state by a concurrent heavy reader, raising + # HippoIntegrityError/lock errors which subclass RuntimeError) must NOT be + # treated as an empty store: doing so parks the whole lifecycle tick + # (no idle-check, no drain) on a store that actually has records. Treat + # the unknown case as NOT empty so the tick proceeds; a truly empty store + # just does a little harmless no-op work. + log.debug("store empty check failed, assuming NOT empty: %s", exc) + return False def _is_inside_window( diff --git a/tests/test_store_is_empty.py b/tests/test_store_is_empty.py new file mode 100644 index 0000000..c7528c3 --- /dev/null +++ b/tests/test_store_is_empty.py @@ -0,0 +1,64 @@ +"""Regression test for _store_is_empty: a count failure must NOT be read as empty. + +The daemon tick calls _store_is_empty() and, if true, skips the whole tick +(no idle-check, no drain). A transient count failure -- e.g. the shared sqlite +connection left in an error state by a concurrent heavy reader, raising +HippoIntegrityError/lock errors (all subclass RuntimeError) -- used to be caught +and return True, parking the lifecycle on a store that actually has records. +The fix returns False (unknown != empty) so the tick proceeds. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +from iai_mcp.daemon import _store_is_empty +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _rec(seed: int) -> MemoryRecord: + now = datetime.now(timezone.utc) + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return MemoryRecord( + id=uuid4(), tier="episodic", literal_surface="rec", aaak_index="", + embedding=(v / np.linalg.norm(v)).tolist(), community_id=None, + centrality=0.0, detail_level=2, pinned=False, stability=0.0, + difficulty=0.0, last_reviewed=None, never_decay=False, never_merge=False, + provenance=[], created_at=now, updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def test_empty_store_is_empty(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + assert _store_is_empty(store) is True + + +def test_nonempty_store_is_not_empty(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + store.insert(_rec(1)) + assert _store_is_empty(store) is False + + +def test_count_failure_is_not_treated_as_empty(tmp_path, monkeypatch): + """The core fix: a RuntimeError during the count must yield False, not True.""" + store = _make_store(tmp_path, monkeypatch) + store.insert(_rec(2)) + + def boom(*_a, **_k): + raise RuntimeError("connection in error state") + + monkeypatch.setattr(store.db, "open_table", boom) + assert _store_is_empty(store) is False From 73e8e44a8bcba611cd9de428984dee3215811f9c Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 11:42:41 +0200 Subject: [PATCH 08/21] fix(daemon): reset last_tick_skipped_reason on a successful tick MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The field was only ever set (on the empty_store/paused skip paths), never cleared, so a single early skip (e.g. a first-tick count race at boot) left a healthy, ticking, draining daemon permanently reporting skip=empty_store in .daemon-state.json — misleading observability that reads as a parked lifecycle. --- src/iai_mcp/daemon/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index da7def1..408bce5 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -497,6 +497,10 @@ async def _tick_body( state["last_tick_at"] = datetime.now(timezone.utc).isoformat() + # Clear the skip reason: reaching here means the tick was NOT skipped. Leaving a + # stale "empty_store"/"paused" value here makes a healthy daemon look parked in + # observability (last_tick_skipped_reason is only ever set, never reset). + state["last_tick_skipped_reason"] = None try: await asyncio.to_thread(save_state, state) except (OSError, ValueError) as exc: From 364ea591704f3d382ba721520f18b6507f64f463 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 12:54:36 +0200 Subject: [PATCH 09/21] fix(daemon): keep idle countdown awake during active drains and RPC The lifecycle idle countdown only refreshed `_last_active_monotonic` when the Node wrapper heartbeat file was fresh (`HeartbeatScanner.is_active`). The wrappers dir can be empty (heartbeat stale) while the daemon is still draining a continuously-fed deferred-capture backlog. In that state the idle timer grew unconditionally and the FSM forced itself to SLEEP after 30 min even though drain threads were still writing to the store. Entering the SLEEP pipeline escalates to an EXCLUSIVE store lock, so this contended with the in-flight drain; and because crisis re-arming only runs in SLEEP, an oscillating/never-settling daemon could silently stop re-arming crisis detection. Fold two more activity signals into the idle countdown, alongside the wrapper heartbeat: - in-flight drain state: `capture.is_drain_in_progress()`, a thread-safe depth counter set by `drain_deferred_captures` / `drain_active_live_captures` for their whole duration; - recent real RPC traffic: `mcp_socket.last_activity_ts` (already used by the sleep-pipeline interrupt check, now also by the countdown). The decision is centralized in a pure, unit-testable helper `_idle_countdown_decision`. A genuinely idle daemon still advances to DROWSY/SLEEP exactly as before, so crisis re-arming keeps running; only an actively-working daemon is held awake. Explicit FORCE_SLEEP/user-sleep requests are unaffected. Add tests asserting the daemon does NOT advance toward SLEEP while a drain is in progress (or RPC is recent), that a truly idle daemon still sleeps, and that the in-progress flag is set across the production drain wrappers and released on exception. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/capture.py | 69 ++++++ src/iai_mcp/daemon/__init__.py | 88 ++++++- .../test_daemon_idle_countdown_drain_aware.py | 222 ++++++++++++++++++ 3 files changed, 376 insertions(+), 3 deletions(-) create mode 100644 tests/test_daemon_idle_countdown_drain_aware.py diff --git a/src/iai_mcp/capture.py b/src/iai_mcp/capture.py index d95ad4f..e2b2a31 100644 --- a/src/iai_mcp/capture.py +++ b/src/iai_mcp/capture.py @@ -1,6 +1,7 @@ from __future__ import annotations +import contextlib import hashlib import json import logging @@ -41,6 +42,46 @@ # two threads before either has inserted it. _CAPTURE_DEDUP_LOCK = threading.Lock() +# Drain-in-progress accounting for the daemon's lifecycle idle countdown. +# +# The wrapper heartbeat file is only ONE signal that the daemon is busy. A +# deferred-capture drain (triggered by real RPC traffic, kicked off at the +# drowsy edge, or run on startup) can keep writing to the store long after the +# heartbeat file has gone stale. If the idle countdown only watched the +# heartbeat, it would force the FSM toward SLEEP -- which escalates to an +# EXCLUSIVE store lock -- while these drain threads are still hammering the +# store, causing lock contention. The daemon consults `is_drain_in_progress` +# so an in-flight drain holds the idle countdown open. A depth counter (rather +# than a boolean) keeps this correct under concurrent/overlapping drains across +# the several threads that asyncio.to_thread dispatch spins up. +_DRAIN_IN_PROGRESS_LOCK = threading.Lock() +_drain_in_progress_depth = 0 + + +@contextlib.contextmanager +def _drain_in_progress_guard(): + """Mark a deferred-capture drain as in flight for its whole duration.""" + global _drain_in_progress_depth + with _DRAIN_IN_PROGRESS_LOCK: + _drain_in_progress_depth += 1 + try: + yield + finally: + with _DRAIN_IN_PROGRESS_LOCK: + _drain_in_progress_depth -= 1 + + +def is_drain_in_progress() -> bool: + """True while at least one deferred-capture drain is running. + + The daemon's lifecycle idle countdown reads this so it does NOT advance the + FSM toward SLEEP (and the EXCLUSIVE store lock it takes) while drain threads + are still writing to the store. + """ + with _DRAIN_IN_PROGRESS_LOCK: + return _drain_in_progress_depth > 0 + + FAILED_MAX_ATTEMPTS: int = 3 FAILED_BACKOFF_BASE_SEC: float = 60.0 @@ -711,6 +752,17 @@ def write_deferred_captures( def drain_deferred_captures(store: MemoryStore) -> dict[str, int]: + """Drain crashed/orphaned deferred-capture files into the store. + + Marks itself in-progress for the daemon idle countdown so the FSM does not + advance toward SLEEP (EXCLUSIVE store lock) mid-drain. See + `is_drain_in_progress`. + """ + with _drain_in_progress_guard(): + return _drain_deferred_captures_impl(store) + + +def _drain_deferred_captures_impl(store: MemoryStore) -> dict[str, int]: deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures" log_dir = Path.home() / ".iai-mcp" / "logs" log_dir.mkdir(parents=True, exist_ok=True) @@ -1139,6 +1191,23 @@ def drain_active_live_captures( store: MemoryStore, *, exclude_session_id: str, +) -> dict[str, int]: + """Drain live-session capture files into the store. + + Marked in-progress for the daemon idle countdown, like + `drain_deferred_captures` -- both write to the store and must hold the FSM + out of SLEEP while running. See `is_drain_in_progress`. + """ + with _drain_in_progress_guard(): + return _drain_active_live_captures_impl( + store, exclude_session_id=exclude_session_id, + ) + + +def _drain_active_live_captures_impl( + store: MemoryStore, + *, + exclude_session_id: str, ) -> dict[str, int]: deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures" state_dir = Path.home() / ".iai-mcp" / ".capture-state" diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index 408bce5..76efa0e 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -145,6 +145,52 @@ def _should_drain_on_drowsy_edge(prev, current) -> bool: return prev is _L.WAKE and current is _L.DROWSY +# Idle-countdown decisions. The lifecycle tick translates these into FSM events. +IDLE_DECISION_ACTIVE: str = "active" # real work in flight -> reset the countdown +IDLE_DECISION_SLEEP: str = "sleep" # idle past the sleep threshold -> IDLE_30MIN +IDLE_DECISION_DROWSY: str = "drowsy" # idle past the drowsy threshold -> IDLE_5MIN +IDLE_DECISION_HOLD: str = "hold" # within the drowsy window -> no transition + + +def _idle_countdown_decision( + *, + scanner_active: bool, + drain_in_progress: bool, + seconds_since_rpc: float, + idle_elapsed: float, + sleep_eligible: bool, + recent_rpc_window_sec: float, + drowsy_after_sec: float, + sleep_heartbeat_idle_sec: float, +) -> str: + """Decide what the lifecycle idle countdown should do this tick. + + The wrapper heartbeat (``scanner_active``) is only ONE signal that the + daemon is busy. Two others MUST also hold the daemon awake: + + * ``drain_in_progress`` -- a deferred-capture drain is writing to the store + right now. The wrappers dir can be empty (heartbeat stale) while a drain + kicked off by earlier RPC traffic is still hammering the store. + * ``seconds_since_rpc`` -- real JSON-RPC traffic arrived recently. A drain + runs inside its triggering request, so by the time a long drain finishes + ``last_activity_ts`` (set at request start) may already look stale; the + ``drain_in_progress`` signal covers that tail. + + When any of these holds we return ``ACTIVE`` so the caller resets the idle + countdown. Otherwise the daemon really is idle and we advance toward SLEEP + (which escalates to an EXCLUSIVE store lock) / DROWSY exactly as the + heartbeat-only logic used to, so a genuinely quiet daemon still settles and + crisis re-arming in SLEEP keeps running. + """ + if scanner_active or drain_in_progress or seconds_since_rpc < recent_rpc_window_sec: + return IDLE_DECISION_ACTIVE + if idle_elapsed >= sleep_heartbeat_idle_sec and sleep_eligible: + return IDLE_DECISION_SLEEP + if idle_elapsed >= drowsy_after_sec: + return IDLE_DECISION_DROWSY + return IDLE_DECISION_HOLD + + def _run_drowsy_drain(store, *, drain_fn, write_event_fn) -> None: try: result = drain_fn(store) @@ -1162,6 +1208,34 @@ def _emit_expiry() -> None: now_mono = time.monotonic() idle_elapsed = now_mono - _last_active_monotonic[0] + # Beyond the wrapper heartbeat, real RPC traffic and an + # in-flight deferred-capture drain are activity too. Folding + # them into the idle countdown stops the FSM forcing SLEEP + # (-> EXCLUSIVE store lock) while drain threads are still + # hammering the store. See _idle_countdown_decision. + try: + from iai_mcp.capture import is_drain_in_progress as _drain_q + _drain_active = bool(await asyncio.to_thread(_drain_q)) + except Exception: # noqa: BLE001 -- idle accounting MUST NOT crash the tick + _drain_active = False + _seconds_since_rpc = ( + (now_mono - mcp_socket.last_activity_ts) + if mcp_socket is not None + else float("inf") + ) + _idle_decision = _idle_countdown_decision( + scanner_active=scanner_active, + drain_in_progress=_drain_active, + seconds_since_rpc=_seconds_since_rpc, + idle_elapsed=idle_elapsed, + sleep_eligible=sleep_eligible, + recent_rpc_window_sec=INTERRUPT_RECENT_ACTIVITY_WINDOW_SEC, + drowsy_after_sec=DROWSY_AFTER_SEC, + sleep_heartbeat_idle_sec=SLEEP_HEARTBEAT_IDLE_SEC, + ) + if _idle_decision == IDLE_DECISION_ACTIVE: + _last_active_monotonic[0] = now_mono + try: from iai_mcp.daemon_state import load_state as _load_ds _ds = await asyncio.to_thread(_load_ds) @@ -1210,7 +1284,10 @@ def _emit_expiry() -> None: pass if scanner_active: - _last_active_monotonic[0] = now_mono + # _last_active_monotonic was already refreshed above + # (scanner_active -> IDLE_DECISION_ACTIVE); a fresh + # wrapper heartbeat additionally pulls the FSM back to + # WAKE, which RPC/drain activity alone does not. try: await _state_machine.dispatch( _LifecycleEvent.HEARTBEAT_REFRESH, @@ -1218,7 +1295,7 @@ def _emit_expiry() -> None: ) except (S2OscillationConflict, S2OscillationBlocked): pass - elif idle_elapsed >= SLEEP_HEARTBEAT_IDLE_SEC and sleep_eligible: + elif _idle_decision == IDLE_DECISION_SLEEP: try: await _state_machine.dispatch( _LifecycleEvent.IDLE_30MIN, @@ -1227,7 +1304,7 @@ def _emit_expiry() -> None: ) except (S2OscillationConflict, S2OscillationBlocked): pass - elif idle_elapsed >= DROWSY_AFTER_SEC: + elif _idle_decision == IDLE_DECISION_DROWSY: try: await _state_machine.dispatch( _LifecycleEvent.IDLE_5MIN, @@ -1538,6 +1615,11 @@ def _interrupt_check() -> bool: "_raise_fd_limit", "_run_drowsy_drain", "_should_drain_on_drowsy_edge", + "_idle_countdown_decision", + "IDLE_DECISION_ACTIVE", + "IDLE_DECISION_SLEEP", + "IDLE_DECISION_DROWSY", + "IDLE_DECISION_HOLD", "_kick_drowsy_rgc_rebuild", "_wake_hook_rebuild_if_cold", "_store_is_empty", diff --git a/tests/test_daemon_idle_countdown_drain_aware.py b/tests/test_daemon_idle_countdown_drain_aware.py new file mode 100644 index 0000000..f02599d --- /dev/null +++ b/tests/test_daemon_idle_countdown_drain_aware.py @@ -0,0 +1,222 @@ +"""The lifecycle idle countdown must watch real work, not only the heartbeat. + +Regression coverage for the bug where the FSM forced itself to SLEEP after the +idle timer expired even though the daemon was still draining a continuously-fed +backlog: the only refresh of ``_last_active_monotonic`` was gated on the Node +wrapper heartbeat file, which can be stale (empty wrappers dir) while a drain +kicked off by earlier RPC traffic is still hammering the store. Advancing to +SLEEP there escalates to an EXCLUSIVE store lock mid-drain -> lock contention. + +These tests pin the two added signals -- an in-flight drain and recent RPC +activity -- and the regression guard that a genuinely idle daemon still sleeps +(so crisis re-arming, which only runs in SLEEP, keeps working). +""" + +from __future__ import annotations + +import platform +import threading + +import pytest + + +pytestmark = pytest.mark.skipif( + platform.system() == "Windows", + reason="daemon module is POSIX-only on this project", +) + + +# Inputs that, on their own, would push the daemon all the way to SLEEP: well +# past the sleep threshold, stale RPC, no wrapper heartbeat, sleep-eligible. +# Each test flips exactly one signal to prove it holds the countdown open. +_SLEEPY = dict( + scanner_active=False, + seconds_since_rpc=10_000.0, + idle_elapsed=10_000.0, + sleep_eligible=True, + recent_rpc_window_sec=30.0, + drowsy_after_sec=300.0, + sleep_heartbeat_idle_sec=1800.0, +) + + +def test_drain_in_progress_blocks_advance_toward_sleep(): + from iai_mcp.daemon import ( + IDLE_DECISION_ACTIVE, + IDLE_DECISION_SLEEP, + _idle_countdown_decision, + ) + + decision = _idle_countdown_decision(drain_in_progress=True, **_SLEEPY) + + assert decision != IDLE_DECISION_SLEEP, ( + "idle countdown advanced toward SLEEP while a drain was in progress" + ) + assert decision == IDLE_DECISION_ACTIVE + + +def test_recent_rpc_blocks_advance_toward_sleep(): + from iai_mcp.daemon import IDLE_DECISION_ACTIVE, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["seconds_since_rpc"] = 5.0 # within recent_rpc_window_sec (30s) + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_ACTIVE + + +def test_truly_idle_still_advances_to_sleep(): + # Regression guard: with NO activity of any kind, a quiet daemon must still + # reach SLEEP, otherwise crisis re-arming (SLEEP-only) never runs again. + from iai_mcp.daemon import IDLE_DECISION_SLEEP, _idle_countdown_decision + + decision = _idle_countdown_decision(drain_in_progress=False, **_SLEEPY) + + assert decision == IDLE_DECISION_SLEEP + + +def test_idle_past_drowsy_but_not_sleep_eligible_goes_drowsy_only(): + from iai_mcp.daemon import IDLE_DECISION_DROWSY, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["idle_elapsed"] = 600.0 # past drowsy_after_sec (300) ... + inputs["sleep_eligible"] = False # ... but not sleep-eligible yet + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_DROWSY + + +def test_within_drowsy_window_holds(): + from iai_mcp.daemon import IDLE_DECISION_HOLD, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["idle_elapsed"] = 60.0 # under drowsy_after_sec (300) + inputs["sleep_eligible"] = False + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_HOLD + + +def test_scanner_active_is_active(): + from iai_mcp.daemon import IDLE_DECISION_ACTIVE, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["scanner_active"] = True + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_ACTIVE + + +# --- capture wiring: the production drain path actually flips the flag -------- + + +def test_drain_deferred_marks_in_progress_and_holds_off_sleep(monkeypatch): + from iai_mcp import capture + from iai_mcp.daemon import ( + IDLE_DECISION_ACTIVE, + IDLE_DECISION_SLEEP, + _idle_countdown_decision, + ) + + seen: dict[str, object] = {} + + def spy_impl(store): + # Mid-drain the flag must be set, and the idle countdown -- fed the flag + # -- must refuse to advance toward SLEEP on otherwise-sleepy inputs. + seen["in_progress"] = capture.is_drain_in_progress() + seen["decision"] = _idle_countdown_decision( + drain_in_progress=capture.is_drain_in_progress(), **_SLEEPY, + ) + return {"files_drained": 0, "files_failed": 0} + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", spy_impl) + + assert capture.is_drain_in_progress() is False + capture.drain_deferred_captures(store=None) + + assert seen["in_progress"] is True + assert seen["decision"] == IDLE_DECISION_ACTIVE + assert seen["decision"] != IDLE_DECISION_SLEEP + # Released once the drain returns: a quiet daemon can sleep again. + assert capture.is_drain_in_progress() is False + + +def test_drain_active_live_marks_in_progress(monkeypatch): + from iai_mcp import capture + + seen: dict[str, object] = {} + + def spy_impl(store, *, exclude_session_id): + seen["in_progress"] = capture.is_drain_in_progress() + seen["sid"] = exclude_session_id + return {"files_drained": 0} + + monkeypatch.setattr(capture, "_drain_active_live_captures_impl", spy_impl) + + capture.drain_active_live_captures(store=None, exclude_session_id="sess-1") + + assert seen["in_progress"] is True + assert seen["sid"] == "sess-1" + assert capture.is_drain_in_progress() is False + + +def test_guard_releases_on_exception(monkeypatch): + from iai_mcp import capture + + def boom(store): + raise RuntimeError("drain blew up") + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", boom) + + assert capture.is_drain_in_progress() is False + with pytest.raises(RuntimeError): + capture.drain_deferred_captures(store=None) + # A failed drain must not leak the in-progress count, else the daemon would + # never sleep again. + assert capture.is_drain_in_progress() is False + + +def test_overlapping_drains_tracked_by_depth(): + # The counter (not a boolean) keeps overlapping drains across threads + # correct: the flag stays True until the LAST one releases. + from iai_mcp import capture + + assert capture.is_drain_in_progress() is False + with capture._drain_in_progress_guard(): + assert capture.is_drain_in_progress() is True + with capture._drain_in_progress_guard(): + assert capture.is_drain_in_progress() is True + assert capture.is_drain_in_progress() is True # outer still holds + assert capture.is_drain_in_progress() is False + + +def test_is_drain_in_progress_true_while_other_thread_drains(monkeypatch): + from iai_mcp import capture + + entered = threading.Event() + release = threading.Event() + + def slow_impl(store): + entered.set() + release.wait(timeout=5.0) + return {"files_drained": 0, "files_failed": 0} + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", slow_impl) + + worker = threading.Thread( + target=lambda: capture.drain_deferred_captures(store=None), + ) + worker.start() + try: + assert entered.wait(timeout=5.0) + # Observed from a DIFFERENT thread than the one draining. + assert capture.is_drain_in_progress() is True + finally: + release.set() + worker.join(timeout=5.0) + + assert capture.is_drain_in_progress() is False From d03e02469b9c30b6230a6e0dc566ade8fa2dd6a2 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 14:59:50 +0200 Subject: [PATCH 10/21] test(daemon): guard tick-flag observability and the skip-reason reset Covers 2cffb35: a successful tick clears a stale last_tick_skipped_reason, plus the paused-skip event/persistence and the no-run_rem_cycle routing. Co-Authored-By: Claude Opus 4.8 --- tests/test_daemon_tick_flags.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_daemon_tick_flags.py b/tests/test_daemon_tick_flags.py index 8f1e31d..70bdcf8 100644 --- a/tests/test_daemon_tick_flags.py +++ b/tests/test_daemon_tick_flags.py @@ -157,3 +157,23 @@ def test_tick_updates_last_tick_at(tick_env, monkeypatch): assert "last_tick_at" in state datetime.fromisoformat(state["last_tick_at"]) + + +def test_successful_tick_resets_stale_skip_reason(tick_env, monkeypatch): + # 2cffb35 regression: a stale last_tick_skipped_reason ("empty_store"/"paused") + # must be cleared once a tick actually runs (store non-empty, not paused), + # otherwise observability shows a healthy daemon as permanently parked. tick_env + # seeds one record -> store is non-empty. WITHOUT the reset line the field stays + # "empty_store" and this test fails. + from iai_mcp import daemon as daemon_mod + from iai_mcp.daemon_state import load_state + + store, state_path, tmp_path = tick_env + monkeypatch.setattr(daemon_mod, "should_relearn", lambda last, now: False) + + state = {"fsm_state": "WAKE", "last_tick_skipped_reason": "empty_store"} + asyncio.run(daemon_mod._tick_body(store, state)) + + assert state.get("last_tick_skipped_reason") is None + loaded = load_state() + assert loaded.get("last_tick_skipped_reason") is None From e39a901b563a018cee382a651dd85d6feb19a7d5 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 14:59:50 +0200 Subject: [PATCH 11/21] fix(capture): stop re-embedding seen turns and close the dedup race Active sessions re-drain the entire transcript every turn, so the eager embed at the top of capture_turn re-ran the expensive GIL-bound Rust matmul for every already-stored turn just to discard it on the idem-tag dedup check -- a steady CPU drain proportional to transcript length. Defer embedding behind a memoized _compute_embedding() closure that runs at most once and only when a turn is actually new, and flush the record buffer before the idem lookup under _CAPTURE_DEDUP_LOCK so a just-inserted but unflushed turn is visible to the SQLite-backed dedup -- closing a check-then-insert race that produced live duplicate records. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/capture.py | 67 +++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/src/iai_mcp/capture.py b/src/iai_mcp/capture.py index e2b2a31..9c54ae6 100644 --- a/src/iai_mcp/capture.py +++ b/src/iai_mcp/capture.py @@ -317,32 +317,53 @@ def capture_turn( from iai_mcp.embed import embedder_for_store from iai_mcp.events import TELEMETRY_EMBED_NATIVE_FAILURE, write_event - try: - # Embed the message content, never the cue. The cue is a provenance - # label only (transcript drains and deferred-drain pass a positional - # cue such as "session turn "); embedding it collapsed the - # stored vector space and broke semantic recall. text is already - # validated non-empty above (the MIN_CAPTURE_LEN guard), so embedding - # text is safe for every caller. - emb = embedder_for_store(store).embed(text) - except Exception as exc: - write_event( - store, - TELEMETRY_EMBED_NATIVE_FAILURE, - { - "op_type": "capture", - "backend": "rust", - "error_type": type(exc).__name__, - "error": str(exc), - }, - ) - raise NativeError(f"capture encode failed: {exc}") from exc - embedding = list(emb) + # Embedding is the expensive native (Rust) matmul; the exact-key idem dedup + # below is a cheap SQLite lookup. Active sessions re-drain the *whole* + # transcript every turn (write_deferred_captures / capture_transcript walk + # from line 0 on each call), so embedding eagerly here re-embedded every + # already-stored turn only to throw the vector away as a "reinforced" + # exact-key re-drain -> chronic daemon CPU. Defer the embed so it runs at + # most once and only when actually needed: an already-seen episodic turn + # short-circuits on its idem tag and never embeds. Embed the message + # content, never the cue (the cue is a positional provenance label; + # embedding it collapsed the vector space and broke semantic recall). text + # is validated non-empty above (the MIN_CAPTURE_LEN guard). + _embed_cache: dict[str, list[float]] = {} + + def _compute_embedding() -> list[float]: + if "v" in _embed_cache: + return _embed_cache["v"] + try: + emb = embedder_for_store(store).embed(text) + except Exception as exc: + write_event( + store, + TELEMETRY_EMBED_NATIVE_FAILURE, + { + "op_type": "capture", + "backend": "rust", + "error_type": type(exc).__name__, + "error": str(exc), + }, + ) + raise NativeError(f"capture encode failed: {exc}") from exc + vec = list(emb) + _embed_cache["v"] = vec + return vec with _CAPTURE_DEDUP_LOCK: if _is_episodic_conversational(tier, role): ts_iso = now.isoformat() idem_t = _idem_tag(session_id, role, ts_iso, text, source_uuid=source_uuid) + # find_record_by_tag reads SQLite, but a just-inserted record may + # still sit in the in-process _record_buffer (not yet flushed to the + # records table). Under _CAPTURE_DEDUP_LOCK the insert path is + # serialized with this find, so flushing the buffer here makes every + # prior committed insert visible to the SQLite-backed find and closes + # the check-then-insert race that produced live idem-tag duplicates. + from iai_mcp.store import flush_record_buffer + + flush_record_buffer(store) existing_id = store.find_record_by_tag(idem_t) if existing_id is not None: try: @@ -362,7 +383,7 @@ def capture_turn( } else: try: - neighbours = store.query_similar(embedding, k=3, tier=tier) + neighbours = store.query_similar(_compute_embedding(), k=3, tier=tier) except (ValueError, IOError) as exc: log.warning( "capture_dedup_query_failed", @@ -402,7 +423,7 @@ def capture_turn( tier=tier, literal_surface=text, aaak_index="", - embedding=embedding, + embedding=_compute_embedding(), community_id=None, centrality=0.0, detail_level=2, From ef9ee31afe4549ad41de1141389d64964d97d657 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 14:59:50 +0200 Subject: [PATCH 12/21] fix(recall): clamp displayed score to [0,1], rank on unclamped sort_score Multiplicative boosts (trigram*2, FTS*3, valence) can push the internal score past 1.0, so a served recall hit could surface a "confidence" > 1, and a degraded daemon state surfaced flat 1.000 scores. Clamp the *displayed* score to [0,1] at serialization while ranking on a new, unclamped MemoryHit.sort_score so ordering is provably unchanged; the stale-downweight keeps sort_score in lock-step with score. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/core/_serializers.py | 11 ++++++++++- src/iai_mcp/pipeline.py | 12 +++++++++++- src/iai_mcp/retrieve.py | 11 ++++++++++- src/iai_mcp/types.py | 5 +++++ 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/iai_mcp/core/_serializers.py b/src/iai_mcp/core/_serializers.py index 75b46ce..2ecbfa5 100644 --- a/src/iai_mcp/core/_serializers.py +++ b/src/iai_mcp/core/_serializers.py @@ -6,9 +6,18 @@ def _hit_to_json(h) -> dict: _vf = getattr(h, "valid_from", None) _vt = getattr(h, "valid_to", None) + # Clamp the *displayed* score to [0,1]. Multiplicative boosts (trigram*2, + # FTS*3, valence) can drive the internal score past 1.0; that raw value stays + # in `sort_score` for ordering, but the client must never see a "confidence" + # > 1 (or < 0). Ordering is unaffected: this only touches the serialized + # number, never the rank. + try: + _display_score = max(0.0, min(1.0, float(h.score))) + except (TypeError, ValueError): + _display_score = 0.0 return { "record_id": str(h.record_id), - "score": float(h.score), + "score": _display_score, "reason": h.reason, "literal_surface": h.literal_surface, "adjacent_suggestions": [str(x) for x in h.adjacent_suggestions], diff --git a/src/iai_mcp/pipeline.py b/src/iai_mcp/pipeline.py index 7d403f3..aead3eb 100644 --- a/src/iai_mcp/pipeline.py +++ b/src/iai_mcp/pipeline.py @@ -844,6 +844,10 @@ def _recall_core( MemoryHit( record_id=cid, score=float(s), + # Keep the raw, unclamped score as the internal ranking key so + # post-rank re-sorts preserve engine order even when the + # displayed score is later clamped to [0,1] at serialization. + sort_score=float(s), reason=reason, literal_surface=rec.literal_surface, adjacent_suggestions=suggestions, @@ -1140,7 +1144,13 @@ def recall_for_response( ) apply_stale_downweight(core.scored_hits, cue_intent=_cue_intent) apply_stale_downweight(core.anti_hits, cue_intent=_cue_intent) - core.scored_hits.sort(key=lambda h: h.score, reverse=True) + # Rank on the internal unclamped key (falls back to score when a hit was + # built without sort_score), so ordering is preserved across the display + # clamp applied at serialization. + core.scored_hits.sort( + key=lambda h: (h.sort_score if h.sort_score is not None else h.score), + reverse=True, + ) if ( len(core.scored_hits) == 1 diff --git a/src/iai_mcp/retrieve.py b/src/iai_mcp/retrieve.py index 8b46a3c..89fe327 100644 --- a/src/iai_mcp/retrieve.py +++ b/src/iai_mcp/retrieve.py @@ -127,7 +127,12 @@ def recall( derive_temporal_validity(store, anti_hits) apply_stale_downweight(hits) apply_stale_downweight(anti_hits) - hits.sort(key=lambda h: h.score, reverse=True) + # Rank on the internal unclamped key (falls back to score), so ordering + # survives the display clamp applied at serialization. + hits.sort( + key=lambda h: (h.sort_score if h.sort_score is not None else h.score), + reverse=True, + ) try: from iai_mcp.s4 import on_read_check @@ -384,6 +389,10 @@ def apply_stale_downweight( continue if not getattr(hit, "_stale_downweighted", False): hit.score *= STALE_DOWNWEIGHT_FACTOR + # Keep the internal ranking key in lock-step with the displayed + # score so stale hits demote in the actual ordering too. + if getattr(hit, "sort_score", None) is not None: + hit.sort_score *= STALE_DOWNWEIGHT_FACTOR hit._stale_downweighted = True if not hit.reason.endswith(_STALE_REASON_SUFFIX): hit.reason = f"{hit.reason}{_STALE_REASON_SUFFIX}" diff --git a/src/iai_mcp/types.py b/src/iai_mcp/types.py index af4188f..3c2c7c5 100644 --- a/src/iai_mcp/types.py +++ b/src/iai_mcp/types.py @@ -131,6 +131,11 @@ class MemoryHit: valid_to: datetime | None = None session_id: str | None = None captured_at: str | None = None + # Internal ranking key, unclamped. `score` is the *displayed* value (clamped + # to [0,1] at serialization); `sort_score` preserves the raw engine ordering + # after multiplicative boosts (trigram*2, FTS*3, valence) push it past 1.0. + # When None, callers fall back to `score` (backward compatible). + sort_score: float | None = None @dataclass From 5bd3f4761dc6606438946e67ad68eb2d8de4fc96 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 13:54:47 +0200 Subject: [PATCH 13/21] test(recall): guard displayed score clamp to [0,1] with order preserved (cherry picked from commit 38c632ac233cf0e63293961c1076e1bb18fcded2) --- tests/test_recall_score_clamp.py | 86 ++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 tests/test_recall_score_clamp.py diff --git a/tests/test_recall_score_clamp.py b/tests/test_recall_score_clamp.py new file mode 100644 index 0000000..06d4fa7 --- /dev/null +++ b/tests/test_recall_score_clamp.py @@ -0,0 +1,86 @@ +"""Falsifiable tests for BUG (4a): displayed recall score must be clamped to +[0,1] WITHOUT changing the relative ordering of hits. + +The recall pipeline applies *multiplicative* boosts (trigram*2, FTS*3, +valence*(1+v)) to the base similarity score (pipeline.py:786-789). With no +clamp the serialized `score` can exceed 1.0 -- a leaky internal quantity +surfaced as if it were a probability/confidence. The contract under test: + + 1. _hit_to_json(hit)["score"] is always within [0.0, 1.0]. + 2. The order of hits sorted by their *internal* sort key is preserved even + after the displayed score is clamped (two equally-clamped 1.0 hits must + keep their pre-clamp ranking). + +These tests are hermetic: they build MemoryHit objects directly, no store, +no daemon, no embedder. + +RUN: + cd && .venv/bin/python -m pytest /tmp/iai_fix_recall/test_recall_score_clamp.py -v +""" +from __future__ import annotations + +from iai_mcp.types import MemoryHit +from iai_mcp.core._serializers import _hit_to_json + + +def _hit(score: float, sort_score: float | None = None, rid_int: int = 1) -> MemoryHit: + from uuid import UUID + return MemoryHit( + record_id=UUID(int=rid_int), + score=score, + sort_score=sort_score, + reason="t", + literal_surface="x", + adjacent_suggestions=[], + ) + + +def test_displayed_score_is_clamped_to_unit_interval(): + # A boosted hit (trigram*2 then FTS*3 => 6x a 0.4 cosine = 2.4) must NOT + # leak a >1 score to the client. + h = _hit(score=2.4, sort_score=2.4) + out = _hit_to_json(h) + assert 0.0 <= out["score"] <= 1.0, f"score leaked out of [0,1]: {out['score']}" + + +def test_negative_score_clamped_to_zero(): + h = _hit(score=-0.3, sort_score=-0.3) + out = _hit_to_json(h) + assert out["score"] == 0.0 + + +def test_clamp_preserves_internal_ordering(): + # Two hits both boost past 1.0 (2.4 and 1.8). Displayed scores collapse to + # 1.0/1.0, but the internal sort_score must still distinguish them so the + # ranking the engine computed is preserved. + strong = _hit(score=2.4, sort_score=2.4, rid_int=1) + weaker = _hit(score=1.8, sort_score=1.8, rid_int=2) + mid = _hit(score=0.5, sort_score=0.5, rid_int=3) + + hits = [mid, weaker, strong] + # Sort the way the pipeline does post-rank: by the internal key. + hits.sort(key=lambda h: (h.sort_score if h.sort_score is not None else h.score), + reverse=True) + + order = [h.record_id.int for h in hits] + assert order == [1, 2, 3], f"internal ordering not preserved: {order}" + + # And after serialization the two boosted ones are both clamped but still + # appear in the engine's order (list order, not score value). + serialized = [_hit_to_json(h) for h in hits] + assert serialized[0]["record_id"].endswith("0001") + assert serialized[1]["record_id"].endswith("0002") + assert all(0.0 <= s["score"] <= 1.0 for s in serialized) + # Top two collapsed to the ceiling -- proves ordering can't rely on the + # displayed score alone, which is exactly why sort_score must exist. + assert serialized[0]["score"] == 1.0 + assert serialized[1]["score"] == 1.0 + + +def test_sort_score_falls_back_to_score_when_absent(): + # Backward compat: a hit built the old way (no sort_score) still sorts and + # serializes sanely. + h = _hit(score=0.42, sort_score=None) + key = h.sort_score if h.sort_score is not None else h.score + assert key == 0.42 + assert _hit_to_json(h)["score"] == 0.42 From b355138cf8ef430f7e5ff9a7f52033d5201dcb79 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:00:24 +0200 Subject: [PATCH 14/21] fix(consolidation): build crisis topology on the live graph; demote rich_club The essential-variable tracker and crisis recluster rebuilt their graph over ALL records -- including tombstoned (soft-deleted / deduped) ones -- so communities, centrality and rich_club were computed on thousands of dead nodes. On a real store rich_club sat at ~0.019, just under the 0.02 floor, re-arming crisis_mode every sleep cycle on a healthy store. Add a shared build_live_graph() helper (tombstone-filtered nodes, live-only edges) used by both paths, and demote rich_club_coefficient from a crisis *trigger* to a diagnostic-only signal (edge_density and community_count remain the triggers). Harden the runtime-graph tombstone guard to pd.isna() so a NaT/NA tombstoned_at on a reembedded datetime64 column is read as LIVE instead of collapsing the graph to empty. Co-Authored-By: Claude Opus 4.8 --- .../lilli/cycle/sleep_pipeline/_crisis.py | 48 ++----- .../sleep_pipeline/_essential_variable.py | 59 +++++---- .../lilli/cycle/sleep_pipeline/_live_graph.py | 96 ++++++++++++++ src/iai_mcp/retrieve.py | 17 ++- tests/test_graph_tombstone_hardening.py | 118 ++++++++++++++++++ 5 files changed, 266 insertions(+), 72 deletions(-) create mode 100644 src/iai_mcp/lilli/cycle/sleep_pipeline/_live_graph.py create mode 100644 tests/test_graph_tombstone_hardening.py diff --git a/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py b/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py index d2516cc..e0e82e2 100644 --- a/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py +++ b/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py @@ -66,49 +66,19 @@ def step_crisis_recluster( if not dry_run: tbl = self._store.db.open_table(RECORDS_TABLE) - try: - df2 = tbl.search().to_pandas() - except (OSError, ValueError, RuntimeError, StoreError): - df2 = df - try: from iai_mcp.community import detect_communities - from iai_mcp.graph import MemoryGraph - from iai_mcp.store import EDGES_TABLE + from iai_mcp.lilli.cycle.sleep_pipeline._live_graph import ( + build_live_graph, + ) import uuid as _uuid - g = MemoryGraph() - for _, row in df2.iterrows(): - try: - rid = _uuid.UUID(str(row["id"])) - emb = row.get("embedding") - emb_list = ( - list(emb) if emb is not None else [] - ) - g.add_node(rid, None, emb_list) - except (ValueError, TypeError, AttributeError): - continue - - try: - edges_df = ( - self._store.db.open_table(EDGES_TABLE) - .search() - .to_pandas() - ) - for _, e in edges_df.iterrows(): - try: - src_u = _uuid.UUID(str(e["src"])) - dst_u = _uuid.UUID(str(e["dst"])) - g.add_edge( - src_u, dst_u, - weight=float( - e.get("weight", 1.0) or 1.0 - ), - ) - except (ValueError, TypeError, KeyError): - continue - except (OSError, ValueError, RuntimeError, StoreError) as exc: - logger.debug("crisis_recluster edges query failed: %s", exc) + # Recluster on the LIVE graph only (tombstone-filtered, + # live-only edges) -- mirrors retrieve.py 53f04f9. Previously + # this rebuilt communities over ALL records incl. 3000+ + # tombstoned, collapsing the partition (it reassigned ~9700 + # records into a single community on the real store). + g = build_live_graph(self._store) _assignment = detect_communities( g, prior=None, prior_mode="cold" diff --git a/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py b/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py index fa68875..3dca800 100644 --- a/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py +++ b/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py @@ -63,9 +63,12 @@ def run_essential_variable_tracker_hook(self) -> None: EssentialVariableTracker, TopologySnapshot, ) - from iai_mcp.graph import MemoryGraph from iai_mcp.events import write_event - from iai_mcp.store import RECORDS_TABLE, EDGES_TABLE + from iai_mcp.store import RECORDS_TABLE + from iai_mcp.lilli.cycle.sleep_pipeline._live_graph import ( + _is_tombstoned, + build_live_graph, + ) cfg = _load_sleep_overhaul_config() dry_run = cfg.dry_run @@ -82,50 +85,36 @@ def run_essential_variable_tracker_hook(self) -> None: return import uuid as _uuid - g = MemoryGraph() + # Build the graph on LIVE records + live-only edges only. Previously this + # hook constructed the graph from ALL records (tombstoned included) and ALL + # edges, so rich_club_coefficient was computed on a graph polluted by the + # 3000+ deduped/tombstoned nodes (and phantom nodes re-created by add_edge). + # That pushed rich_club below its floor and re-armed crisis on every sleep + # cycle even on a healthy store. build_live_graph mirrors retrieve.py's fix + # (53f04f9) so the crisis view matches recall's view of the graph. + g = build_live_graph(self._store) community_ids: set = set() _community_embeddings: dict[str, list[list[float]]] = {} for _, row in recs.iterrows(): + if _is_tombstoned(row.get("tombstoned_at")): + continue try: - rid = _uuid.UUID(str(row["id"])) emb = row.get("embedding") emb_list = list(emb) if emb is not None else [] cid_raw = row.get("community_id") - cid_uuid: _uuid.UUID | None if cid_raw is not None: try: - cid_uuid = _uuid.UUID(str(cid_raw)) - _cid_str = str(cid_uuid) + _cid_str = str(_uuid.UUID(str(cid_raw))) community_ids.add(_cid_str) if emb_list: _community_embeddings.setdefault( _cid_str, [] ).append(emb_list) except (ValueError, TypeError): - cid_uuid = None - else: - cid_uuid = None - g.add_node(rid, cid_uuid, emb_list) + pass except (ValueError, TypeError, AttributeError): continue - try: - edges_df = ( - self._store.db.open_table(EDGES_TABLE).search().to_pandas() - ) - for _, e in edges_df.iterrows(): - try: - src_u = _uuid.UUID(str(e["src"])) - dst_u = _uuid.UUID(str(e["dst"])) - g.add_edge( - src_u, dst_u, - weight=float(e.get("weight", 1.0) or 1.0), - ) - except (ValueError, TypeError, KeyError): - continue - except (OSError, ValueError, RuntimeError, StoreError) as exc: - logger.debug("essential_variable_tracker edges query failed: %s", exc) - total_nodes = g.node_count() if total_nodes == 0: return @@ -150,19 +139,28 @@ def run_essential_variable_tracker_hook(self) -> None: tracker = EssentialVariableTracker(cfg) breaches = tracker.check(snapshot) + # rich_club_ratio is kept as a DIAGNOSTIC only, not a crisis trigger. On the + # real corpus the live (tombstone-filtered) rich_club sits ~0.019, just under + # the 0.02 floor, so it false-positives on a demonstrably healthy graph and + # is non-discriminant at this scale (a true collapse instead shows up as + # edge_density/community_count). edge_density and community_count remain the + # crisis triggers (both healthy with clear margin). The rich_club breach is + # still recorded as an event for observability. + _CRISIS_TRIGGER_VARS = {"edge_density", "community_count"} crisis_mode_already_set_this_cycle = False for var_name, breach in breaches.items(): if breach is None: continue + is_trigger = var_name in _CRISIS_TRIGGER_VARS crisis_mode_set = False - if not dry_run and not crisis_mode_already_set_this_cycle: + if is_trigger and not dry_run and not crisis_mode_already_set_this_cycle: self._set_crisis_mode_via_s2_or_fallback( value=True, reason=f"essential_variable_breach:{var_name}", ) crisis_mode_already_set_this_cycle = True crisis_mode_set = True - elif not dry_run and crisis_mode_already_set_this_cycle: + elif is_trigger and not dry_run and crisis_mode_already_set_this_cycle: crisis_mode_set = True write_event( self._store, @@ -174,6 +172,7 @@ def run_essential_variable_tracker_hook(self) -> None: "direction": str(breach.direction), "total_nodes": int(total_nodes), "crisis_mode_set": bool(crisis_mode_set), + "is_crisis_trigger": bool(is_trigger), "dry_run_mode": bool(dry_run), }, severity="warning", diff --git a/src/iai_mcp/lilli/cycle/sleep_pipeline/_live_graph.py b/src/iai_mcp/lilli/cycle/sleep_pipeline/_live_graph.py new file mode 100644 index 0000000..31befbe --- /dev/null +++ b/src/iai_mcp/lilli/cycle/sleep_pipeline/_live_graph.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import logging +import uuid as _uuid + +import pandas as pd + +from iai_mcp.exceptions import StoreError +from iai_mcp.graph import MemoryGraph +from iai_mcp.store import EDGES_TABLE, RECORDS_TABLE + +logger = logging.getLogger(__name__) + + +def _is_tombstoned(value) -> bool: + """True if a records.tombstoned_at value marks a (soft-)deleted record. + + NULL is the LIVE case and pandas materialises it differently per dtype: + None (object column), float('nan'), pd.NaT (a datetime64 column -- the shape + the reembed Arrow schema produces) or pd.NA (nullable extension dtypes). + pd.isna() covers all of them, so only a real, non-empty timestamp string + marks a tombstone. Mirrors retrieve.py::_build_runtime_graph_impl so the + crisis hooks compute topology on exactly the same live node set as recall. + """ + if value is None: + return False + try: + if pd.isna(value): + return False + except (TypeError, ValueError): + pass + return bool(str(value).strip()) + + +def build_live_graph(store) -> MemoryGraph: + """Build a MemoryGraph of LIVE records + live-only edges. + + Excludes tombstoned and embedding-pending records (matching + store.active_records_count()), and drops any edge whose endpoint is not a + live node -- graph.add_edge() does setdefault() on both endpoints, so an edge + to a tombstoned record would re-create it as a payload-less node and re-bloat + the graph. That pollution is exactly what drove rich_club below its floor and + re-armed crisis on every sleep cycle. This mirrors the fix in retrieve.py + (53f04f9) so the crisis hooks no longer diverge from recall's view of the + graph. On a store error it returns an empty graph (callers already guard on + node_count == 0). + """ + g = MemoryGraph() + try: + recs = store.db.open_table(RECORDS_TABLE).search().to_pandas() + except (OSError, ValueError, RuntimeError, StoreError) as exc: + logger.debug("build_live_graph records query failed: %s", exc) + return g + if recs.empty: + return g + + for _, row in recs.iterrows(): + try: + if int(row.get("embedding_pending") or 0) != 0: + continue + if _is_tombstoned(row.get("tombstoned_at")): + continue + rid = _uuid.UUID(str(row["id"])) + cid_raw = row.get("community_id") + cid_uuid = None + if cid_raw is not None and not _is_tombstoned(cid_raw) and str(cid_raw).strip(): + try: + cid_uuid = _uuid.UUID(str(cid_raw)) + except (ValueError, TypeError): + cid_uuid = None + emb = row.get("embedding") + emb_list = list(emb) if emb is not None else [] + g.add_node(rid, cid_uuid, emb_list) + except (ValueError, TypeError, AttributeError): + continue + + try: + edges_df = store.db.open_table(EDGES_TABLE).search().to_pandas() + except (OSError, ValueError, RuntimeError, StoreError) as exc: + logger.debug("build_live_graph edges query failed: %s", exc) + return g + for _, e in edges_df.iterrows(): + try: + src_s, dst_s = e["src"], e["dst"] + # Both endpoints must already be live nodes; add_edge() setdefault + # would otherwise resurrect a tombstoned endpoint as a phantom node. + if not g.has_node(src_s) or not g.has_node(dst_s): + continue + g.add_edge( + _uuid.UUID(str(src_s)), + _uuid.UUID(str(dst_s)), + weight=float(e.get("weight", 1.0) or 1.0), + ) + except (ValueError, TypeError, KeyError): + continue + return g diff --git a/src/iai_mcp/retrieve.py b/src/iai_mcp/retrieve.py index 89fe327..0fd3724 100644 --- a/src/iai_mcp/retrieve.py +++ b/src/iai_mcp/retrieve.py @@ -7,6 +7,8 @@ from itertools import combinations from uuid import UUID, uuid4 +import pandas as pd + from iai_mcp.aaak import enforce_english_raw, generate_aaak_index from iai_mcp.events import query_events, write_event from iai_mcp.store import MemoryStore, flush_record_buffer @@ -607,10 +609,19 @@ def _build_runtime_graph_impl(store: MemoryStore, cached): # or the sigma topology audit, and including them keeps the node count out # of sync with store.active_records_count() (the cache-validity anchor at # line ~552), permanently invalidating the cache -> a full rebuild every - # wake. Matches active_records_count(): tombstoned_at IS NULL. Guard the - # pandas NaN case (NaN != NaN) so an all-live column is not skipped. + # wake. Matches active_records_count(): tombstoned_at IS NULL. + # + # NULL is the LIVE case and pandas materialises it differently per + # dtype: None (object column), float('nan'), pd.NaT (a datetime64 + # column -- the shape the reembed Arrow schema produces), or pd.NA + # (nullable extension dtypes). The old guard only caught float-NaN + # (`isinstance(float) and v != v`); a NaT/NA live value slipped past + # it, stringified to "NaT"/"", read truthy, and the LIVE record + # was wrongly dropped -- collapsing the whole graph to empty on a + # reembedded store. pd.isna() covers None/NaN/NaT/NA uniformly, so + # only a real timestamp string survives to mark a tombstone. _tomb = row.get("tombstoned_at") - if _tomb is not None and not (isinstance(_tomb, float) and _tomb != _tomb) and str(_tomb).strip(): + if _tomb is not None and not pd.isna(_tomb) and str(_tomb).strip(): continue rid = UUID(row["id"]) _comm_raw = row["community_id"] diff --git a/tests/test_graph_tombstone_hardening.py b/tests/test_graph_tombstone_hardening.py new file mode 100644 index 0000000..e86b5ad --- /dev/null +++ b/tests/test_graph_tombstone_hardening.py @@ -0,0 +1,118 @@ +"""Regression guards for the 2026-06-21 tombstone-handling remediation: + +- the runtime-graph node filter runs on the cache-MISS pandas path (cache neutralised); +- a LIVE record on a datetime64/NaT tombstoned_at column is NOT dropped (pd.isna guard); +- an edge live->tombstoned does not resurrect the dead endpoint (has_node guard); +- build_live_graph (crisis hooks) excludes tombstoned, matching active_records_count(). + +Each test fails if its corresponding fix is reverted. +""" +from __future__ import annotations + +import pandas as pd +import pytest + +import iai_mcp.retrieve as retrieve +import iai_mcp.runtime_graph_cache as runtime_graph_cache +from iai_mcp.store._buffers import flush_edge_buffer, flush_record_buffer + +from tests.test_graph_excludes_tombstoned import _make_store, _rec, _tombstone + + +@pytest.fixture +def _no_graph_cache(monkeypatch): + """Force build_runtime_graph to MISS the payload cache so the live pandas + node/edge-skip loop is always exercised (never a cheap cache reload).""" + monkeypatch.setattr(runtime_graph_cache, "try_load", lambda *_a, **_k: None) + + +def test_node_skip_runs_on_cache_miss(_no_graph_cache, tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + live = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + live.append(rid) + dead_id, dead = _rec(77) + store.insert(dead) + _tombstone(store, dead_id) + + assert store.active_records_count() == 5 + graph, _a, _rc = retrieve.build_runtime_graph(store) + nodes = {str(n) for n in graph.nodes()} + assert nodes == {str(r) for r in live} + assert str(dead_id) not in nodes + + +def test_live_record_survives_datetime64_nat_column(_no_graph_cache, tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + live = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + live.append(rid) + flush_record_buffer(store) + assert store.active_records_count() == 5 + + records_tbl = store.db.open_table("records") + table_cls = type(records_tbl) + original_to_pandas = table_cls.to_pandas + + def _coerce(self, *a, **k): + df = original_to_pandas(self, *a, **k) + if "tombstoned_at" in df.columns: + df = df.copy() + df["tombstoned_at"] = pd.to_datetime(df["tombstoned_at"], utc=True) + return df + + monkeypatch.setattr(table_cls, "to_pandas", _coerce) + coerced = store.db.open_table("records").to_pandas() + assert str(coerced["tombstoned_at"].dtype).startswith("datetime64") + assert all(pd.isna(v) for v in coerced["tombstoned_at"]) + + graph, _a, _rc = retrieve.build_runtime_graph(store) + nodes = {str(n) for n in graph.nodes()} + assert nodes == {str(r) for r in live}, ( + "live records dropped on a datetime64/NaT tombstoned_at column" + ) + + +def test_edge_to_tombstoned_dst_is_skipped(_no_graph_cache, tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + ids = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + ids.append(rid) + flush_record_buffer(store) + + src, dst = ids[0], ids[4] + store.boost_edges([(src, dst)], delta=1.0, edge_type="hebbian") + flush_edge_buffer(store) + _tombstone(store, dst) + assert store.active_records_count() == 4 + + graph, _a, _rc = retrieve.build_runtime_graph(store) + nodes = {str(n) for n in graph.nodes()} + assert str(dst) not in nodes, "tombstoned edge endpoint leaked back as a node" + assert len(nodes) == 4 + + +def test_build_live_graph_excludes_tombstoned(tmp_path, monkeypatch): + from iai_mcp.lilli.cycle.sleep_pipeline._live_graph import build_live_graph + + store = _make_store(tmp_path, monkeypatch) + live = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + live.append(rid) + dead_id, dead = _rec(88) + store.insert(dead) + _tombstone(store, dead_id) + flush_record_buffer(store) + + g = build_live_graph(store) + nodes = {str(n) for n in g.nodes()} + assert str(dead_id) not in nodes + assert g.node_count() == store.active_records_count() == 5 From 92d673702f274eb4a752fc921d24363f8a350228 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:00:24 +0200 Subject: [PATCH 15/21] fix(dmn): embed the reflection surface text instead of a zero placeholder The daily-reflection step wrote an all-zero embedding placeholder that was never re-embedded, leaving reflections permanently unretrievable and feeding zero vectors into the scoring matmul. Embed literal_surface directly; on a native embedder failure fall back to a zero vector flagged embedding_pending=1 for deferred reembed. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/dmn_reflection.py | 17 ++++++++++++++++- tests/test_dmn_meta.py | 21 +++++++++++++++++---- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/iai_mcp/dmn_reflection.py b/src/iai_mcp/dmn_reflection.py index be14b88..85d7e1e 100644 --- a/src/iai_mcp/dmn_reflection.py +++ b/src/iai_mcp/dmn_reflection.py @@ -94,8 +94,22 @@ def synthesize(self, store, window_hours: int) -> MemoryRecord: "ts": now.isoformat(), } + # Embed the reflection's literal_surface so the record is retrievable by + # vector/cosine recall. Hardcoding a zero vector (the old behaviour) made + # every daily-reflection semantic record permanently invisible to recall + # and fed zero-norm vectors into the scoring matmul (divide-by-zero + # warnings). If the native embed fails, fall back to a zero vector flagged + # embedding_pending=1 so the daemon's reembed-pending path fills it later + # (never silently leave an unretrievable zero record). embed_dim = int(store.embed_dim) - embedding = [0.0] * embed_dim + embedding_pending = 0 + try: + from iai_mcp.embed import embedder_for_store + + embedding = list(embedder_for_store(store).embed(literal_surface)) + except Exception: # noqa: BLE001 -- degrade to deferred reembed, never zero-and-forget + embedding = [0.0] * embed_dim + embedding_pending = 1 return MemoryRecord( id=uuid4(), @@ -103,6 +117,7 @@ def synthesize(self, store, window_hours: int) -> MemoryRecord: literal_surface=literal_surface, aaak_index="", embedding=embedding, + embedding_pending=embedding_pending, community_id=None, centrality=0.5, detail_level=1, diff --git a/tests/test_dmn_meta.py b/tests/test_dmn_meta.py index 8d2dbda..bde938f 100644 --- a/tests/test_dmn_meta.py +++ b/tests/test_dmn_meta.py @@ -142,10 +142,23 @@ def test_reflection_synthesize_returns_semantic_record( assert isinstance(prov.get("recalled_count"), int) assert len(synth.embedding) == store._embed_dim - assert all(v == 0.0 for v in synth.embedding), ( - "synthesised record must carry the zero-vector placeholder; " - "next REM consolidation re-embeds" - ) + # The reflection record must carry a REAL embedding of its own + # literal_surface, not a zero placeholder. The old zero-vector placeholder + # relied on "next REM consolidation re-embeds", but nothing re-embedded an + # embedding_pending=0 record, so every daily-reflection record stayed + # permanently zero-norm and invisible to vector recall (and fed zero vectors + # into the scoring matmul). dmn_reflection now embeds at write time; on embed + # failure it falls back to a zero vector flagged embedding_pending=1. + import math as _math + _norm = _math.sqrt(sum(v * v for v in synth.embedding)) + if synth.embedding_pending: + # Degraded fallback (embedder unavailable): zero + flagged for reembed. + assert all(v == 0.0 for v in synth.embedding) + else: + assert _norm > 0.5, ( + f"reflection embedding must be a real (non-zero) vector; " + f"got norm={_norm}" + ) def test_meta_analyst_snapshot_counts_correct(tmp_path: Path) -> None: store = _make_store(tmp_path) From 56532afd32e38d0aff20fe2d9b1094ec277baaf8 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:00:24 +0200 Subject: [PATCH 16/21] fix(sigma): bound the small-worldness compute with a node-count ceiling compute_sigma runs an unbounded random-graph reference that can spin a core on a large graph. Add SIGMA_N_CEIL (default 20000, env IAI_MCP_SIGMA_N_FLOOR / IAI_MCP_SIGMA_N_CEIL) and return None above it instead of computing. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/sigma.py | 38 +++++++++++++++++++++++++++++++++++++- tests/test_sigma.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/src/iai_mcp/sigma.py b/src/iai_mcp/sigma.py index 2b53c28..f868607 100644 --- a/src/iai_mcp/sigma.py +++ b/src/iai_mcp/sigma.py @@ -2,6 +2,7 @@ import logging import math +import os from datetime import datetime, timezone from typing import Optional, TYPE_CHECKING @@ -19,6 +20,29 @@ SIGMA_N_FLOOR: int = 200 +# Defensive upper bound on graph size for the sigma (small-worldness) audit. +# fast_sigma() runs average_clustering + all-pairs-shortest-path on the largest +# component AND on n_random reference graphs of the same order; that cost is +# roughly O(n_random * (V*E + V^2)) and is unbounded as the live graph grows. +# Above the cap we return None: the regime degrades cleanly to "insufficient_data" +# (sigma None is already handled everywhere) and the tick stays bounded -- the +# watchdog does NOT kill on CPU, so an unbounded sigma compute would spin a core +# uncontained. The default is far above the current live store (~4k nodes) so it +# never trips today; both bounds are env-overridable. +SIGMA_N_CEIL: int = 20000 + + +def _env_int(name: str, default: int) -> int: + raw = os.environ.get(name) + if raw is None: + return default + try: + val = int(raw) + except (TypeError, ValueError): + return default + return val if val > 0 else default + + SIGMA_MID_LIFE_THRESHOLD: int = 500 SIGMA_OBSERVATION_KIND: str = "sigma_observation" @@ -194,7 +218,19 @@ def fast_sigma( def compute_sigma(graph: "MemoryGraph", *, seed: int = 42) -> Optional[float]: - if graph.node_count() < SIGMA_N_FLOOR: + n = graph.node_count() + floor = _env_int("IAI_MCP_SIGMA_N_FLOOR", SIGMA_N_FLOOR) + ceil = _env_int("IAI_MCP_SIGMA_N_CEIL", SIGMA_N_CEIL) + if n < floor: + return None + if n > ceil: + # Above the defensive cap: skip the (unbounded) small-worldness compute + # so a single tick can never spin a core for minutes on a pathologically + # large graph. Reported downstream as "insufficient_data". + logger.warning( + "sigma_skipped_above_ceiling", + extra={"node_count": int(n), "ceil": int(ceil)}, + ) return None sigma_val, *_ = fast_sigma(graph, seed=seed) if isinstance(sigma_val, float) and math.isnan(sigma_val): diff --git a/tests/test_sigma.py b/tests/test_sigma.py index 67a5ca5..0f77133 100644 --- a/tests/test_sigma.py +++ b/tests/test_sigma.py @@ -101,3 +101,36 @@ def test_sigma_module_does_not_call_nx_sigma(): assert needle not in text, ( f"sigma.py must NOT call {needle} -- use fast_sigma" ) + + +def test_sigma_ceiling_constant_above_floor(): + # Defensive cap added in the 2026-06-21 remediation: bound the unbounded + # small-worldness compute on a pathologically large graph. + from iai_mcp import sigma + + assert isinstance(sigma.SIGMA_N_CEIL, int) + assert sigma.SIGMA_N_CEIL > sigma.SIGMA_N_FLOOR + + +def test_compute_sigma_returns_none_above_ceiling(monkeypatch): + # Above SIGMA_N_CEIL, compute_sigma skips the compute and returns None + # (regime -> insufficient_data). WITHOUT the cap this 300-node small-world + # graph yields a finite float, so the test fails -> the guard is load-bearing. + from iai_mcp import sigma + from iai_mcp.sigma import classify_regime, compute_sigma + + monkeypatch.setattr(sigma, "SIGMA_N_CEIL", 100) + g = nx.connected_watts_strogatz_graph(300, 6, 0.1, seed=42) + mg = _nx_graph_to_memory_graph(g) + assert mg.node_count() == 300 > sigma.SIGMA_N_CEIL + assert compute_sigma(mg) is None + assert classify_regime(mg.node_count(), compute_sigma(mg)) == "insufficient_data" + + +def test_compute_sigma_ceiling_env_override(monkeypatch): + from iai_mcp.sigma import compute_sigma + + monkeypatch.setenv("IAI_MCP_SIGMA_N_CEIL", "100") + g = nx.connected_watts_strogatz_graph(300, 6, 0.1, seed=42) + mg = _nx_graph_to_memory_graph(g) + assert compute_sigma(mg) is None From 7285233333b033087997c2fd47c238033b893585 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:00:42 +0200 Subject: [PATCH 17/21] fix(watchdog): monitor a retried-but-wedged sleep cycle for staleness The sleep-cycle staleness check only monitored attempt == 1, so a cycle that had already been retried (attempt >= 2) and was still wedged -- exactly the case the watchdog must catch -- was short-circuited and ignored. Gate on attempt < 1 instead so every genuine running attempt is monitored, excluding bool (isinstance(True, int) is True). Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/daemon/_watchdog.py | 6 +++++- tests/test_daemon_watchdog_sleep_stale.py | 17 ++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/iai_mcp/daemon/_watchdog.py b/src/iai_mcp/daemon/_watchdog.py index 33d7fc3..b40409f 100644 --- a/src/iai_mcp/daemon/_watchdog.py +++ b/src/iai_mcp/daemon/_watchdog.py @@ -545,7 +545,11 @@ def _check_sleep_cycle_staleness( if not isinstance(progress, dict): return (False, {}) attempt = progress.get("attempt") - if not isinstance(attempt, int) or attempt != 1: + # A retried-but-still-wedged cycle (attempt >= 2) is exactly the case the + # watchdog must catch, not ignore. Gate on attempt < 1 so any genuine + # running attempt is monitored; only attempt 0 / negative / non-int (and + # bool, since isinstance(True, int) is True) short-circuits. + if not isinstance(attempt, int) or isinstance(attempt, bool) or attempt < 1: return (False, {}) started_at_raw = progress.get("started_at") if not isinstance(started_at_raw, str) or not started_at_raw: diff --git a/tests/test_daemon_watchdog_sleep_stale.py b/tests/test_daemon_watchdog_sleep_stale.py index 0bd6196..ab944bb 100644 --- a/tests/test_daemon_watchdog_sleep_stale.py +++ b/tests/test_daemon_watchdog_sleep_stale.py @@ -99,13 +99,24 @@ def test_sleep_just_over_threshold_is_stale(self): assert ctx["attempt"] == 1 assert ctx["crisis_mode"] is False - def test_attempt_gt_1_is_not_stale(self): - # attempt > 1 means the cycle has retried — different failure shape, - # outside the scope of this predicate. + def test_attempt_gt_1_still_stale(self): + # A retried-but-still-wedged cycle (attempt >= 2) is exactly the case the + # watchdog must catch: a retry that itself hangs for hours. The gate is + # `attempt < 1`, so attempt 2 (10 days stuck) MUST be flagged stale. state = _state( LifecycleState.SLEEP.value, _progress(NOW - timedelta(days=10), attempt=2), ) + is_stale, ctx = daemon._check_sleep_cycle_staleness(state, NOW) + assert is_stale is True + assert ctx["attempt"] == 2 + + def test_attempt_zero_is_not_stale(self): + # attempt 0 (or negative / non-int) is not a genuine running attempt. + state = _state( + LifecycleState.SLEEP.value, + _progress(NOW - timedelta(days=10), attempt=0), + ) assert daemon._check_sleep_cycle_staleness(state, NOW)[0] is False def test_live_wedge_shape_matches_stale(self): From 1a77018ef677ca729ee7ee677cc27755c0643203 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:00:42 +0200 Subject: [PATCH 18/21] feat(daemon): surface a repeated store-empty count failure as telemetry e8f3deb stopped a transient count failure from parking the lifecycle tick but left the condition invisible (log.debug only). Emit a best-effort store_empty_check_failed warning event -- buffered and never raising, so it is safe even when the store connection is the thing failing -- so a sqlite left-in-error-state failure surfaces to the operator. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/daemon/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index 76efa0e..a3e1de9 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -28,6 +28,7 @@ DAEMON_SLEEP_CYCLE_STALE, DAEMON_WATCHDOG_NEEDS_OPERATOR, DAEMON_WEDGE_KILL, + emit_best_effort, write_event, ) from iai_mcp.identity_audit import continuous_audit @@ -282,6 +283,20 @@ def _store_is_empty(store: MemoryStore) -> bool: # the unknown case as NOT empty so the tick proceeds; a truly empty store # just does a little harmless no-op work. log.debug("store empty check failed, assuming NOT empty: %s", exc) + # e8f3deb fixed the *behavior* (don't park the tick) but left the + # condition invisible. A recurring count failure (sqlite left in an error + # state by a heavy reader) should surface to the operator, not just + # log.debug. emit_best_effort is buffered and never raises, so it is safe + # even when the store connection is the thing failing. + try: + emit_best_effort( + store, + "store_empty_check_failed", + {"error": str(exc), "error_type": type(exc).__name__}, + severity="warning", + ) + except Exception: # noqa: BLE001 -- telemetry must never break the tick + pass return False From d74c8c239938ad95132870651e98460b46a370a2 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 14:08:21 +0200 Subject: [PATCH 19/21] fix(hippo): decrypt literal_surface before reembedding pending rows reembed_pending_rows fed the raw stored literal_surface to embedder.embed(); on an encrypted store that is iai:enc:v1: ciphertext, so every embedding_pending=1 row re-embedded by this path got an embedding of the ciphertext (garbage). Decrypt via _decrypt_record_field first (no-op on a plaintext store); a decrypt failure leaves the row pending for retry rather than poisoning it with a garbage vector. --- src/iai_mcp/hippo/_db.py | 9 ++ tests/test_reembed_pending_crypto.py | 159 +++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 tests/test_reembed_pending_crypto.py diff --git a/src/iai_mcp/hippo/_db.py b/src/iai_mcp/hippo/_db.py index a6e6218..7abfd0b 100644 --- a/src/iai_mcp/hippo/_db.py +++ b/src/iai_mcp/hippo/_db.py @@ -616,6 +616,15 @@ def reembed_pending_rows(self, embedder: Any) -> int: for row in rows: rid = row["id"] surface = row["literal_surface"] or "" + # On an encrypted store literal_surface is iai:enc:v1: ciphertext; embedding + # the ciphertext would produce a garbage vector. Decrypt first (no-op on a + # plaintext store or a value that isn't encrypted). A decrypt failure leaves + # the row embedding_pending=1 so it is retried rather than poisoned. + try: + surface = self._decrypt_record_field(rid, "literal_surface", surface) + except Exception as exc: # noqa: BLE001 + _log.warning("reembed_pending_rows: decrypt failed for id=%s: %s", rid, exc) + continue try: vec = list(embedder.embed(surface)) except Exception as exc: # noqa: BLE001 diff --git a/tests/test_reembed_pending_crypto.py b/tests/test_reembed_pending_crypto.py new file mode 100644 index 0000000..2495a8c --- /dev/null +++ b/tests/test_reembed_pending_crypto.py @@ -0,0 +1,159 @@ +"""Regression: reembed_pending_rows must embed the PLAINTEXT literal_surface, +not the iai:enc:v1: ciphertext, on an encrypted store. + +Pre-existing bug (found 2026-06-21): HippoDB.reembed_pending_rows read +literal_surface straight from SQLite and fed it to embedder.embed() without +decrypting. On an encrypted deployment that meant every embedding_pending=1 row +re-embedded by this path got an embedding of the ciphertext = garbage vector. +""" +from __future__ import annotations + +import hashlib +import os +import sqlite3 +import struct +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np +import pytest + +from iai_mcp.hippo import HippoDB +from iai_mcp.types import EMBED_DIM + + +def _vec_from_text(text: str) -> list[float]: + """Deterministic, text-sensitive unit vector. embed(plaintext) and + embed(ciphertext) therefore differ, which is what makes the test meaningful.""" + seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:8], "little") + rng = np.random.default_rng(seed) + v = rng.standard_normal(EMBED_DIM).astype(np.float32) + v /= np.linalg.norm(v) + 1e-10 + return v.tolist() + + +class _RecordingEmbedder: + def __init__(self) -> None: + self.seen: list[str] = [] + + def embed(self, text: str) -> list[float]: + self.seen.append(text) + return _vec_from_text(text) + + +def _brain_path(tmp_path: Path) -> Path: + return tmp_path / "hippo" / "brain.sqlite3" + + +def _insert_pending(db: HippoDB, rid: str, surface: str) -> None: + now = datetime.now(timezone.utc).isoformat() + db.insert_pending_row( + record_id=rid, + tier="episodic", + literal_surface=surface, + tags_json="[]", + provenance_json="{}", + created_at=now, + updated_at=now, + ) + + +def _read_row(db_path: Path, rid: str) -> tuple[list[float], int]: + conn = sqlite3.connect(str(db_path)) + try: + row = conn.execute( + "SELECT embedding, embedding_pending FROM records WHERE id = ?", (rid,) + ).fetchone() + finally: + conn.close() + assert row is not None + blob, pending = row[0], row[1] + n = len(blob) // 4 + return list(struct.unpack(f"<{n}f", blob)), int(pending) + + +def _set_literal_surface(db_path: Path, rid: str, value: str) -> None: + conn = sqlite3.connect(str(db_path)) + try: + conn.execute("UPDATE records SET literal_surface = ? WHERE id = ?", (value, rid)) + conn.commit() + finally: + conn.close() + + +def test_reembed_pending_decrypts_before_embedding(tmp_path: Path) -> None: + key = os.urandom(32) + db = HippoDB(tmp_path, crypto_key_provider=lambda: key) + try: + plaintext = "a secret pending memory worth embedding" + rid = str(uuid4()) + _insert_pending(db, rid, plaintext) + + # Simulate encrypted-at-rest: replace the stored surface with ciphertext + # carrying the canonical AAD (uuid.lower()). + ciphertext = db._encrypt_for_uuid(rid, plaintext) + assert ciphertext.startswith("iai:enc:v1:") + _set_literal_surface(_brain_path(tmp_path), rid, ciphertext) + + emb = _RecordingEmbedder() + n = db.reembed_pending_rows(emb) + + assert n == 1 + # The embedder was handed PLAINTEXT, never the ciphertext. + assert emb.seen == [plaintext] + assert not emb.seen[0].startswith("iai:enc:v1:") + + stored_vec, pending = _read_row(_brain_path(tmp_path), rid) + assert pending == 0 + # Stored vector matches embed(plaintext)... + assert stored_vec == pytest.approx(_vec_from_text(plaintext), abs=1e-6) + # ...and is NOT embed(ciphertext) (the old, buggy result). + assert stored_vec != pytest.approx(_vec_from_text(ciphertext), abs=1e-6) + finally: + db.close() + + +def test_reembed_pending_plaintext_store_still_works(tmp_path: Path) -> None: + """No crypto provider: decrypt is a no-op and the path behaves as before.""" + db = HippoDB(tmp_path) + try: + plaintext = "a plain pending memory" + rid = str(uuid4()) + _insert_pending(db, rid, plaintext) + + emb = _RecordingEmbedder() + n = db.reembed_pending_rows(emb) + + assert n == 1 + assert emb.seen == [plaintext] + stored_vec, pending = _read_row(_brain_path(tmp_path), rid) + assert pending == 0 + assert stored_vec == pytest.approx(_vec_from_text(plaintext), abs=1e-6) + finally: + db.close() + + +def test_reembed_pending_undecryptable_row_stays_pending(tmp_path: Path) -> None: + """A row whose ciphertext can't be decrypted (wrong AAD) must not be embedded + with garbage and must remain embedding_pending=1 for a later retry.""" + key = os.urandom(32) + db = HippoDB(tmp_path, crypto_key_provider=lambda: key) + try: + plaintext = "undecryptable pending memory" + rid = str(uuid4()) + _insert_pending(db, rid, plaintext) + + # Encrypt under a DIFFERENT record id's AAD so decrypt fails for `rid`. + wrong_ciphertext = db._encrypt_for_uuid(str(uuid4()), plaintext) + _set_literal_surface(_brain_path(tmp_path), rid, wrong_ciphertext) + + emb = _RecordingEmbedder() + n = db.reembed_pending_rows(emb) + + assert n == 0 + assert emb.seen == [] # never reached embed() + _, pending = _read_row(_brain_path(tmp_path), rid) + assert pending == 1 # left for retry, not poisoned + finally: + db.close() From 642a8e8a3a8f58fcd20bf5291a43ea23661bceb0 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 14:16:26 +0200 Subject: [PATCH 20/21] fix(daemon): recover cleanly from a crash mid-SLEEP at boot A daemon killed mid-SLEEP leaves lifecycle_state.json at current_state=SLEEP with sleep_cycle_progress=None (incoherent: a real in-flight cycle carries a progress dict). Resuming it wedged the daemon -- it never advanced the sleep pipeline, never reached the recluster that clears crisis, and recall stayed degraded (SLEEP + crisis both degrade recall). Normalize that one case to a clean WAKE at boot (dropping the stale crisis flag) via _normalize_boot_lifecycle_state; a real degeneration re-arms crisis on the next complete sleep cycle. --- src/iai_mcp/daemon/__init__.py | 56 +++++++++++++++++++++++++++++ tests/test_daemon_boot_normalize.py | 52 +++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 tests/test_daemon_boot_normalize.py diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index a3e1de9..027c2b1 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -300,6 +300,30 @@ def _store_is_empty(store: MemoryStore) -> bool: return False +def _normalize_boot_lifecycle_state(raw: dict) -> tuple[dict, bool]: + """Repair a crash-left incoherent lifecycle state at boot. + + A daemon killed mid-SLEEP can leave lifecycle_state.json at + current_state=SLEEP with sleep_cycle_progress=None -- incoherent, because a + real in-flight sleep cycle always carries a progress dict. Resuming it wedges + the daemon (it never advances the pipeline, never reaches the recluster that + clears crisis, and recall stays degraded). Reset that one case to a clean + WAKE and drop the stale crisis flag. A genuine degeneration re-arms crisis on + the next complete sleep cycle. Returns (state, changed). + """ + if ( + isinstance(raw, dict) + and raw.get("current_state") == "SLEEP" + and raw.get("sleep_cycle_progress") is None + ): + out = dict(raw) + out["current_state"] = "WAKE" + out["crisis_mode"] = False + out["crisis_mode_since_ts"] = None + return out, True + return raw, False + + def _is_inside_window( window: tuple[int, int] | list | None, now: datetime, @@ -1090,6 +1114,38 @@ async def _drain_and_report() -> None: _sleep_pipeline = _SleepPipeline(store=store) from pathlib import Path as _PathS2 + # Boot normalization for a crash mid-SLEEP: lifecycle_state.json can be + # left at current_state=SLEEP with sleep_cycle_progress=None -- an + # incoherent state (a real in-flight cycle always carries a progress + # dict). Resuming it wedges the daemon: it never advances the sleep + # pipeline, never reaches the recluster that clears crisis, and recall + # stays degraded (SLEEP + crisis both degrade recall). Reset that one + # case to a clean WAKE (and drop the stale crisis flag set before the + # crash) so the daemon serves immediately; a genuine degeneration will + # simply re-arm crisis on the next complete sleep cycle. + try: + import json as _json_lc + _lc_path = _PathS2.home() / ".iai-mcp" / "lifecycle_state.json" + _lc_raw = _json_lc.loads(_lc_path.read_text()) + _lc_norm, _lc_changed = _normalize_boot_lifecycle_state(_lc_raw) + if _lc_changed: + _lc_path.write_text(_json_lc.dumps(_lc_norm, indent=2)) + log.warning( + "lifecycle_boot_normalized: stale SLEEP without " + "sleep_cycle_progress -> WAKE (crisis cleared)" + ) + try: + emit_best_effort( + store, + "lifecycle_boot_normalized", + {"from_state": "SLEEP", "to_state": "WAKE", + "reason": "sleep_without_progress"}, + severity="warning", + ) + except Exception: # noqa: BLE001 -- telemetry must not block boot + pass + except (OSError, ValueError) as _lc_exc: + log.debug("lifecycle boot normalization skipped: %s", _lc_exc) _s2_config = _load_s2_config() _s2_coord = S2Coordinator( store=store, diff --git a/tests/test_daemon_boot_normalize.py b/tests/test_daemon_boot_normalize.py new file mode 100644 index 0000000..a445f28 --- /dev/null +++ b/tests/test_daemon_boot_normalize.py @@ -0,0 +1,52 @@ +"""Boot recovery: a crash mid-SLEEP leaves lifecycle_state.json incoherent +(current_state=SLEEP, sleep_cycle_progress=None). The daemon must reset that one +case to a clean WAKE at boot (and drop the stale crisis flag) instead of resuming +a sleep cycle that never progresses. Guards _normalize_boot_lifecycle_state. +""" +from __future__ import annotations + +from iai_mcp.daemon import _normalize_boot_lifecycle_state + + +def test_stale_sleep_without_progress_is_reset_to_wake(): + raw = { + "current_state": "SLEEP", + "sleep_cycle_progress": None, + "crisis_mode": True, + "crisis_mode_since_ts": "2026-06-21T11:20:25+00:00", + "since_ts": "2026-06-21T10:47:08+00:00", + } + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is True + assert out["current_state"] == "WAKE" + assert out["crisis_mode"] is False + assert out["crisis_mode_since_ts"] is None + # other fields preserved + assert out["since_ts"] == raw["since_ts"] + # input not mutated + assert raw["current_state"] == "SLEEP" + + +def test_sleep_with_active_progress_is_left_untouched(): + # A real in-flight cycle carries a progress dict -> must NOT be reset. + raw = { + "current_state": "SLEEP", + "sleep_cycle_progress": {"step": "CLUSTER_SUMMARY", "attempt": 1}, + "crisis_mode": False, + } + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is False + assert out is raw + + +def test_wake_state_is_left_untouched(): + raw = {"current_state": "WAKE", "sleep_cycle_progress": None, "crisis_mode": True} + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is False + assert out is raw + + +def test_drowsy_state_is_left_untouched(): + raw = {"current_state": "DROWSY", "sleep_cycle_progress": None} + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is False From 6c924422650c413793137b38a7a90b5152db0db2 Mon Sep 17 00:00:00 2001 From: Marsu6996 <190973639+Marsu6996@users.noreply.github.com> Date: Sun, 21 Jun 2026 22:05:55 +0200 Subject: [PATCH 21/21] fix(pipeline): deterministic tie-break on final hit ranking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The final recall ranking sorted hits by score alone with a stable sort, so equal-scoring hits kept their arrival order. Two code paths that compute the same logical score via different float summation orders (notably an empty profile_state falling back to the medium scale) could therefore emit byte-different orderings, flaking test_empty_profile_state_falls_back_to_medium_scale on CI. Tie-break on str(record_id) — the same idiom already used elsewhere in this module — so equal-scoring hits resolve deterministically. Behaviour for distinctly-scored hits is unchanged. Co-Authored-By: Claude Opus 4.8 --- src/iai_mcp/pipeline.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/iai_mcp/pipeline.py b/src/iai_mcp/pipeline.py index aead3eb..cbaf185 100644 --- a/src/iai_mcp/pipeline.py +++ b/src/iai_mcp/pipeline.py @@ -1146,10 +1146,15 @@ def recall_for_response( apply_stale_downweight(core.anti_hits, cue_intent=_cue_intent) # Rank on the internal unclamped key (falls back to score when a hit was # built without sort_score), so ordering is preserved across the display - # clamp applied at serialization. + # clamp applied at serialization. Tie-break on record_id so equal-scoring + # hits resolve deterministically — two code paths that produce the same + # score via different summation orders (e.g. empty profile_state falling + # back to the medium scale) must yield byte-identical orderings. core.scored_hits.sort( - key=lambda h: (h.sort_score if h.sort_score is not None else h.score), - reverse=True, + key=lambda h: ( + -(h.sort_score if h.sort_score is not None else h.score), + str(h.record_id), + ), ) if (