|
8 | 8 | import anyio |
9 | 9 | import pytest |
10 | 10 |
|
| 11 | +import mcp.server.stdio as stdio_module |
11 | 12 | from mcp.server.mcpserver import MCPServer |
12 | 13 | from mcp.server.stdio import stdio_server |
13 | 14 | from mcp.shared.message import SessionMessage |
@@ -96,6 +97,32 @@ async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch) -> Non |
96 | 97 | assert second.message == valid |
97 | 98 |
|
98 | 99 |
|
| 100 | +@pytest.mark.anyio |
| 101 | +async def test_stdio_server_disables_newline_translation(monkeypatch: pytest.MonkeyPatch): |
| 102 | + raw_stdin = io.BytesIO() |
| 103 | + raw_stdout = io.BytesIO() |
| 104 | + |
| 105 | + monkeypatch.setattr(sys, "stdin", TextIOWrapper(raw_stdin, encoding="utf-8")) |
| 106 | + monkeypatch.setattr(sys, "stdout", TextIOWrapper(raw_stdout, encoding="utf-8")) |
| 107 | + |
| 108 | + calls: list[dict[str, object | None]] = [] |
| 109 | + real_text_io_wrapper = TextIOWrapper |
| 110 | + |
| 111 | + def spy(buffer: io.BufferedIOBase, *args: object, **kwargs: object) -> TextIOWrapper: |
| 112 | + calls.append({"errors": kwargs.get("errors"), "newline": kwargs.get("newline")}) |
| 113 | + return real_text_io_wrapper(buffer, *args, **kwargs) |
| 114 | + |
| 115 | + monkeypatch.setattr(stdio_module, "TextIOWrapper", spy) |
| 116 | + |
| 117 | + with anyio.fail_after(5): |
| 118 | + async with stdio_server() as (read_stream, write_stream): |
| 119 | + await write_stream.aclose() |
| 120 | + await read_stream.aclose() |
| 121 | + |
| 122 | + assert {"errors": "replace", "newline": ""} in calls |
| 123 | + assert {"errors": None, "newline": ""} in calls |
| 124 | + |
| 125 | + |
99 | 126 | class _KeepOpenBytesIO(io.BytesIO): |
100 | 127 | """A BytesIO that survives its TextIOWrapper being closed. |
101 | 128 |
|
|
0 commit comments