From 550c42c10af7f73a0284f84216716e60399ed1c8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 15 Mar 2024 01:30:34 +0800 Subject: [PATCH 1/5] [fix] [broker] Delete topic stuck due to mismatch between dispatcher.consumerList and dispatcher.consumerSet --- ...PersistentDispatcherMultipleConsumers.java | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index be82b190ffb32..0e61dd771a23f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -214,18 +214,10 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE // decrement unack-message count for removed consumer addUnAckedMessages(-consumer.getUnackedMessages()); if (consumerSet.removeAll(consumer) == 1) { - consumerList.remove(consumer); + consumerList.removeIf(c -> consumer.equals(c)); log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size()); if (consumerList.isEmpty()) { - cancelPendingRead(); - - redeliveryMessages.clear(); - redeliveryTracker.clear(); - if (closeFuture != null) { - log.info("[{}] All consumers removed. Subscription is disconnected", name); - closeFuture.complete(null); - } - totalAvailablePermits = 0; + clearComponentsAfterRemovedAllConsumers(); } else { if (log.isDebugEnabled()) { log.debug("[{}] Consumer are left, reading more entries", name); @@ -242,8 +234,24 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE readMoreEntries(); } } else { - log.info("[{}] Trying to remove a non-connected consumer: {}", name, consumer); + consumerList.removeIf(c -> consumer.equals(c)); + if (consumerList.isEmpty()) { + clearComponentsAfterRemovedAllConsumers(); + } + log.warn("[{}] Trying to remove a non-connected consumer: {}", name, consumer); + } + } + + private synchronized void clearComponentsAfterRemovedAllConsumers() { + cancelPendingRead(); + + redeliveryMessages.clear(); + redeliveryTracker.clear(); + if (closeFuture != null) { + log.info("[{}] All consumers removed. Subscription is disconnected", name); + closeFuture.complete(null); } + totalAvailablePermits = 0; } @Override @@ -554,6 +562,9 @@ public synchronized CompletableFuture disconnectAllConsumers( if (consumerList.isEmpty()) { closeFuture.complete(null); } else { + // Iterator of CopyOnWriteArrayList uses the internal array to do the for-each, and CopyOnWriteArrayList + // will create a new internal array when adding/removing a new item. So remove items in the for-each + // block is safety when the for-each and add/remove are using a same lock. consumerList.forEach(consumer -> consumer.disconnect(isResetCursor, assignedBrokerLookupData)); cancelPendingRead(); } From c59501b5c101a99fdc3bc625e09391320e7f08ea Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 16 Mar 2024 10:05:58 +0800 Subject: [PATCH 2/5] add a test --- ...istentDispatcherMultipleConsumersTest.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java new file mode 100644 index 0000000000000..a032a9b9856c4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import com.carrotsearch.hppc.ObjectSet; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class PersistentDispatcherMultipleConsumersTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30 * 1000) + public void testTopicDeleteIfConsumerSetMismatchConsumerList() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscription, MessageId.earliest); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + // Make an error that makes "consumerSet" is mismatch with "consumerList". + Dispatcher dispatcher = pulsar.getBrokerService() + .getTopic(topicName, false).join().get() + .getSubscription(subscription).getDispatcher(); + ObjectSet consumerSet = + WhiteboxImpl.getInternalState(dispatcher, "consumerSet"); + List consumerList = + WhiteboxImpl.getInternalState(dispatcher, "consumerList"); + + org.apache.pulsar.broker.service.Consumer serviceConsumer = consumerList.get(0); + consumerSet.add(serviceConsumer); + consumerList.add(serviceConsumer); + + // Verify: the topic can be deleted successfully. + consumer.close(); + admin.topics().delete(topicName, false); + } +} From 2f67d1b679469832577fa78ac90e94a68d139628 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 16 Mar 2024 10:06:43 +0800 Subject: [PATCH 3/5] add a test --- .../persistent/PersistentDispatcherMultipleConsumersTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index a032a9b9856c4..453943509afb4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -59,7 +59,7 @@ public void testTopicDeleteIfConsumerSetMismatchConsumerList() throws Exception Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription) .subscriptionType(SubscriptionType.Shared).subscribe(); - // Make an error that makes "consumerSet" is mismatch with "consumerList". + // Make an error that "consumerSet" is mismatch with "consumerList". Dispatcher dispatcher = pulsar.getBrokerService() .getTopic(topicName, false).join().get() .getSubscription(subscription).getDispatcher(); From b709898283c60b3c116c8e41c71572c0d8095d63 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 19 Mar 2024 10:07:02 +0800 Subject: [PATCH 4/5] add another test --- ...istentDispatcherMultipleConsumersTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index 453943509afb4..f24c5c5933e5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -76,4 +76,26 @@ public void testTopicDeleteIfConsumerSetMismatchConsumerList() throws Exception consumer.close(); admin.topics().delete(topicName, false); } + + @Test(timeOut = 30 * 1000) + public void testTopicDeleteIfConsumerSetMismatchConsumerList2() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscription, MessageId.earliest); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).subscribe(); + // Make an error that "consumerSet" is mismatch with "consumerList". + Dispatcher dispatcher = pulsar.getBrokerService() + .getTopic(topicName, false).join().get() + .getSubscription(subscription).getDispatcher(); + ObjectSet consumerSet = + WhiteboxImpl.getInternalState(dispatcher, "consumerSet"); + consumerSet.clear(); + + // Verify: the topic can be deleted successfully. + consumer.close(); + admin.topics().delete(topicName, false); + } } From 4882977a240c9d0fe440f71163d8fc4280a37b5a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 21 Mar 2024 22:12:47 +0800 Subject: [PATCH 5/5] address comment --- .../PersistentDispatcherMultipleConsumers.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 0e61dd771a23f..35204e7af72bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -214,7 +214,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE // decrement unack-message count for removed consumer addUnAckedMessages(-consumer.getUnackedMessages()); if (consumerSet.removeAll(consumer) == 1) { - consumerList.removeIf(c -> consumer.equals(c)); + consumerList.remove(consumer); log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size()); if (consumerList.isEmpty()) { clearComponentsAfterRemovedAllConsumers(); @@ -234,11 +234,16 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE readMoreEntries(); } } else { + /** + * This is not an expected scenario, it will never happen in expected. + * Just add a defensive code to avoid the topic can not be unloaded anymore: remove the consumers which + * are not mismatch with {@link #consumerSet}. See more detail: https://github.com/apache/pulsar/pull/22270. + */ + log.error("[{}] Trying to remove a non-connected consumer: {}", name, consumer); consumerList.removeIf(c -> consumer.equals(c)); if (consumerList.isEmpty()) { clearComponentsAfterRemovedAllConsumers(); } - log.warn("[{}] Trying to remove a non-connected consumer: {}", name, consumer); } }