diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 0b5d4527..a4b53107 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -148,6 +148,10 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result const auto numPartitions = getNumPartitionsWithLock(); assert(numProducersCreated_ <= numPartitions && partitionIndex <= numPartitions); + if (state_ == Closing) { + return; + } + if (state_ == Failed) { // We have already informed client that producer creation failed if (++numProducersCreated_ == numPartitions) { diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 69697359..7c997713 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -22,6 +22,7 @@ #include #include "HttpHelper.h" +#include "PulsarFriend.h" #include "lib/Future.h" #include "lib/Latch.h" #include "lib/LogUtils.h" @@ -467,4 +468,27 @@ TEST(ProducerTest, testCloseProducerBeforeCreated) { client.close(); } +TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUpdate) { + const std::string topic = "public/default/partition-test" + std::to_string(time(nullptr)); + std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions"; + + int res = makePutRequest(topicOperateUrl, "2"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + ClientConfiguration clientConf; + clientConf.setPartititionsUpdateInterval(1); + Client client(serviceUrl, clientConf); + ProducerConfiguration conf; + Producer producer; + client.createProducer(topic, conf, producer); + PartitionedProducerImpl& partitionedProducer = PulsarFriend::getPartitionedProducerImpl(producer); + + // TODO: Replace by producer interceptor to reproduce the issue then we can remove + // PulsarFriend::updatePartitions + PulsarFriend::updatePartitions(partitionedProducer, 3); + + producer.close(); + client.close(); +} + INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false)); diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index aa7737d0..e61259e7 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -187,6 +187,18 @@ class PulsarFriend { long unAckedMessagesTimeoutMs) { consumerConfiguration.impl_->unAckedMessagesTimeoutMs = unAckedMessagesTimeoutMs; } + + static PartitionedProducerImpl& getPartitionedProducerImpl(Producer producer) { + PartitionedProducerImpl* partitionedProducer = + static_cast(producer.impl_.get()); + return *partitionedProducer; + } + + static void updatePartitions(PartitionedProducerImpl& partitionedProducer, int newPartitions) { + LookupDataResultPtr lookupData = std::make_shared(); + lookupData->setPartitions(newPartitions); + partitionedProducer.handleGetPartitions(ResultOk, lookupData); + } }; } // namespace pulsar