From 602e59d2f3e1da8f7023e9a69f650e27fa89af85 Mon Sep 17 00:00:00 2001 From: go165 <196723798+go165@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:11:52 +0800 Subject: [PATCH 1/5] fix(client): surface streamable http transport errors --- src/mcp/client/streamable_http.py | 18 +++- .../test_915_streamable_http_unreachable.py | 102 ++++++++++++++++++ 2 files changed, 115 insertions(+), 5 deletions(-) create mode 100644 tests/issues/test_915_streamable_http_unreachable.py diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index b5950d3b5..a26f4367c 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -468,11 +468,19 @@ async def _handle_message(session_message: SessionMessage) -> None: read_stream_writer=read_stream_writer, ) - async def handle_request_async(): - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) + async def handle_request_async() -> None: + try: + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) + except httpx.HTTPError as exc: + logger.exception("Transport error handling request") + if isinstance(message, JSONRPCRequest): + error_data = ErrorData(code=INTERNAL_ERROR, message=f"Transport error: {exc}") + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) + with contextlib.suppress(anyio.BrokenResourceError, anyio.ClosedResourceError): + await read_stream_writer.send(error_msg) # If this is a request, start a new task to handle it if isinstance(message, JSONRPCRequest): diff --git a/tests/issues/test_915_streamable_http_unreachable.py b/tests/issues/test_915_streamable_http_unreachable.py new file mode 100644 index 000000000..80170ba9d --- /dev/null +++ b/tests/issues/test_915_streamable_http_unreachable.py @@ -0,0 +1,102 @@ +import json +from typing import cast + +import anyio +import httpx +import pytest + +from mcp import ClientSession +from mcp.client.session_group import ClientSessionGroup, StreamableHttpParameters +from mcp.client.streamable_http import streamable_http_client +from mcp.shared.exceptions import MCPError +from mcp.types import LATEST_PROTOCOL_VERSION, RootsListChangedNotification + +pytestmark = pytest.mark.anyio + + +def _contains_cancel_scope_error(exc: BaseException) -> bool: + if isinstance(exc, RuntimeError) and "Attempted to exit cancel scope" in str(exc): + return True + + raw_grouped_exceptions = getattr(exc, "exceptions", ()) + if isinstance(raw_grouped_exceptions, tuple) and raw_grouped_exceptions: + grouped_exceptions = cast(tuple[BaseException, ...], raw_grouped_exceptions) + return any(_contains_cancel_scope_error(inner) for inner in grouped_exceptions) + + return any(_contains_cancel_scope_error(inner) for inner in (exc.__cause__, exc.__context__) if inner is not None) + + +def test_contains_cancel_scope_error_follows_exception_tree() -> None: + cancel_scope_error = RuntimeError("Attempted to exit cancel scope in a different task than it was entered in") + wrapped = RuntimeError("wrapped") + wrapped.__cause__ = cancel_scope_error + + assert _contains_cancel_scope_error(wrapped) + + +def test_contains_cancel_scope_error_follows_grouped_exceptions() -> None: + cancel_scope_error = RuntimeError("Attempted to exit cancel scope in a different task than it was entered in") + + class DummyGroup(Exception): + def __init__(self) -> None: + self.exceptions = (cancel_scope_error,) + + assert _contains_cancel_scope_error(DummyGroup()) + + +async def test_session_group_streamable_http_connect_error_is_catchable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def raise_connect_error(request: httpx.Request) -> httpx.Response: + raise httpx.ConnectError("server unavailable", request=request) + + def mock_http_client( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + return httpx.AsyncClient( + auth=auth, + headers=headers, + timeout=timeout, + transport=httpx.MockTransport(raise_connect_error), + ) + + monkeypatch.setattr("mcp.client.session_group.create_mcp_http_client", mock_http_client) + + async with ClientSessionGroup() as group: + with anyio.fail_after(5), pytest.raises(MCPError) as exc_info: + await group.connect_to_server(StreamableHttpParameters(url="http://example.invalid/mcp")) + + assert "Transport error: server unavailable" in exc_info.value.error.message + assert not _contains_cancel_scope_error(exc_info.value) + + +async def test_streamable_http_notification_transport_error_does_not_crash() -> None: + async def handle_request(request: httpx.Request) -> httpx.Response: + data = json.loads(request.content) + if data.get("method") == "initialize": + return httpx.Response( + 200, + headers={"content-type": "application/json"}, + json={ + "jsonrpc": "2.0", + "id": data["id"], + "result": { + "protocolVersion": LATEST_PROTOCOL_VERSION, + "capabilities": {}, + "serverInfo": {"name": "mock-server", "version": "1.0.0"}, + }, + }, + ) + + raise httpx.ConnectError("notification failed", request=request) + + async with ( + httpx.AsyncClient(transport=httpx.MockTransport(handle_request)) as http_client, + streamable_http_client("http://example.invalid/mcp", http_client=http_client) as (read_stream, write_stream), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed")) + await anyio.sleep(0) From be95105384d0302d40685916be376ce08452f6cf Mon Sep 17 00:00:00 2001 From: go165 <196723798+go165@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:44:28 +0800 Subject: [PATCH 2/5] fix(client): cover streamable transport error paths --- src/mcp/client/streamable_http.py | 34 +++++++++++++++++++------------ 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index a26f4367c..8d84e8b1b 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -468,25 +468,33 @@ async def _handle_message(session_message: SessionMessage) -> None: read_stream_writer=read_stream_writer, ) - async def handle_request_async() -> None: - try: - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) - except httpx.HTTPError as exc: - logger.exception("Transport error handling request") - if isinstance(message, JSONRPCRequest): + # If this is a request, start a new task to handle it + if isinstance(message, JSONRPCRequest): + request_id = message.id + + async def handle_request_async() -> None: + try: + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) + except httpx.HTTPError as exc: + logger.exception("Transport error handling request") error_data = ErrorData(code=INTERNAL_ERROR, message=f"Transport error: {exc}") - error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=request_id, error=error_data)) with contextlib.suppress(anyio.BrokenResourceError, anyio.ClosedResourceError): await read_stream_writer.send(error_msg) - # If this is a request, start a new task to handle it - if isinstance(message, JSONRPCRequest): tg.start_soon(handle_request_async) else: - await handle_request_async() + + async def handle_notification_async() -> None: + try: + await self._handle_post_request(ctx) + except httpx.HTTPError: + logger.debug("Transport error handling notification", exc_info=True) + + await handle_notification_async() async for session_message in write_stream_reader: sender_ctx = write_stream_reader.last_context From 79b75ec34cad4e2c9ed51be1d689b8ac2fce7719 Mon Sep 17 00:00:00 2001 From: go165 <196723798+go165@users.noreply.github.com> Date: Mon, 15 Jun 2026 14:23:19 +0800 Subject: [PATCH 3/5] test(client): cover session group cleanup branches --- src/mcp/client/session_group.py | 6 +++--- tests/client/test_session_group.py | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/mcp/client/session_group.py b/src/mcp/client/session_group.py index 961021264..57577c662 100644 --- a/src/mcp/client/session_group.py +++ b/src/mcp/client/session_group.py @@ -147,7 +147,7 @@ def __init__( self._session_exit_stacks = {} self._component_name_hook = component_name_hook - async def __aenter__(self) -> Self: # pragma: no cover + async def __aenter__(self) -> Self: # Enter the exit stack only if we created it ourselves if self._owns_exit_stack: await self._exit_stack.__aenter__() @@ -158,7 +158,7 @@ async def __aexit__( _exc_type: type[BaseException] | None, _exc_val: BaseException | None, _exc_tb: TracebackType | None, - ) -> bool | None: # pragma: no cover + ) -> bool | None: """Closes session exit stacks and main exit stack upon completion.""" # Only close the main exit stack if we created it @@ -323,7 +323,7 @@ async def _establish_session( await self._exit_stack.enter_async_context(session_stack) return result.server_info, session - except Exception: # pragma: no cover + except Exception: # If anything during this setup fails, ensure the session-specific # stack is closed. await session_stack.aclose() diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index 6a58b39f3..261648c03 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -278,6 +278,24 @@ async def test_client_session_group_disconnect_non_existent_server(): await group.disconnect_from_server(session) +@pytest.mark.anyio +async def test_client_session_group_context_manager_with_provided_exit_stack(): + """Provided exit stacks are not entered or closed by the session group.""" + provided_stack = mock.AsyncMock(spec=contextlib.AsyncExitStack) + session_stack = mock.AsyncMock(spec=contextlib.AsyncExitStack) + session = mock.Mock(spec=mcp.ClientSession) + + group = ClientSessionGroup(exit_stack=provided_stack) + group._session_exit_stacks[session] = session_stack + + assert await group.__aenter__() is group + await group.__aexit__(None, None, None) + + provided_stack.__aenter__.assert_not_awaited() + provided_stack.aclose.assert_not_awaited() + session_stack.aclose.assert_awaited_once() + + # TODO(Marcelo): This is horrible. We should drop this test. @pytest.mark.anyio @pytest.mark.parametrize( From bd2518e0a606184b0209aab7911aa5b4968d3963 Mon Sep 17 00:00:00 2001 From: go165 <196723798+go165@users.noreply.github.com> Date: Mon, 15 Jun 2026 14:26:13 +0800 Subject: [PATCH 4/5] test(client): cover owned session group exit stack --- tests/client/test_session_group.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index 261648c03..5fb056911 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -296,6 +296,18 @@ async def test_client_session_group_context_manager_with_provided_exit_stack(): session_stack.aclose.assert_awaited_once() +@pytest.mark.anyio +async def test_client_session_group_context_manager_with_owned_exit_stack(): + """Owned exit stacks are entered and closed by the session group.""" + session_stack = mock.AsyncMock(spec=contextlib.AsyncExitStack) + session = mock.Mock(spec=mcp.ClientSession) + + async with ClientSessionGroup() as group: + group._session_exit_stacks[session] = session_stack + + session_stack.aclose.assert_awaited_once() + + # TODO(Marcelo): This is horrible. We should drop this test. @pytest.mark.anyio @pytest.mark.parametrize( From e974b7f144f694b5fffcf80dc080d7230dfea0cb Mon Sep 17 00:00:00 2001 From: go165 <196723798+go165@users.noreply.github.com> Date: Mon, 15 Jun 2026 14:28:57 +0800 Subject: [PATCH 5/5] test(client): cover session setup cleanup error --- tests/client/test_session_group.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index 5fb056911..0e74c036f 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -415,3 +415,30 @@ async def test_client_session_group_establish_session_parameterized( # 3. Assert returned values assert returned_server_info is mock_initialize_result.server_info assert returned_session is mock_entered_session + + +@pytest.mark.anyio +async def test_client_session_group_establish_session_closes_stack_on_initialize_error(): + group_exit_stack = mock.AsyncMock(spec=contextlib.AsyncExitStack) + session_stack = mock.AsyncMock(spec=contextlib.AsyncExitStack) + mock_read_stream = mock.AsyncMock(name="Read") + mock_write_stream = mock.AsyncMock(name="Write") + mock_session = mock.AsyncMock(spec=mcp.ClientSession) + mock_session.initialize.side_effect = RuntimeError("initialize failed") + session_stack.enter_async_context.side_effect = [ + (mock_read_stream, mock_write_stream), + mock_session, + ] + + group = ClientSessionGroup(exit_stack=group_exit_stack) + + with ( + mock.patch("mcp.client.session_group.contextlib.AsyncExitStack", return_value=session_stack), + mock.patch("mcp.client.session_group.mcp.stdio_client", return_value=mock.AsyncMock()), + mock.patch("mcp.client.session_group.mcp.ClientSession", return_value=mock.AsyncMock()), + pytest.raises(RuntimeError, match="initialize failed"), + ): + await group._establish_session(StdioServerParameters(command="test"), ClientSessionParameters()) + + session_stack.aclose.assert_awaited_once() + group_exit_stack.enter_async_context.assert_not_awaited()