Skip to content

Commit a42261b

Browse files
Fix flaky testConsumerEventWithoutPartition cause by the change of Pulsar 3.0 (#281)
### Motivation apache/pulsar#19502 brings a change to the consumer selection of the Failover subscription. Before that PR, if a new consumer joins a Failover subscription when there is a consumer that subscribes the topic, the new consumer will receive the "inactive" event. After that PR, a random consumer will receive the "inactive" event. ### Modifications Change the assertion when `consumer` subscribes the topic with the same subscription. In addition, use a thread safe queue to avoid race conditions.
1 parent 413461a commit a42261b

2 files changed

Lines changed: 69 additions & 37 deletions

File tree

tests/ConsumerTest.cc

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
#include "HttpHelper.h"
3333
#include "NoOpsCryptoKeyReader.h"
3434
#include "PulsarFriend.h"
35+
#include "SynchronizedQueue.h"
36+
#include "WaitUtils.h"
3537
#include "lib/ClientConnection.h"
3638
#include "lib/Future.h"
3739
#include "lib/LogUtils.h"
@@ -64,34 +66,13 @@ class ConsumerStateEventListener : public ConsumerEventListener {
6466
inActiveQueue_.push(partitionId);
6567
}
6668

67-
std::queue<int> activeQueue_;
68-
std::queue<int> inActiveQueue_;
69+
SynchronizedQueue<int> activeQueue_;
70+
SynchronizedQueue<int> inActiveQueue_;
6971
std::string name_;
7072
};
7173

7274
typedef std::shared_ptr<ConsumerStateEventListener> ConsumerStateEventListenerPtr;
7375

74-
void verifyConsumerNotReceiveAnyStateChanges(ConsumerStateEventListenerPtr listener) {
75-
ASSERT_EQ(0, listener->activeQueue_.size());
76-
ASSERT_EQ(0, listener->inActiveQueue_.size());
77-
}
78-
79-
void verifyConsumerActive(ConsumerStateEventListenerPtr listener, int partitionId) {
80-
ASSERT_NE(0, listener->activeQueue_.size());
81-
int pid = listener->activeQueue_.front();
82-
listener->activeQueue_.pop();
83-
ASSERT_EQ(partitionId, pid);
84-
ASSERT_EQ(0, listener->inActiveQueue_.size());
85-
}
86-
87-
void verifyConsumerInactive(ConsumerStateEventListenerPtr listener, int partitionId) {
88-
ASSERT_NE(0, listener->inActiveQueue_.size());
89-
int pid = listener->inActiveQueue_.front();
90-
listener->inActiveQueue_.pop();
91-
ASSERT_EQ(partitionId, pid);
92-
ASSERT_EQ(0, listener->activeQueue_.size());
93-
}
94-
9576
class ActiveInactiveListenerEvent : public ConsumerEventListener {
9677
public:
9778
void becameActive(Consumer consumer, int partitionId) override {
@@ -119,9 +100,7 @@ TEST(ConsumerTest, testConsumerEventWithoutPartition) {
119100

120101
const std::string topicName = "testConsumerEventWithoutPartition-topic-" + std::to_string(time(nullptr));
121102
const std::string subName = "sub";
122-
const int waitTimeInMs = 1000;
123-
// constexpr int unAckedMessagesTimeoutMs = 10000;
124-
// constexpr int tickDurationInMs = 1000;
103+
const auto waitTime = std::chrono::seconds(3);
125104

126105
// 1. two consumers on the same subscription
127106
Consumer consumer1;
@@ -132,7 +111,9 @@ TEST(ConsumerTest, testConsumerEventWithoutPartition) {
132111
config1.setConsumerType(ConsumerType::ConsumerFailover);
133112

134113
ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config1, consumer1));
135-
std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
114+
waitUntil(waitTime, [&listener1]() -> bool { return listener1->activeQueue_.size() == 1; });
115+
ASSERT_EQ(listener1->activeQueue_.size(), 1);
116+
ASSERT_EQ(listener1->activeQueue_.pop(), -1);
136117

137118
Consumer consumer2;
138119
ConsumerConfiguration config2;
@@ -142,18 +123,22 @@ TEST(ConsumerTest, testConsumerEventWithoutPartition) {
142123
config2.setConsumerType(ConsumerType::ConsumerFailover);
143124

144125
ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config2, consumer2));
145-
std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
146-
147-
verifyConsumerActive(listener1, -1);
148-
verifyConsumerInactive(listener2, -1);
149-
150-
// clear inActiveQueue_
151-
std::queue<int>().swap(listener2->inActiveQueue_);
126+
// Since https://github.com/apache/pulsar/pull/19502, both consumer and consumer2 could receive the
127+
// inactive event
128+
waitUntil(waitTime, [&listener1, &listener2]() -> bool {
129+
return listener1->inActiveQueue_.size() == 1 || listener2->inActiveQueue_.size() == 1;
130+
});
131+
if (listener1->inActiveQueue_.size() == 1) {
132+
ASSERT_EQ(listener1->inActiveQueue_.pop(), -1);
133+
} else {
134+
ASSERT_EQ(listener2->inActiveQueue_.size(), 1);
135+
ASSERT_EQ(listener2->inActiveQueue_.pop(), -1);
136+
}
152137

153138
consumer1.close();
154-
std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
155-
verifyConsumerActive(listener2, -1);
156-
verifyConsumerNotReceiveAnyStateChanges(listener1);
139+
waitUntil(waitTime, [&listener2]() -> bool { return listener2->activeQueue_.size() == 1; });
140+
ASSERT_EQ(listener2->activeQueue_.size(), 1);
141+
ASSERT_EQ(listener2->activeQueue_.pop(), -1);
157142
}
158143

159144
TEST(ConsumerTest, testConsumerEventWithPartition) {

tests/SynchronizedQueue.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <mutex>
22+
#include <queue>
23+
24+
template <typename T>
25+
class SynchronizedQueue {
26+
public:
27+
void push(const T& value) {
28+
std::lock_guard<std::mutex> lock{mutex_};
29+
queue_.push(value);
30+
}
31+
32+
T pop() {
33+
std::lock_guard<std::mutex> lock{mutex_};
34+
auto value = queue_.front();
35+
queue_.pop();
36+
return value;
37+
}
38+
39+
size_t size() const {
40+
std::lock_guard<std::mutex> lock{mutex_};
41+
return queue_.size();
42+
}
43+
44+
private:
45+
mutable std::mutex mutex_;
46+
std::queue<T> queue_;
47+
};

0 commit comments

Comments
 (0)