Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions topic/src/main/java/tech/ydb/topic/impl/SessionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ protected CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObser
return streamConnection.start(message -> {
if (getLogger().isTraceEnabled()) {
getLogger().trace("[{}] Message received:\n{}", streamId, message);
} else {
getLogger().debug("[{}] Message received", streamId);
}

if (isWorking.get()) {
Expand Down Expand Up @@ -88,8 +86,6 @@ public void send(W request) {

if (getLogger().isTraceEnabled()) {
getLogger().trace("[{}] Sending request:\n{}", streamId, request);
} else {
getLogger().debug("[{}] Sending request", streamId);
}
Comment thread
alex268 marked this conversation as resolved.
streamConnection.sendNext(request);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ void confirmCommit(long committedOffset) {
public void stop() {
isStopped = true;
committer.failPendingCommits();
// complete all read futures
for (Batch batch: readingQueue) {
Comment thread
alex268 marked this conversation as resolved.
Comment thread
alex268 marked this conversation as resolved.
batch.complete();
Comment thread
alex268 marked this conversation as resolved.
}
logger.info("{} stopped", this);
}

Expand Down Expand Up @@ -120,6 +124,10 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
}

sendDataToReadersIfNeeded();

if (isStopped) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture<?>[0]));
}

Expand Down
4 changes: 2 additions & 2 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void confirm() {

private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse response) {
final long responseBytesSize = response.getBytesSize();
logger.trace("[{}] Received ReadResponse of {} bytes", streamId, responseBytesSize);
logger.debug("[{}] Received ReadResponse of {} bytes", streamId, responseBytesSize);
List<CompletableFuture<Void>> batchReadFutures = new ArrayList<>();

for (YdbTopic.StreamReadMessage.ReadResponse.PartitionData data: response.getPartitionDataList()) {
Expand Down Expand Up @@ -365,7 +365,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) {
logger.debug("[{}] processMessage called, but read session is already closed", streamId);
return;
}
logger.debug("[{}] processMessage called", streamId);
logger.trace("[{}] processMessage called", streamId);
if (message.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
Status status = Status.of(StatusCode.fromProto(message.getStatus()),
Issue.fromPb(message.getIssuesList()));
Expand Down