From a91f1f2ac8a413b29a39e60427b297dd445b94dd Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 10 Jun 2026 18:03:41 +0300 Subject: [PATCH 1/3] Close in-flight reader stream when create() is cancelled Prevents a zombie gRPC read session holding the partition when reader.close() interrupts ReaderStream.create() mid-reconnect. --- CHANGELOG.md | 2 ++ ydb/_topic_reader/topic_reader_asyncio.py | 29 +++++++++++------ .../topic_reader_asyncio_test.py | 31 +++++++++++++++++++ 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13a8294a..130b8bcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fix leaked topic reader stream when close interrupts stream creation during reconnect + ## 3.29.3 ## * Fix topic reader hang and zombie stream when closing during reconnect diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 4c9b538e..5ab843f1 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -549,16 +549,27 @@ async def create( settings: topic_reader.PublicReaderSettings, ) -> "ReaderStream": stream = GrpcWrapperAsyncIO(StreamReadMessage.FromServer.from_proto) + reader = None + try: + await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead) - await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead) - - creds = driver._credentials - reader = ReaderStream( - reader_reconnector_id, - settings, - get_token_function=creds.get_auth_token if creds else None, - ) - await reader._start(stream, settings._init_message()) + creds = driver._credentials + reader = ReaderStream( + reader_reconnector_id, + settings, + get_token_function=creds.get_auth_token if creds else None, + ) + await reader._start(stream, settings._init_message()) + except BaseException: + # If create() is interrupted (e.g. reader.close() cancels the connection loop + # mid-reconnect) the in-flight stream is not yet assigned to the reconnector, so + # its finally cannot reach it. Close it here to avoid a zombie gRPC read session + # that keeps holding the consumer's partition on the server. + if reader is not None: + await reader.close(flush=False) + else: + stream.close() + raise logger.debug("reader stream %s started session=%s", reader._id, reader._session_id) return reader diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 388c50f8..ffc24947 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1607,6 +1607,37 @@ async def stream_create(reader_reconnector_id, driver, settings): # The loop stopped on close instead of reconnecting into a second (zombie) stream. assert create_calls == 1 + async def test_create_closes_inflight_stream_on_cancel(self, default_reader_settings): + # If create() is cancelled (e.g. reader.close() cancels the connection loop during a + # reconnect) while parked on the init handshake, the in-flight gRPC stream must be + # closed. Otherwise it leaks as a zombie read session that keeps holding the + # consumer's partition on the server, and a fresh reader never gets it assigned. + built = [] + + class FakeStream(StreamMock): + def __init__(self, *args, **kwargs): + super().__init__() + built.append(self) + + async def start(self, driver, stub, method): + return None + + driver = mock.Mock() + driver._credentials = None + + with mock.patch.object(topic_reader_asyncio, "GrpcWrapperAsyncIO", FakeStream): + # Real create(); no InitResponse is sent, so it parks inside _start() on + # `await stream.receive()` (the only reachable cancellation point in create()). + create_task = asyncio.create_task(ReaderStream.create(7, driver, default_reader_settings)) + await wait_condition(lambda: bool(built) and not built[0].from_client.empty()) + assert not create_task.done() + + create_task.cancel() + with pytest.raises(asyncio.CancelledError): + await create_task + + assert built[0]._closed, "create() leaked the in-flight gRPC stream on cancel" + async def test_wait_error_returns_on_cancelled_error_from_receive(self, default_reader_settings): receive_call = 0 From 4139e0bbc528e82d32236306e186e0cc23b26a1e Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 10 Jun 2026 18:04:27 +0300 Subject: [PATCH 2/3] Add reconnector and read session ids to topic reader logs --- CHANGELOG.md | 1 + ydb/_topic_reader/topic_reader_asyncio.py | 50 ++++++++++++++--------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 130b8bcd..15dc526b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ * Fix leaked topic reader stream when close interrupts stream creation during reconnect +* Include reconnector and read session ids in topic reader logs ## 3.29.3 ## * Fix topic reader hang and zombie stream when closing during reconnect diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 5ab843f1..ee988a1e 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -515,6 +515,11 @@ def __init__( self._id = ReaderStream._static_id_counter.inc_and_get() self._reader_reconnector_id = reader_reconnector_id self._session_id = "not initialized" + self._log_prefix = "reader %s stream %s session=%s" % ( + self._reader_reconnector_id, + self._id, + self._session_id, + ) self._stream = None self._started = False self._background_tasks = set() @@ -570,7 +575,7 @@ async def create( else: stream.close() raise - logger.debug("reader stream %s started session=%s", reader._id, reader._session_id) + logger.debug("%s started", reader._log_prefix) return reader async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest): @@ -579,7 +584,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._started = True self._stream = stream - logger.debug("reader stream %s send init request", self._id) + logger.debug("%s send init request", self._log_prefix) stream.write(StreamReadMessage.FromClient(client_message=init_message)) try: @@ -591,7 +596,12 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess if isinstance(init_response.server_message, StreamReadMessage.InitResponse): self._session_id = init_response.server_message.session_id - logger.debug("reader stream %s initialized session=%s", self._id, self._session_id) + self._log_prefix = "reader %s stream %s session=%s" % ( + self._reader_reconnector_id, + self._id, + self._session_id, + ) + logger.debug("%s initialized", self._log_prefix) else: raise TopicReaderError("Unexpected message after InitRequest: %s" % init_response) @@ -725,7 +735,7 @@ async def _handle_background_errors(self): async def _read_messages_loop(self): try: - logger.debug("reader stream %s start read loop", self._id) + logger.debug("%s start read loop", self._log_prefix) self._stream.write( StreamReadMessage.FromClient( client_message=StreamReadMessage.ReadRequest( @@ -739,7 +749,7 @@ async def _read_messages_loop(self): _process_response(message.server_status) if isinstance(message.server_message, StreamReadMessage.ReadResponse): - logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size) + logger.debug("%s read %s bytes", self._log_prefix, message.server_message.bytes_size) self._on_read_response(message.server_message) elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): @@ -750,8 +760,8 @@ async def _read_messages_loop(self): StreamReadMessage.StartPartitionSessionRequest, ): logger.debug( - "reader stream %s start partition %s", - self._id, + "%s start partition %s", + self._log_prefix, message.server_message.partition_session.partition_session_id, ) await self._on_start_partition_session(message.server_message) @@ -761,8 +771,8 @@ async def _read_messages_loop(self): StreamReadMessage.StopPartitionSessionRequest, ): logger.debug( - "reader stream %s stop partition %s", - self._id, + "%s stop partition %s", + self._log_prefix, message.server_message.partition_session_id, ) self._on_partition_session_stop(message.server_message) @@ -772,8 +782,8 @@ async def _read_messages_loop(self): StreamReadMessage.EndPartitionSession, ): logger.debug( - "reader stream %s end partition %s", - self._id, + "%s end partition %s", + self._log_prefix, message.server_message.partition_session_id, ) self._on_end_partition_session(message.server_message) @@ -790,12 +800,12 @@ async def _read_messages_loop(self): self._state_changed.set() except asyncio.CancelledError as e: - logger.debug("reader stream %s error: %s", self._id, e) + logger.debug("%s error: %s", self._log_prefix, e) if not self._closed: self._set_first_error(issues.ConnectionLost("gRPC stream cancelled")) raise except Exception as e: - logger.debug("reader stream %s error: %s", self._id, e) + logger.debug("%s error: %s", self._log_prefix, e) self._set_first_error(e) return @@ -965,7 +975,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> async def _decode_batches_loop(self): while True: batch = await self._batches_to_decode.get() - logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages)) + logger.debug("%s decode batch %s messages", self._log_prefix, len(batch.messages)) await self._decode_batch_inplace(batch) self._add_batch_to_queue(batch) self._state_changed.set() @@ -975,8 +985,8 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch): if part_sess_id in self._message_batches: self._message_batches[part_sess_id]._extend(batch) logger.debug( - "reader stream %s extend batch partition=%s size=%s", - self._id, + "%s extend batch partition=%s size=%s", + self._log_prefix, part_sess_id, len(batch.messages), ) @@ -984,8 +994,8 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch): self._message_batches[part_sess_id] = batch logger.debug( - "reader stream %s new batch partition=%s size=%s", - self._id, + "%s new batch partition=%s size=%s", + self._log_prefix, part_sess_id, len(batch.messages), ) @@ -1036,7 +1046,7 @@ async def close(self, flush: bool): return self._closed = True - logger.debug("reader stream %s close", self._id) + logger.debug("%s close", self._log_prefix) if flush: await self.flush() @@ -1056,4 +1066,4 @@ async def close(self, flush: bool): if self._background_tasks: await asyncio.wait(self._background_tasks) - logger.debug("reader stream %s was closed", self._id) + logger.debug("%s was closed", self._log_prefix) From bcdddb6e8be3d15b9f6f9a1b8acde663afdaaa40 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 10 Jun 2026 18:28:01 +0300 Subject: [PATCH 3/3] Update CI checkout/setup-python actions to fix Python 3.9 setup --- .github/workflows/style.yaml | 4 ++-- .github/workflows/tests.yaml | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/style.yaml b/.github/workflows/style.yaml index 00d4c065..84ebf12d 100644 --- a/.github/workflows/style.yaml +++ b/.github/workflows/style.yaml @@ -16,9 +16,9 @@ jobs: environment: [style, black, mypy] steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 06a0b518..8e271a1e 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -21,9 +21,9 @@ jobs: protobuf-version: [proto3, proto4, proto5, proto6] steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} @@ -48,9 +48,9 @@ jobs: python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }}