Skip to content
20 changes: 20 additions & 0 deletions include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,26 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
bool isAutoAckOldestChunkedMessageOnQueueFull() const;

/**
* If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if
* consumer won't be able to receive all chunks in expire times. Use value 0 to disable this feature.
*
* Default: 60000, which means 1 minutes
*
* @param expireTimeOfIncompleteChunkedMessageMs expire time in milliseconds
* @return Consumer Configuration
*/
ConsumerConfiguration& setExpireTimeOfIncompleteChunkedMessageMs(
long expireTimeOfIncompleteChunkedMessageMs);

/**
*
* Get the expire time of incomplete chunked message in milliseconds
*
* @return the expire time of incomplete chunked message in milliseconds
*/
long getExpireTimeOfIncompleteChunkedMessageMs() const;

/**
* Set the consumer to include the given position of any reset operation like Consumer::seek.
*
Expand Down
1 change: 1 addition & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class PULSAR_PUBLIC Message {

friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const StringMap& map);
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const Message& msg);
friend class PulsarFriend;
};
} // namespace pulsar

Expand Down
10 changes: 10 additions & 0 deletions lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ bool ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
return impl_->autoAckOldestChunkedMessageOnQueueFull;
}

ConsumerConfiguration& ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs(
long expireTimeOfIncompleteChunkedMessageMs) {
impl_->expireTimeOfIncompleteChunkedMessageMs = expireTimeOfIncompleteChunkedMessageMs;
return *this;
}

long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const {
return impl_->expireTimeOfIncompleteChunkedMessageMs;
}

ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool startMessageIdInclusive) {
impl_->startMessageIdInclusive = startMessageIdInclusive;
return *this;
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct ConsumerConfigurationImpl {
size_t maxPendingChunkedMessage{10};
bool autoAckOldestChunkedMessageOnQueueFull{false};
bool startMessageIdInclusive{false};
long expireTimeOfIncompleteChunkedMessageMs{60000};
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
53 changes: 52 additions & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
readCompacted_(conf.isReadCompacted()),
startMessageId_(startMessageId),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()) {
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()) {
std::stringstream consumerStrStream;
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
Expand Down Expand Up @@ -109,6 +110,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
if (conf.isEncryptionEnabled()) {
msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
}

checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
}

ConsumerImpl::~ConsumerImpl() {
Expand Down Expand Up @@ -319,6 +322,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
}
}

void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
checkExpiredChunkedTimer_->expires_from_now(
boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
Comment thread
RobertIndie marked this conversation as resolved.
auto self = weakSelf.lock();
if (!self) {
return;
}
if (ec) {
LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
<< "].");
return;
}
Lock lock(chunkProcessMutex_);
long currentTimeMs = TimeUtils::currentTimeMillis();
chunkedMessageCache_.removeOldestValuesIf(
[this, currentTimeMs](const std::string& uuid, const ChunkedMessageCtx& ctx) -> bool {
bool expired =
currentTimeMs > ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
if (!expired) {
return false;
}
for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) {
Comment thread
BewareMyPower marked this conversation as resolved.
if (result != ResultOk) {
LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
<< uuid << ", messageId: " << msgId);
}
});
}
return true;
});
triggerCheckExpiredChunkedTimer();
return;
});
}

Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
Expand All @@ -331,6 +373,14 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
<< payload.readableBytes() << " bytes");

Lock lock(chunkProcessMutex_);

// Lazy task scheduling to expire incomplete chunk message
bool expected = false;
if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) {
triggerCheckExpiredChunkedTimer();
}

auto it = chunkedMessageCache_.find(uuid);

if (chunkId == 0) {
Expand Down Expand Up @@ -1436,6 +1486,7 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
void ConsumerImpl::cancelTimers() noexcept {
boost::system::error_code ec;
batchReceiveTimer_->cancel(ec);
checkExpiredChunkedTimer_->cancel(ec);
}

} /* namespace pulsar */
11 changes: 11 additions & 0 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "NegativeAcksTracker.h"
#include "Synchronized.h"
#include "TestUtil.h"
#include "TimeUtils.h"
#include "UnboundedBlockingQueue.h"

