From 40c4a245665df98fce174b44c0ce61081fda38b0 Mon Sep 17 00:00:00 2001 From: erobot Date: Fri, 20 Oct 2023 15:05:54 +0800 Subject: [PATCH 1/3] Avoid blocking the message listener threads --- lib/ConsumerImpl.cc | 14 +++++++- lib/ConsumerImpl.h | 1 + lib/MessageImpl.h | 1 + lib/MultiTopicsConsumerImpl.cc | 10 ++---- lib/MultiTopicsConsumerImpl.h | 4 +-- tests/ConsumerTest.cc | 63 ++++++++++++++++++++++++++++++++++ 6 files changed, 83 insertions(+), 10 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 61def687..8dd334ea 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1032,7 +1032,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { return; } - increaseAvailablePermits(currentCnx); + if (!hasParent_) { + increaseAvailablePermits(currentCnx); + } if (track) { trackMessage(msg.getMessageId()); } @@ -1089,6 +1091,16 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn } } +void ConsumerImpl::increaseAvailablePermits(const Message& msg) { + ClientConnectionPtr currentCnx = getCnx().lock(); + if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) { + LOG_DEBUG(getName() << "Not adding permit since connection is different."); + return; + } + + increaseAvailablePermits(currentCnx); +} + inline CommandSubscribe_SubType ConsumerImpl::getSubType() { ConsumerType type = config_.getConsumerType(); switch (type) { diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 8d5bdcfb..dd7163fb 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -168,6 +168,7 @@ class ConsumerImpl : public ConsumerImplBase { void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId, CommandAck_ValidationError validationError); void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1); + void increaseAvailablePermits(const Message& msg); void drainIncomingMessageQueue(size_t count); uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage, const BitSet& ackSet, int redeliveryCount); diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h index cc07c586..66e43d15 100644 --- a/lib/MessageImpl.h +++ b/lib/MessageImpl.h @@ -47,6 +47,7 @@ class MessageImpl { int redeliveryCount_; bool hasSchemaVersion_; const std::string* schemaVersion_; + std::shared_ptr consumerPtr_; const std::string& getPartitionKey() const; bool hasPartitionKey() const; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 5162a61c..8fa40c04 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -519,6 +519,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic() << " message:" << msg.getDataAsString()); msg.impl_->setTopicName(consumer.impl_->getTopicPtr()); + msg.impl_->consumerPtr_ = std::static_pointer_cast(consumer.impl_); Lock lock(pendingReceiveMutex_); if (!pendingReceives_.empty()) { @@ -530,18 +531,12 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& auto self = weakSelf.lock(); if (self) { notifyPendingReceivedCallback(ResultOk, msg, callback); + msg.impl_->consumerPtr_->increaseAvailablePermits(msg); } }); return; } - if (incomingMessages_.full()) { - lock.unlock(); - } - - // add message to block queue. - // when messages queue is full, will block listener thread on ConsumerImpl, - // then will not send permits to broker, will broker stop push message. incomingMessages_.push(msg); incomingMessagesSize_.fetch_add(msg.getLength()); @@ -1072,6 +1067,7 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece void MultiTopicsConsumerImpl::messageProcessed(Message& msg) { incomingMessagesSize_.fetch_sub(msg.getLength()); unAckedMessageTrackerPtr_->add(msg.getMessageId()); + msg.impl_->consumerPtr_->increaseAvailablePermits(msg); } std::shared_ptr MultiTopicsConsumerImpl::get_shared_this_ptr() { diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index b00b0f23..d4127f63 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -24,7 +24,6 @@ #include #include -#include "BlockingQueue.h" #include "Commands.h" #include "ConsumerImplBase.h" #include "ConsumerInterceptors.h" @@ -33,6 +32,7 @@ #include "LookupDataResult.h" #include "SynchronizedHashMap.h" #include "TestUtil.h" +#include "UnboundedBlockingQueue.h" namespace pulsar { typedef std::shared_ptr> ConsumerSubResultPromisePtr; @@ -115,7 +115,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { std::map topicsPartitions_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; - BlockingQueue incomingMessages_; + UnboundedBlockingQueue incomingMessages_; std::atomic_int incomingMessagesSize_ = {0}; MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 2d195fd8..76c36170 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1335,4 +1335,67 @@ TEST(ConsumerTest, testRetrySubscribe) { // milliseconds } +TEST(ConsumerTest, testNoListenerThreadBlocking) { + Client client{lookupUrl}; + + const int numPartitions = 2; + const std::string partitionedTopic = "testNoListenerThreadBlocking-" + std::to_string(time(nullptr)); + int res = + makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", + std::to_string(numPartitions)); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + const int receiverQueueSize = 1; + const int receiverQueueSizeAcrossPartitions = receiverQueueSize * numPartitions; + + Consumer consumer1, consumer2; + ConsumerConfiguration consumerConfig; + consumerConfig.setReceiverQueueSize(receiverQueueSize); + consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions); + Result consumerResult; + consumerResult = client.subscribe(partitionedTopic, "sub1", consumerConfig, consumer1); + ASSERT_EQ(consumerResult, ResultOk); + consumerResult = client.subscribe(partitionedTopic, "sub2", consumerConfig, consumer2); + ASSERT_EQ(consumerResult, ResultOk); + + Producer producer; + ProducerConfiguration producerConfig; + producerConfig.setBatchingEnabled(false); + producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + Result producerResult = client.createProducer(partitionedTopic, producerConfig, producer); + ASSERT_EQ(producerResult, ResultOk); + + const int msgCount = receiverQueueSizeAcrossPartitions * 100; + + for (int i = 0; i < msgCount; ++i) { + auto msg = MessageBuilder().setContent("test").build(); + producer.sendAsync(msg, [](Result code, const MessageId& messageId) {}); + } + producer.flush(); + producer.close(); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // check consumer1 prefetch num + auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1); + int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages(); + ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions); + + // read consumer2 while consumer1 reaches the prefech limit + for (int i = 0; i < msgCount; ++i) { + auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer2); + int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages(); + ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions); + + Message msg; + Result ret = consumer2.receive(msg, 1000); + ASSERT_EQ(ret, ResultOk); + consumer2.acknowledge(msg); + } + + consumer2.close(); + consumer1.close(); + client.close(); +} + } // namespace pulsar From 6a0fb70f970f8e3b26a338fb43ccf21969c8f508 Mon Sep 17 00:00:00 2001 From: erobot Date: Fri, 20 Oct 2023 19:36:31 +0800 Subject: [PATCH 2/3] use waitUntil in test --- tests/ConsumerTest.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 76c36170..79175080 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1374,7 +1374,10 @@ TEST(ConsumerTest, testNoListenerThreadBlocking) { producer.flush(); producer.close(); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + waitUntil(std::chrono::seconds(1), [consumer1] { + auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1); + return multiConsumerImpl->getNumOfPrefetchedMessages() == receiverQueueSizeAcrossPartitions; + }); // check consumer1 prefetch num auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1); From 296ce23a73f569ca9b27639ae5f6be6c6a00c904 Mon Sep 17 00:00:00 2001 From: erobot Date: Fri, 20 Oct 2023 20:42:13 +0800 Subject: [PATCH 3/3] use weak_ptr --- lib/MessageImpl.h | 2 +- lib/MultiTopicsConsumerImpl.cc | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h index 66e43d15..55b9612e 100644 --- a/lib/MessageImpl.h +++ b/lib/MessageImpl.h @@ -47,7 +47,7 @@ class MessageImpl { int redeliveryCount_; bool hasSchemaVersion_; const std::string* schemaVersion_; - std::shared_ptr consumerPtr_; + std::weak_ptr consumerPtr_; const std::string& getPartitionKey() const; bool hasPartitionKey() const; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 8fa40c04..abc54c80 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -531,7 +531,10 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& auto self = weakSelf.lock(); if (self) { notifyPendingReceivedCallback(ResultOk, msg, callback); - msg.impl_->consumerPtr_->increaseAvailablePermits(msg); + auto consumer = msg.impl_->consumerPtr_.lock(); + if (consumer) { + consumer->increaseAvailablePermits(msg); + } } }); return; @@ -1067,7 +1070,10 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece void MultiTopicsConsumerImpl::messageProcessed(Message& msg) { incomingMessagesSize_.fetch_sub(msg.getLength()); unAckedMessageTrackerPtr_->add(msg.getMessageId()); - msg.impl_->consumerPtr_->increaseAvailablePermits(msg); + auto consumer = msg.impl_->consumerPtr_.lock(); + if (consumer) { + consumer->increaseAvailablePermits(msg); + } } std::shared_ptr MultiTopicsConsumerImpl::get_shared_this_ptr() {