Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ class PULSAR_PUBLIC Message {
MessageImplPtr impl_;

Message(MessageImplPtr& impl);
Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload,
int32_t partition);
Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload);
Comment thread
RobertIndie marked this conversation as resolved.
/// Used for Batch Messages
Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata, const std::string& topicName);
Expand Down
1 change: 1 addition & 0 deletions include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class PULSAR_PUBLIC MessageId {
friend class PulsarFriend;
friend class NegativeAcksTracker;
friend class MessageIdBuilder;
friend class ChunkMessageIdImpl;

friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const MessageId& messageId);

Expand Down
48 changes: 48 additions & 0 deletions lib/ChunkMessageIdImpl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <pulsar/MessageId.h>

#include "MessageIdImpl.h"
Comment thread
RobertIndie marked this conversation as resolved.

namespace pulsar {
class ChunkMessageIdImpl;
typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
public:
ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}

void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }

void setLastChunkMessageId(const MessageId& msgId) {
this->ledgerId_ = msgId.ledgerId();
this->entryId_ = msgId.entryId();
this->partition_ = msgId.partition();
}

std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }

MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }

private:
std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
};
} // namespace pulsar
14 changes: 12 additions & 2 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "BatchMessageAcker.h"
#include "BatchedMessageIdImpl.h"
#include "ChunkMessageIdImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "PulsarApi.pb.h"
Expand Down Expand Up @@ -512,8 +513,17 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me
commandSeek->set_request_id(requestId);

MessageIdData& messageIdData = *commandSeek->mutable_message_id();
messageIdData.set_ledgerid(messageId.ledgerId());
messageIdData.set_entryid(messageId.entryId());

auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
messageIdData.set_ledgerid(firstId->ledgerId_);
messageIdData.set_entryid(firstId->entryId_);
} else {
messageIdData.set_ledgerid(messageId.ledgerId());
messageIdData.set_entryid(messageId.entryId());
}

return writeMessageWithSize(cmd);
}

Expand Down
19 changes: 13 additions & 6 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "AckGroupingTrackerEnabled.h"
#include "BatchMessageAcker.h"
#include "BatchedMessageIdImpl.h"
#include "ChunkMessageIdImpl.h"
#include "ClientConnection.h"
#include "ClientImpl.h"
#include "Commands.h"
Expand Down Expand Up @@ -375,9 +376,9 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {

boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx) {
const ClientConnectionPtr& cnx,
MessageId& messageId) {
const auto chunkId = metadata.chunk_id();
const auto uuid = metadata.uuid();
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
Expand Down Expand Up @@ -432,6 +433,11 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
return boost::none;
}

ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
messageId = chunkMsgId->build();

LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
<< ", sequenceId: " << metadata.sequence_id());

Expand Down Expand Up @@ -472,19 +478,20 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
}
}

const auto& messageIdData = msg.message_id();
auto messageId = MessageIdBuilder::from(messageIdData).batchIndex(-1).build();

// Only a non-batched messages can be a chunk
if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
const auto& messageIdData = msg.message_id();
auto messageId = MessageIdBuilder::from(messageIdData).build();
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
auto optionalPayload = processMessageChunk(payload, metadata, messageIdData, cnx, messageId);
if (optionalPayload) {
payload = optionalPayload.value();
} else {
return;
}
}

Message m(msg, metadata, payload, partitionIndex_);
Message m(messageId, metadata, payload);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
m.impl_->setRedeliveryCount(msg.redelivery_count());
Expand Down
5 changes: 2 additions & 3 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,18 +298,17 @@ class ConsumerImpl : public ConsumerImplBase {
*
* @param payload the payload of a chunk
* @param metadata the message metadata
* @param messageId
* @param messageIdData
* @param cnx
* @param messageId
*
* @return the concatenated payload if chunks are concatenated into a completed message payload
* successfully, else Optional::empty()
*/
boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx);
const ClientConnectionPtr& cnx, MessageId& messageId);

friend class PulsarFriend;

Expand Down
5 changes: 2 additions & 3 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ Message::Message() : impl_() {}

