|
28 | 28 | from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client |
29 | 29 | from mcp.server import Server, ServerRequestContext |
30 | 30 | from mcp.server.streamable_http import ( |
| 31 | + GET_STREAM_KEY, |
31 | 32 | MCP_PROTOCOL_VERSION_HEADER, |
32 | 33 | MCP_SESSION_ID_HEADER, |
33 | 34 | SESSION_ID_PATTERN, |
@@ -2224,3 +2225,44 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(context_ |
2224 | 2225 |
|
2225 | 2226 | assert "content-type" in headers_data |
2226 | 2227 | assert headers_data["content-type"] == "application/json" |
| 2228 | + |
| 2229 | + |
| 2230 | +@pytest.mark.anyio |
| 2231 | +async def test_standalone_stream_teardown_mid_listen_is_not_an_error(caplog: pytest.LogCaptureFixture) -> None: |
| 2232 | + """Tearing down the standalone stream under its parked writer produces no error log. |
| 2233 | +
|
| 2234 | + Cleanup closes the send side first, so a writer parked in receive() ends on a clean |
| 2235 | + end-of-stream. This pins that close ordering: reversing it would wake the parked writer |
| 2236 | + with ClosedResourceError on every disconnect. (The timing window where teardown lands |
| 2237 | + between dequeues is handled by the writer's ClosedResourceError arm, which cannot be |
| 2238 | + forced deterministically from the public surface.) |
| 2239 | + """ |
| 2240 | + session_manager = StreamableHTTPSessionManager( |
| 2241 | + app=_create_server(), |
| 2242 | + security_settings=TransportSecuritySettings(enable_dns_rebinding_protection=False), |
| 2243 | + ) |
| 2244 | + app = Starlette(routes=[Mount("/mcp", app=session_manager.handle_request)]) |
| 2245 | + notified = anyio.Event() |
| 2246 | + |
| 2247 | + async def message_handler( |
| 2248 | + message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception, |
| 2249 | + ) -> None: |
| 2250 | + if isinstance(message, types.ResourceUpdatedNotification): |
| 2251 | + notified.set() |
| 2252 | + |
| 2253 | + async with session_manager.run(): |
| 2254 | + async with ( |
| 2255 | + make_client(app) as http_client, |
| 2256 | + streamable_http_client(f"{BASE_URL}/mcp", http_client=http_client) as (read_stream, write_stream), |
| 2257 | + ClientSession(read_stream, write_stream, message_handler=message_handler) as session, |
| 2258 | + ): |
| 2259 | + await session.initialize() |
| 2260 | + # Prove the standalone GET writer is live: a notification with no |
| 2261 | + # related request rides the GET stream to the client. |
| 2262 | + await session.call_tool("test_tool_with_standalone_notification", {}) |
| 2263 | + with anyio.fail_after(5): |
| 2264 | + await notified.wait() |
| 2265 | + # Tear the standalone stream down while the writer is parked on it. |
| 2266 | + (transport,) = session_manager._server_instances.values() # pyright: ignore[reportPrivateUsage] |
| 2267 | + await transport._clean_up_memory_streams(GET_STREAM_KEY) # pyright: ignore[reportPrivateUsage] |
| 2268 | + assert "Error in standalone SSE writer" not in caplog.text |
0 commit comments