Skip to content

Commit e9ee4b4

Browse files
committed
feat: JSONRPCDispatcher.send_raw_request emits CLIENT span and injects _meta
Mirrors BaseSession.send_request: outbound requests are wrapped in an otel CLIENT span and W3C trace context is injected into params._meta (SEP-414). A side effect is that _meta is always present on the wire (empty under a no-op tracer), which the interaction suite's sampling/elicitation snapshots pin. The contract-test echo recorder now strips _meta so JSON-RPC and direct dispatch parametrizations record identically. TODO(maxisbey) marker added: this belongs in an outbound middleware once that seam exists; the dispatcher should not own otel.
1 parent 44574ad commit e9ee4b4

3 files changed

Lines changed: 58 additions & 11 deletions

File tree

src/mcp/shared/jsonrpc_dispatcher.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import anyio
3030
import anyio.abc
3131
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
32+
from opentelemetry.trace import SpanKind
3233
from pydantic import ValidationError
3334

35+
from mcp.shared._otel import inject_trace_context, otel_span
3436
from mcp.shared._stream_protocols import ReadStream, WriteStream
3537
from mcp.shared.dispatcher import CallOptions, Dispatcher, OnNotify, OnRequest, ProgressFnT
3638
from mcp.shared.exceptions import MCPError, NoBackChannelError
@@ -255,17 +257,17 @@ async def send_raw_request(
255257
raise RuntimeError("JSONRPCDispatcher.send_raw_request called before run() / after close")
256258
opts = opts or {}
257259
request_id = self._allocate_id()
258-
out_params = dict(params) if params is not None else None
260+
out_params = dict(params) if params is not None else {}
261+
out_meta = dict(out_params.get("_meta") or {})
259262
on_progress = opts.get("on_progress")
260263
if on_progress is not None:
261264
# The caller wants progress updates. The spec mechanism is: include
262265
# `_meta.progressToken` on the request; the peer echoes that token on
263266
# any `notifications/progress` it sends. We use the request id as the
264267
# token so the receive loop can find this `_Pending.on_progress` by
265268
# `_pending[token]` without a second lookup table.
266-
meta = dict((out_params or {}).get("_meta") or {})
267-
meta["progressToken"] = request_id
268-
out_params = {**(out_params or {}), "_meta": meta}
269+
out_meta["progressToken"] = request_id
270+
out_params["_meta"] = out_meta
269271

270272
# buffer=1: at most one outcome is ever delivered. A `WouldBlock` from
271273
# `_resolve_pending`/`_fan_out_closed` means the waiter already has an
@@ -277,11 +279,26 @@ async def send_raw_request(
277279
self._pending[request_id] = pending
278280

279281
metadata = _outbound_metadata(_related_request_id, opts)
280-
msg = JSONRPCRequest(jsonrpc="2.0", id=request_id, method=method, params=out_params)
282+
target = out_params.get("name")
283+
span_name = f"MCP send {method}{f' {target}' if isinstance(target, str) else ''}"
284+
# TODO(maxisbey): the otel span + inject below mirror
285+
# BaseSession.send_request for parity. They belong in an outbound
286+
# middleware (symmetric with otel_middleware on the inbound side) once
287+
# that seam exists; the dispatcher should not own otel.
281288
try:
282-
await self._write(msg, metadata)
283-
with anyio.fail_after(opts.get("timeout")):
284-
outcome = await receive.receive()
289+
with otel_span(
290+
span_name,
291+
kind=SpanKind.CLIENT,
292+
attributes={"mcp.method.name": method, "jsonrpc.request.id": request_id},
293+
):
294+
# Inject W3C trace context into _meta (SEP-414). With a no-op
295+
# tracer this writes nothing, but `_meta` itself is still
296+
# present on the wire (and the interaction suite pins that).
297+
inject_trace_context(out_meta)
298+
msg = JSONRPCRequest(jsonrpc="2.0", id=request_id, method=method, params=out_params)
299+
await self._write(msg, metadata)
300+
with anyio.fail_after(opts.get("timeout")):
301+
outcome = await receive.receive()
285302
except TimeoutError:
286303
# Spec-recommended courtesy: tell the peer we've given up so it can
287304
# stop work and free resources. v1's BaseSession.send_request does

tests/shared/test_dispatcher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,12 @@ def echo_handlers(recorder: Recorder) -> tuple[OnRequest, OnNotify]:
3434
async def on_request(
3535
ctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
3636
) -> dict[str, Any]:
37-
recorder.requests.append((method, params))
37+
# Strip `_meta` so JSON-RPC and direct dispatch record identically:
38+
# the JSON-RPC outbound path always attaches `_meta` (otel injection).
39+
recorded = {k: v for k, v in (params or {}).items() if k != "_meta"} if params is not None else None
40+
recorder.requests.append((method, recorded))
3841
recorder.contexts.append(ctx)
39-
return {"echoed": method, "params": dict(params or {})}
42+
return {"echoed": method, "params": recorded or {}}
4043

4144
async def on_notify(ctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None) -> None:
4245
recorder.notifications.append((method, params))

tests/shared/test_jsonrpc_dispatcher.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import pytest
1515

1616
from mcp.shared._context_streams import ContextReceiveStream, ContextSendStream
17-
from mcp.shared.dispatcher import DispatchContext
17+
from mcp.shared.dispatcher import CallOptions, DispatchContext
1818
from mcp.shared.exceptions import MCPError
1919
from mcp.shared.jsonrpc_dispatcher import ( # pyright: ignore[reportPrivateUsage]
2020
JSONRPCDispatcher,
@@ -399,6 +399,33 @@ async def server_on_request(ctx: DCtx, method: str, params: Mapping[str, Any] |
399399
assert received == [(0.25, None, None)]
400400

401401

402+
@pytest.mark.anyio
403+
async def test_send_raw_request_always_carries_meta_on_the_wire():
404+
"""Outbound requests always include `params._meta` (otel injection per SEP-414).
405+
406+
Caller-supplied `_meta` keys are preserved; the progress token is merged in.
407+
"""
408+
seen: list[Mapping[str, Any] | None] = []
409+
410+
async def server_on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]:
411+
seen.append(params)
412+
return {}
413+
414+
async def noop_progress(progress: float, total: float | None, message: str | None) -> None:
415+
raise NotImplementedError
416+
417+
opts: CallOptions = {"on_progress": noop_progress}
418+
async with running_pair(jsonrpc_pair, server_on_request=server_on_request) as (client, *_):
419+
with anyio.fail_after(5):
420+
await client.send_raw_request("a", None)
421+
await client.send_raw_request("b", {"x": 1, "_meta": {"k": "v"}}, opts)
422+
assert seen[0] == {"_meta": {}}
423+
assert seen[1] is not None
424+
assert seen[1]["x"] == 1
425+
assert seen[1]["_meta"]["k"] == "v"
426+
assert "progressToken" in seen[1]["_meta"]
427+
428+
402429
@pytest.mark.anyio
403430
async def test_handler_raising_validation_error_sends_invalid_params():
404431
async def server_on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]:

0 commit comments

Comments
 (0)