Skip to content

Commit a27bca7

Browse files
committed
Harden ClientSession/dispatcher lifecycle and error handling
DirectDispatcher now mirrors JSONRPCDispatcher's lifecycle so the public ClientSession(dispatcher=...) path honors the documented contracts: send_raw_request raises RuntimeError before run() and MCPError(CONNECTION_CLOSED) after close, inbound dispatch to a closed side fails the peer's request instead of serving the exited session's callbacks, and the ready-wait sits inside fail_after so a request timeout bounds waiting on a peer whose run() never started. ClientSession._on_notify self-contains a raising logging_callback / message_handler (matching ServerRunner._on_notify) so a callback that throws over DirectDispatcher costs only that delivery instead of failing the in-process peer's notify() call. JSONRPCDispatcher.notify is now fire-and-forget all the way: a post- close send or a write onto a torn-down transport drops the notification with a debug log instead of leaking raw anyio ClosedResourceError/BrokenResourceError. DirectDispatcher.notify matches. JSONRPCDispatcher.send_raw_request: a builtin TimeoutError raised by a custom transport's bounded send() now propagates raw instead of being mislabelled REQUEST_TIMEOUT (the timeout-conversion arm only fires once the response fail_after has armed). request_write_started is set before the write so a delivered-but-cancelled request still triggers the courtesy notifications/cancelled, and _handle_request skips the shutdown CONNECTION_CLOSED answer once an answer write has started, preferring possibly-zero responses over possibly-two for one id. request_read_timeout_seconds=0.0 is now honored as fail-immediately on both ClientSession.send_request and ServerSession.send_request (was silently treated as unset via or-fallback). None remains the only sentinel that defers to the session-level timeout.
1 parent 32b76cf commit a27bca7

9 files changed

Lines changed: 657 additions & 64 deletions

File tree