Message::Message(MessageImplPtr& impl) : impl_(impl) {}

Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload,
int32_t partition)
Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload)
: impl_(std::make_shared<MessageImpl>()) {
impl_->messageId = MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build();
impl_->messageId = messageId;
impl_->metadata = metadata;
impl_->payload = payload;
}
Expand Down
29 changes: 28 additions & 1 deletion lib/MessageId.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <memory>
#include <stdexcept>

#include "ChunkMessageIdImpl.h"
#include "MessageIdImpl.h"
#include "PulsarApi.pb.h"

Expand Down Expand Up @@ -68,6 +69,17 @@ void MessageId::serialize(std::string& result) const {
idData.set_batch_index(impl_->batchIndex_);
}

auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
if (chunkMsgId) {
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
firstChunkIdData.set_entryid(firstChunkId->entryId_);
if (chunkMsgId->partition_ != -1) {
firstChunkIdData.set_partition(firstChunkId->partition_);
}
}

idData.SerializeToString(&result);
}

Expand All @@ -80,7 +92,16 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
throw std::invalid_argument("Failed to parse serialized message id");
}

return MessageIdBuilder::from(idData).build();
MessageId msgId = MessageIdBuilder::from(idData).build();

if (idData.has_first_chunk_message_id()) {
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
chunkMsgId->setLastChunkMessageId(msgId);
return chunkMsgId->build();
}

return msgId;
}

int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
Expand All @@ -94,6 +115,12 @@ int32_t MessageId::partition() const { return impl_->partition_; }
int32_t MessageId::batchSize() const { return impl_->batchSize_; }

PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
<< firstId->batchIndex_ << ");";
}
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
return s;
Expand Down
9 changes: 6 additions & 3 deletions lib/OpSendMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <boost/date_time/posix_time/ptime.hpp>

#include "ChunkMessageIdImpl.h"
#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
#include "TimeUtils.h"
Expand All @@ -40,21 +41,23 @@ struct OpSendMsg {
uint32_t messagesCount_;
uint64_t messagesSize_;
std::vector<std::function<void(Result)>> trackerCallbacks_;
ChunkMessageIdImplPtr chunkedMessageId_;

OpSendMsg() = default;

OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload,
const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs,
uint32_t messagesCount, uint64_t messagesSize)
: metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with the
uint32_t messagesCount, uint64_t messagesSize, ChunkMessageIdImplPtr chunkedMessageId = nullptr)
: metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with
// a shared metadata object
payload_(payload),
sendCallback_(sendCallback),
producerId_(producerId),
sequenceId_(sequenceId),
timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
messagesCount_(messagesCount),
messagesSize_(messagesSize) {}
messagesSize_(messagesSize),
chunkedMessageId_(chunkedMessageId) {}

void complete(Result result, const MessageId& messageId) const {
if (sendCallback_) {
Expand Down
39 changes: 26 additions & 13 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
msgMetadata.set_total_chunk_msg_size(compressedSize);
}

auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;

int beginIndex = 0;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
if (sendChunks) {
Expand All @@ -578,7 +580,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
}
OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
producerId_, sequenceId, conf_.getSendTimeout(),
1, uncompressedSize};
1, uncompressedSize, chunkMessageId};

if (!chunkingEnabled_) {
const uint32_t msgMetadataSize = op.metadata_.ByteSize();
Expand Down Expand Up @@ -868,22 +870,33 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
<< " producer: " << producerId_);
return true;
} else {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
releaseSemaphoreForSendOp(op);
lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
}

pendingMessagesQueue_.pop_front();
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);

lock.unlock();
try {
op.complete(ResultOk, messageId);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
if (op.chunkedMessageId_) {
// Handling the chunk message id.
if (op.metadata_.chunk_id() == 0) {
op.chunkedMessageId_->setFirstChunkMessageId(messageId);
} else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) {
op.chunkedMessageId_->setLastChunkMessageId(messageId);
messageId = op.chunkedMessageId_->build();
}
return true;
}

releaseSemaphoreForSendOp(op);
lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;

pendingMessagesQueue_.pop_front();

lock.unlock();
try {
op.complete(ResultOk, messageId);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
return true;
}

bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
Expand Down
Loading