Skip to content

Commit 39938a2

Browse files
committed
Harden the streaming ASGI bridge against trailing responses
Once an application sends the final http.response.body chunk, the bridge now drops any further http.response.start/body messages, matching what a real ASGI server's client observes. Starlette's request_response sends a trailing response when an endpoint's sub-application has already completed a rejection response, so the SSE security rejection tests no longer depend on scheduling for the client to read the first status. Pinned by two new bridge contract tests (registered as harness self-tests). Also widen the trio-leg unraisable-warning filter to the whole httpx/httpx-sse generator chain: abandoning EventSource.aiter_sse abandons the nested aiter_lines -> aiter_text -> aiter_bytes -> aiter_raw generators, and which link the finalizer reports depends on GC timing and Python version.
1 parent 600c96e commit 39938a2

4 files changed

Lines changed: 69 additions & 6 deletions

File tree

tests/interaction/conftest.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ def pytest_configure(config: pytest.Config) -> None:
2727
# only test on the trio backend. v1's streamable-HTTP client abandons its httpx/httpx-sse
2828
# response generators when the session task group is cancelled at teardown; asyncio finalizes
2929
# abandoned async generators silently at loop shutdown, but trio's finalizer warns about each
30-
# one (`Async generator ... was garbage collected before it had been exhausted`). The fixes
31-
# live in `src/` on `main` and are out of scope for this tests-only backport. The filters are
32-
# scoped to the two known httpx generator signatures so an unrelated leak still fails the suite.
30+
# one (`Async generator ... was garbage collected before it had been exhausted`). Abandoning
31+
# `EventSource.aiter_sse` abandons the whole generator chain nested under it (`aiter_lines` ->
32+
# `aiter_text` -> `aiter_bytes` -> `aiter_raw`), and which links the finalizer reports depends
33+
# on GC timing and Python version. The fixes live in `src/` on `main` and are out of scope for
34+
# this tests-only backport. The filters are scoped to the httpx/httpx-sse generator signatures
35+
# (every generator in that chain lives on `Response` or `EventSource`) so an unrelated leak
36+
# still fails the suite.
3337
config.addinivalue_line("filterwarnings", "ignore:Async generator 'httpx:ResourceWarning")
3438
config.addinivalue_line(
3539
"filterwarnings",
36-
"ignore:.*async_generator object (Response.aiter_text|EventSource.aiter_sse)"
37-
":pytest.PytestUnraisableExceptionWarning",
40+
"ignore:.*async_generator object (Response|EventSource).aiter_:pytest.PytestUnraisableExceptionWarning",
3841
)
3942

4043

