From fef6f71f54ce58072de319fe9c36da84de18820d Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 25 Jun 2026 19:34:48 +0000 Subject: [PATCH] fix(server): prevent SSE disconnect from crashing inspector proxy When the browser closes or replaces an SSE session, stderr forwarding and mcpProxy could still call SSEServerTransport.send() after the stream was torn down, throwing an unhandled 'Not connected' error. - Add safe client send helper that ignores disconnected SSE sessions - Remove proxy session maps and stdio stderr listeners on client close - Chain cleanup after mcpProxy onclose for /stdio and /sse routes Fixes #1014 --- server/package.json | 3 +- server/src/index.ts | 43 +++++++++++++++++++---- server/src/mcpProxy.ts | 5 +-- server/src/sessionRegistry.test.ts | 55 ++++++++++++++++++++++++++++++ server/src/sessionRegistry.ts | 35 +++++++++++++++++++ server/tsconfig.json | 2 +- 6 files changed, 132 insertions(+), 11 deletions(-) create mode 100644 server/src/sessionRegistry.test.ts create mode 100644 server/src/sessionRegistry.ts diff --git a/server/package.json b/server/package.json index 5b398d64c..62da989ce 100644 --- a/server/package.json +++ b/server/package.json @@ -24,7 +24,8 @@ "build": "tsc && shx cp -R static build", "start": "node build/index.js", "dev": "tsx watch --clear-screen=false src/index.ts", - "dev:windows": "tsx watch --clear-screen=false src/index.ts < NUL" + "dev:windows": "tsx watch --clear-screen=false src/index.ts < NUL", + "test": "node --import tsx --test 'src/**/*.test.ts'" }, "devDependencies": { "@types/cors": "^2.8.19", diff --git a/server/src/index.ts b/server/src/index.ts index bdfe49019..2cbd3e255 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -28,6 +28,12 @@ import express from "express"; import rateLimit from "express-rate-limit"; import { findActualExecutable } from "spawn-rx"; import mcpProxy, { type ProxyHeaderHolder } from "./mcpProxy.js"; +import { + chainOnClose, + removeSession, + sendToClientSafe, + type SessionMaps, +} from "./sessionRegistry.js"; import { randomUUID, randomBytes, timingSafeEqual } from "node:crypto"; import { fileURLToPath } from "url"; import { dirname, join } from "path"; @@ -222,6 +228,12 @@ const webAppTransports: Map = new Map(); / const serverTransports: Map = new Map(); // Server Transports by web app sessionId const sessionHeaderHolders: Map = new Map(); // For dynamic header updates +const sessionMaps: SessionMaps = { + webAppTransports, + serverTransports, + sessionHeaderHolders, +}; + // Use provided token from environment or generate a new one const sessionToken = process.env.MCP_PROXY_AUTH_TOKEN || randomBytes(32).toString("hex"); @@ -668,11 +680,15 @@ app.get( await webAppTransport.start(); - (serverTransport as StdioClientTransport).stderr!.on("data", (chunk) => { + const sessionId = webAppTransport.sessionId; + const stdioTransport = serverTransport as StdioClientTransport; + const stderrStream = stdioTransport.stderr; + + const onStderrData = (chunk: Buffer) => { if (chunk.toString().includes("MODULE_NOT_FOUND")) { // Server command not found, remove transports const message = "Command not found, transports removed"; - webAppTransport.send({ + sendToClientSafe(webAppTransport, { jsonrpc: "2.0", method: "notifications/message", params: { @@ -685,9 +701,7 @@ app.get( }); webAppTransport.close(); serverTransport.close(); - webAppTransports.delete(webAppTransport.sessionId); - serverTransports.delete(webAppTransport.sessionId); - sessionHeaderHolders.delete(webAppTransport.sessionId); + removeSession(sessionMaps, sessionId); console.error(message); } else { // Inspect message and attempt to assign a RFC 5424 Syslog Protocol level @@ -722,7 +736,7 @@ app.get( } else { level = "info"; } - webAppTransport.send({ + sendToClientSafe(webAppTransport, { jsonrpc: "2.0", method: "notifications/message", params: { @@ -734,12 +748,20 @@ app.get( }, }); } - }); + }; + + stderrStream?.on("data", onStderrData); mcpProxy({ transportToClient: webAppTransport, transportToServer: serverTransport, }); + + chainOnClose(webAppTransport, () => { + stderrStream?.removeListener("data", onStderrData); + removeSession(sessionMaps, sessionId); + console.log(`Transports removed for sessionId ${sessionId}`); + }); } catch (error) { if (is401Error(error)) { console.error( @@ -784,11 +806,18 @@ app.get( await webAppTransport.start(); + const sessionId = webAppTransport.sessionId; + mcpProxy({ transportToClient: webAppTransport, transportToServer: serverTransport, headerHolder, }); + + chainOnClose(webAppTransport, () => { + removeSession(sessionMaps, sessionId); + console.log(`Transports removed for sessionId ${sessionId}`); + }); } catch (error) { if (is401Error(error)) { console.error( diff --git a/server/src/mcpProxy.ts b/server/src/mcpProxy.ts index 3195553e1..f2d2aefe9 100644 --- a/server/src/mcpProxy.ts +++ b/server/src/mcpProxy.ts @@ -2,6 +2,7 @@ import { SseError } from "@modelcontextprotocol/sdk/client/sse.js"; import { StreamableHTTPError } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { isJSONRPCRequest } from "@modelcontextprotocol/sdk/types.js"; +import { sendToClientSafe } from "./sessionRegistry.js"; /** * JSON-RPC error code for failed proxy → upstream transport sends. @@ -113,7 +114,7 @@ export default function mcpProxy({ data: serializeProxyTransportError(error, headerHolder), }, }; - transportToClient.send(errorResponse).catch(onClientError); + sendToClientSafe(transportToClient, errorResponse); } }); }; @@ -128,7 +129,7 @@ export default function mcpProxy({ } reportedServerSession = true; } - transportToClient.send(message).catch(onClientError); + sendToClientSafe(transportToClient, message); }; transportToClient.onclose = () => { diff --git a/server/src/sessionRegistry.test.ts b/server/src/sessionRegistry.test.ts new file mode 100644 index 000000000..60c499c5c --- /dev/null +++ b/server/src/sessionRegistry.test.ts @@ -0,0 +1,55 @@ +import assert from "node:assert/strict"; +import { describe, it } from "node:test"; +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import { + chainOnClose, + removeSession, + sendToClientSafe, + type SessionMaps, +} from "./sessionRegistry.js"; + +describe("sessionRegistry", () => { + it("removeSession clears all session maps", () => { + const maps: SessionMaps = { + webAppTransports: new Map([["s1", {} as Transport]]), + serverTransports: new Map([["s1", {} as Transport]]), + sessionHeaderHolders: new Map([["s1", { headers: {} }]]), + }; + + removeSession(maps, "s1"); + + assert.equal(maps.webAppTransports.size, 0); + assert.equal(maps.serverTransports.size, 0); + assert.equal(maps.sessionHeaderHolders.size, 0); + }); + + it("sendToClientSafe ignores Not connected errors", async () => { + const transport = { + send: async () => { + throw new Error("Not connected"); + }, + } as unknown as Transport; + + sendToClientSafe(transport, { + jsonrpc: "2.0", + method: "notifications/test", + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + }); + + it("chainOnClose runs chained handlers in order", () => { + const calls: string[] = []; + const transport = { + onclose: () => { + calls.push("previous"); + }, + } as Transport; + + chainOnClose(transport, () => { + calls.push("new"); + }); + transport.onclose?.(); + + assert.deepEqual(calls, ["new", "previous"]); + }); +}); diff --git a/server/src/sessionRegistry.ts b/server/src/sessionRegistry.ts new file mode 100644 index 000000000..f693b0210 --- /dev/null +++ b/server/src/sessionRegistry.ts @@ -0,0 +1,35 @@ +import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import type { ProxyHeaderHolder } from "./mcpProxy.js"; + +export type SessionMaps = { + webAppTransports: Map; + serverTransports: Map; + sessionHeaderHolders: Map; +}; + +export function removeSession(maps: SessionMaps, sessionId: string): void { + maps.webAppTransports.delete(sessionId); + maps.serverTransports.delete(sessionId); + maps.sessionHeaderHolders.delete(sessionId); +} + +/** Ignore "Not connected" when the browser SSE stream has already closed. */ +export function sendToClientSafe( + transport: Transport, + message: Parameters[0], +): void { + transport.send(message).catch((error: unknown) => { + if (error instanceof Error && error.message === "Not connected") { + return; + } + console.error("Error from inspector client:", error); + }); +} + +export function chainOnClose(transport: Transport, onClose: () => void): void { + const previousOnClose = transport.onclose; + transport.onclose = () => { + onClose(); + previousOnClose?.(); + }; +} diff --git a/server/tsconfig.json b/server/tsconfig.json index b5a92612a..275a6a092 100644 --- a/server/tsconfig.json +++ b/server/tsconfig.json @@ -12,5 +12,5 @@ "resolveJsonModule": true }, "include": ["src/**/*"], - "exclude": ["node_modules", "packages", "**/*.spec.ts"] + "exclude": ["node_modules", "packages", "**/*.spec.ts", "**/*.test.ts"] }