diff --git a/src/iai_mcp/capture.py b/src/iai_mcp/capture.py index d95ad4f..9c54ae6 100644 --- a/src/iai_mcp/capture.py +++ b/src/iai_mcp/capture.py @@ -1,6 +1,7 @@ from __future__ import annotations +import contextlib import hashlib import json import logging @@ -41,6 +42,46 @@ # two threads before either has inserted it. _CAPTURE_DEDUP_LOCK = threading.Lock() +# Drain-in-progress accounting for the daemon's lifecycle idle countdown. +# +# The wrapper heartbeat file is only ONE signal that the daemon is busy. A +# deferred-capture drain (triggered by real RPC traffic, kicked off at the +# drowsy edge, or run on startup) can keep writing to the store long after the +# heartbeat file has gone stale. If the idle countdown only watched the +# heartbeat, it would force the FSM toward SLEEP -- which escalates to an +# EXCLUSIVE store lock -- while these drain threads are still hammering the +# store, causing lock contention. The daemon consults `is_drain_in_progress` +# so an in-flight drain holds the idle countdown open. A depth counter (rather +# than a boolean) keeps this correct under concurrent/overlapping drains across +# the several threads that asyncio.to_thread dispatch spins up. +_DRAIN_IN_PROGRESS_LOCK = threading.Lock() +_drain_in_progress_depth = 0 + + +@contextlib.contextmanager +def _drain_in_progress_guard(): + """Mark a deferred-capture drain as in flight for its whole duration.""" + global _drain_in_progress_depth + with _DRAIN_IN_PROGRESS_LOCK: + _drain_in_progress_depth += 1 + try: + yield + finally: + with _DRAIN_IN_PROGRESS_LOCK: + _drain_in_progress_depth -= 1 + + +def is_drain_in_progress() -> bool: + """True while at least one deferred-capture drain is running. + + The daemon's lifecycle idle countdown reads this so it does NOT advance the + FSM toward SLEEP (and the EXCLUSIVE store lock it takes) while drain threads + are still writing to the store. + """ + with _DRAIN_IN_PROGRESS_LOCK: + return _drain_in_progress_depth > 0 + + FAILED_MAX_ATTEMPTS: int = 3 FAILED_BACKOFF_BASE_SEC: float = 60.0 @@ -276,32 +317,53 @@ def capture_turn( from iai_mcp.embed import embedder_for_store from iai_mcp.events import TELEMETRY_EMBED_NATIVE_FAILURE, write_event - try: - # Embed the message content, never the cue. The cue is a provenance - # label only (transcript drains and deferred-drain pass a positional - # cue such as "session turn "); embedding it collapsed the - # stored vector space and broke semantic recall. text is already - # validated non-empty above (the MIN_CAPTURE_LEN guard), so embedding - # text is safe for every caller. - emb = embedder_for_store(store).embed(text) - except Exception as exc: - write_event( - store, - TELEMETRY_EMBED_NATIVE_FAILURE, - { - "op_type": "capture", - "backend": "rust", - "error_type": type(exc).__name__, - "error": str(exc), - }, - ) - raise NativeError(f"capture encode failed: {exc}") from exc - embedding = list(emb) + # Embedding is the expensive native (Rust) matmul; the exact-key idem dedup + # below is a cheap SQLite lookup. Active sessions re-drain the *whole* + # transcript every turn (write_deferred_captures / capture_transcript walk + # from line 0 on each call), so embedding eagerly here re-embedded every + # already-stored turn only to throw the vector away as a "reinforced" + # exact-key re-drain -> chronic daemon CPU. Defer the embed so it runs at + # most once and only when actually needed: an already-seen episodic turn + # short-circuits on its idem tag and never embeds. Embed the message + # content, never the cue (the cue is a positional provenance label; + # embedding it collapsed the vector space and broke semantic recall). text + # is validated non-empty above (the MIN_CAPTURE_LEN guard). + _embed_cache: dict[str, list[float]] = {} + + def _compute_embedding() -> list[float]: + if "v" in _embed_cache: + return _embed_cache["v"] + try: + emb = embedder_for_store(store).embed(text) + except Exception as exc: + write_event( + store, + TELEMETRY_EMBED_NATIVE_FAILURE, + { + "op_type": "capture", + "backend": "rust", + "error_type": type(exc).__name__, + "error": str(exc), + }, + ) + raise NativeError(f"capture encode failed: {exc}") from exc + vec = list(emb) + _embed_cache["v"] = vec + return vec with _CAPTURE_DEDUP_LOCK: if _is_episodic_conversational(tier, role): ts_iso = now.isoformat() idem_t = _idem_tag(session_id, role, ts_iso, text, source_uuid=source_uuid) + # find_record_by_tag reads SQLite, but a just-inserted record may + # still sit in the in-process _record_buffer (not yet flushed to the + # records table). Under _CAPTURE_DEDUP_LOCK the insert path is + # serialized with this find, so flushing the buffer here makes every + # prior committed insert visible to the SQLite-backed find and closes + # the check-then-insert race that produced live idem-tag duplicates. + from iai_mcp.store import flush_record_buffer + + flush_record_buffer(store) existing_id = store.find_record_by_tag(idem_t) if existing_id is not None: try: @@ -321,7 +383,7 @@ def capture_turn( } else: try: - neighbours = store.query_similar(embedding, k=3, tier=tier) + neighbours = store.query_similar(_compute_embedding(), k=3, tier=tier) except (ValueError, IOError) as exc: log.warning( "capture_dedup_query_failed", @@ -361,7 +423,7 @@ def capture_turn( tier=tier, literal_surface=text, aaak_index="", - embedding=embedding, + embedding=_compute_embedding(), community_id=None, centrality=0.0, detail_level=2, @@ -711,6 +773,17 @@ def write_deferred_captures( def drain_deferred_captures(store: MemoryStore) -> dict[str, int]: + """Drain crashed/orphaned deferred-capture files into the store. + + Marks itself in-progress for the daemon idle countdown so the FSM does not + advance toward SLEEP (EXCLUSIVE store lock) mid-drain. See + `is_drain_in_progress`. + """ + with _drain_in_progress_guard(): + return _drain_deferred_captures_impl(store) + + +def _drain_deferred_captures_impl(store: MemoryStore) -> dict[str, int]: deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures" log_dir = Path.home() / ".iai-mcp" / "logs" log_dir.mkdir(parents=True, exist_ok=True) @@ -1139,6 +1212,23 @@ def drain_active_live_captures( store: MemoryStore, *, exclude_session_id: str, +) -> dict[str, int]: + """Drain live-session capture files into the store. + + Marked in-progress for the daemon idle countdown, like + `drain_deferred_captures` -- both write to the store and must hold the FSM + out of SLEEP while running. See `is_drain_in_progress`. + """ + with _drain_in_progress_guard(): + return _drain_active_live_captures_impl( + store, exclude_session_id=exclude_session_id, + ) + + +def _drain_active_live_captures_impl( + store: MemoryStore, + *, + exclude_session_id: str, ) -> dict[str, int]: deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures" state_dir = Path.home() / ".iai-mcp" / ".capture-state" diff --git a/src/iai_mcp/cli/_daemon.py b/src/iai_mcp/cli/_daemon.py index f9de4d6..609eca5 100644 --- a/src/iai_mcp/cli/_daemon.py +++ b/src/iai_mcp/cli/_daemon.py @@ -29,6 +29,22 @@ def _stop_escalation_bound() -> float: return STOP_TERM_TIMEOUT_S +def _signal_daemon_wake() -> None: + """Create the wake signal before a kickstart so the booting daemon WAKEs. + + Without it the daemon boots, re-reads its persisted HIBERNATION state and + hibernate-exits within a tick, closing the socket before it ever serves + recall. Best-effort: a failure here just falls back to the old behaviour. + """ + try: + from iai_mcp.wake_handler import WakeHandler + + root = os.environ.get("IAI_MCP_STORE") or os.path.expanduser("~/.iai-mcp") + WakeHandler(Path(root) / "wake.signal").signal_wake() + except Exception: # noqa: BLE001 -- never let the wake signal break daemon start + pass + + def _stop_poll_interval() -> float: raw = os.environ.get("IAI_DAEMON_STOP_POLL_S") if raw: @@ -158,6 +174,7 @@ def cmd_daemon_install(args: argparse.Namespace) -> int: f"{result.stderr.strip()}", file=sys.stderr, ) + _signal_daemon_wake() _cli.subprocess.run( ["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"], check=False, capture_output=True, @@ -255,6 +272,7 @@ def cmd_daemon_start(args: argparse.Namespace) -> int: ["launchctl", "bootstrap", f"gui/{uid}", str(target)], check=False, capture_output=True, ) + _signal_daemon_wake() _cli.subprocess.run( ["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"], check=False, capture_output=True, diff --git a/src/iai_mcp/core/_serializers.py b/src/iai_mcp/core/_serializers.py index 75b46ce..2ecbfa5 100644 --- a/src/iai_mcp/core/_serializers.py +++ b/src/iai_mcp/core/_serializers.py @@ -6,9 +6,18 @@ def _hit_to_json(h) -> dict: _vf = getattr(h, "valid_from", None) _vt = getattr(h, "valid_to", None) + # Clamp the *displayed* score to [0,1]. Multiplicative boosts (trigram*2, + # FTS*3, valence) can drive the internal score past 1.0; that raw value stays + # in `sort_score` for ordering, but the client must never see a "confidence" + # > 1 (or < 0). Ordering is unaffected: this only touches the serialized + # number, never the rank. + try: + _display_score = max(0.0, min(1.0, float(h.score))) + except (TypeError, ValueError): + _display_score = 0.0 return { "record_id": str(h.record_id), - "score": float(h.score), + "score": _display_score, "reason": h.reason, "literal_surface": h.literal_surface, "adjacent_suggestions": [str(x) for x in h.adjacent_suggestions], diff --git a/src/iai_mcp/daemon/__init__.py b/src/iai_mcp/daemon/__init__.py index 51a7cff..027c2b1 100644 --- a/src/iai_mcp/daemon/__init__.py +++ b/src/iai_mcp/daemon/__init__.py @@ -28,6 +28,7 @@ DAEMON_SLEEP_CYCLE_STALE, DAEMON_WATCHDOG_NEEDS_OPERATOR, DAEMON_WEDGE_KILL, + emit_best_effort, write_event, ) from iai_mcp.identity_audit import continuous_audit @@ -145,6 +146,52 @@ def _should_drain_on_drowsy_edge(prev, current) -> bool: return prev is _L.WAKE and current is _L.DROWSY +# Idle-countdown decisions. The lifecycle tick translates these into FSM events. +IDLE_DECISION_ACTIVE: str = "active" # real work in flight -> reset the countdown +IDLE_DECISION_SLEEP: str = "sleep" # idle past the sleep threshold -> IDLE_30MIN +IDLE_DECISION_DROWSY: str = "drowsy" # idle past the drowsy threshold -> IDLE_5MIN +IDLE_DECISION_HOLD: str = "hold" # within the drowsy window -> no transition + + +def _idle_countdown_decision( + *, + scanner_active: bool, + drain_in_progress: bool, + seconds_since_rpc: float, + idle_elapsed: float, + sleep_eligible: bool, + recent_rpc_window_sec: float, + drowsy_after_sec: float, + sleep_heartbeat_idle_sec: float, +) -> str: + """Decide what the lifecycle idle countdown should do this tick. + + The wrapper heartbeat (``scanner_active``) is only ONE signal that the + daemon is busy. Two others MUST also hold the daemon awake: + + * ``drain_in_progress`` -- a deferred-capture drain is writing to the store + right now. The wrappers dir can be empty (heartbeat stale) while a drain + kicked off by earlier RPC traffic is still hammering the store. + * ``seconds_since_rpc`` -- real JSON-RPC traffic arrived recently. A drain + runs inside its triggering request, so by the time a long drain finishes + ``last_activity_ts`` (set at request start) may already look stale; the + ``drain_in_progress`` signal covers that tail. + + When any of these holds we return ``ACTIVE`` so the caller resets the idle + countdown. Otherwise the daemon really is idle and we advance toward SLEEP + (which escalates to an EXCLUSIVE store lock) / DROWSY exactly as the + heartbeat-only logic used to, so a genuinely quiet daemon still settles and + crisis re-arming in SLEEP keeps running. + """ + if scanner_active or drain_in_progress or seconds_since_rpc < recent_rpc_window_sec: + return IDLE_DECISION_ACTIVE + if idle_elapsed >= sleep_heartbeat_idle_sec and sleep_eligible: + return IDLE_DECISION_SLEEP + if idle_elapsed >= drowsy_after_sec: + return IDLE_DECISION_DROWSY + return IDLE_DECISION_HOLD + + def _run_drowsy_drain(store, *, drain_fn, write_event_fn) -> None: try: result = drain_fn(store) @@ -228,8 +275,53 @@ def _store_is_empty(store: MemoryStore) -> bool: try: return store.db.open_table("records").count_rows() == 0 except (OSError, ValueError, KeyError, RuntimeError) as exc: - log.debug("store empty check failed, assuming empty: %s", exc) - return True + # Unknown != empty. A transient count failure (e.g. the shared sqlite + # connection left in an error state by a concurrent heavy reader, raising + # HippoIntegrityError/lock errors which subclass RuntimeError) must NOT be + # treated as an empty store: doing so parks the whole lifecycle tick + # (no idle-check, no drain) on a store that actually has records. Treat + # the unknown case as NOT empty so the tick proceeds; a truly empty store + # just does a little harmless no-op work. + log.debug("store empty check failed, assuming NOT empty: %s", exc) + # e8f3deb fixed the *behavior* (don't park the tick) but left the + # condition invisible. A recurring count failure (sqlite left in an error + # state by a heavy reader) should surface to the operator, not just + # log.debug. emit_best_effort is buffered and never raises, so it is safe + # even when the store connection is the thing failing. + try: + emit_best_effort( + store, + "store_empty_check_failed", + {"error": str(exc), "error_type": type(exc).__name__}, + severity="warning", + ) + except Exception: # noqa: BLE001 -- telemetry must never break the tick + pass + return False + + +def _normalize_boot_lifecycle_state(raw: dict) -> tuple[dict, bool]: + """Repair a crash-left incoherent lifecycle state at boot. + + A daemon killed mid-SLEEP can leave lifecycle_state.json at + current_state=SLEEP with sleep_cycle_progress=None -- incoherent, because a + real in-flight sleep cycle always carries a progress dict. Resuming it wedges + the daemon (it never advances the pipeline, never reaches the recluster that + clears crisis, and recall stays degraded). Reset that one case to a clean + WAKE and drop the stale crisis flag. A genuine degeneration re-arms crisis on + the next complete sleep cycle. Returns (state, changed). + """ + if ( + isinstance(raw, dict) + and raw.get("current_state") == "SLEEP" + and raw.get("sleep_cycle_progress") is None + ): + out = dict(raw) + out["current_state"] = "WAKE" + out["crisis_mode"] = False + out["crisis_mode_since_ts"] = None + return out, True + return raw, False def _is_inside_window( @@ -490,6 +582,10 @@ async def _tick_body( state["last_tick_at"] = datetime.now(timezone.utc).isoformat() + # Clear the skip reason: reaching here means the tick was NOT skipped. Leaving a + # stale "empty_store"/"paused" value here makes a healthy daemon look parked in + # observability (last_tick_skipped_reason is only ever set, never reset). + state["last_tick_skipped_reason"] = None try: await asyncio.to_thread(save_state, state) except (OSError, ValueError) as exc: @@ -932,11 +1028,13 @@ def _capture_handler(record: dict) -> None: async def _boot_preload() -> None: try: from iai_mcp import retrieve as _retrieve_preload - _g, _a, _rc = await asyncio.to_thread( - _retrieve_preload.build_runtime_graph, store, - ) + # build_runtime_graph already persists the cache internally + # (with the full node_payload) on a miss. The previous extra + # save(..., node_payload=None, ...) here overwrote that good + # cache with a payload-less one (forcing a pandas re-read on + # the next hit) — so we just warm the cache and drop it. await asyncio.to_thread( - _rgc_mod.save, store, _a, _rc, None, 0, + _retrieve_preload.build_runtime_graph, store, ) except Exception as _exc: # noqa: BLE001 -- preload MUST NOT crash daemon log.debug("boot_preload failed: %s", _exc, exc_info=True) @@ -1016,6 +1114,38 @@ async def _drain_and_report() -> None: _sleep_pipeline = _SleepPipeline(store=store) from pathlib import Path as _PathS2 + # Boot normalization for a crash mid-SLEEP: lifecycle_state.json can be + # left at current_state=SLEEP with sleep_cycle_progress=None -- an + # incoherent state (a real in-flight cycle always carries a progress + # dict). Resuming it wedges the daemon: it never advances the sleep + # pipeline, never reaches the recluster that clears crisis, and recall + # stays degraded (SLEEP + crisis both degrade recall). Reset that one + # case to a clean WAKE (and drop the stale crisis flag set before the + # crash) so the daemon serves immediately; a genuine degeneration will + # simply re-arm crisis on the next complete sleep cycle. + try: + import json as _json_lc + _lc_path = _PathS2.home() / ".iai-mcp" / "lifecycle_state.json" + _lc_raw = _json_lc.loads(_lc_path.read_text()) + _lc_norm, _lc_changed = _normalize_boot_lifecycle_state(_lc_raw) + if _lc_changed: + _lc_path.write_text(_json_lc.dumps(_lc_norm, indent=2)) + log.warning( + "lifecycle_boot_normalized: stale SLEEP without " + "sleep_cycle_progress -> WAKE (crisis cleared)" + ) + try: + emit_best_effort( + store, + "lifecycle_boot_normalized", + {"from_state": "SLEEP", "to_state": "WAKE", + "reason": "sleep_without_progress"}, + severity="warning", + ) + except Exception: # noqa: BLE001 -- telemetry must not block boot + pass + except (OSError, ValueError) as _lc_exc: + log.debug("lifecycle boot normalization skipped: %s", _lc_exc) _s2_config = _load_s2_config() _s2_coord = S2Coordinator( store=store, @@ -1149,6 +1279,34 @@ def _emit_expiry() -> None: now_mono = time.monotonic() idle_elapsed = now_mono - _last_active_monotonic[0] + # Beyond the wrapper heartbeat, real RPC traffic and an + # in-flight deferred-capture drain are activity too. Folding + # them into the idle countdown stops the FSM forcing SLEEP + # (-> EXCLUSIVE store lock) while drain threads are still + # hammering the store. See _idle_countdown_decision. + try: + from iai_mcp.capture import is_drain_in_progress as _drain_q + _drain_active = bool(await asyncio.to_thread(_drain_q)) + except Exception: # noqa: BLE001 -- idle accounting MUST NOT crash the tick + _drain_active = False + _seconds_since_rpc = ( + (now_mono - mcp_socket.last_activity_ts) + if mcp_socket is not None + else float("inf") + ) + _idle_decision = _idle_countdown_decision( + scanner_active=scanner_active, + drain_in_progress=_drain_active, + seconds_since_rpc=_seconds_since_rpc, + idle_elapsed=idle_elapsed, + sleep_eligible=sleep_eligible, + recent_rpc_window_sec=INTERRUPT_RECENT_ACTIVITY_WINDOW_SEC, + drowsy_after_sec=DROWSY_AFTER_SEC, + sleep_heartbeat_idle_sec=SLEEP_HEARTBEAT_IDLE_SEC, + ) + if _idle_decision == IDLE_DECISION_ACTIVE: + _last_active_monotonic[0] = now_mono + try: from iai_mcp.daemon_state import load_state as _load_ds _ds = await asyncio.to_thread(_load_ds) @@ -1197,7 +1355,10 @@ def _emit_expiry() -> None: pass if scanner_active: - _last_active_monotonic[0] = now_mono + # _last_active_monotonic was already refreshed above + # (scanner_active -> IDLE_DECISION_ACTIVE); a fresh + # wrapper heartbeat additionally pulls the FSM back to + # WAKE, which RPC/drain activity alone does not. try: await _state_machine.dispatch( _LifecycleEvent.HEARTBEAT_REFRESH, @@ -1205,7 +1366,7 @@ def _emit_expiry() -> None: ) except (S2OscillationConflict, S2OscillationBlocked): pass - elif idle_elapsed >= SLEEP_HEARTBEAT_IDLE_SEC and sleep_eligible: + elif _idle_decision == IDLE_DECISION_SLEEP: try: await _state_machine.dispatch( _LifecycleEvent.IDLE_30MIN, @@ -1214,7 +1375,7 @@ def _emit_expiry() -> None: ) except (S2OscillationConflict, S2OscillationBlocked): pass - elif idle_elapsed >= DROWSY_AFTER_SEC: + elif _idle_decision == IDLE_DECISION_DROWSY: try: await _state_machine.dispatch( _LifecycleEvent.IDLE_5MIN, @@ -1282,8 +1443,13 @@ def _run_wake_sequence(): _prev_lifecycle_state[0] = current if current is _LifecycleState.SLEEP: def _interrupt_check() -> bool: - if mcp_socket.active_connections > 0: - return True + # Defer the sleep pipeline only on RECENT ACTIVITY, not on + # open connections: long-lived Claude sessions keep sockets + # open permanently, so `active_connections > 0` was True at + # nearly every tick -> the cycle never completed -> no + # HIBERNATION -> the wake-hook re-ran every 30s (the 221% CPU + # churn). last_activity_ts is refreshed on each request, so a + # busy burst still defers; a 30s lull lets the cycle finish. elapsed = ( time.monotonic() - mcp_socket.last_activity_ts ) @@ -1520,6 +1686,11 @@ def _interrupt_check() -> bool: "_raise_fd_limit", "_run_drowsy_drain", "_should_drain_on_drowsy_edge", + "_idle_countdown_decision", + "IDLE_DECISION_ACTIVE", + "IDLE_DECISION_SLEEP", + "IDLE_DECISION_DROWSY", + "IDLE_DECISION_HOLD", "_kick_drowsy_rgc_rebuild", "_wake_hook_rebuild_if_cold", "_store_is_empty", diff --git a/src/iai_mcp/daemon/_watchdog.py b/src/iai_mcp/daemon/_watchdog.py index 33d7fc3..b40409f 100644 --- a/src/iai_mcp/daemon/_watchdog.py +++ b/src/iai_mcp/daemon/_watchdog.py @@ -545,7 +545,11 @@ def _check_sleep_cycle_staleness( if not isinstance(progress, dict): return (False, {}) attempt = progress.get("attempt") - if not isinstance(attempt, int) or attempt != 1: + # A retried-but-still-wedged cycle (attempt >= 2) is exactly the case the + # watchdog must catch, not ignore. Gate on attempt < 1 so any genuine + # running attempt is monitored; only attempt 0 / negative / non-int (and + # bool, since isinstance(True, int) is True) short-circuits. + if not isinstance(attempt, int) or isinstance(attempt, bool) or attempt < 1: return (False, {}) started_at_raw = progress.get("started_at") if not isinstance(started_at_raw, str) or not started_at_raw: diff --git a/src/iai_mcp/dmn_reflection.py b/src/iai_mcp/dmn_reflection.py index be14b88..85d7e1e 100644 --- a/src/iai_mcp/dmn_reflection.py +++ b/src/iai_mcp/dmn_reflection.py @@ -94,8 +94,22 @@ def synthesize(self, store, window_hours: int) -> MemoryRecord: "ts": now.isoformat(), } + # Embed the reflection's literal_surface so the record is retrievable by + # vector/cosine recall. Hardcoding a zero vector (the old behaviour) made + # every daily-reflection semantic record permanently invisible to recall + # and fed zero-norm vectors into the scoring matmul (divide-by-zero + # warnings). If the native embed fails, fall back to a zero vector flagged + # embedding_pending=1 so the daemon's reembed-pending path fills it later + # (never silently leave an unretrievable zero record). embed_dim = int(store.embed_dim) - embedding = [0.0] * embed_dim + embedding_pending = 0 + try: + from iai_mcp.embed import embedder_for_store + + embedding = list(embedder_for_store(store).embed(literal_surface)) + except Exception: # noqa: BLE001 -- degrade to deferred reembed, never zero-and-forget + embedding = [0.0] * embed_dim + embedding_pending = 1 return MemoryRecord( id=uuid4(), @@ -103,6 +117,7 @@ def synthesize(self, store, window_hours: int) -> MemoryRecord: literal_surface=literal_surface, aaak_index="", embedding=embedding, + embedding_pending=embedding_pending, community_id=None, centrality=0.5, detail_level=1, diff --git a/src/iai_mcp/hippo/_db.py b/src/iai_mcp/hippo/_db.py index a6e6218..7abfd0b 100644 --- a/src/iai_mcp/hippo/_db.py +++ b/src/iai_mcp/hippo/_db.py @@ -616,6 +616,15 @@ def reembed_pending_rows(self, embedder: Any) -> int: for row in rows: rid = row["id"] surface = row["literal_surface"] or "" + # On an encrypted store literal_surface is iai:enc:v1: ciphertext; embedding + # the ciphertext would produce a garbage vector. Decrypt first (no-op on a + # plaintext store or a value that isn't encrypted). A decrypt failure leaves + # the row embedding_pending=1 so it is retried rather than poisoned. + try: + surface = self._decrypt_record_field(rid, "literal_surface", surface) + except Exception as exc: # noqa: BLE001 + _log.warning("reembed_pending_rows: decrypt failed for id=%s: %s", rid, exc) + continue try: vec = list(embedder.embed(surface)) except Exception as exc: # noqa: BLE001 diff --git a/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py b/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py index d2516cc..e0e82e2 100644 --- a/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py +++ b/src/iai_mcp/lilli/cycle/sleep_pipeline/_crisis.py @@ -66,49 +66,19 @@ def step_crisis_recluster( if not dry_run: tbl = self._store.db.open_table(RECORDS_TABLE) - try: - df2 = tbl.search().to_pandas() - except (OSError, ValueError, RuntimeError, StoreError): - df2 = df - try: from iai_mcp.community import detect_communities - from iai_mcp.graph import MemoryGraph - from iai_mcp.store import EDGES_TABLE + from iai_mcp.lilli.cycle.sleep_pipeline._live_graph import ( + build_live_graph, + ) import uuid as _uuid - g = MemoryGraph() - for _, row in df2.iterrows(): - try: - rid = _uuid.UUID(str(row["id"])) - emb = row.get("embedding") - emb_list = ( - list(emb) if emb is not None else [] - ) - g.add_node(rid, None, emb_list) - except (ValueError, TypeError, AttributeError): - continue - - try: - edges_df = ( - self._store.db.open_table(EDGES_TABLE) - .search() - .to_pandas() - ) - for _, e in edges_df.iterrows(): - try: - src_u = _uuid.UUID(str(e["src"])) - dst_u = _uuid.UUID(str(e["dst"])) - g.add_edge( - src_u, dst_u, - weight=float( - e.get("weight", 1.0) or 1.0 - ), - ) - except (ValueError, TypeError, KeyError): - continue - except (OSError, ValueError, RuntimeError, StoreError) as exc: - logger.debug("crisis_recluster edges query failed: %s", exc) + # Recluster on the LIVE graph only (tombstone-filtered, + # live-only edges) -- mirrors retrieve.py 53f04f9. Previously + # this rebuilt communities over ALL records incl. 3000+ + # tombstoned, collapsing the partition (it reassigned ~9700 + # records into a single community on the real store). + g = build_live_graph(self._store) _assignment = detect_communities( g, prior=None, prior_mode="cold" diff --git a/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py b/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py index fa68875..3dca800 100644 --- a/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py +++ b/src/iai_mcp/lilli/cycle/sleep_pipeline/_essential_variable.py @@ -63,9 +63,12 @@ def run_essential_variable_tracker_hook(self) -> None: EssentialVariableTracker, TopologySnapshot, ) - from iai_mcp.graph import MemoryGraph from iai_mcp.events import write_event - from iai_mcp.store import RECORDS_TABLE, EDGES_TABLE + from iai_mcp.store import RECORDS_TABLE + from iai_mcp.lilli.cycle.sleep_pipeline._live_graph import ( + _is_tombstoned, + build_live_graph, + ) cfg = _load_sleep_overhaul_config() dry_run = cfg.dry_run @@ -82,50 +85,36 @@ def run_essential_variable_tracker_hook(self) -> None: return import uuid as _uuid - g = MemoryGraph() + # Build the graph on LIVE records + live-only edges only. Previously this + # hook constructed the graph from ALL records (tombstoned included) and ALL + # edges, so rich_club_coefficient was computed on a graph polluted by the + # 3000+ deduped/tombstoned nodes (and phantom nodes re-created by add_edge). + # That pushed rich_club below its floor and re-armed crisis on every sleep + # cycle even on a healthy store. build_live_graph mirrors retrieve.py's fix + # (53f04f9) so the crisis view matches recall's view of the graph. + g = build_live_graph(self._store) community_ids: set = set() _community_embeddings: dict[str, list[list[float]]] = {} for _, row in recs.iterrows(): + if _is_tombstoned(row.get("tombstoned_at")): + continue try: - rid = _uuid.UUID(str(row["id"])) emb = row.get("embedding") emb_list = list(emb) if emb is not None else [] cid_raw = row.get("community_id") - cid_uuid: _uuid.UUID | None if cid_raw is not None: try: - cid_uuid = _uuid.UUID(str(cid_raw)) - _cid_str = str(cid_uuid) + _cid_str = str(_uuid.UUID(str(cid_raw))) community_ids.add(_cid_str) if emb_list: _community_embeddings.setdefault( _cid_str, [] ).append(emb_list) except (ValueError, TypeError): - cid_uuid = None - else: - cid_uuid = None - g.add_node(rid, cid_uuid, emb_list) + pass except (ValueError, TypeError, AttributeError): continue - try: - edges_df = ( - self._store.db.open_table(EDGES_TABLE).search().to_pandas() - ) - for _, e in edges_df.iterrows(): - try: - src_u = _uuid.UUID(str(e["src"])) - dst_u = _uuid.UUID(str(e["dst"])) - g.add_edge( - src_u, dst_u, - weight=float(e.get("weight", 1.0) or 1.0), - ) - except (ValueError, TypeError, KeyError): - continue - except (OSError, ValueError, RuntimeError, StoreError) as exc: - logger.debug("essential_variable_tracker edges query failed: %s", exc) - total_nodes = g.node_count() if total_nodes == 0: return @@ -150,19 +139,28 @@ def run_essential_variable_tracker_hook(self) -> None: tracker = EssentialVariableTracker(cfg) breaches = tracker.check(snapshot) + # rich_club_ratio is kept as a DIAGNOSTIC only, not a crisis trigger. On the + # real corpus the live (tombstone-filtered) rich_club sits ~0.019, just under + # the 0.02 floor, so it false-positives on a demonstrably healthy graph and + # is non-discriminant at this scale (a true collapse instead shows up as + # edge_density/community_count). edge_density and community_count remain the + # crisis triggers (both healthy with clear margin). The rich_club breach is + # still recorded as an event for observability. + _CRISIS_TRIGGER_VARS = {"edge_density", "community_count"} crisis_mode_already_set_this_cycle = False for var_name, breach in breaches.items(): if breach is None: continue + is_trigger = var_name in _CRISIS_TRIGGER_VARS crisis_mode_set = False - if not dry_run and not crisis_mode_already_set_this_cycle: + if is_trigger and not dry_run and not crisis_mode_already_set_this_cycle: self._set_crisis_mode_via_s2_or_fallback( value=True, reason=f"essential_variable_breach:{var_name}", ) crisis_mode_already_set_this_cycle = True crisis_mode_set = True - elif not dry_run and crisis_mode_already_set_this_cycle: + elif is_trigger and not dry_run and crisis_mode_already_set_this_cycle: crisis_mode_set = True write_event( self._store, @@ -174,6 +172,7 @@ def run_essential_variable_tracker_hook(self) -> None: "direction": str(breach.direction), "total_nodes": int(total_nodes), "crisis_mode_set": bool(crisis_mode_set), + "is_crisis_trigger": bool(is_trigger), "dry_run_mode": bool(dry_run), }, severity="warning", diff --git a/src/iai_mcp/lilli/cycle/sleep_pipeline/_live_graph.py b/src/iai_mcp/lilli/cycle/sleep_pipeline/_live_graph.py new file mode 100644 index 0000000..31befbe --- /dev/null +++ b/src/iai_mcp/lilli/cycle/sleep_pipeline/_live_graph.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import logging +import uuid as _uuid + +import pandas as pd + +from iai_mcp.exceptions import StoreError +from iai_mcp.graph import MemoryGraph +from iai_mcp.store import EDGES_TABLE, RECORDS_TABLE + +logger = logging.getLogger(__name__) + + +def _is_tombstoned(value) -> bool: + """True if a records.tombstoned_at value marks a (soft-)deleted record. + + NULL is the LIVE case and pandas materialises it differently per dtype: + None (object column), float('nan'), pd.NaT (a datetime64 column -- the shape + the reembed Arrow schema produces) or pd.NA (nullable extension dtypes). + pd.isna() covers all of them, so only a real, non-empty timestamp string + marks a tombstone. Mirrors retrieve.py::_build_runtime_graph_impl so the + crisis hooks compute topology on exactly the same live node set as recall. + """ + if value is None: + return False + try: + if pd.isna(value): + return False + except (TypeError, ValueError): + pass + return bool(str(value).strip()) + + +def build_live_graph(store) -> MemoryGraph: + """Build a MemoryGraph of LIVE records + live-only edges. + + Excludes tombstoned and embedding-pending records (matching + store.active_records_count()), and drops any edge whose endpoint is not a + live node -- graph.add_edge() does setdefault() on both endpoints, so an edge + to a tombstoned record would re-create it as a payload-less node and re-bloat + the graph. That pollution is exactly what drove rich_club below its floor and + re-armed crisis on every sleep cycle. This mirrors the fix in retrieve.py + (53f04f9) so the crisis hooks no longer diverge from recall's view of the + graph. On a store error it returns an empty graph (callers already guard on + node_count == 0). + """ + g = MemoryGraph() + try: + recs = store.db.open_table(RECORDS_TABLE).search().to_pandas() + except (OSError, ValueError, RuntimeError, StoreError) as exc: + logger.debug("build_live_graph records query failed: %s", exc) + return g + if recs.empty: + return g + + for _, row in recs.iterrows(): + try: + if int(row.get("embedding_pending") or 0) != 0: + continue + if _is_tombstoned(row.get("tombstoned_at")): + continue + rid = _uuid.UUID(str(row["id"])) + cid_raw = row.get("community_id") + cid_uuid = None + if cid_raw is not None and not _is_tombstoned(cid_raw) and str(cid_raw).strip(): + try: + cid_uuid = _uuid.UUID(str(cid_raw)) + except (ValueError, TypeError): + cid_uuid = None + emb = row.get("embedding") + emb_list = list(emb) if emb is not None else [] + g.add_node(rid, cid_uuid, emb_list) + except (ValueError, TypeError, AttributeError): + continue + + try: + edges_df = store.db.open_table(EDGES_TABLE).search().to_pandas() + except (OSError, ValueError, RuntimeError, StoreError) as exc: + logger.debug("build_live_graph edges query failed: %s", exc) + return g + for _, e in edges_df.iterrows(): + try: + src_s, dst_s = e["src"], e["dst"] + # Both endpoints must already be live nodes; add_edge() setdefault + # would otherwise resurrect a tombstoned endpoint as a phantom node. + if not g.has_node(src_s) or not g.has_node(dst_s): + continue + g.add_edge( + _uuid.UUID(str(src_s)), + _uuid.UUID(str(dst_s)), + weight=float(e.get("weight", 1.0) or 1.0), + ) + except (ValueError, TypeError, KeyError): + continue + return g diff --git a/src/iai_mcp/pipeline.py b/src/iai_mcp/pipeline.py index 7d403f3..cbaf185 100644 --- a/src/iai_mcp/pipeline.py +++ b/src/iai_mcp/pipeline.py @@ -844,6 +844,10 @@ def _recall_core( MemoryHit( record_id=cid, score=float(s), + # Keep the raw, unclamped score as the internal ranking key so + # post-rank re-sorts preserve engine order even when the + # displayed score is later clamped to [0,1] at serialization. + sort_score=float(s), reason=reason, literal_surface=rec.literal_surface, adjacent_suggestions=suggestions, @@ -1140,7 +1144,18 @@ def recall_for_response( ) apply_stale_downweight(core.scored_hits, cue_intent=_cue_intent) apply_stale_downweight(core.anti_hits, cue_intent=_cue_intent) - core.scored_hits.sort(key=lambda h: h.score, reverse=True) + # Rank on the internal unclamped key (falls back to score when a hit was + # built without sort_score), so ordering is preserved across the display + # clamp applied at serialization. Tie-break on record_id so equal-scoring + # hits resolve deterministically — two code paths that produce the same + # score via different summation orders (e.g. empty profile_state falling + # back to the medium scale) must yield byte-identical orderings. + core.scored_hits.sort( + key=lambda h: ( + -(h.sort_score if h.sort_score is not None else h.score), + str(h.record_id), + ), + ) if ( len(core.scored_hits) == 1 diff --git a/src/iai_mcp/retrieve.py b/src/iai_mcp/retrieve.py index d5bce19..0fd3724 100644 --- a/src/iai_mcp/retrieve.py +++ b/src/iai_mcp/retrieve.py @@ -1,11 +1,14 @@ from __future__ import annotations import logging +import threading import time from datetime import datetime, timedelta, timezone from itertools import combinations from uuid import UUID, uuid4 +import pandas as pd + from iai_mcp.aaak import enforce_english_raw, generate_aaak_index from iai_mcp.events import query_events, write_event from iai_mcp.store import MemoryStore, flush_record_buffer @@ -33,6 +36,32 @@ _STALE_REASON_SUFFIX: str = " · stale" +# --- build_runtime_graph single-flight (WAKE CPU-storm fix) ----------------- +# At daemon WAKE several background subsystems (boot preload, sigma identity +# audit, foraging weak-bridge detection, hippea cascade warming) each call +# build_runtime_graph concurrently. On a cache MISS each one independently runs +# the full O(n^2) community detection (mosaic), GIL-bound, in its own to_thread +# worker. Three+ of those at once contend for the GIL, starve the asyncio event +# loop, and the liveness watchdog's socket probe times out -> SIGKILL -> relaunch +# loop. This single-flight gate collapses the concurrent burst into ONE compute: +# the first caller (leader) computes and saves the on-disk cache; concurrent +# callers (followers) wait on its Event and then re-load the freshly-saved cache +# via the cheap path. No mutable graph object is shared between callers (each +# rebuilds its own MemoryGraph shell + sync hook), and recall is independent of +# the community assignment, so a slightly-stale shared result is harmless. +# +# Followers RE-CONTEND in a bounded loop rather than recomputing unconditionally: +# if the leader fails before saving (e.g. detect_communities raises), or the cache +# key shifts mid-burst, or the leader overruns the wait timeout, the woken +# followers loop back, and exactly ONE of them becomes the next leader while the +# rest wait again. That degrades those edge cases to *sequential* single-flight +# (one compute at a time) instead of an N-way concurrent re-storm. +_BRG_INFLIGHT_LOCK = threading.Lock() +_BRG_INFLIGHT: dict[str, threading.Event] = {} +_BRG_WAIT_TIMEOUT_SEC: float = 120.0 +_BRG_MAX_ATTEMPTS: int = 4 + + def recall( store: MemoryStore, cue_embedding: list[float], @@ -100,7 +129,12 @@ def recall( derive_temporal_validity(store, anti_hits) apply_stale_downweight(hits) apply_stale_downweight(anti_hits) - hits.sort(key=lambda h: h.score, reverse=True) + # Rank on the internal unclamped key (falls back to score), so ordering + # survives the display clamp applied at serialization. + hits.sort( + key=lambda h: (h.sort_score if h.sort_score is not None else h.score), + reverse=True, + ) try: from iai_mcp.s4 import on_read_check @@ -357,6 +391,10 @@ def apply_stale_downweight( continue if not getattr(hit, "_stale_downweighted", False): hit.score *= STALE_DOWNWEIGHT_FACTOR + # Keep the internal ranking key in lock-step with the displayed + # score so stale hits demote in the actual ordering too. + if getattr(hit, "sort_score", None) is not None: + hit.sort_score *= STALE_DOWNWEIGHT_FACTOR hit._stale_downweighted = True if not hit.reason.endswith(_STALE_REASON_SUFFIX): hit.reason = f"{hit.reason}{_STALE_REASON_SUFFIX}" @@ -450,6 +488,63 @@ def _hook(op: str, record) -> None: def build_runtime_graph(store: MemoryStore): + """Single-flight wrapper around the real graph build. + + On a cache HIT this is cheap and runs directly. On a cache MISS it + serialises concurrent callers so the expensive community detection runs + exactly once per cache generation: the leader computes + saves the cache, + followers wait and then reload it cheaply. See the _BRG_* notes above. + """ + from iai_mcp import runtime_graph_cache as _rgc + + cached = None + for _attempt in range(_BRG_MAX_ATTEMPTS): + try: + cached = _rgc.try_load(store) + except Exception: # noqa: BLE001 -- never let cache I/O break the build + cached = None + + # Cache HIT: no contention risk, run directly (the impl reloads cheaply). + if cached is not None and cached[0] is not None: + return _build_runtime_graph_impl(store, cached) + + # Cache MISS: single-flight on the cache key so a WAKE burst of callers + # does not all recompute the full mosaic concurrently. + try: + keystr = repr(_rgc._cache_key(store)) + except Exception: # noqa: BLE001 -- if we can't key it, just compute + return _build_runtime_graph_impl(store, cached) + + with _BRG_INFLIGHT_LOCK: + event = _BRG_INFLIGHT.get(keystr) + is_leader = event is None + if is_leader: + event = threading.Event() + _BRG_INFLIGHT[keystr] = event + + if is_leader: + # Leader: compute (the impl saves the cache), then release followers. + try: + return _build_runtime_graph_impl(store, cached) + finally: + with _BRG_INFLIGHT_LOCK: + # Only drop our own slot (a key shift could have replaced it). + if _BRG_INFLIGHT.get(keystr) is event: + _BRG_INFLIGHT.pop(keystr, None) + event.set() + + # Follower: wait for the leader, then loop. Next iteration's try_load + # HITS if the leader saved; if the leader failed / the key shifted / + # the wait timed out, we re-contend and one follower becomes the next + # leader (sequential single-flight — never an N-way concurrent re-storm). + event.wait(timeout=_BRG_WAIT_TIMEOUT_SEC) + + # Attempts exhausted (e.g. the leader keeps failing): compute directly as a + # last resort. Bounded, and still correct. + return _build_runtime_graph_impl(store, cached) + + +def _build_runtime_graph_impl(store: MemoryStore, cached): from iai_mcp.community import detect_communities from iai_mcp.graph import MemoryGraph from iai_mcp.richclub import rich_club_nodes @@ -457,7 +552,6 @@ def build_runtime_graph(store: MemoryStore): graph = MemoryGraph() - cached = runtime_graph_cache.try_load(store) assignment = None rich_club = None cached_node_payload: dict[str, dict] | None = None @@ -472,6 +566,19 @@ def build_runtime_graph(store: MemoryStore): and len(cached_node_payload) == records_count ) + if cached_node_payload is not None and len(cached_node_payload) > records_count: + # The cache holds MORE nodes than are live: records were tombstoned + # (dedup/erasure) since it was built, so the cached assignment/rich_club + # were computed over now-dead nodes -- drop them and recompute on the + # fresh live graph (rebuilt from the records table below, which already + # excludes tombstoned rows). Pure GROWTH (cache has FEWER nodes than live) + # must NOT drop them: the node set is still rebuilt fresh from the table + # (so new records are present and drift/parity stay correct), but + # detect_communities is not re-run, so a single insert is absorbed without + # an O(n^2) recompute -- the staleness-window contract. + assignment = None + rich_club = None + if use_cached_payload: for nid, payload in cached_node_payload.items(): graph.add_node( @@ -497,6 +604,25 @@ def build_runtime_graph(store: MemoryStore): for _, row in df.iterrows(): if int(row.get("embedding_pending") or 0) != 0: continue + # Exclude tombstoned (soft-deleted / deduped / erased) records from the + # runtime graph: they must not pollute communities, centrality, rich_club + # or the sigma topology audit, and including them keeps the node count out + # of sync with store.active_records_count() (the cache-validity anchor at + # line ~552), permanently invalidating the cache -> a full rebuild every + # wake. Matches active_records_count(): tombstoned_at IS NULL. + # + # NULL is the LIVE case and pandas materialises it differently per + # dtype: None (object column), float('nan'), pd.NaT (a datetime64 + # column -- the shape the reembed Arrow schema produces), or pd.NA + # (nullable extension dtypes). The old guard only caught float-NaN + # (`isinstance(float) and v != v`); a NaT/NA live value slipped past + # it, stringified to "NaT"/"", read truthy, and the LIVE record + # was wrongly dropped -- collapsing the whole graph to empty on a + # reembedded store. pd.isna() covers None/NaN/NaT/NA uniformly, so + # only a real timestamp string survives to mark a tombstone. + _tomb = row.get("tombstoned_at") + if _tomb is not None and not pd.isna(_tomb) and str(_tomb).strip(): + continue rid = UUID(row["id"]) _comm_raw = row["community_id"] if _comm_raw is not None and not isinstance(_comm_raw, str): @@ -579,9 +705,16 @@ def build_runtime_graph(store: MemoryStore): edges_df = store.db.open_table("edges").to_pandas() for _, row in edges_df.iterrows(): + # Skip edges whose endpoints are not live nodes: graph.add_edge() does + # setdefault() on both endpoints, so an edge referencing a tombstoned record + # would re-create it as a payload-less node and undo the tombstone filter + # above (re-bloating the graph and the sigma audit). + src_s, dst_s = row["src"], row["dst"] + if not graph.has_node(src_s) or not graph.has_node(dst_s): + continue graph.add_edge( - UUID(row["src"]), - UUID(row["dst"]), + UUID(src_s), + UUID(dst_s), weight=float(row["weight"]), edge_type=row["edge_type"], ) diff --git a/src/iai_mcp/runtime_graph_cache.py b/src/iai_mcp/runtime_graph_cache.py index 8c40fa3..add150a 100644 --- a/src/iai_mcp/runtime_graph_cache.py +++ b/src/iai_mcp/runtime_graph_cache.py @@ -29,7 +29,16 @@ CACHE_VERSION: str = "62-02-v5" -_STALENESS_WINDOW: int = 10 +# Cache-key staleness bucket. The key buckets on records//WINDOW and +# edges//WINDOW; try_load requires an exact key match. With WINDOW=10 a normal +# day of capture (+~150 records, +~1300 edges) crossed ~130 buckets, so the +# on-disk graph cache MISSED on essentially every WAKE and the full community +# detection (mosaic) was recomputed each time — the WAKE CPU storm. Edges churn +# fastest, so they are the binding term. WINDOW=250 keeps the cache valid across +# a normal day so most WAKEs are cheap cache HITs; the independent age/dirty fuse +# in consult_overlay (25h / dirty>50) remains the real freshness backstop, and +# build_runtime_graph's single-flight gate makes the rare genuine miss harmless. +_STALENESS_WINDOW: int = 250 LEGACY_CACHE_VERSION_PLAINTEXT: str = "06-02-v1" _CACHE_AAD: bytes = b"runtime-graph-cache:v3" diff --git a/src/iai_mcp/sigma.py b/src/iai_mcp/sigma.py index 2b53c28..f868607 100644 --- a/src/iai_mcp/sigma.py +++ b/src/iai_mcp/sigma.py @@ -2,6 +2,7 @@ import logging import math +import os from datetime import datetime, timezone from typing import Optional, TYPE_CHECKING @@ -19,6 +20,29 @@ SIGMA_N_FLOOR: int = 200 +# Defensive upper bound on graph size for the sigma (small-worldness) audit. +# fast_sigma() runs average_clustering + all-pairs-shortest-path on the largest +# component AND on n_random reference graphs of the same order; that cost is +# roughly O(n_random * (V*E + V^2)) and is unbounded as the live graph grows. +# Above the cap we return None: the regime degrades cleanly to "insufficient_data" +# (sigma None is already handled everywhere) and the tick stays bounded -- the +# watchdog does NOT kill on CPU, so an unbounded sigma compute would spin a core +# uncontained. The default is far above the current live store (~4k nodes) so it +# never trips today; both bounds are env-overridable. +SIGMA_N_CEIL: int = 20000 + + +def _env_int(name: str, default: int) -> int: + raw = os.environ.get(name) + if raw is None: + return default + try: + val = int(raw) + except (TypeError, ValueError): + return default + return val if val > 0 else default + + SIGMA_MID_LIFE_THRESHOLD: int = 500 SIGMA_OBSERVATION_KIND: str = "sigma_observation" @@ -194,7 +218,19 @@ def fast_sigma( def compute_sigma(graph: "MemoryGraph", *, seed: int = 42) -> Optional[float]: - if graph.node_count() < SIGMA_N_FLOOR: + n = graph.node_count() + floor = _env_int("IAI_MCP_SIGMA_N_FLOOR", SIGMA_N_FLOOR) + ceil = _env_int("IAI_MCP_SIGMA_N_CEIL", SIGMA_N_CEIL) + if n < floor: + return None + if n > ceil: + # Above the defensive cap: skip the (unbounded) small-worldness compute + # so a single tick can never spin a core for minutes on a pathologically + # large graph. Reported downstream as "insufficient_data". + logger.warning( + "sigma_skipped_above_ceiling", + extra={"node_count": int(n), "ceil": int(ceil)}, + ) return None sigma_val, *_ = fast_sigma(graph, seed=seed) if isinstance(sigma_val, float) and math.isnan(sigma_val): diff --git a/src/iai_mcp/socket_server.py b/src/iai_mcp/socket_server.py index 31e72ae..2d21108 100644 --- a/src/iai_mcp/socket_server.py +++ b/src/iai_mcp/socket_server.py @@ -88,7 +88,14 @@ async def handle( line = await reader.readline() if not line: break - self.last_activity_ts = time.monotonic() + # NB: last_activity_ts is intentionally NOT updated for every + # inbound line. It must track REAL memory traffic (recall/capture) + # only — never control-plane messages, in particular the watchdog's + # periodic {"type":"status"} liveness probe (every 7-30s). Counting + # those probes as activity kept _interrupt_check (daemon) perpetually + # True, so the sleep cycle never completed and never hibernated -> + # the 221% CPU churn. It is set below, only for dispatched JSON-RPC + # method calls. req_id: Any = None try: req = json.loads(line) @@ -143,6 +150,11 @@ async def handle( continue method = req["method"] params = req.get("params") or {} + # Real memory traffic: mark activity only for dispatched JSON-RPC + # method calls so background consolidation defers while the user is + # actively recalling/capturing. Control/status probes never reach + # here, so they no longer keep the daemon awake. + self.last_activity_ts = time.monotonic() try: from iai_mcp.core import dispatch result = await asyncio.to_thread( diff --git a/src/iai_mcp/types.py b/src/iai_mcp/types.py index af4188f..3c2c7c5 100644 --- a/src/iai_mcp/types.py +++ b/src/iai_mcp/types.py @@ -131,6 +131,11 @@ class MemoryHit: valid_to: datetime | None = None session_id: str | None = None captured_at: str | None = None + # Internal ranking key, unclamped. `score` is the *displayed* value (clamped + # to [0,1] at serialization); `sort_score` preserves the raw engine ordering + # after multiplicative boosts (trigram*2, FTS*3, valence) push it past 1.0. + # When None, callers fall back to `score` (backward compatible). + sort_score: float | None = None @dataclass diff --git a/src/iai_mcp/wake_handler.py b/src/iai_mcp/wake_handler.py index 0395f04..2ff4695 100644 --- a/src/iai_mcp/wake_handler.py +++ b/src/iai_mcp/wake_handler.py @@ -20,6 +20,24 @@ def consume_wake_signal(self) -> bool: return False return True + def signal_wake(self) -> bool: + """Create the wake signal so the next daemon boot enters WAKE. + + Symmetric counterpart to ``consume_wake_signal``. Whoever brings the + daemon up from a hibernated (process-exited) state — the CLI + start/install path, or the per-turn capture hook — must create this + signal *before* the kickstart, so the booting daemon transitions + HIBERNATION -> WAKE and serves its socket, instead of re-reading the + persisted HIBERNATION state and immediately hibernate-exiting (which + closes the socket and leaves recall unserved). Idempotent. + """ + try: + self._wake_signal_path.parent.mkdir(parents=True, exist_ok=True) + self._wake_signal_path.touch() + except OSError: + return False + return True + def has_pending_wake(self) -> bool: try: return self._wake_signal_path.is_file() diff --git a/tests/test_build_runtime_graph_single_flight.py b/tests/test_build_runtime_graph_single_flight.py new file mode 100644 index 0000000..b7869da --- /dev/null +++ b/tests/test_build_runtime_graph_single_flight.py @@ -0,0 +1,131 @@ +"""Regression test for the build_runtime_graph WAKE CPU-storm fix. + +At daemon WAKE several background subsystems (boot preload, sigma identity +audit, foraging, hippea cascade) call retrieve.build_runtime_graph concurrently. +Before the fix, each one independently ran the full (GIL-bound) community +detection on a cache miss, so 3+ concurrent runs contended for the GIL, starved +the asyncio event loop, and the liveness watchdog SIGKILLed the daemon. + +The single-flight gate must collapse a concurrent burst into ONE compute (the +leader), with the others reusing the freshly-saved cache — while never sharing a +mutable MemoryGraph object between callers. +""" +from __future__ import annotations + +import threading +import time +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +import iai_mcp.community as community +import iai_mcp.retrieve as retrieve +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _vec(seed: int) -> list[float]: + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return (v / np.linalg.norm(v)).tolist() + + +def _rec(seed: int) -> MemoryRecord: + now = datetime.now(timezone.utc) + return MemoryRecord( + id=uuid4(), tier="episodic", literal_surface="rec", aaak_index="", + embedding=_vec(seed), community_id=None, centrality=0.0, detail_level=2, + pinned=False, stability=0.0, difficulty=0.0, last_reviewed=None, + never_decay=False, never_merge=False, provenance=[], created_at=now, + updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def test_concurrent_build_runtime_graph_runs_detect_once(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + store.insert(_rec(i)) + + n_threads = 4 + calls: list[int] = [] + calls_lock = threading.Lock() + start = threading.Barrier(n_threads) + + orig_detect = community.detect_communities + + def slow_detect(*args, **kwargs): + with calls_lock: + calls.append(1) + # Hold long enough that the other callers pile up behind the leader. + time.sleep(0.6) + return orig_detect(*args, **kwargs) + + # build_runtime_graph_impl does `from iai_mcp.community import detect_communities` + # at call time, so patching the module attribute is seen. + monkeypatch.setattr(community, "detect_communities", slow_detect) + + results: list[tuple] = [] + results_lock = threading.Lock() + + def worker(): + start.wait() + graph, assignment, _rc = retrieve.build_runtime_graph(store) + with results_lock: + results.append((assignment, id(graph))) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=60) + + assert len(results) == n_threads, "some build_runtime_graph callers hung" + # Single-flight: the expensive community detection ran exactly once even + # though all four callers raced into a cache miss simultaneously. + assert len(calls) == 1, ( + f"expected detect_communities to run once (single-flight); " + f"ran {len(calls)} times" + ) + # Every caller got a valid assignment... + assert all(a is not None for a, _ in results) + # ...and its OWN MemoryGraph object — the mutable graph (which receives the + # store sync hook) must never be shared between concurrent callers. + graph_ids = {gid for _, gid in results} + assert len(graph_ids) == n_threads, ( + f"callers shared a MemoryGraph object ({len(graph_ids)} distinct of " + f"{n_threads}); the single-flight must memoise only immutable products" + ) + + +def test_build_runtime_graph_hit_path_skips_detect(tmp_path, monkeypatch): + """A warm cache must short-circuit without running community detection.""" + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + store.insert(_rec(i)) + + # Prime the cache. + retrieve.build_runtime_graph(store) + + calls: list[int] = [] + orig_detect = community.detect_communities + + def counting_detect(*args, **kwargs): + calls.append(1) + return orig_detect(*args, **kwargs) + + monkeypatch.setattr(community, "detect_communities", counting_detect) + + # Second call on the same generation: cache HIT, no recompute. + graph, assignment, _rc = retrieve.build_runtime_graph(store) + assert assignment is not None + assert calls == [], "warm-cache build_runtime_graph must not recompute communities" diff --git a/tests/test_daemon_boot_normalize.py b/tests/test_daemon_boot_normalize.py new file mode 100644 index 0000000..a445f28 --- /dev/null +++ b/tests/test_daemon_boot_normalize.py @@ -0,0 +1,52 @@ +"""Boot recovery: a crash mid-SLEEP leaves lifecycle_state.json incoherent +(current_state=SLEEP, sleep_cycle_progress=None). The daemon must reset that one +case to a clean WAKE at boot (and drop the stale crisis flag) instead of resuming +a sleep cycle that never progresses. Guards _normalize_boot_lifecycle_state. +""" +from __future__ import annotations + +from iai_mcp.daemon import _normalize_boot_lifecycle_state + + +def test_stale_sleep_without_progress_is_reset_to_wake(): + raw = { + "current_state": "SLEEP", + "sleep_cycle_progress": None, + "crisis_mode": True, + "crisis_mode_since_ts": "2026-06-21T11:20:25+00:00", + "since_ts": "2026-06-21T10:47:08+00:00", + } + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is True + assert out["current_state"] == "WAKE" + assert out["crisis_mode"] is False + assert out["crisis_mode_since_ts"] is None + # other fields preserved + assert out["since_ts"] == raw["since_ts"] + # input not mutated + assert raw["current_state"] == "SLEEP" + + +def test_sleep_with_active_progress_is_left_untouched(): + # A real in-flight cycle carries a progress dict -> must NOT be reset. + raw = { + "current_state": "SLEEP", + "sleep_cycle_progress": {"step": "CLUSTER_SUMMARY", "attempt": 1}, + "crisis_mode": False, + } + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is False + assert out is raw + + +def test_wake_state_is_left_untouched(): + raw = {"current_state": "WAKE", "sleep_cycle_progress": None, "crisis_mode": True} + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is False + assert out is raw + + +def test_drowsy_state_is_left_untouched(): + raw = {"current_state": "DROWSY", "sleep_cycle_progress": None} + out, changed = _normalize_boot_lifecycle_state(raw) + assert changed is False diff --git a/tests/test_daemon_idle_countdown_drain_aware.py b/tests/test_daemon_idle_countdown_drain_aware.py new file mode 100644 index 0000000..f02599d --- /dev/null +++ b/tests/test_daemon_idle_countdown_drain_aware.py @@ -0,0 +1,222 @@ +"""The lifecycle idle countdown must watch real work, not only the heartbeat. + +Regression coverage for the bug where the FSM forced itself to SLEEP after the +idle timer expired even though the daemon was still draining a continuously-fed +backlog: the only refresh of ``_last_active_monotonic`` was gated on the Node +wrapper heartbeat file, which can be stale (empty wrappers dir) while a drain +kicked off by earlier RPC traffic is still hammering the store. Advancing to +SLEEP there escalates to an EXCLUSIVE store lock mid-drain -> lock contention. + +These tests pin the two added signals -- an in-flight drain and recent RPC +activity -- and the regression guard that a genuinely idle daemon still sleeps +(so crisis re-arming, which only runs in SLEEP, keeps working). +""" + +from __future__ import annotations + +import platform +import threading + +import pytest + + +pytestmark = pytest.mark.skipif( + platform.system() == "Windows", + reason="daemon module is POSIX-only on this project", +) + + +# Inputs that, on their own, would push the daemon all the way to SLEEP: well +# past the sleep threshold, stale RPC, no wrapper heartbeat, sleep-eligible. +# Each test flips exactly one signal to prove it holds the countdown open. +_SLEEPY = dict( + scanner_active=False, + seconds_since_rpc=10_000.0, + idle_elapsed=10_000.0, + sleep_eligible=True, + recent_rpc_window_sec=30.0, + drowsy_after_sec=300.0, + sleep_heartbeat_idle_sec=1800.0, +) + + +def test_drain_in_progress_blocks_advance_toward_sleep(): + from iai_mcp.daemon import ( + IDLE_DECISION_ACTIVE, + IDLE_DECISION_SLEEP, + _idle_countdown_decision, + ) + + decision = _idle_countdown_decision(drain_in_progress=True, **_SLEEPY) + + assert decision != IDLE_DECISION_SLEEP, ( + "idle countdown advanced toward SLEEP while a drain was in progress" + ) + assert decision == IDLE_DECISION_ACTIVE + + +def test_recent_rpc_blocks_advance_toward_sleep(): + from iai_mcp.daemon import IDLE_DECISION_ACTIVE, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["seconds_since_rpc"] = 5.0 # within recent_rpc_window_sec (30s) + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_ACTIVE + + +def test_truly_idle_still_advances_to_sleep(): + # Regression guard: with NO activity of any kind, a quiet daemon must still + # reach SLEEP, otherwise crisis re-arming (SLEEP-only) never runs again. + from iai_mcp.daemon import IDLE_DECISION_SLEEP, _idle_countdown_decision + + decision = _idle_countdown_decision(drain_in_progress=False, **_SLEEPY) + + assert decision == IDLE_DECISION_SLEEP + + +def test_idle_past_drowsy_but_not_sleep_eligible_goes_drowsy_only(): + from iai_mcp.daemon import IDLE_DECISION_DROWSY, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["idle_elapsed"] = 600.0 # past drowsy_after_sec (300) ... + inputs["sleep_eligible"] = False # ... but not sleep-eligible yet + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_DROWSY + + +def test_within_drowsy_window_holds(): + from iai_mcp.daemon import IDLE_DECISION_HOLD, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["idle_elapsed"] = 60.0 # under drowsy_after_sec (300) + inputs["sleep_eligible"] = False + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_HOLD + + +def test_scanner_active_is_active(): + from iai_mcp.daemon import IDLE_DECISION_ACTIVE, _idle_countdown_decision + + inputs = dict(_SLEEPY) + inputs["scanner_active"] = True + + decision = _idle_countdown_decision(drain_in_progress=False, **inputs) + + assert decision == IDLE_DECISION_ACTIVE + + +# --- capture wiring: the production drain path actually flips the flag -------- + + +def test_drain_deferred_marks_in_progress_and_holds_off_sleep(monkeypatch): + from iai_mcp import capture + from iai_mcp.daemon import ( + IDLE_DECISION_ACTIVE, + IDLE_DECISION_SLEEP, + _idle_countdown_decision, + ) + + seen: dict[str, object] = {} + + def spy_impl(store): + # Mid-drain the flag must be set, and the idle countdown -- fed the flag + # -- must refuse to advance toward SLEEP on otherwise-sleepy inputs. + seen["in_progress"] = capture.is_drain_in_progress() + seen["decision"] = _idle_countdown_decision( + drain_in_progress=capture.is_drain_in_progress(), **_SLEEPY, + ) + return {"files_drained": 0, "files_failed": 0} + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", spy_impl) + + assert capture.is_drain_in_progress() is False + capture.drain_deferred_captures(store=None) + + assert seen["in_progress"] is True + assert seen["decision"] == IDLE_DECISION_ACTIVE + assert seen["decision"] != IDLE_DECISION_SLEEP + # Released once the drain returns: a quiet daemon can sleep again. + assert capture.is_drain_in_progress() is False + + +def test_drain_active_live_marks_in_progress(monkeypatch): + from iai_mcp import capture + + seen: dict[str, object] = {} + + def spy_impl(store, *, exclude_session_id): + seen["in_progress"] = capture.is_drain_in_progress() + seen["sid"] = exclude_session_id + return {"files_drained": 0} + + monkeypatch.setattr(capture, "_drain_active_live_captures_impl", spy_impl) + + capture.drain_active_live_captures(store=None, exclude_session_id="sess-1") + + assert seen["in_progress"] is True + assert seen["sid"] == "sess-1" + assert capture.is_drain_in_progress() is False + + +def test_guard_releases_on_exception(monkeypatch): + from iai_mcp import capture + + def boom(store): + raise RuntimeError("drain blew up") + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", boom) + + assert capture.is_drain_in_progress() is False + with pytest.raises(RuntimeError): + capture.drain_deferred_captures(store=None) + # A failed drain must not leak the in-progress count, else the daemon would + # never sleep again. + assert capture.is_drain_in_progress() is False + + +def test_overlapping_drains_tracked_by_depth(): + # The counter (not a boolean) keeps overlapping drains across threads + # correct: the flag stays True until the LAST one releases. + from iai_mcp import capture + + assert capture.is_drain_in_progress() is False + with capture._drain_in_progress_guard(): + assert capture.is_drain_in_progress() is True + with capture._drain_in_progress_guard(): + assert capture.is_drain_in_progress() is True + assert capture.is_drain_in_progress() is True # outer still holds + assert capture.is_drain_in_progress() is False + + +def test_is_drain_in_progress_true_while_other_thread_drains(monkeypatch): + from iai_mcp import capture + + entered = threading.Event() + release = threading.Event() + + def slow_impl(store): + entered.set() + release.wait(timeout=5.0) + return {"files_drained": 0, "files_failed": 0} + + monkeypatch.setattr(capture, "_drain_deferred_captures_impl", slow_impl) + + worker = threading.Thread( + target=lambda: capture.drain_deferred_captures(store=None), + ) + worker.start() + try: + assert entered.wait(timeout=5.0) + # Observed from a DIFFERENT thread than the one draining. + assert capture.is_drain_in_progress() is True + finally: + release.set() + worker.join(timeout=5.0) + + assert capture.is_drain_in_progress() is False diff --git a/tests/test_daemon_tick_flags.py b/tests/test_daemon_tick_flags.py index 8f1e31d..70bdcf8 100644 --- a/tests/test_daemon_tick_flags.py +++ b/tests/test_daemon_tick_flags.py @@ -157,3 +157,23 @@ def test_tick_updates_last_tick_at(tick_env, monkeypatch): assert "last_tick_at" in state datetime.fromisoformat(state["last_tick_at"]) + + +def test_successful_tick_resets_stale_skip_reason(tick_env, monkeypatch): + # 2cffb35 regression: a stale last_tick_skipped_reason ("empty_store"/"paused") + # must be cleared once a tick actually runs (store non-empty, not paused), + # otherwise observability shows a healthy daemon as permanently parked. tick_env + # seeds one record -> store is non-empty. WITHOUT the reset line the field stays + # "empty_store" and this test fails. + from iai_mcp import daemon as daemon_mod + from iai_mcp.daemon_state import load_state + + store, state_path, tmp_path = tick_env + monkeypatch.setattr(daemon_mod, "should_relearn", lambda last, now: False) + + state = {"fsm_state": "WAKE", "last_tick_skipped_reason": "empty_store"} + asyncio.run(daemon_mod._tick_body(store, state)) + + assert state.get("last_tick_skipped_reason") is None + loaded = load_state() + assert loaded.get("last_tick_skipped_reason") is None diff --git a/tests/test_daemon_watchdog_sleep_stale.py b/tests/test_daemon_watchdog_sleep_stale.py index 0bd6196..ab944bb 100644 --- a/tests/test_daemon_watchdog_sleep_stale.py +++ b/tests/test_daemon_watchdog_sleep_stale.py @@ -99,13 +99,24 @@ def test_sleep_just_over_threshold_is_stale(self): assert ctx["attempt"] == 1 assert ctx["crisis_mode"] is False - def test_attempt_gt_1_is_not_stale(self): - # attempt > 1 means the cycle has retried — different failure shape, - # outside the scope of this predicate. + def test_attempt_gt_1_still_stale(self): + # A retried-but-still-wedged cycle (attempt >= 2) is exactly the case the + # watchdog must catch: a retry that itself hangs for hours. The gate is + # `attempt < 1`, so attempt 2 (10 days stuck) MUST be flagged stale. state = _state( LifecycleState.SLEEP.value, _progress(NOW - timedelta(days=10), attempt=2), ) + is_stale, ctx = daemon._check_sleep_cycle_staleness(state, NOW) + assert is_stale is True + assert ctx["attempt"] == 2 + + def test_attempt_zero_is_not_stale(self): + # attempt 0 (or negative / non-int) is not a genuine running attempt. + state = _state( + LifecycleState.SLEEP.value, + _progress(NOW - timedelta(days=10), attempt=0), + ) assert daemon._check_sleep_cycle_staleness(state, NOW)[0] is False def test_live_wedge_shape_matches_stale(self): diff --git a/tests/test_dmn_meta.py b/tests/test_dmn_meta.py index 8d2dbda..bde938f 100644 --- a/tests/test_dmn_meta.py +++ b/tests/test_dmn_meta.py @@ -142,10 +142,23 @@ def test_reflection_synthesize_returns_semantic_record( assert isinstance(prov.get("recalled_count"), int) assert len(synth.embedding) == store._embed_dim - assert all(v == 0.0 for v in synth.embedding), ( - "synthesised record must carry the zero-vector placeholder; " - "next REM consolidation re-embeds" - ) + # The reflection record must carry a REAL embedding of its own + # literal_surface, not a zero placeholder. The old zero-vector placeholder + # relied on "next REM consolidation re-embeds", but nothing re-embedded an + # embedding_pending=0 record, so every daily-reflection record stayed + # permanently zero-norm and invisible to vector recall (and fed zero vectors + # into the scoring matmul). dmn_reflection now embeds at write time; on embed + # failure it falls back to a zero vector flagged embedding_pending=1. + import math as _math + _norm = _math.sqrt(sum(v * v for v in synth.embedding)) + if synth.embedding_pending: + # Degraded fallback (embedder unavailable): zero + flagged for reembed. + assert all(v == 0.0 for v in synth.embedding) + else: + assert _norm > 0.5, ( + f"reflection embedding must be a real (non-zero) vector; " + f"got norm={_norm}" + ) def test_meta_analyst_snapshot_counts_correct(tmp_path: Path) -> None: store = _make_store(tmp_path) diff --git a/tests/test_graph_excludes_tombstoned.py b/tests/test_graph_excludes_tombstoned.py new file mode 100644 index 0000000..9dd6134 --- /dev/null +++ b/tests/test_graph_excludes_tombstoned.py @@ -0,0 +1,102 @@ +"""Regression tests for excluding tombstoned records from the runtime graph. + +build_runtime_graph used to add every record (and every edge) to the graph, +including soft-deleted / deduped / erased records (tombstoned_at IS NOT NULL). +That (a) polluted communities / centrality / rich_club / the sigma topology +audit with dead nodes, and (b) desynced the node count from +store.active_records_count() -- the cache-validity anchor -- so the payload +cache was permanently invalid and every wake did a full rebuild. + +These tests pin: tombstoned records (and edges touching them) are excluded, the +live node count equals active_records_count(), and assignment/rich_club are +recomputed on the fresh live graph rather than reused from a stale-node cache. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +import iai_mcp.retrieve as retrieve +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _vec(seed: int) -> list[float]: + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return (v / np.linalg.norm(v)).tolist() + + +def _rec(seed: int): + now = datetime.now(timezone.utc) + rid = uuid4() + return rid, MemoryRecord( + id=rid, tier="episodic", literal_surface=f"rec-{seed}", aaak_index="", + embedding=_vec(seed), community_id=None, centrality=0.0, detail_level=2, + pinned=False, stability=0.0, difficulty=0.0, last_reviewed=None, + never_decay=False, never_merge=False, provenance=[], created_at=now, + updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def _tombstone(store: MemoryStore, rid) -> None: + now = datetime.now(timezone.utc).isoformat() + with store.db._conn_lock: + store.db._conn.execute( + "UPDATE records SET tombstoned_at = ? WHERE id = ?", + (now, str(rid)), + ) + + +def test_build_runtime_graph_excludes_tombstoned(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + live_ids = [] + for i in range(6): + rid, rec = _rec(i) + store.insert(rec) + live_ids.append(rid) + dead_ids = [] + for i in range(6, 10): + rid, rec = _rec(i) + store.insert(rec) + _tombstone(store, rid) + dead_ids.append(rid) + + assert store.active_records_count() == 6 + + graph, assignment, rich_club = retrieve.build_runtime_graph(store) + + nodes = {str(n) for n in graph.nodes()} + assert len(nodes) == 6, f"expected 6 live nodes, got {len(nodes)}" + for rid in dead_ids: + assert str(rid) not in nodes, f"tombstoned {rid} leaked into the graph" + for rid in live_ids: + assert str(rid) in nodes + # rich_club is a fraction of the LIVE graph, never references dead nodes + assert all(str(r) not in {str(d) for d in dead_ids} for r in (rich_club or [])) + + +def test_node_count_matches_active_records_count(tmp_path, monkeypatch): + """The cache-validity anchor: graph node count must equal active count, so the + payload cache validates on the next build instead of rebuilding forever.""" + store = _make_store(tmp_path, monkeypatch) + for i in range(8): + _rid, rec = _rec(i) + store.insert(rec) + rid, rec = _rec(99) + store.insert(rec) + _tombstone(store, rid) + + graph, _assignment, _rc = retrieve.build_runtime_graph(store) + assert len({str(n) for n in graph.nodes()}) == store.active_records_count() == 8 diff --git a/tests/test_graph_tombstone_hardening.py b/tests/test_graph_tombstone_hardening.py new file mode 100644 index 0000000..e86b5ad --- /dev/null +++ b/tests/test_graph_tombstone_hardening.py @@ -0,0 +1,118 @@ +"""Regression guards for the 2026-06-21 tombstone-handling remediation: + +- the runtime-graph node filter runs on the cache-MISS pandas path (cache neutralised); +- a LIVE record on a datetime64/NaT tombstoned_at column is NOT dropped (pd.isna guard); +- an edge live->tombstoned does not resurrect the dead endpoint (has_node guard); +- build_live_graph (crisis hooks) excludes tombstoned, matching active_records_count(). + +Each test fails if its corresponding fix is reverted. +""" +from __future__ import annotations + +import pandas as pd +import pytest + +import iai_mcp.retrieve as retrieve +import iai_mcp.runtime_graph_cache as runtime_graph_cache +from iai_mcp.store._buffers import flush_edge_buffer, flush_record_buffer + +from tests.test_graph_excludes_tombstoned import _make_store, _rec, _tombstone + + +@pytest.fixture +def _no_graph_cache(monkeypatch): + """Force build_runtime_graph to MISS the payload cache so the live pandas + node/edge-skip loop is always exercised (never a cheap cache reload).""" + monkeypatch.setattr(runtime_graph_cache, "try_load", lambda *_a, **_k: None) + + +def test_node_skip_runs_on_cache_miss(_no_graph_cache, tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + live = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + live.append(rid) + dead_id, dead = _rec(77) + store.insert(dead) + _tombstone(store, dead_id) + + assert store.active_records_count() == 5 + graph, _a, _rc = retrieve.build_runtime_graph(store) + nodes = {str(n) for n in graph.nodes()} + assert nodes == {str(r) for r in live} + assert str(dead_id) not in nodes + + +def test_live_record_survives_datetime64_nat_column(_no_graph_cache, tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + live = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + live.append(rid) + flush_record_buffer(store) + assert store.active_records_count() == 5 + + records_tbl = store.db.open_table("records") + table_cls = type(records_tbl) + original_to_pandas = table_cls.to_pandas + + def _coerce(self, *a, **k): + df = original_to_pandas(self, *a, **k) + if "tombstoned_at" in df.columns: + df = df.copy() + df["tombstoned_at"] = pd.to_datetime(df["tombstoned_at"], utc=True) + return df + + monkeypatch.setattr(table_cls, "to_pandas", _coerce) + coerced = store.db.open_table("records").to_pandas() + assert str(coerced["tombstoned_at"].dtype).startswith("datetime64") + assert all(pd.isna(v) for v in coerced["tombstoned_at"]) + + graph, _a, _rc = retrieve.build_runtime_graph(store) + nodes = {str(n) for n in graph.nodes()} + assert nodes == {str(r) for r in live}, ( + "live records dropped on a datetime64/NaT tombstoned_at column" + ) + + +def test_edge_to_tombstoned_dst_is_skipped(_no_graph_cache, tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + ids = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + ids.append(rid) + flush_record_buffer(store) + + src, dst = ids[0], ids[4] + store.boost_edges([(src, dst)], delta=1.0, edge_type="hebbian") + flush_edge_buffer(store) + _tombstone(store, dst) + assert store.active_records_count() == 4 + + graph, _a, _rc = retrieve.build_runtime_graph(store) + nodes = {str(n) for n in graph.nodes()} + assert str(dst) not in nodes, "tombstoned edge endpoint leaked back as a node" + assert len(nodes) == 4 + + +def test_build_live_graph_excludes_tombstoned(tmp_path, monkeypatch): + from iai_mcp.lilli.cycle.sleep_pipeline._live_graph import build_live_graph + + store = _make_store(tmp_path, monkeypatch) + live = [] + for i in range(5): + rid, rec = _rec(i) + store.insert(rec) + live.append(rid) + dead_id, dead = _rec(88) + store.insert(dead) + _tombstone(store, dead_id) + flush_record_buffer(store) + + g = build_live_graph(store) + nodes = {str(n) for n in g.nodes()} + assert str(dead_id) not in nodes + assert g.node_count() == store.active_records_count() == 5 diff --git a/tests/test_recall_score_clamp.py b/tests/test_recall_score_clamp.py new file mode 100644 index 0000000..06d4fa7 --- /dev/null +++ b/tests/test_recall_score_clamp.py @@ -0,0 +1,86 @@ +"""Falsifiable tests for BUG (4a): displayed recall score must be clamped to +[0,1] WITHOUT changing the relative ordering of hits. + +The recall pipeline applies *multiplicative* boosts (trigram*2, FTS*3, +valence*(1+v)) to the base similarity score (pipeline.py:786-789). With no +clamp the serialized `score` can exceed 1.0 -- a leaky internal quantity +surfaced as if it were a probability/confidence. The contract under test: + + 1. _hit_to_json(hit)["score"] is always within [0.0, 1.0]. + 2. The order of hits sorted by their *internal* sort key is preserved even + after the displayed score is clamped (two equally-clamped 1.0 hits must + keep their pre-clamp ranking). + +These tests are hermetic: they build MemoryHit objects directly, no store, +no daemon, no embedder. + +RUN: + cd && .venv/bin/python -m pytest /tmp/iai_fix_recall/test_recall_score_clamp.py -v +""" +from __future__ import annotations + +from iai_mcp.types import MemoryHit +from iai_mcp.core._serializers import _hit_to_json + + +def _hit(score: float, sort_score: float | None = None, rid_int: int = 1) -> MemoryHit: + from uuid import UUID + return MemoryHit( + record_id=UUID(int=rid_int), + score=score, + sort_score=sort_score, + reason="t", + literal_surface="x", + adjacent_suggestions=[], + ) + + +def test_displayed_score_is_clamped_to_unit_interval(): + # A boosted hit (trigram*2 then FTS*3 => 6x a 0.4 cosine = 2.4) must NOT + # leak a >1 score to the client. + h = _hit(score=2.4, sort_score=2.4) + out = _hit_to_json(h) + assert 0.0 <= out["score"] <= 1.0, f"score leaked out of [0,1]: {out['score']}" + + +def test_negative_score_clamped_to_zero(): + h = _hit(score=-0.3, sort_score=-0.3) + out = _hit_to_json(h) + assert out["score"] == 0.0 + + +def test_clamp_preserves_internal_ordering(): + # Two hits both boost past 1.0 (2.4 and 1.8). Displayed scores collapse to + # 1.0/1.0, but the internal sort_score must still distinguish them so the + # ranking the engine computed is preserved. + strong = _hit(score=2.4, sort_score=2.4, rid_int=1) + weaker = _hit(score=1.8, sort_score=1.8, rid_int=2) + mid = _hit(score=0.5, sort_score=0.5, rid_int=3) + + hits = [mid, weaker, strong] + # Sort the way the pipeline does post-rank: by the internal key. + hits.sort(key=lambda h: (h.sort_score if h.sort_score is not None else h.score), + reverse=True) + + order = [h.record_id.int for h in hits] + assert order == [1, 2, 3], f"internal ordering not preserved: {order}" + + # And after serialization the two boosted ones are both clamped but still + # appear in the engine's order (list order, not score value). + serialized = [_hit_to_json(h) for h in hits] + assert serialized[0]["record_id"].endswith("0001") + assert serialized[1]["record_id"].endswith("0002") + assert all(0.0 <= s["score"] <= 1.0 for s in serialized) + # Top two collapsed to the ceiling -- proves ordering can't rely on the + # displayed score alone, which is exactly why sort_score must exist. + assert serialized[0]["score"] == 1.0 + assert serialized[1]["score"] == 1.0 + + +def test_sort_score_falls_back_to_score_when_absent(): + # Backward compat: a hit built the old way (no sort_score) still sorts and + # serializes sanely. + h = _hit(score=0.42, sort_score=None) + key = h.sort_score if h.sort_score is not None else h.score + assert key == 0.42 + assert _hit_to_json(h)["score"] == 0.42 diff --git a/tests/test_reembed_pending_crypto.py b/tests/test_reembed_pending_crypto.py new file mode 100644 index 0000000..2495a8c --- /dev/null +++ b/tests/test_reembed_pending_crypto.py @@ -0,0 +1,159 @@ +"""Regression: reembed_pending_rows must embed the PLAINTEXT literal_surface, +not the iai:enc:v1: ciphertext, on an encrypted store. + +Pre-existing bug (found 2026-06-21): HippoDB.reembed_pending_rows read +literal_surface straight from SQLite and fed it to embedder.embed() without +decrypting. On an encrypted deployment that meant every embedding_pending=1 row +re-embedded by this path got an embedding of the ciphertext = garbage vector. +""" +from __future__ import annotations + +import hashlib +import os +import sqlite3 +import struct +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np +import pytest + +from iai_mcp.hippo import HippoDB +from iai_mcp.types import EMBED_DIM + + +def _vec_from_text(text: str) -> list[float]: + """Deterministic, text-sensitive unit vector. embed(plaintext) and + embed(ciphertext) therefore differ, which is what makes the test meaningful.""" + seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:8], "little") + rng = np.random.default_rng(seed) + v = rng.standard_normal(EMBED_DIM).astype(np.float32) + v /= np.linalg.norm(v) + 1e-10 + return v.tolist() + + +class _RecordingEmbedder: + def __init__(self) -> None: + self.seen: list[str] = [] + + def embed(self, text: str) -> list[float]: + self.seen.append(text) + return _vec_from_text(text) + + +def _brain_path(tmp_path: Path) -> Path: + return tmp_path / "hippo" / "brain.sqlite3" + + +def _insert_pending(db: HippoDB, rid: str, surface: str) -> None: + now = datetime.now(timezone.utc).isoformat() + db.insert_pending_row( + record_id=rid, + tier="episodic", + literal_surface=surface, + tags_json="[]", + provenance_json="{}", + created_at=now, + updated_at=now, + ) + + +def _read_row(db_path: Path, rid: str) -> tuple[list[float], int]: + conn = sqlite3.connect(str(db_path)) + try: + row = conn.execute( + "SELECT embedding, embedding_pending FROM records WHERE id = ?", (rid,) + ).fetchone() + finally: + conn.close() + assert row is not None + blob, pending = row[0], row[1] + n = len(blob) // 4 + return list(struct.unpack(f"<{n}f", blob)), int(pending) + + +def _set_literal_surface(db_path: Path, rid: str, value: str) -> None: + conn = sqlite3.connect(str(db_path)) + try: + conn.execute("UPDATE records SET literal_surface = ? WHERE id = ?", (value, rid)) + conn.commit() + finally: + conn.close() + + +def test_reembed_pending_decrypts_before_embedding(tmp_path: Path) -> None: + key = os.urandom(32) + db = HippoDB(tmp_path, crypto_key_provider=lambda: key) + try: + plaintext = "a secret pending memory worth embedding" + rid = str(uuid4()) + _insert_pending(db, rid, plaintext) + + # Simulate encrypted-at-rest: replace the stored surface with ciphertext + # carrying the canonical AAD (uuid.lower()). + ciphertext = db._encrypt_for_uuid(rid, plaintext) + assert ciphertext.startswith("iai:enc:v1:") + _set_literal_surface(_brain_path(tmp_path), rid, ciphertext) + + emb = _RecordingEmbedder() + n = db.reembed_pending_rows(emb) + + assert n == 1 + # The embedder was handed PLAINTEXT, never the ciphertext. + assert emb.seen == [plaintext] + assert not emb.seen[0].startswith("iai:enc:v1:") + + stored_vec, pending = _read_row(_brain_path(tmp_path), rid) + assert pending == 0 + # Stored vector matches embed(plaintext)... + assert stored_vec == pytest.approx(_vec_from_text(plaintext), abs=1e-6) + # ...and is NOT embed(ciphertext) (the old, buggy result). + assert stored_vec != pytest.approx(_vec_from_text(ciphertext), abs=1e-6) + finally: + db.close() + + +def test_reembed_pending_plaintext_store_still_works(tmp_path: Path) -> None: + """No crypto provider: decrypt is a no-op and the path behaves as before.""" + db = HippoDB(tmp_path) + try: + plaintext = "a plain pending memory" + rid = str(uuid4()) + _insert_pending(db, rid, plaintext) + + emb = _RecordingEmbedder() + n = db.reembed_pending_rows(emb) + + assert n == 1 + assert emb.seen == [plaintext] + stored_vec, pending = _read_row(_brain_path(tmp_path), rid) + assert pending == 0 + assert stored_vec == pytest.approx(_vec_from_text(plaintext), abs=1e-6) + finally: + db.close() + + +def test_reembed_pending_undecryptable_row_stays_pending(tmp_path: Path) -> None: + """A row whose ciphertext can't be decrypted (wrong AAD) must not be embedded + with garbage and must remain embedding_pending=1 for a later retry.""" + key = os.urandom(32) + db = HippoDB(tmp_path, crypto_key_provider=lambda: key) + try: + plaintext = "undecryptable pending memory" + rid = str(uuid4()) + _insert_pending(db, rid, plaintext) + + # Encrypt under a DIFFERENT record id's AAD so decrypt fails for `rid`. + wrong_ciphertext = db._encrypt_for_uuid(str(uuid4()), plaintext) + _set_literal_surface(_brain_path(tmp_path), rid, wrong_ciphertext) + + emb = _RecordingEmbedder() + n = db.reembed_pending_rows(emb) + + assert n == 0 + assert emb.seen == [] # never reached embed() + _, pending = _read_row(_brain_path(tmp_path), rid) + assert pending == 1 # left for retry, not poisoned + finally: + db.close() diff --git a/tests/test_sigma.py b/tests/test_sigma.py index 67a5ca5..0f77133 100644 --- a/tests/test_sigma.py +++ b/tests/test_sigma.py @@ -101,3 +101,36 @@ def test_sigma_module_does_not_call_nx_sigma(): assert needle not in text, ( f"sigma.py must NOT call {needle} -- use fast_sigma" ) + + +def test_sigma_ceiling_constant_above_floor(): + # Defensive cap added in the 2026-06-21 remediation: bound the unbounded + # small-worldness compute on a pathologically large graph. + from iai_mcp import sigma + + assert isinstance(sigma.SIGMA_N_CEIL, int) + assert sigma.SIGMA_N_CEIL > sigma.SIGMA_N_FLOOR + + +def test_compute_sigma_returns_none_above_ceiling(monkeypatch): + # Above SIGMA_N_CEIL, compute_sigma skips the compute and returns None + # (regime -> insufficient_data). WITHOUT the cap this 300-node small-world + # graph yields a finite float, so the test fails -> the guard is load-bearing. + from iai_mcp import sigma + from iai_mcp.sigma import classify_regime, compute_sigma + + monkeypatch.setattr(sigma, "SIGMA_N_CEIL", 100) + g = nx.connected_watts_strogatz_graph(300, 6, 0.1, seed=42) + mg = _nx_graph_to_memory_graph(g) + assert mg.node_count() == 300 > sigma.SIGMA_N_CEIL + assert compute_sigma(mg) is None + assert classify_regime(mg.node_count(), compute_sigma(mg)) == "insufficient_data" + + +def test_compute_sigma_ceiling_env_override(monkeypatch): + from iai_mcp.sigma import compute_sigma + + monkeypatch.setenv("IAI_MCP_SIGMA_N_CEIL", "100") + g = nx.connected_watts_strogatz_graph(300, 6, 0.1, seed=42) + mg = _nx_graph_to_memory_graph(g) + assert compute_sigma(mg) is None diff --git a/tests/test_socket_activity_tracking.py b/tests/test_socket_activity_tracking.py new file mode 100644 index 0000000..9bde5d3 --- /dev/null +++ b/tests/test_socket_activity_tracking.py @@ -0,0 +1,134 @@ +"""Regression: the socket server must track *real* memory traffic only. + +`SocketServer.last_activity_ts` feeds the daemon's `_interrupt_check`: the sleep +/ consolidation pipeline defers whenever activity is recent (< 30s). The watchdog +probes daemon liveness with a `{"type": "status"}` control message every 7-30s +(`daemon/_watchdog.py::_probe_status_roundtrip`). When *every* inbound line — +including that probe — refreshed `last_activity_ts`, `_interrupt_check` was +perpetually True, so the cycle never completed, the daemon never hibernated, and +the wake-hook re-ran every tick (a ~200% CPU churn on any long-lived deployment). + +The fix: refresh `last_activity_ts` only for dispatched JSON-RPC method calls +(recall/capture/etc.), never for control-plane messages. These tests lock that in. +""" +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path + +import pytest + + +@pytest.fixture +def short_socket_paths(tmp_path, monkeypatch): + from iai_mcp import concurrency, daemon_state + + sock_dir = Path(f"/tmp/iai-srvact-{os.getpid()}-{id(tmp_path)}") + sock_dir.mkdir(parents=True, exist_ok=True) + sock_path = sock_dir / "d.sock" + state_path = tmp_path / ".daemon-state.json" + + monkeypatch.setattr(concurrency, "SOCKET_PATH", sock_path) + monkeypatch.setattr(daemon_state, "STATE_PATH", state_path) + store_root = tmp_path / "store_root" + store_root.mkdir(parents=True, exist_ok=True) + monkeypatch.setenv("IAI_MCP_STORE", str(store_root)) + + try: + yield sock_path + finally: + try: + if sock_path.exists(): + sock_path.unlink() + except OSError: + pass + try: + sock_dir.rmdir() + except OSError: + pass + + +async def _send_line(sock_path: Path, payload: dict, *, timeout: float = 10.0) -> dict: + reader, writer = await asyncio.wait_for( + asyncio.open_unix_connection(path=str(sock_path)), timeout=timeout, + ) + try: + writer.write((json.dumps(payload) + "\n").encode("utf-8")) + await writer.drain() + line = await asyncio.wait_for(reader.readline(), timeout=timeout) + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: # noqa: BLE001 + pass + if not line: + raise AssertionError(f"daemon closed without reply (payload={payload})") + return json.loads(line.decode("utf-8")) + + +async def _serve(sock_path: Path, store, coro_fn): + from iai_mcp.socket_server import SocketServer + + srv = SocketServer(store, idle_secs=99999) + server_task = asyncio.create_task(srv.serve(socket_path=sock_path)) + for _ in range(250): + if sock_path.exists(): + break + await asyncio.sleep(0.01) + if not sock_path.exists(): + srv.shutdown_event.set() + raise AssertionError("socket never bound") + try: + return await coro_fn(srv) + finally: + srv.shutdown_event.set() + try: + await asyncio.wait_for(server_task, timeout=5) + except Exception: # noqa: BLE001 + pass + + +def test_status_probe_does_not_refresh_last_activity(short_socket_paths): + sock_path = short_socket_paths + from iai_mcp.store import MemoryStore + + store = MemoryStore() + + async def _runner(srv): + # Sentinel baseline so any refresh is detectable. + srv.last_activity_ts = 0.0 + resp = await _send_line(sock_path, {"type": "status"}) + # The control message is answered (round-trip), proving it was received... + assert resp is not None, resp + # ...yet it must NOT have been counted as memory activity: the watchdog's + # periodic probe would otherwise keep the daemon awake forever. + assert srv.last_activity_ts == 0.0, ( + "status liveness probe wrongly refreshed last_activity_ts" + ) + + asyncio.run(_serve(sock_path, store, _runner)) + + +def test_jsonrpc_method_refreshes_last_activity(short_socket_paths): + sock_path = short_socket_paths + from iai_mcp.store import MemoryStore + + store = MemoryStore() + + async def _runner(srv): + srv.last_activity_ts = 0.0 + resp = await _send_line( + sock_path, + {"jsonrpc": "2.0", "id": 1, "method": "session_start_payload", "params": {}}, + ) + assert "result" in resp, resp + # Real recall/capture traffic MUST refresh the activity clock so the sleep + # pipeline correctly defers while the user is actively using memory. + assert srv.last_activity_ts > 0.0, ( + "dispatched JSON-RPC method did not refresh last_activity_ts" + ) + + asyncio.run(_serve(sock_path, store, _runner)) diff --git a/tests/test_store_is_empty.py b/tests/test_store_is_empty.py new file mode 100644 index 0000000..c7528c3 --- /dev/null +++ b/tests/test_store_is_empty.py @@ -0,0 +1,64 @@ +"""Regression test for _store_is_empty: a count failure must NOT be read as empty. + +The daemon tick calls _store_is_empty() and, if true, skips the whole tick +(no idle-check, no drain). A transient count failure -- e.g. the shared sqlite +connection left in an error state by a concurrent heavy reader, raising +HippoIntegrityError/lock errors (all subclass RuntimeError) -- used to be caught +and return True, parking the lifecycle on a store that actually has records. +The fix returns False (unknown != empty) so the tick proceeds. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from uuid import uuid4 + +import numpy as np + +from iai_mcp.daemon import _store_is_empty +from iai_mcp.store import MemoryStore +from iai_mcp.types import EMBED_DIM, MemoryRecord + + +def _rec(seed: int) -> MemoryRecord: + now = datetime.now(timezone.utc) + rng = np.random.default_rng(seed) + v = rng.random(EMBED_DIM).astype(np.float32) + return MemoryRecord( + id=uuid4(), tier="episodic", literal_surface="rec", aaak_index="", + embedding=(v / np.linalg.norm(v)).tolist(), community_id=None, + centrality=0.0, detail_level=2, pinned=False, stability=0.0, + difficulty=0.0, last_reviewed=None, never_decay=False, never_merge=False, + provenance=[], created_at=now, updated_at=now, tags=[], language="en", + ) + + +def _make_store(tmp_path: Path, monkeypatch) -> MemoryStore: + root = tmp_path / "store" + monkeypatch.setenv("IAI_MCP_STORE", str(root)) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setenv("IAI_DAEMON_SOCKET_PATH", str(tmp_path / "daemon.sock")) + return MemoryStore(path=root) + + +def test_empty_store_is_empty(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + assert _store_is_empty(store) is True + + +def test_nonempty_store_is_not_empty(tmp_path, monkeypatch): + store = _make_store(tmp_path, monkeypatch) + store.insert(_rec(1)) + assert _store_is_empty(store) is False + + +def test_count_failure_is_not_treated_as_empty(tmp_path, monkeypatch): + """The core fix: a RuntimeError during the count must yield False, not True.""" + store = _make_store(tmp_path, monkeypatch) + store.insert(_rec(2)) + + def boom(*_a, **_k): + raise RuntimeError("connection in error state") + + monkeypatch.setattr(store.db, "open_table", boom) + assert _store_is_empty(store) is False