diff --git a/src/iai_mcp/capture.py b/src/iai_mcp/capture.py index d95ad4f..e2b2a31 100644 --- a/src/iai_mcp/capture.py +++ b/src/iai_mcp/capture.py @@ -1,6 +1,7 @@ from __future__ import annotations +import contextlib import hashlib import json import logging @@ -41,6 +42,46 @@ # two threads before either has inserted it. _CAPTURE_DEDUP_LOCK = threading.Lock() +# Drain-in-progress accounting for the daemon's lifecycle idle countdown. +# +# The wrapper heartbeat file is only ONE signal that the daemon is busy. A +# deferred-capture drain (triggered by real RPC traffic, kicked off at the +# drowsy edge, or run on startup) can keep writing to the store long after the +# heartbeat file has gone stale. If the idle countdown only watched the +# heartbeat, it would force the FSM toward SLEEP -- which escalates to an +# EXCLUSIVE store lock -- while these drain threads are still hammering the +# store, causing lock contention. The daemon consults `is_drain_in_progress` +# so an in-flight drain holds the idle countdown open. A depth counter (rather +# than a boolean) keeps this correct under concurrent/overlapping drains across +# the several threads that asyncio.to_thread dispatch spins up. +_DRAIN_IN_PROGRESS_LOCK = threading.Lock() +_drain_in_progress_depth = 0 + + +@contextlib.contextmanager +def _drain_in_progress_guard(): + """Mark a deferred-capture drain as in flight for its whole duration.""" + global _drain_in_progress_depth + with _DRAIN_IN_PROGRESS_LOCK: + _drain_in_progress_depth += 1 + try: + yield + finally: + with _DRAIN_IN_PROGRESS_LOCK: + _drain_in_progress_depth -= 1 + + +def is_drain_in_progress() -> bool: + """True while at least one deferred-capture drain is running. + + The daemon's lifecycle idle countdown reads this so it does NOT advance the + FSM toward SLEEP (and the EXCLUSIVE store lock it takes) while drain threads + are still writing to the store. + """ + with _DRAIN_IN_PROGRESS_LOCK: + return _drain_in_progress_depth > 0 + + FAILED_MAX_ATTEMPTS: int = 3 FAILED_BACKOFF_BASE_SEC: float = 60.0 @@ -711,6 +752,17 @@ def write_deferred_captures( def drain_deferred_captures(store: MemoryStore) -> dict[str, int]: + """Drain crashed/orphaned deferred-capture files into the store. + + Marks itself in-progress for the daemon idle countdown so the FSM does not + advance toward SLEEP (EXCLUSIVE store lock) mid-drain. See + `is_drain_in_progress`. + """ + with _drain_in_progress_guard(): + return _drain_deferred_captures_impl(store) + + +def _drain_deferred_captures_impl(store: MemoryStore) -> dict[str, int]: deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures" log_dir = Path.home() / ".iai-mcp" / "logs" log_dir.mkdir(parents=True, exist_ok=True) @@ -1139,6 +1191,23 @@ def drain_active_live_captures( store: MemoryStore, *, exclude_session_id: str, +) -> dict[str, int]: + """Drain live-session capture files into the store. + + Marked in-progress for the daemon idle countdown, like + `drain_deferred_captures` -- both write to the store and must hold the FSM + out of SLEEP while running. See `is_drain_in_progress`. + """ + with _drain_in_progress_guard(): + return _drain_active_live_captures_impl( + store, exclude_session_id=exclude_session_id, + ) + + +def _drain_active_live_captures_impl( + store: MemoryStore, + *, + exclude_session_id: str, ) -> dict[str, int]: deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures" state_dir = Path.home() / ".iai-mcp" / ".capture-state" diff --git a/src/iai_mcp/cli/_daemon.py b/src/iai_mcp/cli/_daemon.py index f9de4d6..609eca5 100644 --- a/src/iai_mcp/cli/_daemon.py +++ b/src/iai_mcp/cli/_daemon.py @@ -29,6 +29,22 @@ def _stop_escalation_bound() -> float: return STOP_TERM_TIMEOUT_S +def _signal_daemon_wake() -> None: + """Create the wake signal before a kickstart so the booting daemon WAKEs. + + Without it the daemon boots, re-reads its persisted HIBERNATION state and + hibernate-exits within a tick, closing the socket before it ever serves + recall. Best-effort: a failure here just falls back to the old behaviour. + """ + try: + from iai_mcp.wake_handler import WakeHandler + + root = os.environ.get("IAI_MCP_STORE") or os.path.expanduser("~/.iai-mcp") + WakeHandler(Path(root) / "wake.signal").signal_wake() + except Exception: # noqa: BLE001 -- never let the wake signal break daemon start + pass + + def _stop_poll_interval() -> float: raw = os.environ.get("IAI_DAEMON_STOP_POLL_S") if raw: @@ -158,6 +174,7 @@ def cmd_daemon_install(args: argparse.Namespace) -> int: f"{result.stderr.strip()}", file=sys.stderr, ) + _signal_daemon_wake() _cli.subprocess.run( ["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"], check=False, capture_output=True, @@ -255,6 +272,7 @@ def cmd_daemon_start(args: argparse.Namespace) -> int: ["launchctl", "bootstrap", f"gui/{uid}", str(target)], check=False, capture_output=True, ) + _signal_daemon_wake() _cli.subprocess.run( ["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"], check=False, capture_output=True, diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index 51a7cff..76efa0e 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -145,6 +145,52 @@ def _should_drain_on_drowsy_edge(prev, current) -> bool: return prev is _L.WAKE and current is _L.DROWSY +# Idle-countdown decisions. The lifecycle tick translates these into FSM events. +IDLE_DECISION_ACTIVE: str = "active" # real work in flight -> reset the countdown +IDLE_DECISION_SLEEP: str = "sleep" # idle past the sleep threshold -> IDLE_30MIN +IDLE_DECISION_DROWSY: str = "drowsy" # idle past the drowsy threshold -> IDLE_5MIN +IDLE_DECISION_HOLD: str = "hold" # within the drowsy window -> no transition + + +def _idle_countdown_decision( + *, + scanner_active: bool, + drain_in_progress: bool, + seconds_since_rpc: float, + idle_elapsed: float, + sleep_eligible: bool, + recent_rpc_window_sec: float, + drowsy_after_sec: float, + sleep_heartbeat_idle_sec: float, +) -> str: + """Decide what the lifecycle idle countdown should do this tick. + + The wrapper heartbeat (``scanner_active``) is only ONE signal that the + daemon is busy. Two others MUST also hold the daemon awake: + + * ``drain_in_progress`` -- a deferred-capture drain is writing to the store + right now. The wrappers dir can be empty (heartbeat stale) while a drain + kicked off by earlier RPC traffic is still hammering the store. + * ``seconds_since_rpc`` -- real JSON-RPC traffic arrived recently. A drain + runs inside its triggering request, so by the time a long drain finishes + ``last_activity_ts`` (set at request start) may already look stale; the + ``drain_in_progress`` signal covers that tail. + + When any of these holds we return ``ACTIVE`` so the caller resets the idle + countdown. Otherwise the daemon really is idle and we advance toward SLEEP + (which escalates to an EXCLUSIVE store lock) / DROWSY exactly as the + heartbeat-only logic used to, so a genuinely quiet daemon still settles and + crisis re-arming in SLEEP keeps running. + """ + if scanner_active or drain_in_progress or seconds_since_rpc < recent_rpc_window_sec: + return IDLE_DECISION_ACTIVE + if idle_elapsed >= sleep_heartbeat_idle_sec and sleep_eligible: + return IDLE_DECISION_SLEEP + if idle_elapsed >= drowsy_after_sec: + return IDLE_DECISION_DROWSY + return IDLE_DECISION_HOLD + + def _run_drowsy_drain(store, *, drain_fn, write_event_fn) -> None: try: result = drain_fn(store) @@ -228,8 +274,15 @@ def _store_is_empty(store: MemoryStore) -> bool: try: return store.db.open_table("records").count_rows() == 0 except (OSError, ValueError, KeyError, RuntimeError) as exc: - log.debug("store empty check failed, assuming empty: %s", exc) - return True + # Unknown != empty. A transient count failure (e.g. the shared sqlite + # connection left in an error state by a concurrent heavy reader, raising + # HippoIntegrityError/lock errors which subclass RuntimeError) must NOT be + # treated as an empty store: doing so parks the whole lifecycle tick + # (no idle-check, no drain) on a store that actually has records. Treat + # the unknown case as NOT empty so the tick proceeds; a truly empty store + # just does a little harmless no-op work. + log.debug("store empty check failed, assuming NOT empty: %s", exc) + return False def _is_inside_window( @@ -490,6 +543,10 @@ async def _tick_body( state["last_tick_at"] = datetime.now(timezone.utc).isoformat() + # Clear the skip reason: reaching here means the tick was NOT skipped. Leaving a + # stale "empty_store"/"paused" value here makes a healthy daemon look parked in + # observability (last_tick_skipped_reason is only ever set, never reset). + state["last_tick_skipped_reason"] = None try: await asyncio.to_thread(save_state, state) except (OSError, ValueError) as exc: @@ -932,11 +989,13 @@ def _capture_handler(record: dict) -> None: async def _boot_preload() -> None: try: from iai_mcp import retrieve as _retrieve_preload - _g, _a, _rc = await asyncio.to_thread( - _retrieve_preload.build_runtime_graph, store, - ) + # build_runtime_graph already persists the cache internally + # (with the full node_payload) on a miss. The previous extra + # save(..., node_payload=None, ...) here overwrote that good + # cache with a payload-less one (forcing a pandas re-read on + # the next hit) — so we just warm the cache and drop it. await asyncio.to_thread( - _rgc_mod.save, store, _a, _rc, None, 0, + _retrieve_preload.build_runtime_graph, store, ) except Exception as _exc: # noqa: BLE001 -- preload MUST NOT crash daemon log.debug("boot_preload failed: %s", _exc, exc_info=True) @@ -1149,6 +1208,34 @@ def _emit_expiry() -> None: now_mono = time.monotonic() idle_elapsed = now_mono - _last_active_monotonic[0] + # Beyond the wrapper heartbeat, real RPC traffic and an + # in-flight deferred-capture drain are activity too. Folding + # them into the idle countdown stops the FSM forcing SLEEP + # (-> EXCLUSIVE store lock) while drain threads are still + # hammering the store. See _idle_countdown_decision. + try: + from iai_mcp.capture import is_drain_in_progress as _drain_q + _drain_active = bool(await asyncio.to_thread(_drain_q)) + except Exception: # noqa: BLE001 -- idle accounting MUST NOT crash the tick + _drain_active = False + _seconds_since_rpc = ( + (now_mono - mcp_socket.last_activity_ts) + if mcp_socket is not None + else float("inf") + ) + _idle_decision = _idle_countdown_decision( + scanner_active=scanner_active, + drain_in_progress=_drain_active, + seconds_since_rpc=_seconds_since_rpc, + idle_elapsed=idle_elapsed, + sleep_eligible=sleep_eligible, + recent_rpc_window_sec=INTERRUPT_RECENT_ACTIVITY_WINDOW_SEC, + drowsy_after_sec=DROWSY_AFTER_SEC, + sleep_heartbeat_idle_sec=SLEEP_HEARTBEAT_IDLE_SEC, + ) + if _idle_decision == IDLE_DECISION_ACTIVE: + _last_active_monotonic[0] = now_mono + try: from iai_mcp.daemon_state import load_state as _load_ds _ds = await asyncio.to_thread(_load_ds) @@ -1197,7 +1284,10 @@ def _emit_expiry() -> None: pass if scanner_active: - _last_active_monotonic[0] = now_mono + # _last_active_monotonic was already refreshed above + # (scanner_active -> IDLE_DECISION_ACTIVE); a fresh + # wrapper heartbeat additionally pulls the FSM back to + # WAKE, which RPC/drain activity alone does not. try: await _state_machine.dispatch( _LifecycleEvent.HEARTBEAT_REFRESH, @@ -1205,7 +1295,7 @@ def _emit_expiry() -> None: ) except (S2OscillationConflict, S2OscillationBlocked): pass - elif idle_elapsed >= SLEEP_HEARTBEAT_IDLE_SEC and sleep_eligible: + elif _idle_decision == IDLE_DECISION_SLEEP: try: await _state_machine.dispatch( _LifecycleEvent.IDLE_30MIN, @@ -1214,7 +1304,7 @@ def _emit_expiry() -> None: ) except (S2OscillationConflict, S2OscillationBlocked): pass - elif idle_elapsed >= DROWSY_AFTER_SEC: + elif _idle_decision == IDLE_DECISION_DROWSY: try: await _state_machine.dispatch( _LifecycleEvent.IDLE_5MIN, @@ -1282,8 +1372,13 @@ def _run_wake_sequence(): _prev_lifecycle_state[0] = current if current is _LifecycleState.SLEEP: def _interrupt_check() -> bool: - if mcp_socket.active_connections > 0: - return True + # Defer the sleep pipeline only on RECENT ACTIVITY, not on + # open connections: long-lived Claude sessions keep sockets + # open permanently, so `active_connections > 0` was True at + # nearly every tick -> the cycle never completed -> no + # HIBERNATION -> the wake-hook re-ran every 30s (the 221% CPU + # churn). last_activity_ts is refreshed on each request, so a + # busy burst still defers; a 30s lull lets the cycle finish. elapsed = ( time.monotonic() - mcp_socket.last_activity_ts ) @@ -1520,6 +1615,11 @@ def _interrupt_check() -> bool: "_raise_fd_limit", "_run_drowsy_drain", "_should_drain_on_drowsy_edge", + "_idle_countdown_decision", + "IDLE_DECISION_ACTIVE", + "IDLE_DECISION_SLEEP", + "IDLE_DECISION_DROWSY", + "IDLE_DECISION_HOLD", "_kick_drowsy_rgc_rebuild", "_wake_hook_rebuild_if_cold", "_store_is_empty", diff --git a/src/iai_mcp/retrieve.py b/src/iai_mcp/retrieve.py index d5bce19..8b46a3c 100644 --- a/src/iai_mcp/retrieve.py +++ b/src/iai_mcp/retrieve.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import threading import time from datetime import datetime, timedelta, timezone from itertools import combinations @@ -33,6 +34,32 @@ _STALE_REASON_SUFFIX: str = " · stale" +# --- build_runtime_graph single-flight (WAKE CPU-storm fix) ----------------- +# At daemon WAKE several background subsystems (boot preload, sigma identity +# audit, foraging weak-bridge detection, hippea cascade warming) each call +# build_runtime_graph concurrently. On a cache MISS each one independently runs +# the full O(n^2) community detection (mosaic), GIL-bound, in its own to_thread +# worker. Three+ of those at once contend for the GIL, starve the asyncio event +# loop, and the liveness watchdog's socket probe times out -> SIGKILL -> relaunch +# loop. This single-flight gate collapses the concurrent burst into ONE compute: +# the first caller (leader) computes and saves the on-disk cache; concurrent +# callers (followers) wait on its Event and then re-load the freshly-saved cache +# via the cheap path. No mutable graph object is shared between callers (each +# rebuilds its own MemoryGraph shell + sync hook), and recall is independent of +# the community assignment, so a slightly-stale shared result is harmless. +# +# Followers RE-CONTEND in a bounded loop rather than recomputing unconditionally: +# if the leader fails before saving (e.g. detect_communities raises), or the cache +# key shifts mid-burst, or the leader overruns the wait timeout, the woken +# followers loop back, and exactly ONE of them becomes the next leader while the +# rest wait again. That degrades those edge cases to *sequential* single-flight +# (one compute at a time) instead of an N-way concurrent re-storm. +_BRG_INFLIGHT_LOCK = threading.Lock() +_BRG_INFLIGHT: dict[str, threading.Event] = {} +_BRG_WAIT_TIMEOUT_SEC: float = 120.0 +_BRG_MAX_ATTEMPTS: int = 4 + + def recall( store: MemoryStore, cue_embedding: list[float], @@ -450,6 +477,63 @@ def _hook(op: str, record) -> None: def build_runtime_graph(store: MemoryStore): + """Single-flight wrapper around the real graph build. + + On a cache HIT this is cheap and runs directly. On a cache MISS it + serialises concurrent callers so the expensive community detection runs + exactly once per cache generation: the leader computes + saves the cache, + followers wait and then reload it cheaply. See the _BRG_* notes above. + """ + from iai_mcp import runtime_graph_cache as _rgc + + cached = None + for _attempt in range(_BRG_MAX_ATTEMPTS): + try: + cached = _rgc.try_load(store) + except Exception: # noqa: BLE001 -- never let cache I/O break the build + cached = None + + # Cache HIT: no contention risk, run directly (the impl reloads cheaply). + if cached is not None and cached[0] is not None: + return _build_runtime_graph_impl(store, cached) + + # Cache MISS: single-flight on the cache key so a WAKE burst of callers + # does not all recompute the full mosaic concurrently. + try: + keystr = repr(_rgc._cache_key(store)) + except Exception: # noqa: BLE001 -- if we can't key it, just compute + return _build_runtime_graph_impl(store, cached) + + with _BRG_INFLIGHT_LOCK: + event = _BRG_INFLIGHT.get(keystr) + is_leader = event is None + if is_leader: + event = threading.Event() + _BRG_INFLIGHT[keystr] = event + + if is_leader: + # Leader: compute (the impl saves the cache), then release followers. + try: + return _build_runtime_graph_impl(store, cached) + finally: + with _BRG_INFLIGHT_LOCK: + # Only drop our own slot (a key shift could have replaced it). + if _BRG_INFLIGHT.get(keystr) is event: + _BRG_INFLIGHT.pop(keystr, None) + event.set() + + # Follower: wait for the leader, then loop. Next iteration's try_load + # HITS if the leader saved; if the leader failed / the key shifted / + # the wait timed out, we re-contend and one follower becomes the next + # leader (sequential single-flight — never an N-way concurrent re-storm). + event.wait(timeout=_BRG_WAIT_TIMEOUT_SEC) + + # Attempts exhausted (e.g. the leader keeps failing): compute directly as a + # last resort. Bounded, and still correct. + return _build_runtime_graph_impl(store, cached) + + +def _build_runtime_graph_impl(store: MemoryStore, cached): from iai_mcp.community import detect_communities from iai_mcp.graph import MemoryGraph from iai_mcp.richclub import rich_club_nodes @@ -457,7 +541,6 @@ def build_runtime_graph(store: MemoryStore): graph = MemoryGraph() - cached = runtime_graph_cache.try_load(store) assignment = None rich_club = None cached_node_payload: dict[str, dict] | None = None @@ -472,6 +555,19 @@ def build_runtime_graph(store: MemoryStore): and len(cached_node_payload) == records_count ) + if cached_node_payload is not None and len(cached_node_payload) > records_count: + # The cache holds MORE nodes than are live: records were tombstoned + # (dedup/erasure) since it was built, so the cached assignment/rich_club + # were computed over now-dead nodes -- drop them and recompute on the + # fresh live graph (rebuilt from the records table below, which already + # excludes tombstoned rows). Pure GROWTH (cache has FEWER nodes than live) + # must NOT drop them: the node set is still rebuilt fresh from the table + # (so new records are present and drift/parity stay correct), but + # detect_communities is not re-run, so a single insert is absorbed without + # an O(n^2) recompute -- the staleness-window contract. + assignment = None + rich_club = None + if use_cached_payload: for nid, payload in cached_node_payload.items(): graph.add_node( @@ -497,6 +593,16 @@ def build_runtime_graph(store: MemoryStore): for _, row in df.iterrows(): if int(row.get("embedding_pending") or 0) != 0: continue + # Exclude tombstoned (soft-deleted / deduped / erased) records from the + # runtime graph: they must not pollute communities, centrality, rich_club + # or the sigma topology audit, and including them keeps the node count out + # of sync with store.active_records_count() (the cache-validity anchor at + # line ~552), permanently invalidating the cache -> a full rebuild every + # wake. Matches active_records_count(): tombstoned_at IS NULL. Guard the + # pandas NaN case (NaN != NaN) so an all-live column is not skipped. + _tomb = row.get("tombstoned_at") + if _tomb is not None and not (isinstance(_tomb, float) and _tomb != _tomb) and str(_tomb).strip(): + continue rid = UUID(row["id"]) _comm_raw = row["community_id"] if _comm_raw is not None and not isinstance(_comm_raw, str): @@ -579,9 +685,16 @@ def build_runtime_graph(store: MemoryStore): edges_df = store.db.open_table("edges").to_pandas() for _, row in edges_df.iterrows(): + # Skip edges whose endpoints are not live nodes: graph.add_edge() does + # setdefault() on both endpoints, so an edge referencing a tombstoned record + # would re-create it as a payload-less node and undo the tombstone filter + # above (re-bloating the graph and the sigma audit). + src_s, dst_s = row["src"], row["dst"] + if not graph.has_node(src_s) or not graph.has_node(dst_s): + continue graph.add_edge( - UUID(row["src"]), - UUID(row["dst"]), + UUID(src_s), + UUID(dst_s), weight=float(row["weight"]), edge_type=row["edge_type"], ) diff --git a/src/iai_mcp/runtime_graph_cache.py b/src/iai_mcp/runtime_graph_cache.py index 8c40fa3..add150a 100644 --- a/src/iai_mcp/runtime_graph_cache.py +++ b/src/iai_mcp/runtime_graph_cache.py @@ -29,7 +29,16 @@ CACHE_VERSION: str = "62-02-v5" -_STALENESS_WINDOW: int = 10 +# Cache-key staleness bucket. The key buckets on records//WINDOW and +# edges//WINDOW; try_load requires an exact key match. With WINDOW=10 a normal +# day of capture (+~150 records, +~1300 edges) crossed ~130 buckets, so the +# on-disk graph cache MISSED on essentially every WAKE and the full community +# detection (mosaic) was recomputed each time — the WAKE CPU storm. Edges churn +# fastest, so they are the binding term. WINDOW=250 keeps the cache valid across +# a normal day so most WAKEs are cheap cache HITs; the independent age/dirty fuse +# in consult_overlay (25h / dirty>50) remains the real freshness backstop, and +# build_runtime_graph's single-flight gate makes the rare genuine miss harmless. +_STALENESS_WINDOW: int = 250 LEGACY_CACHE_VERSION_PLAINTEXT: str = "06-02-v1" _CACHE_AAD: bytes = b"runtime-graph-cache:v3" diff --git a/src/iai_mcp/socket_server.py b/src/iai_mcp/socket_server.py index 31e72ae..2d21108 100644 --- a/src/iai_mcp/socket_server.py +++ b/src/iai_mcp/socket_server.py @@ -88,7 +88,14 @@ async def handle( line = await reader.readline() if not line: break - self.last_activity_ts = time.monotonic() + # NB: last_activity_ts is intentionally NOT updated for every + # inbound line. It must track REAL memory traffic (recall/capture) + # only — never control-plane messages, in particular the watchdog's + # periodic {"type":"status"} liveness probe (every 7-30s). Counting + # those probes as activity kept _interrupt_check (daemon) perpetually + # True, so the sleep cycle never completed and never hibernated -> + # the 221% CPU churn. It is set below, only for dispatched JSON-RPC + # method calls. req_id: Any = None try: req = json.loads(line) @@ -143,6 +150,11 @@ async def handle( continue method = req["method"] params = req.get("params") or {} + # Real memory traffic: mark activity only for dispatched JSON-RPC + # method calls so background consolidation defers while the user is + # actively recalling/capturing. Control/status probes never reach + # here, so they no longer keep the daemon awake. + self.last_activity_ts = time.monotonic() try: from iai_mcp.core import dispatch result = await asyncio.to_thread( diff --git a/src/iai_mcp/wake_handler.py b/src/iai_mcp/wake_handler.py index 0395f04..2ff4695 100644 --- a/src/iai_mcp/wake_handler.py +++ b/src/iai_mcp/wake_handler.py @@ -20,6 +20,24 @@ def consume_wake_signal(self) -> bool: return False return True + def signal_wake(self) -> bool: + """Create the wake signal so the next daemon boot enters WAKE. + + Symmetric counterpart to ``consume_wake_signal``. Whoever brings the + daemon up from a hibernated (process-exited) state — the CLI + start/install path, or the per-turn capture hook — must create this + signal *before* the kickstart, so the booting daemon transitions + HIBERNATION -> WAKE and serves its socket, instead of re-reading the + persisted HIBERNATION state and immediately hibernate-exiting (which + closes the socket and leaves recall unserved). Idempotent. + """ + try: + self._wake_signal_path.parent.mkdir(parents=True, exist_ok=True) + self._wake_signal_path.touch() + except OSError: + return False + return True + def has_pending_wake(self) -> bool: try: return self._wake_signal_path.is_file() diff --git a/tests/test_build_runtime_graph_single_flight.py b/tests/test_build_runtime_graph_single_flight.py new file mode 100644 index 0000000..b7869da --- /dev/null +++ b/tests/test_build_runtime_graph_single_flight.py @@ -0,0 +1,131 @@ +"""Regression test for the build_runtime_graph WAKE CPU-storm fix. + +At daemon WAKE several background subsystems (boot preload, sigma identity +audit, foraging, hippea cascade) call retrieve.build_runtime_graph concurrently. +Before the fix, each one independently ran the full (GIL-bound) community +detection on a cache miss, so 3+ concurrent runs contended for the GIL, starved +the asyncio event loop, and the liveness watchdog SIGKILLed the daemon. + +The single-flight gate must collapse a concurrent burst into ONE compute (the +leader), with the others reusing the freshly-saved cache — while never sharing a +mutable MemoryGraph object between callers. +""" +from __future__ import annotations + +import threading +import time +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +import iai_mcp.community as community +import iai_mcp.retrieve as retrieve +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _vec(seed: int) -> list[float]: + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return (v / np.linalg.norm(v)).tolist() + + +def _rec(seed: int) -> MemoryRecord: + now = datetime.now(timezone.utc) + return MemoryRecord( + id=uuid4(), tier="episodic", literal_surface="rec", aaak_index="", + embedding=_vec(seed), community_id=None, centrality=0.0, detail_level=2, + pinned=False, stability=0.0, difficulty=0.0, last_reviewed=None, + never_decay=False, never_merge=False, provenance=[], created_at=now, + updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def test_concurrent_build_runtime_graph_runs_detect_once(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + store.insert(_rec(i)) + + n_threads = 4 + calls: list[int] = [] + calls_lock = threading.Lock() + start = threading.Barrier(n_threads) + + orig_detect = community.detect_communities + + def slow_detect(*args, **kwargs): + with calls_lock: + calls.append(1) + # Hold long enough that the other callers pile up behind the leader. + time.sleep(0.6) + return orig_detect(*args, **kwargs) + + # build_runtime_graph_impl does `from iai_mcp.community import detect_communities` + # at call time, so patching the module attribute is seen. + monkeypatch.setattr(community, "detect_communities", slow_detect) + + results: list[tuple] = [] + results_lock = threading.Lock() + + def worker(): + start.wait() + graph, assignment, _rc = retrieve.build_runtime_graph(store) + with results_lock: + results.append((assignment, id(graph))) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=60) + + assert len(results) == n_threads, "some build_runtime_graph callers hung" + # Single-flight: the expensive community detection ran exactly once even + # though all four callers raced into a cache miss simultaneously. + assert len(calls) == 1, ( + f"expected detect_communities to run once (single-flight); " + f"ran {len(calls)} times" + ) + # Every caller got a valid assignment... + assert all(a is not None for a, _ in results) + # ...and its OWN MemoryGraph object — the mutable graph (which receives the + # store sync hook) must never be shared between concurrent callers. + graph_ids = {gid for _, gid in results} + assert len(graph_ids) == n_threads, ( + f"callers shared a MemoryGraph object ({len(graph_ids)} distinct of " + f"{n_threads}); the single-flight must memoise only immutable products" + ) + + +def test_build_runtime_graph_hit_path_skips_detect(tmp_path, monkeypatch): + """A warm cache must short-circuit without running community detection.""" + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + store.insert(_rec(i)) + + # Prime the cache. + retrieve.build_runtime_graph(store) + + calls: list[int] = [] + orig_detect = community.detect_communities + + def counting_detect(*args, **kwargs): + calls.append(1) + return orig_detect(*args, **kwargs) + + monkeypatch.setattr(community, "detect_communities", counting_detect) + + # Second call on the same generation: cache HIT, no recompute. + graph, assignment, _rc = retrieve.build_runtime_graph(store) + assert assignment is not None + assert calls == [], "warm-cache build_runtime_graph must not recompute communities" diff --git a/tests/test_daemon_idle_countdown_drain_aware.py b/tests/test_daemon_idle_countdown_drain_aware.py new file mode 100644 index 0000000..f02599d --- /dev/null +++ b/tests/test_daemon_idle_countdown_drain_aware.py @@ -0,0 +1,222 @@ +"""The lifecycle idle countdown must watch real work, not only the heartbeat. + +Regression coverage for the bug where the FSM forced itself to SLEEP after the +idle timer expired even though the daemon was still draining a continuously-fed +backlog: the only refresh of ``_last_active_monotonic`` was gated on the Node +wrapper heartbeat file, which can be stale (empty wrappers dir) while a drain +kicked off by earlier RPC traffic is still hammering the store. Advancing to +SLEEP there escalates to an EXCLUSIVE store lock mid-drain -> lock contention. + +These tests pin the two added signals -- an in-flight drain and recent RPC +activity -- and the regression guard that a genuinely idle daemon still sleeps +(so crisis re-arming, which only runs in SLEEP, keeps working). +""" + +from __future__ import annotations + +import platform +import threading + +import pytest + + +pytestmark = pytest.mark.skipif( + platform.system() == "Windows", + reason="daemon module is POSIX-only on this project", +) + + +# Inputs that, on their own, would push the daemon all the way to SLEEP: well +# past the sleep threshold, stale RPC, no wrapper heartbeat, sleep-eligible. +# Each test flips exactly one signal to prove it holds the countdown open. +_SLEEPY = dict( + scanner_active=False, + seconds_since_rpc=10_000.0, + idle_elapsed=10_000.0, + sleep_eligible=True, + recent_rpc_window_sec=30.0, + drowsy_after_sec=300.0, + sleep_heartbeat_idle_sec=1800.0, +) + + +def test_drain_in_progress_blocks_advance_toward_sleep(): + from iai_mcp.daemon import ( + IDLE_DECISION_ACTIVE, + IDLE_DECISION_SLEEP, + _idle_countdown_decision, + ) + + decision = _idle_countdown_decision(drain_in_progress=True, **_SLEEPY) + + assert decision != IDLE_DECISION_SLEEP, ( + "idle countdown advanced toward SLEEP while a drain was in progress" + ) + assert decision == IDLE_DECISION_ACTIVE + + +def test_recent_rpc_blocks_advance_toward_sleep(): + from iai_mcp.daemon import IDLE_DECISION_ACTIVE, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["seconds_since_rpc"] = 5.0 # within recent_rpc_window_sec (30s) + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_ACTIVE + + +def test_truly_idle_still_advances_to_sleep(): + # Regression guard: with NO activity of any kind, a quiet daemon must still + # reach SLEEP, otherwise crisis re-arming (SLEEP-only) never runs again. + from iai_mcp.daemon import IDLE_DECISION_SLEEP, _idle_countdown_decision + + decision = _idle_countdown_decision(drain_in_progress=False, **_SLEEPY) + + assert decision == IDLE_DECISION_SLEEP + + +def test_idle_past_drowsy_but_not_sleep_eligible_goes_drowsy_only(): + from iai_mcp.daemon import IDLE_DECISION_DROWSY, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["idle_elapsed"] = 600.0 # past drowsy_after_sec (300) ... + inputs["sleep_eligible"] = False # ... but not sleep-eligible yet + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_DROWSY + + +def test_within_drowsy_window_holds(): + from iai_mcp.daemon import IDLE_DECISION_HOLD, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["idle_elapsed"] = 60.0 # under drowsy_after_sec (300) + inputs["sleep_eligible"] = False + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_HOLD + + +def test_scanner_active_is_active(): + from iai_mcp.daemon import IDLE_DECISION_ACTIVE, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["scanner_active"] = True + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_ACTIVE + + +# --- capture wiring: the production drain path actually flips the flag -------- + + +def test_drain_deferred_marks_in_progress_and_holds_off_sleep(monkeypatch): + from iai_mcp import capture + from iai_mcp.daemon import ( + IDLE_DECISION_ACTIVE, + IDLE_DECISION_SLEEP, + _idle_countdown_decision, + ) + + seen: dict[str, object] = {} + + def spy_impl(store): + # Mid-drain the flag must be set, and the idle countdown -- fed the flag + # -- must refuse to advance toward SLEEP on otherwise-sleepy inputs. + seen["in_progress"] = capture.is_drain_in_progress() + seen["decision"] = _idle_countdown_decision( + drain_in_progress=capture.is_drain_in_progress(), **_SLEEPY, + ) + return {"files_drained": 0, "files_failed": 0} + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", spy_impl) + + assert capture.is_drain_in_progress() is False + capture.drain_deferred_captures(store=None) + + assert seen["in_progress"] is True + assert seen["decision"] == IDLE_DECISION_ACTIVE + assert seen["decision"] != IDLE_DECISION_SLEEP + # Released once the drain returns: a quiet daemon can sleep again. + assert capture.is_drain_in_progress() is False + + +def test_drain_active_live_marks_in_progress(monkeypatch): + from iai_mcp import capture + + seen: dict[str, object] = {} + + def spy_impl(store, *, exclude_session_id): + seen["in_progress"] = capture.is_drain_in_progress() + seen["sid"] = exclude_session_id + return {"files_drained": 0} + + monkeypatch.setattr(capture, "_drain_active_live_captures_impl", spy_impl) + + capture.drain_active_live_captures(store=None, exclude_session_id="sess-1") + + assert seen["in_progress"] is True + assert seen["sid"] == "sess-1" + assert capture.is_drain_in_progress() is False + + +def test_guard_releases_on_exception(monkeypatch): + from iai_mcp import capture + + def boom(store): + raise RuntimeError("drain blew up") + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", boom) + + assert capture.is_drain_in_progress() is False + with pytest.raises(RuntimeError): + capture.drain_deferred_captures(store=None) + # A failed drain must not leak the in-progress count, else the daemon would + # never sleep again. + assert capture.is_drain_in_progress() is False + + +def test_overlapping_drains_tracked_by_depth(): + # The counter (not a boolean) keeps overlapping drains across threads + # correct: the flag stays True until the LAST one releases. + from iai_mcp import capture + + assert capture.is_drain_in_progress() is False + with capture._drain_in_progress_guard(): + assert capture.is_drain_in_progress() is True + with capture._drain_in_progress_guard(): + assert capture.is_drain_in_progress() is True + assert capture.is_drain_in_progress() is True # outer still holds + assert capture.is_drain_in_progress() is False + + +def test_is_drain_in_progress_true_while_other_thread_drains(monkeypatch): + from iai_mcp import capture + + entered = threading.Event() + release = threading.Event() + + def slow_impl(store): + entered.set() + release.wait(timeout=5.0) + return {"files_drained": 0, "files_failed": 0} + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", slow_impl) + + worker = threading.Thread( + target=lambda: capture.drain_deferred_captures(store=None), + ) + worker.start() + try: + assert entered.wait(timeout=5.0) + # Observed from a DIFFERENT thread than the one draining. + assert capture.is_drain_in_progress() is True + finally: + release.set() + worker.join(timeout=5.0) + + assert capture.is_drain_in_progress() is False diff --git a/tests/test_daemon_tick_flags.py b/tests/test_daemon_tick_flags.py index 8f1e31d..70bdcf8 100644 --- a/tests/test_daemon_tick_flags.py +++ b/tests/test_daemon_tick_flags.py @@ -157,3 +157,23 @@ def test_tick_updates_last_tick_at(tick_env, monkeypatch): assert "last_tick_at" in state datetime.fromisoformat(state["last_tick_at"]) + + +def test_successful_tick_resets_stale_skip_reason(tick_env, monkeypatch): + # 2cffb35 regression: a stale last_tick_skipped_reason ("empty_store"/"paused") + # must be cleared once a tick actually runs (store non-empty, not paused), + # otherwise observability shows a healthy daemon as permanently parked. tick_env + # seeds one record -> store is non-empty. WITHOUT the reset line the field stays + # "empty_store" and this test fails. + from iai_mcp import daemon as daemon_mod + from iai_mcp.daemon_state import load_state + + store, state_path, tmp_path = tick_env + monkeypatch.setattr(daemon_mod, "should_relearn", lambda last, now: False) + + state = {"fsm_state": "WAKE", "last_tick_skipped_reason": "empty_store"} + asyncio.run(daemon_mod._tick_body(store, state)) + + assert state.get("last_tick_skipped_reason") is None + loaded = load_state() + assert loaded.get("last_tick_skipped_reason") is None diff --git a/tests/test_graph_excludes_tombstoned.py b/tests/test_graph_excludes_tombstoned.py new file mode 100644 index 0000000..9dd6134 --- /dev/null +++ b/tests/test_graph_excludes_tombstoned.py @@ -0,0 +1,102 @@ +"""Regression tests for excluding tombstoned records from the runtime graph. + +build_runtime_graph used to add every record (and every edge) to the graph, +including soft-deleted / deduped / erased records (tombstoned_at IS NOT NULL). +That (a) polluted communities / centrality / rich_club / the sigma topology +audit with dead nodes, and (b) desynced the node count from +store.active_records_count() -- the cache-validity anchor -- so the payload +cache was permanently invalid and every wake did a full rebuild. + +These tests pin: tombstoned records (and edges touching them) are excluded, the +live node count equals active_records_count(), and assignment/rich_club are +recomputed on the fresh live graph rather than reused from a stale-node cache. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +import iai_mcp.retrieve as retrieve +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _vec(seed: int) -> list[float]: + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return (v / np.linalg.norm(v)).tolist() + + +def _rec(seed: int): + now = datetime.now(timezone.utc) + rid = uuid4() + return rid, MemoryRecord( + id=rid, tier="episodic", literal_surface=f"rec-{seed}", aaak_index="", + embedding=_vec(seed), community_id=None, centrality=0.0, detail_level=2, + pinned=False, stability=0.0, difficulty=0.0, last_reviewed=None, + never_decay=False, never_merge=False, provenance=[], created_at=now, + updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def _tombstone(store: MemoryStore, rid) -> None: + now = datetime.now(timezone.utc).isoformat() + with store.db._conn_lock: + store.db._conn.execute( + "UPDATE records SET tombstoned_at = ? WHERE id = ?", + (now, str(rid)), + ) + + +def test_build_runtime_graph_excludes_tombstoned(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + live_ids = [] + for i in range(6): + rid, rec = _rec(i) + store.insert(rec) + live_ids.append(rid) + dead_ids = [] + for i in range(6, 10): + rid, rec = _rec(i) + store.insert(rec) + _tombstone(store, rid) + dead_ids.append(rid) + + assert store.active_records_count() == 6 + + graph, assignment, rich_club = retrieve.build_runtime_graph(store) + + nodes = {str(n) for n in graph.nodes()} + assert len(nodes) == 6, f"expected 6 live nodes, got {len(nodes)}" + for rid in dead_ids: + assert str(rid) not in nodes, f"tombstoned {rid} leaked into the graph" + for rid in live_ids: + assert str(rid) in nodes + # rich_club is a fraction of the LIVE graph, never references dead nodes + assert all(str(r) not in {str(d) for d in dead_ids} for r in (rich_club or [])) + + +def test_node_count_matches_active_records_count(tmp_path, monkeypatch): + """The cache-validity anchor: graph node count must equal active count, so the + payload cache validates on the next build instead of rebuilding forever.""" + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + _rid, rec = _rec(i) + store.insert(rec) + rid, rec = _rec(99) + store.insert(rec) + _tombstone(store, rid) + + graph, _assignment, _rc = retrieve.build_runtime_graph(store) + assert len({str(n) for n in graph.nodes()}) == store.active_records_count() == 8 diff --git a/tests/test_socket_activity_tracking.py b/tests/test_socket_activity_tracking.py new file mode 100644 index 0000000..9bde5d3 --- /dev/null +++ b/tests/test_socket_activity_tracking.py @@ -0,0 +1,134 @@ +"""Regression: the socket server must track *real* memory traffic only. + +`SocketServer.last_activity_ts` feeds the daemon's `_interrupt_check`: the sleep +/ consolidation pipeline defers whenever activity is recent (< 30s). The watchdog +probes daemon liveness with a `{"type": "status"}` control message every 7-30s +(`daemon/_watchdog.py::_probe_status_roundtrip`). When *every* inbound line — +including that probe — refreshed `last_activity_ts`, `_interrupt_check` was +perpetually True, so the cycle never completed, the daemon never hibernated, and +the wake-hook re-ran every tick (a ~200% CPU churn on any long-lived deployment). + +The fix: refresh `last_activity_ts` only for dispatched JSON-RPC method calls +(recall/capture/etc.), never for control-plane messages. These tests lock that in. +""" +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path + +import pytest + + +@pytest.fixture +def short_socket_paths(tmp_path, monkeypatch): + from iai_mcp import concurrency, daemon_state + + sock_dir = Path(f"/tmp/iai-srvact-{os.getpid()}-{id(tmp_path)}") + sock_dir.mkdir(parents=True, exist_ok=True) + sock_path = sock_dir / "d.sock" + state_path = tmp_path / ".daemon-state.json" + + monkeypatch.setattr(concurrency, "SOCKET_PATH", sock_path) + monkeypatch.setattr(daemon_state, "STATE_PATH", state_path) + store_root = tmp_path / "store_root" + store_root.mkdir(parents=True, exist_ok=True) + monkeypatch.setenv("IAI_MCP_STORE", str(store_root)) + + try: + yield sock_path + finally: + try: + if sock_path.exists(): + sock_path.unlink() + except OSError: + pass + try: + sock_dir.rmdir() + except OSError: + pass + + +async def _send_line(sock_path: Path, payload: dict, *, timeout: float = 10.0) -> dict: + reader, writer = await asyncio.wait_for( + asyncio.open_unix_connection(path=str(sock_path)), timeout=timeout, + ) + try: + writer.write((json.dumps(payload) + "\n").encode("utf-8")) + await writer.drain() + line = await asyncio.wait_for(reader.readline(), timeout=timeout) + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: # noqa: BLE001 + pass + if not line: + raise AssertionError(f"daemon closed without reply (payload={payload})") + return json.loads(line.decode("utf-8")) + + +async def _serve(sock_path: Path, store, coro_fn): + from iai_mcp.socket_server import SocketServer + + srv = SocketServer(store, idle_secs=99999) + server_task = asyncio.create_task(srv.serve(socket_path=sock_path)) + for _ in range(250): + if sock_path.exists(): + break + await asyncio.sleep(0.01) + if not sock_path.exists(): + srv.shutdown_event.set() + raise AssertionError("socket never bound") + try: + return await coro_fn(srv) + finally: + srv.shutdown_event.set() + try: + await asyncio.wait_for(server_task, timeout=5) + except Exception: # noqa: BLE001 + pass + + +def test_status_probe_does_not_refresh_last_activity(short_socket_paths): + sock_path = short_socket_paths + from iai_mcp.store import MemoryStore + + store = MemoryStore() + + async def _runner(srv): + # Sentinel baseline so any refresh is detectable. + srv.last_activity_ts = 0.0 + resp = await _send_line(sock_path, {"type": "status"}) + # The control message is answered (round-trip), proving it was received... + assert resp is not None, resp + # ...yet it must NOT have been counted as memory activity: the watchdog's + # periodic probe would otherwise keep the daemon awake forever. + assert srv.last_activity_ts == 0.0, ( + "status liveness probe wrongly refreshed last_activity_ts" + ) + + asyncio.run(_serve(sock_path, store, _runner)) + + +def test_jsonrpc_method_refreshes_last_activity(short_socket_paths): + sock_path = short_socket_paths + from iai_mcp.store import MemoryStore + + store = MemoryStore() + + async def _runner(srv): + srv.last_activity_ts = 0.0 + resp = await _send_line( + sock_path, + {"jsonrpc": "2.0", "id": 1, "method": "session_start_payload", "params": {}}, + ) + assert "result" in resp, resp + # Real recall/capture traffic MUST refresh the activity clock so the sleep + # pipeline correctly defers while the user is actively using memory. + assert srv.last_activity_ts > 0.0, ( + "dispatched JSON-RPC method did not refresh last_activity_ts" + ) + + asyncio.run(_serve(sock_path, store, _runner)) diff --git a/tests/test_store_is_empty.py b/tests/test_store_is_empty.py new file mode 100644 index 0000000..c7528c3 --- /dev/null +++ b/tests/test_store_is_empty.py @@ -0,0 +1,64 @@ +"""Regression test for _store_is_empty: a count failure must NOT be read as empty. + +The daemon tick calls _store_is_empty() and, if true, skips the whole tick +(no idle-check, no drain). A transient count failure -- e.g. the shared sqlite +connection left in an error state by a concurrent heavy reader, raising +HippoIntegrityError/lock errors (all subclass RuntimeError) -- used to be caught +and return True, parking the lifecycle on a store that actually has records. +The fix returns False (unknown != empty) so the tick proceeds. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +from iai_mcp.daemon import _store_is_empty +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _rec(seed: int) -> MemoryRecord: + now = datetime.now(timezone.utc) + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return MemoryRecord( + id=uuid4(), tier="episodic", literal_surface="rec", aaak_index="", + embedding=(v / np.linalg.norm(v)).tolist(), community_id=None, + centrality=0.0, detail_level=2, pinned=False, stability=0.0, + difficulty=0.0, last_reviewed=None, never_decay=False, never_merge=False, + provenance=[], created_at=now, updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def test_empty_store_is_empty(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + assert _store_is_empty(store) is True + + +def test_nonempty_store_is_not_empty(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + store.insert(_rec(1)) + assert _store_is_empty(store) is False + + +def test_count_failure_is_not_treated_as_empty(tmp_path, monkeypatch): + """The core fix: a RuntimeError during the count must yield False, not True.""" + store = _make_store(tmp_path, monkeypatch) + store.insert(_rec(2)) + + def boom(*_a, **_k): + raise RuntimeError("connection in error state") + + monkeypatch.setattr(store.db, "open_table", boom) + assert _store_is_empty(store) is False