From 75d258fa752535203e73cf9b2e6f55275172cb87 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 5 Jun 2023 21:52:44 +0800 Subject: [PATCH] Fix flaky testConsumerEventWithoutPartition cause by the change of Pulsar 3.0 ### Motivation https://github.com/apache/pulsar/pull/19502 brings a change to the consumer selection of the Failover subscription. Before that PR, if a new consumer joins a Failover subscription when there is a consumer that subscribes the topic, the new consumer will receive the "inactive" event. After that PR, a random consumer will receive the "inactive" event. ### Modifications Change the assertion when `consumer` subscribes the topic with the same subscription. In addition, use a thread safe queue to avoid race conditions. --- tests/ConsumerTest.cc | 59 +++++++++++++++------------------------ tests/SynchronizedQueue.h | 47 +++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 37 deletions(-) create mode 100644 tests/SynchronizedQueue.h diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 7867a57e..b6956fe6 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -32,6 +32,8 @@ #include "HttpHelper.h" #include "NoOpsCryptoKeyReader.h" #include "PulsarFriend.h" +#include "SynchronizedQueue.h" +#include "WaitUtils.h" #include "lib/ClientConnection.h" #include "lib/Future.h" #include "lib/LogUtils.h" @@ -64,34 +66,13 @@ class ConsumerStateEventListener : public ConsumerEventListener { inActiveQueue_.push(partitionId); } - std::queue activeQueue_; - std::queue inActiveQueue_; + SynchronizedQueue activeQueue_; + SynchronizedQueue inActiveQueue_; std::string name_; }; typedef std::shared_ptr ConsumerStateEventListenerPtr; -void verifyConsumerNotReceiveAnyStateChanges(ConsumerStateEventListenerPtr listener) { - ASSERT_EQ(0, listener->activeQueue_.size()); - ASSERT_EQ(0, listener->inActiveQueue_.size()); -} - -void verifyConsumerActive(ConsumerStateEventListenerPtr listener, int partitionId) { - ASSERT_NE(0, listener->activeQueue_.size()); - int pid = listener->activeQueue_.front(); - listener->activeQueue_.pop(); - ASSERT_EQ(partitionId, pid); - ASSERT_EQ(0, listener->inActiveQueue_.size()); -} - -void verifyConsumerInactive(ConsumerStateEventListenerPtr listener, int partitionId) { - ASSERT_NE(0, listener->inActiveQueue_.size()); - int pid = listener->inActiveQueue_.front(); - listener->inActiveQueue_.pop(); - ASSERT_EQ(partitionId, pid); - ASSERT_EQ(0, listener->activeQueue_.size()); -} - class ActiveInactiveListenerEvent : public ConsumerEventListener { public: void becameActive(Consumer consumer, int partitionId) override { @@ -119,9 +100,7 @@ TEST(ConsumerTest, testConsumerEventWithoutPartition) { const std::string topicName = "testConsumerEventWithoutPartition-topic-" + std::to_string(time(nullptr)); const std::string subName = "sub"; - const int waitTimeInMs = 1000; - // constexpr int unAckedMessagesTimeoutMs = 10000; - // constexpr int tickDurationInMs = 1000; + const auto waitTime = std::chrono::seconds(3); // 1. two consumers on the same subscription Consumer consumer1; @@ -132,7 +111,9 @@ TEST(ConsumerTest, testConsumerEventWithoutPartition) { config1.setConsumerType(ConsumerType::ConsumerFailover); ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config1, consumer1)); - std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2)); + waitUntil(waitTime, [&listener1]() -> bool { return listener1->activeQueue_.size() == 1; }); + ASSERT_EQ(listener1->activeQueue_.size(), 1); + ASSERT_EQ(listener1->activeQueue_.pop(), -1); Consumer consumer2; ConsumerConfiguration config2; @@ -142,18 +123,22 @@ TEST(ConsumerTest, testConsumerEventWithoutPartition) { config2.setConsumerType(ConsumerType::ConsumerFailover); ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config2, consumer2)); - std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2)); - - verifyConsumerActive(listener1, -1); - verifyConsumerInactive(listener2, -1); - - // clear inActiveQueue_ - std::queue().swap(listener2->inActiveQueue_); + // Since https://github.com/apache/pulsar/pull/19502, both consumer and consumer2 could receive the + // inactive event + waitUntil(waitTime, [&listener1, &listener2]() -> bool { + return listener1->inActiveQueue_.size() == 1 || listener2->inActiveQueue_.size() == 1; + }); + if (listener1->inActiveQueue_.size() == 1) { + ASSERT_EQ(listener1->inActiveQueue_.pop(), -1); + } else { + ASSERT_EQ(listener2->inActiveQueue_.size(), 1); + ASSERT_EQ(listener2->inActiveQueue_.pop(), -1); + } consumer1.close(); - std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2)); - verifyConsumerActive(listener2, -1); - verifyConsumerNotReceiveAnyStateChanges(listener1); + waitUntil(waitTime, [&listener2]() -> bool { return listener2->activeQueue_.size() == 1; }); + ASSERT_EQ(listener2->activeQueue_.size(), 1); + ASSERT_EQ(listener2->activeQueue_.pop(), -1); } TEST(ConsumerTest, testConsumerEventWithPartition) { diff --git a/tests/SynchronizedQueue.h b/tests/SynchronizedQueue.h new file mode 100644 index 00000000..d8c18a81 --- /dev/null +++ b/tests/SynchronizedQueue.h @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +template +class SynchronizedQueue { + public: + void push(const T& value) { + std::lock_guard lock{mutex_}; + queue_.push(value); + } + + T pop() { + std::lock_guard lock{mutex_}; + auto value = queue_.front(); + queue_.pop(); + return value; + } + + size_t size() const { + std::lock_guard lock{mutex_}; + return queue_.size(); + } + + private: + mutable std::mutex mutex_; + std::queue queue_; +};