Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
cad047a
fix(daemon): hibernate when idle — ignore open connections and watchd…
Marsu6996 Jun 20, 2026
0b1fac2
fix(daemon): single-flight build_runtime_graph to end the WAKE recomp…
Marsu6996 Jun 21, 2026
f049840
fix(graph-cache): widen staleness window so capture keeps the cache warm
Marsu6996 Jun 21, 2026
16b4ed4
fix(daemon): drop boot-preload double-save that nulled the cache payload
Marsu6996 Jun 21, 2026
d1678f0
fix(daemon): signal wake before kickstart so the daemon serves on wake
Marsu6996 Jun 21, 2026
093b3a6
fix(graph): exclude tombstoned records from the runtime graph
Marsu6996 Jun 21, 2026
5d14f43
fix(daemon): a store count failure must not be read as empty
Marsu6996 Jun 21, 2026
73e8e44
fix(daemon): reset last_tick_skipped_reason on a successful tick
Marsu6996 Jun 21, 2026
364ea59
fix(daemon): keep idle countdown awake during active drains and RPC
Marsu6996 Jun 21, 2026
d03e024
test(daemon): guard tick-flag observability and the skip-reason reset
Marsu6996 Jun 21, 2026
e39a901
fix(capture): stop re-embedding seen turns and close the dedup race
Marsu6996 Jun 21, 2026
ef9ee31
fix(recall): clamp displayed score to [0,1], rank on unclamped sort_s…
Marsu6996 Jun 21, 2026
5bd3f47
test(recall): guard displayed score clamp to [0,1] with order preserved
Marsu6996 Jun 21, 2026
b355138
fix(consolidation): build crisis topology on the live graph; demote r…
Marsu6996 Jun 21, 2026
92d6737
fix(dmn): embed the reflection surface text instead of a zero placeho…
Marsu6996 Jun 21, 2026
56532af
fix(sigma): bound the small-worldness compute with a node-count ceiling
Marsu6996 Jun 21, 2026
7285233
fix(watchdog): monitor a retried-but-wedged sleep cycle for staleness
Marsu6996 Jun 21, 2026
1a77018
feat(daemon): surface a repeated store-empty count failure as telemetry
Marsu6996 Jun 21, 2026
d74c8c2
fix(hippo): decrypt literal_surface before reembedding pending rows
Marsu6996 Jun 21, 2026
642a8e8
fix(daemon): recover cleanly from a crash mid-SLEEP at boot
Marsu6996 Jun 21, 2026
6c92442
fix(pipeline): deterministic tie-break on final hit ranking
Marsu6996 Jun 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 113 additions & 23 deletions src/iai_mcp/capture.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from __future__ import annotations

import contextlib
import hashlib
import json
import logging
Expand Down Expand Up @@ -41,6 +42,46 @@
# two threads before either has inserted it.
_CAPTURE_DEDUP_LOCK = threading.Lock()

# Drain-in-progress accounting for the daemon's lifecycle idle countdown.
#
# The wrapper heartbeat file is only ONE signal that the daemon is busy. A
# deferred-capture drain (triggered by real RPC traffic, kicked off at the
# drowsy edge, or run on startup) can keep writing to the store long after the
# heartbeat file has gone stale. If the idle countdown only watched the
# heartbeat, it would force the FSM toward SLEEP -- which escalates to an
# EXCLUSIVE store lock -- while these drain threads are still hammering the
# store, causing lock contention. The daemon consults `is_drain_in_progress`
# so an in-flight drain holds the idle countdown open. A depth counter (rather
# than a boolean) keeps this correct when drains overlap across the worker
# threads that asyncio.to_thread dispatches them to.
_DRAIN_IN_PROGRESS_LOCK = threading.Lock()
_drain_in_progress_depth = 0


@contextlib.contextmanager
def _drain_in_progress_guard():
    """Count one deferred-capture drain as in flight while the ``with`` body runs.

    The shared depth counter is raised by one on entry and lowered by one on
    exit, including when the body raises. Every change to the counter is made
    while holding ``_DRAIN_IN_PROGRESS_LOCK``.
    """

    def _shift_depth(delta: int) -> None:
        # Single place that mutates the module-level counter, always under
        # the lock so concurrent guards cannot lose an update.
        global _drain_in_progress_depth
        with _DRAIN_IN_PROGRESS_LOCK:
            _drain_in_progress_depth += delta

    _shift_depth(1)
    try:
        yield
    finally:
        # Runs on normal exit and on exception alike, so the counter cannot
        # stay elevated after a failed drain.
        _shift_depth(-1)


