Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions src/livepeer_gateway/trickle_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ def __init__(
# Preconnected writer state for the next segment.
self._next_state: Optional[_SegmentPostState] = None

# Set to True when close() is called so that expected 404s during
# teardown are logged at DEBUG level instead of ERROR.
self._closing: bool = False

# Terminal failure for the whole publisher. Once set, no new segments
# should be opened or written.
self._terminal_error: Optional[TricklePublisherTerminalError] = None
Expand Down Expand Up @@ -243,14 +247,27 @@ async def _run_post(self, url: str, seg_state: _SegmentPostState) -> None:
continue
break

if final_status is not None:
_LOG.error("Trickle POST failed url=%s status=%s body=%r", url, final_status, final_body)
else:
_LOG.error("Trickle POST exception url=%s error=%s", url, final_exc)
assert final_exc is not None
# A 404 during teardown is expected — the orchestrator already closed the
# channel before the publisher drained. Downgrade to DEBUG to avoid
# noise in logs when this is part of a normal disconnect sequence.
is_teardown_404 = (final_status == 404 and self._closing)
if is_teardown_404:
if final_status is not None:
_LOG.debug("Trickle POST 404 during teardown (expected) url=%s", url)
else:
_LOG.debug("Trickle POST exception during teardown url=%s error=%s", url, final_exc)
else:
if final_status is not None:
_LOG.error("Trickle POST failed url=%s status=%s body=%r", url, final_status, final_body)
else:
_LOG.error("Trickle POST exception url=%s error=%s", url, final_exc)
self._record_segment_failure(final_exc, seg_state)
if final_status == 404 and self._terminal_error is None:
_LOG.error("Trickle publisher channel does not exist url=%s", self.url)
if is_teardown_404:
_LOG.debug("Trickle publisher channel gone during teardown url=%s", self.url)
else:
_LOG.error("Trickle publisher channel does not exist url=%s", self.url)
terminal_exc = TricklePublisherTerminalError(
"Trickle publisher channel does not exist",
consecutive_failures=self._consecutive_failures,
Expand Down Expand Up @@ -389,6 +406,10 @@ async def close(self) -> None:
if self._session is None and self._lock is None and self._next_state is None:
return

# Signal that we are in teardown so in-flight POSTs can downgrade 404
# log levels — the orchestrator may have already closed the channel.
self._closing = True

try:
await self._ensure_runtime()
# Close is best-effort; suppress cancellation/runtime-init failures.
Expand Down