diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
new file mode 100644
index 0000000000000..c5e294c35cef3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code DagWatchClient}'s auto-reconnect path. The DAG watch holds
+ * the producer / consumer's view of the scalable topic layout — without
+ * reconnect, a transient broker disconnect silently strands the client on stale
+ * layout and never delivers another split / merge update.
+ *
+ * <p>These tests force-close the watch's underlying channel and assert that
+ * producers and consumers continue to operate end-to-end without application
+ * intervention.
+ */
+public class V5DagWatchAutoReconnectTest extends V5ClientBaseTest {
+
+ /**
+ * Producer's DAG watch channel is force-closed mid-life. Sends made after the
+ * close must still succeed: the cached layout keeps existing segments
+ * reachable, and the reconnect re-establishes the watch so subsequent layout
+ * changes would still be observed.
+ */
+ @Test
+ public void testProducerSurvivesDagWatchConnectionDrop() throws Exception {
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dag-reconnect-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int firstN = 10;
+ Set<String> firstSent = new HashSet<>();
+ for (int i = 0; i < firstN; i++) {
+ String v = "first-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ firstSent.add(v);
+ }
+ assertEquals(drain(consumer, firstN), firstSent,
+ "first batch must arrive before the disconnect");
+
+ // Force-close the DAG watch channel. The cnx layer fires connectionClosed()
+ // on the DagWatchClient, which schedules a reconnect.
+ forceCloseDagWatchOnProducer(producer);
+
+ // Send a second batch immediately. Existing segments are still reachable
+ // through the per-segment v4 producers (their own connections are unaffected),
+ // so this proves the producer keeps working through the reconnect window.
+ int secondN = 10;
+ Set<String> secondSent = new HashSet<>();
+ for (int i = 0; i < secondN; i++) {
+ String v = "second-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ secondSent.add(v);
+ }
+ assertEquals(drain(consumer, secondN), secondSent,
+ "producer must keep sending after DAG watch reconnect kicks in");
+ }
+
+ /**
+ * After a force-close, the DAG watch must observe a fresh broker connection
+ * (i.e., its internal {@code cnx} field is re-populated). Asserts the
+ * reconnect path actually fires rather than silently staying disconnected.
+ */
+ @Test
+ public void testDagWatchReattachesAfterDisconnect() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ Object dagWatch = getDagWatchOnProducer(producer);
+ Object originalCnx = readField(dagWatch, "cnx");
+ assertNotNull(originalCnx, "DAG watch must have an initial connection");
+
+ forceCloseDagWatchOnProducer(producer);
+
+ // Wait for the reconnect path to land a fresh ClientCnx on the DagWatchClient.
+ // Backoff starts at 100ms; allow a generous window for CI.
+ Awaitility.await().atMost(Duration.ofSeconds(15))
+ .until(() -> {
+ Object current = readField(dagWatch, "cnx");
+ return current != null && current != originalCnx;
+ });
+ }
+
+ /**
+ * A consumer's DAG watch channel is force-closed mid-life. Like the producer
+ * test, this asserts the consumer continues to deliver messages produced
+ * after the disconnect.
+ */
+ @Test
+ public void testConsumerSurvivesDagWatchConnectionDrop() throws Exception {
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dag-reconnect-consumer-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int firstN = 10;
+ for (int i = 0; i < firstN; i++) {
+ producer.newMessage().key("k-" + i).value("first-" + i).send();
+ }
+ assertEquals(drain(consumer, firstN).size(), firstN,
+ "first batch must arrive before disconnect");
+
+ forceCloseDagWatchOnConsumer(consumer);
+
+ int secondN = 10;
+ for (int i = 0; i < secondN; i++) {
+ producer.newMessage().key("k-" + i).value("second-" + i).send();
+ }
+ Set<String> got = drain(consumer, secondN);
+ assertEquals(got.size(), secondN,
+ "consumer must keep receiving after DAG watch reconnect kicks in");
+ }
+
+ // --- Helpers ---
+
+ private Set<String> drain(QueueConsumer<String> consumer, int expected) throws Exception {
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected && System.currentTimeMillis() < deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ }
+ return received;
+ }
+
+ private static Object getDagWatchOnProducer(Producer<?> producer) throws Exception {
+ Field f = producer.getClass().getDeclaredField("dagWatch");
+ f.setAccessible(true);
+ Object watch = f.get(producer);
+ assertNotNull(watch, "expected dagWatch on producer");
+ return watch;
+ }
+
+ private static void forceCloseDagWatchOnProducer(Producer<?> producer) throws Exception {
+ Object watch = getDagWatchOnProducer(producer);
+ Method m = watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+ m.setAccessible(true);
+ m.invoke(watch);
+ }
+
+ private static void forceCloseDagWatchOnConsumer(QueueConsumer<?> consumer) throws Exception {
+ Field f = consumer.getClass().getDeclaredField("dagWatch");
+ f.setAccessible(true);
+ Object watch = f.get(consumer);
+ assertNotNull(watch, "expected dagWatch on consumer");
+ Method m = watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+ m.setAccessible(true);
+ m.invoke(watch);
+ }
+
+ private static Object readField(Object target, String name) throws Exception {
+ Field f = target.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.get(target);
+ }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index b13828e756174..2041dab9b57b7 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.impl.v5;
import io.github.merlimat.slog.Logger;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,6 +32,7 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
/**
* Client-side manager for a DAG watch session on a scalable topic.
@@ -39,6 +42,12 @@
*
* Maintains the current {@link ClientSegmentLayout} and notifies a listener
* when it changes.
+ *
+ * <p>Reconnects automatically on transient broker disconnects. The initial-create
+ * future ({@link #start}) surfaces failures up front so a producer / consumer
+ * {@code create()} fails fast when the broker is unreachable. After the first
+ * layout has arrived, subsequent disconnects schedule a reconnection with
+ * exponential backoff so long-lived producers / consumers survive network blips.
*/
final class DagWatchClient implements DagWatchSession, AutoCloseable {
@@ -52,6 +61,7 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
private final long sessionId;
private final AtomicReference<ClientSegmentLayout> currentLayout = new AtomicReference<>();
private final CompletableFuture<ClientSegmentLayout> initialLayoutFuture = new CompletableFuture<>();
+ private final Backoff reconnectBackoff;
private volatile LayoutChangeListener listener;
private volatile ClientCnx cnx;
private volatile boolean closed = false;
@@ -60,6 +70,10 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
this.v4Client = v4Client;
this.topicName = topicName;
this.sessionId = SESSION_ID_GENERATOR.incrementAndGet();
+ this.reconnectBackoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(30))
+ .build();
this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build();
}
@@ -70,39 +84,55 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
* @return a future that completes with the initial layout
*/
CompletableFuture<ClientSegmentLayout> start() {
- // Get any broker connection and send the lookup command
v4Client.getConnection(topicName.toString())
- .thenAccept(cnx -> {
- this.cnx = cnx;
- if (!cnx.isSupportsScalableTopics()) {
- initialLayoutFuture.completeExceptionally(
- new PulsarClientException.FeatureNotSupportedException(
- "Broker does not support scalable topics",
- PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
- return;
- }
- // Register this session to receive updates
- cnx.registerDagWatchSession(sessionId, this);
-
- // Send the lookup command
- cnx.ctx().writeAndFlush(
- Commands.newScalableTopicLookup(sessionId, topicName.toString()))
- .addListener(writeFuture -> {
- if (!writeFuture.isSuccess()) {
- cnx.removeDagWatchSession(sessionId);
- initialLayoutFuture.completeExceptionally(
- new PulsarClientException(writeFuture.cause()));
- }
- });
- })
+ .thenAccept(this::attach)
.exceptionally(ex -> {
initialLayoutFuture.completeExceptionally(ex);
return null;
});
-
return initialLayoutFuture;
}
+ /**
+ * Wire {@code newCnx} to this session and send a ScalableTopicLookup. Used by
+ * both {@link #start} (first connect) and {@link #reconnect} (after disconnect).
+ */
+ private void attach(ClientCnx newCnx) {
+ if (closed) {
+ return;
+ }
+ if (!newCnx.isSupportsScalableTopics()) {
+ PulsarClientException ex = new PulsarClientException.FeatureNotSupportedException(
+ "Broker does not support scalable topics",
+ PulsarClientException.FailedFeatureCheck.SupportsScalableTopics);
+ if (!initialLayoutFuture.isDone()) {
+ initialLayoutFuture.completeExceptionally(ex);
+ } else {
+ log.warn().exceptionMessage(ex)
+ .log("Reconnect target broker doesn't support scalable topics");
+ scheduleReconnect();
+ }
+ return;
+ }
+ this.cnx = newCnx;
+ newCnx.registerDagWatchSession(sessionId, this);
+ newCnx.ctx().writeAndFlush(
+ Commands.newScalableTopicLookup(sessionId, topicName.toString()))
+ .addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ newCnx.removeDagWatchSession(sessionId);
+ if (!initialLayoutFuture.isDone()) {
+ initialLayoutFuture.completeExceptionally(
+ new PulsarClientException(writeFuture.cause()));
+ } else {
+ log.warn().exceptionMessage(writeFuture.cause())
+ .log("DAG watch reconnect write failed; will retry");
+ scheduleReconnect();
+ }
+ }
+ });
+ }
+
/**
* Called when the broker pushes a ScalableTopicUpdate for this session.
* This is invoked from the Netty I/O thread.
@@ -121,6 +151,10 @@ public void onUpdate(ScalableTopicDAG dag) {
.attr("activeSegmentCount", newLayout.activeSegments().size())
.log("Layout updated");
+ // Reset the reconnect backoff: the broker confirmed the session is live and
+ // our local state is consistent.
+ reconnectBackoff.reset();
+
// Complete the initial layout future if this is the first update
initialLayoutFuture.complete(newLayout);
@@ -138,18 +172,56 @@ public void onUpdate(ScalableTopicDAG dag) {
public void onError(ServerError error, String message) {
log.error().attr("error", error).attr("message", message)
.log("DAG watch session error");
- initialLayoutFuture.completeExceptionally(
- new PulsarClientException(
- "Scalable topic lookup failed: " + error + " - " + message));
+ if (!initialLayoutFuture.isDone()) {
+ initialLayoutFuture.completeExceptionally(
+ new PulsarClientException(
+ "Scalable topic lookup failed: " + error + " - " + message));
+ }
+ // After the initial layout has arrived, broker-side errors on this session
+ // (e.g., metadata unavailable) are transient — a reconnect typically clears
+ // them. The connection-closed path will pick this up; no extra work here.
}
@Override
public void connectionClosed() {
log.warn("DAG watch session connection closed");
cnx = null;
- initialLayoutFuture.completeExceptionally(
- new PulsarClientException("Connection closed while waiting for scalable topic layout"));
- // TODO: implement automatic reconnection with backoff
+ if (closed) {
+ return;
+ }
+ if (!initialLayoutFuture.isDone()) {
+ // Initial lookup never completed — surface the failure rather than
+ // retrying silently behind the caller of producer / consumer create().
+ initialLayoutFuture.completeExceptionally(
+ new PulsarClientException(
+ "Connection closed while waiting for scalable topic layout"));
+ return;
+ }
+ scheduleReconnect();
+ }
+
+ private void scheduleReconnect() {
+ if (closed) {
+ return;
+ }
+ long delayMs = reconnectBackoff.next().toMillis();
+ log.info().attr("delayMs", delayMs).log("Scheduling DAG watch reconnect");
+ v4Client.timer().newTimeout(timeout -> reconnect(),
+ delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ private void reconnect() {
+ if (closed) {
+ return;
+ }
+ v4Client.getConnection(topicName.toString())
+ .thenAccept(this::attach)
+ .exceptionally(ex -> {
+ log.warn().exceptionMessage(ex)
+ .log("DAG watch reconnect failed; will retry");
+ scheduleReconnect();
+ return null;
+ });
}
ClientSegmentLayout currentLayout() {
@@ -182,6 +254,18 @@ public void close() {
}
}
+ /**
+ * Test hook: forcibly close the underlying broker channel to simulate a network
+ * drop. The cnx layer will fire {@link #connectionClosed()} which triggers the
+ * automatic reconnect path. Reached via reflection from cross-module tests.
+ */
+ void forceCloseConnectionForTesting() {
+ ClientCnx c = cnx;
+ if (c != null) {
+ c.ctx().channel().close();
+ }
+ }
+
interface LayoutChangeListener {
void onLayoutChange(ClientSegmentLayout newLayout, ClientSegmentLayout oldLayout);
}