Skip to content

Commit c1bbcb1

Browse files
author
ZhangJian He
committed
Add getIndex method on Message
1 parent 413461a commit c1bbcb1

11 files changed

Lines changed: 126 additions & 29 deletions

File tree

include/pulsar/Message.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
namespace pulsar {
3232
namespace proto {
3333
class CommandMessage;
34+
class BrokerEntryMetadata;
3435
class MessageMetadata;
3536
class SingleMessageMetadata;
3637
} // namespace proto
@@ -124,6 +125,12 @@ class PULSAR_PUBLIC Message {
124125
*/
125126
void setMessageId(const MessageId& messageId) const;
126127

128+
/**
129+
* Get the index of this message, if it doesn't exist, return -1
130+
* @return
131+
*/
132+
int64_t getIndex() const;
133+
127134
/**
128135
* Get the partition key for this message
129136
* @return key string that is hashed to determine message's topic partition
@@ -195,9 +202,11 @@ class PULSAR_PUBLIC Message {
195202
MessageImplPtr impl_;
196203

197204
Message(MessageImplPtr& impl);
198-
Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload);
205+
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
206+
proto::MessageMetadata& metadata, SharedBuffer& payload);
199207
/// Used for Batch Messages
200-
Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload,
208+
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
209+
proto::MessageMetadata& metadata, SharedBuffer& payload,
201210
proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName);
202211
friend class PartitionedProducerImpl;
203212
friend class MultiTopicsConsumerImpl;

lib/ClientConnection.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,6 @@ void ClientConnection::processIncomingBuffer() {
683683

684684
// read checksum
685685
uint32_t remainingBytes = frameSize - (cmdSize + 4);
686-
bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
687686

688687
auto readerIndex = incomingBuffer_.readerIndex();
689688
if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
@@ -698,13 +697,14 @@ void ClientConnection::processIncomingBuffer() {
698697
close();
699698
return;
700699
}
701-
702-
incomingBuffer_.consume(brokerEntryMetadataSize);
700+
incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
703701
remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
704702
} else {
705703
incomingBuffer_.setReaderIndex(readerIndex);
706704
}
707705

706+
bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
707+
708708
uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
709709
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
710710
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
@@ -817,7 +817,8 @@ void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, b
817817
// Unlock the mutex before notifying the consumer of the
818818
// new received message
819819
lock.unlock();
820-
consumer->messageReceived(shared_from_this(), msg, isChecksumValid, msgMetadata, payload);
820+
consumer->messageReceived(shared_from_this(), msg, isChecksumValid, brokerEntryMetadata,
821+
msgMetadata, payload);
821822
} else {
822823
consumers_.erase(msg.consumer_id());
823824
LOG_DEBUG(cnxString_ << "Ignoring incoming message for already destroyed consumer "

lib/Commands.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
277277

278278
FeatureFlags* flags = connect->mutable_feature_flags();
279279
flags->set_supports_auth_refresh(true);
280+
flags->set_supports_broker_entry_metadata(true);
280281
if (connectingThroughProxy) {
281282
Url logicalAddressUrl;
282283
Url::parse(logicalAddress, logicalAddressUrl);
@@ -908,7 +909,8 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32
908909
const MessageId& m = batchedMessage.impl_->messageId;
909910
auto messageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
910911
auto batchedMessageId = std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
911-
Message singleMessage(MessageId{batchedMessageId}, batchedMessage.impl_->metadata, payload, metadata,
912+
Message singleMessage(MessageId{batchedMessageId}, batchedMessage.impl_->brokerEntryMetadata,
913+
batchedMessage.impl_->metadata, payload, metadata,
912914
batchedMessage.impl_->topicName_);
913915
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
914916

lib/ConsumerImpl.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,8 +495,8 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
495495
}
496496

497497
void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
498-
bool& isChecksumValid, proto::MessageMetadata& metadata,
499-
SharedBuffer& payload) {
498+
bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
499+
proto::MessageMetadata& metadata, SharedBuffer& payload) {
500500
LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());
501501

502502
if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
@@ -536,7 +536,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
536536
}
537537
}
538538

539-
Message m(messageId, metadata, payload);
539+
Message m(messageId, brokerEntryMetadata, metadata, payload);
540540
m.impl_->cnx_ = cnx.get();
541541
m.impl_->setTopicName(topic_);
542542
m.impl_->setRedeliveryCount(msg.redelivery_count());
@@ -565,7 +565,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
565565
Lock lock(mutex_);
566566
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count());
567567
} else {
568-
// try convery key value data.
568+
// try convert key value data.
569569
m.impl_->convertPayloadToKeyValue(config_.getSchema());
570570

571571
const auto startMessageId = startMessageId_.get();
@@ -706,6 +706,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
706706
msg.impl_->setRedeliveryCount(redeliveryCount);
707707
msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
708708
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
709+
if (msg.impl_->brokerEntryMetadata.has_index()) {
710+
msg.impl_->brokerEntryMetadata.set_index(msg.impl_->brokerEntryMetadata.index() - batchSize + i + 1);
711+
}
709712

710713
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
711714
possibleToDeadLetter.emplace_back(msg);

lib/ConsumerImpl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>
6060

6161
namespace proto {
6262
class CommandMessage;
63+
class BrokerEntryMetadata;
6364
class MessageMetadata;
6465
} // namespace proto
6566

@@ -87,7 +88,8 @@ class ConsumerImpl : public ConsumerImplBase {
8788
void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages);
8889
uint64_t getConsumerId();
8990
void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
90-
bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
91+
bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
92+
proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
9193
void messageProcessed(Message& msg, bool track = true);
9294
void activeConsumerChanged(bool isActive);
9395
inline CommandSubscribe_SubType getSubType();

lib/Message.cc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,21 @@ Message::Message() : impl_() {}
7070

7171
Message::Message(MessageImplPtr& impl) : impl_(impl) {}
7272

73-
Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload)
73+
Message::Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
74+
proto::MessageMetadata& metadata, SharedBuffer& payload)
7475
: impl_(std::make_shared<MessageImpl>()) {
7576
impl_->messageId = messageId;
77+
impl_->brokerEntryMetadata = brokerEntryMetadata;
7678
impl_->metadata = metadata;
7779
impl_->payload = payload;
7880
}
7981

80-
Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload,
82+
Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerEntryMetadata,
83+
proto::MessageMetadata& metadata, SharedBuffer& payload,
8184
proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName)
8285
: impl_(std::make_shared<MessageImpl>()) {
8386
impl_->messageId = messageID;
87+
impl_->brokerEntryMetadata = brokerEntryMetadata;
8488
impl_->metadata = metadata;
8589
impl_->payload = payload;
8690
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
@@ -136,6 +140,15 @@ void Message::setMessageId(const MessageId& messageID) const {
136140
return;
137141
}
138142

143+
int64_t Message::getIndex() const {
144+
if (!impl_ || !impl_->brokerEntryMetadata.has_index()) {
145+
return -1;
146+
} else {
147+
// casting uint64_t to int64_t, server definition ensures that's safe
148+
return static_cast<int64_t>(impl_->brokerEntryMetadata.index());
149+
}
150+
}
151+
139152
bool Message::hasPartitionKey() const {
140153
if (impl_) {
141154
return impl_->hasPartitionKey();

lib/MessageImpl.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
namespace pulsar {
2222

23-
MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_(), redeliveryCount_() {}
24-
2523
const Message::StringMap& MessageImpl::properties() {
2624
if (properties_.size() == 0) {
2725
for (int i = 0; i < metadata.properties_size(); i++) {

lib/MessageImpl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,9 @@ class BatchMessageContainer;
3535

3636
class MessageImpl {
3737
public:
38-
MessageImpl();
39-
4038
const Message::StringMap& properties();
4139

40+
proto::BrokerEntryMetadata brokerEntryMetadata;
4241
proto::MessageMetadata metadata;
4342
SharedBuffer payload;
4443
std::shared_ptr<KeyValueImpl> keyValuePtr;

tests/ConsumerTest.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,27 @@ class ActiveInactiveListenerEvent : public ConsumerEventListener {
112112
std::mutex mutex_;
113113
};
114114

115+
TEST(ConsumerTest, testConsumerIndex) {
116+
Client client(lookupUrl);
117+
const std::string topicName = "testConsumerIndex-topic-" + std::to_string(time(nullptr));
118+
const std::string subName = "sub";
119+
Producer producer;
120+
Result producerResult = client.createProducer(topicName, producer);
121+
ASSERT_EQ(producerResult, ResultOk);
122+
Consumer consumer;
123+
Result consumerResult = client.subscribe(topicName, subName, consumer);
124+
ASSERT_EQ(consumerResult, ResultOk);
125+
const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
126+
Result sendResult = producer.send(msg);
127+
ASSERT_EQ(sendResult, ResultOk);
128+
Message receivedMsg;
129+
Result receiveResult = consumer.receive(receivedMsg);
130+
ASSERT_EQ(receiveResult, ResultOk);
131+
ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
132+
ASSERT_EQ(receivedMsg.getIndex(), -1);
133+
client.close();
134+
}
135+
115136
typedef std::shared_ptr<ActiveInactiveListenerEvent> ActiveInactiveListenerEventPtr;
116137

117138
TEST(ConsumerTest, testConsumerEventWithoutPartition) {

tests/brokermetadata/BrokerMetadataTest.cc

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,73 @@
2020
#include <gtest/gtest.h>
2121
#include <pulsar/Client.h>
2222

23+
#include "lib/Latch.h"
24+
2325
using namespace pulsar;
2426

2527
TEST(BrokerMetadataTest, testConsumeSuccess) {
2628
Client client{"pulsar://localhost:6650"};
2729
Producer producer;
28-
Result producerResult = client.createProducer("persistent://public/default/testConsumeSuccess", producer);
30+
ProducerConfiguration producerConfiguration;
31+
producerConfiguration.setBatchingEnabled(false);
32+
Result producerResult =
33+
client.createProducer("persistent://public/default/topic-non-batch", producerConfiguration, producer);
2934
ASSERT_EQ(producerResult, ResultOk);
3035
Consumer consumer;
31-
Result consumerResult =
32-
client.subscribe("persistent://public/default/testConsumeSuccess", "testConsumeSuccess", consumer);
36+
Result consumerResult = client.subscribe("persistent://public/default/topic-non-batch", "sub", consumer);
3337
ASSERT_EQ(consumerResult, ResultOk);
34-
const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
35-
Result sendResult = producer.send(msg);
36-
ASSERT_EQ(sendResult, ResultOk);
38+
for (int i = 0; i < 10; i++) {
39+
std::string content = "testConsumeSuccess" + std::to_string(i);
40+
const auto msg = MessageBuilder().setContent(content).build();
41+
Result sendResult = producer.send(msg);
42+
ASSERT_EQ(sendResult, ResultOk);
43+
}
44+
45+
Message receivedMsg;
46+
for (int i = 0; i < 10; i++) {
47+
Result receiveResult =
48+
consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message
49+
printf("receive index: %d\n", i);
50+
ASSERT_EQ(receiveResult, ResultOk);
51+
ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + std::to_string(i));
52+
ASSERT_EQ(receivedMsg.getIndex(), i);
53+
Result ackResult = consumer.acknowledge(receivedMsg);
54+
ASSERT_EQ(ackResult, ResultOk);
55+
}
56+
client.close();
57+
}
58+
59+
TEST(BrokerMetadataTest, testConsumeBatchSuccess) {
60+
Client client{"pulsar://localhost:6650"};
61+
Producer producer;
62+
Result producerResult = client.createProducer("persistent://public/default/topic-batch", producer);
63+
ASSERT_EQ(producerResult, ResultOk);
64+
Consumer consumer;
65+
Result consumerResult = client.subscribe("persistent://public/default/topic-batch", "sub", consumer);
66+
ASSERT_EQ(consumerResult, ResultOk);
67+
68+
Latch latch(10);
69+
auto sendCallback = [&latch](Result result, const MessageId& id) {
70+
ASSERT_EQ(result, ResultOk);
71+
latch.countdown();
72+
};
73+
74+
for (int i = 0; i < 10; i++) {
75+
std::string content = "testConsumeSuccess" + std::to_string(i);
76+
const auto msg = MessageBuilder().setContent(content).build();
77+
producer.sendAsync(msg, sendCallback);
78+
}
79+
80+
latch.wait();
81+
3782
Message receivedMsg;
38-
Result receiveResult = consumer.receive(receivedMsg);
39-
ASSERT_EQ(receiveResult, ResultOk);
40-
ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
83+
for (int i = 0; i < 10; i++) {
84+
Result receiveResult =
85+
consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message
86+
ASSERT_EQ(receiveResult, ResultOk);
87+
ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + std::to_string(i));
88+
ASSERT_EQ(receivedMsg.getIndex(), i);
89+
}
4190
client.close();
4291
}
4392

0 commit comments

Comments
 (0)