Skip to content

Fix topic reader hang and zombie stream when closing during reconnect#835

Merged
vgvoleg merged 3 commits into
mainfrom
fix-topic-reader-hang-close-during-reconnect
Jun 8, 2026
Merged

Fix topic reader hang and zombie stream when closing during reconnect#835
vgvoleg merged 3 commits into
mainfrom
fix-topic-reader-hang-close-during-reconnect

Conversation

@vgvoleg

@vgvoleg vgvoleg commented Jun 8, 2026

Copy link
Copy Markdown
Member

Problem

Closing a topic reader while it is reconnecting can hang close() and leave a
"zombie" ReaderStream reading forever.

In ReaderReconnector._connection_loop, after a reconnect-triggering error the loop
closes the old stream in a finally that swallows every exception:

finally:
    if self._stream_reader is not None:
        try:
            await self._stream_reader.close(flush=False)
        except BaseException:   # also catches asyncio.CancelledError
            pass

When reader.close() cancels the connection-loop task while it is inside that
await self._stream_reader.close(...), the CancelledError is swallowed. The loop
is not stopped: it continues, brings up a fresh stream and parks on
wait_error(), while reconnector.close() is stuck on await asyncio.wait(...)
waiting for that loop — so close() hangs forever and the new stream keeps reading.

This is realistic: a transaction-commit failure triggers both the SDK auto-reconnect
and the application closing the reader within a few ms, and the old stream's grpc
teardown is a network round-trip a few ms wide — exactly the window close() lands in.

Fix

  • Propagate asyncio.CancelledError out of the connection loop's finally instead of
    swallowing it, so the loop stops on close() rather than reconnecting into a zombie
    stream while close() hangs.
  • Add a _closed flag (set first in close()) and check it at the top of the loop so it
    cannot bring up a new stream after close.
  • In close(), mark closed, close the current stream with the requested flush, then
    cancel the loop. Closing with flush before cancelling preserves pending-commit flushing
    on a normal close.
  • Wake any pending wait_message() waiter on close so a concurrent receive_message()
    does not hang.
  • Narrow the finally's except to Exception so KeyboardInterrupt/SystemExit
    are not swallowed.

Test

test_close_during_reconnect_does_not_hang: forces the connection loop into the
finally-close of the old stream, calls close() there, and asserts close completes
(no hang) and no second stream is created (no zombie).

If reader.close() cancels the connection loop while it is closing the old stream in its finally, the bare except swallowed the CancelledError, so the loop kept reconnecting into a stream nobody owns while close() hung. Propagate the cancellation and guard the loop with a _closed flag.
@vgvoleg vgvoleg force-pushed the fix-topic-reader-hang-close-during-reconnect branch from 48033e0 to 501f34f Compare June 8, 2026 15:10
@vgvoleg vgvoleg requested a review from Copilot June 8, 2026 15:11

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a shutdown race in the asyncio topic reader where closing during an in-flight reconnect could hang close() and leave a “zombie” reconnect loop/stream running. The changes aim to ensure cancellation is respected during stream teardown and that the reconnector won’t create a new stream after a close begins.

Changes:

  • Add a _closed flag to ReaderReconnector and gate the connection loop from continuing after close.
  • Ensure asyncio.CancelledError is propagated when it occurs during the reconnect loop’s stream-close finally.
  • Add an asyncio test that forces the loop into the reconnect finally and asserts close() completes and no second stream is created.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
ydb/_topic_reader/topic_reader_asyncio.py Adds _closed state and adjusts cancellation handling during reconnect/close.
ydb/_topic_reader/topic_reader_asyncio_test.py Adds regression test for closing during reconnect without hang/zombie stream.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread ydb/_topic_reader/topic_reader_asyncio.py
Comment thread ydb/_topic_reader/topic_reader_asyncio.py Outdated
Comment thread ydb/_topic_reader/topic_reader_asyncio_test.py
Closing while the loop is reconnecting could leave a concurrent receive_message() hung; wake any waiter via _set_first_error on close. Narrow the finally's except BaseException to Exception so KeyboardInterrupt/SystemExit aren't swallowed (CancelledError is re-raised above), and fix a typo.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Comment thread ydb/_topic_reader/topic_reader_asyncio.py Outdated

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.

@vgvoleg vgvoleg merged commit cab1869 into main Jun 8, 2026
35 checks passed
@vgvoleg vgvoleg deleted the fix-topic-reader-hang-close-during-reconnect branch June 8, 2026 17:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants