Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/iai_mcp/capture.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from __future__ import annotations

import contextlib
import hashlib
import json
import logging
Expand Down Expand Up @@ -41,6 +42,46 @@
# two threads before either has inserted it.
_CAPTURE_DEDUP_LOCK = threading.Lock()

# Drain-in-progress accounting for the daemon's lifecycle idle countdown.
#
# The wrapper heartbeat file is only ONE signal that the daemon is busy. A
# deferred-capture drain (triggered by real RPC traffic, kicked off at the
# drowsy edge, or run on startup) can keep writing to the store long after the
# heartbeat file has gone stale. If the idle countdown only watched the
# heartbeat, it would force the FSM toward SLEEP -- which escalates to an
# EXCLUSIVE store lock -- while these drain threads are still hammering the
# store, causing lock contention. The daemon consults `is_drain_in_progress`
# so an in-flight drain holds the idle countdown open. A depth counter (rather
# than a boolean) keeps this correct under concurrent/overlapping drains across
# the several threads that asyncio.to_thread dispatch spins up.
_DRAIN_IN_PROGRESS_LOCK = threading.Lock()
_drain_in_progress_depth = 0


@contextlib.contextmanager
def _drain_in_progress_guard():
"""Mark a deferred-capture drain as in flight for its whole duration."""
global _drain_in_progress_depth
with _DRAIN_IN_PROGRESS_LOCK:
_drain_in_progress_depth += 1
try:
yield
finally:
with _DRAIN_IN_PROGRESS_LOCK:
_drain_in_progress_depth -= 1


def is_drain_in_progress() -> bool:
"""True while at least one deferred-capture drain is running.

The daemon's lifecycle idle countdown reads this so it does NOT advance the
FSM toward SLEEP (and the EXCLUSIVE store lock it takes) while drain threads
are still writing to the store.
"""
with _DRAIN_IN_PROGRESS_LOCK:
return _drain_in_progress_depth > 0


FAILED_MAX_ATTEMPTS: int = 3
FAILED_BACKOFF_BASE_SEC: float = 60.0

