From 4b46b4519af34b2412d174c4bfb1d28f85111d2a Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 25 Oct 2022 18:08:40 +0800 Subject: [PATCH 01/10] [feat] Support expiration for chunked messages --- include/pulsar/ConsumerConfiguration.h | 20 +++++++++++ include/pulsar/Message.h | 1 + lib/ConsumerConfiguration.cc | 10 ++++++ lib/ConsumerConfigurationImpl.h | 1 + lib/ConsumerImpl.cc | 33 +++++++++++++++++- lib/ConsumerImpl.h | 10 ++++++ lib/MapCache.h | 17 +++++++++ tests/MapCacheTest.cc | 30 ++++++++++++++++ tests/MessageChunkingTest.cc | 48 ++++++++++++++++++++++++++ tests/PulsarFriend.h | 4 +++ tests/WaitUtils.h | 5 +-- 11 files changed, 176 insertions(+), 3 deletions(-) diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 13d5cc02..2e4a8a30 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -517,6 +517,26 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ bool isAutoAckOldestChunkedMessageOnQueueFull() const; + /** + * If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if + * consumer won't be able to receive all chunks in expire times. + * + * Default: 60000, which means 1 minutes + * + * @param expireTimeOfIncompleteChunkedMessageMs expire time in milliseconds + * @return Consumer Configuration + */ + ConsumerConfiguration& setExpireTimeOfIncompleteChunkedMessageMs( + long expireTimeOfIncompleteChunkedMessageMs); + + /** + * + * Get the expire time of incomplete chunked message in milliseconds + * + * @return the expire time of incomplete chunked message in milliseconds + */ + long getExpireTimeOfIncompleteChunkedMessageMs() const; + /** * Set the consumer to include the given position of any reset operation like Consumer::seek. * diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index 935236bd..24c0c522 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -192,6 +192,7 @@ class PULSAR_PUBLIC Message { friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const StringMap& map); friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const Message& msg); + friend class PulsarFriend; }; } // namespace pulsar diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index 0705cca3..663faee1 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -261,6 +261,16 @@ bool ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const { return impl_->autoAckOldestChunkedMessageOnQueueFull; } +ConsumerConfiguration& ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs( + long expireTimeOfIncompleteChunkedMessageMs) { + impl_->expireTimeOfIncompleteChunkedMessageMs = expireTimeOfIncompleteChunkedMessageMs; + return *this; +} + +long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const { + return impl_->expireTimeOfIncompleteChunkedMessageMs; +} + ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool startMessageIdInclusive) { impl_->startMessageIdInclusive = startMessageIdInclusive; return *this; diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index 444fedf9..259b9354 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -55,6 +55,7 @@ struct ConsumerConfigurationImpl { size_t maxPendingChunkedMessage{10}; bool autoAckOldestChunkedMessageOnQueueFull{false}; bool startMessageIdInclusive{false}; + long expireTimeOfIncompleteChunkedMessageMs{60000}; }; } // namespace pulsar #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 7be5a6aa..ecf06415 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -69,7 +69,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, readCompacted_(conf.isReadCompacted()), startMessageId_(startMessageId), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), - autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()) { + autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), + expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()) { std::stringstream consumerStrStream; consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; consumerStr_ = consumerStrStream.str(); @@ -310,6 +311,27 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { } } +void ConsumerImpl::triggerCheckExpiredChunkedTimer() { + checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); + checkExpiredChunkedTimer_->expires_from_now( + boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); + checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void { + if (ec) { + LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec + << "]."); + return; + } + Lock lock(chunkProcessMutex_); + long currentTimeMs = TimeUtils::currentTimeMillis(); + chunkedMessageCache_.removeOldestValuesIf( + [this, ¤tTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool { + return currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_; + }); + triggerCheckExpiredChunkedTimer(); + return; + }); +} + Optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, const proto::MessageMetadata& metadata, const MessageId& messageId, @@ -322,6 +344,12 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay << payload.readableBytes() << " bytes"); Lock lock(chunkProcessMutex_); + + // Lazy task scheduling to expire incomplete chunk message + if(!checkExpiredChunkedTimer_){ + triggerCheckExpiredChunkedTimer(); + } + auto it = chunkedMessageCache_.find(uuid); if (chunkId == 0) { @@ -1422,6 +1450,9 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { void ConsumerImpl::cancelTimers() noexcept { boost::system::error_code ec; batchReceiveTimer_->cancel(ec); + if(checkExpiredChunkedTimer_){ + checkExpiredChunkedTimer_->cancel(ec); + } } } /* namespace pulsar */ diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 3aa632a7..44ae2943 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -47,6 +47,7 @@ #include #include #include "Synchronized.h" +#include "TimeUtils.h" using namespace pulsar; @@ -258,6 +259,7 @@ class ConsumerImpl : public ConsumerImplBase { void appendChunk(const MessageId& messageId, const SharedBuffer& payload) { chunkedMessageIds_.emplace_back(messageId); chunkedMsgBuffer_.write(payload.data(), payload.readableBytes()); + receivedTimeMs_ = TimeUtils::currentTimeMillis(); } bool isCompleted() const noexcept { return totalChunks_ == numChunks(); } @@ -266,6 +268,8 @@ class ConsumerImpl : public ConsumerImplBase { const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } + long getReceivedTimeMs() const noexcept { return receivedTimeMs_; } + friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) { return os << "ChunkedMessageCtx " << ctx.chunkedMsgBuffer_.readableBytes() << " of " << ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " << ctx.numChunks() << " of " @@ -276,6 +280,7 @@ class ConsumerImpl : public ConsumerImplBase { const int totalChunks_; SharedBuffer chunkedMsgBuffer_; std::vector chunkedMessageIds_; + long receivedTimeMs_; int numChunks() const noexcept { return static_cast(chunkedMessageIds_.size()); } }; @@ -296,6 +301,11 @@ class ConsumerImpl : public ConsumerImplBase { MapCache chunkedMessageCache_; mutable std::mutex chunkProcessMutex_; + const long expireTimeOfIncompleteChunkedMessageMs_; + DeadlineTimerPtr checkExpiredChunkedTimer_; + + void triggerCheckExpiredChunkedTimer(); + /** * Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the * payload and return it. diff --git a/lib/MapCache.h b/lib/MapCache.h index b9a0069e..55d58f63 100644 --- a/lib/MapCache.h +++ b/lib/MapCache.h @@ -73,6 +73,23 @@ class MapCache { } } + void removeOldestValuesIf(const std::function& condition) { + if (!condition) return; + while (!keys_.empty()) { + const auto key = keys_.front(); + auto it = map_.find(key); + if (it == map_.end()) { + continue; + } + if (condition(it->first, it->second)) { + map_.erase(it); + keys_.pop_front(); + } else { + return; + } + } + } + void remove(const Key& key) { auto it = map_.find(key); if (it != map_.end()) { diff --git a/tests/MapCacheTest.cc b/tests/MapCacheTest.cc index 12a89ee1..e66ec0ca 100644 --- a/tests/MapCacheTest.cc +++ b/tests/MapCacheTest.cc @@ -76,3 +76,33 @@ TEST(MapCacheTest, testRemoveAllValues) { ASSERT_TRUE(cache.getKeys().empty()); ASSERT_EQ(cache.size(), 0); } + +TEST(MapCacheTest, testRemoveOldestValuesIf) { + MapCache cache; + cache.putIfAbsent(1, {100}); + cache.putIfAbsent(2, {200}); + cache.putIfAbsent(3, {300}); + int expireTime = 100; + + auto checkCondition = [&expireTime](const int& key, const MoveOnlyInt& value) -> bool { + return expireTime > value.x; + }; + + cache.removeOldestValuesIf(nullptr); + ASSERT_EQ(cache.size(), 3); + + cache.removeOldestValuesIf(checkCondition); + ASSERT_EQ(cache.size(), 3); + + expireTime = 200; + cache.removeOldestValuesIf(checkCondition); + + auto keys = cache.getKeys(); + ASSERT_EQ(cache.size(), 2); + ASSERT_EQ(cache.find(2)->second.x, 200); + ASSERT_EQ(cache.find(3)->second.x, 300); + + expireTime = 400; + cache.removeOldestValuesIf(checkCondition); + ASSERT_EQ(cache.size(), 0); +} diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index ae0114ce..09b52b92 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -23,6 +23,7 @@ #include #include "lib/LogUtils.h" #include "PulsarFriend.h" +#include "WaitUtils.h" DECLARE_LOG_OBJECT() @@ -80,6 +81,10 @@ class MessageChunkingTest : public ::testing::TestWithParam { ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer)); } + void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) { + ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer)); + } + private: Client client_{lookupUrl}; }; @@ -129,6 +134,49 @@ TEST_P(MessageChunkingTest, testEndToEnd) { // Verify the cache has been cleared auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); ASSERT_EQ(chunkedMessageCache.size(), 0); + + producer.close(); + consumer.close(); +} + +TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) { + // This test is time-consuming and is not related to the compressionType. So skip other compressionType + // here. + if (toString(GetParam()) != "None") { + return; + } + const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) + + std::to_string(time(nullptr)); + Consumer consumer; + ConsumerConfiguration consumerConf; + consumerConf.setExpireTimeOfIncompleteChunkedMessageMs(5000); + consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); + createConsumer(topic, consumer, consumerConf); + Producer producer; + createProducer(topic, producer); + + auto msg = MessageBuilder().setContent("test-data").build(); + auto& metadata = PulsarFriend::getMessageMetadata(msg); + metadata.set_num_chunks_from_msg(2); + metadata.set_chunk_id(0); + metadata.set_total_chunk_msg_size(100); + + producer.send(msg); + + auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); + + waitUntil( + std::chrono::seconds(2), [&] { return chunkedMessageCache.size() > 0; }, 1000); + ASSERT_EQ(chunkedMessageCache.size(), 1); + + // Wait for triggering the check of the expiration. + // Need to wait for 2 * expireTime because there may be a gap in checking the expiration time. + waitUntil( + std::chrono::seconds(10), [&] { return chunkedMessageCache.size() == 0; }, 1000); + ASSERT_EQ(chunkedMessageCache.size(), 0); + + producer.close(); + consumer.close(); } // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index df8e3dc0..cff7bc1a 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -160,5 +160,9 @@ class PulsarFriend { static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) { return lookupService.backoffTimers_.size(); } + + static proto::MessageMetadata& getMessageMetadata(Message& message) { + return message.impl_->metadata; + } }; } // namespace pulsar diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h index abe3efcc..d7db82e5 100644 --- a/tests/WaitUtils.h +++ b/tests/WaitUtils.h @@ -25,14 +25,15 @@ namespace pulsar { template -inline void waitUntil(std::chrono::duration timeout, std::function condition) { +inline void waitUntil(std::chrono::duration timeout, const std::function& condition, + long durationMs = 10) { auto timeoutMs = std::chrono::duration_cast(timeout).count(); while (timeoutMs > 0) { auto now = std::chrono::high_resolution_clock::now(); if (condition()) { break; } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(durationMs)); auto elapsed = std::chrono::duration_cast( std::chrono::high_resolution_clock::now() - now) .count(); From adc0e3f41495bc311d4236ec10d051c956f510a5 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 25 Oct 2022 18:31:55 +0800 Subject: [PATCH 02/10] Fix merge issues --- lib/ConsumerImpl.h | 1 + tests/PulsarFriend.h | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 9e8e8a34..bc9357b7 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -34,6 +34,7 @@ #include "Synchronized.h" #include "TestUtil.h" #include "UnboundedBlockingQueue.h" +#include "TimeUtils.h" using namespace pulsar; diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index e22256af..411fee4e 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -16,6 +16,8 @@ * specific language governing permissions and limitations * under the License. */ +#ifndef PULSAR_FRIEND_HPP_ +#define PULSAR_FRIEND_HPP_ #include @@ -30,6 +32,7 @@ #include "lib/RetryableLookupService.h" #include "lib/stats/ConsumerStatsImpl.h" #include "lib/stats/ProducerStatsImpl.h" +#include "lib/MessageImpl.h" using std::string; @@ -170,3 +173,5 @@ class PulsarFriend { } }; } // namespace pulsar + +#endif /* PULSAR_FRIEND_HPP_ */ From 63e86a7a68046bd0e3301281b17138a281c0ae46 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 25 Oct 2022 19:18:35 +0800 Subject: [PATCH 03/10] Fix format --- lib/ConsumerImpl.cc | 4 ++-- lib/ConsumerImpl.h | 2 +- tests/MessageChunkingTest.cc | 3 +-- tests/PulsarFriend.h | 6 ++---- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index fe5dd885..8ab2356e 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -355,7 +355,7 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay Lock lock(chunkProcessMutex_); // Lazy task scheduling to expire incomplete chunk message - if(!checkExpiredChunkedTimer_){ + if (!checkExpiredChunkedTimer_) { triggerCheckExpiredChunkedTimer(); } @@ -1460,7 +1460,7 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { void ConsumerImpl::cancelTimers() noexcept { boost::system::error_code ec; batchReceiveTimer_->cancel(ec); - if(checkExpiredChunkedTimer_){ + if (checkExpiredChunkedTimer_) { checkExpiredChunkedTimer_->cancel(ec); } } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index bc9357b7..82fbb7c2 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -33,8 +33,8 @@ #include "NegativeAcksTracker.h" #include "Synchronized.h" #include "TestUtil.h" -#include "UnboundedBlockingQueue.h" #include "TimeUtils.h" +#include "UnboundedBlockingQueue.h" using namespace pulsar; diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 2da4e025..8675886f 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -22,10 +22,9 @@ #include #include -#include "PulsarFriend.h" -#include "lib/LogUtils.h" #include "PulsarFriend.h" #include "WaitUtils.h" +#include "lib/LogUtils.h" DECLARE_LOG_OBJECT() diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 411fee4e..c5fdfe09 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -24,6 +24,7 @@ #include "lib/ClientConnection.h" #include "lib/ClientImpl.h" #include "lib/ConsumerImpl.h" +#include "lib/MessageImpl.h" #include "lib/MultiTopicsConsumerImpl.h" #include "lib/NamespaceName.h" #include "lib/PartitionedProducerImpl.h" @@ -32,7 +33,6 @@ #include "lib/RetryableLookupService.h" #include "lib/stats/ConsumerStatsImpl.h" #include "lib/stats/ProducerStatsImpl.h" -#include "lib/MessageImpl.h" using std::string; @@ -168,9 +168,7 @@ class PulsarFriend { return lookupService.backoffTimers_.size(); } - static proto::MessageMetadata& getMessageMetadata(Message& message) { - return message.impl_->metadata; - } + static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } }; } // namespace pulsar From 07f019f4e84f411b156fcdb7339c824bb9fe5111 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 28 Oct 2022 10:48:42 +0800 Subject: [PATCH 04/10] Apply comments. --- lib/ConsumerImpl.cc | 9 +++++++-- lib/ConsumerImpl.h | 2 -- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 8ab2356e..2548dff8 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -321,10 +321,14 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { } void ConsumerImpl::triggerCheckExpiredChunkedTimer() { - checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); checkExpiredChunkedTimer_->expires_from_now( boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); - checkExpiredChunkedTimer_->async_wait([this](const boost::system::error_code& ec) -> void { + std::weak_ptr weakSelf{shared_from_this()}; + checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void { + auto self = weakSelf.lock(); + if (!self) { + return; + } if (ec) { LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec << "]."); @@ -356,6 +360,7 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay // Lazy task scheduling to expire incomplete chunk message if (!checkExpiredChunkedTimer_) { + checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); triggerCheckExpiredChunkedTimer(); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 82fbb7c2..6f58aaf6 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -36,8 +36,6 @@ #include "TimeUtils.h" #include "UnboundedBlockingQueue.h" -using namespace pulsar; - namespace pulsar { class UnAckedMessageTrackerInterface; class ExecutorService; From 2c911a83721a3fde5ed334e1e8f78dfa177a28b9 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 3 Nov 2022 15:37:47 +0800 Subject: [PATCH 05/10] Discard chunk messages when expired. --- lib/ConsumerImpl.cc | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 40c894a0..055eb3b8 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -338,7 +338,24 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { long currentTimeMs = TimeUtils::currentTimeMillis(); chunkedMessageCache_.removeOldestValuesIf( [this, ¤tTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool { - return currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_; + bool expired = + currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_; + if (!expired) { + return false; + } + for (const MessageId& msgId : ctx.getChunkedMessageIds()) { + if (autoAckOldestChunkedMessageOnQueueFull_) { + doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) { + if (result != ResultOk) { + LOG_WARN("Failed to acknowledge discarded chunk, uuid: " + << uuid << ", messageId: " << msgId); + } + }); + } else { + trackMessage(msgId); + } + } + return true; }); triggerCheckExpiredChunkedTimer(); return; From 16d38856047c36ae181979f8d9c92d2b64833477 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 3 Nov 2022 20:18:25 +0800 Subject: [PATCH 06/10] Ack all expired messages. --- lib/ConsumerImpl.cc | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 055eb3b8..69eaaea6 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -344,16 +344,13 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { return false; } for (const MessageId& msgId : ctx.getChunkedMessageIds()) { - if (autoAckOldestChunkedMessageOnQueueFull_) { - doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) { - if (result != ResultOk) { - LOG_WARN("Failed to acknowledge discarded chunk, uuid: " - << uuid << ", messageId: " << msgId); - } - }); - } else { - trackMessage(msgId); - } + LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId); + doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) { + if (result != ResultOk) { + LOG_WARN("Failed to acknowledge discarded chunk, uuid: " + << uuid << ", messageId: " << msgId); + } + }); } return true; }); From 992e31c15aa5dacb0eb86590ea2c87a29157cfa2 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 4 Nov 2022 16:12:51 +0800 Subject: [PATCH 07/10] Apply comments. --- lib/ConsumerImpl.cc | 12 ++++++------ lib/ConsumerImpl.h | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 69eaaea6..0d6cee90 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -110,6 +110,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, if (conf.isEncryptionEnabled()) { msgCrypto_ = std::make_shared(consumerStr_, false); } + + checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); } ConsumerImpl::~ConsumerImpl() { @@ -337,7 +339,7 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { Lock lock(chunkProcessMutex_); long currentTimeMs = TimeUtils::currentTimeMillis(); chunkedMessageCache_.removeOldestValuesIf( - [this, ¤tTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool { + [this, currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool { bool expired = currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_; if (!expired) { @@ -373,8 +375,8 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay Lock lock(chunkProcessMutex_); // Lazy task scheduling to expire incomplete chunk message - if (!checkExpiredChunkedTimer_) { - checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); + if (!expireChunkMessageTaskScheduled_) { + expireChunkMessageTaskScheduled_ = true; triggerCheckExpiredChunkedTimer(); } @@ -1483,9 +1485,7 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { void ConsumerImpl::cancelTimers() noexcept { boost::system::error_code ec; batchReceiveTimer_->cancel(ec); - if (checkExpiredChunkedTimer_) { - checkExpiredChunkedTimer_->cancel(ec); - } + checkExpiredChunkedTimer_->cancel(ec); } } /* namespace pulsar */ diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 6f58aaf6..04a35f7e 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -302,6 +302,7 @@ class ConsumerImpl : public ConsumerImplBase { const long expireTimeOfIncompleteChunkedMessageMs_; DeadlineTimerPtr checkExpiredChunkedTimer_; + bool expireChunkMessageTaskScheduled_ = false; void triggerCheckExpiredChunkedTimer(); From f09e9ef11e48b647fbba0ceb1efc687cbb29591f Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 7 Nov 2022 10:42:34 +0800 Subject: [PATCH 08/10] Use CAS to access `expireChunkMessageTaskScheduled_`. --- lib/ConsumerImpl.cc | 4 ++-- lib/ConsumerImpl.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 0d6cee90..5ce2f170 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -375,8 +375,8 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay Lock lock(chunkProcessMutex_); // Lazy task scheduling to expire incomplete chunk message - if (!expireChunkMessageTaskScheduled_) { - expireChunkMessageTaskScheduled_ = true; + bool expected = false; + if (expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) { triggerCheckExpiredChunkedTimer(); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 04a35f7e..ea1e6de8 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -302,7 +302,7 @@ class ConsumerImpl : public ConsumerImplBase { const long expireTimeOfIncompleteChunkedMessageMs_; DeadlineTimerPtr checkExpiredChunkedTimer_; - bool expireChunkMessageTaskScheduled_ = false; + std::atomic_bool expireChunkMessageTaskScheduled_ = {false}; void triggerCheckExpiredChunkedTimer(); From d8d36ae3a28af92bde369765e7aa417547c43444 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 7 Nov 2022 10:57:30 +0800 Subject: [PATCH 09/10] Allow disabling this feature. --- include/pulsar/ConsumerConfiguration.h | 2 +- lib/ConsumerImpl.cc | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index ddb67334..520901c2 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -521,7 +521,7 @@ class PULSAR_PUBLIC ConsumerConfiguration { /** * If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if - * consumer won't be able to receive all chunks in expire times. + * consumer won't be able to receive all chunks in expire times. Use value 0 to disable this feature. * * Default: 60000, which means 1 minutes * diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 5ce2f170..48c9b913 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -376,7 +376,8 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay // Lazy task scheduling to expire incomplete chunk message bool expected = false; - if (expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) { + if (expireTimeOfIncompleteChunkedMessageMs_ > 0 && + expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) { triggerCheckExpiredChunkedTimer(); } From 32b9e54649ffef6859570fdc9984acb5e1f1db6f Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 7 Nov 2022 10:58:15 +0800 Subject: [PATCH 10/10] Update lib/ConsumerImpl.h Co-authored-by: Yunze Xu --- lib/ConsumerImpl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index ea1e6de8..13cd6f2f 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -302,7 +302,7 @@ class ConsumerImpl : public ConsumerImplBase { const long expireTimeOfIncompleteChunkedMessageMs_; DeadlineTimerPtr checkExpiredChunkedTimer_; - std::atomic_bool expireChunkMessageTaskScheduled_ = {false}; + std::atomic_bool expireChunkMessageTaskScheduled_{false}; void triggerCheckExpiredChunkedTimer();