diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 7fa3ff29..a3a5a95a 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -172,6 +172,8 @@ void ProducerImpl::connectionFailed(Result result) { void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result, const ResponseData& responseData) { + Lock lock(mutex_); + LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result)); // make sure we're still in the Pending/Ready state, closeAsync could have been invoked @@ -180,11 +182,21 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r if (state != Ready && state != Pending) { LOG_DEBUG("Producer created response received but producer already closed"); failPendingMessages(ResultAlreadyClosed, false); + if (result == ResultOk || result == ResultTimeout) { + auto client = client_.lock(); + if (client) { + int requestId = client->newRequestId(); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + } + } + if (!producerCreatedPromise_.isComplete()) { + lock.unlock(); + producerCreatedPromise_.setFailed(ResultAlreadyClosed); + } return; } if (result == ResultOk) { - Lock lock(mutex_); // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString()); @@ -203,7 +215,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r setCnx(cnx); state_ = Ready; backoff_.reset(); - lock.unlock(); if (conf_.isEncryptionEnabled()) { auto weakSelf = weak_from_this(); @@ -226,6 +237,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r startSendTimeoutTimer(); } + lock.unlock(); producerCreatedPromise_.setValue(shared_from_this()); } else { @@ -234,22 +246,26 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r // Creating the producer has timed out. We need to ensure the broker closes the producer // in case it was indeed created, otherwise it might prevent new create producer operation, // since we are not closing the connection - int requestId = client_.lock()->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + auto client = client_.lock(); + if (client) { + int requestId = client->newRequestId(); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + } } if (result == ResultProducerFenced) { state_ = Producer_Fenced; - failPendingMessages(result, true); + failPendingMessages(result, false); auto client = client_.lock(); if (client) { client->cleanupProducer(this); } + lock.unlock(); producerCreatedPromise_.setFailed(result); } else if (producerCreatedPromise_.isComplete()) { if (result == ResultProducerBlockedQuotaExceededException) { LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer"); - failPendingMessages(ResultProducerBlockedQuotaExceededException, true); + failPendingMessages(ResultProducerBlockedQuotaExceededException, false); } else if (result == ResultProducerBlockedQuotaExceededError) { LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic"); } @@ -264,9 +280,10 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r scheduleReconnection(shared_from_this()); } else { LOG_ERROR(getName() << "Failed to create producer: " << strResult(result)); - failPendingMessages(result, true); - producerCreatedPromise_.setFailed(result); + failPendingMessages(result, false); state_ = Failed; + lock.unlock(); + producerCreatedPromise_.setFailed(result); } } } @@ -694,6 +711,8 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) { } }; + Lock lock(mutex_); + // if the producer was never started then there is nothing to clean up State expectedState = NotStarted; if (state_.compare_exchange_strong(expectedState, Closed)) { diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 74d3cf20..69697359 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -424,4 +424,47 @@ TEST(ProducerTest, testCloseSubProducerWhenFail) { client.close(); } +TEST(ProducerTest, testCloseProducerBeforeCreated) { + Client client(serviceUrl); + + std::string ns = "test-close-producer-before-created"; + std::string localName = std::string("testCloseProducerBeforeCreated") + std::to_string(time(nullptr)); + std::string topicName = "persistent://public/" + ns + '/' + localName; + const int maxProducersPerTopic = 10; + const int partitionNum = 5; + + // call admin api to create namespace with max prodcuer limit + std::string url = adminUrl + "admin/v2/namespaces/public/" + ns; + int res = + makePutRequest(url, "{\"max_producers_per_topic\": " + std::to_string(maxProducersPerTopic) + "}"); + ASSERT_TRUE(res == 204 || res == 409) << "res:" << res; + + // call admin api to create partitioned topic + res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" + localName + "/partitions", + std::to_string(partitionNum)); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + ProducerConfiguration producerConfiguration; + producerConfiguration.setLazyStartPartitionedProducers(true); + producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + producerConfiguration.setBatchingEnabled(false); + + Message msg = MessageBuilder().setContent("test").build(); + for (int i = 0; i < maxProducersPerTopic * 100; ++i) { + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + // trigger lazy producer creation + for (int j = 0; j < partitionNum; ++j) { + producer.sendAsync(msg, [](pulsar::Result, const pulsar::MessageId&) {}); + } + producer.close(); + } + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, {}, producer)); + producer.close(); + + client.close(); +} + INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));