Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/style.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ jobs:
environment: [style, black, mypy]

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ jobs:
protobuf-version: [proto3, proto4, proto5, proto6]

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

Expand All @@ -48,9 +48,9 @@ jobs:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* Fix leaked topic reader stream when close interrupts stream creation during reconnect
* Include reconnector and read session ids in topic reader logs

## 3.29.3 ##
* Fix topic reader hang and zombie stream when closing during reconnect

Expand Down
79 changes: 50 additions & 29 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,11 @@ def __init__(
self._id = ReaderStream._static_id_counter.inc_and_get()
self._reader_reconnector_id = reader_reconnector_id
self._session_id = "not initialized"
self._log_prefix = "reader %s stream %s session=%s" % (
self._reader_reconnector_id,
self._id,
self._session_id,
)
self._stream = None
self._started = False
self._background_tasks = set()
Expand Down Expand Up @@ -549,17 +554,28 @@ async def create(
settings: topic_reader.PublicReaderSettings,
) -> "ReaderStream":
stream = GrpcWrapperAsyncIO(StreamReadMessage.FromServer.from_proto)
reader = None
try:
await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead)

await stream.start(driver, _apis.TopicService.Stub, _apis.TopicService.StreamRead)

creds = driver._credentials
reader = ReaderStream(
reader_reconnector_id,
settings,
get_token_function=creds.get_auth_token if creds else None,
)
await reader._start(stream, settings._init_message())
logger.debug("reader stream %s started session=%s", reader._id, reader._session_id)
creds = driver._credentials
reader = ReaderStream(
reader_reconnector_id,
settings,
get_token_function=creds.get_auth_token if creds else None,
)
await reader._start(stream, settings._init_message())
except BaseException:
# If create() is interrupted (e.g. reader.close() cancels the connection loop
# mid-reconnect) the in-flight stream is not yet assigned to the reconnector, so
# its finally cannot reach it. Close it here to avoid a zombie gRPC read session
# that keeps holding the consumer's partition on the server.
if reader is not None:
await reader.close(flush=False)
else:
stream.close()
raise
logger.debug("%s started", reader._log_prefix)
return reader

async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest):
Expand All @@ -568,7 +584,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess

self._started = True
self._stream = stream
logger.debug("reader stream %s send init request", self._id)
logger.debug("%s send init request", self._log_prefix)

stream.write(StreamReadMessage.FromClient(client_message=init_message))
try:
Expand All @@ -580,7 +596,12 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess

if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
self._session_id = init_response.server_message.session_id
logger.debug("reader stream %s initialized session=%s", self._id, self._session_id)
self._log_prefix = "reader %s stream %s session=%s" % (
self._reader_reconnector_id,
self._id,
self._session_id,
)
logger.debug("%s initialized", self._log_prefix)
else:
raise TopicReaderError("Unexpected message after InitRequest: %s" % init_response)

Expand Down Expand Up @@ -714,7 +735,7 @@ async def _handle_background_errors(self):

async def _read_messages_loop(self):
try:
logger.debug("reader stream %s start read loop", self._id)
logger.debug("%s start read loop", self._log_prefix)
self._stream.write(
StreamReadMessage.FromClient(
client_message=StreamReadMessage.ReadRequest(
Expand All @@ -728,7 +749,7 @@ async def _read_messages_loop(self):
_process_response(message.server_status)

if isinstance(message.server_message, StreamReadMessage.ReadResponse):
logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size)
logger.debug("%s read %s bytes", self._log_prefix, message.server_message.bytes_size)
self._on_read_response(message.server_message)

elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
Expand All @@ -739,8 +760,8 @@ async def _read_messages_loop(self):
StreamReadMessage.StartPartitionSessionRequest,
):
logger.debug(
"reader stream %s start partition %s",
self._id,
"%s start partition %s",
self._log_prefix,
message.server_message.partition_session.partition_session_id,
)
await self._on_start_partition_session(message.server_message)
Expand All @@ -750,8 +771,8 @@ async def _read_messages_loop(self):
StreamReadMessage.StopPartitionSessionRequest,
):
logger.debug(
"reader stream %s stop partition %s",
self._id,
"%s stop partition %s",
self._log_prefix,
message.server_message.partition_session_id,
)
self._on_partition_session_stop(message.server_message)
Expand All @@ -761,8 +782,8 @@ async def _read_messages_loop(self):
StreamReadMessage.EndPartitionSession,
):
logger.debug(
"reader stream %s end partition %s",
self._id,
"%s end partition %s",
self._log_prefix,
message.server_message.partition_session_id,
)
self._on_end_partition_session(message.server_message)
Expand All @@ -779,12 +800,12 @@ async def _read_messages_loop(self):

