diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 0418cfaf..520901c2 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -519,6 +519,26 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ bool isAutoAckOldestChunkedMessageOnQueueFull() const; + /** + * If the producer fails to publish all chunks of a message, the consumer can expire incomplete chunks + * when it cannot receive all of them within the expire time. Use value 0 to disable this feature. + * + * Default: 60000, which means 1 minute + * + * @param expireTimeOfIncompleteChunkedMessageMs expire time in milliseconds + * @return Consumer Configuration + */ + ConsumerConfiguration& setExpireTimeOfIncompleteChunkedMessageMs( + long expireTimeOfIncompleteChunkedMessageMs); + + /** + * + * Get the expire time of incomplete chunked message in milliseconds + * + * @return the expire time of incomplete chunked message in milliseconds + */ + long getExpireTimeOfIncompleteChunkedMessageMs() const; + /** * Set the consumer to include the given position of any reset operation like Consumer::seek. 
* diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index 74427a26..a778660a 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -200,6 +200,7 @@ class PULSAR_PUBLIC Message { friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const StringMap& map); friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const Message& msg); + friend class PulsarFriend; }; } // namespace pulsar diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index 62982458..f37e042d 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -262,6 +262,16 @@ bool ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const { return impl_->autoAckOldestChunkedMessageOnQueueFull; } +ConsumerConfiguration& ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs( + long expireTimeOfIncompleteChunkedMessageMs) { + impl_->expireTimeOfIncompleteChunkedMessageMs = expireTimeOfIncompleteChunkedMessageMs; + return *this; +} + +long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const { + return impl_->expireTimeOfIncompleteChunkedMessageMs; +} + ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool startMessageIdInclusive) { impl_->startMessageIdInclusive = startMessageIdInclusive; return *this; diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index 444fedf9..259b9354 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -55,6 +55,7 @@ struct ConsumerConfigurationImpl { size_t maxPendingChunkedMessage{10}; bool autoAckOldestChunkedMessageOnQueueFull{false}; bool startMessageIdInclusive{false}; + long expireTimeOfIncompleteChunkedMessageMs{60000}; }; } // namespace pulsar #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 19d5055c..48c9b913 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -78,7 +78,8 @@ 
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, readCompacted_(conf.isReadCompacted()), startMessageId_(startMessageId), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), - autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()) { + autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), + expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()) { std::stringstream consumerStrStream; consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; consumerStr_ = consumerStrStream.str(); @@ -109,6 +110,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, if (conf.isEncryptionEnabled()) { msgCrypto_ = std::make_shared(consumerStr_, false); } + + checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); } ConsumerImpl::~ConsumerImpl() { @@ -319,6 +322,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { } } +void ConsumerImpl::triggerCheckExpiredChunkedTimer() { + checkExpiredChunkedTimer_->expires_from_now( + boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); + std::weak_ptr weakSelf{shared_from_this()}; + checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (ec) { + LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec + << "]."); + return; + } + Lock lock(chunkProcessMutex_); + long currentTimeMs = TimeUtils::currentTimeMillis(); + chunkedMessageCache_.removeOldestValuesIf( + [this, currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool { + bool expired = + currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_; + if (!expired) { + return false; + } + for (const MessageId& msgId : ctx.getChunkedMessageIds()) { + 
LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId); + doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) { + if (result != ResultOk) { + LOG_WARN("Failed to acknowledge discarded chunk, uuid: " + << uuid << ", messageId: " << msgId); + } + }); + } + return true; + }); + triggerCheckExpiredChunkedTimer(); + return; + }); +} + Optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, const proto::MessageMetadata& metadata, const MessageId& messageId, @@ -331,6 +373,14 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay << payload.readableBytes() << " bytes"); Lock lock(chunkProcessMutex_); + + // Lazy task scheduling to expire incomplete chunk message + bool expected = false; + if (expireTimeOfIncompleteChunkedMessageMs_ > 0 && + expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) { + triggerCheckExpiredChunkedTimer(); + } + auto it = chunkedMessageCache_.find(uuid); if (chunkId == 0) { @@ -1436,6 +1486,7 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { void ConsumerImpl::cancelTimers() noexcept { boost::system::error_code ec; batchReceiveTimer_->cancel(ec); + checkExpiredChunkedTimer_->cancel(ec); } } /* namespace pulsar */ diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index d65676ba..13cd6f2f 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -33,6 +33,7 @@ #include "NegativeAcksTracker.h" #include "Synchronized.h" #include "TestUtil.h" +#include "TimeUtils.h" #include "UnboundedBlockingQueue.h" namespace pulsar { @@ -257,6 +258,7 @@ class ConsumerImpl : public ConsumerImplBase { void appendChunk(const MessageId& messageId, const SharedBuffer& payload) { chunkedMessageIds_.emplace_back(messageId); chunkedMsgBuffer_.write(payload.data(), payload.readableBytes()); + receivedTimeMs_ = TimeUtils::currentTimeMillis(); } bool isCompleted() const noexcept { return totalChunks_ == numChunks(); } @@ -265,6 +267,8 @@ class ConsumerImpl : public 
ConsumerImplBase { const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } + long getReceivedTimeMs() const noexcept { return receivedTimeMs_; } + friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) { return os << "ChunkedMessageCtx " << ctx.chunkedMsgBuffer_.readableBytes() << " of " << ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " << ctx.numChunks() << " of " @@ -275,6 +279,7 @@ class ConsumerImpl : public ConsumerImplBase { const int totalChunks_; SharedBuffer chunkedMsgBuffer_; std::vector chunkedMessageIds_; + long receivedTimeMs_{0}; /* set by appendChunk(); 0 until then */ int numChunks() const noexcept { return static_cast(chunkedMessageIds_.size()); } }; @@ -295,6 +300,12 @@ class ConsumerImpl : public ConsumerImplBase { MapCache chunkedMessageCache_; mutable std::mutex chunkProcessMutex_; + const long expireTimeOfIncompleteChunkedMessageMs_; + DeadlineTimerPtr checkExpiredChunkedTimer_; + std::atomic_bool expireChunkMessageTaskScheduled_{false}; + + void triggerCheckExpiredChunkedTimer(); + /** * Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the * payload and return it. 
diff --git a/lib/MapCache.h b/lib/MapCache.h index b9a0069e..55d58f63 100644 --- a/lib/MapCache.h +++ b/lib/MapCache.h @@ -73,6 +73,29 @@ class MapCache { } } + /** + * Remove entries starting from the front of the key list for as long as `condition` returns true, + * stopping at the first entry it rejects. An empty `condition` makes this a no-op. + */ + void removeOldestValuesIf(const std::function& condition) { + if (!condition) return; + while (!keys_.empty()) { + const auto key = keys_.front(); + auto it = map_.find(key); + if (it == map_.end()) { + // Drop the stale key; otherwise the loop would spin forever on it. + keys_.pop_front(); + continue; + } + if (condition(it->first, it->second)) { + map_.erase(it); + keys_.pop_front(); + } else { + return; + } + } + } + void remove(const Key& key) { auto it = map_.find(key); if (it != map_.end()) { diff --git a/tests/MapCacheTest.cc b/tests/MapCacheTest.cc index 2140937f..69496c49 100644 --- a/tests/MapCacheTest.cc +++ b/tests/MapCacheTest.cc @@ -77,3 +77,33 @@ TEST(MapCacheTest, testRemoveAllValues) { ASSERT_TRUE(cache.getKeys().empty()); ASSERT_EQ(cache.size(), 0); } + +TEST(MapCacheTest, testRemoveOldestValuesIf) { + MapCache cache; + cache.putIfAbsent(1, {100}); + cache.putIfAbsent(2, {200}); + cache.putIfAbsent(3, {300}); + int expireTime = 100; + + auto checkCondition = [&expireTime](const int& key, const MoveOnlyInt& value) -> bool { + return expireTime > value.x; + }; + + cache.removeOldestValuesIf(nullptr); + ASSERT_EQ(cache.size(), 3); + + cache.removeOldestValuesIf(checkCondition); + ASSERT_EQ(cache.size(), 3); + + expireTime = 200; + cache.removeOldestValuesIf(checkCondition); + + auto keys = cache.getKeys(); + ASSERT_EQ(cache.size(), 2); + ASSERT_EQ(cache.find(2)->second.x, 200); + ASSERT_EQ(cache.find(3)->second.x, 300); + + expireTime = 400; + cache.removeOldestValuesIf(checkCondition); + ASSERT_EQ(cache.size(), 0); +} diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 61a97144..8675886f 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -23,6 +23,7 @@ #include #include "PulsarFriend.h" +#include "WaitUtils.h" #include "lib/LogUtils.h" DECLARE_LOG_OBJECT() @@ -81,6 +82,10 @@ class MessageChunkingTest : public 
::testing::TestWithParam { ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer)); } + void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) { + ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer)); + } + private: Client client_{lookupUrl}; }; @@ -130,6 +135,49 @@ TEST_P(MessageChunkingTest, testEndToEnd) { // Verify the cache has been cleared auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); ASSERT_EQ(chunkedMessageCache.size(), 0); + + producer.close(); + consumer.close(); +} + +TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) { + // This test is time-consuming and is not related to the compressionType. So skip other compressionType + // here. + if (toString(GetParam()) != "None") { + return; + } + const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) + + std::to_string(time(nullptr)); + Consumer consumer; + ConsumerConfiguration consumerConf; + consumerConf.setExpireTimeOfIncompleteChunkedMessageMs(5000); + consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); + createConsumer(topic, consumer, consumerConf); + Producer producer; + createProducer(topic, producer); + + auto msg = MessageBuilder().setContent("test-data").build(); + auto& metadata = PulsarFriend::getMessageMetadata(msg); + metadata.set_num_chunks_from_msg(2); + metadata.set_chunk_id(0); + metadata.set_total_chunk_msg_size(100); + + producer.send(msg); + + auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); + + waitUntil( + std::chrono::seconds(2), [&] { return chunkedMessageCache.size() > 0; }, 1000); + ASSERT_EQ(chunkedMessageCache.size(), 1); + + // Wait for triggering the check of the expiration. + // Need to wait for 2 * expireTime because there may be a gap in checking the expiration time. 
+ waitUntil( + std::chrono::seconds(10), [&] { return chunkedMessageCache.size() == 0; }, 1000); + ASSERT_EQ(chunkedMessageCache.size(), 0); + + producer.close(); + consumer.close(); } // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 18f2bb66..c5fdfe09 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -16,12 +16,15 @@ * specific language governing permissions and limitations * under the License. */ +#ifndef PULSAR_FRIEND_HPP_ +#define PULSAR_FRIEND_HPP_ #include #include "lib/ClientConnection.h" #include "lib/ClientImpl.h" #include "lib/ConsumerImpl.h" +#include "lib/MessageImpl.h" #include "lib/MultiTopicsConsumerImpl.h" #include "lib/NamespaceName.h" #include "lib/PartitionedProducerImpl.h" @@ -164,5 +167,9 @@ class PulsarFriend { static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) { return lookupService.backoffTimers_.size(); } + + static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } }; } // namespace pulsar + +#endif /* PULSAR_FRIEND_HPP_ */ diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h index abe3efcc..d7db82e5 100644 --- a/tests/WaitUtils.h +++ b/tests/WaitUtils.h @@ -25,14 +25,15 @@ namespace pulsar { template -inline void waitUntil(std::chrono::duration timeout, std::function condition) { +inline void waitUntil(std::chrono::duration timeout, const std::function& condition, + long durationMs = 10) { auto timeoutMs = std::chrono::duration_cast(timeout).count(); while (timeoutMs > 0) { auto now = std::chrono::high_resolution_clock::now(); if (condition()) { break; } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(durationMs)); auto elapsed = std::chrono::duration_cast( std::chrono::high_resolution_clock::now() - now) .count();