Skip to content

Add native WebSocket support by vendoring httpx-ws#1042

Open
Kludex wants to merge 114 commits into
mainfrom
websockets-native
Open

Add native WebSocket support by vendoring httpx-ws#1042
Kludex wants to merge 114 commits into
mainfrom
websockets-native

Conversation

@Kludex

@Kludex Kludex commented Jun 24, 2026

Copy link
Copy Markdown
Member

Ports httpx-ws (MIT, by @frankie567) into httpx2 as a native httpx2._websockets package, preserving the full upstream commit history via a subtree-style merge.

What you get

  • httpx2.websocket("ws://…"), Client().websocket(…), AsyncClient().websocket(…)
  • httpx2.ASGIWebSocketTransport(app=…) for testing WebSocket endpoints against ASGI apps
  • WebSocketSession / AsyncWebSocketSession and the exception hierarchy (HTTPXWSException, WebSocketDisconnect, WebSocketUpgradeError, …) exported from the top level

Design

  • wsproto is an optional extra httpx2[ws], matching the brotli/http2/socks pattern. import httpx2 and constructing a Client work without it; WebSocket features raise a clear "install httpx2[ws]" error. The public names resolve lazily through __getattr__, with a TYPE_CHECKING block re-importing them from the typed submodules so type checkers still see real types.
  • httpcore2 stays lazily imported inside the methods that need it, preserving httpx2's existing lazy-loading.
  • The vendored code is adapted to httpx2's namespaces and typing standards (passes ruff and mypy --strict).

History

The vendor commit merges httpx-ws's complete history (98 commits) under src/httpx2/httpx2/_websockets/, so upstream authorship and provenance are preserved. Three follow-up commits adapt the code, wire the public API, and port the tests.

Tests

Vendored httpx-ws tests live under tests/httpx2/websockets/ (122 passing across asyncio and trio). Dev deps added: starlette, websockets, flaky. scripts/check is green.

AI Disclaimer

This PR was developed with the assistance of either Claude or Codex. I've reviewed and verified the changes.

Review in cubic

frankie567 and others added 30 commits November 22, 2022 09:17
> ⚠️ This is a very young project. Expect bugs 🐛

Features
--------

