2121#include < pulsar/Reader.h>
2222#include < time.h>
2323
24+ #include < atomic>
25+ #include < functional>
2426#include < future>
27+ #include < set>
2528#include < string>
2629#include < thread>
30+ #include < vector>
2731
2832#include " HttpHelper.h"
2933#include " PulsarFriend.h"
@@ -37,7 +41,7 @@ DECLARE_LOG_OBJECT()
3741using namespace pulsar;
3842
3943static std::string serviceUrl = " pulsar://localhost:6650" ;
40- static const std::string adminUrl = " http://localhost:8080 /" ;
44+ static const std::string adminUrl = " http://localhost:8090 /" ;
4145
4246class ReaderTest : public ::testing::TestWithParam<bool > {
4347 public:
@@ -110,15 +114,25 @@ TEST_P(ReaderTest, testAsyncRead) {
110114 ASSERT_EQ (ResultOk, producer.send (msg));
111115 }
112116
117+ // readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then verify set
118+ std::string received[10 ];
119+ std::atomic<int > receivedCount{0 };
113120 for (int i = 0 ; i < 10 ; i++) {
114- reader.readNextAsync ([i ](Result result, const Message& msg) {
121+ reader.readNextAsync ([& ](Result result, const Message& msg) {
115122 ASSERT_EQ (ResultOk, result);
116- std::string content = msg.getDataAsString ();
117- std::string expected = " my-message-" + std::to_string (i);
118- ASSERT_EQ (expected, content);
123+ int idx = receivedCount.fetch_add (1 );
124+ if (idx < 10 ) received[idx] = msg.getDataAsString ();
119125 });
120126 }
121127
128+ waitUntil (std::chrono::seconds (5 ), [&]() { return receivedCount.load () == 10 ; }, 1000 );
129+ ASSERT_EQ (10 , receivedCount.load ()) << " Expected 10 messages" ;
130+
131+ std::set<std::string> receivedSet (received, received + 10 );
132+ for (int i = 0 ; i < 10 ; i++) {
133+ ASSERT_TRUE (receivedSet.count (" my-message-" + std::to_string (i))) << " Missing my-message-" << i;
134+ }
135+
122136 waitUntil (
123137 std::chrono::seconds (5 ),
124138 [&]() {
0 commit comments