From 6204d74370d87cd11792c5e672b2b1a30c3cf4b8 Mon Sep 17 00:00:00 2001 From: Daniil Zulin Date: Thu, 28 May 2026 20:23:39 +0700 Subject: [PATCH] Fix topic reader flow-control stall on partition session stop When ReadPartitionSession.stop() was called (e.g. on graceful partition eviction) any batches still sitting in readingQueue had their readFuture abandoned. Those futures are part of CompletableFuture.allOf in ReadSession#onReadResponse; if they never complete, sendReadRequest() is never invoked for that ReadResponse, the SDK never replenishes the server-side memory budget, and the whole read session stops requesting data. This restores the drain that used to live in v2.3.33 PartitionSessionImpl .shutdown() and was lost during the refactor that introduced MessageCommitter (b8628162). It also closes a small race where stop() runs between the entry check and readingQueue.offer() in addBatches(). Adds a regression test that reproduces the stall on master and verifies the fix. --- .../topic/read/impl/ReadPartitionSession.java | 17 +- .../ReadPartitionSessionFlowControlTest.java | 312 ++++++++++++++++++ 2 files changed, 328 insertions(+), 1 deletion(-) create mode 100644 topic/src/test/java/tech/ydb/topic/read/impl/ReadPartitionSessionFlowControlTest.java diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java index 9cec764ee..f36ff1710 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReadPartitionSession.java @@ -74,6 +74,14 @@ void confirmCommit(long committedOffset) { public void stop() { isStopped = true; committer.failPendingCommits(); + // Drain any batches that were buffered but not yet delivered to the user. + // Their read futures are part of CompletableFuture.allOf(batchReadFutures) + // in ReadSession#onReadResponse; if they never complete, the SDK stops + // sending ReadRequests and the whole read session stalls. + Batch pending; + while ((pending = readingQueue.poll()) != null) { + pending.complete(); + } logger.info("{} stopped", this); } @@ -114,7 +122,14 @@ public CompletableFuture addBatches(List + *
  • ReadResponse R1 arrives and its batch is handed to the user handler (whose future is pending).
  • + *
  • While the user is still processing R1, ReadResponse R2 arrives -- its batch is enqueued + * behind the in-flight one (because {@code isReadingNow == true}).
  • + *
  • Server gracefully stops the partition; the user confirms and the partition session is removed, + * {@link ReadPartitionSession#stop()} is invoked.
  • + *
  • If {@code stop()} fails to complete the queued batch futures, the per-response + * {@code CompletableFuture.allOf(batchReadFutures)} in {@link ReadSession#onReadResponse} + * never fires, which means {@link ReadSession#sendReadRequest()} is never called, + * starving the reader of any further data.
  • + * + */ +public class ReadPartitionSessionFlowControlTest { + + private static final long MAX_MEMORY = 10_000L; + private static final long PARTITION_SESSION_ID = 1L; + private static final long PARTITION_ID = 42L; + private static final long RESPONSE1_BYTES = 300L; + private static final long RESPONSE2_BYTES = 400L; + + @Test(timeout = 10_000) + public void readerKeepsRequestingDataAfterGracefulStopWithPendingBatches() { + StreamMock stream = new StreamMock(); + TopicRpc rpc = Mockito.mock(TopicRpc.class); + Mockito.when(rpc.readSession(Mockito.any())).thenReturn(stream); + Mockito.when(rpc.getScheduler()).thenReturn(Mockito.mock(ScheduledExecutorService.class)); + + ReaderSettings settings = ReaderSettings.newBuilder() + .setMaxMemoryUsageBytes(MAX_MEMORY) + .setConsumerName("test-consumer") + .addTopic(TopicReadSettings.newBuilder().setPath("/test/topic").build()) + .setDecompressionExecutor(Runnable::run) + .build(); + + TestReader reader = new TestReader(rpc, settings); + reader.init(); + + // Init request must be the first message sent + Assert.assertTrue("init request expected", stream.lastSent().hasInitRequest()); + + // Step 1: Server sends InitResponse -- SDK should reply with the initial ReadRequest + stream.deliver(initResponse("sess-1")); + Assert.assertEquals(MAX_MEMORY, stream.lastSent().getReadRequest().getBytesSize()); + + // Step 2: Start partition session -- reader auto-confirms + stream.deliver(startPartition(PARTITION_SESSION_ID, PARTITION_ID, 0L)); + Assert.assertTrue("start partition response expected", + stream.lastSent().hasStartPartitionSessionResponse()); + + // Step 3: ReadResponse #1 -- batch handed to user, future stays pending + stream.deliver(readResponse(PARTITION_SESSION_ID, RESPONSE1_BYTES, 0L)); + Assert.assertEquals("handler must be invoked exactly once for R1", + 1, reader.pendingDataFutures.size()); + + // Step 4: ReadResponse #2 -- arrives while R1 is in flight, so its batch + // is enqueued in readingQueue but never delivered to the user + // (isReadingNow == true). + stream.deliver(readResponse(PARTITION_SESSION_ID, RESPONSE2_BYTES, 1L)); + Assert.assertEquals("handler must NOT be invoked again while R1 is in flight", + 1, reader.pendingDataFutures.size()); + + // Bytes the SDK had asked for *before* the partition stop -- only the initial + // maxMemoryUsageBytes ReadRequest emitted after InitResponse should be in here, + // because no data callback has completed yet. + long readRequestBytesBefore = stream.sumReadRequestBytes(); + + // Step 5: Graceful stop -- reader auto-confirms. This triggers + // ReadPartitionSession.stop() while the batch from R2 is still queued. + stream.deliver(stopPartition(PARTITION_SESSION_ID, true, 1L)); + Assert.assertTrue("stop partition response expected", + stream.hasSentStopPartitionSessionResponse()); + + // Step 6: Unblock R1's handler future. This completes R1's allOf and + // triggers a ReadRequest replenishing R1's bytes. R2's allOf, however, + // should also complete because the queued batch must be drained by stop(). + CompletableFuture r1Future = reader.pendingDataFutures.poll(); + Assert.assertNotNull("R1 handler future is missing", r1Future); + r1Future.complete(null); + + long readRequestBytesAfter = stream.sumReadRequestBytes(); + long replenishedBytes = readRequestBytesAfter - readRequestBytesBefore; + + // The SDK must have replenished the full server-side budget consumed by R1+R2. + // If R2's batch is orphaned in readingQueue, only R1's bytes get replenished + // and the reader silently stops requesting data. + Assert.assertEquals("ReadRequest bytes after partition stop must include both responses", + RESPONSE1_BYTES + RESPONSE2_BYTES, replenishedBytes); + } + + // --------------------------------------------------------------------- + // Test fixtures + // --------------------------------------------------------------------- + + private static FromServer initResponse(String sessionId) { + return FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setInitResponse(YdbTopic.StreamReadMessage.InitResponse.newBuilder() + .setSessionId(sessionId).build()) + .build(); + } + + private static FromServer startPartition(long psid, long pid, long committed) { + return FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest.newBuilder() + .setPartitionSession(YdbTopic.StreamReadMessage.PartitionSession.newBuilder() + .setPartitionSessionId(psid) + .setPartitionId(pid) + .setPath("/test/topic") + .build()) + .setCommittedOffset(committed) + .setPartitionOffsets(YdbTopic.OffsetsRange.newBuilder() + .setStart(committed) + .setEnd(committed + 1_000_000L) + .build()) + .build()) + .build(); + } + + private static FromServer readResponse(long psid, long bytes, long firstOffset) { + // codec=RAW (1) so that the batch is "ready" immediately and the message goes + // straight from addBatches to sendDataToReadersIfNeeded -> handleDataReceivedEvent + // without any async decoder steps. + YdbTopic.StreamReadMessage.ReadResponse.MessageData message = + YdbTopic.StreamReadMessage.ReadResponse.MessageData.newBuilder() + .setOffset(firstOffset) + .setSeqNo(firstOffset + 1) + .setData(ByteString.copyFromUtf8("payload-" + firstOffset)) + .build(); + + YdbTopic.StreamReadMessage.ReadResponse.Batch batch = + YdbTopic.StreamReadMessage.ReadResponse.Batch.newBuilder() + .setProducerId("test-producer") + .setCodec(1) // Codec.RAW + .addMessageData(message) + .build(); + + YdbTopic.StreamReadMessage.ReadResponse.PartitionData partitionData = + YdbTopic.StreamReadMessage.ReadResponse.PartitionData.newBuilder() + .setPartitionSessionId(psid) + .addBatches(batch) + .build(); + + return FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setReadResponse(YdbTopic.StreamReadMessage.ReadResponse.newBuilder() + .setBytesSize(bytes) + .addPartitionData(partitionData) + .build()) + .build(); + } + + private static FromServer stopPartition(long psid, boolean graceful, long committed) { + return FromServer.newBuilder() + .setStatus(StatusCodesProtos.StatusIds.StatusCode.SUCCESS) + .setStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPartitionSessionRequest.newBuilder() + .setPartitionSessionId(psid) + .setGraceful(graceful) + .setCommittedOffset(committed) + .build()) + .build(); + } + + /** + * Minimal {@link ReaderImpl} that runs the handlers synchronously and exposes the + * {@link DataReceivedEvent} future so the test can finely control when the user's + * processing is considered complete. + */ + private static final class TestReader extends ReaderImpl { + final LinkedBlockingQueue> pendingDataFutures = new LinkedBlockingQueue<>(); + + TestReader(TopicRpc topicRpc, ReaderSettings settings) { + super(topicRpc, settings, new CodecRegistry()); + } + + void init() { + initImpl(); + } + + @Override + protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent event) { + CompletableFuture future = new CompletableFuture<>(); + pendingDataFutures.add(future); + return future; + } + + @Override + protected void handleSessionStarted(String sessionId) { + // nothing + } + + @Override + protected void handleCommitResponse(long committedOffset, PartitionSession partition) { + // nothing + } + + @Override + protected void handleStartPartitionSessionRequest(StartPartitionSessionEvent event) { + event.confirm(); + } + + @Override + protected void handleStopPartitionSession(StopPartitionSessionEvent event) { + event.confirm(); + } + + @Override + protected void handleClosePartitionSession(PartitionSession partition) { + // nothing + } + } + + /** + * Stub for the bidirectional gRPC stream that records every {@link FromClient} sent + * and replays {@link FromServer} messages synchronously into the captured observer. + */ + private static final class StreamMock implements GrpcReadWriteStream { + private final CompletableFuture streamFuture = new CompletableFuture<>(); + private final List sent = new ArrayList<>(); + private GrpcReadStream.Observer observer; + + void deliver(FromServer message) { + Assert.assertNotNull("observer is not set yet", observer); + observer.onNext(message); + } + + FromClient lastSent() { + Assert.assertFalse("no messages were sent", sent.isEmpty()); + return sent.get(sent.size() - 1); + } + + long sumReadRequestBytes() { + long total = 0L; + for (FromClient msg : sent) { + if (msg.hasReadRequest()) { + total += msg.getReadRequest().getBytesSize(); + } + } + return total; + } + + boolean hasSentStopPartitionSessionResponse() { + for (FromClient msg : sent) { + if (msg.hasStopPartitionSessionResponse()) { + return true; + } + } + return false; + } + + @Override + public String authToken() { + return "token"; + } + + @Override + public void sendNext(FromClient message) { + sent.add(message); + } + + @Override + public CompletableFuture start(GrpcReadStream.Observer observer) { + this.observer = observer; + return streamFuture; + } + + @Override + public void close() { + streamFuture.complete(Status.SUCCESS); + } + + @Override + public void cancel() { + streamFuture.complete(Status.SUCCESS); + } + } + +}