Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
"build": "tsc && shx cp -R static build",
"start": "node build/index.js",
"dev": "tsx watch --clear-screen=false src/index.ts",
"dev:windows": "tsx watch --clear-screen=false src/index.ts < NUL"
"dev:windows": "tsx watch --clear-screen=false src/index.ts < NUL",
"test": "node --import tsx --test 'src/**/*.test.ts'"
},
"devDependencies": {
"@types/cors": "^2.8.19",
Expand Down
43 changes: 36 additions & 7 deletions server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import express from "express";
import rateLimit from "express-rate-limit";
import { findActualExecutable } from "spawn-rx";
import mcpProxy, { type ProxyHeaderHolder } from "./mcpProxy.js";
import {
chainOnClose,
removeSession,
sendToClientSafe,
type SessionMaps,
} from "./sessionRegistry.js";
import { randomUUID, randomBytes, timingSafeEqual } from "node:crypto";
import { fileURLToPath } from "url";
import { dirname, join } from "path";
Expand Down Expand Up @@ -222,6 +228,12 @@ const webAppTransports: Map<string, Transport> = new Map<string, Transport>(); /
const serverTransports: Map<string, Transport> = new Map<string, Transport>(); // Server Transports by web app sessionId
const sessionHeaderHolders: Map<string, ProxyHeaderHolder> = new Map(); // For dynamic header updates

const sessionMaps: SessionMaps = {
webAppTransports,
serverTransports,
sessionHeaderHolders,
};

// Use provided token from environment or generate a new one
const sessionToken =
process.env.MCP_PROXY_AUTH_TOKEN || randomBytes(32).toString("hex");
Expand Down Expand Up @@ -668,11 +680,15 @@ app.get(

await webAppTransport.start();

(serverTransport as StdioClientTransport).stderr!.on("data", (chunk) => {
const sessionId = webAppTransport.sessionId;
const stdioTransport = serverTransport as StdioClientTransport;
const stderrStream = stdioTransport.stderr;

const onStderrData = (chunk: Buffer) => {
if (chunk.toString().includes("MODULE_NOT_FOUND")) {
// Server command not found, remove transports
const message = "Command not found, transports removed";
webAppTransport.send({
sendToClientSafe(webAppTransport, {
jsonrpc: "2.0",
method: "notifications/message",
params: {
Expand All @@ -685,9 +701,7 @@ app.get(
});
webAppTransport.close();
serverTransport.close();
webAppTransports.delete(webAppTransport.sessionId);
serverTransports.delete(webAppTransport.sessionId);
sessionHeaderHolders.delete(webAppTransport.sessionId);
removeSession(sessionMaps, sessionId);
console.error(message);
} else {
// Inspect message and attempt to assign a RFC 5424 Syslog Protocol level
Expand Down Expand Up @@ -722,7 +736,7 @@ app.get(
} else {
level = "info";
}
webAppTransport.send({
sendToClientSafe(webAppTransport, {
jsonrpc: "2.0",
method: "notifications/message",
params: {
Expand All @@ -734,12 +748,20 @@ app.get(
},
});
}
});
};

stderrStream?.on("data", onStderrData);

mcpProxy({
transportToClient: webAppTransport,
transportToServer: serverTransport,
});

chainOnClose(webAppTransport, () => {
stderrStream?.removeListener("data", onStderrData);
removeSession(sessionMaps, sessionId);
console.log(`Transports removed for sessionId ${sessionId}`);
});
} catch (error) {
if (is401Error(error)) {
console.error(
Expand Down Expand Up @@ -784,11 +806,18 @@ app.get(

await webAppTransport.start();

const sessionId = webAppTransport.sessionId;

mcpProxy({
transportToClient: webAppTransport,
transportToServer: serverTransport,
headerHolder,
});

chainOnClose(webAppTransport, () => {
removeSession(sessionMaps, sessionId);
console.log(`Transports removed for sessionId ${sessionId}`);
});
} catch (error) {
if (is401Error(error)) {
console.error(
Expand Down
5 changes: 3 additions & 2 deletions server/src/mcpProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { SseError } from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPError } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { isJSONRPCRequest } from "@modelcontextprotocol/sdk/types.js";
import { sendToClientSafe } from "./sessionRegistry.js";

/**
* JSON-RPC error code for failed proxy → upstream transport sends.
Expand Down Expand Up @@ -113,7 +114,7 @@ export default function mcpProxy({
data: serializeProxyTransportError(error, headerHolder),
},
};
transportToClient.send(errorResponse).catch(onClientError);
sendToClientSafe(transportToClient, errorResponse);
}
});
};
Expand All @@ -128,7 +129,7 @@ export default function mcpProxy({
}
reportedServerSession = true;
}
transportToClient.send(message).catch(onClientError);
sendToClientSafe(transportToClient, message);
};

transportToClient.onclose = () => {
Expand Down
55 changes: 55 additions & 0 deletions server/src/sessionRegistry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import assert from "node:assert/strict";
import { describe, it } from "node:test";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import {
chainOnClose,
removeSession,
sendToClientSafe,
type SessionMaps,
} from "./sessionRegistry.js";

describe("sessionRegistry", () => {
it("removeSession clears all session maps", () => {
const maps: SessionMaps = {
webAppTransports: new Map([["s1", {} as Transport]]),
serverTransports: new Map([["s1", {} as Transport]]),
sessionHeaderHolders: new Map([["s1", { headers: {} }]]),
};

removeSession(maps, "s1");

assert.equal(maps.webAppTransports.size, 0);
assert.equal(maps.serverTransports.size, 0);
assert.equal(maps.sessionHeaderHolders.size, 0);
});

it("sendToClientSafe ignores Not connected errors", async () => {
const transport = {
send: async () => {
throw new Error("Not connected");
},
} as unknown as Transport;

sendToClientSafe(transport, {
jsonrpc: "2.0",
method: "notifications/test",
});
await new Promise((resolve) => setTimeout(resolve, 0));
});

it("chainOnClose runs chained handlers in order", () => {
const calls: string[] = [];
const transport = {
onclose: () => {
calls.push("previous");
},
} as Transport;

chainOnClose(transport, () => {
calls.push("new");
});
transport.onclose?.();

assert.deepEqual(calls, ["new", "previous"]);
});
});
35 changes: 35 additions & 0 deletions server/src/sessionRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { ProxyHeaderHolder } from "./mcpProxy.js";

export type SessionMaps = {
webAppTransports: Map<string, Transport>;
serverTransports: Map<string, Transport>;
sessionHeaderHolders: Map<string, ProxyHeaderHolder>;
};

export function removeSession(maps: SessionMaps, sessionId: string): void {
maps.webAppTransports.delete(sessionId);
maps.serverTransports.delete(sessionId);
maps.sessionHeaderHolders.delete(sessionId);
}

/** Ignore "Not connected" when the browser SSE stream has already closed. */
export function sendToClientSafe(
transport: Transport,
message: Parameters<Transport["send"]>[0],
): void {
transport.send(message).catch((error: unknown) => {
if (error instanceof Error && error.message === "Not connected") {
return;
}
console.error("Error from inspector client:", error);
});
}

export function chainOnClose(transport: Transport, onClose: () => void): void {
const previousOnClose = transport.onclose;
transport.onclose = () => {
onClose();
previousOnClose?.();
};
}
2 changes: 1 addition & 1 deletion server/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
"resolveJsonModule": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "packages", "**/*.spec.ts"]
"exclude": ["node_modules", "packages", "**/*.spec.ts", "**/*.test.ts"]
}