From b8aa4aabb4bd461cd5f16f6868e233817d0b3a1e Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 9 Dec 2022 17:48:45 +0800 Subject: [PATCH 1/7] Implement chunk message id. --- include/pulsar/MessageId.h | 1 + lib/ChunkMessageIdImpl.h | 52 ++++++++++++++++++++++++++++++++++++ tests/MessageChunkingTest.cc | 21 +++++++++++++++ tests/PulsarFriend.h | 4 +++ 4 files changed, 78 insertions(+) create mode 100644 lib/ChunkMessageIdImpl.h diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h index 28b88c85..3871e87b 100644 --- a/include/pulsar/MessageId.h +++ b/include/pulsar/MessageId.h @@ -108,6 +108,7 @@ class PULSAR_PUBLIC MessageId { friend class PulsarFriend; friend class NegativeAcksTracker; friend class MessageIdBuilder; + friend class ChunkMessageIdImpl; friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const MessageId& messageId); diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h new file mode 100644 index 00000000..c42e8b2b --- /dev/null +++ b/lib/ChunkMessageIdImpl.h @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "MessageIdImpl.h" + +namespace pulsar { +class ChunkMessageIdImpl; +typedef std::shared_ptr ChunkMessageIdImplPtr; +class ChunkMessageIdImpl : public MessageIdImpl { + public: + ChunkMessageIdImpl() {} +// ChunkMessageIdImpl(const MessageIdImpl& firstChunkMsgId, const MessageIdImpl& lastChunkMsgId) +// : MessageIdImpl(lastChunkMsgId), firstChunkMsgId_(firstChunkMsgId) {} + + void setFirstChunkMessageId(const MessageId& msgId) { + *firstChunkMsgId_.impl_ = *msgId.impl_; + } + + void setLastChunkMessageId(const MessageId& msgId) { + this->ledgerId_ = msgId.ledgerId(); + this->entryId_ = msgId.entryId(); + this->partition_ = msgId.partition(); + } + + const MessageId& getFirstChunkMessageId() const { return firstChunkMsgId_; } + + static MessageId buildMessageId(ChunkMessageIdImplPtr& msgIdImpl) { + return MessageId{msgIdImpl}; + } + + private: + MessageId firstChunkMsgId_; +}; +} // namespace pulsar diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index e3f01786..1d84d106 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -255,6 +255,27 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { consumer.close(); } +TEST(ChunkMessageIdTest, testSetChunkMessageId) { + MessageId msgId; + { + ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); + chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build()); + chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()); + msgId = ChunkMessageIdImpl::buildMessageId(chunkMsgId); + // Test the destructor of the underlying message id should also work for the generated messageId. + } + ASSERT_EQ(msgId.ledgerId(), 1); + ASSERT_EQ(msgId.entryId(), 2); + ASSERT_EQ(msgId.partition(), 3); + + auto chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(msgId)); + ASSERT_TRUE(chunkMsgId); + auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId(); + ASSERT_EQ(firstChunkMsgId.ledgerId(), 4); + ASSERT_EQ(firstChunkMsgId.entryId(), 5); + ASSERT_EQ(firstChunkMsgId.partition(), 6); +} + // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest, ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD, diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 878d80cb..ee7370b9 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -179,6 +179,10 @@ class PulsarFriend { } static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } + + static std::shared_ptr getMessageIdImpl(MessageId& msgId) { + return msgId.impl_; + } }; } // namespace pulsar From d71232c4813579a897f0caa0c03c6b8bf8a93e09 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 15 Dec 2022 14:54:27 +0800 Subject: [PATCH 2/7] Producer send return chunk msg id. --- lib/ChunkMessageIdImpl.h | 16 +++++---------- lib/OpSendMsg.h | 2 ++ lib/ProducerImpl.cc | 38 ++++++++++++++++++++++++------------ tests/MessageChunkingTest.cc | 15 ++++++++------ 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index c42e8b2b..bffebb45 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -26,13 +26,9 @@ class ChunkMessageIdImpl; typedef std::shared_ptr ChunkMessageIdImplPtr; class ChunkMessageIdImpl : public MessageIdImpl { public: - ChunkMessageIdImpl() {} -// ChunkMessageIdImpl(const MessageIdImpl& firstChunkMsgId, const MessageIdImpl& lastChunkMsgId) -// : MessageIdImpl(lastChunkMsgId), firstChunkMsgId_(firstChunkMsgId) {} + ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared()) {} - void setFirstChunkMessageId(const MessageId& msgId) { - *firstChunkMsgId_.impl_ = *msgId.impl_; - } + void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; } void setLastChunkMessageId(const MessageId& msgId) { this->ledgerId_ = msgId.ledgerId(); @@ -40,13 +36,11 @@ class ChunkMessageIdImpl : public MessageIdImpl { this->partition_ = msgId.partition(); } - const MessageId& getFirstChunkMessageId() const { return firstChunkMsgId_; } + MessageId getFirstChunkMessageId() { return MessageId{firstChunkMsgId_}; } - static MessageId buildMessageId(ChunkMessageIdImplPtr& msgIdImpl) { - return MessageId{msgIdImpl}; - } + static MessageId buildMessageId(ChunkMessageIdImplPtr& msgIdImpl) { return MessageId{msgIdImpl}; } private: - MessageId firstChunkMsgId_; + std::shared_ptr firstChunkMsgId_; }; } // namespace pulsar diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index d805dd33..790b54ef 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -24,6 +24,7 @@ #include +#include "ChunkMessageIdImpl.h" #include "PulsarApi.pb.h" #include "SharedBuffer.h" #include "TimeUtils.h" @@ -40,6 +41,7 @@ struct OpSendMsg { uint32_t messagesCount_; uint64_t messagesSize_; std::vector> trackerCallbacks_; + ChunkMessageIdImplPtr chunkedMessageId_; OpSendMsg() = default; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index a3a5a95a..599af185 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -868,22 +868,36 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { << " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId << " producer: " << producerId_); return true; - } else { - // Message was persisted correctly - LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - releaseSemaphoreForSendOp(op); - lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1; + } - pendingMessagesQueue_.pop_front(); + // Message was persisted correctly + LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - lock.unlock(); - try { - op.complete(ResultOk, messageId); - } catch (const std::exception& e) { - LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); + auto totalChunks = op.metadata_.num_chunks_from_msg(); + if (totalChunks > 1) { + if (!op.chunkedMessageId_) { + op.chunkedMessageId_ = std::make_shared(); } - return true; + if (op.metadata_.chunk_id() == 0) { + op.chunkedMessageId_->setFirstChunkMessageId(messageId); + } else if (op.metadata_.chunk_id() == totalChunks - 1) { + op.chunkedMessageId_->setLastChunkMessageId(messageId); + messageId = ChunkMessageIdImpl::buildMessageId(op.chunkedMessageId_); + } + } + + releaseSemaphoreForSendOp(op); + lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1; + + pendingMessagesQueue_.pop_front(); + + lock.unlock(); + try { + op.complete(ResultOk, messageId); + } catch (const std::exception& e) { + LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); } + return true; } bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 1d84d106..a18aa2d2 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -18,6 +18,7 @@ */ #include #include +#include #include #include @@ -116,6 +117,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) { for (int i = 0; i < numMessages; i++) { MessageId messageId; ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); + auto chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); + ASSERT_TRUE(chunkMsgId); LOG_INFO("Send " << i << " to " << messageId); sendMessageIds.emplace_back(messageId); } @@ -264,16 +267,16 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) { msgId = ChunkMessageIdImpl::buildMessageId(chunkMsgId); // Test the destructor of the underlying message id should also work for the generated messageId. } - ASSERT_EQ(msgId.ledgerId(), 1); - ASSERT_EQ(msgId.entryId(), 2); - ASSERT_EQ(msgId.partition(), 3); + ASSERT_EQ(msgId.ledgerId(), 4); + ASSERT_EQ(msgId.entryId(), 5); + ASSERT_EQ(msgId.partition(), 6); auto chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(msgId)); ASSERT_TRUE(chunkMsgId); auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId(); - ASSERT_EQ(firstChunkMsgId.ledgerId(), 4); - ASSERT_EQ(firstChunkMsgId.entryId(), 5); - ASSERT_EQ(firstChunkMsgId.partition(), 6); + ASSERT_EQ(firstChunkMsgId.ledgerId(), 1); + ASSERT_EQ(firstChunkMsgId.entryId(), 2); + ASSERT_EQ(firstChunkMsgId.partition(), 3); } // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P From cf4ebd68bdcba1456bfdcd5a1ffa61145e0a3870 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 15 Dec 2022 18:22:25 +0800 Subject: [PATCH 3/7] Consumer receive return chunk msg id. --- include/pulsar/Message.h | 3 +-- lib/ConsumerImpl.cc | 19 +++++++++++++------ lib/ConsumerImpl.h | 6 +++--- lib/Message.cc | 5 ++--- tests/MessageChunkingTest.cc | 5 ++++- 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index b7b3fdd0..77f30d41 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -188,8 +188,7 @@ class PULSAR_PUBLIC Message { MessageImplPtr impl_; Message(MessageImplPtr& impl); - Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload, - int32_t partition); + Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload); /// Used for Batch Messages Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata, const std::string& topicName); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index e277d99b..ba763e8f 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -27,6 +27,7 @@ #include "AckGroupingTrackerEnabled.h" #include "BatchMessageAcker.h" #include "BatchedMessageIdImpl.h" +#include "ChunkMessageIdImpl.h" #include "ClientConnection.h" #include "ClientImpl.h" #include "Commands.h" @@ -375,9 +376,9 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { boost::optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, const proto::MessageMetadata& metadata, - const MessageId& messageId, const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx) { + const ClientConnectionPtr& cnx, + MessageId& messageId) { const auto chunkId = metadata.chunk_id(); const auto uuid = metadata.uuid(); LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid @@ -432,6 +433,11 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff return boost::none; } + ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); + chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front()); + chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back()); + messageId = ChunkMessageIdImpl::buildMessageId(chunkMsgId); + LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx << ", sequenceId: " << metadata.sequence_id()); @@ -472,11 +478,12 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: } } + const auto& messageIdData = msg.message_id(); + auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build(); + // Only a non-batched messages can be a chunk if (!metadata.has_num_messages_in_batch() && isChunkedMessage) { - const auto& messageIdData = msg.message_id(); - auto messageId = MessageIdBuilder::from(messageIdData).build(); - auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx); + auto optionalPayload = processMessageChunk(payload, metadata, messageIdData, cnx, messageId); if (optionalPayload) { payload = optionalPayload.value(); } else { @@ -484,7 +491,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: } } - Message m(msg, metadata, payload, partitionIndex_); + Message m(messageId, metadata, payload); m.impl_->cnx_ = cnx.get(); m.impl_->setTopicName(topic_); m.impl_->setRedeliveryCount(msg.redelivery_count()); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index d2480b88..2da1525a 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -298,18 +298,18 @@ class ConsumerImpl : public ConsumerImplBase { * * @param payload the payload of a chunk * @param metadata the message metadata - * @param messageId * @param messageIdData * @param cnx + * @param messageId * * @return the concatenated payload if chunks are concatenated into a completed message payload * successfully, else Optional::empty() */ boost::optional processMessageChunk(const SharedBuffer& payload, const proto::MessageMetadata& metadata, - const MessageId& messageId, const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx); + const ClientConnectionPtr& cnx, + MessageId& messageId); friend class PulsarFriend; diff --git a/lib/Message.cc b/lib/Message.cc index 0f28f7d9..545c8932 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -69,10 +69,9 @@ Message::Message() : impl_() {} Message::Message(MessageImplPtr& impl) : impl_(impl) {} -Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload, - int32_t partition) +Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload) : impl_(std::make_shared()) { - impl_->messageId = MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build(); + impl_->messageId = messageId; impl_->metadata = metadata; impl_->payload = payload; } diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index a18aa2d2..c3cc2c34 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -131,7 +131,10 @@ TEST_P(MessageChunkingTest, testEndToEnd) { ASSERT_EQ(msg.getDataAsString(), largeMessage); ASSERT_EQ(msg.getMessageId().batchIndex(), -1); ASSERT_EQ(msg.getMessageId().batchSize(), 0); - receivedMessageIds.emplace_back(msg.getMessageId()); + auto messageId = msg.getMessageId(); + auto chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); + ASSERT_TRUE(chunkMsgId); + receivedMessageIds.emplace_back(messageId); } ASSERT_EQ(receivedMessageIds, sendMessageIds); ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId()); From 2a208924efc23116ddeb505b431345e865f3c936 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 15 Dec 2022 22:45:14 +0800 Subject: [PATCH 4/7] Support serialization for ChunkMessageId. --- lib/ChunkMessageIdImpl.h | 4 ++-- lib/MessageId.cc | 30 +++++++++++++++++++++++++++++- tests/MessageChunkingTest.cc | 20 +++++++++++++------- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index bffebb45..f6e8cdde 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -36,9 +36,9 @@ class ChunkMessageIdImpl : public MessageIdImpl { this->partition_ = msgId.partition(); } - MessageId getFirstChunkMessageId() { return MessageId{firstChunkMsgId_}; } + std::shared_ptr getFirstChunkMessageId() { return firstChunkMsgId_; } - static MessageId buildMessageId(ChunkMessageIdImplPtr& msgIdImpl) { return MessageId{msgIdImpl}; } + static MessageId buildMessageId(const ChunkMessageIdImplPtr& msgIdImpl) { return MessageId{msgIdImpl}; } private: std::shared_ptr firstChunkMsgId_; diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 9a1a38c8..4aa8ed69 100644 --- a/lib/MessageId.cc +++ b/lib/MessageId.cc @@ -25,6 +25,7 @@ #include #include +#include "ChunkMessageIdImpl.h" #include "MessageIdImpl.h" #include "PulsarApi.pb.h" @@ -68,6 +69,18 @@ void MessageId::serialize(std::string& result) const { idData.set_batch_index(impl_->batchIndex_); } + auto chunkMsgId = std::dynamic_pointer_cast(impl_); + if(chunkMsgId) { + auto* firstChunkIdData = new proto::MessageIdData(); + auto firstChunkId = chunkMsgId->getFirstChunkMessageId(); + firstChunkIdData->set_ledgerid(firstChunkId->ledgerId_); + firstChunkIdData->set_entryid(firstChunkId->entryId_); + if(chunkMsgId->partition_!=-1){ + firstChunkIdData->set_partition(firstChunkId->partition_); + } + idData.set_allocated_first_chunk_message_id(firstChunkIdData); + } + idData.SerializeToString(&result); } @@ -80,7 +93,16 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { throw std::invalid_argument("Failed to parse serialized message id"); } - return MessageIdBuilder::from(idData).build(); + MessageId msgId = MessageIdBuilder::from(idData).build(); + + if(idData.has_first_chunk_message_id()) { + ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); + chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build()); + chunkMsgId->setLastChunkMessageId(msgId); + return ChunkMessageIdImpl::buildMessageId(chunkMsgId); + } + + return msgId; } int64_t MessageId::ledgerId() const { return impl_->ledgerId_; } @@ -94,6 +116,12 @@ int32_t MessageId::partition() const { return impl_->partition_; } int32_t MessageId::batchSize() const { return impl_->batchSize_; } PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) { + auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); + if(chunkMsgId) { + auto firstId = chunkMsgId->getFirstChunkMessageId(); + s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' + << firstId->partition_ << ',' << firstId->batchIndex_ << ");"; + } s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ',' << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')'; return s; diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index c3cc2c34..89f49961 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -270,16 +270,22 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) { msgId = ChunkMessageIdImpl::buildMessageId(chunkMsgId); // Test the destructor of the underlying message id should also work for the generated messageId. } - ASSERT_EQ(msgId.ledgerId(), 4); - ASSERT_EQ(msgId.entryId(), 5); - ASSERT_EQ(msgId.partition(), 6); - auto chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(msgId)); + std::string msgIdData; + msgId.serialize(msgIdData); + MessageId deserializedMsgId = MessageId::deserialize(msgIdData); + + ASSERT_EQ(deserializedMsgId.ledgerId(), 4); + ASSERT_EQ(deserializedMsgId.entryId(), 5); + ASSERT_EQ(deserializedMsgId.partition(), 6); + + auto chunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(deserializedMsgId)); ASSERT_TRUE(chunkMsgId); auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId(); - ASSERT_EQ(firstChunkMsgId.ledgerId(), 1); - ASSERT_EQ(firstChunkMsgId.entryId(), 2); - ASSERT_EQ(firstChunkMsgId.partition(), 3); + ASSERT_EQ(firstChunkMsgId->ledgerId_, 1); + ASSERT_EQ(firstChunkMsgId->entryId_, 2); + ASSERT_EQ(firstChunkMsgId->partition_, 3); } // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P From dbdb748b8b8a8809d74136e1454a747eb2722641 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 16 Dec 2022 12:02:01 +0800 Subject: [PATCH 5/7] Shared chunk message id across OpSendMsg --- lib/OpSendMsg.h | 7 ++++--- lib/ProducerImpl.cc | 13 ++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index 790b54ef..d365b906 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -47,8 +47,8 @@ struct OpSendMsg { OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload, const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs, - uint32_t messagesCount, uint64_t messagesSize) - : metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with the + uint32_t messagesCount, uint64_t messagesSize, ChunkMessageIdImplPtr chunkedMessageId = nullptr) + : metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with // a shared metadata object payload_(payload), sendCallback_(sendCallback), @@ -56,7 +56,8 @@ struct OpSendMsg { sequenceId_(sequenceId), timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)), messagesCount_(messagesCount), - messagesSize_(messagesSize) {} + messagesSize_(messagesSize), + chunkedMessageId_(chunkedMessageId) {} void complete(Result result, const MessageId& messageId) const { if (sendCallback_) { diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 599af185..f7f508bb 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -562,6 +562,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba msgMetadata.set_total_chunk_msg_size(compressedSize); } + auto chunkMessageId = totalChunks > 1 ? std::make_shared() : nullptr; + int beginIndex = 0; for (int chunkId = 0; chunkId < totalChunks; chunkId++) { if (sendChunks) { @@ -578,7 +580,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId, conf_.getSendTimeout(), - 1, uncompressedSize}; + 1, uncompressedSize, chunkMessageId}; if (!chunkingEnabled_) { const uint32_t msgMetadataSize = op.metadata_.ByteSize(); @@ -873,14 +875,11 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - auto totalChunks = op.metadata_.num_chunks_from_msg(); - if (totalChunks > 1) { - if (!op.chunkedMessageId_) { - op.chunkedMessageId_ = std::make_shared(); - } + if (op.chunkedMessageId_) { + // Handling the chunk message id. if (op.metadata_.chunk_id() == 0) { op.chunkedMessageId_->setFirstChunkMessageId(messageId); - } else if (op.metadata_.chunk_id() == totalChunks - 1) { + } else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) { op.chunkedMessageId_->setLastChunkMessageId(messageId); messageId = ChunkMessageIdImpl::buildMessageId(op.chunkedMessageId_); } From 67205f7f758a698c67a5aac88891af286ca4a9c3 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 16 Dec 2022 15:24:32 +0800 Subject: [PATCH 6/7] Support seek for chunk messages --- lib/Commands.cc | 14 +++++++-- lib/ConsumerImpl.h | 3 +- lib/MessageId.cc | 21 +++++++------ tests/MessageChunkingTest.cc | 57 ++++++++++++++++++++++++++++++++++-- tests/PulsarFriend.h | 4 +-- 5 files changed, 79 insertions(+), 20 deletions(-) diff --git a/lib/Commands.cc b/lib/Commands.cc index 3cd97e28..43f4316f 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -28,6 +28,7 @@ #include "BatchMessageAcker.h" #include "BatchedMessageIdImpl.h" +#include "ChunkMessageIdImpl.h" #include "LogUtils.h" #include "MessageImpl.h" #include "PulsarApi.pb.h" @@ -512,8 +513,17 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me commandSeek->set_request_id(requestId); MessageIdData& messageIdData = *commandSeek->mutable_message_id(); - messageIdData.set_ledgerid(messageId.ledgerId()); - messageIdData.set_entryid(messageId.entryId()); + + auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); + if (chunkMsgId) { + auto firstId = chunkMsgId->getFirstChunkMessageId(); + messageIdData.set_ledgerid(firstId->ledgerId_); + messageIdData.set_entryid(firstId->entryId_); + } else { + messageIdData.set_ledgerid(messageId.ledgerId()); + messageIdData.set_entryid(messageId.entryId()); + } + return writeMessageWithSize(cmd); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 2da1525a..29d5d0bf 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -308,8 +308,7 @@ class ConsumerImpl : public ConsumerImplBase { boost::optional processMessageChunk(const SharedBuffer& payload, const proto::MessageMetadata& metadata, const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx, - MessageId& messageId); + const ClientConnectionPtr& cnx, MessageId& messageId); friend class PulsarFriend; diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 4aa8ed69..3e7c6079 100644 --- a/lib/MessageId.cc +++ b/lib/MessageId.cc @@ -70,15 +70,14 @@ void MessageId::serialize(std::string& result) const { } auto chunkMsgId = std::dynamic_pointer_cast(impl_); - if(chunkMsgId) { - auto* firstChunkIdData = new proto::MessageIdData(); + if (chunkMsgId) { + proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id(); auto firstChunkId = chunkMsgId->getFirstChunkMessageId(); - firstChunkIdData->set_ledgerid(firstChunkId->ledgerId_); - firstChunkIdData->set_entryid(firstChunkId->entryId_); - if(chunkMsgId->partition_!=-1){ - firstChunkIdData->set_partition(firstChunkId->partition_); + firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_); + firstChunkIdData.set_entryid(firstChunkId->entryId_); + if (chunkMsgId->partition_ != -1) { + firstChunkIdData.set_partition(firstChunkId->partition_); } - idData.set_allocated_first_chunk_message_id(firstChunkIdData); } idData.SerializeToString(&result); @@ -95,7 +94,7 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { MessageId msgId = MessageIdBuilder::from(idData).build(); - if(idData.has_first_chunk_message_id()) { + if (idData.has_first_chunk_message_id()) { ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build()); chunkMsgId->setLastChunkMessageId(msgId); @@ -117,10 +116,10 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; } PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) { auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); - if(chunkMsgId) { + if (chunkMsgId) { auto firstId = chunkMsgId->getFirstChunkMessageId(); - s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' - << firstId->partition_ << ',' << firstId->batchIndex_ << ");"; + s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ',' + << firstId->batchIndex_ << ");"; } s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ',' << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')'; diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 89f49961..e9e1517f 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -23,6 +23,7 @@ #include #include +#include "ChunkMessageIdImpl.h" #include "PulsarFriend.h" #include "WaitUtils.h" #include "lib/LogUtils.h" @@ -117,7 +118,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) { for (int i = 0; i < numMessages; i++) { MessageId messageId; ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); - auto chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); + auto chunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); ASSERT_TRUE(chunkMsgId); LOG_INFO("Send " << i << " to " << messageId); sendMessageIds.emplace_back(messageId); @@ -132,7 +134,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) { ASSERT_EQ(msg.getMessageId().batchIndex(), -1); ASSERT_EQ(msg.getMessageId().batchSize(), 0); auto messageId = msg.getMessageId(); - auto chunkMsgId = std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); + auto chunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); ASSERT_TRUE(chunkMsgId); receivedMessageIds.emplace_back(messageId); } @@ -261,6 +264,56 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { consumer.close(); } +TEST_P(MessageChunkingTest, testSeekChunkMessages) { + const std::string topic = + "MessageChunkingTest-testSeekChunkMessages-" + toString(GetParam()) + std::to_string(time(nullptr)); + + constexpr int numMessages = 10; + + Consumer consumer1; + ConsumerConfiguration consumer1Conf; + consumer1Conf.setStartMessageIdInclusive(true); + createConsumer(topic, consumer1, consumer1Conf); + + Producer producer; + createProducer(topic, producer); + + for (int i = 0; i < numMessages; i++) { + MessageId messageId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); + LOG_INFO("Send " << i << " to " << messageId); + } + + Message msg; + std::vector receivedMessageIds; + for (int i = 0; i < numMessages; i++) { + ASSERT_EQ(ResultOk, consumer1.receive(msg, 3000)); + LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId()); + receivedMessageIds.emplace_back(msg.getMessageId()); + } + + consumer1.seek(receivedMessageIds[1]); + for (int i = 1; i < numMessages; i++) { + Message msgAfterSeek; + ASSERT_EQ(ResultOk, consumer1.receive(msgAfterSeek, 3000)); + ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]); + } + + consumer1.close(); + Consumer consumer2; + createConsumer(topic, consumer2); + + consumer2.seek(receivedMessageIds[1]); + for (int i = 2; i < numMessages; i++) { + Message msgAfterSeek; + ASSERT_EQ(ResultOk, consumer2.receive(msgAfterSeek, 3000)); + ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]); + } + + consumer2.close(); + producer.close(); +} + TEST(ChunkMessageIdTest, testSetChunkMessageId) { MessageId msgId; { diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index ee7370b9..fb6bb591 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -180,9 +180,7 @@ class PulsarFriend { static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } - static std::shared_ptr getMessageIdImpl(MessageId& msgId) { - return msgId.impl_; - } + static std::shared_ptr getMessageIdImpl(MessageId& msgId) { return msgId.impl_; } }; } // namespace pulsar From 244b4173451466bf6e978292eb5d1c2140fa9eb1 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 20 Dec 2022 17:30:43 +0800 Subject: [PATCH 7/7] Address comments --- lib/ChunkMessageIdImpl.h | 8 +++++--- lib/ConsumerImpl.cc | 2 +- lib/MessageId.cc | 2 +- lib/ProducerImpl.cc | 2 +- tests/MessageChunkingTest.cc | 2 +- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h index f6e8cdde..3081ff03 100644 --- a/lib/ChunkMessageIdImpl.h +++ b/lib/ChunkMessageIdImpl.h @@ -19,12 +19,14 @@ #pragma once +#include + #include "MessageIdImpl.h" namespace pulsar { class ChunkMessageIdImpl; typedef std::shared_ptr ChunkMessageIdImplPtr; -class ChunkMessageIdImpl : public MessageIdImpl { +class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this { public: ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared()) {} @@ -36,9 +38,9 @@ class ChunkMessageIdImpl : public MessageIdImpl { this->partition_ = msgId.partition(); } - std::shared_ptr getFirstChunkMessageId() { return firstChunkMsgId_; } + std::shared_ptr getFirstChunkMessageId() const { return firstChunkMsgId_; } - static MessageId buildMessageId(const ChunkMessageIdImplPtr& msgIdImpl) { return MessageId{msgIdImpl}; } + MessageId build() { return MessageId{std::dynamic_pointer_cast(shared_from_this())}; } private: std::shared_ptr firstChunkMsgId_; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index ba763e8f..8589d254 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -436,7 +436,7 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front()); chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back()); - messageId = ChunkMessageIdImpl::buildMessageId(chunkMsgId); + messageId = chunkMsgId->build(); LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx << ", sequenceId: " << metadata.sequence_id()); diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 3e7c6079..9b5205bb 100644 --- a/lib/MessageId.cc +++ b/lib/MessageId.cc @@ -98,7 +98,7 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build()); chunkMsgId->setLastChunkMessageId(msgId); - return ChunkMessageIdImpl::buildMessageId(chunkMsgId); + return chunkMsgId->build(); } return msgId; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index f7f508bb..cdcf14ef 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -881,7 +881,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { op.chunkedMessageId_->setFirstChunkMessageId(messageId); } else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) { op.chunkedMessageId_->setLastChunkMessageId(messageId); - messageId = ChunkMessageIdImpl::buildMessageId(op.chunkedMessageId_); + messageId = op.chunkedMessageId_->build(); } } diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index e9e1517f..9a65fbc6 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -320,7 +320,7 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) { ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build()); chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()); - msgId = ChunkMessageIdImpl::buildMessageId(chunkMsgId); + msgId = chunkMsgId->build(); // Test the destructor of the underlying message id should also work for the generated messageId. }