diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 45c604b3..f7957d61 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -471,8 +471,8 @@ void PartitionedProducerImpl::handleGetPartitions(Result result, } } else { LOG_WARN("Failed to getPartitionMetadata: " << strResult(result)); - runPartitionUpdateTask(); } + runPartitionUpdateTask(); } bool PartitionedProducerImpl::isConnected() const { diff --git a/tests/PartitionsUpdateTest.cc b/tests/PartitionsUpdateTest.cc index 010e5cb0..4d2fa27f 100644 --- a/tests/PartitionsUpdateTest.cc +++ b/tests/PartitionsUpdateTest.cc @@ -101,6 +101,11 @@ static void waitForPartitionsUpdated() { std::this_thread::sleep_for(std::chrono::seconds(3)); } +static void waitForPartitionUpdateTaskRunMultipleTimes() { + // Assume runPartitionUpdateTask run more than one time in 2 seconds if enabled + std::this_thread::sleep_for(std::chrono::seconds(2)); +} + TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) { ClientConfiguration clientConfig; ASSERT_EQ(60, clientConfig.getPartitionsUpdateInterval()); @@ -131,6 +136,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, lazyStartPartitionedProducers)); ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true)); + waitForPartitionUpdateTaskRunMultipleTimes(); res = makePostRequest(topicOperateUrl, "3"); // update partitions to 3 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; waitForPartitionsUpdated(); @@ -143,6 +149,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false)); ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false)); + waitForPartitionUpdateTaskRunMultipleTimes(); res = makePostRequest(topicOperateUrl, "5"); // update partitions to 5 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; waitForPartitionsUpdated(); @@ -155,6 +162,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false)); ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true)); + waitForPartitionUpdateTaskRunMultipleTimes(); res = makePostRequest(topicOperateUrl, "7"); // update partitions to 7 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; waitForPartitionsUpdated(); @@ -167,6 +175,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false)); ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false)); + waitForPartitionUpdateTaskRunMultipleTimes(); res = makePostRequest(topicOperateUrl, "10"); // update partitions to 10 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; waitForPartitionsUpdated();