Expand Down Expand Up @@ -711,6 +752,17 @@ def write_deferred_captures(


def drain_deferred_captures(store: MemoryStore) -> dict[str, int]:
"""Drain crashed/orphaned deferred-capture files into the store.

Marks itself in-progress for the daemon idle countdown so the FSM does not
advance toward SLEEP (EXCLUSIVE store lock) mid-drain. See
`is_drain_in_progress`.
"""
with _drain_in_progress_guard():
return _drain_deferred_captures_impl(store)


def _drain_deferred_captures_impl(store: MemoryStore) -> dict[str, int]:
deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures"
log_dir = Path.home() / ".iai-mcp" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -1139,6 +1191,23 @@ def drain_active_live_captures(
store: MemoryStore,
*,
exclude_session_id: str,
) -> dict[str, int]:
"""Drain live-session capture files into the store.

Marked in-progress for the daemon idle countdown, like
`drain_deferred_captures` -- both write to the store and must hold the FSM
out of SLEEP while running. See `is_drain_in_progress`.
"""
with _drain_in_progress_guard():
return _drain_active_live_captures_impl(
store, exclude_session_id=exclude_session_id,
)


def _drain_active_live_captures_impl(
store: MemoryStore,
*,
exclude_session_id: str,
) -> dict[str, int]:
deferred_dir = Path.home() / ".iai-mcp" / ".deferred-captures"
state_dir = Path.home() / ".iai-mcp" / ".capture-state"
Expand Down
18 changes: 18 additions & 0 deletions src/iai_mcp/cli/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ def _stop_escalation_bound() -> float:
return STOP_TERM_TIMEOUT_S


def _signal_daemon_wake() -> None:
"""Create the wake signal before a kickstart so the booting daemon WAKEs.

Without it the daemon boots, re-reads its persisted HIBERNATION state and
hibernate-exits within a tick, closing the socket before it ever serves
recall. Best-effort: a failure here just falls back to the old behaviour.
"""
try:
from iai_mcp.wake_handler import WakeHandler

root = os.environ.get("IAI_MCP_STORE") or os.path.expanduser("~/.iai-mcp")
WakeHandler(Path(root) / "wake.signal").signal_wake()
except Exception: # noqa: BLE001 -- never let the wake signal break daemon start
pass


def _stop_poll_interval() -> float:
raw = os.environ.get("IAI_DAEMON_STOP_POLL_S")
if raw:
Expand Down Expand Up @@ -158,6 +174,7 @@ def cmd_daemon_install(args: argparse.Namespace) -> int:
f"{result.stderr.strip()}",
file=sys.stderr,
)
_signal_daemon_wake()
_cli.subprocess.run(
["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"],
check=False, capture_output=True,
Expand Down Expand Up @@ -255,6 +272,7 @@ def cmd_daemon_start(args: argparse.Namespace) -> int:
["launchctl", "bootstrap", f"gui/{uid}", str(target)],
check=False, capture_output=True,
)
_signal_daemon_wake()
_cli.subprocess.run(
["launchctl", "kickstart", f"gui/{uid}/{_cli.DAEMON_LABEL}"],
check=False, capture_output=True,
Expand Down
122 changes: 111 additions & 11 deletions src/iai_mcp/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,52 @@ def _should_drain_on_drowsy_edge(prev, current) -> bool:
return prev is _L.WAKE and current is _L.DROWSY


# Idle-countdown decisions. The lifecycle tick translates these into FSM events.
IDLE_DECISION_ACTIVE: str = "active" # real work in flight -> reset the countdown
IDLE_DECISION_SLEEP: str = "sleep" # idle past the sleep threshold -> IDLE_30MIN
IDLE_DECISION_DROWSY: str = "drowsy" # idle past the drowsy threshold -> IDLE_5MIN
IDLE_DECISION_HOLD: str = "hold" # within the drowsy window -> no transition


def _idle_countdown_decision(
*,
scanner_active: bool,
drain_in_progress: bool,
seconds_since_rpc: float,
idle_elapsed: float,
sleep_eligible: bool,
recent_rpc_window_sec: float,
drowsy_after_sec: float,
sleep_heartbeat_idle_sec: float,
) -> str:
"""Decide what the lifecycle idle countdown should do this tick.

The wrapper heartbeat (``scanner_active``) is only ONE signal that the
daemon is busy. Two others MUST also hold the daemon awake:

* ``drain_in_progress`` -- a deferred-capture drain is writing to the store
right now. The wrappers dir can be empty (heartbeat stale) while a drain
kicked off by earlier RPC traffic is still hammering the store.
* ``seconds_since_rpc`` -- real JSON-RPC traffic arrived recently. A drain
runs inside its triggering request, so by the time a long drain finishes
``last_activity_ts`` (set at request start) may already look stale; the
``drain_in_progress`` signal covers that tail.

When any of these holds we return ``ACTIVE`` so the caller resets the idle
countdown. Otherwise the daemon really is idle and we advance toward SLEEP
(which escalates to an EXCLUSIVE store lock) / DROWSY exactly as the
heartbeat-only logic used to, so a genuinely quiet daemon still settles and
crisis re-arming in SLEEP keeps running.
"""
if scanner_active or drain_in_progress or seconds_since_rpc < recent_rpc_window_sec:
return IDLE_DECISION_ACTIVE
if idle_elapsed >= sleep_heartbeat_idle_sec and sleep_eligible:
return IDLE_DECISION_SLEEP
if idle_elapsed >= drowsy_after_sec:
return IDLE_DECISION_DROWSY
return IDLE_DECISION_HOLD


def _run_drowsy_drain(store, *, drain_fn, write_event_fn) -> None:
try:
result = drain_fn(store)
Expand Down Expand Up @@ -228,8 +274,15 @@ def _store_is_empty(store: MemoryStore) -> bool:
try:
return store.db.open_table("records").count_rows() == 0
except (OSError, ValueError, KeyError, RuntimeError) as exc:
log.debug("store empty check failed, assuming empty: %s", exc)
return True
# Unknown != empty. A transient count failure (e.g. the shared sqlite
# connection left in an error state by a concurrent heavy reader, raising
# HippoIntegrityError/lock errors which subclass RuntimeError) must NOT be
# treated as an empty store: doing so parks the whole lifecycle tick
# (no idle-check, no drain) on a store that actually has records. Treat
# the unknown case as NOT empty so the tick proceeds; a truly empty store
# just does a little harmless no-op work.
log.debug("store empty check failed, assuming NOT empty: %s", exc)
return False


def _is_inside_window(
Expand Down Expand Up @@ -490,6 +543,10 @@ async def _tick_body(


state["last_tick_at"] = datetime.now(timezone.utc).isoformat()
# Clear the skip reason: reaching here means the tick was NOT skipped. Leaving a
# stale "empty_store"/"paused" value here makes a healthy daemon look parked in
# observability (last_tick_skipped_reason is only ever set, never reset).
state["last_tick_skipped_reason"] = None
try:
await asyncio.to_thread(save_state, state)
except (OSError, ValueError) as exc:
Expand Down Expand Up @@ -932,11 +989,13 @@ def _capture_handler(record: dict) -> None:
async def _boot_preload() -> None:
try:
from iai_mcp import retrieve as _retrieve_preload
_g, _a, _rc = await asyncio.to_thread(
_retrieve_preload.build_runtime_graph, store,
)
# build_runtime_graph already persists the cache internally
# (with the full node_payload) on a miss. The previous extra
# save(..., node_payload=None, ...) here overwrote that good
# cache with a payload-less one (forcing a pandas re-read on
# the next hit) — so we just warm the cache and drop it.
await asyncio.to_thread(
_rgc_mod.save, store, _a, _rc, None, 0,
_retrieve_preload.build_runtime_graph, store,
)
except Exception as _exc: # noqa: BLE001 -- preload MUST NOT crash daemon
log.debug("boot_preload failed: %s", _exc, exc_info=True)
Expand Down Expand Up @@ -1149,6 +1208,34 @@ def _emit_expiry() -> None:
now_mono = time.monotonic()
idle_elapsed = now_mono - _last_active_monotonic[0]

# Beyond the wrapper heartbeat, real RPC traffic and an
# in-flight deferred-capture drain are activity too. Folding
# them into the idle countdown stops the FSM forcing SLEEP
# (-> EXCLUSIVE store lock) while drain threads are still
# hammering the store. See _idle_countdown_decision.
try:
from iai_mcp.capture import is_drain_in_progress as _drain_q
_drain_active = bool(await asyncio.to_thread(_drain_q))
except Exception: # noqa: BLE001 -- idle accounting MUST NOT crash the tick
_drain_active = False
_seconds_since_rpc = (
(now_mono - mcp_socket.last_activity_ts)
if mcp_socket is not None
else float("inf")
)
_idle_decision = _idle_countdown_decision(
scanner_active=scanner_active,
drain_in_progress=_drain_active,
seconds_since_rpc=_seconds_since_rpc,
idle_elapsed=idle_elapsed,
sleep_eligible=sleep_eligible,
recent_rpc_window_sec=INTERRUPT_RECENT_ACTIVITY_WINDOW_SEC,
drowsy_after_sec=DROWSY_AFTER_SEC,
sleep_heartbeat_idle_sec=SLEEP_HEARTBEAT_IDLE_SEC,
)
if _idle_decision == IDLE_DECISION_ACTIVE:
_last_active_monotonic[0] = now_mono

try:
from iai_mcp.daemon_state import load_state as _load_ds
_ds = await asyncio.to_thread(_load_ds)
Expand Down Expand Up @@ -1197,15 +1284,18 @@ def _emit_expiry() -> None:
pass

if scanner_active:
_last_active_monotonic[0] = now_mono
# _last_active_monotonic was already refreshed above
# (scanner_active -> IDLE_DECISION_ACTIVE); a fresh
# wrapper heartbeat additionally pulls the FSM back to
# WAKE, which RPC/drain activity alone does not.
try:
await _state_machine.dispatch(
_LifecycleEvent.HEARTBEAT_REFRESH,
reason="heartbeat_refresh_active_wrapper",
)
except (S2OscillationConflict, S2OscillationBlocked):
pass
elif idle_elapsed >= SLEEP_HEARTBEAT_IDLE_SEC and sleep_eligible:
elif _idle_decision == IDLE_DECISION_SLEEP:
try:
await _state_machine.dispatch(
_LifecycleEvent.IDLE_30MIN,
Expand All @@ -1214,7 +1304,7 @@ def _emit_expiry() -> None:
)
except (S2OscillationConflict, S2OscillationBlocked):
pass
elif idle_elapsed >= DROWSY_AFTER_SEC:
elif _idle_decision == IDLE_DECISION_DROWSY:
try:
await _state_machine.dispatch(
_LifecycleEvent.IDLE_5MIN,
Expand Down Expand Up @@ -1282,8 +1372,13 @@ def _run_wake_sequence():
_prev_lifecycle_state[0] = current
if current is _LifecycleState.SLEEP:
def _interrupt_check() -> bool:
if mcp_socket.active_connections > 0:
return True
# Defer the sleep pipeline only on RECENT ACTIVITY, not on
# open connections: long-lived Claude sessions keep sockets
# open permanently, so `active_connections > 0` was True at
# nearly every tick -> the cycle never completed -> no
# HIBERNATION -> the wake-hook re-ran every 30s (the 221% CPU
# churn). last_activity_ts is refreshed on each request, so a
# busy burst still defers; a 30s lull lets the cycle finish.
elapsed = (
time.monotonic() - mcp_socket.last_activity_ts
)
Expand Down Expand Up @@ -1520,6 +1615,11 @@ def _interrupt_check() -> bool:
"_raise_fd_limit",
"_run_drowsy_drain",
"_should_drain_on_drowsy_edge",
"_idle_countdown_decision",
"IDLE_DECISION_ACTIVE",
"IDLE_DECISION_SLEEP",
"IDLE_DECISION_DROWSY",
"IDLE_DECISION_HOLD",
"_kick_drowsy_rgc_rebuild",
"_wake_hook_rebuild_if_cold",
"_store_is_empty",
Expand Down
Loading
Loading