* `connect_ws` helper to talk to WebSockets synchronously.
* `aconnect_ws` helper to talk to WebSockets asynchronously.
* `ASGIWebSocketTransport` to test WebSockets in ASGI apps directly.
* Add ping method (#2)

* add optional arguement paylod to ping
Features
--------

* Add a `.ping` method. Thanks @kousikmitra 🎉

Improvements
------------

* Pin lower bound version of `httpx` and `httpcore` dependencies
    * `httpx>=0.23.1`
    * `httpcore>=0.16.1`
Kludex added 2 commits June 24, 2026 12:57
Add `httpx2.websocket()`, `Client.websocket()` and `AsyncClient.websocket()`
context managers, and re-export the WebSocket session classes, the
`ASGIWebSocketTransport` and the exception hierarchy from the top-level
`httpx2` namespace.

These names resolve lazily through `__getattr__` so `import httpx2` keeps
working without `wsproto`; a missing dependency raises a clear error pointing
to the `httpx2[ws]` extra. A `TYPE_CHECKING` block re-imports them from the
typed submodules so static type checkers still see the real types.
Rewrite the vendored WebSocket tests to httpx2's namespaces, point the
server fixture at uvicorn's `wsproto`/`websockets-sansio` implementations
(the legacy implementation is incompatible with `filterwarnings=error`), and
close the mock memory streams so the trio backend doesn't trip an unraisable
ResourceWarning.

Add `starlette`, `websockets` and `flaky` to the dev group, the `ws` extra to
the dev `httpx2[...]` install, and update `test_exported_members` to account
for the lazily-exported WebSocket names.
@github-actions

Copy link
Copy Markdown

Docs preview:

@codspeed-hq

codspeed-hq Bot commented Jun 24, 2026

Copy link
Copy Markdown

Merging this PR will not alter performance

✅ 15 untouched benchmarks
⏩ 7 skipped benchmarks1


Comparing websockets-native (79289f3) with main (70fac53)

Open in CodSpeed

Footnotes

  1. 7 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 29150edd36

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/httpx2/httpx2/_websockets/api.py Outdated
Comment on lines +1052 to +1053
headers = kwargs.pop("headers", {})
headers.update(_get_headers(subprotocols))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Normalize missing WebSocket headers before updating

When the advertised top-level httpx2.websocket(...) is used without headers, _api.websocket() forwards headers=None, so kwargs.pop("headers", {}) returns None here and the next line raises AttributeError before opening a connection. This makes the default public helper unusable; coerce None and other supported HeaderTypes to a mutable headers object before adding the handshake headers.

Useful? React with 👍 / 👎.

if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
subprotocols: list[str] = []
if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
subprotocols = subprotocols_header.split(",")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Strip spaces when parsing requested subprotocols

When multiple subprotocols are requested, the client-side helper serializes them as a comma-space header such as chat, superchat, but splitting only on , leaves the second value as " superchat" in scope["subprotocols"]. ASGI apps that compare exact protocol names will fail to negotiate any protocol after the first, so trim whitespace and ignore empty tokens when building the scope.

Useful? React with 👍 / 👎.

send = self._asgi_send
try:
await self.app(scope, receive, send)
except Exception as e:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor raise_app_exceptions for websocket ASGI apps

With ASGIWebSocketTransport(..., raise_app_exceptions=True) (the inherited default), exceptions raised by a websocket endpoint are caught here and converted into a close frame instead of being surfaced to the test client. That makes the default behave the same as raise_app_exceptions=False for websocket routes and can let crashing ASGI tests pass as ordinary disconnects; pass the flag into this stream and re-raise when it is enabled.

Useful? React with 👍 / 👎.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 issues found across 19 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/httpx2/httpx2/_websockets/_api.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_api.py:601">
P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</violation>

<violation number="2" location="src/httpx2/httpx2/_websockets/_api.py:1052">
P1: `kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</violation>
</file>

<file name="src/httpx2/httpx2/_websockets/_transport.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_transport.py:115">
P2: `timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</violation>

<violation number="2" location="src/httpx2/httpx2/_websockets/_transport.py:128">
P2: `raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</violation>

<violation number="3" location="src/httpx2/httpx2/_websockets/_transport.py:173">
P2: Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</violation>

<violation number="4" location="src/httpx2/httpx2/_websockets/_transport.py:178">
P2: WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</violation>
</file>

<file name="src/httpx2/httpx2/_websockets/_ping.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_ping.py:25">
P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</violation>
</file>

Reply with feedback, questions, or to request a fix.

Re-trigger cubic

Comment thread src/httpx2/httpx2/_websockets/api.py Outdated
subprotocols: list[str] | None = None,
**kwargs: typing.Any,
) -> typing.Generator[WebSocketSession, None, None]:
headers = kwargs.pop("headers", {})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: kwargs.pop("headers", {}) returns None when headers=None is explicitly passed (e.g., from httpx2.websocket(...) which defaults headers to None). The default {} only applies when the key is absent from kwargs, not when the value is None. The subsequent headers.update(...) will raise AttributeError: 'NoneType' object has no attribute 'update'. Coerce the result: headers = kwargs.pop("headers", None) or {}.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 1052:

<comment>`kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</comment>

<file context>
@@ -0,0 +1,1291 @@
+    subprotocols: list[str] | None = None,
+    **kwargs: typing.Any,
+) -> typing.Generator[WebSocketSession, None, None]:
+    headers = kwargs.pop("headers", {})
+    headers.update(_get_headers(subprotocols))
+
</file context>
Suggested change
headers = kwargs.pop("headers", {})
headers = kwargs.pop("headers", None) or {}

@@ -0,0 +1,41 @@
from __future__ import annotations

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_ping.py, line 25:

<comment>ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</comment>

<file context>
@@ -0,0 +1,41 @@
+        return ping_id, event
+
+    def ack(self, ping_id: bytes | bytearray) -> None:
+        event = self._pings.pop(bytes(ping_id))
+        event.set()
+
</file context>

if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
subprotocols: list[str] = []
if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
subprotocols = subprotocols_header.split(",")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Splitting on "," without stripping whitespace leaves leading spaces in protocol names (e.g., "chat, superchat".split(",")["chat", " superchat"]). Since _get_headers() serializes subprotocols as ", ".join(subprotocols), any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use [s.strip() for s in subprotocols_header.split(",")] to trim whitespace.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 173:

<comment>Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</comment>

