Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/12309.doc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added docstrings to all public methods of :class:`~aiohttp.StreamReader`
and :class:`~aiohttp.DataQueue` in ``aiohttp/streams.py``
-- by :user:`mastash3ff`.
68 changes: 68 additions & 0 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class EofStream(Exception):


class AsyncStreamIterator(Generic[_T]):
"""Async iterator wrapper for stream read functions."""

__slots__ = ("read_func",)

Expand All @@ -50,6 +51,7 @@ async def __anext__(self) -> _T:


class ChunkTupleAsyncStreamIterator:
"""Async iterator yielding ``(bytes, bool)`` tuples from a stream."""

__slots__ = ("_stream",)

Expand All @@ -67,6 +69,7 @@ async def __anext__(self) -> tuple[bytes, bool]:


class AsyncStreamReaderMixin:
"""Mixin providing async iteration helpers for stream readers."""

__slots__ = ()

Expand Down Expand Up @@ -174,16 +177,23 @@ def __repr__(self) -> str:
return "<%s>" % " ".join(info)

def get_read_buffer_limits(self) -> tuple[int, int]:
"""Return the low-water and high-water marks for the read buffer."""
return (self._low_water, self._high_water)

def exception(self) -> type[BaseException] | BaseException | None:
"""Return the stored exception, or ``None`` if no exception was set."""
return self._exception

def set_exception(
self,
exc: type[BaseException] | BaseException,
exc_cause: BaseException = _EXC_SENTINEL,
) -> None:
"""Set the exception and wake up any waiters.

Any pending ``read``, ``readany``, ``readline``, or ``wait_eof``
calls will raise *exc*.
"""
self._exception = exc
self._eof_callbacks.clear()

Expand All @@ -198,6 +208,10 @@ def set_exception(
set_exception(waiter, exc, exc_cause)

def on_eof(self, callback: Callable[[], None]) -> None:
"""Register a callback to be called when EOF is reached.

If EOF has already been fed, the callback is invoked immediately.
"""
if self._eof:
try:
callback()
Expand All @@ -207,6 +221,10 @@ def on_eof(self, callback: Callable[[], None]) -> None:
self._eof_callbacks.append(callback)

def feed_eof(self) -> None:
"""Signal the end of the data stream.

Wake up any pending readers and invoke all registered EOF callbacks.
"""
self._eof = True

waiter = self._waiter
Expand Down Expand Up @@ -239,6 +257,7 @@ def at_eof(self) -> bool:
return self._eof and not self._buffer

async def wait_eof(self) -> None:
"""Wait until ``feed_eof`` is called."""
if self._eof:
return

Expand Down Expand Up @@ -275,6 +294,10 @@ def unread_data(self, data: bytes) -> None:
self._eof_counter = 0

def feed_data(self, data: bytes) -> None:
"""Feed *data* into the internal buffer and wake up any pending reader.

Pauses the underlying protocol when the buffer exceeds the high-water mark.
"""
assert not self._eof, "feed_data after feed_eof"

if not data:
Expand All @@ -294,6 +317,7 @@ def feed_data(self, data: bytes) -> None:
self._protocol.pause_reading()

def begin_http_chunk_receiving(self) -> None:
"""Mark the beginning of an HTTP chunk in chunked transfer encoding."""
if self._http_chunk_splits is None:
if self.total_bytes:
raise RuntimeError(
Expand All @@ -302,6 +326,7 @@ def begin_http_chunk_receiving(self) -> None:
self._http_chunk_splits = collections.deque()

def end_http_chunk_receiving(self) -> None:
"""Mark the end of an HTTP chunk in chunked transfer encoding."""
if self._http_chunk_splits is None:
raise RuntimeError(
"Called end_chunk_receiving without calling "
Expand Down Expand Up @@ -362,11 +387,24 @@ async def _wait(self, func_name: str) -> None:
self._waiter = None

async def readline(self, *, max_line_length: int | None = None) -> bytes:
"""Read one line, where "line" is a sequence of bytes ending with ``\\n``.

If *max_line_length* is given and the line exceeds that limit,
:exc:`LineTooLong` is raised.
"""
return await self.readuntil(max_size=max_line_length)

async def readuntil(
self, separator: bytes = b"\n", *, max_size: int | None = None
) -> bytes:
"""Read data until *separator* is found.

If *max_size* is given and the accumulated data exceeds that limit
before the separator is found, :exc:`LineTooLong` is raised.

Returns the data read including the separator, or the remaining
data if EOF is reached before the separator.
"""
seplen = len(separator)
if seplen == 0:
raise ValueError("Separator should be at least one-byte string")
Expand Down Expand Up @@ -404,6 +442,7 @@ async def readuntil(
return chunk

async def read(self, n: int = -1) -> bytes:
"""Read up to *n* bytes. If *n* is ``-1``, read until EOF."""
if self._exception is not None:
raise self._exception

Expand Down Expand Up @@ -432,6 +471,11 @@ async def read(self, n: int = -1) -> bytes:
return self._read_nowait(n)

async def readany(self) -> bytes:
"""Read available data.

Returns immediately if data is buffered, otherwise waits for at
least one byte. Returns ``b""`` at EOF.
"""
if self._exception is not None:
raise self._exception

Expand Down Expand Up @@ -478,6 +522,11 @@ async def readchunk(self) -> tuple[bytes, bool]:
await self._wait("readchunk")

async def readexactly(self, n: int) -> bytes:
"""Read exactly *n* bytes.

Raise :exc:`asyncio.IncompleteReadError` if EOF is reached before
*n* bytes are read.
"""
if self._exception is not None:
raise self._exception

Expand All @@ -493,6 +542,13 @@ async def readexactly(self, n: int) -> bytes:
return b"".join(blocks)

def read_nowait(self, n: int = -1) -> bytes:
"""Read up to *n* bytes without blocking.

If *n* is ``-1``, return all currently buffered data.

Raise :exc:`RuntimeError` if another coroutine is already waiting
for incoming data.
"""
# default was changed to be consistent with .read(-1)
#
# I believe the most users don't know about the method and
Expand Down Expand Up @@ -642,38 +698,50 @@ def __len__(self) -> int:
return len(self._buffer)

def is_eof(self) -> bool:
"""Return ``True`` if ``feed_eof`` was called."""
return self._eof

def at_eof(self) -> bool:
"""Return ``True`` if the buffer is empty and ``feed_eof`` was called."""
return self._eof and not self._buffer

def exception(self) -> type[BaseException] | BaseException | None:
"""Return the stored exception, or ``None`` if no exception was set."""
return self._exception

def set_exception(
self,
exc: type[BaseException] | BaseException,
exc_cause: BaseException = _EXC_SENTINEL,
) -> None:
"""Set the exception, mark EOF, and wake up any pending reader."""
self._eof = True
self._exception = exc
if (waiter := self._waiter) is not None:
self._waiter = None
set_exception(waiter, exc, exc_cause)

def feed_data(self, data: _T) -> None:
"""Append *data* to the buffer and wake up any pending reader."""
self._buffer.append(data)
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)

def feed_eof(self) -> None:
"""Signal the end of the data stream and wake up any pending reader."""
self._eof = True
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)

async def read(self) -> _T:
"""Read and return the next item from the queue.

Wait until an item is available if the buffer is empty.

Raise :exc:`EofStream` if EOF was reached and the buffer is empty.
"""
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
Expand Down
Loading