Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ void ProducerImpl::connectionFailed(Result result) {

void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData) {
Lock lock(mutex_);
Comment thread
BewareMyPower marked this conversation as resolved.

LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));

// make sure we're still in the Pending/Ready state, closeAsync could have been invoked
Expand All @@ -180,11 +182,21 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
if (state != Ready && state != Pending) {
LOG_DEBUG("Producer created response received but producer already closed");
failPendingMessages(ResultAlreadyClosed, false);
if (result == ResultOk || result == ResultTimeout) {
auto client = client_.lock();
if (client) {
int requestId = client->newRequestId();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
}
}
if (!producerCreatedPromise_.isComplete()) {
lock.unlock();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
return;
}

if (result == ResultOk) {
Lock lock(mutex_);
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
Expand All @@ -203,7 +215,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
setCnx(cnx);
state_ = Ready;
backoff_.reset();
lock.unlock();

if (conf_.isEncryptionEnabled()) {
auto weakSelf = weak_from_this();
Expand All @@ -226,6 +237,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
startSendTimeoutTimer();
}

lock.unlock();
producerCreatedPromise_.setValue(shared_from_this());

} else {
Expand All @@ -234,22 +246,26 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
// Creating the producer has timed out. We need to ensure the broker closes the producer
// in case it was indeed created, otherwise it might prevent new create producer operation,
// since we are not closing the connection
int requestId = client_.lock()->newRequestId();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
auto client = client_.lock();
if (client) {
int requestId = client->newRequestId();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
}
}

if (result == ResultProducerFenced) {
state_ = Producer_Fenced;
failPendingMessages(result, true);
failPendingMessages(result, false);
auto client = client_.lock();
if (client) {
client->cleanupProducer(this);
}
lock.unlock();
producerCreatedPromise_.setFailed(result);
} else if (producerCreatedPromise_.isComplete()) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
failPendingMessages(ResultProducerBlockedQuotaExceededException, true);
failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
} else if (result == ResultProducerBlockedQuotaExceededError) {
LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic");
}
Expand All @@ -264,9 +280,10 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
scheduleReconnection(shared_from_this());
} else {
LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
failPendingMessages(result, true);
producerCreatedPromise_.setFailed(result);
failPendingMessages(result, false);
state_ = Failed;
lock.unlock();
producerCreatedPromise_.setFailed(result);
}
}
}
Expand Down Expand Up @@ -694,6 +711,8 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) {
}
};

Lock lock(mutex_);
Comment thread
BewareMyPower marked this conversation as resolved.

// if the producer was never started then there is nothing to clean up
State expectedState = NotStarted;
if (state_.compare_exchange_strong(expectedState, Closed)) {
Expand Down
43 changes: 43 additions & 0 deletions tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,47 @@ TEST(ProducerTest, testCloseSubProducerWhenFail) {
client.close();
}

TEST(ProducerTest, testCloseProducerBeforeCreated) {
Client client(serviceUrl);

std::string ns = "test-close-producer-before-created";
std::string localName = std::string("testCloseProducerBeforeCreated") + std::to_string(time(nullptr));
std::string topicName = "persistent://public/" + ns + '/' + localName;
const int maxProducersPerTopic = 10;
const int partitionNum = 5;

// call admin api to create namespace with max prodcuer limit
std::string url = adminUrl + "admin/v2/namespaces/public/" + ns;
int res =
makePutRequest(url, "{\"max_producers_per_topic\": " + std::to_string(maxProducersPerTopic) + "}");
ASSERT_TRUE(res == 204 || res == 409) << "res:" << res;

// call admin api to create partitioned topic
res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" + localName + "/partitions",
std::to_string(partitionNum));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

ProducerConfiguration producerConfiguration;
producerConfiguration.setLazyStartPartitionedProducers(true);
producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
producerConfiguration.setBatchingEnabled(false);

Message msg = MessageBuilder().setContent("test").build();
for (int i = 0; i < maxProducersPerTopic * 100; ++i) {
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
// trigger lazy producer creation
for (int j = 0; j < partitionNum; ++j) {
producer.sendAsync(msg, [](pulsar::Result, const pulsar::MessageId&) {});
}
producer.close();
}

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, {}, producer));
producer.close();

client.close();
}

INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));