From 0f05b36d6e869dcefccbb726fd5ec87e066f107f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 8 Jul 2025 21:21:43 +0800 Subject: [PATCH 01/16] [fix][broker] Fix failed topic future not removed before the PersistentTopic future is done --- .../pulsar/broker/service/BrokerService.java | 17 +++- .../persistent/MessageDeduplication.java | 65 ++++++++++--- .../service/persistent/PersistentTopic.java | 6 ++ .../client/api/OrphanPersistentTopicTest.java | 93 +++++++++++++++---- 4 files changed, 143 insertions(+), 38 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6124c913b26db..36a8f41b77fcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -86,6 +86,7 @@ import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -1663,10 +1664,13 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); - topicFuture.exceptionally(t -> { + topicFuture.exceptionallyAsync(e -> { pulsarStats.recordTopicLoadFailed(); - return null; - }); + if (topics.remove(topic, topicFuture)) { + log.info("Removed topic {} for: {}", topic, e.getMessage()); + } + return Optional.empty(); + }, executor()); checkTopicNsOwnership(topic) .thenRun(() -> { @@ -1781,7 +1785,6 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new NotAllowedException(msg)); return; } @@ -1842,6 +1845,12 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, @Override public void openLedgerComplete(ManagedLedger ledger, Object ctx) { try { + if (topicFuture.isCompletedExceptionally()) { + // Don't close the managed ledger because next time the topic is accessed, the + // managed ledger will be created again. + // TODO: add tests for it + return; + } PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 5dc06842f9843..b6d8ee704d492 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -56,10 +56,15 @@ */ public class MessageDeduplication { + private static final CompletableFuture RECOVERY_FAILURE = CompletableFuture.failedFuture( + new IllegalStateException("recovery is cancelled")); private final PulsarService pulsar; private final PersistentTopic topic; private final ManagedLedger managedLedger; + private final Object recoverLock = new Object(); private ManagedCursor managedCursor; + private CompletableFuture recoverFuture; + private boolean recoverCancelled = false; private static final String IS_LAST_CHUNK = "isLastChunk"; @@ -154,25 +159,36 @@ private CompletableFuture recoverSequenceIdsMap() { }); // Replay all the entries and apply all the sequence ids updates - log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries()); - CompletableFuture future = new CompletableFuture<>(); - replayCursor(future); - return future.thenCompose(lastPosition -> { - if (lastPosition != null && snapshotCounter >= snapshotInterval) { - snapshotCounter = 0; - return takeSnapshot(lastPosition); + synchronized (recoverLock) { + if (recoverCancelled) { + return RECOVERY_FAILURE; } - return CompletableFuture.completedFuture(null); - }); + if (recoverFuture == null) { + recoverFuture = new CompletableFuture<>(); + } + } + log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries()); + replayCursor(); + return recoverFuture; } /** * Read all the entries published from the cursor position until the most recent and update the highest sequence id * from each producer. - * - * @param future future to trigger when the replay is complete */ - private void replayCursor(CompletableFuture future) { + private void replayCursor() { + final CompletableFuture future; + synchronized (recoverLock) { + if (recoverCancelled) { + log.info("Cancelled replaying cursor for {} (snapshotCounter: {})", topic.getName(), snapshotCounter); + return; + } + if (recoverFuture == null) { + log.error("Unexpected null recoverFuture for {}", topic.getName()); + return; + } + future = recoverFuture; + } managedCursor.asyncReadEntries(100, new ReadEntriesCallback() { @Override public void readEntriesComplete(List entries, Object ctx) { @@ -193,10 +209,21 @@ public void readEntriesComplete(List entries, Object ctx) { if (managedCursor.hasMoreEntries()) { // Read next batch of entries - pulsar.getExecutor().execute(() -> replayCursor(future)); + pulsar.getExecutor().execute(() -> replayCursor()); } else { // Done replaying - future.complete(lastPosition); + if (lastPosition != null && snapshotCounter >= snapshotInterval) { + snapshotCounter = 0; + takeSnapshot(lastPosition).whenComplete((ignored, e) -> { + if (e == null) { + future.complete(null); + } else { + future.completeExceptionally(e); + } + }); + } else { + future.complete(null); + } } } @@ -739,5 +766,15 @@ Map getInactiveProducers() { return inactiveProducers; } + public void cancelRecovery() { + synchronized (recoverLock) { + recoverCancelled = true; + if (recoverFuture != null) { + recoverFuture.completeExceptionally(new IllegalStateException( + "MessageDeduplication recovery is interrupted")); + } + } + } + private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 91d964125bf36..e60af35ffc40b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1893,6 +1893,12 @@ CompletableFuture checkReplicationAndRetryOnFailure() { } public CompletableFuture checkDeduplicationStatus() { + createFuture.exceptionallyAsync(e -> { + log.info("Cancelling message deduplication due to {}", e.getMessage()); + messageDeduplication.cancelRecovery(); + close(); + return Optional.empty(); + }, orderedExecutor); return messageDeduplication.checkStatus(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 6f79c573ed3d0..602dc3ed7f101 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -20,7 +20,9 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -32,9 +34,12 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -42,6 +47,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; @@ -50,6 +56,7 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -71,12 +78,24 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setBrokerDeduplicationEnabled(true); + BrokerTestInterceptor.INSTANCE.configure(conf); + } + + @AfterMethod(alwaysRun = true) + protected void resetInterceptors() { + BrokerTestInterceptor.INSTANCE.reset(); + } + @Test public void testNoOrphanTopicAfterCreateTimeout() throws Exception { // Make the topic loading timeout faster. int topicLoadTimeoutSeconds = 2; long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); - pulsar.getConfig().setTopicLoadTimeoutSeconds(2); + pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + TopicName.get(tpName).getPersistenceNamingEncoding(); @@ -84,13 +103,7 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { // Make topic load timeout 5 times. AtomicInteger timeoutCounter = new AtomicInteger(); for (int i = 0; i < 5; i++) { - mockZooKeeper.delay(topicLoadTimeoutSeconds * 2 * 1000, (op, path) -> { - if (mlPath.equals(path)) { - log.info("Topic load timeout: " + timeoutCounter.incrementAndGet()); - return true; - } - return false; - }); + delayMetadataOperation(i, tpName); } // Load topic. @@ -125,20 +138,13 @@ public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception { long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); int topicLoadTimeoutSeconds = 1; pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); - pulsar.getConfig().setBrokerDeduplicationEnabled(true); pulsar.getConfig().setTransactionCoordinatorEnabled(true); String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); // Mock message deduplication recovery speed topicLoadTimeoutSeconds String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; - mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> { - if (mlPath.equals(path)) { - log.info("Topic load timeout: " + path); - return true; - } - return false; - }); + delayMetadataOperation(0, tpName); // First load topic will trigger timeout // The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close. @@ -177,7 +183,6 @@ public CompletableFuture closeAsync() { // set to back pulsar.setTransactionBufferProvider(originalTransactionBufferProvider); pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); - pulsar.getConfig().setBrokerDeduplicationEnabled(false); pulsar.getConfig().setTransactionCoordinatorEnabled(false); } @@ -236,7 +241,7 @@ public Object[][] whetherTimeoutOrNot() { @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot") public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { if (injectTimeout) { - pulsar.getConfig().setTopicLoadTimeoutSeconds(5); + pulsar.getConfig().setTopicLoadTimeoutSeconds(2); } String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", ""); String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); @@ -251,7 +256,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { TopicName paramTp = (TopicName) invocation.getArguments()[0]; if (paramTp.toString().equalsIgnoreCase(tpName) && failedTimes.incrementAndGet() <= 2) { if (injectTimeout) { - Thread.sleep(10 * 1000); + Thread.sleep(conf.getTopicLoadTimeoutSeconds() * 1000 + 500); } log.info("Failed {} times", failedTimes.get()); return CompletableFuture.failedFuture(new RuntimeException("mocked error")); @@ -273,7 +278,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot") public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Exception { if (injectTimeout) { - pulsar.getConfig().setTopicLoadTimeoutSeconds(5); + pulsar.getConfig().setTopicLoadTimeoutSeconds(2); } String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", ""); String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); @@ -313,4 +318,52 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex consumer.close(); admin.topics().delete(tpName); } + + private void delayMetadataOperation(int i, String topic) { + final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + + TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; + mockZooKeeper.delay(conf.getTopicLoadTimeoutSeconds() * 1000 + 500, (__, path) -> { + if (mlPath.equals(path)) { + log.info("Topic {} load timeout: {}", i, path); + return true; + } + return false; + }); + } + + @Test + public void testDeduplicationReplayStuck() throws Exception { + final var topic = "test-deduplication-replay-stuck"; + conf.setTopicLoadTimeoutSeconds(1); + try (final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) { + producer.send("msg"); + } + final var firstTime = new AtomicBoolean(true); + BrokerTestInterceptor.INSTANCE.applyCursorSpyDecorator(cursor -> { + if (cursor.getManagedLedger().getName().contains(topic)) { + if (firstTime.compareAndSet(true, false)) { + doAnswer(invocation -> { + final var callback = (AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1); + CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS).execute(() -> + callback.readEntriesComplete(List.of(), null)); + return null; + }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); + } else { + doCallRealMethod().when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); + } + } + }); + admin.topics().unload(topic); + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).createAsync() + .get(3, TimeUnit.SECONDS); + producer.sendAsync("msg").get(3, TimeUnit.SECONDS); + firstTime.set(true); + admin.topics().unload(topic); + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribeAsync() + .get(3, TimeUnit.SECONDS); + final var msg = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(msg); + assertEquals(msg.getValue(), "msg"); + } } From db2c9f56adf3c67802825f46bdea3831fe2f1d08 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 8 Jul 2025 22:44:50 +0800 Subject: [PATCH 02/16] Fix checkstyle --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 36a8f41b77fcf..2eec907623b11 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -86,7 +86,6 @@ import lombok.Setter; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -1848,7 +1847,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { if (topicFuture.isCompletedExceptionally()) { // Don't close the managed ledger because next time the topic is accessed, the // managed ledger will be created again. - // TODO: add tests for it + // TODO: prevent orphan managed ledgers return; } PersistentTopic persistentTopic = isSystemTopic(topic) From f2c206df5a4ed7963f35a3c43468dc9ecdfc81f3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 8 Jul 2025 23:09:39 +0800 Subject: [PATCH 03/16] Fix wrong tests --- .../client/api/OrphanPersistentTopicTest.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 602dc3ed7f101..2f2c3a7227cf8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -55,6 +55,7 @@ import org.apache.pulsar.compaction.CompactionServiceFactory; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; +import org.jspecify.annotations.Nullable; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -98,12 +99,10 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + TopicName.get(tpName).getPersistenceNamingEncoding(); // Make topic load timeout 5 times. - AtomicInteger timeoutCounter = new AtomicInteger(); for (int i = 0; i < 5; i++) { - delayMetadataOperation(i, tpName); + delayManagedLedgerCreation(i, TopicName.get(tpName), null); } // Load topic. @@ -141,10 +140,7 @@ public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception { pulsar.getConfig().setTransactionCoordinatorEnabled(true); String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); - // Mock message deduplication recovery speed topicLoadTimeoutSeconds - String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + - TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; - delayMetadataOperation(0, tpName); + delayManagedLedgerCreation(0, TopicName.get(tpName), DEDUPLICATION_CURSOR_NAME); // First load topic will trigger timeout // The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close. @@ -319,9 +315,9 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex admin.topics().delete(tpName); } - private void delayMetadataOperation(int i, String topic) { - final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" - + TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; + private void delayManagedLedgerCreation(int i, TopicName topicName, @Nullable String cursor) { + final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + topicName.getPersistenceNamingEncoding() + + (cursor == null ? "" : ("/" + cursor)); mockZooKeeper.delay(conf.getTopicLoadTimeoutSeconds() * 1000 + 500, (__, path) -> { if (mlPath.equals(path)) { log.info("Topic {} load timeout: {}", i, path); From db3aeefdef15f6e49d8eb890074bb2ce4bcfd3a0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 8 Jul 2025 23:51:11 +0800 Subject: [PATCH 04/16] Add testOrphanManagedLedgerRemovedAfterUnload --- .../pulsar/broker/service/BrokerService.java | 4 +-- .../client/api/OrphanPersistentTopicTest.java | 33 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2eec907623b11..6ce47c046fa43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1846,8 +1846,8 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { try { if (topicFuture.isCompletedExceptionally()) { // Don't close the managed ledger because next time the topic is accessed, the - // managed ledger will be created again. - // TODO: prevent orphan managed ledgers + // managed ledger will be created again. The managed ledger will be removed if the + // ownership has been changed. return; } PersistentTopic persistentTopic = isSystemTopic(topic) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 2f2c3a7227cf8..3a675e7bc1841 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -24,8 +24,10 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.lang.reflect.Field; import java.util.Collections; import java.util.List; @@ -33,6 +35,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +56,7 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.compaction.CompactionServiceFactory; +import org.apache.zookeeper.MockZooKeeper; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.jspecify.annotations.Nullable; @@ -69,6 +73,7 @@ public class OrphanPersistentTopicTest extends ProducerConsumerBase { @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { + super.isTcpLookup = true; super.internalSetup(); super.producerBaseSetup(); } @@ -362,4 +367,32 @@ public void testDeduplicationReplayStuck() throws Exception { assertNotNull(msg); assertEquals(msg.getValue(), "msg"); } + + @Test + public void testOrphanManagedLedgerRemovedAfterUnload() throws Exception { + final var topicName = TopicName.get("test-orphan-managed-ledger-removed-after-unload"); + final var topic = topicName.toString(); + + conf.setTopicLoadTimeoutSeconds(1); + final var mlKey = topicName.getPersistenceNamingEncoding(); + final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + mlKey; + mockZooKeeper.delay(conf.getTopicLoadTimeoutSeconds() * 1000 + 500, (op, path) -> + op == MockZooKeeper.Op.CREATE && mlPath.equals(path)); + + try { + pulsar.getBrokerService().getTopic(topic, true).get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getMessage().contains("Failed to load topic within timeout")); + } + + final var managedLedgers = pulsar.getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory() + .getManagedLedgers(); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertTrue(managedLedgers.containsKey(mlKey))); + + admin.topics().unload(topic); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertFalse(managedLedgers.containsKey(mlKey))); + } } From b9d29f27a1ede5f8521eb907ea0098924c5d957c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 11:17:46 +0800 Subject: [PATCH 05/16] Take snapshot when the recovery is interrupted --- .../pulsar/broker/service/BrokerService.java | 4 ++ .../persistent/MessageDeduplication.java | 28 ++++++--- .../service/persistent/PersistentTopic.java | 4 +- .../persistent/TopicDuplicationTest.java | 57 ++++++++++++------- 4 files changed, 62 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6ce47c046fa43..6fe2efc1060c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -132,6 +132,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactory; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; @@ -1892,6 +1893,9 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { } }) .exceptionally((ex) -> { + if (MessageDeduplication.RECOVERY_FAILURE.equals(ex.getCause())) { + return null; + } log.warn("Replication or dedup check failed." + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index b6d8ee704d492..5a1754d1d94a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -56,8 +56,7 @@ */ public class MessageDeduplication { - private static final CompletableFuture RECOVERY_FAILURE = CompletableFuture.failedFuture( - new IllegalStateException("recovery is cancelled")); + public static IllegalStateException RECOVERY_FAILURE = new IllegalStateException("recovery is cancelled"); private final PulsarService pulsar; private final PersistentTopic topic; private final ManagedLedger managedLedger; @@ -65,6 +64,7 @@ public class MessageDeduplication { private ManagedCursor managedCursor; private CompletableFuture recoverFuture; private boolean recoverCancelled = false; + private Position lastPosition = null; private static final String IS_LAST_CHUNK = "isLastChunk"; @@ -123,7 +123,8 @@ public MessageDupUnknownException(String topicName, String producerName) { private final int snapshotInterval; // Counter of number of entries stored after last snapshot was taken - private int snapshotCounter; + @VisibleForTesting + int snapshotCounter; // The timestamp when the snapshot was taken by the scheduled task last time private volatile long lastSnapshotTimestamp = 0L; @@ -161,7 +162,7 @@ private CompletableFuture recoverSequenceIdsMap() { // Replay all the entries and apply all the sequence ids updates synchronized (recoverLock) { if (recoverCancelled) { - return RECOVERY_FAILURE; + return CompletableFuture.failedFuture(RECOVERY_FAILURE); } if (recoverFuture == null) { recoverFuture = new CompletableFuture<>(); @@ -180,7 +181,8 @@ private void replayCursor() { final CompletableFuture future; synchronized (recoverLock) { if (recoverCancelled) { - log.info("Cancelled replaying cursor for {} (snapshotCounter: {})", topic.getName(), snapshotCounter); + log.info("Cancelled replaying cursor for {} (snapshotCounter: {}, lastPosition: {})", topic.getName(), + snapshotCounter, lastPosition == null ? "(null)" : lastPosition.toString()); return; } if (recoverFuture == null) { @@ -192,7 +194,6 @@ private void replayCursor() { managedCursor.asyncReadEntries(100, new ReadEntriesCallback() { @Override public void readEntriesComplete(List entries, Object ctx) { - Position lastPosition = null; for (Entry entry : entries) { ByteBuf messageMetadataAndPayload = entry.getDataBuffer(); MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload); @@ -766,13 +767,22 @@ Map getInactiveProducers() { return inactiveProducers; } - public void cancelRecovery() { + public CompletableFuture cancelRecovery() { + final Position lastPosition; synchronized (recoverLock) { recoverCancelled = true; if (recoverFuture != null) { - recoverFuture.completeExceptionally(new IllegalStateException( - "MessageDeduplication recovery is interrupted")); + recoverFuture.completeExceptionally(RECOVERY_FAILURE); } + lastPosition = this.lastPosition; + } + if (lastPosition != null) { + // Take the snapshot regardless of the snapshot counter so that the recovery will take less time next time + log.info("[{}] Cancelled recovery and take snapshot for last position {}", topic.getName(), lastPosition); + return takeSnapshot(lastPosition); + } else { + log.info("[{}] Cancelled recovery before reading the 1st entry", topic.getName()); + return CompletableFuture.completedFuture(null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e60af35ffc40b..76461e86fdab6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1895,8 +1895,8 @@ CompletableFuture checkReplicationAndRetryOnFailure() { public CompletableFuture checkDeduplicationStatus() { createFuture.exceptionallyAsync(e -> { log.info("Cancelling message deduplication due to {}", e.getMessage()); - messageDeduplication.cancelRecovery(); - close(); + // cancelRecovery() might take snapshot, we need to close the managed ledger after the snapshot is taken + messageDeduplication.cancelRecovery().whenComplete((__, ___) -> close()); return Optional.empty(); }, orderedExecutor); return messageDeduplication.checkStatus(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index f1940a2899978..9834f8da1871e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -27,19 +29,22 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; @@ -61,7 +66,6 @@ public class TopicDuplicationTest extends ProducerConsumerBase { @BeforeMethod @Override protected void setup() throws Exception { - this.conf.setBrokerDeduplicationEnabled(true); super.internalSetup(); super.producerBaseSetup(); } @@ -72,6 +76,18 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setBrokerDeduplicationEnabled(true); + BrokerTestInterceptor.INSTANCE.configure(conf); + } + + @AfterMethod(alwaysRun = true) + protected void resetInterceptors() { + BrokerTestInterceptor.INSTANCE.reset(); + } + @Test(timeOut = 10000) public void testDuplicationApi() throws Exception { final String topicName = testTopic + UUID.randomUUID().toString(); @@ -580,13 +596,22 @@ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { // Mock message deduplication recovery speed topicLoadTimeoutSeconds pulsar.getConfiguration().setTopicLoadTimeoutSeconds(1); - String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + - TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; - mockZooKeeper.delay(2 * 1000, (op, path) -> { - if (mlPath.equals(path)) { - return true; + + final var readCount = new AtomicInteger(0); + BrokerTestInterceptor.INSTANCE.applyCursorSpyDecorator(cursor -> { + if (cursor.getManagedLedger().getName().contains(TopicName.get(topic).getLocalName())) { + doAnswer(invocation -> { + if (readCount.getAndIncrement() == 1) { + // The 1st replay will only be able to read first 100 entries + final var callback = (AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1); + CompletableFuture.delayedExecutor(1, TimeUnit.MINUTES).execute(() -> + callback.readEntriesComplete(List.of(), null)); + } else { + invocation.callRealMethod(); + } + return null; + }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); } - return false; }); final var topics = pulsar.getBrokerService().getTopics(); @@ -601,23 +626,15 @@ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { Assert.assertFalse(topics.containsKey(topic)); }); - // Load topic again, setBrokerDeduplicationEntriesInterval to 10000, // make recoverSequenceIdsMap#takeSnapshot not trigger takeSnapshot. // But actually it should not replay again in recoverSequenceIdsMap, // since previous topic loading should finish the replay process. pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10000); pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60); - PersistentTopic persistentTopic2 = - (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); - ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger(); - MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication(); - - Awaitility.await().untilAsserted(() -> { - int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); - Assert.assertEquals(snapshotCounter3, 0); - Assert.assertEquals(ml2.getLedgersInfo().size(), 1); - }); + PersistentTopic persistentTopic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false) + .get(3, TimeUnit.SECONDS).orElseThrow(); + assertEquals(persistentTopic2.messageDeduplication.snapshotCounter, 899); // cleanup. From 6ad4dd6301e47d5ae8f0db392d786025f791cc2d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 11:41:43 +0800 Subject: [PATCH 06/16] Speed up tests --- .../service/persistent/TopicDuplicationTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index 9834f8da1871e..3067ee993f258 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; @@ -52,8 +53,9 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "broker") @@ -63,14 +65,14 @@ public class TopicDuplicationTest extends ProducerConsumerBase { private final String myNamespace = testTenant + "/" + testNamespace; private final String testTopic = "persistent://" + myNamespace + "/max-unacked-"; - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -84,8 +86,9 @@ protected void doInitConf() throws Exception { } @AfterMethod(alwaysRun = true) - protected void resetInterceptors() { + protected void resetInterceptors() throws PulsarAdminException { BrokerTestInterceptor.INSTANCE.reset(); + admin.namespaces().removeDeduplicationStatus(myNamespace); } @Test(timeOut = 10000) From 2c8c0fcc3e180acca6ae39c394c58db134b3f28e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 11:42:58 +0800 Subject: [PATCH 07/16] Fix checkstyle --- .../pulsar/broker/service/persistent/MessageDeduplication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 5a1754d1d94a2..e05f6996f92a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -56,7 +56,7 @@ */ public class MessageDeduplication { - public static IllegalStateException RECOVERY_FAILURE = new IllegalStateException("recovery is cancelled"); + public static final IllegalStateException RECOVERY_FAILURE = new IllegalStateException("recovery is cancelled"); private final PulsarService pulsar; private final PersistentTopic topic; private final ManagedLedger managedLedger; From aa01fc4c869dfb7866c7edc385b35bd91677510c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 20:06:50 +0800 Subject: [PATCH 08/16] Refactor MessageDeduplication --- .../mledger/util/ManagedLedgerAsyncUtils.java | 73 ++++++ .../pulsar/broker/service/BrokerService.java | 1 + .../persistent/MessageDeduplication.java | 209 +++++++----------- .../DeduplicationDisabledBrokerLevelTest.java | 7 +- .../persistent/TopicDuplicationTest.java | 11 +- 5 files changed, 162 insertions(+), 139 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java new file mode 100644 index 0000000000000..a85555a7d3ce7 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * This util class contains some future-based methods to replace callback-based APIs. With a callback-based API, if any + * exception is thrown in the callback, the callback will never have a chance to be called. While with a future-based + * API, if any exception is thrown in future's callback (e.g. `thenApply`), the future will eventually be completed + * exceptionally. In addition, future-based API is easier for users to switch a different executor to execute the + * callback (e.g. `thenApplyAsync`). + */ +@InterfaceStability.Evolving +public class ManagedLedgerAsyncUtils { + + public static CompletableFuture openCursor(ManagedLedger ml, String cursorName) { + final var future = new CompletableFuture(); + ml.asyncOpenCursor(cursorName, new AsyncCallbacks.OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + future.complete(cursor); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + return future; + } + + public static CompletableFuture> readEntries(ManagedCursor cursor, int numberOfEntriesToRead, + Position maxPosition) { + final var future = new CompletableFuture>(); + cursor.asyncReadEntries(numberOfEntriesToRead, new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + future.complete(entries); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null, maxPosition); + return future; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6fe2efc1060c4..df9fb53f7caad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1894,6 +1894,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }) .exceptionally((ex) -> { if (MessageDeduplication.RECOVERY_FAILURE.equals(ex.getCause())) { + log.info("Deduplication recovery of {} is cancelled", topic); return null; } log.warn("Replication or dedup check failed." diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e05f6996f92a5..87206760f9ae7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.openCursor; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.readEntries; import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import com.google.common.annotations.VisibleForTesting; @@ -30,10 +32,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -60,11 +61,8 @@ public class MessageDeduplication { private final PulsarService pulsar; private final PersistentTopic topic; private final ManagedLedger managedLedger; - private final Object recoverLock = new Object(); private ManagedCursor managedCursor; - private CompletableFuture recoverFuture; - private boolean recoverCancelled = false; - private Position lastPosition = null; + private Position lastPosition = PositionFactory.EARLIEST; private static final String IS_LAST_CHUNK = "isLastChunk"; @@ -123,8 +121,7 @@ public MessageDupUnknownException(String topicName, String producerName) { private final int snapshotInterval; // Counter of number of entries stored after last snapshot was taken - @VisibleForTesting - int snapshotCounter; + private final AtomicInteger snapshotCounter = new AtomicInteger(0); // The timestamp when the snapshot was taken by the scheduled task last time private volatile long lastSnapshotTimestamp = 0L; @@ -139,6 +136,7 @@ public MessageDupUnknownException(String topicName, String producerName) { private final AtomicBoolean snapshotTaking = new AtomicBoolean(false); + private volatile CompletableFuture replayFuture = new CompletableFuture<>(); public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) { this.pulsar = pulsar; @@ -147,92 +145,49 @@ public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, Managed this.status = Status.Initialized; this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval(); this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers(); - this.snapshotCounter = 0; this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix(); } - private CompletableFuture recoverSequenceIdsMap() { - // Load the sequence ids from the snapshot in the cursor properties - managedCursor.getProperties().forEach((k, v) -> { - producerRemoved(k); - highestSequencedPushed.put(k, v); - highestSequencedPersisted.put(k, v); - }); - - // Replay all the entries and apply all the sequence ids updates - synchronized (recoverLock) { - if (recoverCancelled) { - return CompletableFuture.failedFuture(RECOVERY_FAILURE); + private CompletableFuture replay(ManagedCursor cursor) { + return replayOnce(cursor).thenCompose(hasMoreEntries -> { + if (hasMoreEntries) { + return replay(cursor); + } else { + return CompletableFuture.completedFuture(null); } - if (recoverFuture == null) { - recoverFuture = new CompletableFuture<>(); + }).thenCompose(__ -> { + if (snapshotCounter.get() >= snapshotInterval) { + snapshotCounter.set(0); + return takeSnapshot(lastPosition); + } else { + return CompletableFuture.completedFuture(null); } - } - log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries()); - replayCursor(); - return recoverFuture; + }); } - /** - * Read all the entries published from the cursor position until the most recent and update the highest sequence id - * from each producer. - */ - private void replayCursor() { - final CompletableFuture future; - synchronized (recoverLock) { - if (recoverCancelled) { - log.info("Cancelled replaying cursor for {} (snapshotCounter: {}, lastPosition: {})", topic.getName(), - snapshotCounter, lastPosition == null ? "(null)" : lastPosition.toString()); - return; - } - if (recoverFuture == null) { - log.error("Unexpected null recoverFuture for {}", topic.getName()); - return; - } - future = recoverFuture; - } - managedCursor.asyncReadEntries(100, new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - for (Entry entry : entries) { - ByteBuf messageMetadataAndPayload = entry.getDataBuffer(); - MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload); - - String producerName = md.getProducerName(); - long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId()); - highestSequencedPushed.put(producerName, sequenceId); - highestSequencedPersisted.put(producerName, sequenceId); - producerRemoved(producerName); - snapshotCounter++; - lastPosition = entry.getPosition(); - entry.release(); - } - - if (managedCursor.hasMoreEntries()) { - // Read next batch of entries - pulsar.getExecutor().execute(() -> replayCursor()); - } else { - // Done replaying - if (lastPosition != null && snapshotCounter >= snapshotInterval) { - snapshotCounter = 0; - takeSnapshot(lastPosition).whenComplete((ignored, e) -> { - if (e == null) { - future.complete(null); - } else { - future.completeExceptionally(e); - } - }); - } else { - future.complete(null); + private CompletableFuture replayOnce(ManagedCursor cursor) { + return readEntries(cursor, 100, PositionFactory.LATEST).thenApply(entries -> { + try { + synchronized (this) { + if (status == Status.Failed) { + return false; + } + for (final var entry : entries) { + final var metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + final var producerName = metadata.getProducerName(); + final var sequenceId = Math.max(metadata.getHighestSequenceId(), metadata.getSequenceId()); + highestSequencedPushed.put(producerName, sequenceId); + highestSequencedPersisted.put(producerName, sequenceId); + producerRemoved(producerName); + lastPosition = entry.getPosition(); + snapshotCounter.getAndIncrement(); } } + } finally { + entries.forEach(Entry::release); } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null, PositionFactory.LATEST); + return cursor.hasMoreEntries(); + }); } public Status getStatus() { @@ -276,6 +231,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { // Disabled deduping CompletableFuture future = new CompletableFuture<>(); status = Status.Removing; + lastPosition = PositionFactory.EARLIEST; managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new DeleteCursorCallback() { @@ -310,35 +266,32 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { return future; } else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) { // Enable deduping - CompletableFuture future = new CompletableFuture<>(); - managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() { - - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - // We don't want to retain cache for this cursor - cursor.setAlwaysInactive(); - managedCursor = cursor; - recoverSequenceIdsMap().thenRun(() -> { - status = Status.Enabled; - future.complete(null); - log.info("[{}] Enabled deduplication", topic.getName()); - }).exceptionally(ex -> { - status = Status.Failed; - log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), ex.getMessage()); - future.completeExceptionally(ex); - return null; - }); - } - - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), - exception.getMessage()); - future.completeExceptionally(exception); - } - - }, null); - return future; + lastPosition = PositionFactory.EARLIEST; + final var cursorFuture = openCursor(managedLedger, PersistentTopic.DEDUPLICATION_CURSOR_NAME); + replayFuture = cursorFuture.thenCompose(cursor -> { + cursor.setAlwaysInactive(); + managedCursor = cursor; + lastPosition = managedCursor.getReadPosition(); + // Load the sequence ids from the snapshot in the cursor properties + cursor.getProperties().forEach((k, v) -> { + producerRemoved(k); + highestSequencedPushed.put(k, v); + highestSequencedPersisted.put(k, v); + }); + log.info("[{}] Replaying {} entries for deduplication", topic.getName(), + cursor.getNumberOfEntries()); + return replay(cursor).thenAccept(__ -> { + status = Status.Enabled; + log.info("[{}] Enabled deduplication", topic.getName()); + }); + }); + replayFuture.exceptionally(e -> { + status = Status.Failed; + log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), e.getMessage()); + return null; + }); + status = Status.Recovering; + return replayFuture; } else { // Nothing to do, we are in the correct state return CompletableFuture.completedFuture(null); @@ -609,8 +562,8 @@ public void recordMessagePersistedNormal(PublishContext publishContext, Position } private void increaseSnapshotCounterAndTakeSnapshotIfNeeded(Position position) { - if (++snapshotCounter >= snapshotInterval) { - snapshotCounter = 0; + if (snapshotCounter.incrementAndGet() >= snapshotInterval) { + snapshotCounter.set(0); takeSnapshot(position); } else { if (log.isDebugEnabled()) { @@ -767,21 +720,27 @@ Map getInactiveProducers() { return inactiveProducers; } + @VisibleForTesting + public int getSnapshotCounter() { + return snapshotCounter.get(); + } + public CompletableFuture cancelRecovery() { - final Position lastPosition; - synchronized (recoverLock) { - recoverCancelled = true; - if (recoverFuture != null) { - recoverFuture.completeExceptionally(RECOVERY_FAILURE); + final boolean cancelled; + synchronized (this) { + if (status == Status.Recovering) { + status = Status.Failed; + replayFuture.completeExceptionally(RECOVERY_FAILURE); + cancelled = true; + } else { + cancelled = false; } - lastPosition = this.lastPosition; } - if (lastPosition != null) { - // Take the snapshot regardless of the snapshot counter so that the recovery will take less time next time - log.info("[{}] Cancelled recovery and take snapshot for last position {}", topic.getName(), lastPosition); + if (cancelled) { + log.info("[{}] Take snapshot at {} when the deduplication replay is cancelled", topic.getName(), + lastPosition); return takeSnapshot(lastPosition); } else { - log.info("[{}] Cancelled recovery before reading the 1st entry", topic.getName()); return CompletableFuture.completedFuture(null); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java index 195d0155a31c6..eb8eb9ed555e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.awaitility.Awaitility; -import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -127,8 +126,7 @@ public void testSnapshotCounterAfterUnload() throws Exception { for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) { producer.send(i + ""); } - int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter"); - assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1); + assertEquals(deduplication1.getSnapshotCounter(), brokerDeduplicationEntriesInterval - 1); admin.topics().unload(topic); PersistentTopic persistentTopic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); @@ -145,8 +143,7 @@ public void testSnapshotCounterAfterUnload() throws Exception { ml2.trimConsumedLedgersInBackground(new CompletableFuture<>()); // step 4. Awaitility.await().untilAsserted(() -> { - int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); - assertTrue(snapshotCounter3 < brokerDeduplicationEntriesInterval); + assertTrue(deduplication2.getSnapshotCounter() < brokerDeduplicationEntriesInterval); // Verify: the previous ledger will be removed because all messages have been acked. assertEquals(ml2.getLedgersInfo().size(), 1); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index 3067ee993f258..d694d0f855e02 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -51,7 +51,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; -import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -557,9 +556,6 @@ public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() thro @Test public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { - cleanup(); - setup(); - // Create a topic and wait deduplication is started. int brokerDeduplicationEntriesInterval = 1000; pulsar.getConfiguration().setBrokerDeduplicationEnabled(true); @@ -585,8 +581,7 @@ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { producer.send(i + ""); } producer.close(); - int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter"); - assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1); + assertEquals(deduplication1.getSnapshotCounter(), brokerDeduplicationEntriesInterval - 1); // Unload and load topic, simulate topic load is timeout. @@ -637,13 +632,11 @@ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60); PersistentTopic persistentTopic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false) .get(3, TimeUnit.SECONDS).orElseThrow(); - assertEquals(persistentTopic2.messageDeduplication.snapshotCounter, 899); + assertEquals(persistentTopic2.messageDeduplication.getSnapshotCounter(), 899); // cleanup. admin.topics().delete(topic); - cleanup(); - setup(); } private void waitCacheInit(String topicName) throws Exception { From 5c41fee0ff1ffa4be212271b29f3f5caff23b1b6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 20:11:13 +0800 Subject: [PATCH 09/16] Fix failed testDeduplicationReplayStuck due to snapshot --- .../pulsar/client/api/OrphanPersistentTopicTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 3a675e7bc1841..536b193e38231 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -355,14 +355,14 @@ public void testDeduplicationReplayStuck() throws Exception { } }); admin.topics().unload(topic); - @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).createAsync() + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribeAsync() .get(3, TimeUnit.SECONDS); - producer.sendAsync("msg").get(3, TimeUnit.SECONDS); firstTime.set(true); admin.topics().unload(topic); - @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribeAsync() + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).createAsync() .get(3, TimeUnit.SECONDS); + producer.sendAsync("msg").get(3, TimeUnit.SECONDS); final var msg = consumer.receive(3, TimeUnit.SECONDS); assertNotNull(msg); assertEquals(msg.getValue(), "msg"); From 9c608666999ade1e2b78a57b288e564c91f30fbd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 20:27:08 +0800 Subject: [PATCH 10/16] Speed up tests --- .../org/apache/pulsar/client/api/OrphanPersistentTopicTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 536b193e38231..0df0e252e5ee3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -294,7 +294,7 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex TopicName paramTp = (TopicName) invocation.getArguments()[0]; if (paramTp.toString().equalsIgnoreCase(tpName) && mockRaceConditionCounter.incrementAndGet() <= 1) { if (injectTimeout) { - Thread.sleep(10 * 1000); + Thread.sleep(conf.getTopicLoadTimeoutSeconds() * 1000 + 500); } log.info("Race condition occurs {} times", mockRaceConditionCounter.get()); pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding()); From 7e2d957efd44579e4b14448485dbce175be65208 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 21:31:47 +0800 Subject: [PATCH 11/16] Fix testTopicLoadAndDeleteAtTheSameTime --- .../org/apache/pulsar/broker/service/BrokerService.java | 9 --------- .../pulsar/client/api/OrphanPersistentTopicTest.java | 8 ++++++-- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index df9fb53f7caad..f482a6afe5ad7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1879,15 +1879,6 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { log.error("{} future is already completed by another thread, " + "which is not expected. Closing the current one", topic); } - executor().submit(() -> { - persistentTopic.close().whenComplete((ignore, ex) -> { - topics.remove(topic, topicFuture); - if (ex != null) { - log.warn("[{}] Get an error when closing topic.", - topic, ex); - } - }); - }); } else { addTopicToStatsMaps(topicName, persistentTopic); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 0df0e252e5ee3..57b2b1c6bd9bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -310,13 +310,17 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex } // Verify: the consumer create successfully after allowing to create topic automatically. - Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + try (final var consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe()) { + } catch (PulsarClientException.TopicDoesNotExistException e) { + fail(); + } + }); // cleanup. if (injectTimeout) { pulsar.getConfig().setTopicLoadTimeoutSeconds(60); } - consumer.close(); admin.topics().delete(tpName); } From 2c74ff26beda38cdf1b77295cb6f70e14bec6e23 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 22:05:37 +0800 Subject: [PATCH 12/16] Fix testCloseTransactionBufferWhenTimeout --- .../broker/service/persistent/MessageDeduplication.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 87206760f9ae7..a5e88bfd8315a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -726,17 +726,17 @@ public int getSnapshotCounter() { } public CompletableFuture cancelRecovery() { - final boolean cancelled; + final boolean takeSnapshot; synchronized (this) { if (status == Status.Recovering) { status = Status.Failed; replayFuture.completeExceptionally(RECOVERY_FAILURE); - cancelled = true; + takeSnapshot = snapshotCounter.get() > 0; } else { - cancelled = false; + takeSnapshot = false; } } - if (cancelled) { + if (takeSnapshot) { log.info("[{}] Take snapshot at {} when the deduplication replay is cancelled", topic.getName(), lastPosition); return takeSnapshot(lastPosition); From 0428f9e6a3ccdc258e4b25d1087edbdc704a3894 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 9 Jul 2025 22:25:11 +0800 Subject: [PATCH 13/16] Adjust logs --- .../persistent/MessageDeduplication.java | 19 +++++++++++++++---- .../service/persistent/PersistentTopic.java | 3 +-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index a5e88bfd8315a..1a4b8e8dfd1d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -281,8 +281,12 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { log.info("[{}] Replaying {} entries for deduplication", topic.getName(), cursor.getNumberOfEntries()); return replay(cursor).thenAccept(__ -> { - status = Status.Enabled; - log.info("[{}] Enabled deduplication", topic.getName()); + synchronized (this) { // synchronize with failRecovery() + if (status != Status.Failed) { + status = Status.Enabled; + log.info("[{}] Enabled deduplication", topic.getName()); + } + } }); }); replayFuture.exceptionally(e -> { @@ -725,14 +729,20 @@ public int getSnapshotCounter() { return snapshotCounter.get(); } - public CompletableFuture cancelRecovery() { + public CompletableFuture failRecovery() { final boolean takeSnapshot; synchronized (this) { if (status == Status.Recovering) { status = Status.Failed; - replayFuture.completeExceptionally(RECOVERY_FAILURE); + final var future = replayFuture; + if (future != null) { + // Completing this future in a different thread so that callbacks will be called out of the + // synchronized block. + CompletableFuture.runAsync(() -> future.completeExceptionally(RECOVERY_FAILURE)); + } takeSnapshot = snapshotCounter.get() > 0; } else { + replayFuture = null; takeSnapshot = false; } } @@ -741,6 +751,7 @@ public CompletableFuture cancelRecovery() { lastPosition); return takeSnapshot(lastPosition); } else { + log.info("[{}] Cancel the deduplication replay for failure", topic.getName()); return CompletableFuture.completedFuture(null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 76461e86fdab6..6a9c7f0793e92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1894,9 +1894,8 @@ CompletableFuture checkReplicationAndRetryOnFailure() { public CompletableFuture checkDeduplicationStatus() { createFuture.exceptionallyAsync(e -> { - log.info("Cancelling message deduplication due to {}", e.getMessage()); // cancelRecovery() might take snapshot, we need to close the managed ledger after the snapshot is taken - messageDeduplication.cancelRecovery().whenComplete((__, ___) -> close()); + messageDeduplication.failRecovery().whenComplete((__, ___) -> close()); return Optional.empty(); }, orderedExecutor); return messageDeduplication.checkStatus(); From 7048aaf1f80d00247c32c5ff746e7f82fe5c989b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 10 Jul 2025 15:51:58 +0800 Subject: [PATCH 14/16] Don't close managed ledger if there is another pending topic future --- .../pulsar/broker/service/BrokerService.java | 3 +- .../service/persistent/PersistentTopic.java | 6 ++- .../client/api/OrphanPersistentTopicTest.java | 48 +++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f482a6afe5ad7..a632cecbc8aca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1658,7 +1658,8 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c * @return CompletableFuture * @throws RuntimeException */ - protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, + @VisibleForTesting + public CompletableFuture> loadOrCreatePersistentTopic(final String topic, boolean createIfMissing, Map properties) { final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6a9c7f0793e92..f8c3676fcbcde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1794,7 +1794,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { } }, null)); - disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> { + disconnectClientsInCurrentCall.thenRun(() -> { + if (brokerService.getTopics().get(topic) == createFuture) { + closeLedgerAfterCloseClients.run(); + } + }).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); unfenceTopicToResume(); closeFuture.completeExceptionally(exception); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 57b2b1c6bd9bd..0da7e76dfb01f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -28,6 +28,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.netty.buffer.Unpooled; import java.lang.reflect.Field; import java.util.Collections; import java.util.List; @@ -43,6 +44,8 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -50,10 +53,12 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.zookeeper.MockZooKeeper; @@ -399,4 +404,47 @@ public void testOrphanManagedLedgerRemovedAfterUnload() throws Exception { Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertFalse(managedLedgers.containsKey(mlKey))); } + + @Test + public void testDuplicatedTopics() throws Exception { + final var topicName = TopicName.get("test-duplicated-topics"); + final var topic = topicName.toString(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().unload(topic); + // When load timeout happens, there might be a PersistentTopic initializing at the background. It will be closed + // after the initialization is done. + final var oldPersistentTopic = (PersistentTopic) pulsar.getBrokerService() + .loadOrCreatePersistentTopic(topic, true, null).get().orElseThrow(); + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .loadOrCreatePersistentTopic(topic, true, null).get().orElseThrow(); + oldPersistentTopic.close(); + final var future = new CompletableFuture(); + final var metadata = new MessageMetadata(); + metadata.setProducerName("producer"); + metadata.setSequenceId(0); + metadata.setPublishTime(System.currentTimeMillis()); + persistentTopic.publishMessage(Unpooled.wrappedBuffer(metadata.toByteArray()), new Topic.PublishContext() { + @Override + public void completed(Exception e, long ledgerId, long entryId) { + if (e == null) { + future.complete(PositionFactory.create(ledgerId, entryId)); + } else { + future.completeExceptionally(e); + } + } + + @Override + public String getProducerName() { + return "producer"; + } + + @Override + public long getSequenceId() { + return 0; + } + }); + final var position = future.get(3, TimeUnit.SECONDS); + log.info("Sent to {}", position); + persistentTopic.close(); + } } From d483e67d621c3684c2d075457bdc0a384d206541 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 10 Jul 2025 17:50:58 +0800 Subject: [PATCH 15/16] Improve takeSnapshot --- .../mledger/util/ManagedLedgerAsyncUtils.java | 18 +++++ .../persistent/MessageDeduplication.java | 73 +++++++++---------- .../service/persistent/PersistentTopic.java | 2 +- 3 files changed, 52 insertions(+), 41 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java index a85555a7d3ce7..27d353d284da8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.util; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -70,4 +71,21 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { }, null, maxPosition); return future; } + + public static CompletableFuture markDelete(ManagedCursor cursor, Position position, + Map properties) { + final var future = new CompletableFuture(); + cursor.asyncMarkDelete(position, properties, new AsyncCallbacks.MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + future.complete(null); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, new Object()); + return future; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 1a4b8e8dfd1d5..0fe9949c069ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.markDelete; import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.openCursor; import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.readEntries; import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; @@ -34,7 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -158,7 +158,8 @@ private CompletableFuture replay(ManagedCursor cursor) { }).thenCompose(__ -> { if (snapshotCounter.get() >= snapshotInterval) { snapshotCounter.set(0); - return takeSnapshot(lastPosition); + // Ignore the possible failure when taking the snapshot + return takeSnapshot(lastPosition).exceptionally(e -> null); } else { return CompletableFuture.completedFuture(null); } @@ -281,7 +282,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { log.info("[{}] Replaying {} entries for deduplication", topic.getName(), cursor.getNumberOfEntries()); return replay(cursor).thenAccept(__ -> { - synchronized (this) { // synchronize with failRecovery() + synchronized (this) { // synchronize with cancelRecovery() if (status != Status.Failed) { status = Status.Enabled; log.info("[{}] Enabled deduplication", topic.getName()); @@ -589,14 +590,12 @@ public void resetHighestSequenceIdPushed() { } private CompletableFuture takeSnapshot(Position position) { - CompletableFuture future = new CompletableFuture<>(); if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); } if (!snapshotTaking.compareAndSet(false, true)) { - future.complete(null); - return future; + return CompletableFuture.completedFuture(null); } Map snapshot = new TreeMap<>(); @@ -606,25 +605,24 @@ private CompletableFuture takeSnapshot(Position position) { } }); - getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - if (log.isDebugEnabled()) { - log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position); - } - lastSnapshotTimestamp = System.currentTimeMillis(); - snapshotTaking.set(false); - future.complete(null); - } + final var cursor = managedCursor; + if (cursor == null) { + log.error("[{}] Managed cursor is null when taking snapshot on {}", topic.getName(), position); + return CompletableFuture.completedFuture(null); + } - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to store new deduplication snapshot at {}", - topic.getName(), position, exception); - snapshotTaking.set(false); - future.completeExceptionally(exception); + final var future = markDelete(cursor, position, snapshot).thenAccept(__ -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position); + } + lastSnapshotTimestamp = System.currentTimeMillis(); + }); + future.whenComplete((__, e) -> { + snapshotTaking.set(false); + if (e != null) { + log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position, e); } - }, null); + }); return future; } @@ -729,24 +727,19 @@ public int getSnapshotCounter() { return snapshotCounter.get(); } - public CompletableFuture failRecovery() { - final boolean takeSnapshot; - synchronized (this) { - if (status == Status.Recovering) { - status = Status.Failed; - final var future = replayFuture; - if (future != null) { - // Completing this future in a different thread so that callbacks will be called out of the - // synchronized block. - CompletableFuture.runAsync(() -> future.completeExceptionally(RECOVERY_FAILURE)); - } - takeSnapshot = snapshotCounter.get() > 0; - } else { - replayFuture = null; - takeSnapshot = false; - } + public synchronized CompletableFuture cancelRecovery() { + if (status != Status.Recovering) { + return CompletableFuture.completedFuture(null); + } + status = Status.Failed; + final var replayFuture = this.replayFuture; + if (replayFuture != null) { + // Fail the future of `checkStatus()` in a different thread to avoid executing the callback in the + // synchronized block. + CompletableFuture.runAsync(() -> replayFuture.completeExceptionally(RECOVERY_FAILURE), + pulsar.getExecutor()); } - if (takeSnapshot) { + if (snapshotCounter.get() > 0) { log.info("[{}] Take snapshot at {} when the deduplication replay is cancelled", topic.getName(), lastPosition); return takeSnapshot(lastPosition); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f8c3676fcbcde..ecd8df5c9b39c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1899,7 +1899,7 @@ CompletableFuture checkReplicationAndRetryOnFailure() { public CompletableFuture checkDeduplicationStatus() { createFuture.exceptionallyAsync(e -> { // cancelRecovery() might take snapshot, we need to close the managed ledger after the snapshot is taken - messageDeduplication.failRecovery().whenComplete((__, ___) -> close()); + messageDeduplication.cancelRecovery().whenComplete((__, ___) -> close()); return Optional.empty(); }, orderedExecutor); return messageDeduplication.checkStatus(); From 4644bed4dd5d51ef3ce197282ace281c3ed89b1e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 10 Jul 2025 19:10:11 +0800 Subject: [PATCH 16/16] Improve testDuplicatedTopics --- .../client/api/OrphanPersistentTopicTest.java | 74 +++++++++++-------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 0da7e76dfb01f..e20428e5a8b11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.Unpooled; @@ -40,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -50,6 +52,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; @@ -58,7 +61,6 @@ import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; -import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.zookeeper.MockZooKeeper; @@ -411,40 +413,50 @@ public void testDuplicatedTopics() throws Exception { final var topic = topicName.toString(); admin.topics().createNonPartitionedTopic(topic); admin.topics().unload(topic); - // When load timeout happens, there might be a PersistentTopic initializing at the background. It will be closed - // after the initialization is done. - final var oldPersistentTopic = (PersistentTopic) pulsar.getBrokerService() + // When load timeout happens, there might be an orphan PersistentTopic that will be closed after the + // initialization is done. Here we simulate the case by manually creating such an instance. + final var orphanPersistentTopic = (PersistentTopic) pulsar.getBrokerService() .loadOrCreatePersistentTopic(topic, true, null).get().orElseThrow(); - final var persistentTopic = (PersistentTopic) pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topic, true, null).get().orElseThrow(); - oldPersistentTopic.close(); - final var future = new CompletableFuture(); - final var metadata = new MessageMetadata(); - metadata.setProducerName("producer"); - metadata.setSequenceId(0); - metadata.setPublishTime(System.currentTimeMillis()); - persistentTopic.publishMessage(Unpooled.wrappedBuffer(metadata.toByteArray()), new Topic.PublishContext() { - @Override - public void completed(Exception e, long ledgerId, long entryId) { - if (e == null) { - future.complete(PositionFactory.create(ledgerId, entryId)); - } else { - future.completeExceptionally(e); - } - } + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get() + .orElseThrow(); + assertSame(pulsar.getBrokerService().getTopics().get(topic), persistentTopic.getCreateFuture()); + orphanPersistentTopic.close(); + final BiFunction, Topic.PublishContext> newCallback = (sequence, future) -> + new Topic.PublishContext() { + @Override + public void completed(Exception e, long ledgerId, long entryId) { + if (e == null) { + future.complete(PositionFactory.create(ledgerId, entryId)); + } else { + future.completeExceptionally(e); + } + } - @Override - public String getProducerName() { - return "producer"; - } + @Override + public String getProducerName() { + return "producer"; + } - @Override - public long getSequenceId() { - return 0; - } - }); + @Override + public long getSequenceId() { + return sequence; + } + }; + + final var future = new CompletableFuture(); + persistentTopic.publishMessage(Unpooled.buffer(1), newCallback.apply(0, future)); final var position = future.get(3, TimeUnit.SECONDS); log.info("Sent to {}", position); - persistentTopic.close(); + admin.topics().delete(topic); + + final var future2 = new CompletableFuture(); + persistentTopic.publishMessage(Unpooled.buffer(1), newCallback.apply(1, future2)); + try { + future2.get(3, TimeUnit.SECONDS); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.TopicFencedException); + } + assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic)); } }