From 62f96aa421d0a1af6bc02ae722bcdc4642f241bb Mon Sep 17 00:00:00 2001 From: Pedro Santos Date: Fri, 29 May 2026 14:24:14 +0000 Subject: [PATCH 1/3] Fix MCP JSON-RPC request ids --- .changeset/fix-mcp-json-rpc-request-ids.md | 5 + .../src/unstable/rpc/RpcSerialization.ts | 133 ++++++++++++------ .../effect/test/rpc/RpcSerialization.test.ts | 75 ++++++++-- .../effect/test/unstable/ai/McpServer.test.ts | 83 ++++++++++- 4 files changed, 245 insertions(+), 51 deletions(-) create mode 100644 .changeset/fix-mcp-json-rpc-request-ids.md diff --git a/.changeset/fix-mcp-json-rpc-request-ids.md b/.changeset/fix-mcp-json-rpc-request-ids.md new file mode 100644 index 0000000000..b83f38ca1e --- /dev/null +++ b/.changeset/fix-mcp-json-rpc-request-ids.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +Fix JSON-RPC serialization to preserve arbitrary request ids for MCP HTTP clients. diff --git a/packages/effect/src/unstable/rpc/RpcSerialization.ts b/packages/effect/src/unstable/rpc/RpcSerialization.ts index 55d9e442e9..af1111dd8f 100644 --- a/packages/effect/src/unstable/rpc/RpcSerialization.ts +++ b/packages/effect/src/unstable/rpc/RpcSerialization.ts @@ -154,19 +154,16 @@ export const jsonRpc = (options?: { includesFraming: false, makeUnsafe: () => { const decoder = new TextDecoder() - const batches = new Map - }>() + const state = makeJsonRpcState() return { decode: (bytes) => { const decoded: JsonRpcMessage | Array = JSON.parse( typeof bytes === "string" ? bytes : decoder.decode(bytes) ) - return decodeJsonRpcRaw(decoded, batches) + return decodeJsonRpcRaw(decoded, state) }, encode: (response) => { - const encoded = encodeJsonRpcResponse(response as any, batches) + const encoded = encodeJsonRpcResponse(response as any, state) return encoded && JSON.stringify(encoded) } } @@ -188,10 +185,7 @@ export const ndJsonRpc = (options?: { includesFraming: true, makeUnsafe: () => { const parser = ndjson.makeUnsafe() - const batches = new Map - }>() + const state = makeJsonRpcState() return ({ decode: (bytes) => { const frames = parser.decode(bytes) @@ -199,24 +193,38 @@ export const ndJsonRpc = (options?: { const messages: Array = [] for (let i = 0; i < frames.length; i++) { const frame = frames[i] - messages.push(...decodeJsonRpcRaw(frame as any, batches) as any) + messages.push(...decodeJsonRpcRaw(frame as any, state) as any) } return messages }, encode: (response) => { - const encoded = encodeJsonRpcResponse(response as any, batches) + const encoded = encodeJsonRpcResponse(response as any, state) return encoded && parser.encode(encoded) } }) } }) -function decodeJsonRpcRaw( - decoded: JsonRpcMessage | Array, - batches: Map }> + readonly requestIds: Map + nextRequestId: bigint +} + +const makeJsonRpcState = (): JsonRpcState => ({ + batches: new Map(), + requestIds: new Map(), + nextRequestId: BigInt(-1) +}) + +function decodeJsonRpcRaw( + decoded: JsonRpcMessage | Array, + state: JsonRpcState ) { if (Array.isArray(decoded)) { const batch = { @@ -225,19 +233,22 @@ function decodeJsonRpcRaw( } const messages: Array = [] for (let i = 0; i < decoded.length; i++) { - const message = decodeJsonRpcMessage(decoded[i]) + const message = decodeJsonRpcMessage(decoded[i], state) messages.push(message) if (message._tag === "Request") { batch.size++ - batches.set(message.id, batch) + state.batches.set(message.id, batch) } } return messages } - return [decodeJsonRpcMessage(decoded)] + return [decodeJsonRpcMessage(decoded, state)] } -function decodeJsonRpcMessage(decoded: JsonRpcMessage): RpcMessage.FromClientEncoded | RpcMessage.FromServerEncoded { +function decodeJsonRpcMessage( + decoded: JsonRpcMessage, + state: JsonRpcState +): RpcMessage.FromClientEncoded | RpcMessage.FromServerEncoded { if ("method" in decoded) { if (Predicate.isNullish(decoded.id) && decoded.method.startsWith("@effect/rpc/")) { const tag = decoded.method.slice("@effect/rpc/".length) as @@ -251,9 +262,10 @@ function decodeJsonRpcMessage(decoded: JsonRpcMessage): RpcMessage.FromClientEnc } as any : { _tag: tag } as any } + const id = decodeJsonRpcRequestId(decoded, state) return { _tag: "Request", - id: Predicate.isNotNullish(decoded.id) ? String(decoded.id) : "", + id, tag: decoded.method, payload: decoded.params ?? null, headers: decoded.headers ?? [], @@ -297,26 +309,64 @@ function decodeJsonRpcMessage(decoded: JsonRpcMessage): RpcMessage.FromClientEnc } } +const canonicalIntegerString = /^(?:0|-?[1-9]\d*)$/ + +function decodeJsonRpcRequestId(decoded: JsonRpcRequest, state: JsonRpcState): string { + const jsonRpcId = Predicate.hasProperty(decoded, "id") ? decoded.id : undefined + let requestId: string | undefined + if (typeof jsonRpcId === "number" && Number.isSafeInteger(jsonRpcId)) { + requestId = String(jsonRpcId) + } else if (typeof jsonRpcId === "string" && canonicalIntegerString.test(jsonRpcId)) { + requestId = jsonRpcId + } + if (requestId === undefined || state.requestIds.has(requestId)) { + requestId = nextJsonRpcRequestId(state) + } + state.requestIds.set(requestId, jsonRpcId) + return requestId +} + +function nextJsonRpcRequestId(state: JsonRpcState): string { + let requestId: string + do { + requestId = state.nextRequestId.toString() + state.nextRequestId-- + } while (state.requestIds.has(requestId)) + return requestId +} + +function encodeJsonRpcResponseId( + requestId: string, + state: JsonRpcState, + isExit: boolean +): JsonRpcRequestId { + if (state.requestIds.has(requestId)) { + const jsonRpcId = state.requestIds.get(requestId) + if (isExit) { + state.requestIds.delete(requestId) + } + return jsonRpcId + } + return requestId !== "" ? Number(requestId) : undefined +} + function encodeJsonRpcRaw( response: RpcMessage.FromServerEncoded | RpcMessage.FromClientEncoded, - batches: Map - }> + state: JsonRpcState ) { if (!("requestId" in response)) { - return encodeJsonRpcMessage(response) + return encodeJsonRpcMessage(response, state) } - const batch = batches.get(response.requestId) + const batch = state.batches.get(response.requestId) if (batch) { - batches.delete(response.requestId) + state.batches.delete(response.requestId) batch.responses.set(response.requestId, response as any) if (batch.size === batch.responses.size) { - return Array.from(batch.responses.values(), encodeJsonRpcMessage) + return Array.from(batch.responses.values(), (message) => encodeJsonRpcMessage(message, state)) } return undefined } - return encodeJsonRpcMessage(response) + return encodeJsonRpcMessage(response, state) } function encodeJsonRpcResponse( @@ -324,20 +374,17 @@ function encodeJsonRpcResponse( | RpcMessage.FromServerEncoded | RpcMessage.FromClientEncoded | Array, - batches: Map - }> + state: JsonRpcState ) { if (Array.isArray(response) === false) { - return encodeJsonRpcRaw(response, batches) + return encodeJsonRpcRaw(response, state) } if (response.length === 0) { return undefined } const encoded: Array> = [] for (let i = 0; i < response.length; i++) { - const current = encodeJsonRpcRaw(response[i], batches) + const current = encodeJsonRpcRaw(response[i], state) if (current !== undefined) { encoded.push(current) } @@ -360,7 +407,10 @@ function encodeJsonRpcResponse( return messages } -function encodeJsonRpcMessage(response: RpcMessage.FromServerEncoded | RpcMessage.FromClientEncoded): JsonRpcMessage { +function encodeJsonRpcMessage( + response: RpcMessage.FromServerEncoded | RpcMessage.FromClientEncoded, + state: JsonRpcState +): JsonRpcMessage { switch (response._tag) { case "Request": return { @@ -383,25 +433,28 @@ function encodeJsonRpcMessage(response: RpcMessage.FromServerEncoded | RpcMessag method: `@effect/rpc/${response._tag}`, params: "requestId" in response ? { requestId: response.requestId } : undefined } - case "Chunk": + case "Chunk": { + const chunkId = encodeJsonRpcResponseId(response.requestId, state, false) return { jsonrpc: "2.0", chunk: true, - id: Number(response.requestId), + ...(chunkId === undefined ? {} : { id: chunkId }), result: response.values } + } case "Exit": { + const id = encodeJsonRpcResponseId(response.requestId, state, true) if (response.exit._tag === "Success") { return { jsonrpc: "2.0", - id: response.requestId !== "" ? Number(response.requestId) : undefined, + id, result: response.exit.value } as any } const error = response.exit.cause.find((failure) => failure._tag === "Fail") return { jsonrpc: "2.0", - id: response.requestId !== "" ? Number(response.requestId) : undefined, + id, error: response.exit._tag === "Failure" ? { _tag: "Cause", diff --git a/packages/effect/test/rpc/RpcSerialization.test.ts b/packages/effect/test/rpc/RpcSerialization.test.ts index 2572b7f8c6..2e0a31b89a 100644 --- a/packages/effect/test/rpc/RpcSerialization.test.ts +++ b/packages/effect/test/rpc/RpcSerialization.test.ts @@ -97,28 +97,87 @@ describe("RpcSerialization", () => { ) }) - it("jsonRpc maps null id to internal notification sentinel", () => { + it("jsonRpc preserves arbitrary string ids across decode and response encode", () => { + const parser = RpcSerialization.jsonRpc().makeUnsafe() + const decoded = parser.decode("{\"jsonrpc\":\"2.0\",\"id\":\"14f40ee1b859ee70\",\"method\":\"users.get\"}") + assert.strictEqual(decoded.length, 1) + const request = decoded[0] as any + assert.strictEqual(request._tag, "Request") + + const encoded = parser.encode(responseExitSuccess(request.id, "ok")) + assert(encoded !== undefined) + assert.deepStrictEqual(JSON.parse(encoded as string), { + jsonrpc: "2.0", + id: "14f40ee1b859ee70", + result: "ok" + }) + }) + + it("jsonRpc preserves null id across decode and response encode", () => { const parser = RpcSerialization.jsonRpc().makeUnsafe() const decoded = parser.decode("{\"jsonrpc\":\"2.0\",\"id\":null,\"method\":\"users.get\"}") + assert.strictEqual(decoded.length, 1) + const request = decoded[0] as any + assert.strictEqual(request._tag, "Request") + + const encoded = parser.encode(responseExitSuccess(request.id, "ok")) + assert(encoded !== undefined) + assert.deepStrictEqual(JSON.parse(encoded as string), { + jsonrpc: "2.0", + id: null, + result: "ok" + }) + }) + + it("jsonRpc preserves omitted id across decode and response encode", () => { + const parser = RpcSerialization.jsonRpc().makeUnsafe() + const decoded = parser.decode("{\"jsonrpc\":\"2.0\",\"method\":\"users.get\"}") + assert.strictEqual(decoded.length, 1) + const request = decoded[0] as any + assert.strictEqual(request._tag, "Request") + + const encoded = parser.encode(responseExitSuccess(request.id, "ok")) + assert(encoded !== undefined) + assert.deepStrictEqual(JSON.parse(encoded as string), { + jsonrpc: "2.0", + result: "ok" + }) + }) + + it("jsonRpc preserves numeric string ids across decode and response encode", () => { + const parser = RpcSerialization.jsonRpc().makeUnsafe() + const decoded = parser.decode("{\"jsonrpc\":\"2.0\",\"id\":\"1\",\"method\":\"users.get\"}") assert.deepStrictEqual(decoded, [{ _tag: "Request", - id: "", + id: "1", tag: "users.get", payload: null, headers: [] }]) + + const encoded = parser.encode(responseExitSuccess("1", "ok")) + assert(encoded !== undefined) + assert.deepStrictEqual(JSON.parse(encoded as string), { + jsonrpc: "2.0", + id: "1", + result: "ok" + }) }) it("jsonRpc preserves empty string id across decode and encode", () => { const parser = RpcSerialization.jsonRpc().makeUnsafe() const decoded = parser.decode("{\"jsonrpc\":\"2.0\",\"id\":\"\",\"method\":\"users.get\"}") - assert.deepStrictEqual(decoded, [{ - _tag: "Request", + assert.strictEqual(decoded.length, 1) + const request = decoded[0] as any + assert.strictEqual(request._tag, "Request") + + const response = parser.encode(responseExitSuccess(request.id, "ok")) + assert(response !== undefined) + assert.deepStrictEqual(JSON.parse(response as string), { + jsonrpc: "2.0", id: "", - tag: "users.get", - payload: null, - headers: [] - }]) + result: "ok" + }) const encoded = parser.encode({ _tag: "Request", diff --git a/packages/effect/test/unstable/ai/McpServer.test.ts b/packages/effect/test/unstable/ai/McpServer.test.ts index 3f3edf20ac..a92041e982 100644 --- a/packages/effect/test/unstable/ai/McpServer.test.ts +++ b/packages/effect/test/unstable/ai/McpServer.test.ts @@ -10,9 +10,7 @@ import * as HttpRouter from "effect/unstable/http/HttpRouter" import { RpcSerialization } from "effect/unstable/rpc" import * as RpcClient from "effect/unstable/rpc/RpcClient" -const makeTestClient = Effect.gen(function*() { - const responses: Array = [] - +const makeTestHandler = Effect.gen(function*() { const serverLayer = McpServer.layerHttp({ name: "TestServer", version: "1.0.0", @@ -20,6 +18,34 @@ const makeTestClient = Effect.gen(function*() { }) const { handler, dispose } = HttpRouter.toWebHandler(serverLayer, { disableLogger: true }) yield* Effect.addFinalizer(() => Effect.promise(() => dispose())) + return handler +}) + +const postJson = ( + handler: (request: Request) => Promise, + body: unknown, + sessionId?: string | null | undefined +) => + Effect.promise(() => { + const headers = new Headers({ "content-type": "application/json" }) + if (sessionId) { + headers.set("Mcp-Session-Id", sessionId) + } + return handler( + new Request("http://localhost/mcp", { + method: "POST", + headers, + body: JSON.stringify(body) + }) + ) + }) + +const responseJson = (response: Response) => Effect.promise(() => response.json() as Promise) + +const makeTestClient = Effect.gen(function*() { + const responses: Array = [] + + const handler = yield* makeTestHandler let sessionId: string | null = null const customFetch: typeof fetch = async (input, init) => { @@ -79,4 +105,55 @@ describe("McpServer", () => { strictEqual(response.status, 404) })) + + it.effect("preserves JSON-RPC request ids from raw HTTP clients", () => + Effect.gen(function*() { + const handler = yield* makeTestHandler + + const initializeResponse = yield* postJson(handler, { + jsonrpc: "2.0", + id: "14f40ee1b859ee70", + method: "initialize", + params: { + protocolVersion: "9999-01-01", + capabilities: {}, + clientInfo: { + name: "RawClient", + version: "1.0.0" + } + } + }) + const initializeJson = yield* responseJson(initializeResponse) + strictEqual(initializeResponse.status, 200) + strictEqual(initializeJson.id, "14f40ee1b859ee70") + + const sessionId = initializeResponse.headers.get("Mcp-Session-Id") + strictEqual(typeof sessionId, "string") + + const numberResponse = yield* postJson(handler, { + jsonrpc: "2.0", + id: 123, + method: "ping", + params: {} + }, sessionId) + const numberJson = yield* responseJson(numberResponse) + strictEqual(numberJson.id, 123) + + const nullResponse = yield* postJson(handler, { + jsonrpc: "2.0", + id: null, + method: "ping", + params: {} + }, sessionId) + const nullJson = yield* responseJson(nullResponse) + strictEqual(nullJson.id, null) + + const omittedResponse = yield* postJson(handler, { + jsonrpc: "2.0", + method: "ping", + params: {} + }, sessionId) + const omittedJson = yield* responseJson(omittedResponse) + strictEqual("id" in omittedJson, false) + })) }) From 54f48e4bb8d52d4f672ddf993e2ee49d627940af Mon Sep 17 00:00:00 2001 From: Pedro Santos Date: Fri, 29 May 2026 14:33:31 +0000 Subject: [PATCH 2/3] Fix MCP request id changeset --- .changeset/fix-mcp-json-rpc-request-ids.md | 1 - 1 file changed, 1 deletion(-) diff --git a/.changeset/fix-mcp-json-rpc-request-ids.md b/.changeset/fix-mcp-json-rpc-request-ids.md index b83f38ca1e..4b3c62b66e 100644 --- a/.changeset/fix-mcp-json-rpc-request-ids.md +++ b/.changeset/fix-mcp-json-rpc-request-ids.md @@ -1,5 +1,4 @@ --- -"effect": patch --- Fix JSON-RPC serialization to preserve arbitrary request ids for MCP HTTP clients. From 21ece75c91929ef6f27098984954e4cbcda71307 Mon Sep 17 00:00:00 2001 From: Pedro Santos Date: Fri, 29 May 2026 14:34:40 +0000 Subject: [PATCH 3/3] Restore MCP request id changeset --- .changeset/fix-mcp-json-rpc-request-ids.md | 1 + 1 file changed, 1 insertion(+) diff --git a/.changeset/fix-mcp-json-rpc-request-ids.md b/.changeset/fix-mcp-json-rpc-request-ids.md index 4b3c62b66e..b83f38ca1e 100644 --- a/.changeset/fix-mcp-json-rpc-request-ids.md +++ b/.changeset/fix-mcp-json-rpc-request-ids.md @@ -1,4 +1,5 @@ --- +"effect": patch --- Fix JSON-RPC serialization to preserve arbitrary request ids for MCP HTTP clients.