@@ -25,19 +25,60 @@ using namespace pulsar;
2525TEST (BrokerMetadataTest, testConsumeSuccess) {
2626 Client client{" pulsar://localhost:6650" };
2727 Producer producer;
28- Result producerResult = client.createProducer (" persistent://public/default/testConsumeSuccess" , producer);
28+ ProducerConfiguration producerConfiguration;
29+ producerConfiguration.setBatchingEnabled (false );
30+ Result producerResult = client.createProducer (" persistent://public/default/topic-non-batch" ,
31+ producerConfiguration, producer);
2932 ASSERT_EQ (producerResult, ResultOk);
3033 Consumer consumer;
3134 Result consumerResult =
32- client.subscribe (" persistent://public/default/testConsumeSuccess " , " testConsumeSuccess " , consumer);
35+ client.subscribe (" persistent://public/default/topic-non-batch " , " sub " , consumer);
3336 ASSERT_EQ (consumerResult, ResultOk);
34- const auto msg = MessageBuilder ().setContent (" testConsumeSuccess" ).build ();
35- Result sendResult = producer.send (msg);
36- ASSERT_EQ (sendResult, ResultOk);
37+ for (int i = 0 ; i < 10 ; i++) {
38+ std::string content = " testConsumeSuccess" + std::to_string (i);
39+ const auto msg = MessageBuilder ().setContent (content).build ();
40+ Result sendResult = producer.send (msg);
41+ ASSERT_EQ (sendResult, ResultOk);
42+ }
43+
44+ Message receivedMsg;
45+ for (int i = 0 ; i < 10 ; i++) {
46+ Result receiveResult =
47+ consumer.receive (receivedMsg, 1000 ); // Assumed that we wait 1000 ms for each message
48+ printf (" receive index: %d\n " , i);
49+ ASSERT_EQ (receiveResult, ResultOk);
50+ ASSERT_EQ (receivedMsg.getDataAsString (), " testConsumeSuccess" + std::to_string (i));
51+ ASSERT_EQ (receivedMsg.getIndex (), i);
52+ Result ackResult = consumer.acknowledge (receivedMsg);
53+ ASSERT_EQ (ackResult, ResultOk);
54+ }
55+ client.close ();
56+ }
57+
58+ TEST (BrokerMetadataTest, testConsumeBatchSuccess) {
59+ Client client{" pulsar://localhost:6650" };
60+ Producer producer;
61+ Result producerResult = client.createProducer (" persistent://public/default/topic-batch" , producer);
62+ ASSERT_EQ (producerResult, ResultOk);
63+ Consumer consumer;
64+ Result consumerResult =
65+ client.subscribe (" persistent://public/default/topic-batch" , " sub" , consumer);
66+ ASSERT_EQ (consumerResult, ResultOk);
67+ for (int i = 0 ; i < 10 ; i++) {
68+ std::string content = " testConsumeSuccess" + std::to_string (i);
69+ const auto msg = MessageBuilder ().setContent (content).build ();
70+ Result sendResult = producer.send (msg);
71+ ASSERT_EQ (sendResult, ResultOk);
72+ }
73+
3774 Message receivedMsg;
38- Result receiveResult = consumer.receive (receivedMsg);
39- ASSERT_EQ (receiveResult, ResultOk);
40- ASSERT_EQ (receivedMsg.getDataAsString (), " testConsumeSuccess" );
75+ for (int i = 0 ; i < 10 ; i++) {
76+ Result receiveResult =
77+ consumer.receive (receivedMsg, 1000 ); // Assumed that we wait 1000 ms for each message
78+ ASSERT_EQ (receiveResult, ResultOk);
79+ ASSERT_EQ (receivedMsg.getDataAsString (), " testConsumeSuccess" + std::to_string (i));
80+ ASSERT_EQ (receivedMsg.getIndex (), i);
81+ }
4182 client.close ();
4283}
4384
0 commit comments