Skip to content

Commit 16fbef6

Browse files
committed
fix: JSONRPCDispatcher.run cancels in-flight handlers on read-stream EOF
Restores the Server.run() behaviour the dispatcher rework dropped: at read-stream EOF the task group cancels in-flight handler tasks instead of joining on them. Without this, a handler that outlives its caller (its request timed out client-side, or the client disconnected mid-call) keeps run() from returning forever, leaking the handler task and over SSE the GET request that hosts the session. Regression test parks a handler in sleep_forever(), EOFs the read stream, asserts run() returns within fail_after(5). Confirmed to hang on the unpatched code.
1 parent ca0c67b commit 16fbef6

2 files changed

Lines changed: 70 additions & 18 deletions

File tree

src/mcp/shared/jsonrpc_dispatcher.py

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -356,24 +356,34 @@ async def run(
356356
self._tg = tg
357357
self._running = True
358358
task_status.started()
359-
async with self._read_stream, self._write_stream:
360-
try:
361-
async for item in self._read_stream:
362-
# Duck-typed: `_context_streams.ContextReceiveStream`
363-
# exposes `.last_context` (the sender's contextvars
364-
# snapshot per message). Plain memory streams don't.
365-
sender_ctx: contextvars.Context | None = getattr(self._read_stream, "last_context", None)
366-
self._dispatch(item, on_request, on_notify, sender_ctx)
367-
except anyio.ClosedResourceError:
368-
# The transport closed our receive end and we looped back
369-
# to `__anext__` on the now-closed stream (stateless SHTTP
370-
# teardown). Same as EOF.
371-
logger.debug("read stream closed by transport; treating as EOF")
372-
# Read stream EOF: wake any blocked `send_raw_request` waiters now,
373-
# *before* the task group joins, so handlers parked in
374-
# `dctx.send_raw_request()` can unwind and the join doesn't deadlock.
375-
self._running = False
376-
self._fan_out_closed()
359+
try:
360+
async with self._read_stream, self._write_stream:
361+
try:
362+
async for item in self._read_stream:
363+
# Duck-typed: `_context_streams.ContextReceiveStream`
364+
# exposes `.last_context` (the sender's contextvars
365+
# snapshot per message). Plain memory streams don't.
366+
sender_ctx: contextvars.Context | None = getattr(
367+
self._read_stream, "last_context", None
368+
)
369+
self._dispatch(item, on_request, on_notify, sender_ctx)
370+
except anyio.ClosedResourceError:
371+
# The transport closed our receive end and we looped
372+
# back to `__anext__` on the now-closed stream
373+
# (stateless SHTTP teardown). Same as EOF.
374+
logger.debug("read stream closed by transport; treating as EOF")
375+
# Read stream EOF: wake any blocked `send_raw_request` waiters
376+
# (callers outside this task group) with CONNECTION_CLOSED.
377+
self._running = False
378+
self._fan_out_closed()
379+
finally:
380+
# Transport closed: cancel in-flight handlers. Without this
381+
# the task-group join waits for them, and a handler that
382+
# outlives its caller (its request timed out client-side, or
383+
# the client disconnected mid-call) would keep `run()` from
384+
# returning forever. Same behaviour as `Server.run()` before
385+
# the dispatcher rework.
386+
tg.cancel_scope.cancel()
377387
finally:
378388
# Covers the cancel/crash paths where the inline fan-out above is
379389
# never reached. Idempotent.

tests/shared/test_jsonrpc_dispatcher.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,48 @@ async def test_run_returns_cleanly_when_read_stream_receive_end_is_closed():
196196
s.close()
197197

198198

199+
@pytest.mark.anyio
200+
async def test_run_cancels_in_flight_handlers_when_read_stream_eofs():
201+
"""A handler that outlives its caller must not keep run() from returning.
202+
203+
Without the cancel-at-EOF, the task-group join would wait on this handler
204+
forever (over SSE that leaks the handler task and the GET request hosting
205+
the session).
206+
"""
207+
c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
208+
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
209+
server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send)
210+
handler_started = anyio.Event()
211+
handler_cancelled = anyio.Event()
212+
213+
async def park(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]:
214+
handler_started.set()
215+
try:
216+
await anyio.sleep_forever()
217+
finally:
218+
handler_cancelled.set()
219+
raise NotImplementedError
220+
221+
async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None:
222+
raise NotImplementedError
223+
224+
run_returned = anyio.Event()
225+
226+
async def drive() -> None:
227+
await server.run(park, on_notify)
228+
run_returned.set()
229+
230+
async with anyio.create_task_group() as tg:
231+
tg.start_soon(drive)
232+
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=1, method="x", params=None)))
233+
with anyio.fail_after(5):
234+
await handler_started.wait()
235+
c2s_send.close() # EOF the read side; run() must cancel the parked handler
236+
await run_returned.wait()
237+
assert handler_cancelled.is_set()
238+
s2c_recv.close()
239+
240+
199241
@pytest.mark.anyio
200242
async def test_run_closes_write_stream_on_exit():
201243
"""run() enters both streams; the write end is released on EOF."""

0 commit comments

Comments
 (0)