src/mcp/client/session.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,11 @@ async def send_request(
226226
data = request.model_dump(by_alias=True, mode="json", exclude_none=True)
227227
method: str = data["method"]
228228
opts: CallOptions = {}
229-
timeout = request_read_timeout_seconds or self._session_read_timeout_seconds
229+
timeout = (
230+
request_read_timeout_seconds
231+
if request_read_timeout_seconds is not None
232+
else self._session_read_timeout_seconds
233+
)
230234
if timeout is not None:
231235
opts["timeout"] = timeout
232236
if progress_callback is not None:
@@ -243,7 +247,11 @@ async def send_request(
243247
return result_type.model_validate(raw, by_name=False)
244248

245249
async def send_notification(self, notification: types.ClientNotification) -> None:
246-
"""Send a one-way notification. Usable before entering the context manager."""
250+
"""Send a one-way notification. Usable before entering the context manager.
251+
252+
Fire-and-forget: after the connection has closed, the notification is
253+
dropped with a debug log instead of raising.
254+
"""
247255
data = notification.model_dump(by_alias=True, mode="json", exclude_none=True)
248256
await self._dispatcher.notify(data["method"], data.get("params"))
249257

@@ -541,9 +549,16 @@ async def _on_notify(
541549
if isinstance(notification, types.CancelledNotification):
542550
# The dispatcher already applied the cancellation; not surfaced to message_handler.
543551
return
544-
if isinstance(notification, types.LoggingMessageNotification):
545-
await self._logging_callback(notification.params)
546-
await self._message_handler(notification)
552+
try:
553+
if isinstance(notification, types.LoggingMessageNotification):
554+
await self._logging_callback(notification.params)
555+
await self._message_handler(notification)
556+
except Exception:
557+
# Contain here, not in the dispatcher: DirectDispatcher awaits this
558+
# handler inline in the peer's notify() call, so a raising callback
559+
# would otherwise fail the peer's send. A raising logging_callback
560+
# skips the message_handler tee for that notification (v1 parity).
561+
logger.exception("notification callback for %r raised", method)
547562

548563
async def _on_stream_exception(self, exc: Exception) -> None:
549564
"""Deliver a transport-level fault to message_handler via a spawned task.

src/mcp/server/session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async def send_request(
8484
"""
8585
data = request.model_dump(by_alias=True, mode="json", exclude_none=True)
8686
opts: CallOptions = {}
87-
if request_read_timeout_seconds:
87+
if request_read_timeout_seconds is not None:
8888
opts["timeout"] = request_read_timeout_seconds
8989
if progress_callback is not None:
9090
opts["on_progress"] = progress_callback

src/mcp/shared/direct_dispatcher.py

Lines changed: 89 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from __future__ import annotations
1717

18+
import logging
1819
from collections.abc import Awaitable, Callable, Mapping
1920
from dataclasses import dataclass, field
2021
from typing import Any
@@ -27,7 +28,9 @@
2728
from mcp.shared.exceptions import MCPError, NoBackChannelError
2829
from mcp.shared.message import MessageMetadata
2930
from mcp.shared.transport_context import TransportContext
30-
from mcp.types import INTERNAL_ERROR, INVALID_PARAMS, REQUEST_TIMEOUT, RequestId
31+
from mcp.types import CONNECTION_CLOSED, INTERNAL_ERROR, INVALID_PARAMS, REQUEST_TIMEOUT, RequestId
32+
33+
logger = logging.getLogger(__name__)
3134

3235
__all__ = ["DirectDispatcher", "create_direct_dispatcher_pair"]
3336

@@ -84,6 +87,13 @@ class DirectDispatcher:
8487
Two instances are wired together with `create_direct_dispatcher_pair`; each
8588
holds a reference to the other. `send_raw_request` on one awaits the peer's
8689
`on_request`. `run` parks until `close` is called.
90+
91+
Lifecycle mirrors `JSONRPCDispatcher`: `send_raw_request` requires `run()`
92+
to have started, and once a side has closed - via `close()` or `run()`
93+
ending - `send_raw_request` raises `MCPError` (`CONNECTION_CLOSED`) and
94+
inbound requests fail the peer's call the same way instead of invoking the
95+
handler. Notifications are fire-and-forget in both directions: after close
96+
they are silently dropped.
8797
"""
8898

8999
def __init__(self, transport_ctx: TransportContext):
@@ -93,7 +103,9 @@ def __init__(self, transport_ctx: TransportContext):
93103
self._on_notify: OnNotify | None = None
94104
self._next_id = 0
95105
self._ready = anyio.Event()
96-
self._closed = anyio.Event()
106+
self._close_event = anyio.Event()
107+
self._running = False
108+
self._closed = False
97109

98110
def connect_to(self, peer: DirectDispatcher) -> None:
99111
self._peer = peer
@@ -104,13 +116,35 @@ async def send_raw_request(
104116
params: Mapping[str, Any] | None,
105117
opts: CallOptions | None = None,
106118
) -> dict[str, Any]:
119+
"""Send a request by invoking the peer's `on_request` directly.
120+
121+
Raises:
122+
MCPError: The peer's handler raised; `REQUEST_TIMEOUT` if
123+
`opts["timeout"]` elapsed; `CONNECTION_CLOSED` if either
124+
side has closed.
125+
RuntimeError: Called before `run()`.
126+
"""
107127
if self._peer is None:
108128
raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
129+
# Post-close sends get the same CONNECTION_CLOSED contract as JSONRPCDispatcher.
130+
if self._closed:
131+
raise MCPError(code=CONNECTION_CLOSED, message="Connection closed")
132+
if not self._running:
133+
raise RuntimeError("DirectDispatcher.send_raw_request called before run()")
109134
return await self._peer._dispatch_request(method, params, opts)
110135

111136
async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
137+
"""Send a notification by invoking the peer's `on_notify` directly.
138+
139+
Fire-and-forget: usable before `run()` (delivery waits for the peer to
140+
start), and after close it is silently dropped, matching
141+
`JSONRPCDispatcher.notify`.
142+
"""
112143
if self._peer is None:
113144
raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
145+
if self._closed:
146+
logger.debug("dropped notification %r on closed DirectDispatcher", method)
147+
return
114148
await self._peer._dispatch_notify(method, params)
115149

116150
async def run(
@@ -120,14 +154,29 @@ async def run(
120154
*,
121155
task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
122156
) -> None:
123-
self._on_request = on_request
124-
self._on_notify = on_notify
125-
self._ready.set()
126-
task_status.started()
127-
await self._closed.wait()
157+
"""Mark this side ready and park until `close()` is called.
158+
159+
Single-shot, like `JSONRPCDispatcher.run`: once it returns the
160+
dispatcher stays closed and cannot be restarted.
161+
"""
162+
try:
163+
self._on_request = on_request
164+
self._on_notify = on_notify
165+
self._running = True
166+
self._ready.set()
167+
task_status.started()
168+
await self._close_event.wait()
169+
finally:
170+
self._running = False
171+
self._closed = True
172+
# run() may end via cancellation without close() ever being
173+
# called; setting the event wakes `_wait_ready` waiters so they
174+
# observe the closed state instead of parking forever.
175+
self._close_event.set()
128176

129177
def close(self) -> None:
130-
self._closed.set()
178+
self._closed = True
179+
self._close_event.set()
131180

132181
def _make_context(
133182
self, on_progress: ProgressFnT | None = None, request_id: RequestId | None = None
@@ -142,20 +191,40 @@ def _make_context(
142191
_on_progress=on_progress,
143192
)
144193

194+
async def _wait_ready(self) -> None:
195+
"""Park until `run()` has started, waking early if this side closes.
196+
197+
Raises:
198+
MCPError: `CONNECTION_CLOSED` if this side has closed.
199+
"""
200+
if not self._ready.is_set() and not self._close_event.is_set():
201+
async with anyio.create_task_group() as tg:
202+
203+
async def wake_on(event: anyio.Event) -> None:
204+
await event.wait()
205+
tg.cancel_scope.cancel()
206+
207+
tg.start_soon(wake_on, self._ready)
208+
tg.start_soon(wake_on, self._close_event)
209+
if self._closed:
210+
raise MCPError(code=CONNECTION_CLOSED, message="Connection closed")
211+
145212
async def _dispatch_request(
146213
self,
147214
method: str,
148215
params: Mapping[str, Any] | None,
149216
opts: CallOptions | None,
150217
) -> dict[str, Any]:
151-
await self._ready.wait()
152-
assert self._on_request is not None
153218
opts = opts or {}
154-
# Synthesize an id: the DispatchContext contract reserves None for notifications.
155-
self._next_id += 1
156-
dctx = self._make_context(on_progress=opts.get("on_progress"), request_id=self._next_id)
157219
try:
158220
with anyio.fail_after(opts.get("timeout")):
221+
# Inside the timeout scope, so a configured timeout also bounds
222+
# waiting on a peer whose run() has not started yet.
223+
await self._wait_ready()
224+
assert self._on_request is not None
225+
# Synthesize an id: the DispatchContext contract reserves None for notifications.
226+
self._next_id += 1
227+
dctx = self._make_context(on_progress=opts.get("on_progress"), request_id=self._next_id)
159228
try:
160229
return await self._on_request(dctx, method, params)
161230
except MCPError:
@@ -173,7 +242,13 @@ async def _dispatch_request(
173242
) from None
174243

175244
async def _dispatch_notify(self, method: str, params: Mapping[str, Any] | None) -> None:
176-
await self._ready.wait()
245+
try:
246+
await self._wait_ready()
247+
except MCPError:
248+
# Notifications are fire-and-forget: a notify to a closed peer is
249+
# dropped, not raised back into the sender's call.
250+
logger.debug("dropped notification %r to closed DirectDispatcher", method)
251+
return
177252
assert self._on_notify is not None
178253
dctx = self._make_context()
179254
await self._on_notify(dctx, method, params)

src/mcp/shared/jsonrpc_dispatcher.py

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import anyio
1818
import anyio.abc
19+
import anyio.lowlevel
1920
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
2021
from opentelemetry.trace import SpanKind
2122
from pydantic import ValidationError
@@ -296,9 +297,13 @@ async def send_raw_request(
296297
self._pending[request_id] = pending
297298

298299
plan = _plan_outbound(_related_request_id, opts)
299-
# Spec MUST: only previously-issued requests may be cancelled, so the
300-
# courtesy cancel arms only once the request write completes.
301-
request_written = False
300+
# Spec MUST: only previously-issued requests may be cancelled. A write
301+
# interrupted by cancellation may still have delivered (a memory-stream
302+
# send can hand its item to the receiver and still raise), so a started
303+
# write counts as issued: the peer ignores a cancel for an id it never
304+
# saw, while skipping it would leak a delivered request's handler.
305+
request_write_started = False
306+
timeout_armed = False
302307

303308
target = out_params.get("name")
304309
span_name = f"MCP send {method}{f' {target}' if isinstance(target, str) else ''}"
@@ -313,18 +318,28 @@ async def send_raw_request(
313318
# SEP-414: inject W3C trace context; `_meta` stays on the wire even with a no-op tracer.
314319
inject_trace_context(out_meta)
315320
msg = JSONRPCRequest(jsonrpc="2.0", id=request_id, method=method, params=out_params)
321+
# Surface a pre-existing cancellation while the request provably
322+
# never started; past this point a cancelled write counts as issued.
323+
await anyio.lowlevel.checkpoint_if_cancelled()
324+
request_write_started = True
316325
try:
317326
await self._write(msg, plan.metadata)
318327
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
319328
# Transport tore down before run() noticed EOF; surface the documented contract.
320329
raise MCPError(code=CONNECTION_CLOSED, message="Connection closed") from None
321-
request_written = True
322330
with anyio.fail_after(opts.get("timeout")):
331+
timeout_armed = True
323332
outcome = await receive.receive()
324333
except TimeoutError:
334+
if not timeout_armed:
335+
# `fail_after` arms only after the write, so this TimeoutError is the
336+
# transport's own bounded send() failing - a transport error, not
337+
# `opts["timeout"]` elapsing. Propagate it raw (v1 kept the write
338+
# outside the timeout-catching try and did the same).
339+
raise
325340
# Courtesy cancel (spec-recommended, new vs v1) so the peer stops work;
326341
# unshielded so an outer caller cancellation can still interrupt the write.
327-
if plan.cancel_on_abandon and request_written:
342+
if plan.cancel_on_abandon:
328343
await self._final_write(
329344
partial(
330345
self._cancel_outbound,
@@ -340,7 +355,7 @@ async def send_raw_request(
340355
except anyio.get_cancelled_exc_class():
341356
# Caller cancelled: bare awaits re-raise here, so the shielded helper
342357
# lets the courtesy cancel go out before we propagate.
343-
if plan.cancel_on_abandon and request_written:
358+
if plan.cancel_on_abandon and request_write_started:
344359
await self._final_write(
345360
partial(self._cancel_outbound, request_id, "caller cancelled", _related_request_id),
346361
shield=True,
@@ -365,13 +380,26 @@ async def notify(
365380
*,
366381
_related_request_id: RequestId | None = None,
367382
) -> None:
383+
"""Send a fire-and-forget notification.
384+
385+
Fire-and-forget all the way: a post-close send or a write onto a
386+
torn-down transport drops the notification with a debug log instead
387+
of raising (same policy as the response writes and `ctx.notify`).
388+
"""
389+
if self._closed:
390+
logger.debug("dropped %s: dispatcher closed", method)
391+
return
368392
# Leave `params` unset when None: with `exclude_unset=True` an explicit
369393
# None would serialize as `"params": null`, which JSON-RPC 2.0 forbids.
370394
if params is not None:
371395
msg = JSONRPCNotification(jsonrpc="2.0", method=method, params=dict(params))
372396
else:
373397
msg = JSONRPCNotification(jsonrpc="2.0", method=method)
374-
await self._write(msg, _plan_outbound(_related_request_id, None).metadata)
398+
try:
399+
await self._write(msg, _plan_outbound(_related_request_id, None).metadata)
400+
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
401+
# Transport tore down before run() noticed EOF.
402+
logger.debug("dropped %s: write stream closed", method)
375403

376404
async def run(
377405
self,
@@ -613,6 +641,7 @@ async def _handle_request(
613641
614642
The single exception-to-wire boundary: handler exceptions become `JSONRPCError` here.
615643
"""
644+
answer_write_started = False
616645
try:
617646
with scope:
618647
try:
@@ -625,23 +654,33 @@ async def _handle_request(
625654
key = _coerce_id(req.id)
626655
if (entry := self._in_flight.get(key)) is not None and entry.dctx is dctx:
627656
del self._in_flight[key]
657+
# A write interrupted by cancellation may still have delivered
658+
# (a memory-stream send can hand its item to the receiver and
659+
# still raise), so a started answer write counts as sent below:
660+
# peers drop late responses, while a second answer for one id
661+
# would break JSON-RPC.
662+
answer_write_started = True
628663
await self._write_result(req.id, result)
629664
if scope.cancelled_caught:
630665
# anyio absorbs the scope's own cancel at __exit__, and
631666
# `cancelled_caught` (unlike `cancel_called`) guarantees the
632667
# result write above did not happen - no double response.
633668
# TODO(maxisbey): spec says SHOULD NOT respond after cancel;
634669
# the existing server always has, so match that for now.
670+
answer_write_started = True
635671
await self._write_error(req.id, ErrorData(code=0, message="Request cancelled"))
636672
except anyio.get_cancelled_exc_class():
637-
# Shutdown: answer the request so the peer isn't left waiting; the
638-
# shielded helper is needed because bare awaits re-raise here.
639-
await self._final_write(
640-
partial(self._write_error, req.id, ErrorData(code=CONNECTION_CLOSED, message="Connection closed")),
641-
shield=True,
642-
timeout=_SHUTDOWN_WRITE_TIMEOUT,
643-
describe=f"shutdown error response for request {req.id!r}",
644-
)
673+
# Shutdown: answer the request so the peer isn't left waiting - unless
674+
# an answer write already started (it may have reached the transport;
675+
# prefer possibly-zero answers over possibly-two). The shielded helper
676+
# is needed because bare awaits re-raise here.
677+
if not answer_write_started:
678+
await self._final_write(
679+
partial(self._write_error, req.id, ErrorData(code=CONNECTION_CLOSED, message="Connection closed")),
680+
shield=True,
681+
timeout=_SHUTDOWN_WRITE_TIMEOUT,
682+
describe=f"shutdown error response for request {req.id!r}",
683+
)
645684
raise
646685
except MCPError as e:
647686
await self._write_error(req.id, e.error)
@@ -701,11 +740,9 @@ async def _final_write(
701740
async def _cancel_outbound(self, request_id: RequestId, reason: str, related_request_id: RequestId | None) -> None:
702741
# Thread `related_request_id` so streamable HTTP routes the cancel onto
703742
# the request's own SSE stream instead of a possibly-absent GET stream.
704-
try:
705-
await self.notify(
706-
"notifications/cancelled",
707-
{"requestId": request_id, "reason": reason},
708-
_related_request_id=related_request_id,
709-
)
710-
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
711-
pass
743+
# `notify` swallows connection-state errors itself, so no guard here.
744+
await self.notify(
745+
"notifications/cancelled",
746+
{"requestId": request_id, "reason": reason},
747+
_related_request_id=related_request_id,
748+
)

0 commit comments

Comments
 (0)