55(server→client and client→server) using the streamable HTTP transport.
66"""
77
8- import multiprocessing
9- import socket
10- from collections . abc import Generator
8+ from collections . abc import AsyncIterator
9+ from contextlib import asynccontextmanager
10+ from typing import Any
1111
12+ import httpx
1213import pytest
14+ from starlette .applications import Starlette
15+ from starlette .routing import Mount
1316
17+ import mcp .types as types
1418from mcp .client .session import ClientSession
1519from mcp .client .streamable_http import streamable_http_client
16- from tests .test_helpers import wait_for_server
20+ from mcp .server import Server
21+ from mcp .server .streamable_http_manager import StreamableHTTPSessionManager
22+ from mcp .types import CallToolResult , TextContent , Tool
23+ from tests .interaction .transports import StreamingASGITransport
24+
25+ # The in-process app is mounted at this origin purely so URLs are well-formed; nothing listens here.
26+ BASE_URL = "http://127.0.0.1:8000"
27+
28+ # v1's streamable-HTTP server transport leaks a handful of anyio memory streams on teardown when
29+ # run in process; the old subprocess harness never observed them. The interaction suite registers
30+ # the same two scoped filters globally from tests/interaction/conftest.py (see the comment there),
31+ # but they only take effect when that package's conftest is loaded; these markers keep this file
32+ # self-contained for isolated runs. The filters are scoped to anyio's MemoryObject*Stream leak
33+ # signature so an unrelated leak still fails the suite.
34+ pytestmark = [
35+ pytest .mark .filterwarnings ("ignore:.*MemoryObject(Send|Receive)Stream:pytest.PytestUnraisableExceptionWarning" ),
36+ pytest .mark .filterwarnings ("ignore:.*MemoryObject(Send|Receive)Stream:ResourceWarning" ),
37+ ]
1738
1839# Test constants with various Unicode characters
1940UNICODE_TEST_STRINGS = {
3556}
3657
3758
38- def run_unicode_server (port : int ) -> None : # pragma: no cover
39- """Run the Unicode test server in a separate process."""
40- # Import inside the function since this runs in a separate process
41- from collections .abc import AsyncGenerator
42- from contextlib import asynccontextmanager
43- from typing import Any
44-
45- import uvicorn
46- from starlette .applications import Starlette
47- from starlette .routing import Mount
48-
49- import mcp .types as types
50- from mcp .server import Server
51- from mcp .server .streamable_http_manager import StreamableHTTPSessionManager
52- from mcp .types import TextContent , Tool
53-
54- # Need to recreate the server setup in this process
55- server = Server (name = "unicode_test_server" )
59+ def make_unicode_server () -> Server [object , object ]:
60+ """The Unicode echo server: tool and prompt contents that exercise non-ASCII round trips."""
61+ server : Server [object , object ] = Server (name = "unicode_test_server" )
5662
5763 @server .list_tools ()
58- async def list_tools () -> list [Tool ]:
59- """List tools with Unicode descriptions."""
64+ async def handle_list_tools () -> list [Tool ]:
6065 return [
6166 Tool (
6267 name = "echo_unicode" ,
@@ -72,22 +77,12 @@ async def list_tools() -> list[Tool]:
7277 ]
7378
7479 @server .call_tool ()
75- async def call_tool (name : str , arguments : dict [str , Any ] | None ) -> list [TextContent ]:
76- """Handle tool calls with Unicode content."""
77- if name == "echo_unicode" :
78- text = arguments .get ("text" , "" ) if arguments else ""
79- return [
80- TextContent (
81- type = "text" ,
82- text = f"Echo: { text } " ,
83- )
84- ]
85- else :
86- raise ValueError (f"Unknown tool: { name } " )
80+ async def handle_call_tool (name : str , arguments : dict [str , Any ]) -> CallToolResult :
81+ assert name == "echo_unicode"
82+ return CallToolResult (content = [TextContent (type = "text" , text = f"Echo: { arguments ['text' ]} " )])
8783
8884 @server .list_prompts ()
89- async def list_prompts () -> list [types .Prompt ]:
90- """List prompts with Unicode names and descriptions."""
85+ async def handle_list_prompts () -> list [types .Prompt ]:
9186 return [
9287 types .Prompt (
9388 name = "unicode_prompt" ,
@@ -97,137 +92,90 @@ async def list_prompts() -> list[types.Prompt]:
9792 ]
9893
9994 @server .get_prompt ()
100- async def get_prompt (name : str , arguments : dict [str , Any ] | None ) -> types .GetPromptResult :
101- """Get a prompt with Unicode content."""
102- if name == "unicode_prompt" :
103- return types .GetPromptResult (
104- messages = [
105- types .PromptMessage (
106- role = "user" ,
107- content = types .TextContent (
108- type = "text" ,
109- text = "Hello世界🌍Привет안녕مرحباשלום" ,
110- ),
111- )
112- ]
113- )
114- raise ValueError (f"Unknown prompt: { name } " )
115-
116- # Create the session manager
117- session_manager = StreamableHTTPSessionManager (
118- app = server ,
119- json_response = False , # Use SSE for testing
120- )
121-
122- @asynccontextmanager
123- async def lifespan (app : Starlette ) -> AsyncGenerator [None , None ]:
124- async with session_manager .run ():
125- yield
126-
127- # Create an ASGI application
128- app = Starlette (
129- debug = True ,
130- routes = [
131- Mount ("/mcp" , app = session_manager .handle_request ),
132- ],
133- lifespan = lifespan ,
134- )
135-
136- # Run the server
137- config = uvicorn .Config (
138- app = app ,
139- host = "127.0.0.1" ,
140- port = port ,
141- log_level = "error" ,
142- )
143- uvicorn_server = uvicorn .Server (config )
144- uvicorn_server .run ()
145-
146-
147- @pytest .fixture
148- def unicode_server_port () -> int :
149- """Find an available port for the Unicode test server."""
150- with socket .socket () as s :
151- s .bind (("127.0.0.1" , 0 ))
152- return s .getsockname ()[1 ]
153-
154-
155- @pytest .fixture
156- def running_unicode_server (unicode_server_port : int ) -> Generator [str , None , None ]:
157- """Start a Unicode test server in a separate process."""
158- proc = multiprocessing .Process (target = run_unicode_server , kwargs = {"port" : unicode_server_port }, daemon = True )
159- proc .start ()
160-
161- # Wait for server to be ready
162- wait_for_server (unicode_server_port )
163-
164- try :
165- yield f"http://127.0.0.1:{ unicode_server_port } "
166- finally :
167- # Clean up - try graceful termination first
168- proc .terminate ()
169- proc .join (timeout = 2 )
170- if proc .is_alive (): # pragma: no cover
171- proc .kill ()
172- proc .join (timeout = 1 )
95+ async def handle_get_prompt (name : str , arguments : dict [str , str ] | None ) -> types .GetPromptResult :
96+ assert name == "unicode_prompt"
97+ return types .GetPromptResult (
98+ messages = [
99+ types .PromptMessage (
100+ role = "user" ,
101+ content = types .TextContent (type = "text" , text = "Hello世界🌍Привет안녕مرحباשלום" ),
102+ )
103+ ]
104+ )
105+
106+ return server
107+
108+
109+ @asynccontextmanager
110+ async def unicode_session () -> AsyncIterator [ClientSession ]:
111+ """Yield an initialized ClientSession speaking streamable HTTP (SSE responses) to the
112+ Unicode test server, entirely in process."""
113+ # SSE response mode, so Unicode rides the SSE event encoding rather than a plain JSON body.
114+ session_manager = StreamableHTTPSessionManager (app = make_unicode_server (), json_response = False )
115+ app = Starlette (routes = [Mount ("/mcp" , app = session_manager .handle_request )])
116+
117+ async with (
118+ session_manager .run (),
119+ # follow_redirects matches the SDK's own client factory; Starlette's Mount 307-redirects
120+ # the bare /mcp path to /mcp/.
121+ httpx .AsyncClient (
122+ transport = StreamingASGITransport (app ), base_url = BASE_URL , follow_redirects = True
123+ ) as http_client ,
124+ streamable_http_client (f"{ BASE_URL } /mcp" , http_client = http_client ) as (
125+ read_stream ,
126+ write_stream ,
127+ _get_session_id ,
128+ ),
129+ ClientSession (read_stream , write_stream ) as session ,
130+ ):
131+ await session .initialize ()
132+ yield session
173133
174134
175135@pytest .mark .anyio
176- async def test_streamable_http_client_unicode_tool_call (running_unicode_server : str ) -> None :
136+ async def test_streamable_http_client_unicode_tool_call () -> None :
177137 """Test that Unicode text is correctly handled in tool calls via streamable HTTP."""
178- base_url = running_unicode_server
179- endpoint_url = f"{ base_url } /mcp"
180-
181- async with streamable_http_client (endpoint_url ) as (read_stream , write_stream , _get_session_id ):
182- async with ClientSession (read_stream , write_stream ) as session :
183- await session .initialize ()
184-
185- # Test 1: List tools (server→client Unicode in descriptions)
186- tools = await session .list_tools ()
187- assert len (tools .tools ) == 1
138+ async with unicode_session () as session :
139+ # Test 1: List tools (server→client Unicode in descriptions)
140+ tools = await session .list_tools ()
141+ assert len (tools .tools ) == 1
188142
189- # Check Unicode in tool descriptions
190- echo_tool = tools .tools [0 ]
191- assert echo_tool .name == "echo_unicode"
192- assert echo_tool .description is not None
193- assert "🔤" in echo_tool .description
194- assert "👋" in echo_tool .description
143+ # Check Unicode in tool descriptions
144+ echo_tool = tools .tools [0 ]
145+ assert echo_tool .name == "echo_unicode"
146+ assert echo_tool .description is not None
147+ assert "🔤" in echo_tool .description
148+ assert "👋" in echo_tool .description
195149
196- # Test 2: Send Unicode text in tool call (client→server→client)
197- for test_name , test_string in UNICODE_TEST_STRINGS .items ():
198- result = await session .call_tool ("echo_unicode" , arguments = {"text" : test_string })
150+ # Test 2: Send Unicode text in tool call (client→server→client)
151+ for test_name , test_string in UNICODE_TEST_STRINGS .items ():
152+ result = await session .call_tool ("echo_unicode" , arguments = {"text" : test_string })
199153
200- # Verify server correctly received and echoed back Unicode
201- assert len (result .content ) == 1
202- content = result .content [0 ]
203- assert content .type == "text"
204- assert f"Echo: { test_string } " == content .text , f"Failed for { test_name } "
154+ # Verify server correctly received and echoed back Unicode
155+ assert len (result .content ) == 1
156+ content = result .content [0 ]
157+ assert content .type == "text"
158+ assert f"Echo: { test_string } " == content .text , f"Failed for { test_name } "
205159
206160
207161@pytest .mark .anyio
208- async def test_streamable_http_client_unicode_prompts (running_unicode_server : str ) -> None :
162+ async def test_streamable_http_client_unicode_prompts () -> None :
209163 """Test that Unicode text is correctly handled in prompts via streamable HTTP."""
210- base_url = running_unicode_server
211- endpoint_url = f"{ base_url } /mcp"
212-
213- async with streamable_http_client (endpoint_url ) as (read_stream , write_stream , _get_session_id ):
214- async with ClientSession (read_stream , write_stream ) as session :
215- await session .initialize ()
216-
217- # Test 1: List prompts (server→client Unicode in descriptions)
218- prompts = await session .list_prompts ()
219- assert len (prompts .prompts ) == 1
220-
221- prompt = prompts .prompts [0 ]
222- assert prompt .name == "unicode_prompt"
223- assert prompt .description is not None
224- assert "Слой хранилища, где располагаются" in prompt .description
225-
226- # Test 2: Get prompt with Unicode content (server→client)
227- result = await session .get_prompt ("unicode_prompt" , arguments = {})
228- assert len (result .messages ) == 1
229-
230- message = result .messages [0 ]
231- assert message .role == "user"
232- assert message .content .type == "text"
233- assert message .content .text == "Hello世界🌍Привет안녕مرحباשלום"
164+ async with unicode_session () as session :
165+ # Test 1: List prompts (server→client Unicode in descriptions)
166+ prompts = await session .list_prompts ()
167+ assert len (prompts .prompts ) == 1
168+
169+ prompt = prompts .prompts [0 ]
170+ assert prompt .name == "unicode_prompt"
171+ assert prompt .description is not None
172+ assert "Слой хранилища, где располагаются" in prompt .description
173+
174+ # Test 2: Get prompt with Unicode content (server→client)
175+ result = await session .get_prompt ("unicode_prompt" , arguments = {})
176+ assert len (result .messages ) == 1
177+
178+ message = result .messages [0 ]
179+ assert message .role == "user"
180+ assert message .content .type == "text"
181+ assert message .content .text == "Hello世界🌍Привет안녕مرحباשלום"
0 commit comments