self._state_changed.set()
except asyncio.CancelledError as e:
logger.debug("reader stream %s error: %s", self._id, e)
logger.debug("%s error: %s", self._log_prefix, e)
if not self._closed:
self._set_first_error(issues.ConnectionLost("gRPC stream cancelled"))
raise
except Exception as e:
logger.debug("reader stream %s error: %s", self._id, e)
logger.debug("%s error: %s", self._log_prefix, e)
self._set_first_error(e)
return

Expand Down Expand Up @@ -954,7 +975,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
async def _decode_batches_loop(self):
while True:
batch = await self._batches_to_decode.get()
logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages))
logger.debug("%s decode batch %s messages", self._log_prefix, len(batch.messages))
await self._decode_batch_inplace(batch)
self._add_batch_to_queue(batch)
self._state_changed.set()
Expand All @@ -964,17 +985,17 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
if part_sess_id in self._message_batches:
self._message_batches[part_sess_id]._extend(batch)
logger.debug(
"reader stream %s extend batch partition=%s size=%s",
self._id,
"%s extend batch partition=%s size=%s",
self._log_prefix,
part_sess_id,
len(batch.messages),
)
return

self._message_batches[part_sess_id] = batch
logger.debug(
"reader stream %s new batch partition=%s size=%s",
self._id,
"%s new batch partition=%s size=%s",
self._log_prefix,
part_sess_id,
len(batch.messages),
)
Expand Down Expand Up @@ -1025,7 +1046,7 @@ async def close(self, flush: bool):
return

self._closed = True
logger.debug("reader stream %s close", self._id)
logger.debug("%s close", self._log_prefix)

if flush:
await self.flush()
Expand All @@ -1045,4 +1066,4 @@ async def close(self, flush: bool):
if self._background_tasks:
await asyncio.wait(self._background_tasks)

logger.debug("reader stream %s was closed", self._id)
logger.debug("%s was closed", self._log_prefix)
31 changes: 31 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,37 @@ async def stream_create(reader_reconnector_id, driver, settings):
# The loop stopped on close instead of reconnecting into a second (zombie) stream.
assert create_calls == 1

async def test_create_closes_inflight_stream_on_cancel(self, default_reader_settings):
    """Cancelling create() during the init handshake must close the in-flight stream.

    Scenario: create() is cancelled (e.g. reader.close() cancels the connection loop
    during a reconnect) while it is still waiting for the init response. The gRPC
    stream it opened must be closed; otherwise it leaks as a zombie read session that
    keeps holding the consumer's partition on the server, and a fresh reader never
    gets that partition assigned.
    """
    created_streams = []

    class FakeStream(StreamMock):
        # Stand-in for GrpcWrapperAsyncIO that records every instance create() builds.
        def __init__(self, *args, **kwargs):
            super().__init__()
            created_streams.append(self)

        async def start(self, driver, stub, method):
            # Nothing to connect to in the test.
            return None

    def init_request_sent():
        # True once a stream exists and create() has written something to it.
        if not created_streams:
            return False
        return not created_streams[0].from_client.empty()

    fake_driver = mock.Mock()
    fake_driver._credentials = None

    with mock.patch.object(topic_reader_asyncio, "GrpcWrapperAsyncIO", FakeStream):
        # The real create() runs here. No InitResponse is ever sent, so it parks inside
        # _start() on `await stream.receive()` (the only reachable cancellation point
        # in create()).
        pending_create = asyncio.create_task(
            ReaderStream.create(7, fake_driver, default_reader_settings)
        )
        await wait_condition(init_request_sent)
        assert not pending_create.done()

        pending_create.cancel()
        with pytest.raises(asyncio.CancelledError):
            await pending_create

    leaked_candidate = created_streams[0]
    assert leaked_candidate._closed, "create() leaked the in-flight gRPC stream on cancel"

async def test_wait_error_returns_on_cancelled_error_from_receive(self, default_reader_settings):
receive_call = 0

Expand Down
Loading