diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
new file mode 100644
index 0000000000000..c5e294c35cef3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code DagWatchClient}'s auto-reconnect path. The DAG watch holds
+ * the producer / consumer's view of the scalable topic layout — without
+ * reconnect, a transient broker disconnect silently strands the client on stale
+ * layout and never delivers another split / merge update.
+ *
+ * <p>These tests force-close the watch's underlying channel and assert that
+ * producers and consumers continue to operate end-to-end without application
+ * intervention.
+ */
+public class V5DagWatchAutoReconnectTest extends V5ClientBaseTest {
+
+    /**
+     * Producer's DAG watch channel is force-closed mid-life. Sends made after the
+     * close must still succeed: the cached layout keeps existing segments
+     * reachable, and the reconnect re-establishes the watch so subsequent layout
+     * changes would still be observed.
+     */
+    @Test
+    public void testProducerSurvivesDagWatchConnectionDrop() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-reconnect-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int firstN = 10;
+        Set<String> firstSent = new HashSet<>();
+        for (int i = 0; i < firstN; i++) {
+            String v = "first-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            firstSent.add(v);
+        }
+        assertEquals(drain(consumer, firstN), firstSent,
+                "first batch must arrive before the disconnect");
+
+        // Force-close the DAG watch channel. The cnx layer fires connectionClosed()
+        // on the DagWatchClient, which schedules a reconnect.
+        forceCloseDagWatchOnProducer(producer);
+
+        // Send a second batch immediately. Existing segments are still reachable
+        // through the per-segment v4 producers (their own connections are unaffected),
+        // so this proves the producer keeps working through the reconnect window.
+        int secondN = 10;
+        Set<String> secondSent = new HashSet<>();
+        for (int i = 0; i < secondN; i++) {
+            String v = "second-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            secondSent.add(v);
+        }
+        assertEquals(drain(consumer, secondN), secondSent,
+                "producer must keep sending after DAG watch reconnect kicks in");
+    }
+
+    /**
+     * After a force-close, the DAG watch must observe a fresh broker connection
+     * (i.e., its internal {@code cnx} field is re-populated). Asserts the
+     * reconnect path actually fires rather than silently staying disconnected.
+     */
+    @Test
+    public void testDagWatchReattachesAfterDisconnect() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Object dagWatch = getDagWatchOnProducer(producer);
+        Object originalCnx = readField(dagWatch, "cnx");
+        assertNotNull(originalCnx, "DAG watch must have an initial connection");
+
+        forceCloseDagWatchOnProducer(producer);
+
+        // Wait for the reconnect path to land a fresh ClientCnx on the DagWatchClient.
+        // Backoff starts at 100ms; allow a generous window for CI.
+        Awaitility.await().atMost(Duration.ofSeconds(15))
+                .until(() -> {
+                    Object current = readField(dagWatch, "cnx");
+                    return current != null && current != originalCnx;
+                });
+    }
+
+    /**
+     * A consumer's DAG watch channel is force-closed mid-life. Like the producer
+     * test, this asserts the consumer continues to deliver messages produced
+     * after the disconnect.
+     */
+    @Test
+    public void testConsumerSurvivesDagWatchConnectionDrop() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-reconnect-consumer-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int firstN = 10;
+        for (int i = 0; i < firstN; i++) {
+            producer.newMessage().key("k-" + i).value("first-" + i).send();
+        }
+        assertEquals(drain(consumer, firstN).size(), firstN,
+                "first batch must arrive before disconnect");
+
+        forceCloseDagWatchOnConsumer(consumer);
+
+        int secondN = 10;
+        for (int i = 0; i < secondN; i++) {
+            producer.newMessage().key("k-" + i).value("second-" + i).send();
+        }
+        Set<String> got = drain(consumer, secondN);
+        assertEquals(got.size(), secondN,
+                "consumer must keep receiving after DAG watch reconnect kicks in");
+    }
+
+    // --- Helpers ---
+
+    private Set<String> drain(QueueConsumer<String> consumer, int expected) throws Exception {
+        Set<String> received = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received.size() < expected && System.currentTimeMillis() < deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                consumer.acknowledge(msg.id());
+            }
+        }
+        return received;
+    }
+
+    private static Object getDagWatchOnProducer(Producer<?> producer) throws Exception {
+        Field f = producer.getClass().getDeclaredField("dagWatch");
+        f.setAccessible(true);
+        Object watch = f.get(producer);
+        assertNotNull(watch, "expected dagWatch on producer");
+        return watch;
+    }
+
+    private static void forceCloseDagWatchOnProducer(Producer<?> producer) throws Exception {
+        Object watch = getDagWatchOnProducer(producer);
+        Method m = watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+        m.setAccessible(true);
+        m.invoke(watch);
+    }
+
+    private static void forceCloseDagWatchOnConsumer(QueueConsumer<?> consumer) throws Exception {
+        Field f = consumer.getClass().getDeclaredField("dagWatch");
+        f.setAccessible(true);
+        Object watch = f.get(consumer);
+        assertNotNull(watch, "expected dagWatch on consumer");
+        Method m = watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+        m.setAccessible(true);
+        m.invoke(watch);
+    }
+
+    private static Object readField(Object target, String name) throws Exception {
+        Field f = target.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        return f.get(target);
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index b13828e756174..2041dab9b57b7 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -19,7 +19,9 @@ package org.apache.pulsar.client.impl.v5;
 
 import io.github.merlimat.slog.Logger;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,6 +32,7 @@
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
 
 /**
  * Client-side manager for a DAG watch session on a scalable topic.
@@ -39,6 +42,12 @@
  *
  * <p>Maintains the current {@link ClientSegmentLayout} and notifies a listener
  * when it changes.
+ *
+ * <p>Reconnects automatically on transient broker disconnects. The initial-create
+ * future ({@link #start}) surfaces failures up front so a producer / consumer
+ * {@code create()} fails fast when the broker is unreachable. After the first
+ * layout has arrived, subsequent disconnects schedule a reconnection with
+ * exponential backoff so long-lived producers / consumers survive network blips.
  */
 final class DagWatchClient implements DagWatchSession, AutoCloseable {
@@ -52,6 +61,7 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
     private final long sessionId;
     private final AtomicReference<ClientSegmentLayout> currentLayout = new AtomicReference<>();
     private final CompletableFuture<ClientSegmentLayout> initialLayoutFuture = new CompletableFuture<>();
+    private final Backoff reconnectBackoff;
     private volatile LayoutChangeListener listener;
     private volatile ClientCnx cnx;
     private volatile boolean closed = false;
@@ -60,6 +70,10 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
         this.v4Client = v4Client;
         this.topicName = topicName;
         this.sessionId = SESSION_ID_GENERATOR.incrementAndGet();
+        this.reconnectBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(30))
+                .build();
         this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build();
     }
