Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions tests/issues/test_1363_race_condition_streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from contextlib import asynccontextmanager

import anyio
import anyio.to_thread
import httpx
import pytest
from starlette.applications import Starlette
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(self, app: Starlette):
super().__init__(daemon=True)
self.app = app
self._stop_event = threading.Event()
self._ready_event = threading.Event()

def run(self) -> None:
"""Run the lifespan in a new event loop."""
Expand All @@ -78,12 +80,19 @@ async def run_lifespan():
lifespan_context = getattr(self.app.router, "lifespan_context", None)
assert lifespan_context is not None # Tests always create apps with lifespan
async with lifespan_context(self.app):
# Only signal readiness once lifespan startup has completed, i.e. the
# session manager's task group exists and requests can be handled.
self._ready_event.set()
# Wait until stop is requested
while not self._stop_event.is_set():
await anyio.sleep(0.1)

anyio.run(run_lifespan)

def wait_ready(self, timeout: float = 5.0) -> None:
"""Block until the lifespan has started; call from a worker thread, not the event loop."""
assert self._ready_event.wait(timeout), "server thread did not start its lifespan in time"

def stop(self) -> None:
"""Signal the thread to stop."""
self._stop_event.set()
Expand Down Expand Up @@ -132,8 +141,8 @@ async def test_race_condition_invalid_accept_headers(caplog: pytest.LogCaptureFi
server_thread.start()

try:
# Give the server thread a moment to start
await anyio.sleep(0.1)
# Wait for the server thread to enter the lifespan before sending requests
await anyio.to_thread.run_sync(server_thread.wait_ready)

# Suppress WARNING logs (expected validation errors) and capture ERROR logs
with caplog.at_level(logging.ERROR):
Expand Down Expand Up @@ -203,8 +212,8 @@ async def test_race_condition_invalid_content_type(caplog: pytest.LogCaptureFixt
server_thread.start()

try:
# Give the server thread a moment to start
await anyio.sleep(0.1)
# Wait for the server thread to enter the lifespan before sending requests
await anyio.to_thread.run_sync(server_thread.wait_ready)

# Suppress WARNING logs (expected validation errors) and capture ERROR logs
with caplog.at_level(logging.ERROR):
Expand Down Expand Up @@ -243,8 +252,8 @@ async def test_race_condition_message_router_async_for(caplog: pytest.LogCapture
server_thread.start()

try:
# Give the server thread a moment to start
await anyio.sleep(0.1)
# Wait for the server thread to enter the lifespan before sending requests
await anyio.to_thread.run_sync(server_thread.wait_ready)

# Suppress WARNING logs (expected validation errors) and capture ERROR logs
with caplog.at_level(logging.ERROR):
Expand Down
Loading