diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java new file mode 100644 index 0000000000000..27d353d284da8 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerAsyncUtils.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * This util class contains some future-based methods to replace callback-based APIs. With a callback-based API, if any + * exception is thrown in the callback, the callback will never have a chance to be called. While with a future-based + * API, if any exception is thrown in future's callback (e.g. `thenApply`), the future will eventually be completed + * exceptionally. In addition, future-based API is easier for users to switch a different executor to execute the + * callback (e.g. `thenApplyAsync`). + */ +@InterfaceStability.Evolving +public class ManagedLedgerAsyncUtils { + + public static CompletableFuture openCursor(ManagedLedger ml, String cursorName) { + final var future = new CompletableFuture(); + ml.asyncOpenCursor(cursorName, new AsyncCallbacks.OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + future.complete(cursor); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + return future; + } + + public static CompletableFuture> readEntries(ManagedCursor cursor, int numberOfEntriesToRead, + Position maxPosition) { + final var future = new CompletableFuture>(); + cursor.asyncReadEntries(numberOfEntriesToRead, new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + future.complete(entries); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null, maxPosition); + return future; + } + + public static CompletableFuture markDelete(ManagedCursor cursor, Position position, + Map properties) { + final var future = new CompletableFuture(); + cursor.asyncMarkDelete(position, properties, new AsyncCallbacks.MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + future.complete(null); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, new Object()); + return future; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6124c913b26db..a632cecbc8aca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -132,6 +132,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactory; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryClassic; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; @@ -1657,16 +1658,20 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c * @return CompletableFuture * @throws RuntimeException */ - protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, + @VisibleForTesting + public CompletableFuture> loadOrCreatePersistentTopic(final String topic, boolean createIfMissing, Map properties) { final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); - topicFuture.exceptionally(t -> { + topicFuture.exceptionallyAsync(e -> { pulsarStats.recordTopicLoadFailed(); - return null; - }); + if (topics.remove(topic, topicFuture)) { + log.info("Removed topic {} for: {}", topic, e.getMessage()); + } + return Optional.empty(); + }, executor()); checkTopicNsOwnership(topic) .thenRun(() -> { @@ -1781,7 +1786,6 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new NotAllowedException(msg)); return; } @@ -1842,6 +1846,12 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, @Override public void openLedgerComplete(ManagedLedger ledger, Object ctx) { try { + if (topicFuture.isCompletedExceptionally()) { + // Don't close the managed ledger because next time the topic is accessed, the + // managed ledger will be created again. The managed ledger will be removed if the + // ownership has been changed. + return; + } PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class); @@ -1870,20 +1880,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { log.error("{} future is already completed by another thread, " + "which is not expected. Closing the current one", topic); } - executor().submit(() -> { - persistentTopic.close().whenComplete((ignore, ex) -> { - topics.remove(topic, topicFuture); - if (ex != null) { - log.warn("[{}] Get an error when closing topic.", - topic, ex); - } - }); - }); } else { addTopicToStatsMaps(topicName, persistentTopic); } }) .exceptionally((ex) -> { + if (MessageDeduplication.RECOVERY_FAILURE.equals(ex.getCause())) { + log.info("Deduplication recovery of {} is cancelled", topic); + return null; + } log.warn("Replication or dedup check failed." + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 5dc06842f9843..0fe9949c069ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.markDelete; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.openCursor; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerAsyncUtils.readEntries; import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_POSITION; import com.google.common.annotations.VisibleForTesting; @@ -30,10 +33,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -56,10 +57,12 @@ */ public class MessageDeduplication { + public static final IllegalStateException RECOVERY_FAILURE = new IllegalStateException("recovery is cancelled"); private final PulsarService pulsar; private final PersistentTopic topic; private final ManagedLedger managedLedger; private ManagedCursor managedCursor; + private Position lastPosition = PositionFactory.EARLIEST; private static final String IS_LAST_CHUNK = "isLastChunk"; @@ -118,7 +121,7 @@ public MessageDupUnknownException(String topicName, String producerName) { private final int snapshotInterval; // Counter of number of entries stored after last snapshot was taken - private int snapshotCounter; + private final AtomicInteger snapshotCounter = new AtomicInteger(0); // The timestamp when the snapshot was taken by the scheduled task last time private volatile long lastSnapshotTimestamp = 0L; @@ -133,6 +136,7 @@ public MessageDupUnknownException(String topicName, String producerName) { private final AtomicBoolean snapshotTaking = new AtomicBoolean(false); + private volatile CompletableFuture replayFuture = new CompletableFuture<>(); public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) { this.pulsar = pulsar; @@ -141,70 +145,50 @@ public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, Managed this.status = Status.Initialized; this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval(); this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers(); - this.snapshotCounter = 0; this.replicatorPrefix = pulsar.getConfiguration().getReplicatorPrefix(); } - private CompletableFuture recoverSequenceIdsMap() { - // Load the sequence ids from the snapshot in the cursor properties - managedCursor.getProperties().forEach((k, v) -> { - producerRemoved(k); - highestSequencedPushed.put(k, v); - highestSequencedPersisted.put(k, v); - }); - - // Replay all the entries and apply all the sequence ids updates - log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries()); - CompletableFuture future = new CompletableFuture<>(); - replayCursor(future); - return future.thenCompose(lastPosition -> { - if (lastPosition != null && snapshotCounter >= snapshotInterval) { - snapshotCounter = 0; - return takeSnapshot(lastPosition); + private CompletableFuture replay(ManagedCursor cursor) { + return replayOnce(cursor).thenCompose(hasMoreEntries -> { + if (hasMoreEntries) { + return replay(cursor); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> { + if (snapshotCounter.get() >= snapshotInterval) { + snapshotCounter.set(0); + // Ignore the possible failure when taking the snapshot + return takeSnapshot(lastPosition).exceptionally(e -> null); + } else { + return CompletableFuture.completedFuture(null); } - return CompletableFuture.completedFuture(null); }); } - /** - * Read all the entries published from the cursor position until the most recent and update the highest sequence id - * from each producer. - * - * @param future future to trigger when the replay is complete - */ - private void replayCursor(CompletableFuture future) { - managedCursor.asyncReadEntries(100, new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - Position lastPosition = null; - for (Entry entry : entries) { - ByteBuf messageMetadataAndPayload = entry.getDataBuffer(); - MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload); - - String producerName = md.getProducerName(); - long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId()); - highestSequencedPushed.put(producerName, sequenceId); - highestSequencedPersisted.put(producerName, sequenceId); - producerRemoved(producerName); - snapshotCounter++; - lastPosition = entry.getPosition(); - entry.release(); - } - - if (managedCursor.hasMoreEntries()) { - // Read next batch of entries - pulsar.getExecutor().execute(() -> replayCursor(future)); - } else { - // Done replaying - future.complete(lastPosition); + private CompletableFuture replayOnce(ManagedCursor cursor) { + return readEntries(cursor, 100, PositionFactory.LATEST).thenApply(entries -> { + try { + synchronized (this) { + if (status == Status.Failed) { + return false; + } + for (final var entry : entries) { + final var metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + final var producerName = metadata.getProducerName(); + final var sequenceId = Math.max(metadata.getHighestSequenceId(), metadata.getSequenceId()); + highestSequencedPushed.put(producerName, sequenceId); + highestSequencedPersisted.put(producerName, sequenceId); + producerRemoved(producerName); + lastPosition = entry.getPosition(); + snapshotCounter.getAndIncrement(); + } } + } finally { + entries.forEach(Entry::release); } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null, PositionFactory.LATEST); + return cursor.hasMoreEntries(); + }); } public Status getStatus() { @@ -248,6 +232,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { // Disabled deduping CompletableFuture future = new CompletableFuture<>(); status = Status.Removing; + lastPosition = PositionFactory.EARLIEST; managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new DeleteCursorCallback() { @@ -282,35 +267,36 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { return future; } else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) { // Enable deduping - CompletableFuture future = new CompletableFuture<>(); - managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() { - - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - // We don't want to retain cache for this cursor - cursor.setAlwaysInactive(); - managedCursor = cursor; - recoverSequenceIdsMap().thenRun(() -> { - status = Status.Enabled; - future.complete(null); - log.info("[{}] Enabled deduplication", topic.getName()); - }).exceptionally(ex -> { - status = Status.Failed; - log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), ex.getMessage()); - future.completeExceptionally(ex); - return null; - }); - } - - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), - exception.getMessage()); - future.completeExceptionally(exception); - } - - }, null); - return future; + lastPosition = PositionFactory.EARLIEST; + final var cursorFuture = openCursor(managedLedger, PersistentTopic.DEDUPLICATION_CURSOR_NAME); + replayFuture = cursorFuture.thenCompose(cursor -> { + cursor.setAlwaysInactive(); + managedCursor = cursor; + lastPosition = managedCursor.getReadPosition(); + // Load the sequence ids from the snapshot in the cursor properties + cursor.getProperties().forEach((k, v) -> { + producerRemoved(k); + highestSequencedPushed.put(k, v); + highestSequencedPersisted.put(k, v); + }); + log.info("[{}] Replaying {} entries for deduplication", topic.getName(), + cursor.getNumberOfEntries()); + return replay(cursor).thenAccept(__ -> { + synchronized (this) { // synchronize with cancelRecovery() + if (status != Status.Failed) { + status = Status.Enabled; + log.info("[{}] Enabled deduplication", topic.getName()); + } + } + }); + }); + replayFuture.exceptionally(e -> { + status = Status.Failed; + log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), e.getMessage()); + return null; + }); + status = Status.Recovering; + return replayFuture; } else { // Nothing to do, we are in the correct state return CompletableFuture.completedFuture(null); @@ -581,8 +567,8 @@ public void recordMessagePersistedNormal(PublishContext publishContext, Position } private void increaseSnapshotCounterAndTakeSnapshotIfNeeded(Position position) { - if (++snapshotCounter >= snapshotInterval) { - snapshotCounter = 0; + if (snapshotCounter.incrementAndGet() >= snapshotInterval) { + snapshotCounter.set(0); takeSnapshot(position); } else { if (log.isDebugEnabled()) { @@ -604,14 +590,12 @@ public void resetHighestSequenceIdPushed() { } private CompletableFuture takeSnapshot(Position position) { - CompletableFuture future = new CompletableFuture<>(); if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); } if (!snapshotTaking.compareAndSet(false, true)) { - future.complete(null); - return future; + return CompletableFuture.completedFuture(null); } Map snapshot = new TreeMap<>(); @@ -621,25 +605,24 @@ private CompletableFuture takeSnapshot(Position position) { } }); - getManagedCursor().asyncMarkDelete(position, snapshot, new MarkDeleteCallback() { - @Override - public void markDeleteComplete(Object ctx) { - if (log.isDebugEnabled()) { - log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position); - } - lastSnapshotTimestamp = System.currentTimeMillis(); - snapshotTaking.set(false); - future.complete(null); - } + final var cursor = managedCursor; + if (cursor == null) { + log.error("[{}] Managed cursor is null when taking snapshot on {}", topic.getName(), position); + return CompletableFuture.completedFuture(null); + } - @Override - public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to store new deduplication snapshot at {}", - topic.getName(), position, exception); - snapshotTaking.set(false); - future.completeExceptionally(exception); + final var future = markDelete(cursor, position, snapshot).thenAccept(__ -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position); + } + lastSnapshotTimestamp = System.currentTimeMillis(); + }); + future.whenComplete((__, e) -> { + snapshotTaking.set(false); + if (e != null) { + log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position, e); } - }, null); + }); return future; } @@ -739,5 +722,32 @@ Map getInactiveProducers() { return inactiveProducers; } + @VisibleForTesting + public int getSnapshotCounter() { + return snapshotCounter.get(); + } + + public synchronized CompletableFuture cancelRecovery() { + if (status != Status.Recovering) { + return CompletableFuture.completedFuture(null); + } + status = Status.Failed; + final var replayFuture = this.replayFuture; + if (replayFuture != null) { + // Fail the future of `checkStatus()` in a different thread to avoid executing the callback in the + // synchronized block. + CompletableFuture.runAsync(() -> replayFuture.completeExceptionally(RECOVERY_FAILURE), + pulsar.getExecutor()); + } + if (snapshotCounter.get() > 0) { + log.info("[{}] Take snapshot at {} when the deduplication replay is cancelled", topic.getName(), + lastPosition); + return takeSnapshot(lastPosition); + } else { + log.info("[{}] Cancel the deduplication replay for failure", topic.getName()); + return CompletableFuture.completedFuture(null); + } + } + private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 91d964125bf36..ecd8df5c9b39c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1794,7 +1794,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { } }, null)); - disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> { + disconnectClientsInCurrentCall.thenRun(() -> { + if (brokerService.getTopics().get(topic) == createFuture) { + closeLedgerAfterCloseClients.run(); + } + }).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); unfenceTopicToResume(); closeFuture.completeExceptionally(exception); @@ -1893,6 +1897,11 @@ CompletableFuture checkReplicationAndRetryOnFailure() { } public CompletableFuture checkDeduplicationStatus() { + createFuture.exceptionallyAsync(e -> { + // cancelRecovery() might take snapshot, we need to close the managed ledger after the snapshot is taken + messageDeduplication.cancelRecovery().whenComplete((__, ___) -> close()); + return Optional.empty(); + }, orderedExecutor); return messageDeduplication.checkStatus(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java index 499b03e5fd1b4..852b7de8ddd6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.awaitility.Awaitility; -import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -127,8 +126,7 @@ public void testSnapshotCounterAfterUnload() throws Exception { for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) { producer.send(i + ""); } - int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter"); - assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1); + assertEquals(deduplication1.getSnapshotCounter(), brokerDeduplicationEntriesInterval - 1); admin.topics().unload(topic); PersistentTopic persistentTopic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); @@ -145,8 +143,7 @@ public void testSnapshotCounterAfterUnload() throws Exception { ml2.trimConsumedLedgersInBackground(new CompletableFuture<>()); // step 4. Awaitility.await().untilAsserted(() -> { - int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); - assertTrue(snapshotCounter3 < brokerDeduplicationEntriesInterval); + assertTrue(deduplication2.getSnapshotCounter() < brokerDeduplicationEntriesInterval); // Verify: the previous ledger will be removed because all messages have been acked. assertEquals(ml2.getLedgersInfo().size(), 1); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index e1c5fef89ce2f..bbff41005810d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -26,28 +28,32 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; -import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "broker") @@ -57,20 +63,32 @@ public class TopicDuplicationTest extends ProducerConsumerBase { private final String myNamespace = testTenant + "/" + testNamespace; private final String testTopic = "persistent://" + myNamespace + "/max-unacked-"; - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { - this.conf.setBrokerDeduplicationEnabled(true); super.internalSetup(); super.producerBaseSetup(); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setBrokerDeduplicationEnabled(true); + BrokerTestInterceptor.INSTANCE.configure(conf); + } + + @AfterMethod(alwaysRun = true) + protected void resetInterceptors() throws PulsarAdminException { + BrokerTestInterceptor.INSTANCE.reset(); + admin.namespaces().removeDeduplicationStatus(myNamespace); + } + @Test(timeOut = 10000) public void testDuplicationApi() throws Exception { final String topicName = testTopic + UUID.randomUUID().toString(); @@ -555,9 +573,6 @@ public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() thro @Test public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { - cleanup(); - setup(); - // Create a topic and wait deduplication is started. int brokerDeduplicationEntriesInterval = 1000; pulsar.getConfiguration().setBrokerDeduplicationEnabled(true); @@ -583,8 +598,7 @@ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { producer.send(i + ""); } producer.close(); - int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter"); - assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1); + assertEquals(deduplication1.getSnapshotCounter(), brokerDeduplicationEntriesInterval - 1); // Unload and load topic, simulate topic load is timeout. @@ -597,13 +611,22 @@ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { // Mock message deduplication recovery speed topicLoadTimeoutSeconds pulsar.getConfiguration().setTopicLoadTimeoutSeconds(1); - String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" - + TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; - mockZooKeeper.delay(2 * 1000, (op, path) -> { - if (mlPath.equals(path)) { - return true; + + final var readCount = new AtomicInteger(0); + BrokerTestInterceptor.INSTANCE.applyCursorSpyDecorator(cursor -> { + if (cursor.getManagedLedger().getName().contains(TopicName.get(topic).getLocalName())) { + doAnswer(invocation -> { + if (readCount.getAndIncrement() == 1) { + // The 1st replay will only be able to read first 100 entries + final var callback = (AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1); + CompletableFuture.delayedExecutor(1, TimeUnit.MINUTES).execute(() -> + callback.readEntriesComplete(List.of(), null)); + } else { + invocation.callRealMethod(); + } + return null; + }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); } - return false; }); final var topics = pulsar.getBrokerService().getTopics(); @@ -618,29 +641,19 @@ public void testFinishTakeSnapshotWhenTopicLoading() throws Exception { Assert.assertFalse(topics.containsKey(topic)); }); - // Load topic again, setBrokerDeduplicationEntriesInterval to 10000, // make recoverSequenceIdsMap#takeSnapshot not trigger takeSnapshot. // But actually it should not replay again in recoverSequenceIdsMap, // since previous topic loading should finish the replay process. pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10000); pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60); - PersistentTopic persistentTopic2 = - (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); - ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger(); - MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication(); - - Awaitility.await().untilAsserted(() -> { - int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter"); - Assert.assertEquals(snapshotCounter3, 0); - Assert.assertEquals(ml2.getLedgersInfo().size(), 1); - }); + PersistentTopic persistentTopic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false) + .get(3, TimeUnit.SECONDS).orElseThrow(); + assertEquals(persistentTopic2.messageDeduplication.getSnapshotCounter(), 899); // cleanup. admin.topics().delete(topic); - cleanup(); - setup(); } private void waitCacheInit(String topicName) throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index b7c323af5bcd4..e20428e5a8b11 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -20,10 +20,16 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.Unpooled; import java.lang.reflect.Field; import java.util.Collections; import java.util.List; @@ -31,25 +37,38 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.compaction.CompactionServiceFactory; +import org.apache.zookeeper.MockZooKeeper; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; +import org.jspecify.annotations.Nullable; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -61,6 +80,7 @@ public class OrphanPersistentTopicTest extends ProducerConsumerBase { @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { + super.isTcpLookup = true; super.internalSetup(); super.producerBaseSetup(); } @@ -71,27 +91,30 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setBrokerDeduplicationEnabled(true); + BrokerTestInterceptor.INSTANCE.configure(conf); + } + + @AfterMethod(alwaysRun = true) + protected void resetInterceptors() { + BrokerTestInterceptor.INSTANCE.reset(); + } + @Test public void testNoOrphanTopicAfterCreateTimeout() throws Exception { // Make the topic loading timeout faster. int topicLoadTimeoutSeconds = 2; long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); - pulsar.getConfig().setTopicLoadTimeoutSeconds(2); + pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + TopicName.get(tpName) - .getPersistenceNamingEncoding(); // Make topic load timeout 5 times. - AtomicInteger timeoutCounter = new AtomicInteger(); for (int i = 0; i < 5; i++) { - mockZooKeeper.delay(topicLoadTimeoutSeconds * 2 * 1000, (op, path) -> { - if (mlPath.equals(path)) { - log.info("Topic load timeout: " + timeoutCounter.incrementAndGet()); - return true; - } - return false; - }); + delayManagedLedgerCreation(i, TopicName.get(tpName), null); } // Load topic. @@ -126,20 +149,10 @@ public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception { long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); int topicLoadTimeoutSeconds = 1; pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); - pulsar.getConfig().setBrokerDeduplicationEnabled(true); pulsar.getConfig().setTransactionCoordinatorEnabled(true); String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); - // Mock message deduplication recovery speed topicLoadTimeoutSeconds - String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" - + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; - mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> { - if (mlPath.equals(path)) { - log.info("Topic load timeout: " + path); - return true; - } - return false; - }); + delayManagedLedgerCreation(0, TopicName.get(tpName), DEDUPLICATION_CURSOR_NAME); // First load topic will trigger timeout // The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close. @@ -178,7 +191,6 @@ public CompletableFuture closeAsync() { // set to back pulsar.setTransactionBufferProvider(originalTransactionBufferProvider); pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); - pulsar.getConfig().setBrokerDeduplicationEnabled(false); pulsar.getConfig().setTransactionCoordinatorEnabled(false); } @@ -237,7 +249,7 @@ public Object[][] whetherTimeoutOrNot() { @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot") public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { if (injectTimeout) { - pulsar.getConfig().setTopicLoadTimeoutSeconds(5); + pulsar.getConfig().setTopicLoadTimeoutSeconds(2); } String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", ""); String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); @@ -252,7 +264,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { TopicName paramTp = (TopicName) invocation.getArguments()[0]; if (paramTp.toString().equalsIgnoreCase(tpName) && failedTimes.incrementAndGet() <= 2) { if (injectTimeout) { - Thread.sleep(10 * 1000); + Thread.sleep(conf.getTopicLoadTimeoutSeconds() * 1000 + 500); } log.info("Failed {} times", failedTimes.get()); return CompletableFuture.failedFuture(new RuntimeException("mocked error")); @@ -274,7 +286,7 @@ public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot") public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Exception { if (injectTimeout) { - pulsar.getConfig().setTopicLoadTimeoutSeconds(5); + pulsar.getConfig().setTopicLoadTimeoutSeconds(2); } String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", ""); String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); @@ -289,7 +301,7 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex TopicName paramTp = (TopicName) invocation.getArguments()[0]; if (paramTp.toString().equalsIgnoreCase(tpName) && mockRaceConditionCounter.incrementAndGet() <= 1) { if (injectTimeout) { - Thread.sleep(10 * 1000); + Thread.sleep(conf.getTopicLoadTimeoutSeconds() * 1000 + 500); } log.info("Race condition occurs {} times", mockRaceConditionCounter.get()); pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding()); @@ -305,13 +317,146 @@ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Ex } // Verify: the consumer create successfully after allowing to create topic automatically. - Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + try (final var consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe()) { + } catch (PulsarClientException.TopicDoesNotExistException e) { + fail(); + } + }); // cleanup. if (injectTimeout) { pulsar.getConfig().setTopicLoadTimeoutSeconds(60); } - consumer.close(); admin.topics().delete(tpName); } + + private void delayManagedLedgerCreation(int i, TopicName topicName, @Nullable String cursor) { + final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + topicName.getPersistenceNamingEncoding() + + (cursor == null ? "" : ("/" + cursor)); + mockZooKeeper.delay(conf.getTopicLoadTimeoutSeconds() * 1000 + 500, (__, path) -> { + if (mlPath.equals(path)) { + log.info("Topic {} load timeout: {}", i, path); + return true; + } + return false; + }); + } + + @Test + public void testDeduplicationReplayStuck() throws Exception { + final var topic = "test-deduplication-replay-stuck"; + conf.setTopicLoadTimeoutSeconds(1); + try (final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) { + producer.send("msg"); + } + final var firstTime = new AtomicBoolean(true); + BrokerTestInterceptor.INSTANCE.applyCursorSpyDecorator(cursor -> { + if (cursor.getManagedLedger().getName().contains(topic)) { + if (firstTime.compareAndSet(true, false)) { + doAnswer(invocation -> { + final var callback = (AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1); + CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS).execute(() -> + callback.readEntriesComplete(List.of(), null)); + return null; + }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); + } else { + doCallRealMethod().when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); + } + } + }); + admin.topics().unload(topic); + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribeAsync() + .get(3, TimeUnit.SECONDS); + firstTime.set(true); + admin.topics().unload(topic); + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).createAsync() + .get(3, TimeUnit.SECONDS); + producer.sendAsync("msg").get(3, TimeUnit.SECONDS); + final var msg = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(msg); + assertEquals(msg.getValue(), "msg"); + } + + @Test + public void testOrphanManagedLedgerRemovedAfterUnload() throws Exception { + final var topicName = TopicName.get("test-orphan-managed-ledger-removed-after-unload"); + final var topic = topicName.toString(); + + conf.setTopicLoadTimeoutSeconds(1); + final var mlKey = topicName.getPersistenceNamingEncoding(); + final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + mlKey; + mockZooKeeper.delay(conf.getTopicLoadTimeoutSeconds() * 1000 + 500, (op, path) -> + op == MockZooKeeper.Op.CREATE && mlPath.equals(path)); + + try { + pulsar.getBrokerService().getTopic(topic, true).get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getMessage().contains("Failed to load topic within timeout")); + } + + final var managedLedgers = pulsar.getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory() + .getManagedLedgers(); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertTrue(managedLedgers.containsKey(mlKey))); + + admin.topics().unload(topic); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertFalse(managedLedgers.containsKey(mlKey))); + } + + @Test + public void testDuplicatedTopics() throws Exception { + final var topicName = TopicName.get("test-duplicated-topics"); + final var topic = topicName.toString(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().unload(topic); + // When load timeout happens, there might be an orphan PersistentTopic that will be closed after the + // initialization is done. Here we simulate the case by manually creating such an instance. + final var orphanPersistentTopic = (PersistentTopic) pulsar.getBrokerService() + .loadOrCreatePersistentTopic(topic, true, null).get().orElseThrow(); + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get() + .orElseThrow(); + assertSame(pulsar.getBrokerService().getTopics().get(topic), persistentTopic.getCreateFuture()); + orphanPersistentTopic.close(); + final BiFunction, Topic.PublishContext> newCallback = (sequence, future) -> + new Topic.PublishContext() { + @Override + public void completed(Exception e, long ledgerId, long entryId) { + if (e == null) { + future.complete(PositionFactory.create(ledgerId, entryId)); + } else { + future.completeExceptionally(e); + } + } + + @Override + public String getProducerName() { + return "producer"; + } + + @Override + public long getSequenceId() { + return sequence; + } + }; + + final var future = new CompletableFuture(); + persistentTopic.publishMessage(Unpooled.buffer(1), newCallback.apply(0, future)); + final var position = future.get(3, TimeUnit.SECONDS); + log.info("Sent to {}", position); + admin.topics().delete(topic); + + final var future2 = new CompletableFuture(); + persistentTopic.publishMessage(Unpooled.buffer(1), newCallback.apply(1, future2)); + try { + future2.get(3, TimeUnit.SECONDS); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.TopicFencedException); + } + assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic)); + } }