@@ -70,39 +84,55 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
      * @return a future that completes with the initial layout
      */
     CompletableFuture<ClientSegmentLayout> start() {
-        // Get any broker connection and send the lookup command
         v4Client.getConnection(topicName.toString())
-                .thenAccept(cnx -> {
-                    this.cnx = cnx;
-                    if (!cnx.isSupportsScalableTopics()) {
-                        initialLayoutFuture.completeExceptionally(
-                                new PulsarClientException.FeatureNotSupportedException(
-                                        "Broker does not support scalable topics",
-                                        PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
-                        return;
-                    }
-                    // Register this session to receive updates
-                    cnx.registerDagWatchSession(sessionId, this);
-
-                    // Send the lookup command
-                    cnx.ctx().writeAndFlush(
-                            Commands.newScalableTopicLookup(sessionId, topicName.toString()))
-                            .addListener(writeFuture -> {
-                                if (!writeFuture.isSuccess()) {
-                                    cnx.removeDagWatchSession(sessionId);
-                                    initialLayoutFuture.completeExceptionally(
-                                            new PulsarClientException(writeFuture.cause()));
-                                }
-                            });
-                })
+                .thenAccept(this::attach)
                 .exceptionally(ex -> {
                     initialLayoutFuture.completeExceptionally(ex);
                     return null;
                 });
-
         return initialLayoutFuture;
     }
 
+    /**
+     * Wire {@code newCnx} to this session and send a ScalableTopicLookup. Used by
+     * both {@link #start} (first connect) and {@link #reconnect} (after disconnect).
+     */
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsScalableTopics()) {
+            PulsarClientException ex = new PulsarClientException.FeatureNotSupportedException(
+                    "Broker does not support scalable topics",
+                    PulsarClientException.FailedFeatureCheck.SupportsScalableTopics);
+            if (!initialLayoutFuture.isDone()) {
+                initialLayoutFuture.completeExceptionally(ex);
+            } else {
+                log.warn().exceptionMessage(ex)
+                        .log("Reconnect target broker doesn't support scalable topics");
+                scheduleReconnect();
+            }
+            return;
+        }
+        this.cnx = newCnx;
+        newCnx.registerDagWatchSession(sessionId, this);
+        newCnx.ctx().writeAndFlush(
+                Commands.newScalableTopicLookup(sessionId, topicName.toString()))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        newCnx.removeDagWatchSession(sessionId);
+                        if (!initialLayoutFuture.isDone()) {
+                            initialLayoutFuture.completeExceptionally(
+                                    new PulsarClientException(writeFuture.cause()));
+                        } else {
+                            log.warn().exceptionMessage(writeFuture.cause())
+                                    .log("DAG watch reconnect write failed; will retry");
+                            scheduleReconnect();
+                        }
+                    }
+                });
+    }
+
     /**
      * Called when the broker pushes a ScalableTopicUpdate for this session.
      * This is invoked from the Netty I/O thread.
@@ -121,6 +151,10 @@ public void onUpdate(ScalableTopicDAG dag) {
                 .attr("activeSegmentCount", newLayout.activeSegments().size())
                 .log("Layout updated");
 
+        // Reset the reconnect backoff: the broker confirmed the session is live and
+        // our local state is consistent.
+        reconnectBackoff.reset();
+
         // Complete the initial layout future if this is the first update
         initialLayoutFuture.complete(newLayout);
 
@@ -138,18 +172,56 @@ public void onUpdate(ScalableTopicDAG dag) {
     public void onError(ServerError error, String message) {
         log.error().attr("error", error).attr("message", message)
                 .log("DAG watch session error");
-        initialLayoutFuture.completeExceptionally(
-                new PulsarClientException(
-                        "Scalable topic lookup failed: " + error + " - " + message));
+        if (!initialLayoutFuture.isDone()) {
+            initialLayoutFuture.completeExceptionally(
+                    new PulsarClientException(
+                            "Scalable topic lookup failed: " + error + " - " + message));
+        }
+        // After the initial layout has arrived, broker-side errors on this session
+        // (e.g., metadata unavailable) are transient — a reconnect typically clears
+        // them. The connection-closed path will pick this up; no extra work here.
     }
 
     @Override
     public void connectionClosed() {
         log.warn("DAG watch session connection closed");
         cnx = null;
-        initialLayoutFuture.completeExceptionally(
-                new PulsarClientException("Connection closed while waiting for scalable topic layout"));
-        // TODO: implement automatic reconnection with backoff
+        if (closed) {
+            return;
+        }
+        if (!initialLayoutFuture.isDone()) {
+            // Initial lookup never completed — surface the failure rather than
+            // retrying silently behind the caller of producer / consumer create().
+            initialLayoutFuture.completeExceptionally(
+                    new PulsarClientException(
+                            "Connection closed while waiting for scalable topic layout"));
+            return;
+        }
+        scheduleReconnect();
+    }
+
+    private void scheduleReconnect() {
+        if (closed) {
+            return;
+        }
+        long delayMs = reconnectBackoff.next().toMillis();
+        log.info().attr("delayMs", delayMs).log("Scheduling DAG watch reconnect");
+        v4Client.timer().newTimeout(timeout -> reconnect(),
+                delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void reconnect() {
+        if (closed) {
+            return;
+        }
+        v4Client.getConnection(topicName.toString())
+                .thenAccept(this::attach)
+                .exceptionally(ex -> {
+                    log.warn().exceptionMessage(ex)
+                            .log("DAG watch reconnect failed; will retry");
+                    scheduleReconnect();
+                    return null;
+                });
     }
 
     ClientSegmentLayout currentLayout() {
@@ -182,6 +254,18 @@ public void close() {
         }
     }
 
+    /**
+     * Test hook: forcibly close the underlying broker channel to simulate a network
+     * drop. The cnx layer will fire {@link #connectionClosed()} which triggers the
+     * automatic reconnect path. Reached via reflection from cross-module tests.
+     */
+    void forceCloseConnectionForTesting() {
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.ctx().channel().close();
+        }
+    }
+
     interface LayoutChangeListener {
         void onLayoutChange(ClientSegmentLayout newLayout, ClientSegmentLayout oldLayout);
     }