|
20 | 20 | #include <gtest/gtest.h> |
21 | 21 | #include <pulsar/Client.h> |
22 | 22 |
|
| 23 | +#include "lib/Latch.h" |
| 24 | + |
23 | 25 | using namespace pulsar; |
24 | 26 |
|
25 | 27 | TEST(BrokerMetadataTest, testConsumeSuccess) { |
26 | 28 | Client client{"pulsar://localhost:6650"}; |
27 | 29 | Producer producer; |
28 | | - Result producerResult = client.createProducer("persistent://public/default/testConsumeSuccess", producer); |
| 30 | + ProducerConfiguration producerConfiguration; |
| 31 | + producerConfiguration.setBatchingEnabled(false); |
| 32 | + Result producerResult = |
| 33 | + client.createProducer("persistent://public/default/topic-non-batch", producerConfiguration, producer); |
29 | 34 | ASSERT_EQ(producerResult, ResultOk); |
30 | 35 | Consumer consumer; |
31 | | - Result consumerResult = |
32 | | - client.subscribe("persistent://public/default/testConsumeSuccess", "testConsumeSuccess", consumer); |
| 36 | + Result consumerResult = client.subscribe("persistent://public/default/topic-non-batch", "sub", consumer); |
33 | 37 | ASSERT_EQ(consumerResult, ResultOk); |
34 | | - const auto msg = MessageBuilder().setContent("testConsumeSuccess").build(); |
35 | | - Result sendResult = producer.send(msg); |
36 | | - ASSERT_EQ(sendResult, ResultOk); |
| 38 | + for (int i = 0; i < 10; i++) { |
| 39 | + std::string content = "testConsumeSuccess" + std::to_string(i); |
| 40 | + const auto msg = MessageBuilder().setContent(content).build(); |
| 41 | + Result sendResult = producer.send(msg); |
| 42 | + ASSERT_EQ(sendResult, ResultOk); |
| 43 | + } |
| 44 | + |
| 45 | + Message receivedMsg; |
| 46 | + for (int i = 0; i < 10; i++) { |
| 47 | + Result receiveResult = |
| 48 | + consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message |
| 49 | + printf("receive index: %d\n", i); |
| 50 | + ASSERT_EQ(receiveResult, ResultOk); |
| 51 | + ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + std::to_string(i)); |
| 52 | + ASSERT_EQ(receivedMsg.getIndex(), i); |
| 53 | + Result ackResult = consumer.acknowledge(receivedMsg); |
| 54 | + ASSERT_EQ(ackResult, ResultOk); |
| 55 | + } |
| 56 | + client.close(); |
| 57 | +} |
| 58 | + |
| 59 | +TEST(BrokerMetadataTest, testConsumeBatchSuccess) { |
| 60 | + Client client{"pulsar://localhost:6650"}; |
| 61 | + Producer producer; |
| 62 | + Result producerResult = client.createProducer("persistent://public/default/topic-batch", producer); |
| 63 | + ASSERT_EQ(producerResult, ResultOk); |
| 64 | + Consumer consumer; |
| 65 | + Result consumerResult = client.subscribe("persistent://public/default/topic-batch", "sub", consumer); |
| 66 | + ASSERT_EQ(consumerResult, ResultOk); |
| 67 | + |
| 68 | + Latch latch(10); |
| 69 | + auto sendCallback = [&latch](Result result, const MessageId& id) { |
| 70 | + ASSERT_EQ(result, ResultOk); |
| 71 | + latch.countdown(); |
| 72 | + }; |
| 73 | + |
| 74 | + for (int i = 0; i < 10; i++) { |
| 75 | + std::string content = "testConsumeSuccess" + std::to_string(i); |
| 76 | + const auto msg = MessageBuilder().setContent(content).build(); |
| 77 | + producer.sendAsync(msg, sendCallback); |
| 78 | + } |
| 79 | + |
| 80 | + latch.wait(); |
| 81 | + |
37 | 82 | Message receivedMsg; |
38 | | - Result receiveResult = consumer.receive(receivedMsg); |
39 | | - ASSERT_EQ(receiveResult, ResultOk); |
40 | | - ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess"); |
| 83 | + for (int i = 0; i < 10; i++) { |
| 84 | + Result receiveResult = |
| 85 | + consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message |
| 86 | + ASSERT_EQ(receiveResult, ResultOk); |
| 87 | + ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + std::to_string(i)); |
| 88 | + ASSERT_GE(receivedMsg.getIndex(), 0); |
| 89 | + ASSERT_LT(receivedMsg.getIndex(), 10); |
| 90 | + } |
41 | 91 | client.close(); |
42 | 92 | } |
43 | 93 |
|
|
0 commit comments