diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index c94bcbe7..d805dd33 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -39,6 +39,7 @@ struct OpSendMsg { boost::posix_time::ptime timeout_; uint32_t messagesCount_; uint64_t messagesSize_; + std::vector> trackerCallbacks_; OpSendMsg() = default; @@ -59,6 +60,13 @@ struct OpSendMsg { if (sendCallback_) { sendCallback_(result, messageId); } + for (const auto& trackerCallback : trackerCallbacks_) { + trackerCallback(result); + } + } + + void addTrackerCallback(std::function trackerCallback) { + trackerCallbacks_.emplace_back(trackerCallback); } }; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 05ab13d3..f3e61204 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -332,17 +332,25 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen } void ProducerImpl::flushAsync(FlushCallback callback) { + if (state_ != Ready) { + callback(ResultAlreadyClosed); + return; + } if (batchMessageContainer_) { - if (state_ == Ready) { - Lock lock(mutex_); - auto failures = batchMessageAndSend(callback); + Lock lock(mutex_); + auto failures = batchMessageAndSend(callback); + lock.unlock(); + failures.complete(); + } else { + Lock lock(mutex_); + if (!pendingMessagesQueue_.empty()) { + auto& opSendMsg = pendingMessagesQueue_.back(); lock.unlock(); - failures.complete(); + opSendMsg.addTrackerCallback(callback); } else { - callback(ResultAlreadyClosed); + lock.unlock(); + callback(ResultOk); } - } else { - callback(ResultOk); } } diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index ee07cbb9..77a79e1a 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -245,11 +245,10 @@ TEST_P(ProducerTest, testMaxMessageSize) { client.close(); } -TEST_P(ProducerTest, testChunkingMaxMessageSize) { +TEST(ProducerTest, testChunkingMaxMessageSize) { Client client(serviceUrl); - const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") + - (GetParam() ? "batch-" : "no-batch-") + std::to_string(time(nullptr)); + const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") + std::to_string(time(nullptr)); Consumer consumer; ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); @@ -297,4 +296,44 @@ TEST(ProducerTest, testExclusiveProducer) { ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3)); } +TEST_P(ProducerTest, testFlushNoBatch) { + Client client(serviceUrl); + + auto partitioned = GetParam(); + const auto topicName = std::string("testFlushNoBatch") + + (partitioned ? "partitioned-" : "-no-partitioned-") + + std::to_string(time(nullptr)); + + if (partitioned) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + + ProducerConfiguration producerConfiguration; + producerConfiguration.setBatchingEnabled(false); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer)); + + std::atomic_int needCallBack(100); + auto cb = [&needCallBack](Result code, const MessageId& msgId) { + ASSERT_EQ(code, ResultOk); + needCallBack.fetch_sub(1); + }; + + for (int i = 0; i < 100; ++i) { + Message msg = MessageBuilder().setContent("content").build(); + producer.sendAsync(msg, cb); + } + + producer.flush(); + ASSERT_EQ(needCallBack.load(), 0); + producer.close(); + + client.close(); +} + INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));