Fix topic reader hang and zombie stream when closing during reconnect #835
Merged
If reader.close() cancels the connection loop while it is closing the old stream in its finally, the bare except swallowed the CancelledError, so the loop kept reconnecting into a stream nobody owns while close() hung. Propagate the cancellation and guard the loop with a _closed flag.
Pull request overview
This PR addresses a shutdown race in the asyncio topic reader where closing during an in-flight reconnect could hang close() and leave a “zombie” reconnect loop/stream running. The changes aim to ensure cancellation is respected during stream teardown and that the reconnector won’t create a new stream after a close begins.
Changes:
- Add a `_closed` flag to `ReaderReconnector` and gate the connection loop from continuing after close.
- Ensure `asyncio.CancelledError` is propagated when it occurs during the reconnect loop's stream-close `finally`.
- Add an asyncio test that forces the loop into the reconnect `finally` and asserts `close()` completes and no second stream is created.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `ydb/_topic_reader/topic_reader_asyncio.py` | Adds `_closed` state and adjusts cancellation handling during reconnect/close. |
| `ydb/_topic_reader/topic_reader_asyncio_test.py` | Adds regression test for closing during reconnect without hang/zombie stream. |
Closing while the loop is reconnecting could leave a concurrent receive_message() hung; wake any waiter via _set_first_error on close. Narrow the finally's except BaseException to Exception so KeyboardInterrupt/SystemExit aren't swallowed (CancelledError is re-raised above), and fix a typo.
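Why narrowing the handler to `except Exception` is safe can be checked against the built-in exception hierarchy: since Python 3.8, `asyncio.CancelledError` derives from `BaseException` rather than `Exception`, as do `KeyboardInterrupt` and `SystemExit`. A minimal sketch, using a hypothetical helper `swallowed_by` that is not part of the SDK:

```python
import asyncio


def swallowed_by(handler_type, exc):
    """Return True if `except handler_type:` would catch `exc` (hypothetical helper)."""
    try:
        raise exc
    except handler_type:
        return True
    except BaseException:
        # Anything the handler under test let through ends up here.
        return False


if __name__ == "__main__":
    samples = (asyncio.CancelledError(), KeyboardInterrupt(), SystemExit(), ValueError("boom"))
    for exc in samples:
        print(
            type(exc).__name__,
            "| except BaseException:", swallowed_by(BaseException, exc),
            "| except Exception:", swallowed_by(Exception, exc),
        )
```

Only the `ValueError` is caught by `except Exception`; the other three pass through it, which is the behaviour the narrowed handler relies on.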
Problem
Closing a topic reader while it is reconnecting can hang `close()` and leave a "zombie" `ReaderStream` reading forever.

In `ReaderReconnector._connection_loop`, after a reconnect-triggering error the loop closes the old stream in a `finally` that swallows every exception.

When `reader.close()` cancels the connection-loop task while it is inside that `await self._stream_reader.close(...)`, the `CancelledError` is swallowed. The loop is not stopped: it continues, brings up a fresh stream and parks on `wait_error()`, while `reconnector.close()` is stuck on `await asyncio.wait(...)` waiting for that loop, so `close()` hangs forever and the new stream keeps reading.

This is realistic: a transaction-commit failure triggers both the SDK auto-reconnect and the application closing the reader within a few ms, and the old stream's gRPC teardown is a network round-trip a few ms wide, which is exactly the window `close()` lands in.

Fix
- Propagate `asyncio.CancelledError` out of the connection loop's `finally` instead of swallowing it, so the loop stops on `close()` rather than reconnecting into a zombie stream while `close()` hangs.
- Add a `_closed` flag (set first in `close()`) and check it at the top of the loop so it cannot bring up a new stream after close.
- In `close()`, mark closed, close the current stream with the requested `flush`, then cancel the loop. Closing with flush before cancelling preserves pending-commit flushing on a normal close.
- Wake any `wait_message()` waiter on close so a concurrent `receive_message()` does not hang.
- Narrow the `finally`'s `except` to `Exception` so `KeyboardInterrupt`/`SystemExit` are not swallowed.
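The failure mode and the first two points of the fix can be reproduced with a toy model. This is a sketch under stated assumptions, not the SDK code: `FakeStream`, `Reconnector`, the `fixed` switch and `close_during_reconnect` are all hypothetical names, the stream teardown is simulated with a short sleep, and the flush-before-cancel step and the waiter wake-up are left out for brevity.

```python
import asyncio


class FakeStream:
    """Hypothetical stand-in for a reader stream (not the SDK's ReaderStream)."""

    def __init__(self):
        self.broken = asyncio.Event()   # set to simulate a reconnect-triggering error
        self.closing = asyncio.Event()  # set as soon as close() has started

    async def wait_error(self):
        await self.broken.wait()
        raise ConnectionError("stream broke")

    async def close(self):
        self.closing.set()
        await asyncio.sleep(0.05)  # simulated network teardown


class Reconnector:
    """Toy reconnector; `fixed` switches between the old and the new behaviour."""

    def __init__(self, fixed):
        self.fixed = fixed
        self.closed = False
        self.streams = []
        self.task = None

    def start(self):
        self.task = asyncio.create_task(self._connection_loop())

    async def _connection_loop(self):
        # New behaviour: never bring up a stream once close() has begun.
        while not (self.fixed and self.closed):
            stream = FakeStream()
            self.streams.append(stream)
            try:
                await stream.wait_error()
            except ConnectionError:
                pass  # fall through to the finally, then reconnect
            finally:
                try:
                    await stream.close()
                except asyncio.CancelledError:
                    if self.fixed:
                        raise  # new behaviour: cancellation stops the loop
                    # old behaviour: cancellation swallowed, the loop goes on
                except Exception:
                    pass

    async def close(self, timeout):
        self.closed = True
        self.task.cancel()
        _, pending = await asyncio.wait([self.task], timeout=timeout)
        return not pending  # False means a real close() would have hung


async def close_during_reconnect(fixed):
    reconnector = Reconnector(fixed)
    reconnector.start()
    await asyncio.sleep(0)  # let the loop create the first stream
    first = reconnector.streams[0]
    first.broken.set()
    await first.closing.wait()  # the loop is now inside `await stream.close()`
    finished = await reconnector.close(timeout=0.3)
    stream_count = len(reconnector.streams)
    if not finished:
        # Stop the zombie loop so the demo exits: this second cancel lands in
        # wait_error(), outside the swallowing handler, so it propagates.
        reconnector.task.cancel()
        await asyncio.wait([reconnector.task])
    return finished, stream_count


if __name__ == "__main__":
    print("old:", asyncio.run(close_during_reconnect(fixed=False)))
    print("new:", asyncio.run(close_during_reconnect(fixed=True)))
```

In this model the old variant times out in `close()` with two streams created, while the fixed variant completes with one. The timeout exists only so the demo can report a hang instead of blocking forever.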
Test
`test_close_during_reconnect_does_not_hang`: forces the connection loop into the `finally`-close of the old stream, calls `close()` there, and asserts close completes (no hang) and no second stream is created (no zombie).