From 2c2d3a68c8a53c0cd7fada0ab2c58fdc4ef4a8cb Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 26 Sep 2023 19:00:43 +0800 Subject: [PATCH 1/6] [fix] Fix consumer doesn't acknowledge all chunk message Ids --- lib/AckGroupingTracker.cc | 32 ++++++++++++++++++++++++++++---- lib/ChunkMessageIdImpl.h | 11 +++++++++++ lib/ConsumerImpl.cc | 6 ++++-- lib/ConsumerImpl.h | 4 ++++ tests/MessageChunkingTest.cc | 17 ++++++++++++++++- 5 files changed, 63 insertions(+), 7 deletions(-) diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc index 9a471354..1f946a7e 100644 --- a/lib/AckGroupingTracker.cc +++ b/lib/AckGroupingTracker.cc @@ -21,8 +21,10 @@ #include #include +#include #include "BitSet.h" +#include "ChunkMessageIdImpl.h" #include "ClientConnection.h" #include "Commands.h" #include "LogUtils.h" @@ -42,6 +44,14 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c } return; } + if (auto chunkMessageId = + std::dynamic_pointer_cast(Commands::getMessageIdImpl(msgId))) { + auto msgIdList = chunkMessageId->moveChunkedMessageIds(); + doImmediateAck(std::set(std::make_move_iterator(msgIdList.begin()), + std::make_move_iterator(msgIdList.end())), + callback); + return; + } const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet(); if (waitResponse_) { const auto requestId = requestIdSupplier_(); @@ -84,29 +94,43 @@ void AckGroupingTracker::doImmediateAck(const std::set& msgIds, Resul return; } + std::set ackMsgIds; + + for (const auto& msgId : msgIds) { + if (auto chunkMessageId = + std::dynamic_pointer_cast(Commands::getMessageIdImpl(msgId))) { + auto msgIdList = chunkMessageId->moveChunkedMessageIds(); + doImmediateAck(std::set(std::make_move_iterator(msgIdList.begin()), + std::make_move_iterator(msgIdList.end())), + callback); + } else { + ackMsgIds.insert(msgId); + } + } + if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { if (waitResponse_) { const auto requestId = requestIdSupplier_(); - cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId) + cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId) .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); } }); } else { - cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds)); + cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds)); if (callback) { callback(ResultOk); } } } else { - auto count = std::make_shared>(msgIds.size()); + auto count = std::make_shared>(ackMsgIds.size()); auto wrappedCallback = [callback, count](Result result) { if (--*count == 0 && callback) { callback(result); } }; - for (auto&& msgId : msgIds) { + for (auto&& msgId : ackMsgIds) { doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual); } } diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index 3081ff03..36dddbc1 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -38,11 +38,22 @@ class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_ this->partition_ = msgId.partition(); } + void setChunkedMessageIds(std::vector&& chunkedMessageIds) { + chunkedMessageIds_ = std::move(chunkedMessageIds); + setFirstChunkMessageId(chunkedMessageIds_.front()); + setLastChunkMessageId(chunkedMessageIds_.back()); + } + std::shared_ptr getFirstChunkMessageId() const { return firstChunkMsgId_; } + std::vector moveChunkedMessageIds() const noexcept { + return std::move(chunkedMessageIds_); + } + MessageId build() { return MessageId{std::dynamic_pointer_cast(shared_from_this())}; } private: std::shared_ptr firstChunkMsgId_; + std::vector chunkedMessageIds_; }; } // namespace pulsar diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 0ee6a74d..b2bdc8f7 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -479,8 +479,7 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff } ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front()); - chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back()); + chunkMsgId->setChunkedMessageIds(chunkedMsgCtx.moveChunkedMessageIds()); messageId = chunkMsgId->build(); LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx @@ -1165,6 +1164,9 @@ bool ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType consumerType) std::pair ConsumerImpl::prepareIndividualAck(const MessageId& messageId) { auto messageIdImpl = Commands::getMessageIdImpl(messageId); + if (std::dynamic_pointer_cast(messageIdImpl)) { + return std::make_pair(messageId, true); + } auto batchedMessageIdImpl = std::dynamic_pointer_cast(messageIdImpl); auto batchSize = messageId.batchSize(); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index def2543d..292bcf7f 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -270,6 +270,10 @@ class ConsumerImpl : public ConsumerImplBase { const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } + std::vector moveChunkedMessageIds() const noexcept { + return std::move(chunkedMessageIds_); + } + long getReceivedTimeMs() const noexcept { return receivedTimeMs_; } friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) { diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 6d54a69f..e4cc76e0 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam { } void createConsumer(const std::string& topic, Consumer& consumer) { - ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer)); + ConsumerConfiguration conf; + conf.setBrokerConsumerStatsCacheTimeInMs(1000); + ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer)); } void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) { @@ -138,6 +140,7 @@ TEST_P(MessageChunkingTest, testEndToEnd) { std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); ASSERT_TRUE(chunkMsgId); receivedMessageIds.emplace_back(messageId); + consumer.acknowledge(messageId); } ASSERT_EQ(receivedMessageIds, sendMessageIds); ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId()); @@ -147,6 +150,18 @@ TEST_P(MessageChunkingTest, testEndToEnd) { auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); ASSERT_EQ(chunkedMessageCache.size(), 0); + BrokerConsumerStats consumerStats; + waitUntil( + std::chrono::seconds(10), + [&] { + if (consumer.getBrokerConsumerStats(consumerStats) != ResultOk) { + return false; + } + return consumerStats.getMsgBacklog() == 0; + }, + 1000); + ASSERT_EQ(consumerStats.getMsgBacklog(), 0); + producer.close(); consumer.close(); } From abba9910b4596a992871021101403b1305f429a2 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 27 Sep 2023 10:53:28 +0800 Subject: [PATCH 2/6] Refactor and refine chunked message id --- lib/ChunkMessageIdImpl.h | 18 ++++++------------ lib/ConsumerImpl.h | 2 +- lib/MessageId.cc | 4 ++-- lib/OpSendMsg.h | 1 + lib/ProducerImpl.cc | 9 ++++----- tests/MessageChunkingTest.cc | 10 ++++------ 6 files changed, 18 insertions(+), 26 deletions(-) diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index 36dddbc1..32879f32 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -30,23 +30,17 @@ class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_ public: ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared()) {} - void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; } - - void setLastChunkMessageId(const MessageId& msgId) { - this->ledgerId_ = msgId.ledgerId(); - this->entryId_ = msgId.entryId(); - this->partition_ = msgId.partition(); - } - void setChunkedMessageIds(std::vector&& chunkedMessageIds) { chunkedMessageIds_ = std::move(chunkedMessageIds); - setFirstChunkMessageId(chunkedMessageIds_.front()); - setLastChunkMessageId(chunkedMessageIds_.back()); + auto lastChunkMsgId = chunkedMessageIds_.back(); + this->ledgerId_ = lastChunkMsgId.ledgerId(); + this->entryId_ = lastChunkMsgId.entryId(); + this->partition_ = lastChunkMsgId.partition(); } - std::shared_ptr getFirstChunkMessageId() const { return firstChunkMsgId_; } + std::shared_ptr getFirstChunkMessageId() const { return chunkedMessageIds_.front().impl_; } - std::vector moveChunkedMessageIds() const noexcept { + std::vector moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 292bcf7f..7b811a60 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -270,7 +270,7 @@ class ConsumerImpl : public ConsumerImplBase { const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } - std::vector moveChunkedMessageIds() const noexcept { + std::vector moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); } diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 12b6f403..8100a178 100644 --- a/lib/MessageId.cc +++ b/lib/MessageId.cc @@ -100,8 +100,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { if (idData.has_first_chunk_message_id()) { ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build()); - chunkMsgId->setLastChunkMessageId(msgId); + chunkMsgId->setChunkedMessageIds( + {MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId}); return chunkMsgId->build(); } diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index 06fa77f4..69cb7ad8 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -55,6 +55,7 @@ struct OpSendMsg { const SendCallback sendCallback; std::vector> trackerCallbacks; ChunkMessageIdImplPtr chunkedMessageId; + std::vector chunkMessageIdList; // Use shared_ptr here because producer might resend the message with the same arguments const std::shared_ptr sendArgs; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index d8f02be6..d7fe4142 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -886,7 +886,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { return true; } - const auto& op = *pendingMessagesQueue_.front(); + auto& op = *pendingMessagesQueue_.front(); if (op.result != ResultOk) { LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and " << rawMessageId); @@ -912,10 +912,9 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { if (op.chunkedMessageId) { // Handling the chunk message id. - if (op.chunkId == 0) { - op.chunkedMessageId->setFirstChunkMessageId(messageId); - } else if (op.chunkId == op.numChunks - 1) { - op.chunkedMessageId->setLastChunkMessageId(messageId); + op.chunkMessageIdList.push_back(messageId); + if (op.chunkId == op.numChunks - 1) { + op.chunkedMessageId->setChunkedMessageIds(std::move(op.chunkMessageIdList)); messageId = op.chunkedMessageId->build(); } } diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index e4cc76e0..8caa794b 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -154,10 +154,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) { waitUntil( std::chrono::seconds(10), [&] { - if (consumer.getBrokerConsumerStats(consumerStats) != ResultOk) { - return false; - } - return consumerStats.getMsgBacklog() == 0; + return consumer.getBrokerConsumerStats(consumerStats) == ResultOk && + consumerStats.getMsgBacklog() == 0; }, 1000); ASSERT_EQ(consumerStats.getMsgBacklog(), 0); @@ -333,8 +331,8 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) { MessageId msgId; { ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build()); - chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()); + chunkMsgId->setChunkedMessageIds({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(), + MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}); msgId = chunkMsgId->build(); // Test the destructor of the underlying message id should also work for the generated messageId. } From 0ffad7b838c38ead258a52969fab302f22ccb976 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 27 Sep 2023 11:06:43 +0800 Subject: [PATCH 3/6] Fix format --- lib/ChunkMessageIdImpl.h | 8 ++++---- lib/ConsumerImpl.h | 4 +--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index 32879f32..31c80373 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -38,12 +38,12 @@ class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_ this->partition_ = lastChunkMsgId.partition(); } - std::shared_ptr getFirstChunkMessageId() const { return chunkedMessageIds_.front().impl_; } - - std::vector moveChunkedMessageIds() noexcept { - return std::move(chunkedMessageIds_); + std::shared_ptr getFirstChunkMessageId() const { + return chunkedMessageIds_.front().impl_; } + std::vector moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); } + MessageId build() { return MessageId{std::dynamic_pointer_cast(shared_from_this())}; } private: diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 7b811a60..7794192b 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -270,9 +270,7 @@ class ConsumerImpl : public ConsumerImplBase { const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } - std::vector moveChunkedMessageIds() noexcept { - return std::move(chunkedMessageIds_); - } + std::vector moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); } long getReceivedTimeMs() const noexcept { return receivedTimeMs_; } From b56206b0fdab6f6a7fbfa366e8ec634791307fe1 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 28 Sep 2023 12:26:00 +0800 Subject: [PATCH 4/6] Refactor and fix producer chunk logic --- lib/AckGroupingTracker.cc | 23 ++++++++++++----------- lib/ChunkMessageIdImpl.h | 11 +++-------- lib/Commands.cc | 6 +++--- lib/ConsumerImpl.cc | 10 ++++------ lib/ConsumerImpl.h | 2 -- lib/MessageId.cc | 19 +++++++++---------- lib/OpSendMsg.h | 9 +++++---- lib/ProducerImpl.cc | 14 +++++++------- tests/MessageChunkingTest.cc | 32 +++++++++++++++++--------------- 9 files changed, 60 insertions(+), 66 deletions(-) diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc index 1f946a7e..ab7381f1 100644 --- a/lib/AckGroupingTracker.cc +++ b/lib/AckGroupingTracker.cc @@ -44,13 +44,16 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c } return; } - if (auto chunkMessageId = - std::dynamic_pointer_cast(Commands::getMessageIdImpl(msgId))) { - auto msgIdList = chunkMessageId->moveChunkedMessageIds(); - doImmediateAck(std::set(std::make_move_iterator(msgIdList.begin()), - std::make_move_iterator(msgIdList.end())), - callback); - return; + if (ackType == CommandAck_AckType_Individual) { + // If it's individual ack, we need to acknowledge all message IDs in a chunked message Id + // If it's cumulative ack, we only need to ack the last message ID of a chunked message. + // ChunkedMessageId return last chunk message ID by default, so we don't need to handle it. + if (auto chunkMessageId = + std::dynamic_pointer_cast(Commands::getMessageIdImpl(msgId))) { + auto msgIdList = chunkMessageId->getChunkedMessageIds(); + doImmediateAck(std::set(msgIdList.begin(), msgIdList.end()), callback); + return; + } } const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet(); if (waitResponse_) { @@ -99,10 +102,8 @@ void AckGroupingTracker::doImmediateAck(const std::set& msgIds, Resul for (const auto& msgId : msgIds) { if (auto chunkMessageId = std::dynamic_pointer_cast(Commands::getMessageIdImpl(msgId))) { - auto msgIdList = chunkMessageId->moveChunkedMessageIds(); - doImmediateAck(std::set(std::make_move_iterator(msgIdList.begin()), - std::make_move_iterator(msgIdList.end())), - callback); + auto msgIdList = chunkMessageId->getChunkedMessageIds(); + ackMsgIds.insert(msgIdList.begin(), msgIdList.end()); } else { ackMsgIds.insert(msgId); } diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index 31c80373..bb9f85a8 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -28,26 +28,21 @@ class ChunkMessageIdImpl; typedef std::shared_ptr ChunkMessageIdImplPtr; class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this { public: - ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared()) {} - - void setChunkedMessageIds(std::vector&& chunkedMessageIds) { - chunkedMessageIds_ = std::move(chunkedMessageIds); + explicit ChunkMessageIdImpl(std::vector&& chunkedMessageIds) + : chunkedMessageIds_(std::move(chunkedMessageIds)) { auto lastChunkMsgId = chunkedMessageIds_.back(); this->ledgerId_ = lastChunkMsgId.ledgerId(); this->entryId_ = lastChunkMsgId.entryId(); this->partition_ = lastChunkMsgId.partition(); } - std::shared_ptr getFirstChunkMessageId() const { - return chunkedMessageIds_.front().impl_; - } + const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } std::vector moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); } MessageId build() { return MessageId{std::dynamic_pointer_cast(shared_from_this())}; } private: - std::shared_ptr firstChunkMsgId_; std::vector chunkedMessageIds_; }; } // namespace pulsar diff --git a/lib/Commands.cc b/lib/Commands.cc index f2e6c6d2..4b10b732 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); if (chunkMsgId) { - auto firstId = chunkMsgId->getFirstChunkMessageId(); - messageIdData.set_ledgerid(firstId->ledgerId_); - messageIdData.set_entryid(firstId->entryId_); + const auto& firstId = chunkMsgId->getChunkedMessageIds().front(); + messageIdData.set_ledgerid(firstId.ledgerId()); + messageIdData.set_entryid(firstId.entryId()); } else { messageIdData.set_ledgerid(messageId.ledgerId()); messageIdData.set_entryid(messageId.entryId()); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index b2bdc8f7..fd2f1ffc 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -478,9 +478,7 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff return boost::none; } - ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setChunkedMessageIds(chunkedMsgCtx.moveChunkedMessageIds()); - messageId = chunkMsgId->build(); + messageId = std::make_shared(chunkedMsgCtx.moveChunkedMessageIds())->build(); LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx << ", sequenceId: " << metadata.sequence_id()); @@ -1164,9 +1162,6 @@ bool ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType consumerType) std::pair ConsumerImpl::prepareIndividualAck(const MessageId& messageId) { auto messageIdImpl = Commands::getMessageIdImpl(messageId); - if (std::dynamic_pointer_cast(messageIdImpl)) { - return std::make_pair(messageId, true); - } auto batchedMessageIdImpl = std::dynamic_pointer_cast(messageIdImpl); auto batchSize = messageId.batchSize(); @@ -1175,6 +1170,9 @@ std::pair ConsumerImpl::prepareIndividualAck(const MessageId& m (batchSize > 0) ? batchSize : 1); unAckedMessageTrackerPtr_->remove(messageId); possibleSendToDeadLetterTopicMessages_.remove(messageId); + if (std::dynamic_pointer_cast(messageIdImpl)) { + return std::make_pair(messageId, true); + } return std::make_pair(discardBatch(messageId), true); } else if (config_.isBatchIndexAckEnabled()) { return std::make_pair(messageId, true); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 7794192b..690d8fc4 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -294,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase { // concurrently on the topic) then it guards against broken chunked message which was not fully published const bool autoAckOldestChunkedMessageOnQueueFull_; - // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message. - std::unordered_map chunkedMessagesMap_; // This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending // chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and // the associated ChunkedMessageCtx will be removed. diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 8100a178..b51fd9f1 100644 --- a/lib/MessageId.cc +++ b/lib/MessageId.cc @@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const { auto chunkMsgId = std::dynamic_pointer_cast(impl_); if (chunkMsgId) { proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id(); - auto firstChunkId = chunkMsgId->getFirstChunkMessageId(); - firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_); - firstChunkIdData.set_entryid(firstChunkId->entryId_); + const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front(); + firstChunkIdData.set_ledgerid(firstChunkId.ledgerId()); + firstChunkIdData.set_entryid(firstChunkId.entryId()); if (chunkMsgId->partition_ != -1) { - firstChunkIdData.set_partition(firstChunkId->partition_); + firstChunkIdData.set_partition(firstChunkId.partition()); } } @@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { MessageId msgId = MessageIdBuilder::from(idData).build(); if (idData.has_first_chunk_message_id()) { - ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setChunkedMessageIds( - {MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId}); + ChunkMessageIdImplPtr chunkMsgId = std::make_shared( + std::vector({MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId})); return chunkMsgId->build(); } @@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; } PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) { auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); if (chunkMsgId) { - auto firstId = chunkMsgId->getFirstChunkMessageId(); - s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ',' - << firstId->batchIndex_ << ");"; + const auto& firstId = chunkMsgId->getChunkedMessageIds().front(); + s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << firstId.partition() << ',' + << firstId.batchIndex() << ");"; } s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ',' << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')'; diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index 69cb7ad8..a1319e10 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -45,6 +45,8 @@ struct SendArguments { SendArguments& operator=(const SendArguments&) = delete; }; +typedef std::shared_ptr> ChunkMessageIdListPtr; + struct OpSendMsg { const Result result; const int32_t chunkId; @@ -54,8 +56,7 @@ struct OpSendMsg { const boost::posix_time::ptime timeout; const SendCallback sendCallback; std::vector> trackerCallbacks; - ChunkMessageIdImplPtr chunkedMessageId; - std::vector chunkMessageIdList; + ChunkMessageIdListPtr chunkMessageIdList; // Use shared_ptr here because producer might resend the message with the same arguments const std::shared_ptr sendArgs; @@ -90,7 +91,7 @@ struct OpSendMsg { sendArgs(nullptr) {} OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize, - int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId, + int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdListPtr chunkMessageIdList, uint64_t producerId, SharedBuffer payload) : result(ResultOk), chunkId(metadata.chunk_id()), @@ -99,7 +100,7 @@ struct OpSendMsg { messagesSize(messagesSize), timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), sendCallback(std::move(callback)), - chunkedMessageId(chunkedMessageId), + chunkMessageIdList(std::move(chunkMessageIdList)), sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} }; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index d7fe4142..1ba51714 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -571,14 +571,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c } } else { const bool sendChunks = (totalChunks > 1); + ChunkMessageIdListPtr chunkMessageIdList; if (sendChunks) { msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId)); msgMetadata.set_num_chunks_from_msg(totalChunks); msgMetadata.set_total_chunk_msg_size(compressedSize); + chunkMessageIdList = std::make_shared>(); } - auto chunkMessageId = totalChunks > 1 ? std::make_shared() : nullptr; - int beginIndex = 0; for (int chunkId = 0; chunkId < totalChunks; chunkId++) { if (sendChunks) { @@ -595,7 +595,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c } auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(), - (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId, + (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList, producerId_, encryptedPayload); if (!chunkingEnabled_) { @@ -910,12 +910,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - if (op.chunkedMessageId) { + if (op.numChunks > 1) { // Handling the chunk message id. - op.chunkMessageIdList.push_back(messageId); + op.chunkMessageIdList->push_back(messageId); if (op.chunkId == op.numChunks - 1) { - op.chunkedMessageId->setChunkedMessageIds(std::move(op.chunkMessageIdList)); - messageId = op.chunkedMessageId->build(); + auto chunkedMessageId = std::make_shared(std::move(*op.chunkMessageIdList)); + messageId = chunkedMessageId->build(); } } diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 8caa794b..f68dd3d8 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -120,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) { for (int i = 0; i < numMessages; i++) { MessageId messageId; ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); - auto chunkMsgId = - std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); - ASSERT_TRUE(chunkMsgId); LOG_INFO("Send " << i << " to " << messageId); sendMessageIds.emplace_back(messageId); } @@ -136,14 +133,19 @@ TEST_P(MessageChunkingTest, testEndToEnd) { ASSERT_EQ(msg.getMessageId().batchIndex(), -1); ASSERT_EQ(msg.getMessageId().batchSize(), 0); auto messageId = msg.getMessageId(); - auto chunkMsgId = - std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); - ASSERT_TRUE(chunkMsgId); receivedMessageIds.emplace_back(messageId); consumer.acknowledge(messageId); } ASSERT_EQ(receivedMessageIds, sendMessageIds); - ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId()); + for (int i = 0; i < sendMessageIds.size(); ++i) { + auto sendChunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(sendMessageIds[i])); + ASSERT_TRUE(sendChunkMsgId); + auto receiveChunkMsgId = std::dynamic_pointer_cast( + PulsarFriend::getMessageIdImpl(receivedMessageIds[i])); + ASSERT_TRUE(receiveChunkMsgId); + ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), receiveChunkMsgId->getChunkedMessageIds()); + } ASSERT_GT(receivedMessageIds.back().entryId(), numMessages); // Verify the cache has been cleared @@ -330,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) { TEST(ChunkMessageIdTest, testSetChunkMessageId) { MessageId msgId; { - ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setChunkedMessageIds({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(), - MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}); + ChunkMessageIdImplPtr chunkMsgId = std::make_shared( + std::vector({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(), + MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()})); msgId = chunkMsgId->build(); // Test the destructor of the underlying message id should also work for the generated messageId. } @@ -345,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) { ASSERT_EQ(deserializedMsgId.entryId(), 5); ASSERT_EQ(deserializedMsgId.partition(), 6); - auto chunkMsgId = + const auto& chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(deserializedMsgId)); ASSERT_TRUE(chunkMsgId); - auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId(); - ASSERT_EQ(firstChunkMsgId->ledgerId_, 1); - ASSERT_EQ(firstChunkMsgId->entryId_, 2); - ASSERT_EQ(firstChunkMsgId->partition_, 3); + auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front(); + ASSERT_EQ(firstChunkMsgId.ledgerId(), 1); + ASSERT_EQ(firstChunkMsgId.entryId(), 2); + ASSERT_EQ(firstChunkMsgId.partition(), 3); } // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P From ff1d918f7bcfc19927acf945baa836b23f929ed4 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 28 Sep 2023 12:59:53 +0800 Subject: [PATCH 5/6] Fix --- lib/ProducerImpl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 1ba51714..c13e60ef 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -910,7 +910,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - if (op.numChunks > 1) { + if (op.chunkMessageIdList) { // Handling the chunk message id. op.chunkMessageIdList->push_back(messageId); if (op.chunkId == op.numChunks - 1) { From 7a6c0ee98a0b64857528d4880435e37d7a519d88 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 7 Oct 2023 12:21:37 +0800 Subject: [PATCH 6/6] Remove unused method --- lib/ChunkMessageIdImpl.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index bb9f85a8..3fb0f135 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -38,8 +38,6 @@ class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_ const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } - std::vector moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); } - MessageId build() { return MessageId{std::dynamic_pointer_cast(shared_from_this())}; } private: