diff --git a/server/package.json b/server/package.json index 5b398d64c..62da989ce 100644 --- a/server/package.json +++ b/server/package.json @@ -24,7 +24,8 @@ "build": "tsc && shx cp -R static build", "start": "node build/index.js", "dev": "tsx watch --clear-screen=false src/index.ts", - "dev:windows": "tsx watch --clear-screen=false src/index.ts < NUL" + "dev:windows": "tsx watch --clear-screen=false src/index.ts < NUL", + "test": "node --import tsx --test 'src/**/*.test.ts'" }, "devDependencies": { "@types/cors": "^2.8.19", diff --git a/server/src/index.ts b/server/src/index.ts index bdfe49019..2cbd3e255 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -28,6 +28,12 @@ import express from "express"; import rateLimit from "express-rate-limit"; import { findActualExecutable } from "spawn-rx"; import mcpProxy, { type ProxyHeaderHolder } from "./mcpProxy.js"; +import { + chainOnClose, + removeSession, + sendToClientSafe, + type SessionMaps, +} from "./sessionRegistry.js"; import { randomUUID, randomBytes, timingSafeEqual } from "node:crypto"; import { fileURLToPath } from "url"; import { dirname, join } from "path"; @@ -222,6 +228,12 @@ const webAppTransports: Map = new Map(); / const serverTransports: Map = new Map(); // Server Transports by web app sessionId const sessionHeaderHolders: Map = new Map(); // For dynamic header updates +const sessionMaps: SessionMaps = { + webAppTransports, + serverTransports, + sessionHeaderHolders, +}; + // Use provided token from environment or generate a new one const sessionToken = process.env.MCP_PROXY_AUTH_TOKEN || randomBytes(32).toString("hex"); @@ -668,11 +680,15 @@ app.get( await webAppTransport.start(); - (serverTransport as StdioClientTransport).stderr!.on("data", (chunk) => { + const sessionId = webAppTransport.sessionId; + const stdioTransport = serverTransport as StdioClientTransport; + const stderrStream = stdioTransport.stderr; + + const onStderrData = (chunk: Buffer) => { if (chunk.toString().includes("MODULE_NOT_FOUND")) { // Server command not found, remove transports const message = "Command not found, transports removed"; - webAppTransport.send({ + sendToClientSafe(webAppTransport, { jsonrpc: "2.0", method: "notifications/message", params: { @@ -685,9 +701,7 @@ app.get( }); webAppTransport.close(); serverTransport.close(); - webAppTransports.delete(webAppTransport.sessionId); - serverTransports.delete(webAppTransport.sessionId); - sessionHeaderHolders.delete(webAppTransport.sessionId); + removeSession(sessionMaps, sessionId); console.error(message); } else { // Inspect message and attempt to assign a RFC 5424 Syslog Protocol level @@ -722,7 +736,7 @@ app.get( } else { level = "info"; } - webAppTransport.send({ + sendToClientSafe(webAppTransport, { jsonrpc: "2.0", method: "notifications/message", params: { @@ -734,12 +748,20 @@ app.get( }, }); } - }); + }; + + stderrStream?.on("data", onStderrData); mcpProxy({ transportToClient: webAppTransport, transportToServer: serverTransport, }); + + chainOnClose(webAppTransport, () => { + stderrStream?.removeListener("data", onStderrData); + removeSession(sessionMaps, sessionId); + console.log(`Transports removed for sessionId ${sessionId}`); + }); } catch (error) { if (is401Error(error)) { console.error( @@ -784,11 +806,18 @@ app.get( await webAppTransport.start(); + const sessionId = webAppTransport.sessionId; + mcpProxy({ transportToClient: webAppTransport, transportToServer: serverTransport, headerHolder, }); + + chainOnClose(webAppTransport, () => { + removeSession(sessionMaps, sessionId); + console.log(`Transports removed for sessionId ${sessionId}`); + }); } catch (error) { if (is401Error(error)) { console.error( diff --git a/server/src/mcpProxy.ts b/server/src/mcpProxy.ts index 3195553e1..f2d2aefe9 100644 --- a/server/src/mcpProxy.ts +++ b/server/src/mcpProxy.ts @@ -2,6 +2,7 @@ import { SseError } from "@modelcontextprotocol/sdk/client/sse.js"; import { StreamableHTTPError } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { isJSONRPCRequest } from "@modelcontextprotocol/sdk/types.js"; +import { sendToClientSafe } from "./sessionRegistry.js"; /** * JSON-RPC error code for failed proxy → upstream transport sends. @@ -113,7 +114,7 @@ export default function mcpProxy({ data: serializeProxyTransportError(error, headerHolder), }, }; - transportToClient.send(errorResponse).catch(onClientError); + sendToClientSafe(transportToClient, errorResponse); } }); }; @@ -128,7 +129,7 @@ export default function mcpProxy({ } reportedServerSession = true; } - transportToClient.send(message).catch(onClientError); + sendToClientSafe(transportToClient, message); }; transportToClient.onclose = () => { diff --git a/server/src/sessionRegistry.test.ts b/server/src/sessionRegistry.test.ts new file mode 100644 index 000000000..60c499c5c --- /dev/null +++ b/server/src/sessionRegistry.test.ts @@ -0,0 +1,55 @@ +import assert from "node:assert/strict"; +import { describe, it } from "node:test"; +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import { + chainOnClose, + removeSession, + sendToClientSafe, + type SessionMaps, +} from "./sessionRegistry.js"; + +describe("sessionRegistry", () => { + it("removeSession clears all session maps", () => { + const maps: SessionMaps = { + webAppTransports: new Map([["s1", {} as Transport]]), + serverTransports: new Map([["s1", {} as Transport]]), + sessionHeaderHolders: new Map([["s1", { headers: {} }]]), + }; + + removeSession(maps, "s1"); + + assert.equal(maps.webAppTransports.size, 0); + assert.equal(maps.serverTransports.size, 0); + assert.equal(maps.sessionHeaderHolders.size, 0); + }); + + it("sendToClientSafe ignores Not connected errors", async () => { + const transport = { + send: async () => { + throw new Error("Not connected"); + }, + } as unknown as Transport; + + sendToClientSafe(transport, { + jsonrpc: "2.0", + method: "notifications/test", + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + }); + + it("chainOnClose runs chained handlers in order", () => { + const calls: string[] = []; + const transport = { + onclose: () => { + calls.push("previous"); + }, + } as Transport; + + chainOnClose(transport, () => { + calls.push("new"); + }); + transport.onclose?.(); + + assert.deepEqual(calls, ["new", "previous"]); + }); +}); diff --git a/server/src/sessionRegistry.ts b/server/src/sessionRegistry.ts new file mode 100644 index 000000000..f693b0210 --- /dev/null +++ b/server/src/sessionRegistry.ts @@ -0,0 +1,35 @@ +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import type { ProxyHeaderHolder } from "./mcpProxy.js"; + +export type SessionMaps = { + webAppTransports: Map; + serverTransports: Map; + sessionHeaderHolders: Map; +}; + +export function removeSession(maps: SessionMaps, sessionId: string): void { + maps.webAppTransports.delete(sessionId); + maps.serverTransports.delete(sessionId); + maps.sessionHeaderHolders.delete(sessionId); +} + +/** Ignore "Not connected" when the browser SSE stream has already closed. */ +export function sendToClientSafe( + transport: Transport, + message: Parameters[0], +): void { + transport.send(message).catch((error: unknown) => { + if (error instanceof Error && error.message === "Not connected") { + return; + } + console.error("Error from inspector client:", error); + }); +} + +export function chainOnClose(transport: Transport, onClose: () => void): void { + const previousOnClose = transport.onclose; + transport.onclose = () => { + onClose(); + previousOnClose?.(); + }; +} diff --git a/server/tsconfig.json b/server/tsconfig.json index b5a92612a..275a6a092 100644 --- a/server/tsconfig.json +++ b/server/tsconfig.json @@ -12,5 +12,5 @@ "resolveJsonModule": true }, "include": ["src/**/*"], - "exclude": ["node_modules", "packages", "**/*.spec.ts"] + "exclude": ["node_modules", "packages", "**/*.spec.ts", "**/*.test.ts"] }