namespace pulsar {
Expand Down Expand Up @@ -257,6 +258,7 @@ class ConsumerImpl : public ConsumerImplBase {
void appendChunk(const MessageId& messageId, const SharedBuffer& payload) {
chunkedMessageIds_.emplace_back(messageId);
chunkedMsgBuffer_.write(payload.data(), payload.readableBytes());
receivedTimeMs_ = TimeUtils::currentTimeMillis();
}

bool isCompleted() const noexcept { return totalChunks_ == numChunks(); }
Expand All @@ -265,6 +267,8 @@ class ConsumerImpl : public ConsumerImplBase {

const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }

long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }

friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
return os << "ChunkedMessageCtx " << ctx.chunkedMsgBuffer_.readableBytes() << " of "
<< ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " << ctx.numChunks() << " of "
Expand All @@ -275,6 +279,7 @@ class ConsumerImpl : public ConsumerImplBase {
const int totalChunks_;
SharedBuffer chunkedMsgBuffer_;
std::vector<MessageId> chunkedMessageIds_;
long receivedTimeMs_;

int numChunks() const noexcept { return static_cast<int>(chunkedMessageIds_.size()); }
};
Expand All @@ -295,6 +300,12 @@ class ConsumerImpl : public ConsumerImplBase {
MapCache<std::string, ChunkedMessageCtx> chunkedMessageCache_;
mutable std::mutex chunkProcessMutex_;

const long expireTimeOfIncompleteChunkedMessageMs_;
DeadlineTimerPtr checkExpiredChunkedTimer_;
std::atomic_bool expireChunkMessageTaskScheduled_{false};

void triggerCheckExpiredChunkedTimer();

/**
* Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the
* payload and return it.
Expand Down
17 changes: 17 additions & 0 deletions lib/MapCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,23 @@ class MapCache {
}
}

void removeOldestValuesIf(const std::function<bool(const Key&, const Value&)>& condition) {
if (!condition) return;
while (!keys_.empty()) {
const auto key = keys_.front();
auto it = map_.find(key);
if (it == map_.end()) {
continue;
}
if (condition(it->first, it->second)) {
map_.erase(it);
keys_.pop_front();
} else {
return;
}
}
}

void remove(const Key& key) {
auto it = map_.find(key);
if (it != map_.end()) {
Expand Down
30 changes: 30 additions & 0 deletions tests/MapCacheTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,33 @@ TEST(MapCacheTest, testRemoveAllValues) {
ASSERT_TRUE(cache.getKeys().empty());
ASSERT_EQ(cache.size(), 0);
}

TEST(MapCacheTest, testRemoveOldestValuesIf) {
MapCache<int, MoveOnlyInt> cache;
cache.putIfAbsent(1, {100});
cache.putIfAbsent(2, {200});
cache.putIfAbsent(3, {300});
int expireTime = 100;

auto checkCondition = [&expireTime](const int& key, const MoveOnlyInt& value) -> bool {
return expireTime > value.x;
};

cache.removeOldestValuesIf(nullptr);
ASSERT_EQ(cache.size(), 3);

cache.removeOldestValuesIf(checkCondition);
ASSERT_EQ(cache.size(), 3);

expireTime = 200;
cache.removeOldestValuesIf(checkCondition);

auto keys = cache.getKeys();
ASSERT_EQ(cache.size(), 2);
ASSERT_EQ(cache.find(2)->second.x, 200);
ASSERT_EQ(cache.find(3)->second.x, 300);

expireTime = 400;
cache.removeOldestValuesIf(checkCondition);
ASSERT_EQ(cache.size(), 0);
}
48 changes: 48 additions & 0 deletions tests/MessageChunkingTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <random>

#include "PulsarFriend.h"
#include "WaitUtils.h"
#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()
Expand Down Expand Up @@ -81,6 +82,10 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
}

void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
}

private:
Client client_{lookupUrl};
};
Expand Down Expand Up @@ -130,6 +135,49 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
// Verify the cache has been cleared
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
ASSERT_EQ(chunkedMessageCache.size(), 0);

producer.close();
consumer.close();
}

TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) {
// This test is time-consuming and is not related to the compressionType. So skip other compressionType
// here.
if (toString(GetParam()) != "None") {
return;
}
const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) +
std::to_string(time(nullptr));
Consumer consumer;
ConsumerConfiguration consumerConf;
consumerConf.setExpireTimeOfIncompleteChunkedMessageMs(5000);
consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
createConsumer(topic, consumer, consumerConf);
Producer producer;
createProducer(topic, producer);

auto msg = MessageBuilder().setContent("test-data").build();
auto& metadata = PulsarFriend::getMessageMetadata(msg);
metadata.set_num_chunks_from_msg(2);
metadata.set_chunk_id(0);
metadata.set_total_chunk_msg_size(100);

producer.send(msg);

auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);

waitUntil(
std::chrono::seconds(2), [&] { return chunkedMessageCache.size() > 0; }, 1000);
ASSERT_EQ(chunkedMessageCache.size(), 1);

// Wait for triggering the check of the expiration.
// Need to wait for 2 * expireTime because there may be a gap in checking the expiration time.
waitUntil(
std::chrono::seconds(10), [&] { return chunkedMessageCache.size() == 0; }, 1000);
ASSERT_EQ(chunkedMessageCache.size(), 0);

producer.close();
consumer.close();
}

// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P
Expand Down
7 changes: 7 additions & 0 deletions tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_FRIEND_HPP_
#define PULSAR_FRIEND_HPP_

#include <string>

#include "lib/ClientConnection.h"
#include "lib/ClientImpl.h"
#include "lib/ConsumerImpl.h"
#include "lib/MessageImpl.h"
#include "lib/MultiTopicsConsumerImpl.h"
#include "lib/NamespaceName.h"
#include "lib/PartitionedProducerImpl.h"
Expand Down Expand Up @@ -164,5 +167,9 @@ class PulsarFriend {
static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) {
return lookupService.backoffTimers_.size();
}

static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; }
};
} // namespace pulsar

#endif /* PULSAR_FRIEND_HPP_ */
5 changes: 3 additions & 2 deletions tests/WaitUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
namespace pulsar {

template <typename Rep, typename Period>
inline void waitUntil(std::chrono::duration<Rep, Period> timeout, std::function<bool()> condition) {
inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
long durationMs = 10) {
auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
while (timeoutMs > 0) {
auto now = std::chrono::high_resolution_clock::now();
if (condition()) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - now)
.count();
Expand Down