diff --git a/README.md b/README.md index 6695b580..8c994e09 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,53 @@ Or use the **one-click setup**: Settings → Nexus → Get Started → MCP Integ After adding, fully quit and relaunch Claude Desktop. +### Multiplexer (optional) + +The plugin's IPC socket supports only one connected transport at a time. If you run multiple MCP clients (Claude Code, Claude Desktop, Claudian) simultaneously, each will compete for the single connection. `nexus-mux.js` solves this by sitting between your clients and Obsidian: + +``` +Claude Code ──┐ +Claude Desktop ─┼──▶ nexus-mux (proxy socket) ──▶ Obsidian (IPC socket) +Claudian ──────┘ +``` + +The mux holds one persistent MCP session with Obsidian and fans out requests from any number of clients. It handles ID rewriting so responses route back to the correct caller, intercepts session-lifecycle messages (initialize, shutdown) per-client without disturbing the shared Obsidian session, and shuts itself down after 5 minutes with no connected clients. + +**Setup:** + +Point your MCP client config at `nexus-mux.js` instead of `connector.js`: + +```json +{ + "mcpServers": { + "nexus-your-vault": { + "command": "node", + "args": [ + "/path/to/Vault/.obsidian/plugins/nexus/nexus-mux.js" + ] + } + } +} +``` + +The first client to connect auto-starts a background daemon; subsequent clients reuse it. No manual daemon management needed. + +**Debugging:** + +```bash +# Check if the daemon is running +ps aux | grep nexus-mux | grep -v grep + +# Start with debug logging (stderr) +NEXUS_MUX_DEBUG=1 node /path/to/nexus-mux.js --daemon + +# Check socket state +lsof /tmp/nexus_mcp_core.sock # Obsidian's IPC socket +lsof /tmp/nexus_mcp_proxy.sock # Mux proxy socket +``` + +> **Note:** If you only ever use one MCP client at a time, the mux is unnecessary. The plugin now self-heals on client disconnect (the transport slot is released when a connector dies), so direct `connector.js` connections work fine for single-client use. + --- ## Using Native Chat diff --git a/nexus-mux.js b/nexus-mux.js new file mode 100644 index 00000000..33bbe9c8 --- /dev/null +++ b/nexus-mux.js @@ -0,0 +1,297 @@ +#!/usr/bin/env node +// nexus-mux.js — MCP multiplexer for Obsidian nexus-core +// +// The nexus plugin exposes a single-transport MCP server over a Unix socket. +// Only one client can hold that transport at a time. This multiplexer sits +// in front, owning the one Obsidian connection and fanning out to any number +// of Claude clients (Code, Desktop, Claudian) via a proxy socket. +// +// Architecture: +// Claude (stdio) → [client mode] → proxy socket → [daemon] → Obsidian socket +// +// Two modes: +// --daemon Long-running process. Connects to Obsidian, listens on the +// proxy socket, multiplexes requests/responses between clients. +// Shuts itself down after IDLE_TIMEOUT_MS with no clients. +// (default) Short-lived stdio bridge. Connects to the proxy socket (auto- +// starting the daemon if needed) and pipes stdin/stdout through. +// This is what MCP clients actually spawn. +"use strict"; + +const net = require('net'); +const { spawn } = require('child_process'); +const readline = require('readline'); +const fs = require('fs'); + +// Obsidian's IPC socket — created by the nexus plugin on load +const OBSIDIAN_SOCKET = '/tmp/nexus_mcp_core.sock'; +// Proxy socket — the daemon listens here for client connections +const PROXY_SOCKET = '/tmp/nexus_mcp_proxy.sock'; +// Daemon exits after this long with zero connected clients +const IDLE_TIMEOUT_MS = 5 * 60 * 1000; + +const log = process.env.NEXUS_MUX_DEBUG + ? (...a) => process.stderr.write('[mux] ' + a.join(' ') + '\n') + : () => {}; + +// ── Daemon ──────────────────────────────────────────────────────────────────── + +function runDaemon() { + // — Obsidian connection state — + let obsidian = null; // net.Socket to Obsidian, null when disconnected + let ready = false; // true once our MCP handshake with Obsidian completes + let serverResult = null; // cached initialize result — replayed to each client + + // — Client bookkeeping — + const clients = new Map(); // clientId → { socket } + const pending = new Map(); // rewrittenId → { clientId, origId } + const queue = []; // messages buffered while Obsidian is (re)connecting + const pendingInit = new Map(); // clientId → msgId (clients awaiting init response) + let idleTimer = null; // handle for the idle shutdown timer + + // — Idle shutdown — + // When the last client disconnects, start a countdown. If no new client + // arrives before it fires, the daemon tears down cleanly. The timer is + // also started on boot in case the spawning client never connects. + + const teardown = () => { + try { fs.unlinkSync(PROXY_SOCKET); } catch {} + process.exit(0); + }; + + function resetIdleTimer() { + if (idleTimer) clearTimeout(idleTimer); + if (clients.size === 0) { + log(`no clients — shutting down in ${IDLE_TIMEOUT_MS / 1000}s`); + idleTimer = setTimeout(() => { + log('idle timeout reached, exiting'); + teardown(); + }, IDLE_TIMEOUT_MS); + } + } + + function cancelIdleTimer() { + if (idleTimer) { clearTimeout(idleTimer); idleTimer = null; } + } + + // — Obsidian message plumbing — + + /** Send a JSON-RPC message to Obsidian, or queue it if not yet connected. */ + function toObsidian(msg) { + if (ready && obsidian) obsidian.write(JSON.stringify(msg) + '\n'); + else queue.push(msg); + } + + /** Drain the queue once the Obsidian connection is live. */ + function flush() { + while (queue.length && ready && obsidian) + obsidian.write(JSON.stringify(queue.shift()) + '\n'); + } + + /** Send a synthetic initialize response to a client using our cached result. */ + function replyInit(clientId, msgId) { + const c = clients.get(clientId); + if (c) c.socket.write(JSON.stringify({ + jsonrpc: '2.0', id: msgId, + result: serverResult, + }) + '\n'); + } + + // — Obsidian connection lifecycle — + // We maintain exactly one MCP session with Obsidian. If the connection + // drops (Obsidian restart, plugin reload), we reconnect and re-handshake. + + function connectObsidian() { + log('connecting to Obsidian…'); + const sock = net.createConnection(OBSIDIAN_SOCKET); + const rl = readline.createInterface({ input: sock }); + + sock.on('connect', () => { + obsidian = sock; + // Perform the MCP handshake — one session for the lifetime of this daemon + sock.write(JSON.stringify({ + jsonrpc: '2.0', id: '__mux__', method: 'initialize', + params: { + protocolVersion: '2024-11-05', + capabilities: { roots: { listChanged: true }, sampling: {} }, + clientInfo: { name: 'nexus-mux', version: '1.0.0' }, + }, + }) + '\n'); + }); + + rl.on('line', line => { + let msg; + try { msg = JSON.parse(line); } catch { return; } + + // Before the handshake completes, the only message we expect is our + // own initialize response. Everything else is ignored. + if (!ready) { + if (msg.id === '__mux__' && msg.result) { + serverResult = msg.result; + sock.write(JSON.stringify({ + jsonrpc: '2.0', method: 'notifications/initialized', + }) + '\n'); + ready = true; + log('Obsidian ready'); + // Any clients that connected before the handshake finished get + // their initialize response now + for (const [cid, msgId] of pendingInit) replyInit(cid, msgId); + pendingInit.clear(); + flush(); + } + return; + } + + // — Route responses and notifications to clients — + + if (msg.id !== undefined && msg.id !== null) { + // Response: look up which client sent the original request via the + // rewritten ID (format: "clientId:originalId") + const key = typeof msg.id === 'string' ? msg.id : JSON.stringify(msg.id); + const p = pending.get(key); + if (p) { + pending.delete(key); + const c = clients.get(p.clientId); + if (c) c.socket.write(JSON.stringify({ ...msg, id: p.origId }) + '\n'); + } + } else { + // Notification (no id): broadcast to every connected client + const out = JSON.stringify(msg) + '\n'; + for (const c of clients.values()) c.socket.write(out); + } + }); + + const reconnect = () => { + obsidian = null; + ready = false; + log('Obsidian disconnected, retrying in 1.5s…'); + setTimeout(connectObsidian, 1500); + }; + sock.on('error', reconnect); + sock.on('close', reconnect); + } + + // — Proxy server — + // Each MCP client (Claude Code, Desktop, Claudian) connects here. + // We intercept session-lifecycle messages (initialize, shutdown) so that + // individual clients can come and go without disturbing the one Obsidian + // session. Everything else is forwarded with rewritten IDs for routing. + + // Clean up stale socket from a previous crash + try { fs.unlinkSync(PROXY_SOCKET); } catch {} + + const server = net.createServer(clientSock => { + const id = Math.random().toString(36).slice(2, 8); + log(`client ${id} connected`); + cancelIdleTimer(); + clients.set(id, { socket: clientSock }); + const rl = readline.createInterface({ input: clientSock }); + + rl.on('line', line => { + let msg; + try { msg = JSON.parse(line); } catch { return; } + + // initialize — reply from cache (or queue if Obsidian isn't ready yet). + // Never forward to Obsidian; we already have a session. + if (msg.method === 'initialize') { + if (ready) { + replyInit(id, msg.id); + } else { + pendingInit.set(id, msg.id); + } + return; + } + + // initialized — swallow; our own was sent during the handshake + if (msg.method === 'notifications/initialized') return; + + // shutdown / exit — ack locally. One client leaving doesn't end + // the Obsidian session; other clients may still be connected. + if (msg.method === 'shutdown' || msg.method === 'exit') { + if (msg.id !== undefined && msg.id !== null) + clientSock.write(JSON.stringify({ + jsonrpc: '2.0', id: msg.id, result: null, + }) + '\n'); + return; + } + + // All other messages: rewrite the ID so we can route the response + // back to this specific client, then forward to Obsidian. + if (msg.id !== undefined && msg.id !== null) { + const rewritten = `${id}:${msg.id}`; + pending.set(rewritten, { clientId: id, origId: msg.id }); + toObsidian({ ...msg, id: rewritten }); + } else { + // Notifications (no id) pass straight through + toObsidian(msg); + } + }); + + const cleanup = () => { + clients.delete(id); + pendingInit.delete(id); + // Drop any pending responses for this client — Obsidian's replies + // will arrive but we'll have nowhere to send them; that's fine. + for (const [k, p] of pending) if (p.clientId === id) pending.delete(k); + log(`client ${id} gone (${clients.size} remaining)`); + resetIdleTimer(); + }; + clientSock.on('close', cleanup); + clientSock.on('error', cleanup); + }); + + server.listen(PROXY_SOCKET, () => { + log(`proxy listening on ${PROXY_SOCKET}`); + connectObsidian(); + // Start idle timer — if no client connects within the timeout, exit. + // (The spawning client usually connects within ~1s, cancelling this.) + resetIdleTimer(); + }); + + process.on('exit', () => { try { fs.unlinkSync(PROXY_SOCKET); } catch {} }); + process.on('SIGTERM', teardown); + process.on('SIGINT', teardown); + process.on('SIGHUP', teardown); +} + +// ── Client (stdio bridge) ───────────────────────────────────────────────────── +// This is the process that MCP clients actually spawn. It connects to the +// daemon's proxy socket and pipes stdin/stdout through — a transparent bridge. +// If the daemon isn't running, it spawns one and retries. + +function runClient(attempt = 0) { + const sock = net.createConnection(PROXY_SOCKET); + + sock.once('connect', () => { + process.stdin.pipe(sock); + sock.pipe(process.stdout); + }); + + sock.once('error', () => { + if (attempt === 0) { + // First failure — no daemon running. Spawn one detached and retry. + log('starting daemon…'); + spawn(process.execPath, [__filename, '--daemon'], { + detached: true, + stdio: 'ignore', + }).unref(); + setTimeout(() => runClient(1), 700); + } else if (attempt < 20) { + // Daemon is probably still starting up — keep trying + setTimeout(() => runClient(attempt + 1), 300); + } else { + // Give up after ~7s of retries + process.exit(1); + } + }); + + // If the proxy socket closes, we're done + sock.on('close', () => process.exit(0)); + // If the MCP client closes stdin, tear down our side + process.stdin.on('end', () => sock.destroy()); +} + +// ── Entry ───────────────────────────────────────────────────────────────────── + +if (process.argv.includes('--daemon')) runDaemon(); +else runClient(); diff --git a/src/server/transport/IPCTransportManager.ts b/src/server/transport/IPCTransportManager.ts index eded4ca9..0abb7803 100644 --- a/src/server/transport/IPCTransportManager.ts +++ b/src/server/transport/IPCTransportManager.ts @@ -3,7 +3,7 @@ * Follows Single Responsibility Principle by focusing only on IPC transport */ -import { Server as NetServer, createServer } from 'net'; +import { Server as NetServer, Socket, createServer } from 'net'; import { promises as fs } from 'fs'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { Server as MCPSDKServer } from '@modelcontextprotocol/sdk/server/index.js'; @@ -56,20 +56,47 @@ export class IPCTransportManager { /** * Handle new socket connections + * + * Wires the raw socket's lifecycle events to the MCP transport so that + * Protocol._transport is cleared when a client disconnects. Without this, + * a dropped connector permanently wedges the single-transport Protocol and + * all subsequent connections are silently black-holed. */ private handleSocketConnection(socket: NodeJS.ReadWriteStream): void { try { const transport = this.stdioTransportManager.createSocketTransport(socket, socket); - + + // The underlying socket emits 'close'/'end' when the connector + // process dies — forward that into transport.close() so the SDK's + // Protocol._onclose() fires and resets _transport to undefined. + const netSocket = socket as Socket; + let closed = false; + const onSocketGone = () => { + if (closed) return; + closed = true; + logger.systemLog('IPC socket disconnected — releasing transport'); + transport.close().catch((err: Error) => { + logger.systemError(err, 'IPC Transport Close on Disconnect'); + }); + }; + netSocket.on('close', onSocketGone); + netSocket.on('end', onSocketGone); + this.stdioTransportManager.connectSocketTransport(transport) .then(() => { logger.systemLog('IPC socket connected successfully'); }) .catch(error => { logger.systemError(error as Error, 'IPC Socket Connection'); + // server.connect() rejected (e.g. "Already connected to a + // transport") — destroy the raw socket so we don't leak FDs. + if (!netSocket.destroyed) netSocket.destroy(); }); } catch (error) { logger.systemError(error as Error, 'IPC Socket Handling'); + // Destroy on synchronous failure too + const netSocket = socket as Socket; + if (!netSocket.destroyed) netSocket.destroy(); } } diff --git a/tests/unit/nexus-mux.test.ts b/tests/unit/nexus-mux.test.ts new file mode 100644 index 00000000..c8fcff10 --- /dev/null +++ b/tests/unit/nexus-mux.test.ts @@ -0,0 +1,492 @@ +/** + * nexus-mux.js — Multiplexer tests + * + * Spins up a real daemon process against a fake "Obsidian" server, + * then exercises client connections, message routing, idle timeout, + * and session-lifecycle interception. + * + * The fake Obsidian server speaks just enough MCP to handshake and + * echo requests back, so we can verify the mux routes correctly. + */ + +import * as net from 'net'; +import * as readline from 'readline'; +import * as path from 'path'; +import { ChildProcess, fork } from 'child_process'; +import * as fs from 'fs'; + +// ── Helpers ───────────────────────────────────────────────────────────────── + +const MUX_PATH = path.resolve(__dirname, '../../nexus-mux.js'); + +// Use unique socket paths per test run to avoid collisions +const TEST_ID = `test_${process.pid}_${Date.now()}`; +const OBSIDIAN_SOCKET = `/tmp/nexus_mux_${TEST_ID}_obsidian.sock`; +const PROXY_SOCKET = `/tmp/nexus_mux_${TEST_ID}_proxy.sock`; + +// Short idle timeout for testing (1 second) +const TEST_IDLE_TIMEOUT_MS = 1000; + +/** JSON-RPC helper */ +function rpc(method: string, id?: string | number, params?: Record) { + const msg: Record = { jsonrpc: '2.0', method }; + if (id !== undefined) msg.id = id; + if (params) msg.params = params; + return msg; +} + +function rpcResponse(id: string | number, result: unknown) { + return { jsonrpc: '2.0', id, result }; +} + +/** + * Fake Obsidian MCP server. + * Handles initialize handshake, then echoes any request back as a response + * with { echo: true, method, params } so tests can verify routing. + */ +function createFakeObsidian(): Promise<{ + server: net.Server; + connections: net.Socket[]; + receivedMessages: Array>; + close: () => Promise; +}> { + return new Promise((resolve) => { + const connections: net.Socket[] = []; + const receivedMessages: Array> = []; + + const server = net.createServer((socket) => { + connections.push(socket); + const rl = readline.createInterface({ input: socket }); + + rl.on('line', (line) => { + let msg: Record; + try { msg = JSON.parse(line); } catch { return; } + receivedMessages.push(msg); + + // Respond to initialize + if (msg.method === 'initialize') { + socket.write(JSON.stringify(rpcResponse(msg.id as string, { + protocolVersion: '2024-11-05', + capabilities: {}, + serverInfo: { name: 'fake-obsidian', version: '1.0.0' }, + })) + '\n'); + return; + } + + // Swallow notifications (no id) + if (msg.id === undefined || msg.id === null) return; + + // Echo everything else back as a response + socket.write(JSON.stringify(rpcResponse(msg.id as string, { + echo: true, + method: msg.method, + params: msg.params, + })) + '\n'); + }); + + socket.on('error', () => {}); + }); + + server.listen(OBSIDIAN_SOCKET, () => { + resolve({ + server, + connections, + receivedMessages, + close: () => new Promise((res) => { + connections.forEach((s) => s.destroy()); + server.close(() => { + try { fs.unlinkSync(OBSIDIAN_SOCKET); } catch {} + res(); + }); + }), + }); + }); + }); +} + +/** + * Start the mux daemon as a child process, patched to use our test sockets + * and a short idle timeout. + * + * We write a patched copy of the mux to a temp file (swapping socket paths, + * idle timeout, and forcing daemon mode) then spawn it directly. + */ +const PATCHED_MUX_PATH = `/tmp/nexus_mux_${TEST_ID}_patched.js`; + +function writePatchedMux(): void { + let src = fs.readFileSync(MUX_PATH, 'utf8'); + src = src.replace(/^#!.*\n/, ''); + src = src.replace( + /const OBSIDIAN_SOCKET = .+/, + `const OBSIDIAN_SOCKET = ${JSON.stringify(OBSIDIAN_SOCKET)};`, + ); + src = src.replace( + /const PROXY_SOCKET\s*= .+/, + `const PROXY_SOCKET = ${JSON.stringify(PROXY_SOCKET)};`, + ); + src = src.replace( + /const IDLE_TIMEOUT_MS = .+/, + `const IDLE_TIMEOUT_MS = ${TEST_IDLE_TIMEOUT_MS};`, + ); + // Force daemon mode: replace the entry-point conditional with just runDaemon() + src = src.replace( + /if \(process\.argv\.includes\('--daemon'\)\) runDaemon\(\);\nelse runClient\(\);/, + 'runDaemon();', + ); + fs.writeFileSync(PATCHED_MUX_PATH, src); +} + +function startDaemon(): ChildProcess { + writePatchedMux(); + const child = fork(PATCHED_MUX_PATH, [], { + stdio: ['pipe', 'pipe', 'pipe', 'ipc'], + env: { ...process.env, NEXUS_MUX_DEBUG: '1' }, + }); + return child; +} + +/** Connect to the proxy socket and return a line-based interface. */ +function connectClient(): Promise<{ + socket: net.Socket; + lines: string[]; + waitForLine: (predicate?: (msg: Record) => boolean) => Promise>; + send: (msg: Record) => void; + close: () => void; +}> { + return new Promise((resolve, reject) => { + const socket = net.createConnection(PROXY_SOCKET); + const lines: string[] = []; + const waiters: Array<{ + predicate: (msg: Record) => boolean; + resolve: (msg: Record) => void; + }> = []; + + socket.once('error', reject); + + socket.once('connect', () => { + socket.removeListener('error', reject); + const rl = readline.createInterface({ input: socket }); + + rl.on('line', (line) => { + lines.push(line); + let msg: Record; + try { msg = JSON.parse(line); } catch { return; } + + // Check waiters + for (let i = waiters.length - 1; i >= 0; i--) { + if (waiters[i].predicate(msg)) { + const waiter = waiters.splice(i, 1)[0]; + waiter.resolve(msg); + } + } + }); + + resolve({ + socket, + lines, + send: (msg) => socket.write(JSON.stringify(msg) + '\n'), + waitForLine: (predicate = () => true) => new Promise((res) => { + waiters.push({ predicate, resolve: res }); + }), + close: () => socket.destroy(), + }); + }); + }); +} + +/** Wait for the proxy socket to exist. */ +async function waitForProxy(timeoutMs = 5000): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + try { + fs.statSync(PROXY_SOCKET); + return; + } catch { + await new Promise((r) => setTimeout(r, 100)); + } + } + throw new Error(`Proxy socket ${PROXY_SOCKET} not created within ${timeoutMs}ms`); +} + +// ── Tests ─────────────────────────────────────────────────────────────────── + +describe('nexus-mux', () => { + let fakeObsidian: Awaited>; + let daemon: ChildProcess; + + beforeEach(async () => { + // Clean up any stale sockets + try { fs.unlinkSync(OBSIDIAN_SOCKET); } catch {} + try { fs.unlinkSync(PROXY_SOCKET); } catch {} + + fakeObsidian = await createFakeObsidian(); + daemon = startDaemon(); + await waitForProxy(); + }); + + afterEach(async () => { + // Kill daemon + if (daemon && !daemon.killed) { + daemon.kill('SIGTERM'); + // Wait for it to die + await new Promise((resolve) => { + daemon.once('exit', () => resolve()); + setTimeout(resolve, 2000); + }); + } + + await fakeObsidian.close(); + + // Final cleanup + try { fs.unlinkSync(PROXY_SOCKET); } catch {} + try { fs.unlinkSync(PATCHED_MUX_PATH); } catch {} + }); + + // ── Handshake ─────────────────────────────────────────────────────────── + + it('performs MCP handshake with Obsidian on startup', async () => { + // Give the daemon a moment to handshake + await new Promise((r) => setTimeout(r, 500)); + + const initMsg = fakeObsidian.receivedMessages.find( + (m) => m.method === 'initialize' + ); + expect(initMsg).toBeDefined(); + expect((initMsg!.params as Record).clientInfo).toEqual({ + name: 'nexus-mux', + version: '1.0.0', + }); + + // Should also have sent notifications/initialized + const initialized = fakeObsidian.receivedMessages.find( + (m) => m.method === 'notifications/initialized' + ); + expect(initialized).toBeDefined(); + }); + + // ── Client initialize ───────────────────────────────────────────────── + + it('replies to client initialize from cache without forwarding to Obsidian', async () => { + // Wait for daemon to be ready + await new Promise((r) => setTimeout(r, 500)); + const msgCountBefore = fakeObsidian.receivedMessages.length; + + const client = await connectClient(); + client.send(rpc('initialize', 1, { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test-client', version: '0.1.0' }, + })); + + const response = await client.waitForLine((m) => m.id === 1); + expect(response.result).toBeDefined(); + expect((response.result as Record).serverInfo).toEqual({ + name: 'fake-obsidian', + version: '1.0.0', + }); + + // No new messages forwarded to Obsidian (initialize is handled locally) + expect(fakeObsidian.receivedMessages.length).toBe(msgCountBefore); + + client.close(); + }); + + // ── Request routing ─────────────────────────────────────────────────── + + it('routes requests to Obsidian and responses back to the correct client', async () => { + await new Promise((r) => setTimeout(r, 500)); + + const client = await connectClient(); + + // Do the handshake first + client.send(rpc('initialize', 'init-1', { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test', version: '0.1.0' }, + })); + await client.waitForLine((m) => m.id === 'init-1'); + + // Send a real request + client.send(rpc('tools/list', 42)); + + const response = await client.waitForLine((m) => m.id === 42); + expect(response.result).toBeDefined(); + const result = response.result as Record; + expect(result.echo).toBe(true); + expect(result.method).toBe('tools/list'); + + // Verify the ID was rewritten for Obsidian (contains clientId prefix) + const forwarded = fakeObsidian.receivedMessages.find( + (m) => m.method === 'tools/list' + ); + expect(forwarded).toBeDefined(); + expect(typeof forwarded!.id).toBe('string'); + expect((forwarded!.id as string)).toContain(':42'); + + client.close(); + }); + + // ── Multi-client routing ────────────────────────────────────────────── + + it('routes responses to the correct client when multiple are connected', async () => { + await new Promise((r) => setTimeout(r, 500)); + + const clientA = await connectClient(); + const clientB = await connectClient(); + + // Handshake both + for (const [client, name] of [[clientA, 'A'], [clientB, 'B']] as const) { + client.send(rpc('initialize', `init-${name}`, { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: `client-${name}`, version: '0.1.0' }, + })); + await client.waitForLine((m) => m.id === `init-${name}`); + } + + // Both send requests with the same ID (collision test) + clientA.send(rpc('tools/list', 1)); + clientB.send(rpc('resources/list', 1)); + + const responseA = await clientA.waitForLine((m) => m.id === 1); + const responseB = await clientB.waitForLine((m) => m.id === 1); + + // Each gets their own response back, with the original ID restored + expect((responseA.result as Record).method).toBe('tools/list'); + expect((responseB.result as Record).method).toBe('resources/list'); + + clientA.close(); + clientB.close(); + }); + + // ── Shutdown interception ───────────────────────────────────────────── + + it('intercepts shutdown/exit without forwarding to Obsidian', async () => { + await new Promise((r) => setTimeout(r, 500)); + const msgCountBefore = fakeObsidian.receivedMessages.length; + + const client = await connectClient(); + client.send(rpc('initialize', 'init', { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test', version: '0.1.0' }, + })); + await client.waitForLine((m) => m.id === 'init'); + + // shutdown should be acked locally + client.send(rpc('shutdown', 'sd-1')); + const sdResponse = await client.waitForLine((m) => m.id === 'sd-1'); + expect(sdResponse.result).toBeNull(); + + // exit (notification, no id) should be swallowed + client.send(rpc('exit')); + + // Brief wait, then verify nothing was forwarded + await new Promise((r) => setTimeout(r, 200)); + const newMessages = fakeObsidian.receivedMessages.slice(msgCountBefore); + const forwarded = newMessages.filter( + (m) => m.method === 'shutdown' || m.method === 'exit' + ); + expect(forwarded).toHaveLength(0); + + client.close(); + }); + + // ── notifications/initialized swallowed ─────────────────────────────── + + it('swallows client notifications/initialized', async () => { + await new Promise((r) => setTimeout(r, 500)); + const msgCountBefore = fakeObsidian.receivedMessages.length; + + const client = await connectClient(); + client.send(rpc('initialize', 'init', { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test', version: '0.1.0' }, + })); + await client.waitForLine((m) => m.id === 'init'); + + client.send(rpc('notifications/initialized')); + + await new Promise((r) => setTimeout(r, 200)); + const newMessages = fakeObsidian.receivedMessages.slice(msgCountBefore); + const forwarded = newMessages.filter( + (m) => m.method === 'notifications/initialized' + ); + expect(forwarded).toHaveLength(0); + + client.close(); + }); + + // ── Idle timeout ────────────────────────────────────────────────────── + + it('shuts down after idle timeout with no clients', async () => { + await new Promise((r) => setTimeout(r, 500)); + + // Connect and then disconnect a client + const client = await connectClient(); + client.send(rpc('initialize', 'init', { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test', version: '0.1.0' }, + })); + await client.waitForLine((m) => m.id === 'init'); + client.close(); + + // Daemon should exit after TEST_IDLE_TIMEOUT_MS (1s) + some grace + const exitCode = await new Promise((resolve) => { + const timeout = setTimeout(() => resolve(null), 5000); + daemon.once('exit', (code) => { + clearTimeout(timeout); + resolve(code); + }); + }); + + expect(exitCode).toBe(0); + + // Proxy socket should be cleaned up + expect(() => fs.statSync(PROXY_SOCKET)).toThrow(); + }); + + it('cancels idle timeout when a new client connects', async () => { + await new Promise((r) => setTimeout(r, 500)); + + // Connect, disconnect — starts idle timer + const client1 = await connectClient(); + client1.close(); + + // Wait for most of the idle timeout, then connect again + await new Promise((r) => setTimeout(r, TEST_IDLE_TIMEOUT_MS * 0.7)); + + const client2 = await connectClient(); + client2.send(rpc('initialize', 'init', { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test', version: '0.1.0' }, + })); + const response = await client2.waitForLine((m) => m.id === 'init'); + expect(response.result).toBeDefined(); + + // Daemon should still be alive after the original timeout would have fired + await new Promise((r) => setTimeout(r, TEST_IDLE_TIMEOUT_MS * 0.5)); + expect(daemon.killed).toBe(false); + expect(daemon.exitCode).toBeNull(); + + client2.close(); + }); + + // ── Boot idle timeout ───────────────────────────────────────────────── + + it('shuts down if no client ever connects', async () => { + // Don't connect any clients — daemon should exit after idle timeout + const exitCode = await new Promise((resolve) => { + const timeout = setTimeout(() => resolve(null), 5000); + daemon.once('exit', (code) => { + clearTimeout(timeout); + resolve(code); + }); + }); + + expect(exitCode).toBe(0); + }); +});