Skip to content

Commit 013d406

Browse files
committed
fix: JSONRPCDispatcher.run enters both streams; ClosedResourceError is EOF
run() now enters write_stream alongside read_stream so the write end is released when the read loop exits (BaseSession did this; without it every [sse] interaction leg leaks a MemoryObjectSendStream and fails under filterwarnings=error). ClosedResourceError from the read iterator is caught and treated as clean EOF. Stateless SHTTP teardown closes the dispatcher's receive end after the request is handled; the next __anext__ call on the now-closed stream raises, which previously surfaced as 'Stateless session crashed'.
1 parent caa7ca9 commit 013d406

2 files changed

Lines changed: 50 additions & 7 deletions

File tree

src/mcp/shared/jsonrpc_dispatcher.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,19 @@ async def run(
337337
self._tg = tg
338338
self._running = True
339339
task_status.started()
340-
async with self._read_stream:
341-
async for item in self._read_stream:
342-
# Duck-typed: `_context_streams.ContextReceiveStream`
343-
# exposes `.last_context` (the sender's contextvars
344-
# snapshot per message). Plain memory streams don't.
345-
sender_ctx: contextvars.Context | None = getattr(self._read_stream, "last_context", None)
346-
self._dispatch(item, on_request, on_notify, sender_ctx)
340+
async with self._read_stream, self._write_stream:
341+
try:
342+
async for item in self._read_stream:
343+
# Duck-typed: `_context_streams.ContextReceiveStream`
344+
# exposes `.last_context` (the sender's contextvars
345+
# snapshot per message). Plain memory streams don't.
346+
sender_ctx: contextvars.Context | None = getattr(self._read_stream, "last_context", None)
347+
self._dispatch(item, on_request, on_notify, sender_ctx)
348+
except anyio.ClosedResourceError:
349+
# The transport closed our receive end and we looped back
350+
# to `__anext__` on the now-closed stream (stateless SHTTP
351+
# teardown). Same as EOF.
352+
logger.debug("read stream closed by transport; treating as EOF")
347353
# Read stream EOF: wake any blocked `send_raw_request` waiters now,
348354
# *before* the task group joins, so handlers parked in
349355
# `dctx.send_raw_request()` can unwind and the join doesn't deadlock.

tests/shared/test_jsonrpc_dispatcher.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,43 @@ async def caller() -> None:
176176
s.close()
177177

178178

179+
@pytest.mark.anyio
180+
async def test_run_returns_cleanly_when_read_stream_receive_end_is_closed():
181+
"""Iterating a closed receive end raises ClosedResourceError; run() treats it as EOF.
182+
183+
Stateless SHTTP teardown closes the dispatcher's receive end after the
184+
request is handled; the next loop iteration must not surface as a crash.
185+
"""
186+
c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
187+
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
188+
server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send)
189+
on_request, on_notify = echo_handlers(Recorder())
190+
# Close the dispatcher's own receive end (not the send end) before run()
191+
# iterates it: __anext__ on a closed stream raises ClosedResourceError.
192+
c2s_recv.close()
193+
with anyio.fail_after(5):
194+
await server.run(on_request, on_notify)
195+
for s in (c2s_send, s2c_send, s2c_recv):
196+
s.close()
197+
198+
199+
@pytest.mark.anyio
200+
async def test_run_closes_write_stream_on_exit():
201+
"""run() enters both streams; the write end is released on EOF."""
202+
c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
203+
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
204+
server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send)
205+
on_request, on_notify = echo_handlers(Recorder())
206+
async with anyio.create_task_group() as tg:
207+
await tg.start(server.run, on_request, on_notify)
208+
c2s_send.close() # EOF the read side; run() exits
209+
with anyio.fail_after(5):
210+
# Write end was entered and released by run(); peer's receive sees EOF.
211+
with pytest.raises(anyio.EndOfStream):
212+
await s2c_recv.receive()
213+
s2c_recv.close()
214+
215+
179216
@pytest.mark.anyio
180217
async def test_late_response_after_timeout_is_dropped_without_crashing():
181218
handler_started = anyio.Event()

0 commit comments

Comments
 (0)