diff --git a/src/livepeer_gateway/channel_reader.py b/src/livepeer_gateway/channel_reader.py index bc51b6b..4af657d 100644 --- a/src/livepeer_gateway/channel_reader.py +++ b/src/livepeer_gateway/channel_reader.py @@ -1,11 +1,16 @@ from __future__ import annotations import json +import logging from typing import Any, AsyncIterator +import aiohttp + from .errors import LivepeerGatewayError from .trickle_subscriber import TrickleSubscriber +_LOG = logging.getLogger(__name__) + class ChannelReader: def __init__(self, events_url: str) -> None: @@ -74,6 +79,12 @@ async def _iter() -> AsyncIterator[dict[str, Any]]: yield data except LivepeerGatewayError: raise + except aiohttp.ClientPayloadError as e: + # Orchestrator truncated the transfer mid-stream (e.g. TransferEncodingError 400) + # or went unreachable. Treat as a clean network disconnect — stop iterating + # rather than propagating as an application error. + _LOG.warning("Trickle events channel disconnected (network): %s: %s", e.__class__.__name__, e) + return except Exception as e: raise LivepeerGatewayError( f"Trickle events subscription error: {e.__class__.__name__}: {e}" @@ -167,6 +178,12 @@ async def _iter() -> AsyncIterator[dict[str, Any]]: await segment.close() except LivepeerGatewayError: raise + except aiohttp.ClientPayloadError as e: + # Orchestrator truncated the transfer mid-stream (e.g. TransferEncodingError 400) + # or went unreachable. Treat as a clean network disconnect — stop iterating + # rather than propagating as an application error. + _LOG.warning("Trickle JSONL channel disconnected (network): %s: %s", e.__class__.__name__, e) + return except Exception as e: raise LivepeerGatewayError( f"Trickle JSONL subscription error: {e.__class__.__name__}: {e}" diff --git a/src/livepeer_gateway/trickle_publisher.py b/src/livepeer_gateway/trickle_publisher.py index 179de50..4c45d47 100644 --- a/src/livepeer_gateway/trickle_publisher.py +++ b/src/livepeer_gateway/trickle_publisher.py @@ -290,7 +290,10 @@ async def _run_delete(self) -> None: try: resp = await self._session.delete(self.url) resp.release() - # Suppress any shutdown-time exceptions, including cancellation. + except aiohttp.ClientConnectorError as exc: + # Orchestrator already unreachable — suppress, no need to log at ERROR. + _LOG.debug("Trickle DELETE: orchestrator unreachable (suppressed) url=%s: %s", self.url, exc) + # Suppress any other shutdown-time exceptions, including cancellation. except BaseException: _LOG.error("Trickle DELETE exception url=%s", self.url, exc_info=True)