Add native WebSocket support by vendoring httpx-ws#1042
Conversation
>⚠️ This is a very young project. Expect bugs 🐛 Features -------- * `connect_ws` helper to talk to WebSockets synchronously. * `aconnect_ws` helper to talk to WebSockets asynchronously. * `ASGIWebSocketTransport` to test WebSockets in ASGI apps directly.
Features -------- * Add a `.ping` method. Thanks @kousikmitra 🎉 Improvements ------------ * Pin lower bound version of `httpx` and `httpcore` dependencies * `httpx>=0.23.1` * `httpcore>=0.16.1`
Add `httpx2.websocket()`, `Client.websocket()` and `AsyncClient.websocket()` context managers, and re-export the WebSocket session classes, the `ASGIWebSocketTransport` and the exception hierarchy from the top-level `httpx2` namespace. These names resolve lazily through `__getattr__` so `import httpx2` keeps working without `wsproto`; a missing dependency raises a clear error pointing to the `httpx2[ws]` extra. A `TYPE_CHECKING` block re-imports them from the typed submodules so static type checkers still see the real types.
Rewrite the vendored WebSocket tests to httpx2's namespaces, point the server fixture at uvicorn's `wsproto`/`websockets-sansio` implementations (the legacy implementation is incompatible with `filterwarnings=error`), and close the mock memory streams so the trio backend doesn't trip an unraisable ResourceWarning. Add `starlette`, `websockets` and `flaky` to the dev group, the `ws` extra to the dev `httpx2[...]` install, and update `test_exported_members` to account for the lazily-exported WebSocket names.
|
Docs preview: |
Merging this PR will not alter performance
Comparing Footnotes
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 29150edd36
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| headers = kwargs.pop("headers", {}) | ||
| headers.update(_get_headers(subprotocols)) |
There was a problem hiding this comment.
Normalize missing WebSocket headers before updating
When the advertised top-level httpx2.websocket(...) is used without headers, _api.websocket() forwards headers=None, so kwargs.pop("headers", {}) returns None here and the next line raises AttributeError before opening a connection. This makes the default public helper unusable; coerce None and other supported HeaderTypes to a mutable headers object before adding the handshake headers.
Useful? React with 👍 / 👎.
| if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket": | ||
| subprotocols: list[str] = [] | ||
| if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None: | ||
| subprotocols = subprotocols_header.split(",") |
There was a problem hiding this comment.
Strip spaces when parsing requested subprotocols
When multiple subprotocols are requested, the client-side helper serializes them as a comma-space header such as chat, superchat, but splitting only on , leaves the second value as " superchat" in scope["subprotocols"]. ASGI apps that compare exact protocol names will fail to negotiate any protocol after the first, so trim whitespace and ignore empty tokens when building the scope.
Useful? React with 👍 / 👎.
| send = self._asgi_send | ||
| try: | ||
| await self.app(scope, receive, send) | ||
| except Exception as e: |
There was a problem hiding this comment.
Honor raise_app_exceptions for websocket ASGI apps
With ASGIWebSocketTransport(..., raise_app_exceptions=True) (the inherited default), exceptions raised by a websocket endpoint are caught here and converted into a close frame instead of being surfaced to the test client. That makes the default behave the same as raise_app_exceptions=False for websocket routes and can let crashing ASGI tests pass as ordinary disconnects; pass the flag into this stream and re-raise when it is enabled.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
7 issues found across 19 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/httpx2/httpx2/_websockets/_api.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_api.py:601">
P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</violation>
<violation number="2" location="src/httpx2/httpx2/_websockets/_api.py:1052">
P1: `kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</violation>
</file>
<file name="src/httpx2/httpx2/_websockets/_transport.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_transport.py:115">
P2: `timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</violation>
<violation number="2" location="src/httpx2/httpx2/_websockets/_transport.py:128">
P2: `raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</violation>
<violation number="3" location="src/httpx2/httpx2/_websockets/_transport.py:173">
P2: Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</violation>
<violation number="4" location="src/httpx2/httpx2/_websockets/_transport.py:178">
P2: WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</violation>
</file>
<file name="src/httpx2/httpx2/_websockets/_ping.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_ping.py:25">
P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</violation>
</file>
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
| subprotocols: list[str] | None = None, | ||
| **kwargs: typing.Any, | ||
| ) -> typing.Generator[WebSocketSession, None, None]: | ||
| headers = kwargs.pop("headers", {}) |
There was a problem hiding this comment.
P1: kwargs.pop("headers", {}) returns None when headers=None is explicitly passed (e.g., from httpx2.websocket(...) which defaults headers to None). The default {} only applies when the key is absent from kwargs, not when the value is None. The subsequent headers.update(...) will raise AttributeError: 'NoneType' object has no attribute 'update'. Coerce the result: headers = kwargs.pop("headers", None) or {}.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 1052:
<comment>`kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</comment>
<file context>
@@ -0,0 +1,1291 @@
+ subprotocols: list[str] | None = None,
+ **kwargs: typing.Any,
+) -> typing.Generator[WebSocketSession, None, None]:
+ headers = kwargs.pop("headers", {})
+ headers.update(_get_headers(subprotocols))
+
</file context>
| headers = kwargs.pop("headers", {}) | |
| headers = kwargs.pop("headers", None) or {} |
| @@ -0,0 +1,41 @@ | |||
| from __future__ import annotations | |||
There was a problem hiding this comment.
P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_ping.py, line 25:
<comment>ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</comment>
<file context>
@@ -0,0 +1,41 @@
+ return ping_id, event
+
+ def ack(self, ping_id: bytes | bytearray) -> None:
+ event = self._pings.pop(bytes(ping_id))
+ event.set()
+
</file context>
| if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket": | ||
| subprotocols: list[str] = [] | ||
| if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None: | ||
| subprotocols = subprotocols_header.split(",") |
There was a problem hiding this comment.
P2: Splitting on "," without stripping whitespace leaves leading spaces in protocol names (e.g., "chat, superchat".split(",") → ["chat", " superchat"]). Since _get_headers() serializes subprotocols as ", ".join(subprotocols), any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use [s.strip() for s in subprotocols_header.split(",")] to trim whitespace.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 173:
<comment>Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</comment>
<file context>
@@ -0,0 +1,219 @@
+ if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
+ subprotocols: list[str] = []
+ if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
+ subprotocols = subprotocols_header.split(",")
+
+ scope = {
</file context>
| scope = { | ||
| "type": "websocket", | ||
| "path": request.url.path, | ||
| "raw_path": request.url.raw_path, |
There was a problem hiding this comment.
P2: WebSocket ASGI scope passes raw_path including query string. This can break ASGI apps that expect raw_path to be path-only and use query_string separately.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 178:
<comment>WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</comment>
<file context>
@@ -0,0 +1,219 @@
+ scope = {
+ "type": "websocket",
+ "path": request.url.path,
+ "raw_path": request.url.raw_path,
+ "root_path": self.root_path,
+ "scheme": scheme,
</file context>
| "raw_path": request.url.raw_path, | |
| "raw_path": request.url.raw_path.split(b"?")[0], |
| send = self._asgi_send | ||
| try: | ||
| await self.app(scope, receive, send) | ||
| except Exception as e: |
There was a problem hiding this comment.
P2: raise_app_exceptions is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 128:
<comment>`raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</comment>
<file context>
@@ -0,0 +1,219 @@
+ send = self._asgi_send
+ try:
+ await self.app(scope, receive, send)
+ except Exception as e:
+ message = {
+ "type": "websocket.close",
</file context>
| self._receive_queue.put(message) | ||
|
|
||
| async def receive(self, timeout: float | None = None) -> Message: | ||
| while self._send_queue.empty(): |
There was a problem hiding this comment.
P2: timeout is ineffective in receive(), so read(timeout=...) can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 115:
<comment>`timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</comment>
<file context>
@@ -0,0 +1,219 @@
+ self._receive_queue.put(message)
+
+ async def receive(self, timeout: float | None = None) -> Message:
+ while self._send_queue.empty():
+ await anyio.sleep(0)
+ return self._send_queue.get(timeout=timeout)
</file context>
| self._send_event, self._receive_event = anyio.create_memory_object_stream[ | ||
| wsproto.events.Event | HTTPXWSException | ||
| ]() |
There was a problem hiding this comment.
P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 601:
<comment>Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</comment>
<file context>
@@ -0,0 +1,1291 @@
+
+ async def __aenter__(self) -> AsyncWebSocketSession:
+ async with contextlib.AsyncExitStack() as exit_stack:
+ self._send_event, self._receive_event = anyio.create_memory_object_stream[
+ wsproto.events.Event | HTTPXWSException
+ ]()
</file context>
| self._send_event, self._receive_event = anyio.create_memory_object_stream[ | |
| wsproto.events.Event | HTTPXWSException | |
| ]() | |
| self._send_event, self._receive_event = anyio.create_memory_object_stream[ | |
| wsproto.events.Event | HTTPXWSException | |
| ](max_buffer_size=self._queue_size) |
Replace the `require_wsproto()` helper with a direct `try`/`import` guard in each WebSocket entrypoint (`Client.websocket`, `AsyncClient.websocket` and the top-level `__getattr__` resolution). When `wsproto` is missing the entrypoints raise a clear ImportError pointing to the `httpx2[ws]` extra, instead of surfacing a raw `ModuleNotFoundError`.
|
I like this approach better than #1034's! |
…ng it The session-scoped `server` fixture serves a raw ASGI app and never needs WebSocket support, but its `uvicorn.Config` left `ws` at the default `"auto"`. Once `websockets`/`wsproto` are installed (as this branch requires), uvicorn's auto-detection deadlocks server startup in the fixture thread, so `started` is never set and `serve_in_thread` busy-waits forever.
Add high-level tests covering `httpx2.websocket()`, `Client.websocket()`, top-level lazy name access, and fragmented-message reassembly. Exclude the aliased `_typing.TYPE_CHECKING` block from coverage, and mark the defensive `wsproto`-not-installed import guards and racy test-only branches as no cover.
Resolve `httpx2._websockets` via `importlib` so `mock.patch` targets keep working after `tests/httpx2/test_api.py` drops `httpx2` from `sys.modules`. Track session threads by identity instead of asserting absolute counts, so `test_threads_wont_hang` no longer flakes on threads owned by other tests.
The async keepalive loop slept for the interval and then pinged unconditionally, so a close racing in during the sleep left it sending a Ping on a connection already in `LOCAL_CLOSING`, raising `LocalProtocolError`. Re-check `_should_close` after the sleep, mirroring the synchronous keepalive loop.
These mock `read()` loops run in the session's background thread and only exit once the test closes the connection, so whether the loop body and the trailing `raise` execute before teardown depends on scheduling. Under full suite load that made coverage dip below 100% on some Python versions.
The WebSocket session runs its receive and keepalive loops in background threads, whose lines coverage only records with `concurrency = ["thread"]`. Without it, coverage dipped below 100% on whichever Python version happened to tear a thread down early under load. Enabling it lets the existing tests cover those lines deterministically, so the ad-hoc `no cover` pragmas on the blocking mock reads are no longer needed.
The sync keepalive close-guard and the blocking mock reads only run when a background thread happens to reach them before the session tears down, so their coverage swings with scheduling and fails the gate on whichever Python version runs slowest under load. Mark them `no cover` and drop the thread concurrency setting, which only masked the flakiness via incidental timing.
`Client.websocket()`, `AsyncClient.websocket()` and the vendored `connect_ws`/`aconnect_ws` forwarded arbitrary `**kwargs` to the handshake `stream()` call. Spell out the request parameters instead - `params`, `headers`, `cookies`, `auth`, `follow_redirects`, `timeout`, `extensions` - so the surface is typed and matches `sse()`, and merge the mandatory upgrade headers over any caller-supplied ones with the `Headers` union operator.
Ports httpx-ws (MIT, by @frankie567) into httpx2 as a native
httpx2._websocketspackage, preserving the full upstream commit history via a subtree-style merge.What you get
httpx2.websocket("ws://…"),Client().websocket(…),AsyncClient().websocket(…)httpx2.ASGIWebSocketTransport(app=…)for testing WebSocket endpoints against ASGI appsWebSocketSession/AsyncWebSocketSessionand the exception hierarchy (HTTPXWSException,WebSocketDisconnect,WebSocketUpgradeError, …) exported from the top levelDesign
wsprotois an optional extrahttpx2[ws], matching thebrotli/http2/sockspattern.import httpx2and constructing aClientwork without it; WebSocket features raise a clear "installhttpx2[ws]" error. The public names resolve lazily through__getattr__, with aTYPE_CHECKINGblock re-importing them from the typed submodules so type checkers still see real types.httpcore2stays lazily imported inside the methods that need it, preserving httpx2's existing lazy-loading.ruffandmypy --strict).History
The vendor commit merges httpx-ws's complete history (98 commits) under
src/httpx2/httpx2/_websockets/, so upstream authorship and provenance are preserved. Three follow-up commits adapt the code, wire the public API, and port the tests.Tests
Vendored httpx-ws tests live under
tests/httpx2/websockets/(122 passing across asyncio and trio). Dev deps added:starlette,websockets,flaky.scripts/checkis green.AI Disclaimer
This PR was developed with the assistance of either Claude or Codex. I've reviewed and verified the changes.