def is_drain_in_progress() -> bool:
    """Report whether at least one deferred-capture drain is currently running.

    The daemon's lifecycle idle countdown consults this so that it does NOT
    move the FSM toward SLEEP (and the EXCLUSIVE store lock SLEEP takes) while
    drain threads are still writing to the store.

    Returns:
        True when the drain depth counter is above zero, False otherwise.
    """
    # Snapshot the counter under the lock; the comparison itself needs no lock.
    with _DRAIN_IN_PROGRESS_LOCK:
        active_drains = _drain_in_progress_depth
    return active_drains > 0


FAILED_MAX_ATTEMPTS: int = 3
FAILED_BACKOFF_BASE_SEC: float = 60.0

Expand Down Expand Up @@ -276,32 +317,53 @@ def capture_turn(
from iai_mcp.embed import embedder_for_store
from iai_mcp.events import TELEMETRY_EMBED_NATIVE_FAILURE, write_event

try:
# Embed the message content, never the cue. The cue is a provenance
# label only (transcript drains and deferred-drain pass a positional
# cue such as "session <id> turn <n>"); embedding it collapsed the
# stored vector space and broke semantic recall. text is already
# validated non-empty above (the MIN_CAPTURE_LEN guard), so embedding
# text is safe for every caller.
emb = embedder_for_store(store).embed(text)
except Exception as exc:
write_event(
store,
TELEMETRY_EMBED_NATIVE_FAILURE,
{
"op_type": "capture",
"backend": "rust",
"error_type": type(exc).__name__,
"error": str(exc),
},
)
raise NativeError(f"capture encode failed: {exc}") from exc
embedding = list(emb)
# Embedding is the expensive native (Rust) matmul; the exact-key idem dedup
# below is a cheap SQLite lookup. Active sessions re-drain the *whole*
# transcript every turn (write_deferred_captures / capture_transcript walk
# from line 0 on each call), so embedding eagerly here re-embedded every
# already-stored turn only to throw the vector away as a "reinforced"
# exact-key re-drain -> chronic daemon CPU. Defer the embed so it runs at
# most once and only when actually needed: an already-seen episodic turn
# short-circuits on its idem tag and never embeds. Embed the message
# content, never the cue (the cue is a positional provenance label;
# embedding it collapsed the vector space and broke semantic recall). text
# is validated non-empty above (the MIN_CAPTURE_LEN guard).
_embed_cache: dict[str, list[float]] = {}

def _compute_embedding() -> list[float]:
    """Return the embedding of ``text``, computing it on first use only.

    The vector is memoised in ``_embed_cache`` under the key ``"v"`` so later
    calls reuse it. An embed failure is recorded as a telemetry event and
    re-raised as ``NativeError`` chained to the original exception.
    """
    cached = _embed_cache.get("v")
    if cached is not None:
        return cached

    try:
        raw = embedder_for_store(store).embed(text)
    except Exception as exc:
        failure = {
            "op_type": "capture",
            "backend": "rust",
            "error_type": type(exc).__name__,
            "error": str(exc),
        }
        write_event(store, TELEMETRY_EMBED_NATIVE_FAILURE, failure)
        raise NativeError(f"capture encode failed: {exc}") from exc

    # Materialise as a plain list before caching so every caller shares it.
    _embed_cache["v"] = list(raw)
    return _embed_cache["v"]

with _CAPTURE_DEDUP_LOCK:
if _is_episodic_conversational(tier, role):
ts_iso = now.isoformat()
idem_t = _idem_tag(session_id, role, ts_iso, text, source_uuid=source_uuid)
# find_record_by_tag reads SQLite, but a just-inserted record may
# still sit in the in-process _record_buffer (not yet flushed to the
# records table). Under _CAPTURE_DEDUP_LOCK the insert path is
# serialized with this find, so flushing the buffer here makes every
# prior committed insert visible to the SQLite-backed find and closes
# the check-then-insert race that produced live idem-tag duplicates.
from iai_mcp.store import flush_record_buffer

flush_record_buffer(store)
existing_id = store.find_record_by_tag(idem_t)
if existing_id is not None:
try:
Expand All @@ -321,7 +383,7 @@ def capture_turn(
}
else:
try:
neighbours = store.query_similar(embedding, k=3, tier=tier)
neighbours = store.query_similar(_compute_embedding(), k=3, tier=tier)
except (ValueError, IOError) as exc:
log.warning(
"capture_dedup_query_failed",
Expand Down Expand Up @@ -361,7 +423,7 @@ def capture_turn(
tier=tier,
literal_surface=text,
aaak_index="",
embedding=embedding,
embedding=_compute_embedding(),
community_id=None,
centrality=0.0,
detail_level=2,
Expand Down Expand Up @@ -711,6 +773,17 @@ def write_deferred_captures(


def drain_deferred_captures(store: MemoryStore) -> dict[str, int]:
    """Drain crashed/orphaned deferred-capture files into the store.

    The whole drain runs inside ``_drain_in_progress_guard`` so that
    ``is_drain_in_progress`` reports True for its duration; the daemon idle
    countdown uses that to avoid advancing the FSM toward SLEEP (EXCLUSIVE
    store lock) mid-drain.

    Args:
        store: The memory store passed through to the drain implementation.

    Returns:
        Whatever ``_drain_deferred_captures_impl`` returns for this store.
    """
    with _drain_in_progress_guard():
        drain_counts = _drain_deferred_captures_impl(store)
    return drain_counts


def _drain_deferred_captures_impl(store: MemoryStore) -> dict[str, int]:
deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures"
log_dir = Path.home() / ".iai-mcp" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -1139,6 +1212,23 @@ def drain_active_live_captures(
store: MemoryStore,
*,
exclude_session_id: str,
) -> dict[str, int]:
"""Drain live-session capture files into the store.

Marked in-progress for the daemon idle countdown, like
`drain_deferred_captures` -- both write to the store and must hold the FSM
out of SLEEP while running. See `is_drain_in_progress`.
"""
with _drain_in_progress_guard():
return _drain_active_live_captures_impl(
store, exclude_session_id=exclude_session_id,
)


def _drain_active_live_captures_impl(
store: MemoryStore,
*,
exclude_session_id: str,
) -> dict[str, int]:
deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures"
state_dir = Path.home() / ".iai-mcp" / ".capture-state"
Expand Down
18 changes: 18 additions & 0 deletions src/iai_mcp/cli/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ def _stop_escalation_bound() -> float:
return STOP_TERM_TIMEOUT_S


def _signal_daemon_wake() -> None:
    """Create the wake signal before a kickstart so the booting daemon WAKEs.

    Without it the daemon boots, re-reads its persisted HIBERNATION state and
    hibernate-exits within a tick, closing the socket before it ever serves
    recall.

    The signal file is ``wake.signal`` under the directory named by the
    ``IAI_MCP_STORE`` environment variable, falling back to ``~/.iai-mcp``
    when that variable is unset or empty.

    Best-effort: any failure is swallowed so daemon start falls back to the
    old behaviour, but it is recorded at DEBUG level (with traceback) so a
    wake signal that never gets written can still be diagnosed.
    """
    try:
        # Imported lazily and inside the try so an import failure is treated
        # like any other wake-signal failure.
        from iai_mcp.wake_handler import WakeHandler

        root = os.environ.get("IAI_MCP_STORE") or os.path.expanduser("~/.iai-mcp")
        WakeHandler(Path(root) / "wake.signal").signal_wake()
    except Exception:  # noqa: BLE001 -- never let the wake signal break daemon start
        import logging

        # DEBUG keeps normal CLI output unchanged while leaving a trace.
        logging.getLogger(__name__).debug(
            "daemon wake signal could not be created", exc_info=True,
        )


def _stop_poll_interval() -> float:
raw = os.environ.get("IAI_DAEMON_STOP_POLL_S")
if raw:
Expand Down Expand Up @@ -158,6 +174,7 @@ def cmd_daemon_install(args: argparse.Namespace) -> int:
f"{result.stderr.strip()}",
file=sys.stderr,
)
_signal_daemon_wake()
_cli.subprocess.run(
["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"],
check=False, capture_output=True,
Expand Down Expand Up @@ -255,6 +272,7 @@ def cmd_daemon_start(args: argparse.Namespace) -> int:
["launchctl", "bootstrap", f"gui/{uid}", str(target)],
check=False, capture_output=True,
)
_signal_daemon_wake()
_cli.subprocess.run(
["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"],
check=False, capture_output=True,
Expand Down
11 changes: 10 additions & 1 deletion src/iai_mcp/core/_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,18 @@
def _hit_to_json(h) -> dict:
_vf = getattr(h, "valid_from", None)
_vt = getattr(h, "valid_to", None)
# Clamp the *displayed* score to [0,1]. Multiplicative boosts (trigram*2,
# FTS*3, valence) can drive the internal score past 1.0; that raw value stays
# in `sort_score` for ordering, but the client must never see a "confidence"
# > 1 (or < 0). Ordering is unaffected: this only touches the serialized
# number, never the rank.
try:
_display_score = max(0.0, min(1.0, float(h.score)))
except (TypeError, ValueError):
_display_score = 0.0
return {
"record_id": str(h.record_id),
"score": float(h.score),
"score": _display_score,
"reason": h.reason,
"literal_surface": h.literal_surface,
"adjacent_suggestions": [str(x) for x in h.adjacent_suggestions],
Expand Down
Loading
Loading