diff --git a/CHANGES/12309.doc.rst b/CHANGES/12309.doc.rst new file mode 100644 index 00000000000..22d6d21e992 --- /dev/null +++ b/CHANGES/12309.doc.rst @@ -0,0 +1,3 @@ +Added docstrings to all public methods of :class:`~aiohttp.StreamReader` +and :class:`~aiohttp.DataQueue` in ``aiohttp/streams.py`` +-- by :user:`mastash3ff`. diff --git a/aiohttp/streams.py b/aiohttp/streams.py index bacb810958b..0030f67ed06 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -30,6 +30,7 @@ class EofStream(Exception): class AsyncStreamIterator(Generic[_T]): + """Async iterator wrapper for stream read functions.""" __slots__ = ("read_func",) @@ -50,6 +51,7 @@ async def __anext__(self) -> _T: class ChunkTupleAsyncStreamIterator: + """Async iterator yielding ``(bytes, bool)`` tuples from a stream.""" __slots__ = ("_stream",) @@ -67,6 +69,7 @@ async def __anext__(self) -> tuple[bytes, bool]: class AsyncStreamReaderMixin: + """Mixin providing async iteration helpers for stream readers.""" __slots__ = () @@ -174,9 +177,11 @@ def __repr__(self) -> str: return "<%s>" % " ".join(info) def get_read_buffer_limits(self) -> tuple[int, int]: + """Return the low-water and high-water marks for the read buffer.""" return (self._low_water, self._high_water) def exception(self) -> type[BaseException] | BaseException | None: + """Return the stored exception, or ``None`` if no exception was set.""" return self._exception def set_exception( @@ -184,6 +189,11 @@ def set_exception( exc: type[BaseException] | BaseException, exc_cause: BaseException = _EXC_SENTINEL, ) -> None: + """Set the exception and wake up any waiters. + + Any pending ``read``, ``readany``, ``readline``, or ``wait_eof`` + calls will raise *exc*. + """ self._exception = exc self._eof_callbacks.clear() @@ -198,6 +208,10 @@ def set_exception( set_exception(waiter, exc, exc_cause) def on_eof(self, callback: Callable[[], None]) -> None: + """Register a callback to be called when EOF is reached. + + If EOF has already been fed, the callback is invoked immediately. + """ if self._eof: try: callback() @@ -207,6 +221,10 @@ def on_eof(self, callback: Callable[[], None]) -> None: self._eof_callbacks.append(callback) def feed_eof(self) -> None: + """Signal the end of the data stream. + + Wake up any pending readers and invoke all registered EOF callbacks. + """ self._eof = True waiter = self._waiter @@ -239,6 +257,7 @@ def at_eof(self) -> bool: return self._eof and not self._buffer async def wait_eof(self) -> None: + """Wait until ``feed_eof`` is called.""" if self._eof: return @@ -275,6 +294,10 @@ def unread_data(self, data: bytes) -> None: self._eof_counter = 0 def feed_data(self, data: bytes) -> None: + """Feed *data* into the internal buffer and wake up any pending reader. + + Pauses the underlying protocol when the buffer exceeds the high-water mark. + """ assert not self._eof, "feed_data after feed_eof" if not data: @@ -294,6 +317,7 @@ def feed_data(self, data: bytes) -> None: self._protocol.pause_reading() def begin_http_chunk_receiving(self) -> None: + """Mark the beginning of an HTTP chunk in chunked transfer encoding.""" if self._http_chunk_splits is None: if self.total_bytes: raise RuntimeError( @@ -302,6 +326,7 @@ def begin_http_chunk_receiving(self) -> None: self._http_chunk_splits = collections.deque() def end_http_chunk_receiving(self) -> None: + """Mark the end of an HTTP chunk in chunked transfer encoding.""" if self._http_chunk_splits is None: raise RuntimeError( "Called end_chunk_receiving without calling " @@ -362,11 +387,24 @@ async def _wait(self, func_name: str) -> None: self._waiter = None async def readline(self, *, max_line_length: int | None = None) -> bytes: + """Read one line, where "line" is a sequence of bytes ending with ``\\n``. + + If *max_line_length* is given and the line exceeds that limit, + :exc:`LineTooLong` is raised. + """ return await self.readuntil(max_size=max_line_length) async def readuntil( self, separator: bytes = b"\n", *, max_size: int | None = None ) -> bytes: + """Read data until *separator* is found. + + If *max_size* is given and the accumulated data exceeds that limit + before the separator is found, :exc:`LineTooLong` is raised. + + Returns the data read including the separator, or the remaining + data if EOF is reached before the separator. + """ seplen = len(separator) if seplen == 0: raise ValueError("Separator should be at least one-byte string") @@ -404,6 +442,7 @@ async def readuntil( return chunk async def read(self, n: int = -1) -> bytes: + """Read up to *n* bytes. If *n* is ``-1``, read until EOF.""" if self._exception is not None: raise self._exception @@ -432,6 +471,11 @@ async def read(self, n: int = -1) -> bytes: return self._read_nowait(n) async def readany(self) -> bytes: + """Read available data. + + Returns immediately if data is buffered, otherwise waits for at + least one byte. Returns ``b""`` at EOF. + """ if self._exception is not None: raise self._exception @@ -478,6 +522,11 @@ async def readchunk(self) -> tuple[bytes, bool]: await self._wait("readchunk") async def readexactly(self, n: int) -> bytes: + """Read exactly *n* bytes. + + Raise :exc:`asyncio.IncompleteReadError` if EOF is reached before + *n* bytes are read. + """ if self._exception is not None: raise self._exception @@ -493,6 +542,13 @@ async def readexactly(self, n: int) -> bytes: return b"".join(blocks) def read_nowait(self, n: int = -1) -> bytes: + """Read up to *n* bytes without blocking. + + If *n* is ``-1``, return all currently buffered data. + + Raise :exc:`RuntimeError` if another coroutine is already waiting + for incoming data. + """ # default was changed to be consistent with .read(-1) # # I believe the most users don't know about the method and @@ -642,12 +698,15 @@ def __len__(self) -> int: return len(self._buffer) def is_eof(self) -> bool: + """Return ``True`` if ``feed_eof`` was called.""" return self._eof def at_eof(self) -> bool: + """Return ``True`` if the buffer is empty and ``feed_eof`` was called.""" return self._eof and not self._buffer def exception(self) -> type[BaseException] | BaseException | None: + """Return the stored exception, or ``None`` if no exception was set.""" return self._exception def set_exception( @@ -655,6 +714,7 @@ def set_exception( exc: type[BaseException] | BaseException, exc_cause: BaseException = _EXC_SENTINEL, ) -> None: + """Set the exception, mark EOF, and wake up any pending reader.""" self._eof = True self._exception = exc if (waiter := self._waiter) is not None: @@ -662,18 +722,26 @@ def set_exception( set_exception(waiter, exc, exc_cause) def feed_data(self, data: _T) -> None: + """Append *data* to the buffer and wake up any pending reader.""" self._buffer.append(data) if (waiter := self._waiter) is not None: self._waiter = None set_result(waiter, None) def feed_eof(self) -> None: + """Signal the end of the data stream and wake up any pending reader.""" self._eof = True if (waiter := self._waiter) is not None: self._waiter = None set_result(waiter, None) async def read(self) -> _T: + """Read and return the next item from the queue. + + Wait until an item is available if the buffer is empty. + + Raise :exc:`EofStream` if EOF was reached and the buffer is empty. + """ if not self._buffer and not self._eof: assert not self._waiter self._waiter = self._loop.create_future()