<file context>
@@ -0,0 +1,219 @@
+        if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
+            subprotocols: list[str] = []
+            if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
+                subprotocols = subprotocols_header.split(",")
+
+            scope = {
</file context>

scope = {
"type": "websocket",
"path": request.url.path,
"raw_path": request.url.raw_path,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: WebSocket ASGI scope passes raw_path including query string. This can break ASGI apps that expect raw_path to be path-only and use query_string separately.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 178:

<comment>WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</comment>

<file context>
@@ -0,0 +1,219 @@
+            scope = {
+                "type": "websocket",
+                "path": request.url.path,
+                "raw_path": request.url.raw_path,
+                "root_path": self.root_path,
+                "scheme": scheme,
</file context>
Suggested change
"raw_path": request.url.raw_path,
"raw_path": request.url.raw_path.split(b"?")[0],

send = self._asgi_send
try:
await self.app(scope, receive, send)
except Exception as e:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: raise_app_exceptions is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 128:

<comment>`raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</comment>

<file context>
@@ -0,0 +1,219 @@
+        send = self._asgi_send
+        try:
+            await self.app(scope, receive, send)
+        except Exception as e:
+            message = {
+                "type": "websocket.close",
</file context>

self._receive_queue.put(message)

async def receive(self, timeout: float | None = None) -> Message:
while self._send_queue.empty():

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: timeout is ineffective in receive(), so read(timeout=...) can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 115:

<comment>`timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</comment>

<file context>
@@ -0,0 +1,219 @@
+        self._receive_queue.put(message)
+
+    async def receive(self, timeout: float | None = None) -> Message:
+        while self._send_queue.empty():
+            await anyio.sleep(0)
+        return self._send_queue.get(timeout=timeout)
</file context>

Comment on lines +601 to +603
self._send_event, self._receive_event = anyio.create_memory_object_stream[
wsproto.events.Event | HTTPXWSException
]()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 601:

<comment>Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</comment>

<file context>
@@ -0,0 +1,1291 @@
+
+    async def __aenter__(self) -> AsyncWebSocketSession:
+        async with contextlib.AsyncExitStack() as exit_stack:
+            self._send_event, self._receive_event = anyio.create_memory_object_stream[
+                wsproto.events.Event | HTTPXWSException
+            ]()
</file context>
Suggested change
self._send_event, self._receive_event = anyio.create_memory_object_stream[
wsproto.events.Event | HTTPXWSException
]()
self._send_event, self._receive_event = anyio.create_memory_object_stream[
wsproto.events.Event | HTTPXWSException
](max_buffer_size=self._queue_size)

Replace the `require_wsproto()` helper with a direct `try`/`import` guard in
each WebSocket entrypoint (`Client.websocket`, `AsyncClient.websocket` and the
top-level `__getattr__` resolution). When `wsproto` is missing the entrypoints
raise a clear ImportError pointing to the `httpx2[ws]` extra, instead of
surfacing a raw `ModuleNotFoundError`.
@Graeme22

Copy link
Copy Markdown

I like this approach better than #1034's!

…ng it

The session-scoped `server` fixture serves a raw ASGI app and never needs
WebSocket support, but its `uvicorn.Config` left `ws` at the default `"auto"`.
Once `websockets`/`wsproto` are installed (as this branch requires), uvicorn's
auto-detection deadlocks server startup in the fixture thread, so `started` is
never set and `serve_in_thread` busy-waits forever.
Add high-level tests covering `httpx2.websocket()`, `Client.websocket()`,
top-level lazy name access, and fragmented-message reassembly. Exclude the
aliased `_typing.TYPE_CHECKING` block from coverage, and mark the defensive
`wsproto`-not-installed import guards and racy test-only branches as no cover.
Resolve `httpx2._websockets` via `importlib` so `mock.patch` targets keep
working after `tests/httpx2/test_api.py` drops `httpx2` from `sys.modules`.
Track session threads by identity instead of asserting absolute counts, so
`test_threads_wont_hang` no longer flakes on threads owned by other tests.
The async keepalive loop slept for the interval and then pinged unconditionally,
so a close racing in during the sleep left it sending a Ping on a connection
already in `LOCAL_CLOSING`, raising `LocalProtocolError`. Re-check `_should_close`
after the sleep, mirroring the synchronous keepalive loop.
These mock `read()` loops run in the session's background thread and only
exit once the test closes the connection, so whether the loop body and the
trailing `raise` execute before teardown depends on scheduling. Under full
suite load that made coverage dip below 100% on some Python versions.
The WebSocket session runs its receive and keepalive loops in background
threads, whose lines coverage only records with `concurrency = ["thread"]`.
Without it, coverage dipped below 100% on whichever Python version happened
to tear a thread down early under load. Enabling it lets the existing tests
cover those lines deterministically, so the ad-hoc `no cover` pragmas on the
blocking mock reads are no longer needed.
The sync keepalive close-guard and the blocking mock reads only run when a
background thread happens to reach them before the session tears down, so
their coverage swings with scheduling and fails the gate on whichever Python
version runs slowest under load. Mark them `no cover` and drop the thread
concurrency setting, which only masked the flakiness via incidental timing.
`Client.websocket()`, `AsyncClient.websocket()` and the vendored
`connect_ws`/`aconnect_ws` forwarded arbitrary `**kwargs` to the handshake
`stream()` call. Spell out the request parameters instead - `params`,
`headers`, `cookies`, `auth`, `follow_redirects`, `timeout`, `extensions` -
so the surface is typed and matches `sse()`, and merge the mandatory upgrade
headers over any caller-supplied ones with the `Headers` union operator.
@Kludex Kludex deployed to cloudflare July 1, 2026 07:15 — with GitHub Actions Active
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants