From 3f8d185b835a6adb50d0da571a61f8aab96b901b Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 28 May 2026 13:51:49 +0100 Subject: [PATCH] Fixed ReadPartitionSession closing --- topic/src/main/java/tech/ydb/topic/impl/SessionBase.java | 4 ---- .../tech/ydb/topic/read/impl/ReadPartitionSession.java | 8 ++++++++ .../main/java/tech/ydb/topic/read/impl/ReadSession.java | 4 ++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java index 8f11669ce..5e2c7198a 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java +++ b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java @@ -52,8 +52,6 @@ protected CompletableFuture start(GrpcReadStream.Observer streamObser return streamConnection.start(message -> { if (getLogger().isTraceEnabled()) { getLogger().trace("[{}] Message received:\n{}", streamId, message); - } else { - getLogger().debug("[{}] Message received", streamId); } if (isWorking.get()) { @@ -88,8 +86,6 @@ public void send(W request) { if (getLogger().isTraceEnabled()) { getLogger().trace("[{}] Sending request:\n{}", streamId, request); - } else { - getLogger().debug("[{}] Sending request", streamId); } streamConnection.sendNext(request); } finally { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java index 9cec764ee..0a055a0e3 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java @@ -74,6 +74,10 @@ void confirmCommit(long committedOffset) { public void stop() { isStopped = true; committer.failPendingCommits(); + // complete all read futures + for (Batch batch: readingQueue) { + batch.complete(); + } logger.info("{} stopped", this); } @@ -120,6 +124,10 @@ public CompletableFuture addBatches(List[0])); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java index 7dbabefb0..830c26efa 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java @@ -296,7 +296,7 @@ public void confirm() { private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse response) { final long responseBytesSize = response.getBytesSize(); - logger.trace("[{}] Received ReadResponse of {} bytes", streamId, responseBytesSize); + logger.debug("[{}] Received ReadResponse of {} bytes", streamId, responseBytesSize); List> batchReadFutures = new ArrayList<>(); for (YdbTopic.StreamReadMessage.ReadResponse.PartitionData data: response.getPartitionDataList()) { @@ -365,7 +365,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) { logger.debug("[{}] processMessage called, but read session is already closed", streamId); return; } - logger.debug("[{}] processMessage called", streamId); + logger.trace("[{}] processMessage called", streamId); if (message.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) { Status status = Status.of(StatusCode.fromProto(message.getStatus()), Issue.fromPb(message.getIssuesList()));