diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index b7b3fdd0..77f30d41 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -188,8 +188,7 @@ class PULSAR_PUBLIC Message { MessageImplPtr impl_; Message(MessageImplPtr& impl); - Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload, - int32_t partition); + Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload); /// Used for Batch Messages Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata, const std::string& topicName); diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h index 28b88c85..3871e87b 100644 --- a/include/pulsar/MessageId.h +++ b/include/pulsar/MessageId.h @@ -108,6 +108,7 @@ class PULSAR_PUBLIC MessageId { friend class PulsarFriend; friend class NegativeAcksTracker; friend class MessageIdBuilder; + friend class ChunkMessageIdImpl; friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const MessageId& messageId); diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h new file mode 100644 index 00000000..3081ff03 --- /dev/null +++ b/lib/ChunkMessageIdImpl.h @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "MessageIdImpl.h" + +namespace pulsar { +class ChunkMessageIdImpl; +typedef std::shared_ptr ChunkMessageIdImplPtr; +class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this { + public: + ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared()) {} + + void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; } + + void setLastChunkMessageId(const MessageId& msgId) { + this->ledgerId_ = msgId.ledgerId(); + this->entryId_ = msgId.entryId(); + this->partition_ = msgId.partition(); + } + + std::shared_ptr getFirstChunkMessageId() const { return firstChunkMsgId_; } + + MessageId build() { return MessageId{std::dynamic_pointer_cast(shared_from_this())}; } + + private: + std::shared_ptr firstChunkMsgId_; +}; +} // namespace pulsar diff --git a/lib/Commands.cc b/lib/Commands.cc index 3cd97e28..43f4316f 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -28,6 +28,7 @@ #include "BatchMessageAcker.h" #include "BatchedMessageIdImpl.h" +#include "ChunkMessageIdImpl.h" #include "LogUtils.h" #include "MessageImpl.h" #include "PulsarApi.pb.h" @@ -512,8 +513,17 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me commandSeek->set_request_id(requestId); MessageIdData& messageIdData = *commandSeek->mutable_message_id(); - messageIdData.set_ledgerid(messageId.ledgerId()); - messageIdData.set_entryid(messageId.entryId()); + + auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); + if (chunkMsgId) { + auto firstId = chunkMsgId->getFirstChunkMessageId(); + messageIdData.set_ledgerid(firstId->ledgerId_); + messageIdData.set_entryid(firstId->entryId_); + } else { + messageIdData.set_ledgerid(messageId.ledgerId()); + messageIdData.set_entryid(messageId.entryId()); + } + return writeMessageWithSize(cmd); } diff --git a/lib/ConsumerImpl.cc 
b/lib/ConsumerImpl.cc index e277d99b..8589d254 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -27,6 +27,7 @@ #include "AckGroupingTrackerEnabled.h" #include "BatchMessageAcker.h" #include "BatchedMessageIdImpl.h" +#include "ChunkMessageIdImpl.h" #include "ClientConnection.h" #include "ClientImpl.h" #include "Commands.h" @@ -375,9 +376,9 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { boost::optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, const proto::MessageMetadata& metadata, - const MessageId& messageId, const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx) { + const ClientConnectionPtr& cnx, + MessageId& messageId) { const auto chunkId = metadata.chunk_id(); const auto uuid = metadata.uuid(); LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid @@ -432,6 +433,11 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff return boost::none; } + ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); + chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front()); + chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back()); + messageId = chunkMsgId->build(); + LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx << ", sequenceId: " << metadata.sequence_id()); @@ -472,11 +478,12 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: } } + const auto& messageIdData = msg.message_id(); + auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build(); + // Only a non-batched messages can be a chunk if (!metadata.has_num_messages_in_batch() && isChunkedMessage) { - const auto& messageIdData = msg.message_id(); - auto messageId = MessageIdBuilder::from(messageIdData).build(); - auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx); + auto optionalPayload = processMessageChunk(payload, metadata, 
messageIdData, cnx, messageId); if (optionalPayload) { payload = optionalPayload.value(); } else { @@ -484,7 +491,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: } } - Message m(msg, metadata, payload, partitionIndex_); + Message m(messageId, metadata, payload); m.impl_->cnx_ = cnx.get(); m.impl_->setTopicName(topic_); m.impl_->setRedeliveryCount(msg.redelivery_count()); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index d2480b88..29d5d0bf 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -298,18 +298,17 @@ class ConsumerImpl : public ConsumerImplBase { * * @param payload the payload of a chunk * @param metadata the message metadata - * @param messageId * @param messageIdData * @param cnx + * @param messageId [in,out] set to the chunk message id once the chunked message is completed * * @return the concatenated payload if chunks are concatenated into a completed message payload * successfully, else Optional::empty() */ boost::optional processMessageChunk(const SharedBuffer& payload, const proto::MessageMetadata& metadata, - const MessageId& messageId, const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx); + const ClientConnectionPtr& cnx, MessageId& messageId); friend class PulsarFriend; diff --git a/lib/Message.cc b/lib/Message.cc index 0f28f7d9..545c8932 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -69,10 +69,9 @@ Message::Message() : impl_() {} Message::Message(MessageImplPtr& impl) : impl_(impl) {} -Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload, - int32_t partition) +Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload) : impl_(std::make_shared()) { - impl_->messageId = MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build(); + impl_->messageId = messageId; impl_->metadata = metadata; impl_->payload = payload; } diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 9a1a38c8..9b5205bb 100644 --- a/lib/MessageId.cc +++ 
b/lib/MessageId.cc @@ -25,6 +25,7 @@ #include #include +#include "ChunkMessageIdImpl.h" #include "MessageIdImpl.h" #include "PulsarApi.pb.h" @@ -68,6 +69,17 @@ void MessageId::serialize(std::string& result) const { idData.set_batch_index(impl_->batchIndex_); } + auto chunkMsgId = std::dynamic_pointer_cast(impl_); + if (chunkMsgId) { + proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id(); + auto firstChunkId = chunkMsgId->getFirstChunkMessageId(); + firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_); + firstChunkIdData.set_entryid(firstChunkId->entryId_); + if (firstChunkId->partition_ != -1) {  // test the partition that is actually written + firstChunkIdData.set_partition(firstChunkId->partition_); + } + } + idData.SerializeToString(&result); } @@ -80,7 +92,16 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { throw std::invalid_argument("Failed to parse serialized message id"); } - return MessageIdBuilder::from(idData).build(); + MessageId msgId = MessageIdBuilder::from(idData).build(); + + if (idData.has_first_chunk_message_id()) { + ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); + chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build()); + chunkMsgId->setLastChunkMessageId(msgId); + return chunkMsgId->build(); + } + + return msgId; } int64_t MessageId::ledgerId() const { return impl_->ledgerId_; } @@ -94,6 +115,12 @@ int32_t MessageId::partition() const { return impl_->partition_; } int32_t MessageId::batchSize() const { return impl_->batchSize_; } PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) { + auto chunkMsgId = std::dynamic_pointer_cast(messageId.impl_); + if (chunkMsgId) { + auto firstId = chunkMsgId->getFirstChunkMessageId(); + s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ',' + << firstId->batchIndex_ << ");"; + } s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ',' << 
messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')'; return s; diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index d805dd33..d365b906 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -24,6 +24,7 @@ #include +#include "ChunkMessageIdImpl.h" #include "PulsarApi.pb.h" #include "SharedBuffer.h" #include "TimeUtils.h" @@ -40,13 +41,14 @@ struct OpSendMsg { uint32_t messagesCount_; uint64_t messagesSize_; std::vector> trackerCallbacks_; + ChunkMessageIdImplPtr chunkedMessageId_; OpSendMsg() = default; OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload, const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs, - uint32_t messagesCount, uint64_t messagesSize) - : metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with the + uint32_t messagesCount, uint64_t messagesSize, ChunkMessageIdImplPtr chunkedMessageId = nullptr) + : metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with // a shared metadata object payload_(payload), sendCallback_(sendCallback), @@ -54,7 +56,8 @@ struct OpSendMsg { sequenceId_(sequenceId), timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)), messagesCount_(messagesCount), - messagesSize_(messagesSize) {} + messagesSize_(messagesSize), + chunkedMessageId_(chunkedMessageId) {} void complete(Result result, const MessageId& messageId) const { if (sendCallback_) { diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index a3a5a95a..cdcf14ef 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -562,6 +562,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba msgMetadata.set_total_chunk_msg_size(compressedSize); } + auto chunkMessageId = totalChunks > 1 ? 
std::make_shared() : nullptr; + int beginIndex = 0; for (int chunkId = 0; chunkId < totalChunks; chunkId++) { if (sendChunks) { @@ -578,7 +580,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId, conf_.getSendTimeout(), - 1, uncompressedSize}; + 1, uncompressedSize, chunkMessageId}; if (!chunkingEnabled_) { const uint32_t msgMetadataSize = op.metadata_.ByteSize(); @@ -868,22 +870,33 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { << " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId << " producer: " << producerId_); return true; - } else { - // Message was persisted correctly - LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - releaseSemaphoreForSendOp(op); - lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1; + } - pendingMessagesQueue_.pop_front(); + // Message was persisted correctly + LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - lock.unlock(); - try { - op.complete(ResultOk, messageId); - } catch (const std::exception& e) { - LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); + if (op.chunkedMessageId_) { + // Handling the chunk message id. 
+ if (op.metadata_.chunk_id() == 0) { + op.chunkedMessageId_->setFirstChunkMessageId(messageId); + } else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) { + op.chunkedMessageId_->setLastChunkMessageId(messageId); + messageId = op.chunkedMessageId_->build(); } - return true; } + + releaseSemaphoreForSendOp(op); + lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1; + + pendingMessagesQueue_.pop_front(); + + lock.unlock(); + try { + op.complete(ResultOk, messageId); + } catch (const std::exception& e) { + LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); + } + return true; } bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index e3f01786..9a65fbc6 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -18,10 +18,12 @@ */ #include #include +#include #include #include +#include "ChunkMessageIdImpl.h" #include "PulsarFriend.h" #include "WaitUtils.h" #include "lib/LogUtils.h" @@ -116,6 +118,9 @@ TEST_P(MessageChunkingTest, testEndToEnd) { for (int i = 0; i < numMessages; i++) { MessageId messageId; ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); + auto chunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); + ASSERT_TRUE(chunkMsgId); LOG_INFO("Send " << i << " to " << messageId); sendMessageIds.emplace_back(messageId); } @@ -128,7 +133,11 @@ TEST_P(MessageChunkingTest, testEndToEnd) { ASSERT_EQ(msg.getDataAsString(), largeMessage); ASSERT_EQ(msg.getMessageId().batchIndex(), -1); ASSERT_EQ(msg.getMessageId().batchSize(), 0); - receivedMessageIds.emplace_back(msg.getMessageId()); + auto messageId = msg.getMessageId(); + auto chunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(messageId)); + ASSERT_TRUE(chunkMsgId); + receivedMessageIds.emplace_back(messageId); } 
ASSERT_EQ(receivedMessageIds, sendMessageIds); ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId()); @@ -255,6 +264,83 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { consumer.close(); } +TEST_P(MessageChunkingTest, testSeekChunkMessages) { + const std::string topic = + "MessageChunkingTest-testSeekChunkMessages-" + toString(GetParam()) + std::to_string(time(nullptr)); + + constexpr int numMessages = 10; + + Consumer consumer1; + ConsumerConfiguration consumer1Conf; + consumer1Conf.setStartMessageIdInclusive(true); + createConsumer(topic, consumer1, consumer1Conf); + + Producer producer; + createProducer(topic, producer); + + for (int i = 0; i < numMessages; i++) { + MessageId messageId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); + LOG_INFO("Send " << i << " to " << messageId); + } + + Message msg; + std::vector receivedMessageIds; + for (int i = 0; i < numMessages; i++) { + ASSERT_EQ(ResultOk, consumer1.receive(msg, 3000)); + LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId()); + receivedMessageIds.emplace_back(msg.getMessageId()); + } + + ASSERT_EQ(ResultOk, consumer1.seek(receivedMessageIds[1]));  // expects delivery to resume at message 1 + for (int i = 1; i < numMessages; i++) { + Message msgAfterSeek; + ASSERT_EQ(ResultOk, consumer1.receive(msgAfterSeek, 3000)); + ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]); + } + + consumer1.close(); + Consumer consumer2; + createConsumer(topic, consumer2); + + ASSERT_EQ(ResultOk, consumer2.seek(receivedMessageIds[1]));  // expects delivery to resume at message 2 + for (int i = 2; i < numMessages; i++) { + Message msgAfterSeek; + ASSERT_EQ(ResultOk, consumer2.receive(msgAfterSeek, 3000)); + ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]); + } + + consumer2.close(); + producer.close(); +} + +TEST(ChunkMessageIdTest, testSetChunkMessageId) { + MessageId msgId; + { + ChunkMessageIdImplPtr chunkMsgId = std::make_shared(); + 
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build()); + chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()); + msgId = chunkMsgId->build(); + // Test the destructor of the underlying message id should also work for the generated messageId. + } + + std::string msgIdData; + msgId.serialize(msgIdData); + MessageId deserializedMsgId = MessageId::deserialize(msgIdData); + + ASSERT_EQ(deserializedMsgId.ledgerId(), 4); + ASSERT_EQ(deserializedMsgId.entryId(), 5); + ASSERT_EQ(deserializedMsgId.partition(), 6); + + auto chunkMsgId = + std::dynamic_pointer_cast(PulsarFriend::getMessageIdImpl(deserializedMsgId)); + ASSERT_TRUE(chunkMsgId); + auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId(); + ASSERT_EQ(firstChunkMsgId->ledgerId_, 1); + ASSERT_EQ(firstChunkMsgId->entryId_, 2); + ASSERT_EQ(firstChunkMsgId->partition_, 3); +} + // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest, ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD, diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 878d80cb..fb6bb591 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -179,6 +179,8 @@ class PulsarFriend { } static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } + + static std::shared_ptr getMessageIdImpl(MessageId& msgId) { return msgId.impl_; } }; } // namespace pulsar