From 820c671e56c7e01b5667591d185e5766ddddd345 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 15:37:45 -0700 Subject: [PATCH] [fix][client] PIP-468: DagWatchClient auto-reconnects on broker disconnect Replace the TODO in DagWatchClient.connectionClosed() that left long-lived producers / consumers stranded on a stale layout after a transient broker disconnect. The DAG watch is the consumer / producer's only path to learning about split / merge events; without reconnect, a single network blip silently drops the client off future layout updates and never surfaces an error to the application. The new logic mirrors the proven pattern used by ScalableTopicsWatcher (used for the namespace-watcher session) and ScalableConsumerClient (used for the controller session): - start() (initial create) keeps its current behavior: the future is failed on disconnect so producer.create() / consumer.subscribe() fails fast when the broker is unreachable. - After the initial layout has arrived, connectionClosed() schedules a reconnect with exponential backoff (100 ms initial, 30 s max). The reconnect path calls getConnection again, re-registers the same sessionId on the fresh ClientCnx, and re-issues ScalableTopicLookup; the broker re-pushes the current layout via onUpdate, which resets the backoff. - The connect/reconnect logic is shared in a single attach(cnx) method so both paths use identical wiring. Adds a forceCloseConnectionForTesting() hook (mirroring the one on ScalableConsumerClient) for cross-module integration tests. V5DagWatchAutoReconnectTest force-closes the DAG channel underneath a producer and a consumer and asserts each continues operating. A third test reflectively verifies the DagWatchClient's cnx field is repopulated within the backoff window. 
--- .../api/v5/V5DagWatchAutoReconnectTest.java | 207 ++++++++++++++++++ .../pulsar/client/impl/v5/DagWatchClient.java | 146 +++++++++--- 2 files changed, 322 insertions(+), 31 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java new file mode 100644 index 0000000000000..c5e294c35cef3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/** + * Coverage for {@code DagWatchClient}'s auto-reconnect path. The DAG watch holds + * the producer / consumer's view of the scalable topic layout — without + * reconnect, a transient broker disconnect silently strands the client on stale + * layout and never delivers another split / merge update. + * + *
<p>
These tests force-close the watch's underlying channel and assert that + * producers and consumers continue to operate end-to-end without application + * intervention. + */ +public class V5DagWatchAutoReconnectTest extends V5ClientBaseTest { + + /** + * Producer's DAG watch channel is force-closed mid-life. Sends made after the + * close must still succeed: the cached layout keeps existing segments + * reachable, and the reconnect re-establishes the watch so subsequent layout + * changes would still be observed. + */ + @Test + public void testProducerSurvivesDagWatchConnectionDrop() throws Exception { + String topic = newScalableTopic(2); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("dag-reconnect-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + int firstN = 10; + Set firstSent = new HashSet<>(); + for (int i = 0; i < firstN; i++) { + String v = "first-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + firstSent.add(v); + } + assertEquals(drain(consumer, firstN), firstSent, + "first batch must arrive before the disconnect"); + + // Force-close the DAG watch channel. The cnx layer fires connectionClosed() + // on the DagWatchClient, which schedules a reconnect. + forceCloseDagWatchOnProducer(producer); + + // Send a second batch immediately. Existing segments are still reachable + // through the per-segment v4 producers (their own connections are unaffected), + // so this proves the producer keeps working through the reconnect window. 
+ int secondN = 10; + Set secondSent = new HashSet<>(); + for (int i = 0; i < secondN; i++) { + String v = "second-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + secondSent.add(v); + } + assertEquals(drain(consumer, secondN), secondSent, + "producer must keep sending after DAG watch reconnect kicks in"); + } + + /** + * After a force-close, the DAG watch must observe a fresh broker connection + * (i.e., its internal {@code cnx} field is re-populated). Asserts the + * reconnect path actually fires rather than silently staying disconnected. + */ + @Test + public void testDagWatchReattachesAfterDisconnect() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + + Object dagWatch = getDagWatchOnProducer(producer); + Object originalCnx = readField(dagWatch, "cnx"); + assertNotNull(originalCnx, "DAG watch must have an initial connection"); + + forceCloseDagWatchOnProducer(producer); + + // Wait for the reconnect path to land a fresh ClientCnx on the DagWatchClient. + // Backoff starts at 100ms; allow a generous window for CI. + Awaitility.await().atMost(Duration.ofSeconds(15)) + .until(() -> { + Object current = readField(dagWatch, "cnx"); + return current != null && current != originalCnx; + }); + } + + /** + * A consumer's DAG watch channel is force-closed mid-life. Like the producer + * test, this asserts the consumer continues to deliver messages produced + * after the disconnect. 
+ */ + @Test + public void testConsumerSurvivesDagWatchConnectionDrop() throws Exception { + String topic = newScalableTopic(2); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("dag-reconnect-consumer-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + int firstN = 10; + for (int i = 0; i < firstN; i++) { + producer.newMessage().key("k-" + i).value("first-" + i).send(); + } + assertEquals(drain(consumer, firstN).size(), firstN, + "first batch must arrive before disconnect"); + + forceCloseDagWatchOnConsumer(consumer); + + int secondN = 10; + for (int i = 0; i < secondN; i++) { + producer.newMessage().key("k-" + i).value("second-" + i).send(); + } + Set got = drain(consumer, secondN); + assertEquals(got.size(), secondN, + "consumer must keep receiving after DAG watch reconnect kicks in"); + } + + // --- Helpers --- + + private Set drain(QueueConsumer consumer, int expected) throws Exception { + Set received = new HashSet<>(); + long deadline = System.currentTimeMillis() + 30_000L; + while (received.size() < expected && System.currentTimeMillis() < deadline) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg != null) { + received.add(msg.value()); + consumer.acknowledge(msg.id()); + } + } + return received; + } + + private static Object getDagWatchOnProducer(Producer producer) throws Exception { + Field f = producer.getClass().getDeclaredField("dagWatch"); + f.setAccessible(true); + Object watch = f.get(producer); + assertNotNull(watch, "expected dagWatch on producer"); + return watch; + } + + private static void forceCloseDagWatchOnProducer(Producer producer) throws Exception { + Object watch = getDagWatchOnProducer(producer); + Method m = watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting"); + m.setAccessible(true); + 
m.invoke(watch); + } + + private static void forceCloseDagWatchOnConsumer(QueueConsumer consumer) throws Exception { + Field f = consumer.getClass().getDeclaredField("dagWatch"); + f.setAccessible(true); + Object watch = f.get(consumer); + assertNotNull(watch, "expected dagWatch on consumer"); + Method m = watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting"); + m.setAccessible(true); + m.invoke(watch); + } + + private static Object readField(Object target, String name) throws Exception { + Field f = target.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.get(target); + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java index b13828e756174..2041dab9b57b7 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java @@ -19,7 +19,9 @@ package org.apache.pulsar.client.impl.v5; import io.github.merlimat.slog.Logger; +import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.api.PulsarClientException; @@ -30,6 +32,7 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.Backoff; /** * Client-side manager for a DAG watch session on a scalable topic. @@ -39,6 +42,12 @@ * *

<p>Maintains the current {@link ClientSegmentLayout} and notifies a listener * when it changes. + * + * <p>

Reconnects automatically on transient broker disconnects. The initial-create + * future ({@link #start}) surfaces failures up front so a producer / consumer + * {@code create()} fails fast when the broker is unreachable. After the first + * layout has arrived, subsequent disconnects schedule a reconnection with + * exponential backoff so long-lived producers / consumers survive network blips. */ final class DagWatchClient implements DagWatchSession, AutoCloseable { @@ -52,6 +61,7 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable { private final long sessionId; private final AtomicReference currentLayout = new AtomicReference<>(); private final CompletableFuture initialLayoutFuture = new CompletableFuture<>(); + private final Backoff reconnectBackoff; private volatile LayoutChangeListener listener; private volatile ClientCnx cnx; private volatile boolean closed = false; @@ -60,6 +70,10 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable { this.v4Client = v4Client; this.topicName = topicName; this.sessionId = SESSION_ID_GENERATOR.incrementAndGet(); + this.reconnectBackoff = Backoff.builder() + .initialDelay(Duration.ofMillis(100)) + .maxBackoff(Duration.ofSeconds(30)) + .build(); this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build(); } @@ -70,39 +84,55 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable { * @return a future that completes with the initial layout */ CompletableFuture start() { - // Get any broker connection and send the lookup command v4Client.getConnection(topicName.toString()) - .thenAccept(cnx -> { - this.cnx = cnx; - if (!cnx.isSupportsScalableTopics()) { - initialLayoutFuture.completeExceptionally( - new PulsarClientException.FeatureNotSupportedException( - "Broker does not support scalable topics", - PulsarClientException.FailedFeatureCheck.SupportsScalableTopics)); - return; - } - // Register this session to receive updates - 
cnx.registerDagWatchSession(sessionId, this); - - // Send the lookup command - cnx.ctx().writeAndFlush( - Commands.newScalableTopicLookup(sessionId, topicName.toString())) - .addListener(writeFuture -> { - if (!writeFuture.isSuccess()) { - cnx.removeDagWatchSession(sessionId); - initialLayoutFuture.completeExceptionally( - new PulsarClientException(writeFuture.cause())); - } - }); - }) + .thenAccept(this::attach) .exceptionally(ex -> { initialLayoutFuture.completeExceptionally(ex); return null; }); - return initialLayoutFuture; } + /** + * Wire {@code newCnx} to this session and send a ScalableTopicLookup. Used by + * both {@link #start} (first connect) and {@link #reconnect} (after disconnect). + */ + private void attach(ClientCnx newCnx) { + if (closed) { + return; + } + if (!newCnx.isSupportsScalableTopics()) { + PulsarClientException ex = new PulsarClientException.FeatureNotSupportedException( + "Broker does not support scalable topics", + PulsarClientException.FailedFeatureCheck.SupportsScalableTopics); + if (!initialLayoutFuture.isDone()) { + initialLayoutFuture.completeExceptionally(ex); + } else { + log.warn().exceptionMessage(ex) + .log("Reconnect target broker doesn't support scalable topics"); + scheduleReconnect(); + } + return; + } + this.cnx = newCnx; + newCnx.registerDagWatchSession(sessionId, this); + newCnx.ctx().writeAndFlush( + Commands.newScalableTopicLookup(sessionId, topicName.toString())) + .addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + newCnx.removeDagWatchSession(sessionId); + if (!initialLayoutFuture.isDone()) { + initialLayoutFuture.completeExceptionally( + new PulsarClientException(writeFuture.cause())); + } else { + log.warn().exceptionMessage(writeFuture.cause()) + .log("DAG watch reconnect write failed; will retry"); + scheduleReconnect(); + } + } + }); + } + /** * Called when the broker pushes a ScalableTopicUpdate for this session. * This is invoked from the Netty I/O thread. 
@@ -121,6 +151,10 @@ public void onUpdate(ScalableTopicDAG dag) { .attr("activeSegmentCount", newLayout.activeSegments().size()) .log("Layout updated"); + // Reset the reconnect backoff: the broker confirmed the session is live and + // our local state is consistent. + reconnectBackoff.reset(); + // Complete the initial layout future if this is the first update initialLayoutFuture.complete(newLayout); @@ -138,18 +172,56 @@ public void onUpdate(ScalableTopicDAG dag) { public void onError(ServerError error, String message) { log.error().attr("error", error).attr("message", message) .log("DAG watch session error"); - initialLayoutFuture.completeExceptionally( - new PulsarClientException( - "Scalable topic lookup failed: " + error + " - " + message)); + if (!initialLayoutFuture.isDone()) { + initialLayoutFuture.completeExceptionally( + new PulsarClientException( + "Scalable topic lookup failed: " + error + " - " + message)); + } + // After the initial layout has arrived, broker-side errors on this session + // (e.g., metadata unavailable) are transient — a reconnect typically clears + // them. The connection-closed path will pick this up; no extra work here. } @Override public void connectionClosed() { log.warn("DAG watch session connection closed"); cnx = null; - initialLayoutFuture.completeExceptionally( - new PulsarClientException("Connection closed while waiting for scalable topic layout")); - // TODO: implement automatic reconnection with backoff + if (closed) { + return; + } + if (!initialLayoutFuture.isDone()) { + // Initial lookup never completed — surface the failure rather than + // retrying silently behind the caller of producer / consumer create(). 
+ initialLayoutFuture.completeExceptionally( + new PulsarClientException( + "Connection closed while waiting for scalable topic layout")); + return; + } + scheduleReconnect(); + } + + private void scheduleReconnect() { + if (closed) { + return; + } + long delayMs = reconnectBackoff.next().toMillis(); + log.info().attr("delayMs", delayMs).log("Scheduling DAG watch reconnect"); + v4Client.timer().newTimeout(timeout -> reconnect(), + delayMs, TimeUnit.MILLISECONDS); + } + + private void reconnect() { + if (closed) { + return; + } + v4Client.getConnection(topicName.toString()) + .thenAccept(this::attach) + .exceptionally(ex -> { + log.warn().exceptionMessage(ex) + .log("DAG watch reconnect failed; will retry"); + scheduleReconnect(); + return null; + }); } ClientSegmentLayout currentLayout() { @@ -182,6 +254,18 @@ public void close() { } } + /** + * Test hook: forcibly close the underlying broker channel to simulate a network + * drop. The cnx layer will fire {@link #connectionClosed()} which triggers the + * automatic reconnect path. Reached via reflection from cross-module tests. + */ + void forceCloseConnectionForTesting() { + ClientCnx c = cnx; + if (c != null) { + c.ctx().channel().close(); + } + } + interface LayoutChangeListener { void onLayoutChange(ClientSegmentLayout newLayout, ClientSegmentLayout oldLayout); }