tests/interaction/test_coverage.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
_HARNESS_SELF_TESTS = {
2828
"tests.interaction.lowlevel.test_wire.test_recording_read_stream_ends_iteration_when_the_sender_closes",
2929
"tests.interaction.transports.test_bridge.test_response_chunks_arrive_as_the_application_sends_them",
30+
"tests.interaction.transports.test_bridge.test_a_second_response_after_the_first_completes_is_invisible_to_the_client",
31+
"tests.interaction.transports.test_bridge.test_body_chunks_after_the_final_chunk_are_ignored",
3032
"tests.interaction.transports.test_bridge.test_closing_the_response_delivers_a_disconnect_to_the_application",
3133
"tests.interaction.transports.test_bridge.test_an_application_failure_before_the_response_starts_fails_the_request",
3234
"tests.interaction.transports.test_bridge.test_disabling_cancel_on_close_lets_the_application_finish_after_disconnect",

tests/interaction/transports/_bridge.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
1313
- The request body is buffered before the application is invoked (MCP requests are small JSON
1414
documents); the response streams chunk by chunk.
15+
- The response ends at the first `http.response.body` whose `more_body` is falsy; anything the
16+
application sends after that is ignored, exactly as a real server's client never observes it.
1517
- Closing the response — or the whole client — delivers `http.disconnect` to the application,
1618
exactly as a real server sees when its peer goes away.
1719
- An exception the application raises before sending `http.response.start` fails the originating
@@ -116,6 +118,7 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
116118
request_delivered = False
117119
client_disconnected = anyio.Event()
118120
response_started = anyio.Event()
121+
response_complete = False
119122
response_status = 0
120123
response_headers: list[tuple[bytes, bytes]] = []
121124
application_error: Exception | None = None
@@ -130,7 +133,14 @@ async def receive_request() -> Message:
130133
return {"type": "http.disconnect"}
131134

132135
async def send_response(message: Message) -> None:
133-
nonlocal response_status, response_headers
136+
nonlocal response_complete, response_status, response_headers
137+
if response_complete:
138+
# The response ended with the final body chunk below; a real server's client never
139+
# observes anything sent after that, so drop it. Starlette's `request_response`
140+
# makes this path real: an endpoint whose sub-application already sent a complete
141+
# rejection response (the legacy SSE transport's request validation) still returns
142+
# a `Response`, which sends a trailing second start/body pair.
143+
return
134144
if message["type"] == "http.response.start":
135145
response_status = message["status"]
136146
response_headers = list(message.get("headers", []))
@@ -141,6 +151,7 @@ async def send_response(message: Message) -> None:
141151
if body:
142152
await chunk_writer.send(body)
143153
if not message.get("more_body", False):
154+
response_complete = True
144155
await chunk_writer.aclose()
145156

146157
async def run_application() -> None:

tests/interaction/transports/test_bridge.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,53 @@ async def chunked_app(scope: Scope, receive: Receive, send: Send) -> None:
4040
assert chunks == [b"first", b"second"]
4141

4242

43+
async def test_a_second_response_after_the_first_completes_is_invisible_to_the_client() -> None:
44+
"""Only the first complete response reaches the client; a trailing start/body pair is dropped.
45+
46+
Starlette's `request_response` produces exactly this sequence when an endpoint's
47+
sub-application has already sent a complete rejection response (the legacy SSE transport's
48+
request validation): the endpoint still returns a `Response`, which sends a second response.
49+
"""
50+
51+
async def double_responding_app(scope: Scope, receive: Receive, send: Send) -> None:
52+
assert scope["type"] == "http"
53+
assert (await receive())["type"] == "http.request"
54+
await send({"type": "http.response.start", "status": 421, "headers": [(b"content-type", b"text/plain")]})
55+
await send({"type": "http.response.body", "body": b"rejected", "more_body": False})
56+
await send({"type": "http.response.start", "status": 200, "headers": [(b"x-late", b"yes")]})
57+
await send({"type": "http.response.body", "body": b"too late", "more_body": False})
58+
59+
transport = StreamingASGITransport(double_responding_app)
60+
async with httpx.AsyncClient(transport=transport, base_url="http://bridge") as http:
61+
response = await http.get("/double")
62+
63+
assert response.status_code == 421
64+
assert response.text == "rejected"
65+
assert "x-late" not in response.headers
66+
67+
68+
async def test_body_chunks_after_the_final_chunk_are_ignored() -> None:
69+
"""Extra body chunks after `more_body: False` neither reach the client nor fail the application."""
70+
application_finished = anyio.Event()
71+
72+
async def overflowing_app(scope: Scope, receive: Receive, send: Send) -> None:
73+
assert scope["type"] == "http"
74+
assert (await receive())["type"] == "http.request"
75+
await send({"type": "http.response.start", "status": 200, "headers": []})
76+
await send({"type": "http.response.body", "body": b"complete", "more_body": False})
77+
await send({"type": "http.response.body", "body": b"overflow", "more_body": True})
78+
application_finished.set()
79+
80+
transport = StreamingASGITransport(overflowing_app)
81+
async with httpx.AsyncClient(transport=transport, base_url="http://bridge") as http:
82+
response = await http.get("/overflow")
83+
with anyio.fail_after(5):
84+
await application_finished.wait()
85+
86+
assert response.status_code == 200
87+
assert response.text == "complete"
88+
89+
4390
async def test_closing_the_response_delivers_a_disconnect_to_the_application() -> None:
4491
"""A client that closes the response early is seen by the application as an http.disconnect."""
4592
seen_after_request: list[Message] = []

0 commit comments

Comments
 (0)