Skip to content

Commit c700dbd

Browse files
author
ZhangJian He
committed
[consumer] Support parse broker metadata
1 parent 4209987 commit c700dbd

3 files changed

Lines changed: 29 additions & 2 deletions

File tree

lib/ClientConnection.cc

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -679,11 +679,32 @@ void ClientConnection::processIncomingBuffer() {
679679
if (incomingCmd.type() == BaseCommand::MESSAGE) {
680680
// Parse message metadata and extract payload
681681
proto::MessageMetadata msgMetadata;
682+
proto::BrokerEntryMetadata brokerEntryMetadata;
682683

683684
// read checksum
684685
uint32_t remainingBytes = frameSize - (cmdSize + 4);
685686
bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
686687

688+
auto readerIndex = incomingBuffer_.readerIndex();
689+
if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
690+
// broker entry metadata is present
691+
uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
692+
if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
693+
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
694+
<< ", message ledger id "
695+
<< incomingCmd.message().message_id().ledgerid() //
696+
<< ", entry id " << incomingCmd.message().message_id().entryid()
697+
<< "] Error parsing broker entry metadata");
698+
close();
699+
return;
700+
}
701+
702+
incomingBuffer_.consume(brokerEntryMetadataSize);
703+
remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
704+
} else {
705+
incomingBuffer_.setReaderIndex(readerIndex);
706+
}
707+
687708
uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
688709
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
689710
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
@@ -701,7 +722,8 @@ void ClientConnection::processIncomingBuffer() {
701722
uint32_t payloadSize = remainingBytes;
702723
SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
703724
incomingBuffer_.consume(payloadSize);
704-
handleIncomingMessage(incomingCmd.message(), isChecksumValid, msgMetadata, payload);
725+
handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
726+
payload);
705727
} else {
706728
handleIncomingCommand(incomingCmd);
707729
}
@@ -710,7 +732,7 @@ void ClientConnection::processIncomingBuffer() {
710732
// We still have 1 to 3 bytes from the next frame
711733
assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
712734

713-
// Restart with a new buffer and copy the the few bytes at the beginning
735+
// Restart with a new buffer and copy the few bytes at the beginning
714736
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);
715737

716738
// At least we need to read 4 bytes to have the complete frame size
@@ -782,6 +804,7 @@ void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveCons
782804
}
783805

784806
void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
807+
proto::BrokerEntryMetadata brokerEntryMetadata,
785808
proto::MessageMetadata& msgMetadata, SharedBuffer& payload) {
786809
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id());
787810

lib/Commands.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ class Commands {
7979
};
8080

8181
const static uint16_t magicCrc32c = 0x0e01;
82+
83+
const static uint16_t magicBrokerEntryMetadata = 0x0e02;
84+
8285
const static int checksumSize = 4;
8386

8487
static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,

lib/ProducerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class TopicName;
5757
struct OpSendMsg;
5858

5959
namespace proto {
60+
class BrokerEntryMetadata;
6061
class MessageMetadata;
6162
} // namespace proto
6263

0 commit comments

Comments
 (0)