Skip to content

Commit 4969de7

Browse files
author
ZhangJian He
committed
[consumer] Support parse broker metadata
1 parent 4209987 commit 4969de7

7 files changed

Lines changed: 135 additions & 2 deletions

File tree

lib/ClientConnection.cc

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -679,11 +679,32 @@ void ClientConnection::processIncomingBuffer() {
679679
if (incomingCmd.type() == BaseCommand::MESSAGE) {
680680
// Parse message metadata and extract payload
681681
proto::MessageMetadata msgMetadata;
682+
proto::BrokerEntryMetadata brokerEntryMetadata;
682683

683684
// read checksum
684685
uint32_t remainingBytes = frameSize - (cmdSize + 4);
685686
bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
686687

688+
auto readerIndex = incomingBuffer_.readerIndex();
689+
if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
690+
// broker entry metadata is present
691+
uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
692+
if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
693+
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()
694+
<< ", message ledger id "
695+
<< incomingCmd.message().message_id().ledgerid() << ", entry id "
696+
<< incomingCmd.message().message_id().entryid()
697+
<< "] Error parsing broker entry metadata");
698+
close();
699+
return;
700+
}
701+
702+
incomingBuffer_.consume(brokerEntryMetadataSize);
703+
remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
704+
} else {
705+
incomingBuffer_.setReaderIndex(readerIndex);
706+
}
707+
687708
uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
688709
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
689710
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
@@ -701,7 +722,8 @@ void ClientConnection::processIncomingBuffer() {
701722
uint32_t payloadSize = remainingBytes;
702723
SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
703724
incomingBuffer_.consume(payloadSize);
704-
handleIncomingMessage(incomingCmd.message(), isChecksumValid, msgMetadata, payload);
725+
handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
726+
payload);
705727
} else {
706728
handleIncomingCommand(incomingCmd);
707729
}
@@ -710,7 +732,7 @@ void ClientConnection::processIncomingBuffer() {
710732
// We still have 1 to 3 bytes from the next frame
711733
assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
712734

713-
// Restart with a new buffer and copy the the few bytes at the beginning
735+
// Restart with a new buffer and copy the few bytes at the beginning
714736
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);
715737

716738
// At least we need to read 4 bytes to have the complete frame size
@@ -782,6 +804,7 @@ void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveCons
782804
}
783805

784806
void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
807+
proto::BrokerEntryMetadata& brokerEntryMetadata,
785808
proto::MessageMetadata& msgMetadata, SharedBuffer& payload) {
786809
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id());
787810

lib/ClientConnection.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ struct OpSendMsg;
7474

7575
namespace proto {
7676
class BaseCommand;
77+
class BrokerEntryMetadata;
7778
class CommandActiveConsumerChange;
7879
class CommandAckResponse;
7980
class CommandMessage;
@@ -225,6 +226,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
225226
void handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change);
226227
void handleIncomingCommand(proto::BaseCommand& incomingCmd);
227228
void handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
229+
proto::BrokerEntryMetadata& brokerEntryMetadata,
228230
proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
229231

230232
void handlePulsarConnected(const proto::CommandConnected& cmdConnected);

lib/Commands.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ class Commands {
7979
};
8080

8181
const static uint16_t magicCrc32c = 0x0e01;
82+
83+
const static uint16_t magicBrokerEntryMetadata = 0x0e02;
84+
8285
const static int checksumSize = 4;
8386

8487
static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,

run-unit-tests.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ sleep 15
3838
$CMAKE_BUILD_DIRECTORY/tests/Oauth2Test
3939
docker compose -f tests/oauth2/docker-compose.yml down
4040

41+
# Run BrokerMetadata tests
42+
docker compose -f tests/brokermetadata/docker-compose.yml up -d
43+
sleep 15
44+
$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
45+
docker compose -f tests/brokermetadata/docker-compose.yml down
46+
4147
./pulsar-test-service-start.sh
4248

4349
pushd $CMAKE_BUILD_DIRECTORY/tests

tests/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ if (UNIX)
6565
target_link_libraries(ConnectionFailTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
6666
endif ()
6767

68+
add_executable(BrokerMetadataTest brokermetadata/BrokerMetadataTest.cc)
69+
target_compile_options(BrokerMetadataTest PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
70+
target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
71+
6872
add_executable(Oauth2Test oauth2/Oauth2Test.cc)
6973
target_compile_options(Oauth2Test PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
7074
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
// Run `docker-compose up -d` to set up the test environment for this test.
20+
#include <gtest/gtest.h>
21+
#include <pulsar/Client.h>
22+
23+
using namespace pulsar;
24+
25+
#ifndef TEST_ROOT_PATH
26+
#define TEST_ROOT_PATH "."
27+
#endif
28+
29+
TEST(BrokerMetadataTest, testConsumeSuccess) {
30+
Client client{"pulsar://localhost:6650"};
31+
Producer producer;
32+
Result producerResult = client.createProducer("persistent://public/default/testConsumeSuccess", producer);
33+
ASSERT_EQ(producerResult, ResultOk);
34+
Consumer consumer;
35+
Result consumerResult =
36+
client.subscribe("persistent://public/default/testConsumeSuccess", "testConsumeSuccess", consumer);
37+
ASSERT_EQ(consumerResult, ResultOk);
38+
const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
39+
Result sendResult = producer.send(msg);
40+
ASSERT_EQ(sendResult, ResultOk);
41+
Message receivedMsg;
42+
Result receiveResult = consumer.receive(receivedMsg);
43+
ASSERT_EQ(receiveResult, ResultOk);
44+
ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
45+
client.close();
46+
}
47+
48+
int main(int argc, char* argv[]) {
49+
::testing::InitGoogleTest(&argc, argv);
50+
return RUN_ALL_TESTS();
51+
return 0;
52+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
version: '3'
21+
networks:
22+
pulsar:
23+
driver: bridge
24+
services:
25+
standalone:
26+
image: apachepulsar/pulsar:latest
27+
container_name: standalone
28+
hostname: local
29+
restart: "no"
30+
networks:
31+
- pulsar
32+
environment:
33+
- metadataStoreUrl=zk:localhost:2181
34+
- clusterName=standalone-broker-metadata
35+
- advertisedAddress=localhost
36+
- advertisedListeners=external:pulsar://localhost:6650
37+
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
38+
- PULSAR_PREFIX_BROKER_ENTRY_METADATA_INTERCEPTORS=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
39+
- PULSAR_PREFIX_EXPOSING_BROKER_ENTRY_METADATA_TO_CLIENT_ENABLED=true
40+
ports:
41+
- "6650:6650"
42+
- "8080:8080"
43+
command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw"

0 commit comments

Comments
 (0)