diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc index 9a471354..ab7381f1 100644 --- a/lib/AckGroupingTracker.cc +++ b/lib/AckGroupingTracker.cc @@ -21,8 +21,10 @@ #include #include +#include #include "BitSet.h" +#include "ChunkMessageIdImpl.h" #include "ClientConnection.h" #include "Commands.h" #include "LogUtils.h" @@ -42,6 +44,17 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c } return; } + if (ackType == CommandAck_AckType_Individual) { + // If it's individual ack, we need to acknowledge all message IDs in a chunked message Id + // If it's cumulative ack, we only need to ack the last message ID of a chunked message. + // ChunkedMessageId return last chunk message ID by default, so we don't need to handle it. + if (auto chunkMessageId = + std::dynamic_pointer_cast(Commands::getMessageIdImpl(msgId))) { + auto msgIdList = chunkMessageId->getChunkedMessageIds(); + doImmediateAck(std::set(msgIdList.begin(), msgIdList.end()), callback); + return; + } + } const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet(); if (waitResponse_) { const auto requestId = requestIdSupplier_(); @@ -84,29 +97,41 @@ void AckGroupingTracker::doImmediateAck(const std::set& msgIds, Resul return; } + std::set ackMsgIds; + + for (const auto& msgId : msgIds) { + if (auto chunkMessageId = + std::dynamic_pointer_cast(Commands::getMessageIdImpl(msgId))) { + auto msgIdList = chunkMessageId->getChunkedMessageIds(); + ackMsgIds.insert(msgIdList.begin(), msgIdList.end()); + } else { + ackMsgIds.insert(msgId); + } + } + if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { if (waitResponse_) { const auto requestId = requestIdSupplier_(); - cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId) + cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId) .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); } }); } else { - cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds)); + cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds)); if (callback) { callback(ResultOk); } } } else { - auto count = std::make_shared>(msgIds.size()); + auto count = std::make_shared>(ackMsgIds.size()); auto wrappedCallback = [callback, count](Result result) { if (--*count == 0 && callback) { callback(result); } }; - for (auto&& msgId : msgIds) { + for (auto&& msgId : ackMsgIds) { doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual); } } diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index 3081ff03..3fb0f135 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -28,21 +28,19 @@ class ChunkMessageIdImpl; typedef std::shared_ptr ChunkMessageIdImplPtr; class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this { public: - ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared()) {} - - void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; } - - void setLastChunkMessageId(const MessageId& msgId) { - this->ledgerId_ = msgId.ledgerId(); - this->entryId_ = msgId.entryId(); - this->partition_ = msgId.partition(); + explicit ChunkMessageIdImpl(std::vector&& chunkedMessageIds) + : chunkedMessageIds_(std::move(chunkedMessageIds)) { + auto lastChunkMsgId = chunkedMessageIds_.back(); + this->ledgerId_ = lastChunkMsgId.ledgerId(); + this->entryId_ = lastChunkMsgId.entryId(); + this->partition_ = lastChunkMsgId.partition(); } - std::shared_ptr getFirstChunkMessageId() const { return firstChunkMsgId_; } + const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } MessageId build() { return MessageId{std::dynamic_pointer_cast(shared_from_this())}; } private: - std::shared_ptr firstChunkMsgId_; + std::vector chunkedMessageIds_; }; } // namespace pulsar diff --git a/lib/Commands.cc b/lib/Commands.cc index f2e6c6d2..4b10b732 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); if (chunkMsgId) { - auto firstId = chunkMsgId->getFirstChunkMessageId(); - messageIdData.set_ledgerid(firstId->ledgerId_); - messageIdData.set_entryid(firstId->entryId_); + const auto& firstId = chunkMsgId->getChunkedMessageIds().front(); + messageIdData.set_ledgerid(firstId.ledgerId()); + messageIdData.set_entryid(firstId.entryId()); } else { messageIdData.set_ledgerid(messageId.ledgerId()); messageIdData.set_entryid(messageId.entryId()); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 0ee6a74d..fd2f1ffc 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -478,10 +478,7 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff return boost::none; } - ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front()); - chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back()); - messageId = chunkMsgId->build(); + messageId = std::make_shared(chunkedMsgCtx.moveChunkedMessageIds())->build(); LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx << ", sequenceId: " << metadata.sequence_id()); @@ -1173,6 +1170,9 @@ std::pair ConsumerImpl::prepareIndividualAck(const MessageId& m (batchSize > 0) ? batchSize : 1); unAckedMessageTrackerPtr_->remove(messageId); possibleSendToDeadLetterTopicMessages_.remove(messageId); + if (std::dynamic_pointer_cast(messageIdImpl)) { + return std::make_pair(messageId, true); + } return std::make_pair(discardBatch(messageId), true); } else if (config_.isBatchIndexAckEnabled()) { return std::make_pair(messageId, true); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index def2543d..690d8fc4 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -270,6 +270,8 @@ class ConsumerImpl : public ConsumerImplBase { const std::vector& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; } + std::vector moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); } + long getReceivedTimeMs() const noexcept { return receivedTimeMs_; } friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) { @@ -292,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase { // concurrently on the topic) then it guards against broken chunked message which was not fully published const bool autoAckOldestChunkedMessageOnQueueFull_; - // The key is UUID, value is the associated ChunkedMessageCtx of the chunked message. - std::unordered_map chunkedMessagesMap_; // This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending // chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and // the associated ChunkedMessageCtx will be removed. diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 12b6f403..b51fd9f1 100644 --- a/lib/MessageId.cc +++ b/lib/MessageId.cc @@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const { auto chunkMsgId = std::dynamic_pointer_cast(impl_); if (chunkMsgId) { proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id(); - auto firstChunkId = chunkMsgId->getFirstChunkMessageId(); - firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_); - firstChunkIdData.set_entryid(firstChunkId->entryId_); + const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front(); + firstChunkIdData.set_ledgerid(firstChunkId.ledgerId()); + firstChunkIdData.set_entryid(firstChunkId.entryId()); if (chunkMsgId->partition_ != -1) { - firstChunkIdData.set_partition(firstChunkId->partition_); + firstChunkIdData.set_partition(firstChunkId.partition()); } } @@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { MessageId msgId = MessageIdBuilder::from(idData).build(); if (idData.has_first_chunk_message_id()) { - ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build()); - chunkMsgId->setLastChunkMessageId(msgId); + ChunkMessageIdImplPtr chunkMsgId = std::make_shared( + std::vector({MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId})); return chunkMsgId->build(); } @@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; } PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) { auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); if (chunkMsgId) { - auto firstId = chunkMsgId->getFirstChunkMessageId(); - s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ',' - << firstId->batchIndex_ << ");"; + const auto& firstId = chunkMsgId->getChunkedMessageIds().front(); + s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << firstId.partition() << ',' + << firstId.batchIndex() << ");"; } s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ',' << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')'; diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index 06fa77f4..a1319e10 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -45,6 +45,8 @@ struct SendArguments { SendArguments& operator=(const SendArguments&) = delete; }; +typedef std::shared_ptr> ChunkMessageIdListPtr; + struct OpSendMsg { const Result result; const int32_t chunkId; @@ -54,7 +56,7 @@ struct OpSendMsg { const boost::posix_time::ptime timeout; const SendCallback sendCallback; std::vector> trackerCallbacks; - ChunkMessageIdImplPtr chunkedMessageId; + ChunkMessageIdListPtr chunkMessageIdList; // Use shared_ptr here because producer might resend the message with the same arguments const std::shared_ptr sendArgs; @@ -89,7 +91,7 @@ struct OpSendMsg { sendArgs(nullptr) {} OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize, - int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId, + int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdListPtr chunkMessageIdList, uint64_t producerId, SharedBuffer payload) : result(ResultOk), chunkId(metadata.chunk_id()), @@ -98,7 +100,7 @@ struct OpSendMsg { messagesSize(messagesSize), timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), sendCallback(std::move(callback)), - chunkedMessageId(chunkedMessageId), + chunkMessageIdList(std::move(chunkMessageIdList)), sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} }; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index d8f02be6..c13e60ef 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -571,14 +571,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c } } else { const bool sendChunks = (totalChunks > 1); + ChunkMessageIdListPtr chunkMessageIdList; if (sendChunks) { msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId)); msgMetadata.set_num_chunks_from_msg(totalChunks); msgMetadata.set_total_chunk_msg_size(compressedSize); + chunkMessageIdList = std::make_shared>(); } - auto chunkMessageId = totalChunks > 1 ? std::make_shared() : nullptr; - int beginIndex = 0; for (int chunkId = 0; chunkId < totalChunks; chunkId++) { if (sendChunks) { @@ -595,7 +595,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c } auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(), - (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId, + (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList, producerId_, encryptedPayload); if (!chunkingEnabled_) { @@ -886,7 +886,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { return true; } - const auto& op = *pendingMessagesQueue_.front(); + auto& op = *pendingMessagesQueue_.front(); if (op.result != ResultOk) { LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and " << rawMessageId); @@ -910,13 +910,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - if (op.chunkedMessageId) { + if (op.chunkMessageIdList) { // Handling the chunk message id. - if (op.chunkId == 0) { - op.chunkedMessageId->setFirstChunkMessageId(messageId); - } else if (op.chunkId == op.numChunks - 1) { - op.chunkedMessageId->setLastChunkMessageId(messageId); - messageId = op.chunkedMessageId->build(); + op.chunkMessageIdList->push_back(messageId); + if (op.chunkId == op.numChunks - 1) { + auto chunkedMessageId = std::make_shared(std::move(*op.chunkMessageIdList)); + messageId = chunkedMessageId->build(); } } diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 6d54a69f..f68dd3d8 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam { } void createConsumer(const std::string& topic, Consumer& consumer) { - ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer)); + ConsumerConfiguration conf; + conf.setBrokerConsumerStatsCacheTimeInMs(1000); + ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer)); } void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) { @@ -118,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) { for (int i = 0; i < numMessages; i++) { MessageId messageId; ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); - auto chunkMsgId = - std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); - ASSERT_TRUE(chunkMsgId); LOG_INFO("Send " << i << " to " << messageId); sendMessageIds.emplace_back(messageId); } @@ -134,19 +133,35 @@ TEST_P(MessageChunkingTest, testEndToEnd) { ASSERT_EQ(msg.getMessageId().batchIndex(), -1); ASSERT_EQ(msg.getMessageId().batchSize(), 0); auto messageId = msg.getMessageId(); - auto chunkMsgId = - std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); - ASSERT_TRUE(chunkMsgId); receivedMessageIds.emplace_back(messageId); + consumer.acknowledge(messageId); } ASSERT_EQ(receivedMessageIds, sendMessageIds); - ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId()); + for (int i = 0; i < sendMessageIds.size(); ++i) { + auto sendChunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(sendMessageIds[i])); + ASSERT_TRUE(sendChunkMsgId); + auto receiveChunkMsgId = std::dynamic_pointer_cast( + PulsarFriend::getMessageIdImpl(receivedMessageIds[i])); + ASSERT_TRUE(receiveChunkMsgId); + ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), receiveChunkMsgId->getChunkedMessageIds()); + } ASSERT_GT(receivedMessageIds.back().entryId(), numMessages); // Verify the cache has been cleared auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); ASSERT_EQ(chunkedMessageCache.size(), 0); + BrokerConsumerStats consumerStats; + waitUntil( + std::chrono::seconds(10), + [&] { + return consumer.getBrokerConsumerStats(consumerStats) == ResultOk && + consumerStats.getMsgBacklog() == 0; + }, + 1000); + ASSERT_EQ(consumerStats.getMsgBacklog(), 0); + producer.close(); consumer.close(); } @@ -317,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) { TEST(ChunkMessageIdTest, testSetChunkMessageId) { MessageId msgId; { - ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); - chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build()); - chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()); + ChunkMessageIdImplPtr chunkMsgId = std::make_shared( + std::vector({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(), + MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()})); msgId = chunkMsgId->build(); // Test the destructor of the underlying message id should also work for the generated messageId. } @@ -332,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) { ASSERT_EQ(deserializedMsgId.entryId(), 5); ASSERT_EQ(deserializedMsgId.partition(), 6); - auto chunkMsgId = + const auto& chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(deserializedMsgId)); ASSERT_TRUE(chunkMsgId); - auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId(); - ASSERT_EQ(firstChunkMsgId->ledgerId_, 1); - ASSERT_EQ(firstChunkMsgId->entryId_, 2); - ASSERT_EQ(firstChunkMsgId->partition_, 3); + auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front(); + ASSERT_EQ(firstChunkMsgId.ledgerId(), 1); + ASSERT_EQ(firstChunkMsgId.entryId(), 2); + ASSERT_EQ(firstChunkMsgId.partition(), 3); } // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P