Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ Namespaces (`src/cli/log.ts`):
|---|---|---|
| `mcp` | McpSource construction; per-call dispatch showing `taskSupport` / `path=task-augmented\|inline` / cached tool count | "Why is my tool going inline vs task-augmented?" "Is my tool cache populated?" |
| `sse` | Every `tool.progress` / `tool.done` entering the runtime sink wrap; every `data.changed` broadcast with client count | "Are progress events reaching the SSE layer?" "Are broadcasts happening, to how many clients?" |
| `registry` | Per-source skips inside `ToolRegistry.availableTools` (one line per stuck source per enumeration; operator-facing "first failure" signal lives at the lifecycle transition site as a warn — see issue #194) | "Which sources are being skipped on each chat turn?" "Is enumeration silently dropping a connector?" |

Add a namespace by calling `log.debug("ns", "message")` (from `src/cli/log.ts`). Keep this table and the `log.ts` doc comment in sync.

Expand Down
31 changes: 30 additions & 1 deletion src/api/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,36 @@ export function startServer(options: ServerOptions): ServerHandle {
const internalToken = runtime.getInternalToken();

const mcpSources = runtime.mcpSources();
const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink());
// Wire HealthMonitor into the bundle lifecycle so a detected stdio
// crash transitions BundleInstance.state through the lifecycle's
// recordCrash/Recovery/Dead funnel. This is what gives the operator
// a single warn-on-first-failure log instead of one warn per chat
// turn forever (issue #194), and keeps the Configure UI's state pill
// accurate when a subprocess dies mid-session.
const lifecycle = runtime.getLifecycle();
const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink(), {
reportSourceTransition: (source, to) => {
// Resolve source → BundleInstance by (serverName, wsId). McpSource
// instances stamped with workspaceId via bundleContext at
// construction time — see `McpSource.getWorkspaceId`. The wsId
// disambiguates the same connector installed in multiple
// workspaces; without it a crash in workspace A could land on
// workspace B's BundleInstance (whichever the iteration hit first),
// leaving the actual crashed bundle stuck at `running` in its
// workspace's Configure UI. In-process platform sources and
// sources missing a bundleContext (rare) return undefined and we
// silently skip — they don't back a BundleInstance.
const wsId = source.getWorkspaceId();
if (!wsId) return;
const inst = runtime
.getBundleInstances()
.find((i) => i.serverName === source.name && i.wsId === wsId);
if (!inst) return;
if (to === "crashed") lifecycle.recordCrash(inst.serverName, inst.wsId);
else if (to === "running") lifecycle.recordRecovery(inst.serverName, inst.wsId);
else lifecycle.recordDead(inst.serverName, inst.wsId);
},
});
healthMonitor.start();

// SSE event manager — listens to runtime events and broadcasts to clients
Expand Down
27 changes: 27 additions & 0 deletions src/bundles/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,30 @@ export function summarizeConnectionState(connections: Map<string, Connection>):
if (states.includes("crashed")) return "crashed";
return states[0]!;
}

/**
* States that warrant operator attention when entered. Transitioning INTO
* one of these is the "first failure" signal worth a warn log; staying in
* one across many tool-list enumerations is not (that's the spam this set
* eliminates — see issue #194).
*
* - `dead` — process / transport gave up
* - `crashed` — transport closed unexpectedly mid-session
* - `reauth_required` — refresh token rejected; user must reconnect
*
* Excluded deliberately:
* - `not_authenticated` — silent resting state (fresh install, post-Disconnect)
* - `pending_auth` — in-flight OAuth, expected during normal Connect
* - `starting` — transient boot state
* - `stopped` — operator intent (stopBundle / uninstall)
* - `running` — healthy
*/
const BROKEN_STATES: ReadonlySet<BundleState> = new Set<BundleState>([
"dead",
"crashed",
"reauth_required",
]);

export function isBrokenState(s: BundleState): boolean {
return BROKEN_STATES.has(s);
}
83 changes: 82 additions & 1 deletion src/bundles/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { connectorSlug, hasPersistedComposioConnection } from "./composio-connec
import {
type Connection,
type ConnectionState,
isBrokenState,
summarizeConnectionState,
WORKSPACE_PRINCIPAL_ID,
} from "./connection.ts";
Expand Down Expand Up @@ -878,9 +879,37 @@ export class BundleLifecycleManager {
/**
* Update state on a BundleInstance. Public so HealthMonitor can
* report crashed/recovered/dead transitions.
*
* Edge-triggered logging (issue #194):
* - non-broken → broken: one `log.warn` at the moment of transition.
* Subsequent enumerations of the now-stuck source produce zero
* operator-visible noise.
* - broken → running: one `log.info` ("recovered") at the moment of
* recovery, symmetric with the warn.
* - No-op (same state in, same state out): silent.
*
* "Broken" set lives in `connection.ts::isBrokenState` =
* { dead, crashed, reauth_required }. Excludes `pending_auth` (in-flight
* OAuth) and `not_authenticated` (resting state).
*/
transition(instance: BundleInstance, newState: BundleState): void {
const oldState = instance.state;
instance.state = newState;
if (oldState === newState) return;

updateLastBrokenState(instance, oldState, newState);

if (isBrokenState(newState) && !isBrokenState(oldState)) {
log.warn(
`[bundles] ${instance.serverName} (ws ${instance.wsId}) entered '${newState}' from '${oldState}'. ` +
`Tools unavailable until recovered. ${transitionRecoveryHint(newState)}`,
);
} else if (newState === "running" && instance.lastBrokenState !== undefined) {
log.info(
`[bundles] ${instance.serverName} (ws ${instance.wsId}) recovered: '${instance.lastBrokenState}' → 'running'`,
);
instance.lastBrokenState = undefined;
}
}

/**
Expand Down Expand Up @@ -972,7 +1001,9 @@ export class BundleLifecycleManager {

// Recompute summary state so legacy consumers (HealthMonitor,
// briefing-collector, runtime status API) see the right surface.
instance.state = summarizeConnectionState(instance.connections);
// Routed through transition() so edge-triggered logging fires on
// non-broken → broken / broken → running transitions (issue #194).
this.transition(instance, summarizeConnectionState(instance.connections));

this.eventSink.emit({
type: "connection.state_changed",
Expand Down Expand Up @@ -1741,6 +1772,56 @@ interface UpjackScheduleDeclaration {
// Internal helpers
// ---------------------------------------------------------------------------

/**
* Maintain `instance.lastBrokenState` for one transition. Three rules,
* in priority order:
*
* 1. Entering / staying broken → set to newState (latest wins)
* 2. Leaving broken with no label → capture oldState (handles
* instances constructed directly into a broken state, never
* having flowed through this funnel before)
* 3. Operator-stop → clear (explicit end of episode;
* a later boot → running must not emit a stale recovery log)
*
* The matching "leaving broken to running → clear after logging" case
* lives in `transition()` itself because it needs to read the value to
* format the recovery message before clearing.
*
* Split out from `transition()` so the funnel reads top-down as
* state-write → sticky maintenance → log decision, without sticky-bit
* branching obscuring the log policy.
*/
function updateLastBrokenState(
instance: BundleInstance,
oldState: BundleState,
newState: BundleState,
): void {
if (isBrokenState(newState)) {
instance.lastBrokenState = newState;
} else if (isBrokenState(oldState) && instance.lastBrokenState === undefined) {
instance.lastBrokenState = oldState;
} else if (newState === "stopped") {
instance.lastBrokenState = undefined;
}
}

/**
* One-line operator hint appended to the broken-state warn. Tells the
* reader what recovery path applies without forcing them to look it up.
*/
function transitionRecoveryHint(state: BundleState): string {
switch (state) {
case "reauth_required":
return "User must reconnect via Connectors page.";
case "dead":
return "Use startBundle to restart.";
case "crashed":
return "HealthMonitor will attempt auto-restart with exponential backoff.";
default:
return "";
}
}

function createInstance(
serverName: string,
bundleName: string,
Expand Down
13 changes: 13 additions & 0 deletions src/bundles/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,19 @@ export interface BundleInstance {
description?: string;
/** Current lifecycle state. */
state: BundleState;
/**
* Sticky breadcrumb of the most recent broken state observed since
* the last successful recovery to `running`. Owned by
* `BundleLifecycleManager.transition()` — set on any transition where
* the new (or old, for boot-into-broken instances) state is in the
* broken set, cleared on reaching `running` or an operator-driven
* `stopped`. Lets the recovery info log fire on multi-step paths the
* direct edge check misses — notably the URL bundle reconnect flow
* `reauth_required → pending_auth → running`, where neither
* transition is `broken → running`. Process-local; no persistence
* needed because a fresh process starts a fresh episode.
*/
lastBrokenState?: BundleState;
/** MTF trust score from mpak (0-100), or null if unavailable. */
trustScore: number | null;
/** UI placement metadata from _meta["ai.nimblebrain/host"]. */
Expand Down
2 changes: 2 additions & 0 deletions src/cli/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* Known namespaces:
* - `mcp` — McpSource construction, dispatch decisions (task-augmented vs inline)
* - `sse` — Runtime event sink → SSE broadcast (tool.progress, data.changed)
* - `registry` — ToolRegistry.availableTools per-source skips (one line per stuck
* source per enumeration; operator-facing signal lives at lifecycle transition)
*
* Keep this list in sync with the CLAUDE.md "Debugging" section so it's
* discoverable without reading source.
Expand Down
24 changes: 16 additions & 8 deletions src/runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1647,19 +1647,27 @@ export class Runtime {
return [...names];
}

/** Get MCP sources across all workspace registries (for health monitoring). */
/** Get MCP sources across all workspace registries (for health monitoring).
*
* Returns every McpSource instance — no dedupe by `source.name`.
* The same bundle (e.g. `linear-mcp`) installed in workspaces A and B
* produces two distinct McpSource processes, both of which need
* monitoring; collapsing them would leave one workspace's
* `BundleInstance.state` permanently stale on a crash.
*
* `Set<McpSource>` keys on instance identity so any McpSource
* registered directly in multiple workspace registries is monitored
* once. `SharedSourceRef` wrappers don't satisfy `instanceof McpSource`
* and are passed over here — their inner source is monitored via the
* native registry it was constructed in. */
mcpSources(): McpSource[] {
const sources: McpSource[] = [];
const seen = new Set<string>();
const sources = new Set<McpSource>();
for (const reg of this._workspaceRegistries.values()) {
for (const s of reg.getSources()) {
if (s instanceof McpSource && !seen.has(s.name)) {
seen.add(s.name);
sources.push(s);
}
if (s instanceof McpSource) sources.add(s);
}
}
return sources;
return [...sources];
}

/** Get all tracked bundle instances (unfiltered — use getBundleInstancesForWorkspace for scoped access). */
Expand Down
42 changes: 38 additions & 4 deletions src/tools/health-monitor.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,50 @@
import type { EventSink } from "../engine/types.ts";
import type { McpSource } from "./mcp-source.ts";

export type BundleState = "healthy" | "restarting" | "dead";
export type HealthRecordState = "healthy" | "restarting" | "dead";

export interface BundleHealth {
name: string;
state: BundleState;
state: HealthRecordState;
uptime: number | null;
restartCount: number;
}

interface BundleRecord {
source: McpSource;
state: BundleState;
state: HealthRecordState;
restartCount: number;
}

const MAX_RESTARTS = 5;
const DEFAULT_BASE_DELAY_MS = 1000;
const DEFAULT_CHECK_INTERVAL_MS = 30_000;

/**
* Reported HealthMonitor transition for the BundleInstance underlying a
* source. `crashed` and `dead` map to `lifecycle.recordCrash` /
* `recordDead`; `running` maps to `recordRecovery`. The single shape
* pushes the per-method switch + source→instance lookup into the
* caller (`startServer`) so HealthMonitor stays unaware of lifecycle
* internals — useful when the caller wants to inject a different
* backing system (tests, alternative observability paths).
*/
export type HealthMonitorTransition = "crashed" | "running" | "dead";

export interface HealthMonitorOptions {
checkIntervalMs?: number;
baseDelayMs?: number;
/**
* Propagate a detected health transition back to `BundleInstance.state`
* via the bundle lifecycle. Without this hook the URL bundle path
* (which funnels through `recordConnectionStateChange`) still works,
* but stdio subprocess crashes leave the user-facing state stuck at
* `running`. Hook is no-op safe; the caller is responsible for the
* source → instance resolution (returning early if the source doesn't
* back any BundleInstance, e.g. shared or in-process platform sources).
* See issue #194 for the operator log story this enables.
*/
reportSourceTransition?: (source: McpSource, to: HealthMonitorTransition) => void;
}

/**
Expand All @@ -34,6 +56,7 @@ export class HealthMonitor {
private timer: ReturnType<typeof setInterval> | null = null;
private checkIntervalMs: number;
private baseDelayMs: number;
private reportSourceTransition: HealthMonitorOptions["reportSourceTransition"];

constructor(
sources: McpSource[],
Expand All @@ -42,9 +65,10 @@ export class HealthMonitor {
) {
this.checkIntervalMs = opts.checkIntervalMs ?? DEFAULT_CHECK_INTERVAL_MS;
this.baseDelayMs = opts.baseDelayMs ?? DEFAULT_BASE_DELAY_MS;
this.reportSourceTransition = opts.reportSourceTransition;
this.records = sources.map((source) => ({
source,
state: "healthy" as BundleState,
state: "healthy" as HealthRecordState,
restartCount: 0,
}));
}
Expand Down Expand Up @@ -98,6 +122,14 @@ export class HealthMonitor {
},
});

// Propagate to BundleInstance.state on first detection only. The
// `record.state === "healthy"` guard distinguishes the first failure
// from subsequent sweeps that find us in "restarting" — we don't want
// to report crashed on every poll cycle while a restart is pending.
if (record.state === "healthy") {
this.reportSourceTransition?.(record.source, "crashed");
}

// Check if we've exhausted restart attempts
if (record.restartCount >= MAX_RESTARTS) {
record.state = "dead";
Expand All @@ -109,6 +141,7 @@ export class HealthMonitor {
...(remote ? { remote: true } : {}),
},
});
this.reportSourceTransition?.(record.source, "dead");
return;
}

Expand Down Expand Up @@ -149,6 +182,7 @@ export class HealthMonitor {
...(remote ? { remote: true } : {}),
},
});
this.reportSourceTransition?.(record.source, "running");
} else {
// Restart failed — check again on next cycle (might hit max)
record.state = "restarting";
Expand Down
18 changes: 18 additions & 0 deletions src/tools/mcp-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,24 @@ export class McpSource implements ToolSource {
return this.mode.type === "remote";
}

/**
* Workspace this source was constructed for, when available. Bundle-
* spawned sources (URL, stdio, named, local) always carry a workspace
* via `bundleContext.workspaceId` — that is the field returned here.
* In-process platform sources and short-lived test sources don't pass
* a `bundleContext` and return `undefined`.
*
* Used by `HealthMonitor`'s `reportSourceTransition` hook so the
* source → BundleInstance lookup is keyed on `(serverName, wsId)`
* rather than name alone. Without the wsId disambiguation, the same
* bundle (e.g. `linear-mcp`) installed in two workspaces would all
* route their crash detection to whichever instance happened to be
* first in iteration order, leaving the other stuck at `running`.
*/
getWorkspaceId(): string | undefined {
return this.bundleContext?.workspaceId;
}

async start(): Promise<void> {
// Fresh stderr state on every start. Restart cycles must not bleed
// a dead instance's tail into the new instance's crash report.
Expand Down
Loading