diff --git a/src/httpcore2/CHANGELOG.md b/src/httpcore2/CHANGELOG.md index 26ee1955..bb7158e4 100644 --- a/src/httpcore2/CHANGELOG.md +++ b/src/httpcore2/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## Unreleased + +### Changed + +* Write request body chunks to the network without copying them, using h11's `send_with_data_passthrough`. A bytes-like body (e.g. a `memoryview` over an `mmap`) is now only faulted into memory as it is written to the socket. Chunked framing and header writes are unchanged. + ## 2.4.0 (June 11th, 2026) ### Fixed diff --git a/src/httpcore2/httpcore2/_async/http11.py b/src/httpcore2/httpcore2/_async/http11.py index 769825d9..e2a3db68 100644 --- a/src/httpcore2/httpcore2/_async/http11.py +++ b/src/httpcore2/httpcore2/_async/http11.py @@ -145,15 +145,34 @@ async def _send_request_body(self, request: Request) -> None: assert isinstance(request.stream, typing.AsyncIterable) async with safe_async_iterate(request.stream) as iterator: async for chunk in iterator: + if isinstance(chunk, memoryview) and chunk.itemsize != 1: + # h11 tracks Content-Length with `len()`, and the network + # backends slice send progress by byte count - both of which + # assume one byte per element. Normalise wider item types to + # a flat unsigned-byte view (zero-copy) so neither is misled. + chunk = chunk.cast("B") event = h11.Data(data=chunk) await self._send_event(event, timeout=timeout) await self._send_event(h11.EndOfMessage(), timeout=timeout) async def _send_event(self, event: h11.Event, timeout: float | None = None) -> None: - bytes_to_send = self._h11_state.send(event) - if bytes_to_send is not None: - await self._network_stream.write(bytes_to_send, timeout=timeout) + if isinstance(event, h11.Data): + # Use passthrough so a Content-Length body (a single element) is + # written straight to the network instead of through `send`'s + # `b"".join`, leaving a `memoryview` body uncopied. + chunks = self._h11_state.send_with_data_passthrough(event) + assert chunks is not None # Only `ConnectionClosed` yields `None`. + if len(chunks) == 1: + await self._network_stream.write(chunks[0], timeout=timeout) + else: + # Chunked framing wraps the body, so coalesce into one write + # as before. + await self._network_stream.write(b"".join(chunks), timeout=timeout) + else: + bytes_to_send = self._h11_state.send(event) + if bytes_to_send is not None: + await self._network_stream.write(bytes_to_send, timeout=timeout) # Receiving the response... diff --git a/src/httpcore2/httpcore2/_sync/http11.py b/src/httpcore2/httpcore2/_sync/http11.py index 14d674ae..475b356b 100644 --- a/src/httpcore2/httpcore2/_sync/http11.py +++ b/src/httpcore2/httpcore2/_sync/http11.py @@ -145,15 +145,34 @@ def _send_request_body(self, request: Request) -> None: assert isinstance(request.stream, typing.Iterable) with safe_iterate(request.stream) as iterator: for chunk in iterator: + if isinstance(chunk, memoryview) and chunk.itemsize != 1: + # h11 tracks Content-Length with `len()`, and the network + # backends slice send progress by byte count - both of which + # assume one byte per element. Normalise wider item types to + # a flat unsigned-byte view (zero-copy) so neither is misled. + chunk = chunk.cast("B") event = h11.Data(data=chunk) self._send_event(event, timeout=timeout) self._send_event(h11.EndOfMessage(), timeout=timeout) def _send_event(self, event: h11.Event, timeout: float | None = None) -> None: - bytes_to_send = self._h11_state.send(event) - if bytes_to_send is not None: - self._network_stream.write(bytes_to_send, timeout=timeout) + if isinstance(event, h11.Data): + # Use passthrough so a Content-Length body (a single element) is + # written straight to the network instead of through `send`'s + # `b"".join`, leaving a `memoryview` body uncopied. + chunks = self._h11_state.send_with_data_passthrough(event) + assert chunks is not None # Only `ConnectionClosed` yields `None`. + if len(chunks) == 1: + self._network_stream.write(chunks[0], timeout=timeout) + else: + # Chunked framing wraps the body, so coalesce into one write + # as before. + self._network_stream.write(b"".join(chunks), timeout=timeout) + else: + bytes_to_send = self._h11_state.send(event) + if bytes_to_send is not None: + self._network_stream.write(bytes_to_send, timeout=timeout) # Receiving the response... diff --git a/src/httpx2/CHANGELOG.md b/src/httpx2/CHANGELOG.md index df680940..e5f3b288 100644 --- a/src/httpx2/CHANGELOG.md +++ b/src/httpx2/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## Unreleased + +### Added + +* Support uploading a `memoryview` (e.g. over a `bytearray`, `array.array`, or an `mmap`) as request `content`, without copying it into a `bytes` object up front. A non-contiguous view raises rather than being silently copied. + ## 2.4.0 (June 11th, 2026) ### Added diff --git a/src/httpx2/httpx2/_content.py b/src/httpx2/httpx2/_content.py index 58cbdcfd..fcfd1c8a 100644 --- a/src/httpx2/httpx2/_content.py +++ b/src/httpx2/httpx2/_content.py @@ -35,6 +35,22 @@ async def __aiter__(self) -> AsyncIterator[bytes]: yield self._stream +class BufferStream(AsyncByteStream, SyncByteStream): + """ + A request body backed by a `memoryview`, yielded as a single chunk and not + eagerly read into `bytes` at request construction (see `Request.__init__`). + """ + + def __init__(self, buffer: memoryview) -> None: + self._buffer = buffer + + def __iter__(self) -> Iterator[bytes]: + yield self._buffer # type: ignore[misc] + + async def __aiter__(self) -> AsyncIterator[bytes]: + yield self._buffer # type: ignore[misc] + + class IteratorByteStream(SyncByteStream): CHUNK_SIZE = 65_536 @@ -100,7 +116,7 @@ async def __aiter__(self) -> AsyncIterator[bytes]: def encode_content( - content: str | bytes | Iterable[bytes] | AsyncIterable[bytes], + content: str | bytes | memoryview | Iterable[bytes] | AsyncIterable[bytes], ) -> tuple[dict[str, str], SyncByteStream | AsyncByteStream]: if isinstance(content, (bytes, str)): body = content.encode("utf-8") if isinstance(content, str) else content @@ -108,6 +124,18 @@ def encode_content( headers = {"Content-Length": str(content_length)} if body else {} return headers, ByteStream(body) + elif isinstance(content, memoryview): + # A non-contiguous view can't be sent without copying it, so require + # the caller to opt into that explicitly. + if not content.c_contiguous: + raise TypeError("content memoryview must be C-contiguous; copy it first, e.g. content=bytes(view)") + # Pass the buffer through as a single chunk so a large buffer isn't + # copied up front; normalise to 1-D unsigned bytes for the byte count. + buffer = content.cast("B") + content_length = buffer.nbytes + headers = {"Content-Length": str(content_length)} if content_length else {} + return headers, BufferStream(buffer) + elif isinstance(content, Iterable) and not isinstance(content, dict): # `not isinstance(content, dict)` is a bit oddly specific, but it # catches a case that's easy for users to make in error, and would diff --git a/src/httpx2/httpx2/_types.py b/src/httpx2/httpx2/_types.py index 3635da6b..d71a92cf 100644 --- a/src/httpx2/httpx2/_types.py +++ b/src/httpx2/httpx2/_types.py @@ -42,8 +42,8 @@ AuthTypes = Union[tuple[str | bytes, str | bytes], Callable[["Request"], "Request"], "Auth"] -RequestContent = str | bytes | Iterable[bytes] | AsyncIterable[bytes] -ResponseContent = str | bytes | Iterable[bytes] | AsyncIterable[bytes] +RequestContent = str | bytes | memoryview | Iterable[bytes] | AsyncIterable[bytes] +ResponseContent = str | bytes | memoryview | Iterable[bytes] | AsyncIterable[bytes] ResponseExtensions = Mapping[str, Any] RequestData = Mapping[str, Any] diff --git a/tests/httpcore2/_async/test_http11.py b/tests/httpcore2/_async/test_http11.py index 4e828359..028af4ed 100644 --- a/tests/httpcore2/_async/test_http11.py +++ b/tests/httpcore2/_async/test_http11.py @@ -1,3 +1,5 @@ +import typing + import pytest import httpcore2 @@ -345,3 +347,113 @@ async def test_http11_header_sub_100kb() -> None: response = await conn.request("GET", "https://example.com/") assert response.status == 200 assert response.content == b"" + + +class AsyncRecordingStream(httpcore2.AsyncMockStream): + """A mock stream that records the exact objects passed to `write`.""" + + def __init__(self, buffer: list[bytes]) -> None: + super().__init__(buffer) + self.writes: list[bytes | memoryview] = [] + + async def write(self, buffer: bytes | memoryview, timeout: float | None = None) -> None: + self.writes.append(buffer) + + +@pytest.mark.anyio +async def test_http11_request_body_buffer_is_passed_through() -> None: + """ + A `Content-Length` body chunk that is a bytes-like buffer (e.g. a + `memoryview`) is written straight to the network without being copied into + a new `bytes` object first, so a large buffer is only faulted into memory + as it is written out. See `_send_event`, which uses h11's + `send_with_data_passthrough` for `Data` events. + """ + origin = httpcore2.Origin(b"https", b"example.com", 443) + stream = AsyncRecordingStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Length: 0\r\n", + b"\r\n", + ] + ) + body = memoryview(bytearray(b"Hello, world!")) + + async def stream_body() -> typing.AsyncIterator[bytes]: + yield body # type: ignore[misc] + + async with httpcore2.AsyncHTTP11Connection(origin=origin, stream=stream, keepalive_expiry=5.0) as conn: + response = await conn.request( + "POST", + "https://example.com/", + headers={"Content-Length": "13"}, + content=stream_body(), + ) + assert response.status == 200 + + # The exact object we passed as the body reached the network stream, i.e. + # it was not copied into a new `bytes` object along the way. + assert any(chunk is body for chunk in stream.writes) + # ...and the bytes actually written form a correct, fully-framed request. + assert b"".join(bytes(chunk) for chunk in stream.writes) == ( + b"POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 13\r\n\r\nHello, world!" + ) + + +@pytest.mark.anyio +async def test_http11_request_body_buffer_chunked() -> None: + """ + With chunked transfer encoding the body is wrapped in framing, so the + passthrough list has multiple fragments. `_send_event` coalesces these into + a single write (its `else` branch). + """ + origin = httpcore2.Origin(b"https", b"example.com", 443) + stream = AsyncRecordingStream([b"HTTP/1.1 200 OK\r\n", b"Content-Length: 0\r\n", b"\r\n"]) + body = memoryview(bytearray(b"Hello, world!")) + + async def stream_body() -> typing.AsyncIterator[bytes]: + yield body # type: ignore[misc] + + async with httpcore2.AsyncHTTP11Connection(origin=origin, stream=stream) as conn: + # No Content-Length, so the request is sent with chunked framing. + response = await conn.request("POST", "https://example.com/", content=stream_body()) + assert response.status == 200 + + raw = b"".join(bytes(chunk) for chunk in stream.writes) + assert b"transfer-encoding: chunked" in raw.lower() + # 0xd == 13: one data chunk followed by the terminating zero-length chunk. + assert raw.endswith(b"\r\nd\r\nHello, world!\r\n0\r\n\r\n") + + +@pytest.mark.anyio +async def test_http11_request_body_buffer_itemsize_normalised() -> None: + """ + A `memoryview` with itemsize > 1 is normalised to a flat byte view before + being sent, so neither h11's `Content-Length` tracking (which uses `len()`) + nor the backend's byte-count slicing is misled into truncating the body. + """ + import array + + origin = httpcore2.Origin(b"https", b"example.com", 443) + stream = AsyncRecordingStream([b"HTTP/1.1 200 OK\r\n", b"Content-Length: 0\r\n", b"\r\n"]) + body = memoryview(array.array("I", [0x01020304] * 4)) # 16 bytes, itemsize 4 + + async def stream_body() -> typing.AsyncIterator[bytes]: + yield body # type: ignore[misc] + + async with httpcore2.AsyncHTTP11Connection(origin=origin, stream=stream) as conn: + response = await conn.request( + "POST", + "https://example.com/", + headers={"Content-Length": str(body.nbytes)}, + content=stream_body(), + ) + assert response.status == 200 + + raw = b"".join(bytes(chunk) for chunk in stream.writes) + # All 16 bytes are written (not truncated to 4 elements), and the body + # reaches the stream as a byte-granular (itemsize 1) view. + assert b"Content-Length: 16\r\n" in raw + assert raw.endswith(b"\r\n\r\n" + bytes(body)) + body_writes = [w for w in stream.writes if isinstance(w, memoryview)] + assert body_writes and all(w.itemsize == 1 for w in body_writes) diff --git a/tests/httpcore2/_sync/test_http11.py b/tests/httpcore2/_sync/test_http11.py index 46b56392..c4dcea6d 100644 --- a/tests/httpcore2/_sync/test_http11.py +++ b/tests/httpcore2/_sync/test_http11.py @@ -1,3 +1,5 @@ +import typing + import pytest import httpcore2 @@ -345,3 +347,113 @@ def test_http11_header_sub_100kb() -> None: response = conn.request("GET", "https://example.com/") assert response.status == 200 assert response.content == b"" + + +class RecordingStream(httpcore2.MockStream): + """A mock stream that records the exact objects passed to `write`.""" + + def __init__(self, buffer: list[bytes]) -> None: + super().__init__(buffer) + self.writes: list[bytes | memoryview] = [] + + def write(self, buffer: bytes | memoryview, timeout: float | None = None) -> None: + self.writes.append(buffer) + + + +def test_http11_request_body_buffer_is_passed_through() -> None: + """ + A `Content-Length` body chunk that is a bytes-like buffer (e.g. a + `memoryview`) is written straight to the network without being copied into + a new `bytes` object first, so a large buffer is only faulted into memory + as it is written out. See `_send_event`, which uses h11's + `send_with_data_passthrough` for `Data` events. + """ + origin = httpcore2.Origin(b"https", b"example.com", 443) + stream = RecordingStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Length: 0\r\n", + b"\r\n", + ] + ) + body = memoryview(bytearray(b"Hello, world!")) + + def stream_body() -> typing.Iterator[bytes]: + yield body # type: ignore[misc] + + with httpcore2.HTTP11Connection(origin=origin, stream=stream, keepalive_expiry=5.0) as conn: + response = conn.request( + "POST", + "https://example.com/", + headers={"Content-Length": "13"}, + content=stream_body(), + ) + assert response.status == 200 + + # The exact object we passed as the body reached the network stream, i.e. + # it was not copied into a new `bytes` object along the way. + assert any(chunk is body for chunk in stream.writes) + # ...and the bytes actually written form a correct, fully-framed request. + assert b"".join(bytes(chunk) for chunk in stream.writes) == ( + b"POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 13\r\n\r\nHello, world!" + ) + + + +def test_http11_request_body_buffer_chunked() -> None: + """ + With chunked transfer encoding the body is wrapped in framing, so the + passthrough list has multiple fragments. `_send_event` coalesces these into + a single write (its `else` branch). + """ + origin = httpcore2.Origin(b"https", b"example.com", 443) + stream = RecordingStream([b"HTTP/1.1 200 OK\r\n", b"Content-Length: 0\r\n", b"\r\n"]) + body = memoryview(bytearray(b"Hello, world!")) + + def stream_body() -> typing.Iterator[bytes]: + yield body # type: ignore[misc] + + with httpcore2.HTTP11Connection(origin=origin, stream=stream) as conn: + # No Content-Length, so the request is sent with chunked framing. + response = conn.request("POST", "https://example.com/", content=stream_body()) + assert response.status == 200 + + raw = b"".join(bytes(chunk) for chunk in stream.writes) + assert b"transfer-encoding: chunked" in raw.lower() + # 0xd == 13: one data chunk followed by the terminating zero-length chunk. + assert raw.endswith(b"\r\nd\r\nHello, world!\r\n0\r\n\r\n") + + + +def test_http11_request_body_buffer_itemsize_normalised() -> None: + """ + A `memoryview` with itemsize > 1 is normalised to a flat byte view before + being sent, so neither h11's `Content-Length` tracking (which uses `len()`) + nor the backend's byte-count slicing is misled into truncating the body. + """ + import array + + origin = httpcore2.Origin(b"https", b"example.com", 443) + stream = RecordingStream([b"HTTP/1.1 200 OK\r\n", b"Content-Length: 0\r\n", b"\r\n"]) + body = memoryview(array.array("I", [0x01020304] * 4)) # 16 bytes, itemsize 4 + + def stream_body() -> typing.Iterator[bytes]: + yield body # type: ignore[misc] + + with httpcore2.HTTP11Connection(origin=origin, stream=stream) as conn: + response = conn.request( + "POST", + "https://example.com/", + headers={"Content-Length": str(body.nbytes)}, + content=stream_body(), + ) + assert response.status == 200 + + raw = b"".join(bytes(chunk) for chunk in stream.writes) + # All 16 bytes are written (not truncated to 4 elements), and the body + # reaches the stream as a byte-granular (itemsize 1) view. + assert b"Content-Length: 16\r\n" in raw + assert raw.endswith(b"\r\n\r\n" + bytes(body)) + body_writes = [w for w in stream.writes if isinstance(w, memoryview)] + assert body_writes and all(w.itemsize == 1 for w in body_writes) diff --git a/tests/httpx2/test_content.py b/tests/httpx2/test_content.py index 426705cf..d67526be 100644 --- a/tests/httpx2/test_content.py +++ b/tests/httpx2/test_content.py @@ -50,6 +50,52 @@ async def test_bytes_content() -> None: assert async_content == b"Hello, world!" +@pytest.mark.anyio +async def test_memoryview_content() -> None: + request = httpx2.Request(method, url, content=memoryview(b"Hello, world!")) + assert isinstance(request.stream, typing.Iterable) + assert isinstance(request.stream, typing.AsyncIterable) + + assert request.headers == {"Host": "www.example.com", "Content-Length": "13"} + + # A bytes-like buffer is treated as (replayable) streaming content: it is + # not eagerly copied into a `bytes` object, so - like other streaming + # bodies - the content is not available until it has been explicitly read. + with pytest.raises(httpx2.RequestNotRead): + request.content # noqa: B018 + + sync_content = b"".join(list(request.stream)) + async_content = b"".join([part async for part in request.stream]) + assert sync_content == b"Hello, world!" + assert async_content == b"Hello, world!" + + +def test_memoryview_content_is_not_copied() -> None: + # The memoryview passed as content is yielded through as-is rather than + # being copied up front. This is what lets a large buffer (e.g. a + # `memoryview` over an `mmap`) be faulted into memory only as it is written + # out, rather than all at once. We verify it by mutating the underlying + # buffer after building the request and observing the change downstream. + # (`BufferStream.__iter__`/`__aiter__` both yield the same view, so the + # sync path is representative.) + data = bytearray(b"Hello, world!") + request = httpx2.Request(method, url, content=memoryview(data)) + data[0:5] = b"HELLO" + + assert isinstance(request.stream, typing.Iterable) + (chunk,) = list(request.stream) + assert bytes(chunk) == b"HELLO, world!" + + +def test_noncontiguous_memoryview_content_raises() -> None: + # A non-contiguous view can't be sent without copying, so we reject it + # rather than silently materialising it (the opposite of what passing a + # memoryview asks for). + strided = memoryview(bytearray(b"0123456789"))[::2] + with pytest.raises(TypeError, match="must be C-contiguous"): + httpx2.Request(method, url, content=strided) + + @pytest.mark.anyio async def test_bytesio_content() -> None: request = httpx2.Request(method, url, content=io.BytesIO(b"Hello, world!"))