From d576ea47941b525fa28daa8f1345a93ce2478275 Mon Sep 17 00:00:00 2001 From: Chris Hamons Date: Tue, 5 May 2026 16:12:20 -0500 Subject: [PATCH 1/5] [fix][broker] Correct multiple race conditions in PersistentDispatcherMultipleConsumers - https://github.com/apache/pulsar/issues/25617 DelayedDeliveryTracker is not thread safe, and any access to it must be done holding the object lock. There were five cases I found, three of them I could correct just by adding synchronized to the method declaration. Two of them were overrides and were corrected using a manual locking scope. This is a significant issue, as exceptions here were uncaught and put the broker into an invalid state that prevented some delayed messages from being delivered until restart. The included unit test failed 100% of the time when run locally without the fix, and we were able to run https://github.com/chamons/pulsar-scheduled-exception-repro without issues with the fix included. --- ...PersistentDispatcherMultipleConsumers.java | 14 ++- ...istentDispatcherMultipleConsumersTest.java | 100 ++++++++++++++++++ 2 files changed, 109 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c569cf5b68cb2..fd1cd79e0c483 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -427,7 +427,7 @@ public synchronized void readMoreEntries() { } } - protected Predicate createReadEntriesSkipConditionForNormalRead() { + protected synchronized Predicate createReadEntriesSkipConditionForNormalRead() { Predicate skipCondition = null; // Filter out and skip read delayed messages exist in DelayedDeliveryTracker 
if (delayedDeliveryTracker.isPresent()) { @@ -1379,7 +1379,9 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { @Override public long getNumberOfDelayedMessages() { - return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); + synchronized (this) { + return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); + } } @Override @@ -1389,7 +1391,9 @@ public CompletableFuture clearDelayedMessages() { } if (delayedDeliveryTracker.isPresent()) { - return this.delayedDeliveryTracker.get().clear(); + synchronized (this) { + return this.delayedDeliveryTracker.get().clear(); + } } else { DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory = topic.getBrokerService().getDelayedDeliveryTrackerFactory(); @@ -1464,11 +1468,11 @@ public PersistentTopic getTopic() { } - public long getDelayedTrackerMemoryUsage() { + public synchronized long getDelayedTrackerMemoryUsage() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } - public Map getBucketDelayedIndexStats() { + public synchronized Map getBucketDelayedIndexStats() { if (delayedDeliveryTracker.isEmpty()) { return Collections.emptyMap(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index e5da7850dfd63..c86d6d600cf97 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -20,7 +20,12 @@ import com.carrotsearch.hppc.ObjectSet; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.CustomLog; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -32,10 +37,13 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertEquals; import org.testng.Assert; import org.testng.annotations.Test; @@ -152,4 +160,96 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception { // Verify: the topic can be deleted successfully. admin.topics().delete(topicName, false); } + + @Test + public void testRaceConditionInTrackDelayedDelivery() throws Exception { + final int numThreads = 16; + final int operationsPerThread = 2000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + final AtomicReference firstException = new AtomicReference<>(); + + final String topicName = newTopicName(); + final String subscription = "s1"; + + // Needed to create the topic + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + PersistentTopic topic = (PersistentTopic) getTopic(topicName, false).join().get(); + + ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class); + Mockito.doReturn(subscription).when(cursor).getName(); + + Subscription sub = Mockito.mock(PersistentSubscription.class); + 
Mockito.doReturn(topic).when(sub).getTopic(); + + PersistentDispatcherMultipleConsumers dispatcher = + new PersistentDispatcherMultipleConsumers(topic, cursor, sub); + + // Align all writes to the same bucket + // This is the key which triggers the race condition + long deliverAt = System.currentTimeMillis() + 5000; + + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(1) + .setProducerName("testProducer") + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()) + .setDeliverAtTime(deliverAt); + + ExecutorService executorService = Executors.newFixedThreadPool(32); + + // Start clear message thread + for (int i = 0; i < numThreads / 2; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.clearDelayedMessages(); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start track delayed delivery thread + for (int i = numThreads / 2; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + dispatcher.trackDelayedDelivery(1, 1, messageMetadata); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + + if (errors.get() > 0) { + Exception exception = firstException.get(); + if (exception != null) { + System.err.println("First exception caught: " + exception.getMessage()); + exception.printStackTrace(); + } + } + assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + } } From 864522a4aa04d8fa816dec45c257f6c53bac82c0 Mon Sep 17 
00:00:00 2001 From: Chris Hamons Date: Wed, 6 May 2026 09:29:02 -0500 Subject: [PATCH 2/5] fix: Add cleanup for test --- .../persistent/PersistentDispatcherMultipleConsumersTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index c86d6d600cf97..7ab1dc596b9ea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -200,6 +200,7 @@ public void testRaceConditionInTrackDelayedDelivery() throws Exception { .setPublishTime(System.currentTimeMillis()) .setDeliverAtTime(deliverAt); + @Cleanup("shutdown") ExecutorService executorService = Executors.newFixedThreadPool(32); // Start clear message thread From 30a15a02978f41342fcfacb22feeef7ddbc35524 Mon Sep 17 00:00:00 2001 From: Chris Hamons Date: Wed, 6 May 2026 09:30:48 -0500 Subject: [PATCH 3/5] fix: Move to synchronized over explicit scope --- .../persistent/PersistentDispatcherMultipleConsumers.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index fd1cd79e0c483..274924b716cdb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1378,10 +1378,8 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { }
@Override - public long getNumberOfDelayedMessages() { - synchronized (this) { - return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); - } + public synchronized long getNumberOfDelayedMessages() { + return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @Override From 0baeb14651998bf1b2868a2b7c430f60602b794c Mon Sep 17 00:00:00 2001 From: Chris Hamons Date: Wed, 6 May 2026 10:22:52 -0500 Subject: [PATCH 4/5] [fix][broker] Fix multiple race conditions in InMemoryDelayedDeliveryTracker --- .../InMemoryDelayedDeliveryTracker.java | 12 +-- .../delayed/InMemoryDeliveryTrackerTest.java | 102 ++++++++++++++++++ 2 files changed, 108 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 8da74a553ddf5..b9d487fbb12b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -122,7 +122,7 @@ private static long trimLowerBit(long timestamp, int bits) { } @Override - public boolean addMessage(long ledgerId, long entryId, long deliverAt) { + public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) { if (deliverAt < 0 || deliverAt <= getCutoffTime()) { messagesHaveFixedDelay = false; return false; @@ -161,7 +161,7 @@ private void checkAndUpdateHighest(long deliverAt) { * Return true if there's at least a message that is scheduled to be delivered already. 
*/ @Override - public boolean hasMessageAvailable() { + public synchronized boolean hasMessageAvailable() { boolean hasMessageAvailable = !delayedMessageMap.isEmpty() && delayedMessageMap.firstKey() <= getCutoffTime(); if (!hasMessageAvailable) { @@ -174,7 +174,7 @@ public boolean hasMessageAvailable() { * Get a set of position of messages that have already reached. */ @Override - public NavigableSet getScheduledMessages(int maxMessages) { + public synchronized NavigableSet getScheduledMessages(int maxMessages) { int n = maxMessages; NavigableSet positions = new TreeSet<>(); long cutoffTime = getCutoffTime(); @@ -237,7 +237,7 @@ public NavigableSet getScheduledMessages(int maxMessages) { } @Override - public CompletableFuture clear() { + public synchronized CompletableFuture clear() { this.delayedMessageMap.clear(); this.delayedMessagesCount.set(0); return CompletableFuture.completedFuture(null); @@ -262,7 +262,7 @@ public long getBufferMemoryUsage() { } @Override - public void close() { + public synchronized void close() { super.close(); } @@ -275,7 +275,7 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead && !hasMessageAvailable(); } - protected long nextDeliveryTime() { + protected synchronized long nextDeliveryTime() { return delayedMessageMap.firstKey(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index e25595072d3c9..768be9a56e96e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -33,6 +33,12 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Method; import java.time.Clock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; @@ -274,4 +280,100 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc tracker.close(); } + @Test(dataProvider = "delayedTracker") + public void testRaceConditionInUpdateTimer(InMemoryDelayedDeliveryTracker tracker) throws Exception { + final int numThreads = 15; + final int operationsPerThread = 2000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + final AtomicReference firstException = new AtomicReference<>(); + + @Cleanup("shutdown") + ExecutorService executorService = Executors.newFixedThreadPool(32); + + for (int i = 0; i < numThreads / 4; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + tracker.getNumberOfDelayedMessages(); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + for (int i = numThreads / 4; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + tracker.addMessage(1, 1, 10); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + for (int i = numThreads / 4; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + tracker.clear(); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + 
firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + for (int i = numThreads / 4; i < numThreads; i++) { + executorService.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < operationsPerThread; j++) { + tracker.getScheduledMessages(1); + Thread.sleep(1); + } + } catch (Exception e) { + errors.incrementAndGet(); + firstException.compareAndSet(null, e); + e.printStackTrace(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds"); + + if (errors.get() > 0) { + Exception exception = firstException.get(); + if (exception != null) { + System.err.println("First exception caught: " + exception.getMessage()); + exception.printStackTrace(); + } + } + assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations"); + } } From 27f194be0eea080df2856faae2c247a9ceb764a6 Mon Sep 17 00:00:00 2001 From: Chris Hamons Date: Wed, 6 May 2026 16:44:28 -0500 Subject: [PATCH 5/5] dev: Brute force some synchronized --- .../delayed/InMemoryDelayedDeliveryTracker.java | 4 ++-- .../delayed/InMemoryDeliveryTrackerTest.java | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index b9d487fbb12b5..72531896c1f8c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -244,7 +244,7 @@ public synchronized CompletableFuture clear() { } @Override - public long getNumberOfDelayedMessages() { + public synchronized long getNumberOfDelayedMessages() { return delayedMessagesCount.get(); } @@ -255,7 
+255,7 @@ public long getNumberOfDelayedMessages() { * @return the memory usage of the buffer */ @Override - public long getBufferMemoryUsage() { + public synchronized long getBufferMemoryUsage() { return delayedMessageMap.values().stream().mapToLong( ledgerMap -> ledgerMap.values().stream().mapToLong( Roaring64Bitmap::getLongSizeInBytes).sum()).sum(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index 768be9a56e96e..8f7ae3adc056a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -282,8 +282,8 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc @Test(dataProvider = "delayedTracker") public void testRaceConditionInUpdateTimer(InMemoryDelayedDeliveryTracker tracker) throws Exception { - final int numThreads = 15; - final int operationsPerThread = 2000; + final int numThreads = 16; + final int operationsPerThread = 1000; final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch doneLatch = new CountDownLatch(numThreads); final AtomicInteger errors = new AtomicInteger(0); @@ -292,12 +292,12 @@ public void testRaceConditionInUpdateTimer(InMemoryDelayedDeliveryTracker tracke @Cleanup("shutdown") ExecutorService executorService = Executors.newFixedThreadPool(32); - for (int i = 0; i < numThreads / 4; i++) { + for (int i = 0; i < 2; i++) { executorService.submit(() -> { try { startLatch.await(); for (int j = 0; j < operationsPerThread; j++) { - tracker.getNumberOfDelayedMessages(); + tracker.clear(); Thread.sleep(1); } } catch (Exception e) { @@ -310,7 +310,7 @@ public void testRaceConditionInUpdateTimer(InMemoryDelayedDeliveryTracker tracke }); } - for (int i = numThreads / 4; i < numThreads; i++) { + for 
(int i = 0; i < 5; i++) { executorService.submit(() -> { try { startLatch.await(); @@ -328,12 +328,12 @@ public void testRaceConditionInUpdateTimer(InMemoryDelayedDeliveryTracker tracke }); } - for (int i = numThreads / 4; i < numThreads; i++) { + for (int i = 0; i < 5; i++) { executorService.submit(() -> { try { startLatch.await(); for (int j = 0; j < operationsPerThread; j++) { - tracker.clear(); + tracker.getNumberOfDelayedMessages(); Thread.sleep(1); } } catch (Exception e) { @@ -346,7 +346,7 @@ public void testRaceConditionInUpdateTimer(InMemoryDelayedDeliveryTracker tracke }); } - for (int i = numThreads / 4; i < numThreads; i++) { + for (int i = 0; i < 5; i++) { executorService.submit(() -> { try { startLatch.await();