diff --git a/README.md b/README.md index 1d9e638..6ebc68e 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,8 @@ native asynchronous APIs for all platforms. pip install serialx ``` -For drop-in import compatibility (`serial`, `serial_asyncio`, `serial_asyncio_fast`), install: +For drop-in import compatibility (`serial`, `serial_asyncio`, `serial_asyncio_fast`) in +environments where existing code cannot be migrated: ```console pip install serialx-compat ``` @@ -31,41 +32,64 @@ with serialx.serial_for_url("/dev/serial/by-id/port", baudrate=115200) as serial assert pins.dtr is serialx.PinState.HIGH ``` -A high-level asynchronous serial `(reader, writer)` pair: +An async equivalent of the synchronous API: ```Python import asyncio -import contextlib - import serialx async def main(): - reader, writer = await serialx.open_serial_connection("/dev/serial/by-id/port", baudrate=115200) + async with serialx.async_serial_for_url( + "/dev/serial/by-id/port", baudrate=115200, + ) as serial: + data = await serial.readexactly(5) + serial.write(b"test") + await serial.flush() + + await serial.set_modem_pins(rts=True, dtr=True) + pins = await serial.get_modem_pins() + assert pins.rts is serialx.PinState.HIGH +``` + +A `(StreamReader, StreamWriter)` pair is also available for code already wired up to +the asyncio streams API: + +```Python +import asyncio +import serialx - with contextlib.closing(writer): - data = await reader.readexactly(5) - writer.write(b"test") - await writer.drain() +async def main(): + reader, writer = await serialx.open_serial_connection( + "/dev/serial/by-id/port", baudrate=115200, + ) + + try: + data = await reader.readexactly(5) + writer.write(b"test") + await writer.drain() + finally: + writer.close() + await writer.wait_closed() ``` -And a low-level asynchronous serial transport: +And a low-level asynchronous serial transport for protocol-style consumers: ```Python import asyncio import serialx async def main(): - loop = asyncio.get_running_loop() - protocol = YourProtocol() + loop = asyncio.get_running_loop() + protocol = YourProtocol() - transport, protocol = await serialx.create_serial_connection( - loop, - lambda: protocol, - url="/dev/serial/by-id/port", - baudrate=115200 - ) + transport, protocol = await serialx.create_serial_connection( + loop, + lambda: protocol, + url="/dev/serial/by-id/port", + baudrate=115200, + ) - await transport.set_modem_pins(rts=True, dtr=True) + await transport.set_modem_pins(rts=True, dtr=True) ``` ## ESPHome serial proxy diff --git a/docs/api/platforms/posix.md b/docs/api/platforms/posix.md index ea34ec0..c51a29c 100644 --- a/docs/api/platforms/posix.md +++ b/docs/api/platforms/posix.md @@ -11,7 +11,3 @@ :members: :member-order: bysource ``` - -```{eval-rst} -.. autofunction:: serialx.platforms.serial_posix.posix_list_serial_ports -``` diff --git a/docs/how-to/async-serial.md b/docs/how-to/async-serial.md new file mode 100644 index 0000000..54c9917 --- /dev/null +++ b/docs/how-to/async-serial.md @@ -0,0 +1,99 @@ +# Async serial +Python's asyncio module introduces many similar async primitives. serialx provides APIs +for both high-level and low-level async code. + +## Async-with +`async with` is the simplest pattern and is suited for scripts and self-contained code: + +```python +import serialx + +async with serialx.async_serial_for_url( + "/dev/serial/by-id/port", baudrate=115200, +) as serial: + serial.write(b"ping") + data = await serial.readexactly(4) +``` + +Unlike the sync API, you have all of the asyncio primitives at your disposal, including +granular task cancellation, timeouts, and concurrency: + +```python +import asyncio + +async with serialx.async_serial_for_url( + "/dev/serial/by-id/port", baudrate=115200, +) as serial: + async with asyncio.TaskGroup() as tg: + async def ping() -> None: + while True: + serial.write(b"ping") + await serial.flush() + await asyncio.sleep(1) + + tg.create_task(ping()) + + async with asyncio.timeout(30): + data = await serial.readexactly(4) +``` + +### Manual open and close +The instance returned by `async_serial_for_url` is unopened. Open and close +explicitly when you need to keep the connection alive across function boundaries: + +```python +serial = serialx.async_serial_for_url( + "/dev/serial/by-id/port", baudrate=115200, +) + +await serial.open() + +try: + ... +finally: + await serial.close() +``` + +### Reading and writing +Reads are coroutines, writes are synchronous (data is buffered and drained on +demand): + +```python +data = await serial.read(64) # up to 64 bytes +chunk = await serial.readexactly(32) # exactly 32 bytes +line = await serial.readline() # through the next \n +header = await serial.readuntil(b"\r\n") # through a custom delimiter + +serial.write(b"hello ") +serial.write(b"world\n") +await serial.flush() # wait until the data has been written +``` + +### Modem pins +Modem control pins are async, since some transports (ESPHome, RFC2217) round-trip +to the device: + +```python +await serial.set_modem_pins(rts=True, dtr=True) +pins = await serial.get_modem_pins() +assert pins.rts is serialx.PinState.HIGH +``` + +## Async protocols and transports +While the high-level async API is useful for simple code, libraries and other +high-performance uses should use asyncio transports and protocols. These have the +benefit of allowing an `asyncio.Protocol` to immediately enqueue data in the same event +loop cycle as it is received. + +```python +import asyncio +import serialx + +loop = asyncio.get_running_loop() +transport, protocol = await serialx.create_serial_connection( + loop=loop, + protocol_factory=your_protocol_factory, + url="/dev/serial/by-id/port", + baudrate=115200, +) +``` diff --git a/docs/how-to/pyserial-migration.md b/docs/how-to/pyserial-migration.md index 9a92606..307cff1 100644 --- a/docs/how-to/pyserial-migration.md +++ b/docs/how-to/pyserial-migration.md @@ -145,3 +145,15 @@ if pins.cts is serialx.PinState.HIGH: ::: `set_modem_pins` accepts individual pin kwargs or a full `ModemPins` dataclass. Pins omitted from the call are left unchanged. `get_modem_pins` returns a `ModemPins` dataclass of `PinState` enum values, call `.to_bool()` on a pin for a `bool | None`. + +### Simplified async API +If you have existing sync code using `serial_for_url` and want to make it async, use `async_serial_for_url`. The method names match the sync API (e.g. `read`, `readexactly`, `readline`, `readuntil`, `write`, `flush`) so the migration is mostly adding `async`/`await`: + +```diff +-with serialx.serial_for_url("/dev/ttyUSB0", baudrate=115200) as serial: +- serial.write(b"ping") +- data = serial.readexactly(4) ++async with serialx.async_serial_for_url("/dev/ttyUSB0", baudrate=115200) as serial: ++ serial.write(b"ping") ++ data = await serial.readexactly(4) +``` diff --git a/docs/index.md b/docs/index.md index 4b78662..b9b8850 100644 --- a/docs/index.md +++ b/docs/index.md @@ -25,6 +25,7 @@ api :caption: How-to :hidden: +how-to/async-serial how-to/esphome how-to/pyodide how-to/pyserial-migration diff --git a/docs/quickstart.md b/docs/quickstart.md index 3351ff0..7c307cd 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -1,5 +1,4 @@ # Quickstart - Install `serialx`: ```bash @@ -16,15 +15,57 @@ with serialx.serial_for_url("/dev/serial/by-id/port", baudrate=115200) as serial data = serial.readexactly(4) ``` -Open a serial connection asynchronously: +Open a serial port asynchronously: ```python import serialx -reader, writer = await serialx.open_serial_connection( - "/dev/serial/by-id/port", - baudrate=115200, -) +async with serialx.async_serial_for_url( + "/dev/serial/by-id/port", baudrate=115200, +) as serial: + serial.write(b"ping") + data = await serial.readexactly(4) +``` + +A `(StreamReader, StreamWriter)` pair is also available for code already wired up to +the asyncio streams API: + +```Python +import asyncio +import serialx + +async def main(): + reader, writer = await serialx.open_serial_connection( + "/dev/serial/by-id/port", baudrate=115200, + ) + + try: + data = await reader.readexactly(5) + writer.write(b"test") + await writer.drain() + finally: + writer.close() + await writer.wait_closed() +``` + +And a low-level asynchronous serial transport for protocol-style consumers: + +```Python +import asyncio +import serialx + +async def main(): + loop = asyncio.get_running_loop() + protocol = YourProtocol() + + transport, protocol = await serialx.create_serial_connection( + loop, + lambda: protocol, + url="/dev/serial/by-id/port", + baudrate=115200, + ) + + await transport.set_modem_pins(rts=True, dtr=True) ``` For optional transports (ESPHome, RFC2217, socket) and compatibility notes, see diff --git a/docs/usage.md b/docs/usage.md index 2030daf..72a634e 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -10,7 +10,30 @@ with serialx.serial_for_url("/dev/serial/by-id/port", baudrate=115200) as serial serial.write(b"test") ``` +## Async +There are quite a few approaches in async Python. serialx supports all of the popular +asyncio primitives in addition to a simple async API. It's recommended to use the async +API over the sync API in general, as the async API allows for granular timeouts, +task cancellation, and concurrency. + +### Async (simple API) +For a simple translation of sync code into async code, you can use `serialx.async_serial_for_url`: +```python +import serialx + +async with serialx.async_serial_for_url("/dev/serial/by-id/port", baudrate=115200) as serial: + data = await serial.readexactly(5) + serial.write(b"test") + await serial.flush() +``` + +All functions, including `open` and `close`, are async and work exactly as they do with +the sync API. + ## Async (`StreamReader` and `StreamWriter`) +A `(StreamReader, StreamWriter)` pair is available for code already wired up to +the asyncio streams API: + ```python import serialx @@ -29,6 +52,8 @@ finally: ``` ## Async (transport) +For protocol-style consumers that want raw `asyncio.Protocol` callbacks: + ```python import asyncio import serialx diff --git a/serialx/__init__.py b/serialx/__init__.py index 8e482d0..5d37c1c 100644 --- a/serialx/__init__.py +++ b/serialx/__init__.py @@ -1,7 +1,9 @@ """serialx serial port implementation.""" from .async_serial import ( + AsyncSerial, SerialStreamWriter, + async_serial_for_url, create_serial_connection, open_serial_connection, ) @@ -42,6 +44,8 @@ from .platforms import Serial, SerialTransport __all__ = [ + "AsyncSerial", + "async_serial_for_url", "create_serial_connection", "get_serial_classes", "list_serial_ports", diff --git a/serialx/async_serial.py b/serialx/async_serial.py index 9df3873..58f68c6 100644 --- a/serialx/async_serial.py +++ b/serialx/async_serial.py @@ -3,11 +3,21 @@ from __future__ import annotations import asyncio -from collections.abc import Callable +from collections.abc import Callable, Iterable import logging from typing import Any, Generic, TypeVar, cast -from .common import BaseSerialTransport, Parity, StopBits, get_uri_handler +from typing_extensions import Self + +from .common import ( + BaseSerialTransport, + ModemPins, + Parity, + PinState, + SerialException, + StopBits, + get_uri_handler, +) LOGGER = logging.getLogger(__name__) @@ -23,6 +33,219 @@ def transport(self) -> _T: return cast(_T, super().transport) +class AsyncSerial: + """Async serial port with a sync-style API.""" + + def __init__( + self, + url: str | None, + *, + transport_cls: type[BaseSerialTransport] | None = None, + **kwargs: Any, + ) -> None: + """Initialize an unopened serial port. + + Directly creating this class is not recommended; use `async_serial_for_url` + instead. + """ + + self._url = url + self._connect_kwargs: dict[str, Any] = kwargs + self._transport_cls = transport_cls + + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._transport: BaseSerialTransport | None = None + + # ---- Lifecycle ---- + + async def open(self) -> None: + """Open the serial port connection.""" + if self._transport is not None: + raise SerialException("AsyncSerial is already open") + + loop = asyncio.get_running_loop() + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader, loop=loop) + transport, _ = await create_serial_connection( + loop, + lambda: protocol, + self._url, + transport_cls=self._transport_cls, + **self._connect_kwargs, + ) + self._reader = reader + self._writer = asyncio.StreamWriter(transport, protocol, reader, loop) + self._transport = transport + + async def close(self) -> None: + """Close the connection and wait until the port is fully closed.""" + if self._transport is None: + return + self._transport.close() + await self._transport.wait_closed() + self._reset() + + def schedule_close(self) -> None: + """Signal a graceful close without waiting for it to finish.""" + if self._transport is None: + return + self._transport.close() + + def abort(self) -> None: + """Drop pending writes and close immediately, without waiting.""" + if self._transport is None: + return + self._transport.abort() + + async def wait_closed(self) -> None: + """Wait until a previously-scheduled close or abort has finished.""" + if self._transport is None: + return + await self._transport.wait_closed() + self._reset() + + def _reset(self) -> None: + self._reader = None + self._writer = None + self._transport = None + + @property + def is_open(self) -> bool: + """Whether the connection is currently open.""" + return self._transport is not None and not self._transport.is_closing() + + async def __aenter__(self) -> Self: + """Open the connection and return self.""" + await self.open() + return self + + async def __aexit__(self, *exc: object) -> None: + """Close the connection and wait until it's fully closed.""" + await self.close() + + def __repr__(self) -> str: + """Return a debug representation.""" + return f"" + + # ---- Reads ---- + + async def read(self, n: int = -1) -> bytes: + """Read up to `n` bytes (or until EOF if `n` is -1).""" + return await self._require_reader().read(n) + + async def readexactly(self, n: int) -> bytes: + """Read exactly `n` bytes.""" + return await self._require_reader().readexactly(n) + + async def readuntil(self, separator: bytes = b"\n") -> bytes: + """Read up to and including `separator`.""" + return await self._require_reader().readuntil(separator) + + async def readline(self) -> bytes: + """Read until the next newline.""" + return await self._require_reader().readline() + + def _require_reader(self) -> asyncio.StreamReader: + if self._reader is None: + raise SerialException("AsyncSerial is not open") + return self._reader + + # ---- Writes ---- + + def write(self, data: bytes | bytearray | memoryview) -> None: + """Queue data for writing.""" + self._require_writer().write(data) + + def writelines(self, data: Iterable[bytes | bytearray | memoryview]) -> None: + """Queue an iterable of buffers for writing.""" + self._require_writer().writelines(data) + + async def drain(self) -> None: + """Wait until the application-level write buffer can accept more data.""" + await self._require_writer().drain() + + async def flush(self) -> None: + """Drain app-level buffer, then wait for the OS-level buffer to flush.""" + await self.drain() + await self.transport.flush() + + def _require_writer(self) -> asyncio.StreamWriter: + if self._writer is None: + raise SerialException("AsyncSerial is not open") + return self._writer + + # ---- Transport access ---- + + @property + def transport(self) -> BaseSerialTransport: + """Return the underlying serial transport.""" + if self._transport is None: + raise SerialException("AsyncSerial is not open") + return self._transport + + # ---- Modem pins (proxy to transport) ---- + + async def get_modem_pins(self) -> ModemPins: + """Get modem control pins.""" + return await self.transport.get_modem_pins() + + async def set_modem_pins( + self, + modem_pins: ModemPins | None = None, + *, + le: PinState | bool | None = PinState.UNDEFINED, + dtr: PinState | bool | None = PinState.UNDEFINED, + rts: PinState | bool | None = PinState.UNDEFINED, + st: PinState | bool | None = PinState.UNDEFINED, + sr: PinState | bool | None = PinState.UNDEFINED, + cts: PinState | bool | None = PinState.UNDEFINED, + car: PinState | bool | None = PinState.UNDEFINED, + rng: PinState | bool | None = PinState.UNDEFINED, + dsr: PinState | bool | None = PinState.UNDEFINED, + ) -> None: + """Set modem control pins.""" + await self.transport.set_modem_pins( + modem_pins, + le=le, + dtr=dtr, + rts=rts, + st=st, + sr=sr, + cts=cts, + car=car, + rng=rng, + dsr=dsr, + ) + + # ---- Settings (proxy to transport) ---- + + @property + def baudrate(self) -> int: + """Get the baud rate.""" + return self.transport.baudrate + + @property + def parity(self) -> Parity: + """Get the parity.""" + return self.transport.parity + + @property + def stopbits(self) -> StopBits: + """Get the number of stop bits.""" + return self.transport.stopbits + + @property + def byte_size(self) -> int: + """Get the byte size.""" + return self.transport.byte_size + + @property + def exclusive(self) -> bool: + """Get the exclusive setting.""" + return self.transport.exclusive + + async def create_serial_connection( loop: asyncio.AbstractEventLoop, protocol_factory: Callable[[], asyncio.Protocol], @@ -81,3 +304,13 @@ async def open_serial_connection( ) return reader, writer + + +def async_serial_for_url( + url: str | None, + *, + transport_cls: type[BaseSerialTransport] | None = None, + **kwargs: Any, +) -> AsyncSerial: + """Build an unopened AsyncSerial. Use `async with` or `await serial.open()`.""" + return AsyncSerial(url, transport_cls=transport_cls, **kwargs) diff --git a/serialx/common.py b/serialx/common.py index 273466e..2ffe7d3 100644 --- a/serialx/common.py +++ b/serialx/common.py @@ -20,7 +20,7 @@ import urllib.parse import warnings -from typing_extensions import Buffer, Self +from typing_extensions import Buffer, Self, TypedDict, Unpack class Platform(str, Enum): @@ -194,6 +194,20 @@ class Parity(str, Enum): SPACE = "S" +class ConnectKwargs( # type: ignore[call-arg] # PEP 728 not in mypy yet + TypedDict, total=False, extra_items=Any +): + """Kwargs forwarded to BaseSerialTransport.connect / _connect.""" + + baudrate: int + parity: Parity + stopbits: StopBits + xonxoff: bool + rtscts: bool + exclusive: bool + byte_size: int + + class PinState(Enum): """Pin state.""" @@ -934,7 +948,9 @@ def exclusive(self) -> bool: return self._serial.exclusive @abstractmethod - async def _connect(self, **kwargs: Any) -> None: + async def _connect( + self, *, path: str | None, **kwargs: Unpack[ConnectKwargs] + ) -> None: """Connect to serial port.""" raise NotImplementedError @@ -942,13 +958,7 @@ async def connect( self, *, path: str | None, - baudrate: int, - parity: Parity = Parity.NONE, - stopbits: StopBits = StopBits.ONE, - xonxoff: bool = False, - rtscts: bool = False, - byte_size: int = 8, - **kwargs: Any, + **kwargs: Unpack[ConnectKwargs], ) -> None: """Connect to serial port.""" if path is not None: @@ -959,16 +969,7 @@ async def connect( ) try: - await self._connect( - path=path, - baudrate=baudrate, - parity=parity, - stopbits=stopbits, - xonxoff=xonxoff, - rtscts=rtscts, - byte_size=byte_size, - **kwargs, - ) + await self._connect(path=path, **kwargs) except BaseException: # Intentionally catch cancellation too: callers should only observe # connect failure/cancel after transport resources are released. diff --git a/serialx/descriptor_transport.py b/serialx/descriptor_transport.py index 7a23b59..1ede6f7 100644 --- a/serialx/descriptor_transport.py +++ b/serialx/descriptor_transport.py @@ -10,7 +10,9 @@ from typing import Any import warnings -from .common import BaseSerialTransport +from typing_extensions import Unpack + +from .common import BaseSerialTransport, ConnectKwargs LOGGER = logging.getLogger(__name__) LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5 @@ -110,7 +112,9 @@ def _on_cancelled_open_done(self, open_fut: asyncio.Future[int]) -> None: self._closing = True self._maybe_background_close(None) - async def _connect(self, **_kwargs: Any) -> None: + async def _connect( + self, *, path: str | None = None, **_kwargs: Unpack[ConnectKwargs] + ) -> None: assert self._fileno is not None self._loop.add_reader(self._fileno, self._read_ready) self._connection_made = True diff --git a/serialx/platforms/serial_esphome.py b/serialx/platforms/serial_esphome.py index 0f870c7..907ace0 100644 --- a/serialx/platforms/serial_esphome.py +++ b/serialx/platforms/serial_esphome.py @@ -41,12 +41,13 @@ PingResponse, TimeoutAPIError, ) -from typing_extensions import Buffer +from typing_extensions import Buffer, Unpack from serialx import SerialException, UnsupportedSetting from serialx.common import ( BaseSerial, BaseSerialTransport, + ConnectKwargs, ModemPins, Parity, PinState, @@ -596,8 +597,10 @@ def __init__( self._close_task: asyncio.Task[None] | None = None @translate_esphome_errors - async def _connect(self, **kwargs: Any) -> None: - self._serial = self._serial_cls(loop=self._loop, **kwargs) + async def _connect( + self, *, path: str | None = None, **kwargs: Unpack[ConnectKwargs] + ) -> None: + self._serial = self._serial_cls(loop=self._loop, path=path, **kwargs) self._extra["serial"] = self._serial assert self._serial is not None diff --git a/serialx/platforms/serial_rfc2217/__init__.py b/serialx/platforms/serial_rfc2217/__init__.py index d9f7eb9..f84959e 100644 --- a/serialx/platforms/serial_rfc2217/__init__.py +++ b/serialx/platforms/serial_rfc2217/__init__.py @@ -16,10 +16,11 @@ from typing import Any, overload -from typing_extensions import Buffer +from typing_extensions import Buffer, Unpack from ...common import ( BaseSerialTransport, + ConnectKwargs, ModemPins, Parity, PinState, @@ -721,10 +722,12 @@ def __init__( async def _connect( self, - **kwargs: Any, + *, + path: str | None = None, + **kwargs: Unpack[ConnectKwargs], ) -> None: """Connect to the RFC 2217 server and negotiate COM-PORT-OPTION.""" - self._serial = RFC2217Serial(**kwargs) + self._serial = RFC2217Serial(path=path, **kwargs) assert self._serial is not None self._extra["serial"] = self._serial diff --git a/serialx/platforms/serial_win32.py b/serialx/platforms/serial_win32.py index 3320152..86c92a5 100644 --- a/serialx/platforms/serial_win32.py +++ b/serialx/platforms/serial_win32.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any, cast import pywintypes -from typing_extensions import Buffer +from typing_extensions import Buffer, Unpack from win32con import ( DTR_CONTROL_ENABLE, DTR_CONTROL_HANDSHAKE, @@ -67,6 +67,7 @@ from ..common import ( BaseSerial, BaseSerialTransport, + ConnectKwargs, ModemPins, Parity, PinState, @@ -567,7 +568,9 @@ async def _open( self._open_fut = None self._handle = handle - async def _connect(self, **kwargs: Any) -> None: + async def _connect( + self, *, path: str | None = None, **kwargs: Unpack[ConnectKwargs] + ) -> None: """Connect to the serial port.""" if self._closing: self._resolve_closed_waiter() @@ -575,7 +578,6 @@ async def _connect(self, **kwargs: Any) -> None: self._connect_in_progress = True - path = kwargs.pop("path", None) if path is None: raise ValueError("A serial path is required") @@ -588,7 +590,7 @@ async def _connect(self, **kwargs: Any) -> None: # If 0 (default), ReadFile with default timeouts might wait for full buffer. original_inter_byte_timeout = kwargs.get("inter_byte_timeout", 0) if original_inter_byte_timeout == 0: - kwargs["inter_byte_timeout"] = 0.01 + kwargs["inter_byte_timeout"] = 0.01 # type: ignore[typeddict-unknown-key] self._serial = Win32Serial( **kwargs, diff --git a/tests/common.py b/tests/common.py index a9d2a32..b098b37 100644 --- a/tests/common.py +++ b/tests/common.py @@ -21,7 +21,6 @@ from typing_extensions import Self import serialx -from serialx.common import BaseSerialTransport _PYODIDE_PAIR_COUNTER = 0 @@ -576,48 +575,17 @@ def create_hub4com_pair( @contextlib.asynccontextmanager -async def async_create_reader_writer( - *args: Any, **kwargs: Any -) -> AsyncIterator[ - tuple[asyncio.StreamReader, serialx.SerialStreamWriter[BaseSerialTransport]] -]: - """Create a single reader/writer pair.""" - reader, writer = await serialx.open_serial_connection(*args, **kwargs) - - try: - yield (reader, writer) - finally: - writer.close() - await writer.wait_closed() - - -@contextlib.asynccontextmanager -async def async_create_reader_writer_pair( +async def async_create_serial_pair( left: str, right: str, **kwargs: Any, -) -> AsyncIterator[ - tuple[ - asyncio.StreamReader, - serialx.SerialStreamWriter[BaseSerialTransport], - asyncio.StreamReader, - serialx.SerialStreamWriter[BaseSerialTransport], - ] -]: - """Create reader/writer pairs for both sides of a socat connection. - - Returns (reader_left, writer_left, reader_right, writer_right). - """ - reader_left, writer_left = await serialx.open_serial_connection(left, **kwargs) - reader_right, writer_right = await serialx.open_serial_connection(right, **kwargs) - - try: - yield (reader_left, writer_left, reader_right, writer_right) - finally: - writer_left.close() - writer_right.close() - await writer_left.wait_closed() - await writer_right.wait_closed() +) -> AsyncIterator[tuple[serialx.AsyncSerial, serialx.AsyncSerial]]: + """Create AsyncSerial objects for both sides of a socat connection.""" + async with ( + serialx.async_serial_for_url(left, **kwargs) as ser_left, + serialx.async_serial_for_url(right, **kwargs) as ser_right, + ): + yield ser_left, ser_right @contextlib.contextmanager diff --git a/tests/test_async_serial.py b/tests/test_async_serial.py new file mode 100644 index 0000000..2fabe10 --- /dev/null +++ b/tests/test_async_serial.py @@ -0,0 +1,95 @@ +"""Tests for the AsyncSerial API.""" + +import pytest + +from serialx import SerialException, async_serial_for_url +from tests.common import SerialPair + + +async def test_unopened_state() -> None: + """A freshly-constructed AsyncSerial reports closed and has no transport.""" + serial = async_serial_for_url("socket://1.2.3.4:5678", baudrate=115200) + assert serial.is_open is False + await serial.close() + assert serial.is_open is False + + +async def test_repr_unopened() -> None: + """repr() works on an unopened instance and reports url + null transport.""" + serial = async_serial_for_url("socket://1.2.3.4:5678", baudrate=115200) + text = repr(serial) + assert "AsyncSerial" in text + assert "url='socket://1.2.3.4:5678'" in text + assert "transport=None" in text + + +async def test_async_with_opens_and_closes(serial_pair: SerialPair) -> None: + """`async with` opens on enter and closes on exit.""" + serial = async_serial_for_url(serial_pair.left, baudrate=115200) + assert serial.is_open is False + + async with serial: + assert serial.is_open is True + assert serial.baudrate == 115200 + + assert serial.is_open is False + + +async def test_manual_open_close(serial_pair: SerialPair) -> None: + """Users can open() and await close() manually.""" + serial = async_serial_for_url(serial_pair.left, baudrate=115200) + await serial.open() + assert serial.is_open is True + assert serial.baudrate == 115200 + await serial.close() + assert serial.is_open is False + + +async def test_double_open_raises(serial_pair: SerialPair) -> None: + """Calling open() on an already-open instance raises SerialException.""" + async with async_serial_for_url(serial_pair.left, baudrate=115200) as serial: + with pytest.raises(SerialException, match="already open"): + await serial.open() + + +async def test_reopen_after_close(serial_pair: SerialPair) -> None: + """The same instance can be re-opened after close, like sync Serial.""" + serial = async_serial_for_url(serial_pair.left, baudrate=115200) + + async with serial: + assert serial.is_open is True + assert serial.is_open is False + + async with serial: + assert serial.is_open is True + assert serial.is_open is False + + +async def test_schedule_close_then_wait(serial_pair: SerialPair) -> None: + """schedule_close() returns immediately; wait_closed() finishes the close.""" + serial = async_serial_for_url(serial_pair.left, baudrate=115200) + await serial.open() + assert serial.is_open is True + serial.schedule_close() + await serial.wait_closed() + assert serial.is_open is False + + +async def test_abort(serial_pair: SerialPair) -> None: + """abort() drops pending writes and triggers close immediately.""" + serial = async_serial_for_url(serial_pair.left, baudrate=115200) + await serial.open() + serial.write(b"this may be dropped") + serial.abort() + await serial.wait_closed() + assert serial.is_open is False + + +async def test_read_when_unopened_raises() -> None: + """Reading or writing on a never-opened instance raises SerialException.""" + serial = async_serial_for_url("socket://1.2.3.4:5678", baudrate=115200) + with pytest.raises(SerialException, match="not open"): + await serial.read(1) + + with pytest.raises(SerialException, match="not open"): + serial.write(b"x") diff --git a/tests/test_async_transports.py b/tests/test_async_transports.py index 3a1d862..53230b9 100644 --- a/tests/test_async_transports.py +++ b/tests/test_async_transports.py @@ -14,6 +14,7 @@ import pytest +import serialx from serialx import ( BaseSerialTransport, ModemPins, @@ -27,8 +28,7 @@ SerialBackend, SerialPair, SerialQuirk, - async_create_reader_writer, - async_create_reader_writer_pair, + async_create_serial_pair, ) LOGGER = logging.getLogger(__name__) @@ -39,68 +39,120 @@ async def test_async_all_bytes(serial_pair: SerialPair) -> None: """Test that all bytes 0-255 can be transmitted.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): data = bytes(range(256)) - writer_left.write(data) - result = await reader_right.readexactly(len(data)) + left.write(data) + result = await right.readexactly(len(data)) assert result == data async def test_async_segmented_binary_data(serial_pair: SerialPair) -> None: """Test binary data sent in segments.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): segment_size = 16 data = bytes(range(256)) for i in range(0, 256, segment_size): segment = data[i : i + segment_size] - writer_left.write(segment) - result = await reader_right.readexactly(len(segment)) + left.write(segment) + result = await right.readexactly(len(segment)) assert result == segment @pytest.mark.parametrize("size", [1, 16, 64, 256, 512, 1024]) async def test_async_binary_payload_sizes(serial_pair: SerialPair, size: int) -> None: """Test various binary payload sizes.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): data = bytes([i % 256 for i in range(size)]) - writer_left.write(data) - result = await reader_right.readexactly(len(data)) + left.write(data) + result = await right.readexactly(len(data)) assert result == data async def test_async_null_bytes(serial_pair: SerialPair) -> None: """Test that null bytes (0x00) can be transmitted.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): null_data = b"\x00" * 64 - writer_left.write(null_data) - result = await reader_right.readexactly(len(null_data)) + left.write(null_data) + result = await right.readexactly(len(null_data)) assert result == null_data +async def test_async_readuntil(serial_pair: SerialPair) -> None: + """Test readuntil reads up to and including the default newline separator.""" + async with async_create_serial_pair( + serial_pair.left, serial_pair.right, baudrate=115200 + ) as (left, right): + left.write(b"hello\nworld\n") + assert await right.readuntil() == b"hello\n" + assert await right.readuntil(b"\n") == b"world\n" + + +async def test_async_readuntil_custom_separator(serial_pair: SerialPair) -> None: + """Test readuntil with a multi-byte custom separator.""" + async with async_create_serial_pair( + serial_pair.left, serial_pair.right, baudrate=115200 + ) as (left, right): + left.write(b"first||second||tail") + assert await right.readuntil(b"||") == b"first||" + assert await right.readuntil(b"||") == b"second||" + + +async def test_async_readuntil_repeated_separator(serial_pair: SerialPair) -> None: + """Test readuntil with consecutive separators that don't align as framing.""" + async with async_create_serial_pair( + serial_pair.left, serial_pair.right, baudrate=115200 + ) as (left, right): + left.write(b"foo|||||bar||tail||") + assert await right.readuntil(b"||") == b"foo||" + assert await right.readuntil(b"||") == b"||" + assert await right.readuntil(b"||") == b"|bar||" + assert await right.readuntil(b"||") == b"tail||" + + +async def test_async_readline(serial_pair: SerialPair) -> None: + """Test readline returns successive newline-terminated lines.""" + async with async_create_serial_pair( + serial_pair.left, serial_pair.right, baudrate=115200 + ) as (left, right): + left.write(b"alpha\nbeta\ngamma\n") + assert await right.readline() == b"alpha\n" + assert await right.readline() == b"beta\n" + assert await right.readline() == b"gamma\n" + + +async def test_async_writelines(serial_pair: SerialPair) -> None: + """Test writelines writes an iterable of buffers in order.""" + async with async_create_serial_pair( + serial_pair.left, serial_pair.right, baudrate=115200 + ) as (left, right): + left.writelines([b"foo", b"bar", b"baz"]) + assert await right.readexactly(9) == b"foobarbaz" + + async def test_async_overlapping_read_write(serial_pair: SerialPair) -> None: """Test that read and write can overlap, data is buffered.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): data = bytes(range(256)) read = b"" - writer_left.write(data[:100]) - read += await reader_right.readexactly(10) - writer_left.write(data[100:150]) - read += await reader_right.readexactly(10) - writer_left.write(data[150:]) - read += await reader_right.readexactly(10) - read += await reader_right.readexactly(256 - 30) + left.write(data[:100]) + read += await right.readexactly(10) + left.write(data[100:150]) + read += await right.readexactly(10) + left.write(data[150:]) + read += await right.readexactly(10) + read += await right.readexactly(256 - 30) assert read == data @@ -133,12 +185,12 @@ async def test_async_random_large( ): pytest.xfail("macOS termios lacks constants above B230400") - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=baudrate - ) as (_, writer_left, reader_right, _): + ) as (left, right): data = os.urandom(chunk_size) - writer_left.write(data) - read_data = await reader_right.readexactly(chunk_size) + left.write(data) + read_data = await right.readexactly(chunk_size) assert read_data == data @@ -147,30 +199,30 @@ async def test_async_repeated_write_read_cycles( serial_pair: SerialPair, iterations: int ) -> None: """Test repeated write/read cycles.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): data = bytes(range(256)) for _ in range(iterations): - writer_left.write(data) - result = await reader_right.readexactly(len(data)) + left.write(data) + result = await right.readexactly(len(data)) assert result == data async def test_async_buffered_writes_then_read(serial_pair: SerialPair) -> None: """Test multiple writes followed by a single read.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): chunk = bytes(range(256)) iterations = 4 for _ in range(iterations): - writer_left.write(chunk) + left.write(chunk) total_size = len(chunk) * iterations - result = await reader_right.readexactly(total_size) + result = await right.readexactly(total_size) expected = chunk * iterations assert result == expected @@ -184,27 +236,27 @@ async def test_async_large_payload(serial_pair: SerialPair, payload_size: int) - ): pytest.xfail("macOS termios lacks constants above B230400") - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=921600 - ) as (_, writer_left, reader_right, _): + ) as (left, right): data = bytes([i % 256 for i in range(payload_size)]) - writer_left.write(data) - result = await reader_right.readexactly(len(data)) + left.write(data) + result = await right.readexactly(len(data)) assert result == data async def test_async_rapid_small_writes(serial_pair: SerialPair) -> None: """Test rapid succession of small writes.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): iterations = 256 received = bytearray() for i in range(iterations): data = bytes([i % 256]) - writer_left.write(data) - result = await reader_right.readexactly(1) + left.write(data) + result = await right.readexactly(1) received.extend(result) expected = bytes([i % 256 for i in range(iterations)]) @@ -226,13 +278,13 @@ async def test_async_sustained_throughput( ): pytest.xfail("macOS termios lacks constants above B230400") - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=baudrate - ) as (_, writer_left, reader_right, _): + ) as (left, right): chunk = os.urandom(1024) for _ in range(iterations): - writer_left.write(chunk) - result = await reader_right.readexactly(len(chunk)) + left.write(chunk) + result = await right.readexactly(len(chunk)) assert result == chunk @@ -254,12 +306,11 @@ async def test_async_valid_baudrates(serial_pair: SerialPair, baudrate: int) -> ): pytest.xfail("macOS termios lacks constants above B230400") - async with async_create_reader_writer(serial_pair.left, baudrate=baudrate) as ( - _, - writer, - ): - assert writer.transport.baudrate == baudrate - writer.write(b"test") + async with serialx.async_serial_for_url( + serial_pair.left, baudrate=baudrate + ) as left: + assert left.baudrate == baudrate + left.write(b"test") async def test_async_nonstandard_baudrate(serial_pair: SerialPair) -> None: @@ -270,12 +321,12 @@ async def test_async_nonstandard_baudrate(serial_pair: SerialPair) -> None: if sys.platform == "darwin" and SerialBackend.SER2NET in serial_pair.backends: pytest.xfail("macOS termios lacks constants above B230400") - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=200000 - ) as (_, writer_left, reader_right, _): - assert writer_left.transport.baudrate == 200000 - writer_left.write(b"test") - assert await reader_right.readexactly(4) == b"test" + ) as (left, right): + assert left.baudrate == 200000 + left.write(b"test") + assert await right.readexactly(4) == b"test" @pytest.mark.parametrize( @@ -298,11 +349,11 @@ async def test_async_valid_parity(serial_pair: SerialPair, parity: Parity) -> No ): pytest.skip("MARK/SPACE parity requires CMSPAR (Linux) or Win32") - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, parity=parity - ) as (_, writer): - assert writer.transport.parity == parity - writer.write(b"test") + ) as left: + assert left.parity == parity + left.write(b"test") @pytest.mark.parametrize( @@ -334,11 +385,11 @@ async def test_async_valid_stopbits( ): pytest.skip("1.5 stop bits only supported on Win32") - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, stopbits=stopbits - ) as (_, writer): - assert writer.transport.stopbits == expected - writer.write(b"test") + ) as left: + assert left.stopbits == expected + left.write(b"test") @pytest.mark.parametrize("byte_size", [5, 6, 7, 8]) @@ -347,10 +398,11 @@ async def test_async_valid_byte_size(serial_pair: SerialPair, byte_size: int) -> if sys.platform == "emscripten" and byte_size in (5, 6): pytest.skip("Web Serial spec only defines dataBits 7 or 8") - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, byte_size=byte_size - ) as (_, writer): - writer.write(b"test") + ) as left: + assert left.byte_size == byte_size + left.write(b"test") async def test_async_invalid_byte_size(serial_pair: SerialPair) -> None: @@ -359,7 +411,7 @@ async def test_async_invalid_byte_size(serial_pair: SerialPair) -> None: pytest.skip("socket transport does not validate serial settings") with pytest.raises(Exception): - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, byte_size=123 ): pass @@ -368,10 +420,10 @@ async def test_async_invalid_byte_size(serial_pair: SerialPair) -> None: @pytest.mark.parametrize("xonxoff", [True, False]) async def test_async_xonxoff_setting(serial_pair: SerialPair, xonxoff: bool) -> None: """Test that xonxoff setting is accepted.""" - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, xonxoff=xonxoff - ) as (_, writer): - writer.write(b"test") + ) as left: + left.write(b"test") @pytest.mark.parametrize("rtscts", [True, False]) @@ -380,11 +432,11 @@ async def test_async_rtscts_setting(serial_pair: SerialPair, rtscts: bool) -> No if rtscts and serial_pair.uri_scheme == "posix://": pytest.xfail("Strict POSIX backend does not support RTS/CTS flow control") - async with async_create_reader_writer(serial_pair.right, baudrate=115200): - async with async_create_reader_writer( + async with serialx.async_serial_for_url(serial_pair.right, baudrate=115200): + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, rtscts=rtscts - ) as (_, writer): - writer.write(b"test") + ) as left: + left.write(b"test") # --- Lifecycle --- @@ -392,12 +444,12 @@ async def test_async_rtscts_setting(serial_pair: SerialPair, rtscts: bool) -> No async def test_async_concurrent_writes(serial_pair: SerialPair) -> None: """Test concurrent writes from multiple tasks.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): async def write_data(data: bytes) -> None: - writer_left.write(data) + left.write(data) data1 = b"A" * 100 data2 = b"B" * 100 @@ -409,56 +461,53 @@ async def write_data(data: bytes) -> None: write_data(data3), ) - total_data = await reader_right.readexactly(300) + total_data = await right.readexactly(300) assert total_data == b"A" * 100 + b"B" * 100 + b"C" * 100 async def test_async_read_with_timeout(serial_pair: SerialPair) -> None: """Test reading with timeout.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): - writer_left.write(b"test") + ) as (left, right): + left.write(b"test") - result = await asyncio.wait_for(reader_right.readexactly(4), timeout=1.0) + result = await asyncio.wait_for(right.readexactly(4), timeout=1.0) assert result == b"test" with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(reader_right.readexactly(1), timeout=0.1) + await asyncio.wait_for(right.readexactly(1), timeout=0.1) async def test_async_close_is_idempotent(serial_pair: SerialPair) -> None: - """Test closing writer multiple times is safe and drains buffer state.""" - async with async_create_reader_writer_pair( + """Test closing the serial port multiple times is safe.""" + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, _, _): - writer_left.close() - await writer_left.wait_closed() - assert writer_left.transport.get_write_buffer_size() == 0 - - # Second close should be no-op - writer_left.close() - await writer_left.wait_closed() + ) as (left, right): + assert left.transport.get_write_buffer_size() == 0 + await left.close() + # Second close should be a no-op + await left.close() @pytest.mark.skip_quirks(SerialQuirk.NO_BUFFER_CONTROL) async def test_async_pause_resume(serial_pair: SerialPair) -> None: """Test transport pause and resume.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (reader_left, writer_left, _, writer_right): - writer_left.transport.pause_reading() + ) as (left, right): + left.transport.pause_reading() - writer_right.write(b"A long message") - await writer_right.drain() + right.write(b"A long message") + await right.drain() # Nothing can be read with pytest.raises(asyncio.TimeoutError): async with asyncio_timeout(1): - await reader_left.read(1) + await left.read(1) - writer_left.transport.resume_reading() - assert (await reader_left.read(14)) == b"A long message" + left.transport.resume_reading() + assert (await left.read(14)) == b"A long message" async def test_async_abort(serial_pair: SerialPair) -> None: @@ -538,33 +587,30 @@ async def test_async_abort_before_connect(serial_pair: SerialPair) -> None: async def test_async_write_bytearray(serial_pair: SerialPair) -> None: """Test writing bytearray data.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): + ) as (left, right): data = bytearray(b"hello bytearray") - writer_left.write(data) - result = await reader_right.readexactly(len(data)) + left.write(data) + result = await right.readexactly(len(data)) assert result == b"hello bytearray" async def test_async_write_empty(serial_pair: SerialPair) -> None: """Test writing empty data is a no-op.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): - writer_left.write(b"") - writer_left.write(b"after_empty") - result = await reader_right.readexactly(len(b"after_empty")) + ) as (left, right): + left.write(b"") + left.write(b"after_empty") + result = await right.readexactly(len(b"after_empty")) assert result == b"after_empty" async def test_async_transport_api(serial_pair: SerialPair) -> None: """Test transport public API methods.""" - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - _, - writer, - ): - transport = writer.transport + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + transport = left.transport # get/set protocol protocol = transport.get_protocol() @@ -580,11 +626,8 @@ async def test_async_transport_api(serial_pair: SerialPair) -> None: async def test_async_transport_write_buffer_limits(serial_pair: SerialPair) -> None: """Test get/set write buffer limits and can_write_eof.""" - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - _, - writer, - ): - transport = writer.transport + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + transport = left.transport low, high = transport.get_write_buffer_limits() assert 0 <= low <= high @@ -597,24 +640,24 @@ async def test_async_transport_write_buffer_limits(serial_pair: SerialPair) -> N async def test_async_flush(serial_pair: SerialPair) -> None: """Test flushing async transport write buffers.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, reader_right, _): - writer_left.write(b"flush test data") - await writer_left.transport.flush() + ) as (left, right): + left.write(b"flush test data") + await left.flush() - result = await reader_right.readexactly(len(b"flush test data")) + result = await right.readexactly(len(b"flush test data")) assert result == b"flush test data" @pytest.mark.skip_quirks(SerialQuirk.NO_BUFFER_CONTROL) async def test_async_resume_reading_when_not_paused(serial_pair: SerialPair) -> None: """Test that resume_reading when not paused is a no-op.""" - async with async_create_reader_writer_pair( + async with async_create_serial_pair( serial_pair.left, serial_pair.right, baudrate=115200 - ) as (_, writer_left, _, _): + ) as (left, right): # resume without prior pause should be a no-op - writer_left.transport.resume_reading() + left.transport.resume_reading() async def test_async_invalid_uri() -> None: @@ -645,11 +688,8 @@ async def test_create_serial_connection_no_url_no_transport() -> None: async def test_async_get_modem_pins(serial_pair: SerialPair) -> None: """Test reading modem control bits.""" - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - _, - writer, - ): - modem_pins = await writer.transport.get_modem_pins() + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + modem_pins = await left.get_modem_pins() assert isinstance(modem_pins, ModemPins) for field in ["le", "dtr", "rts", "st", "sr", "cts", "car", "rng", "dsr"]: value = getattr(modem_pins, field) @@ -669,15 +709,12 @@ async def test_async_set_modem_pins_api(serial_pair: SerialPair) -> None: ): pytest.xfail("FreeBSD socat sets all pins to LOW") - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - _, - writer, - ): - await writer.transport.set_modem_pins(dtr=True, rts=True) - pins_high = await writer.transport.get_modem_pins() + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + await left.set_modem_pins(dtr=True, rts=True) + pins_high = await left.get_modem_pins() - await writer.transport.set_modem_pins(dtr=False, rts=False) - pins_low = await writer.transport.get_modem_pins() + await left.set_modem_pins(dtr=False, rts=False) + pins_low = await left.get_modem_pins() for pins in (pins_high, pins_low): assert isinstance(pins, ModemPins) @@ -706,25 +743,22 @@ async def test_async_set_modem_pins_api(serial_pair: SerialPair) -> None: async def test_async_set_modem_pins(serial_pair: SerialPair) -> None: """Test setting modem control bits and verifying readback.""" - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - _, - writer, - ): - await writer.transport.set_modem_pins(dtr=True, rts=True) + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + await left.set_modem_pins(dtr=True, rts=True) await asyncio.sleep(serial_pair.modem_line_propagation_delay) - modem_pins = await writer.transport.get_modem_pins() + modem_pins = await left.get_modem_pins() assert modem_pins.dtr is PinState.HIGH assert modem_pins.rts is PinState.HIGH - await writer.transport.set_modem_pins(dtr=False) + await left.set_modem_pins(dtr=False) await asyncio.sleep(serial_pair.modem_line_propagation_delay) - modem_pins = await writer.transport.get_modem_pins() + modem_pins = await left.get_modem_pins() assert modem_pins.dtr is PinState.LOW assert modem_pins.rts is PinState.HIGH - await writer.transport.set_modem_pins(dtr=False, rts=False) + await left.set_modem_pins(dtr=False, rts=False) await asyncio.sleep(serial_pair.modem_line_propagation_delay) - modem_pins = await writer.transport.get_modem_pins() + modem_pins = await left.get_modem_pins() assert modem_pins.dtr is PinState.LOW assert modem_pins.rts is PinState.LOW @@ -953,35 +987,32 @@ async def test_async_deassert_on_open(serial_pair: SerialPair) -> None: ): pytest.skip("POSIX backends do not support deasserting pins on open") - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - reader_left, - writer_left, - ): - async with async_create_reader_writer( + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + async with serialx.async_serial_for_url( serial_pair.right, baudrate=115200, rtsdtr_on_open=PinState.HIGH, rtsdtr_on_close=PinState.HIGH, - ) as (reader_right, writer_right): - await writer_right.transport.set_modem_pins(rts=True) + ) as right: + await right.set_modem_pins(rts=True) await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.right, baudrate=115200, rtsdtr_on_open=PinState.LOW, rtsdtr_on_close=PinState.HIGH, - ) as (reader_right, writer_right): + ) as right: await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.LOW - await writer_right.transport.set_modem_pins(rts=True) + assert (await left.get_modem_pins()).cts is PinState.LOW + await right.set_modem_pins(rts=True) await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH @pytest.mark.skipif(sys.platform == "win32", reason="CloseHandle resets modem signals") @@ -996,46 +1027,43 @@ async def test_async_hang_up_on_close(serial_pair: SerialPair) -> None: ): pytest.skip("POSIX backends do not support deasserting pins on open") - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - reader_left, - writer_left, - ): - async with async_create_reader_writer( + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + async with serialx.async_serial_for_url( serial_pair.right, baudrate=115200, rtsdtr_on_close=PinState.HIGH, rtsdtr_on_open=PinState.HIGH, - ) as (reader_right, writer_right): - await writer_right.transport.set_modem_pins(rts=True) + ) as right: + await right.set_modem_pins(rts=True) await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.right, baudrate=115200, rtsdtr_on_close=PinState.HIGH, rtsdtr_on_open=PinState.HIGH, - ) as (reader_right, writer_right): + ) as right: await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.right, baudrate=115200, rtsdtr_on_close=PinState.LOW, rtsdtr_on_open=PinState.HIGH, - ) as (reader_right, writer_right): + ) as right: await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.LOW + assert (await left.get_modem_pins()).cts is PinState.LOW @pytest.mark.skipif(sys.platform == "win32", reason="CloseHandle resets modem signals") @@ -1065,42 +1093,41 @@ async def test_async_deassert_on_open_with_rtscts( ): pytest.skip("POSIX backends do not support deasserting pins on open") - async with async_create_reader_writer(serial_pair.left, baudrate=115200) as ( - reader_left, - writer_left, - ): - async with async_create_reader_writer( + async with serialx.async_serial_for_url(serial_pair.left, baudrate=115200) as left: + async with serialx.async_serial_for_url( serial_pair.right, baudrate=115200, rtscts=False, rtsdtr_on_open=PinState.HIGH, rtsdtr_on_close=PinState.HIGH, - ) as (reader_right, writer_right): - await writer_right.transport.set_modem_pins(rts=True) + ) as right: + await right.set_modem_pins(rts=True) await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is PinState.HIGH + assert (await left.get_modem_pins()).cts is PinState.HIGH - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.right, baudrate=115200, rtscts=rtscts, rtsdtr_on_open=rtsdtr_on_open, - ) as (reader_right, writer_right): + ) as right: await asyncio.sleep(serial_pair.modem_line_propagation_delay) - assert (await writer_left.transport.get_modem_pins()).cts is expected_state + assert (await left.get_modem_pins()).cts is expected_state @pytest.mark.skip_quirks(SerialQuirk.NO_EXCLUSIVITY) async def test_async_exclusive(serial_pair: SerialPair) -> None: """Test that exclusive setting is respected for async connections.""" - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, exclusive=True - ): + ) as left: + assert left.exclusive is True + with pytest.raises(OSError): - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, exclusive=True ): pass @@ -1115,13 +1142,18 @@ async def test_async_exclusive_disabled(serial_pair: SerialPair) -> None: if SerialBackend.SER2NET in serial_pair.backends: pytest.skip("ser2net only allows one connection per port") - async with async_create_reader_writer( + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, exclusive=False - ) as (_, writer1): - async with async_create_reader_writer( + ) as left1: + assert left1.exclusive is False + + async with serialx.async_serial_for_url( serial_pair.left, baudrate=115200, exclusive=False - ) as (_, writer2): - writer2.write(b"test") + ) as left2: + assert left2.exclusive is False + + left1.write(b"hello") + left2.write(b"world") async def test_async_connect_nonexistent_port() -> None: diff --git a/tests/test_serial_esphome.py b/tests/test_serial_esphome.py index 63eeeb1..6ec39a5 100644 --- a/tests/test_serial_esphome.py +++ b/tests/test_serial_esphome.py @@ -15,19 +15,18 @@ from collections.abc import AsyncIterator, Iterator import contextlib import threading -from typing import cast from unittest.mock import patch import urllib.parse import warnings from serialx import ( + AsyncSerial, Platform, SerialException, SerialPortInfo, - SerialStreamWriter, async_list_serial_ports, + async_serial_for_url, list_serial_ports, - open_serial_connection, ) from serialx.platforms.serial_esphome import ( ESPHOME_DEFAULT_PORT, @@ -35,12 +34,7 @@ ESPHomeSerialTransport, ) -from .common import ( - ESPHOME_HOST_BINARY, - async_create_reader_writer, - create_esphome_pair, - create_socat_pair, -) +from .common import ESPHOME_HOST_BINARY, create_esphome_pair, create_socat_pair @contextlib.contextmanager @@ -83,27 +77,19 @@ async def _connect() -> APIClient: @contextlib.asynccontextmanager -async def create_cross_loop_reader_writer( - url: str, -) -> AsyncIterator[ - tuple[asyncio.StreamReader, SerialStreamWriter[ESPHomeSerialTransport]] -]: - """Create a reader/writer whose `APIClient` lives on its own thread loop.""" +async def cross_loop_async_serial(url: str) -> AsyncIterator[AsyncSerial]: + """Yield an AsyncSerial whose `APIClient` lives on its own thread loop.""" port_name = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)["port_name"][0] with api_client_on_thread_loop(url) as (api, _thread_loop): - reader, writer = await open_serial_connection( + async with async_serial_for_url( url=None, transport_cls=ESPHomeSerialTransport, api=api, port_name=port_name, baudrate=115200, - ) - try: - yield reader, cast(SerialStreamWriter[ESPHomeSerialTransport], writer) - finally: - writer.close() - await writer.wait_closed() + ) as serial: + yield serial def base64(key: bytes) -> str: @@ -127,21 +113,16 @@ async def test_externally_passed_api() -> None: ) await api.connect(login=True) - # Create a serial reader/writer pair for _attempt in range(10): - reader, writer = await open_serial_connection( + async with async_serial_for_url( url=None, transport_cls=ESPHomeSerialTransport, api=api, port_name="Serial Proxy Left", baudrate=115200, - ) - - writer.write(b"test") - await writer.drain() - - writer.close() - await writer.wait_closed() + ) as serial: + serial.write(b"test") + await serial.drain() # The API is still connected await api.device_info() @@ -160,19 +141,19 @@ async def test_externally_passed_api_close_after_disconnect() -> None: ) await api.connect(login=True) - reader, writer = await open_serial_connection( + serial = async_serial_for_url( url=None, transport_cls=ESPHomeSerialTransport, api=api, port_name="Serial Proxy Left", baudrate=115200, ) + await serial.open() # Disconnect the API before closing the transport await api.disconnect() - writer.close() - await writer.wait_closed() + await serial.close() @pytest.mark.skipif(not ESPHOME_HOST_BINARY, reason="esphome host binary not available") @@ -185,16 +166,9 @@ async def test_connect_by_instance_id() -> None: # Connect by instance ID instead of name, with a password url = f"esphome://{parsed.hostname}:{parsed.port}/0?password=unused" - reader, writer = await open_serial_connection( - url=url, - baudrate=115200, - ) - - writer.write(b"test") - await writer.drain() - - writer.close() - await writer.wait_closed() + async with async_serial_for_url(url=url, baudrate=115200) as serial: + serial.write(b"test") + await serial.drain() @pytest.mark.skipif(not ESPHOME_HOST_BINARY, reason="esphome host binary not available") @@ -206,10 +180,8 @@ async def test_connect_by_invalid_name() -> None: url = f"esphome://{parsed.hostname}:{parsed.port}?port_name=Nonexistent" with pytest.raises(ValueError, match="does not exist"): - await open_serial_connection( - url=url, - baudrate=115200, - ) + async with async_serial_for_url(url=url, baudrate=115200): + pass @pytest.mark.skipif(not ESPHOME_HOST_BINARY, reason="esphome host binary not available") @@ -227,10 +199,8 @@ async def test_connect_plaintext_to_encrypted_server() -> None: ) with pytest.raises(SerialException, match="Connection requires encryption"): - await open_serial_connection( - url=url, - baudrate=115200, - ) + async with async_serial_for_url(url=url, baudrate=115200): + pass @pytest.mark.skipif(not ESPHOME_HOST_BINARY, reason="esphome host binary not available") @@ -253,7 +223,8 @@ async def test_connect_encrypted_plaintext_to_server() -> None: with pytest.raises( SerialException, match="The device is using plaintext protocol" ): - await open_serial_connection(url=url, baudrate=115200) + async with async_serial_for_url(url=url, baudrate=115200): + pass async def test_connect_timeout_raises_timeout_error() -> None: @@ -262,9 +233,10 @@ async def test_connect_timeout_raises_timeout_error() -> None: with patch("aioesphomeapi.connection.TCP_CONNECT_TIMEOUT", 1.0): with pytest.raises(TimeoutError, match="Timeout while connecting"): # 192.0.2.1 is TEST-NET-1 (RFC 5737), packets are silently dropped - await open_serial_connection( + async with async_serial_for_url( url="esphome://192.0.2.1:6053?port_name=test", baudrate=115200 - ) + ): + pass @pytest.mark.skipif(not ESPHOME_HOST_BINARY, reason="esphome host binary not available") @@ -282,32 +254,32 @@ async def test_noise_psk_key_alias() -> None: with pytest.raises( ValueError, match="Both `key` and `noise_psk` cannot be provided" ): - await open_serial_connection( + async with async_serial_for_url( url=f"esphome://{parsed.hostname}:{parsed.port}", port_name="Serial Proxy Left", noise_psk=key, key=key, baudrate=115200, - ) + ): + pass with pytest.raises( ValueError, match="Both `key` and `noise_psk` cannot be provided" ): - await open_serial_connection( + async with async_serial_for_url( url=f"esphome://{parsed.hostname}:{parsed.port}?key={key}&noise_psk={key}", port_name="Serial Proxy Left", baudrate=115200, - ) + ): + pass - reader, writer = await open_serial_connection( + async with async_serial_for_url( url=f"esphome://{parsed.hostname}:{parsed.port}", port_name="Serial Proxy Left", noise_psk=key, # alias baudrate=115200, - ) - - writer.close() - await writer.wait_closed() + ): + pass def _expected_esphome_ports(netloc: str) -> list[SerialPortInfo]: @@ -413,34 +385,24 @@ async def test_cross_loop_async_api() -> None: """Async API works with the `APIClient` on a separate loop.""" with create_socat_pair() as (socat_left, socat_right): with create_esphome_pair(socat_left, socat_right) as (left, right): - reader_right, writer_right = await open_serial_connection( - url=right, baudrate=115200 - ) - - try: - async with create_cross_loop_reader_writer(left) as ( - reader_left, - writer_left, - ): - serial = writer_left.transport.get_extra_info("serial") + async with async_serial_for_url(url=right, baudrate=115200) as ser_right: + async with cross_loop_async_serial(left) as ser_left: + serial = ser_left.transport.get_extra_info("serial") assert isinstance(serial, ESPHomeSerial) assert serial._client_loop is not asyncio.get_running_loop() assert serial._loop is asyncio.get_running_loop() - writer_left.write(b"left to right") - data = await reader_right.readexactly(len(b"left to right")) + ser_left.write(b"left to right") + data = await ser_right.readexactly(len(b"left to right")) assert data == b"left to right" - writer_right.write(b"right to left") - data = await reader_left.readexactly(len(b"right to left")) + ser_right.write(b"right to left") + data = await ser_left.readexactly(len(b"right to left")) assert data == b"right to left" - await writer_left.transport.set_modem_pins(dtr=True, rts=False) - await writer_left.transport.get_modem_pins() - await writer_left.transport.flush() - finally: - writer_right.close() - await writer_right.wait_closed() + await ser_left.set_modem_pins(dtr=True, rts=False) + await ser_left.get_modem_pins() + await ser_left.transport.flush() @pytest.mark.skipif(not ESPHOME_HOST_BINARY, reason="esphome host binary not available") @@ -448,21 +410,18 @@ async def test_cross_loop_sync_modem_pins_on_loop_thread() -> None: """Sync modem-pin access from `self._loop`'s thread must not deadlock.""" with create_socat_pair() as (socat_left, socat_right): with create_esphome_pair(socat_left, socat_right) as (left, _right): - async with create_cross_loop_reader_writer(left) as ( - _reader, - writer, - ): - serial = writer.transport.serial + async with cross_loop_async_serial(left) as serial: + esphome_serial = serial.transport.serial with warnings.catch_warnings(): warnings.simplefilter("always", DeprecationWarning) with pytest.warns( DeprecationWarning, match="transport.set_modem_pins" ): - serial.dtr = False + esphome_serial.dtr = False with pytest.raises(RuntimeError, match="sync ESPHomeSerial method"): - _ = serial.dtr + _ = esphome_serial.dtr @pytest.mark.skipif(not ESPHOME_HOST_BINARY, reason="esphome host binary not available") @@ -470,10 +429,7 @@ async def test_sync_api_with_external_api_on_different_loop() -> None: """Sync API works when the `APIClient` lives on a different loop.""" with create_socat_pair() as (socat_left, socat_right): with create_esphome_pair(socat_left, socat_right) as (left, right): - async with async_create_reader_writer(right, baudrate=115200) as ( - _peer_reader, - peer_writer, - ): + async with async_serial_for_url(url=right, baudrate=115200) as peer: with api_client_on_thread_loop(left) as (api, api_loop): def _sync_open() -> ESPHomeSerial: @@ -493,8 +449,8 @@ def _sync_open() -> ESPHomeSerial: assert serial._loop is not None assert serial._loop is not api_loop - peer_writer.write(b"peer-data") - await peer_writer.drain() + peer.write(b"peer-data") + await peer.drain() data = await asyncio.to_thread(serial.read, len(b"peer-data")) assert data == b"peer-data" @@ -516,40 +472,35 @@ async def test_single_api_multiple_async_ports() -> None: await api.connect(login=True) try: - reader_left, writer_left = await open_serial_connection( - url=None, - transport_cls=ESPHomeSerialTransport, - api=api, - port_name="Serial Proxy Left", - baudrate=115200, - ) - reader_right, writer_right = await open_serial_connection( - url=None, - transport_cls=ESPHomeSerialTransport, - api=api, - port_name="Serial Proxy Right", - baudrate=115200, - ) - - try: - writer_left.write(b"left to right") - await writer_left.drain() + async with ( + async_serial_for_url( + url=None, + transport_cls=ESPHomeSerialTransport, + api=api, + port_name="Serial Proxy Left", + baudrate=115200, + ) as ser_left, + async_serial_for_url( + url=None, + transport_cls=ESPHomeSerialTransport, + api=api, + port_name="Serial Proxy Right", + baudrate=115200, + ) as ser_right, + ): + ser_left.write(b"left to right") + await ser_left.drain() data = await asyncio.wait_for( - reader_right.readexactly(len(b"left to right")), timeout=5 + ser_right.readexactly(len(b"left to right")), timeout=5 ) assert data == b"left to right" - writer_right.write(b"right to left") - await writer_right.drain() + ser_right.write(b"right to left") + await ser_right.drain() data = await asyncio.wait_for( - reader_left.readexactly(len(b"right to left")), timeout=5 + ser_left.readexactly(len(b"right to left")), timeout=5 ) assert data == b"right to left" - finally: - writer_left.close() - writer_right.close() - await writer_left.wait_closed() - await writer_right.wait_closed() # The shared externally-owned API is still connected await api.device_info() diff --git a/tests/test_sync_transports.py b/tests/test_sync_transports.py index 27e04ad..4115431 100644 --- a/tests/test_sync_transports.py +++ b/tests/test_sync_transports.py @@ -102,6 +102,43 @@ def test_sync_null_bytes(serial_pair: SerialPair) -> None: assert right.readexactly(len(null_data)) == null_data +def test_sync_readline(serial_pair: SerialPair) -> None: + """Test readline returns successive newline-terminated lines.""" + with ( + Serial.from_url(serial_pair.left, baudrate=115200) as left, + Serial.from_url(serial_pair.right, baudrate=115200, read_timeout=1.0) as right, + ): + left.write(b"alpha\nbeta\ngamma\n") + assert right.readline() == b"alpha\n" + assert right.readline() == b"beta\n" + assert right.readline() == b"gamma\n" + + +def test_sync_readline_returns_partial_on_timeout(serial_pair: SerialPair) -> None: + """Test readline returns the partial line if no newline arrives before timeout.""" + with ( + Serial.from_url(serial_pair.left, baudrate=115200) as left, + Serial.from_url(serial_pair.right, baudrate=115200, read_timeout=0.5) as right, + ): + left.write(b"no newline here") + + with measure_time() as elapsed: + result = right.readline() + + assert result == b"no newline here" + assert elapsed() == pytest.approx(0.5, abs=0.2) + + +def test_sync_writelines(serial_pair: SerialPair) -> None: + """Test writelines writes an iterable of buffers in order.""" + with ( + Serial.from_url(serial_pair.left, baudrate=115200) as left, + Serial.from_url(serial_pair.right, baudrate=115200) as right, + ): + left.writelines([b"foo", b"bar", b"baz"]) + assert right.readexactly(9) == b"foobarbaz" + + def test_sync_overlapping_read_write(serial_pair: SerialPair) -> None: """Test that read and write can overlap, data is buffered.""" with ( @@ -354,6 +391,7 @@ def test_sync_valid_byte_size(serial_pair: SerialPair, byte_size: int) -> None: with Serial.from_url( serial_pair.left, baudrate=115200, byte_size=byte_size ) as serial: + assert serial.byte_size == byte_size serial.write(b"test") @@ -659,6 +697,19 @@ def test_sync_read_until(serial_pair: SerialPair) -> None: assert right.read_until(b"\n") == b"world\n" +def test_sync_read_until_repeated_separator(serial_pair: SerialPair) -> None: + """Test read_until with consecutive separators that don't align as framing.""" + with ( + Serial.from_url(serial_pair.left, baudrate=115200) as left, + Serial.from_url(serial_pair.right, baudrate=115200, read_timeout=1.0) as right, + ): + left.write(b"foo|||||bar||tail||") + assert right.read_until(b"||") == b"foo||" + assert right.read_until(b"||") == b"||" + assert right.read_until(b"||") == b"|bar||" + assert right.read_until(b"||") == b"tail||" + + def test_sync_readexactly_total_timeout(serial_pair: SerialPair) -> None: """Test that readexactly bounds total wall-clock time, not per-read time.""" with (