From 78bb0be21d261cd0faf9fdaeafcfd15c6dc47759 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 30 Jun 2023 00:22:32 +0800 Subject: [PATCH 1/4] Fix the wrong backoff computation when retrying ### Motivation All the retryable operations share the same `Backoff` object in `RetryableLookupService`, so if the reconnection happens for some times, the delay of retrying will keeps the maximum value (30 seconds). ### Modifications Refactor the design of the `RetryableLookupService`: - Add a `RetryableOperation` class to represent a retryable operation, each instance has its own `Backoff` object. The operation could only be executed once. - Add a `RetryableOperationCache` class to represent a map that maps a specific name to its associated operation. It's an optimization that if an operation (e.g. find the owner topic of topic A) was not complete while the same operation was executed, the future would be reused. - In `RetryableLookupService`, just maintain some caches for different operations. - Add `RetryableOperationCacheTest` to verify the behaviors. --- lib/RetryableLookupService.h | 115 +++++------------------ lib/RetryableOperation.h | 134 +++++++++++++++++++++++++++ lib/RetryableOperationCache.h | 120 ++++++++++++++++++++++++ tests/LookupServiceTest.cc | 22 +---- tests/PulsarFriend.h | 5 - tests/RetryableOperationCacheTest.cc | 124 +++++++++++++++++++++++++ tests/TableViewTest.cc | 2 + 7 files changed, 403 insertions(+), 119 deletions(-) create mode 100644 lib/RetryableOperation.h create mode 100644 lib/RetryableOperationCache.h create mode 100644 tests/RetryableOperationCacheTest.cc diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index c8fdaabe..639eccf1 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -18,21 +18,15 @@ */ #pragma once -#include -#include - -#include "Backoff.h" -#include "ExecutorService.h" -#include "LogUtils.h" #include "LookupDataResult.h" #include "LookupService.h" -#include "SynchronizedHashMap.h" +#include "NamespaceName.h" +#include "RetryableOperationCache.h" #include "TopicName.h" namespace pulsar { -class RetryableLookupService : public LookupService, - public std::enable_shared_from_this { +class RetryableLookupService : public LookupService { private: friend class PulsarFriend; struct PassKey { @@ -50,117 +44,50 @@ class RetryableLookupService : public LookupService, } LookupResultFuture getBroker(const TopicName& topicName) override { - return executeAsync("get-broker-" + topicName.toString(), - [this, topicName] { return lookupService_->getBroker(topicName); }); + return lookupCache_->run("get-broker-" + topicName.toString(), + [this, topicName] { return lookupService_->getBroker(topicName); }); } Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { - return executeAsync( + return partitionLookupCache_->run( "get-partition-metadata-" + topicName->toString(), [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); }); } Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override { - return executeAsync( + return namespaceLookupCache_->run( "get-topics-of-namespace-" + nsName->toString(), [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } Future getSchema(const TopicNamePtr& topicName, const std::string& version) override { - return executeAsync("get-schema" + topicName->toString(), [this, topicName, version] { + return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] { return lookupService_->getSchema(topicName, version); }); } - template - Future executeAsync(const std::string& key, std::function()> f) { - Promise promise; - executeAsyncImpl(key, f, promise, timeout_); - return promise.getFuture(); + size_t getNumberOfPendingTasks() const { + return lookupCache_->size() + partitionLookupCache_->size() + namespaceLookupCache_->size() + + getSchemaCache_->size(); } private: const std::shared_ptr lookupService_; - const TimeDuration timeout_; - Backoff backoff_; - const ExecutorServiceProviderPtr executorProvider_; - - SynchronizedHashMap backoffTimers_; + RetryableOperationCachePtr lookupCache_; + RetryableOperationCachePtr partitionLookupCache_; + RetryableOperationCachePtr namespaceLookupCache_; + RetryableOperationCachePtr getSchemaCache_; RetryableLookupService(std::shared_ptr lookupService, int timeoutSeconds, ExecutorServiceProviderPtr executorProvider) : lookupService_(lookupService), - timeout_(boost::posix_time::seconds(timeoutSeconds)), - backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_, - boost::posix_time::milliseconds(0)), - executorProvider_(executorProvider) {} - - std::weak_ptr weak_from_this() noexcept { return shared_from_this(); } - - // NOTE: Set the visibility to fix compilation error in GCC 6 - template -#ifndef _WIN32 - __attribute__((visibility("hidden"))) -#endif - void - executeAsyncImpl(const std::string& key, std::function()> f, Promise promise, - TimeDuration remainingTime) { - auto weakSelf = weak_from_this(); - f().addListener([this, weakSelf, key, f, promise, remainingTime](Result result, const T& value) { - auto self = weakSelf.lock(); - if (!self) { - return; - } - - if (result == ResultOk) { - backoffTimers_.remove(key); - promise.setValue(value); - } else if (result == ResultRetryable) { - if (remainingTime.total_milliseconds() <= 0) { - backoffTimers_.remove(key); - promise.setFailed(ResultTimeout); - return; - } - - DeadlineTimerPtr timerPtr; - try { - timerPtr = executorProvider_->get()->createDeadlineTimer(); - } catch (const std::runtime_error& e) { - LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what()); - promise.setFailed(ResultConnectError); - return; - } - auto it = backoffTimers_.emplace(key, timerPtr); - auto& timer = *(it.first->second); - auto delay = std::min(backoff_.next(), remainingTime); - timer.expires_from_now(delay); - - auto nextRemainingTime = remainingTime - delay; - LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds() - << " ms, remaining time: " << nextRemainingTime.total_milliseconds() - << " ms"); - timer.async_wait([this, weakSelf, key, f, promise, - nextRemainingTime](const boost::system::error_code& ec) { - auto self = weakSelf.lock(); - if (!self || ec) { - if (self && ec != boost::asio::error::operation_aborted) { - LOG_ERROR("The timer for " << key << " failed: " << ec.message()); - } - // The lookup service has been destructed or the timer has been cancelled - promise.setFailed(ResultTimeout); - return; - } - executeAsyncImpl(key, f, promise, nextRemainingTime); - }); - } else { - backoffTimers_.remove(key); - promise.setFailed(result); - } - }); - } - - DECLARE_LOG_OBJECT() + lookupCache_(RetryableOperationCache::create(executorProvider, timeoutSeconds)), + partitionLookupCache_( + RetryableOperationCache::create(executorProvider, timeoutSeconds)), + namespaceLookupCache_( + RetryableOperationCache::create(executorProvider, timeoutSeconds)), + getSchemaCache_(RetryableOperationCache::create(executorProvider, timeoutSeconds)) {} }; } // namespace pulsar diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h new file mode 100644 index 00000000..d26cd9c8 --- /dev/null +++ b/lib/RetryableOperation.h @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include + +#include "Backoff.h" +#include "ExecutorService.h" +#include "Future.h" +#include "LogUtils.h" + +namespace pulsar { + +template +class RetryableOperation : public std::enable_shared_from_this> { + struct PassKey { + explicit PassKey() {} + }; + + RetryableOperation(const std::string& name, std::function()>&& func, int timeoutSeconds, + DeadlineTimerPtr timer) + : name_(name), + func_(std::move(func)), + timeout_(boost::posix_time::seconds(timeoutSeconds)), + backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_, + boost::posix_time::milliseconds(0)), + timer_(timer) {} + + public: + template + explicit RetryableOperation(PassKey, Args&&... args) : RetryableOperation(std::forward(args)...) {} + + template + static std::shared_ptr> create(Args&&... args) { + return std::make_shared>(PassKey{}, std::forward(args)...); + } + + Future run() { + bool expected = false; + if (!started_.compare_exchange_strong(expected, true)) { + return promise_.getFuture(); + } + return runImpl(timeout_); + } + + void cancel() { + promise_.setValue(T{}); + boost::system::error_code ec; + timer_->cancel(ec); + } + + private: + const std::string name_; + std::function()> func_; + const TimeDuration timeout_; + Backoff backoff_; + Promise promise_; + std::atomic_bool started_{false}; + DeadlineTimerPtr timer_; + + Future runImpl(TimeDuration remainingTime) { + std::weak_ptr> weakSelf{this->shared_from_this()}; + func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (result == ResultOk) { + promise_.setValue(value); + return; + } + if (result != ResultRetryable) { + promise_.setFailed(result); + return; + } + if (remainingTime.total_milliseconds() <= 0) { + promise_.setFailed(ResultTimeout); + return; + } + + auto delay = std::min(backoff_.next(), remainingTime); + timer_->expires_from_now(delay); + + auto nextRemainingTime = remainingTime - delay; + LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds() + << " ms, remaining time: " << nextRemainingTime.total_milliseconds() + << " ms"); + timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (ec) { + if (ec == boost::asio::error::operation_aborted) { + LOG_DEBUG("Timer for " << name_ << " is cancelled"); + promise_.setFailed(ResultTimeout); + } else { + LOG_WARN("Timer for " << name_ << " failed: " << ec.message()); + } + } else { + LOG_DEBUG("Run operation " << name_ << ", remaining time: " + << nextRemainingTime.total_milliseconds() << " ms"); + runImpl(nextRemainingTime); + } + }); + }); + return promise_.getFuture(); + } + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h new file mode 100644 index 00000000..b9d93494 --- /dev/null +++ b/lib/RetryableOperationCache.h @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +#include "ExecutorService.h" +#include "RetryableOperation.h" + +namespace pulsar { + +template +class RetryableOperationCache; + +template +using RetryableOperationCachePtr = std::shared_ptr>; + +template +class RetryableOperationCache : public std::enable_shared_from_this> { + struct PassKey { + explicit PassKey() {} + }; + + RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, int timeoutSeconds) + : executorProvider_(executorProvider), timeoutSeconds_(timeoutSeconds) {} + + using Self = RetryableOperationCache; + + public: + template + explicit RetryableOperationCache(PassKey, Args&&... args) + : RetryableOperationCache(std::forward(args)...) {} + + template + static std::shared_ptr create(Args&&... args) { + return std::make_shared(PassKey{}, std::forward(args)...); + } + + Future run(const std::string& key, std::function()>&& func) { + std::unique_lock lock{mutex_}; + auto it = operations_.find(key); + if (it == operations_.end()) { + DeadlineTimerPtr timer; + try { + timer = executorProvider_->get()->createDeadlineTimer(); + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what()); + Promise promise; + promise.setFailed(ResultConnectError); + return promise.getFuture(); + } + + auto operation = RetryableOperation::create(key, std::move(func), timeoutSeconds_, timer); + auto future = operation->run(); + operations_[key] = operation; + lock.unlock(); + + std::weak_ptr weakSelf{this->shared_from_this()}; + future.addListener([this, weakSelf, key, operation](Result, const T&) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + std::lock_guard lock{mutex_}; + operations_.erase(key); + operation->cancel(); + }); + + return future; + } else { + return it->second->run(); + } + } + + void clear() { + decltype(operations_) operations; + { + std::lock_guard lock{mutex_}; + operations.swap(operations_); + } + // cancel() could trigger the listener to erase the key from operations, so we should use a swap way + // to release the lock here + for (auto&& kv : operations) { + kv.second->cancel(); + } + } + + size_t size() const { + std::lock_guard lock{mutex_}; + return operations_.size(); + } + + private: + ExecutorServiceProviderPtr executorProvider_; + const int timeoutSeconds_; + + std::unordered_map>> operations_; + mutable std::mutex mutex_; + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 7712c4e7..62f2aaf7 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -159,25 +159,7 @@ TEST(LookupServiceTest, testRetry) { ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr)); LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics"); - std::atomic_int retryCount{0}; - constexpr int totalRetryCount = 3; - auto future4 = lookupService->executeAsync("key", [&retryCount]() -> Future { - Promise promise; - if (++retryCount < totalRetryCount) { - LOG_INFO("Retry count: " << retryCount); - promise.setFailed(ResultRetryable); - } else { - LOG_INFO("Retry done with " << retryCount << " times"); - promise.setValue(100); - } - return promise.getFuture(); - }); - int customResult = 0; - ASSERT_EQ(ResultOk, future4.get(customResult)); - ASSERT_EQ(customResult, 100); - ASSERT_EQ(retryCount.load(), totalRetryCount); - - ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); + ASSERT_EQ(lookupService->getNumberOfPendingTasks(), 0); } TEST(LookupServiceTest, testTimeout) { @@ -221,7 +203,7 @@ TEST(LookupServiceTest, testTimeout) { ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr)); afterMethod("getTopicsOfNamespaceAsync"); - ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); + ASSERT_EQ(lookupService->getNumberOfPendingTasks(), 0); } class LookupServiceTest : public ::testing::TestWithParam { diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 5a3f820a..acbbf165 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -31,7 +31,6 @@ #include "lib/PartitionedProducerImpl.h" #include "lib/ProducerImpl.h" #include "lib/ReaderImpl.h" -#include "lib/RetryableLookupService.h" #include "lib/stats/ConsumerStatsImpl.h" #include "lib/stats/ProducerStatsImpl.h" @@ -181,10 +180,6 @@ class PulsarFriend { setServiceUrlIndex(client.impl_->serviceNameResolver_, index); } - static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) { - return lookupService.backoffTimers_.size(); - } - static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } static std::shared_ptr getMessageIdImpl(MessageId& msgId) { return msgId.impl_; } diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc new file mode 100644 index 00000000..ddd879e8 --- /dev/null +++ b/tests/RetryableOperationCacheTest.cc @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include + +#include "lib/RetryableOperationCache.h" + +using namespace pulsar; + +using IntFuture = Future; + +static int wait(IntFuture future) { + int value; + auto result = future.get(value); + if (result != ResultOk) { + throw std::runtime_error(strResult(result)); + } + return value; +} + +class CountdownFunc { + const int result_; + const int totalRetryCount_ = 3; + std::atomic_int current_{0}; + + public: + CountdownFunc(int result, int totalRetryCount = 3) : result_(result), totalRetryCount_(totalRetryCount) {} + + CountdownFunc(const CountdownFunc& rhs) + : result_(rhs.result_), totalRetryCount_(rhs.totalRetryCount_), current_(rhs.current_.load()) {} + + IntFuture operator()() { + Promise promise; + if (++current_ < totalRetryCount_) { + promise.setFailed(ResultRetryable); + } else { + promise.setValue(result_); + } + return promise.getFuture(); + } +}; + +class RetryableOperationCacheTest : public ::testing::Test { + protected: + void SetUp() override { provider_ = std::make_shared(1); } + + void TearDown() override { + provider_->close(); + futures_.clear(); + } + + ExecutorServiceProviderPtr provider_; + std::vector futures_; +}; + +TEST_F(RetryableOperationCacheTest, testRetry) { + auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + for (int i = 0; i < 10; i++) { + futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{i * 100})); + } + ASSERT_EQ(cache->size(), 10); + for (int i = 0; i < 10; i++) { + ASSERT_EQ(wait(futures_[i]), i * 100); + } + ASSERT_EQ(cache->size(), 0); +} + +TEST_F(RetryableOperationCacheTest, testCache) { + auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + constexpr int numKeys = 5; + for (int i = 0; i < 100; i++) { + futures_.emplace_back(cache->run("key-" + std::to_string(i % numKeys), CountdownFunc{i * 100})); + } + ASSERT_EQ(cache->size(), numKeys); + for (int i = 0; i < 100; i++) { + ASSERT_EQ(wait(futures_[i]), (i % numKeys) * 100); + } + ASSERT_EQ(cache->size(), 0); +} + +TEST_F(RetryableOperationCacheTest, testTimeout) { + auto cache = RetryableOperationCache::create(provider_, 1 /* seconds */); + auto future = cache->run("key", CountdownFunc{0, 1000 /* retry count */}); + try { + wait(future); + FAIL(); + } catch (const std::runtime_error& e) { + ASSERT_STREQ(e.what(), strResult(ResultTimeout)); + } +} + +TEST_F(RetryableOperationCacheTest, testClear) { + auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + for (int i = 0; i < 10; i++) { + futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100})); + } + ASSERT_EQ(cache->size(), 10); + cache->clear(); + for (auto&& future : futures_) { + int value; + // All cancelled futures complete with the default int value + ASSERT_EQ(ResultOk, future.get(value)); + ASSERT_EQ(value, 0); + } + ASSERT_EQ(cache->size(), 0); +} diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc index 5d42173c..a645b9a0 100644 --- a/tests/TableViewTest.cc +++ b/tests/TableViewTest.cc @@ -23,7 +23,9 @@ #include #include "HttpHelper.h" +#include "LogUtils.h" #include "PulsarFriend.h" +#include "TopicName.h" #include "WaitUtils.h" using namespace pulsar; From af74dbf11bfa027ecfbaabfd658a06f0d69dd85e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 6 Jul 2023 13:12:44 +0800 Subject: [PATCH 2/4] Use friend class to access private fields --- lib/ClientImpl.cc | 1 + lib/LookupService.h | 2 ++ lib/RetryableLookupService.h | 14 ++++++---- lib/RetryableOperationCache.h | 7 ++--- tests/LookupServiceTest.cc | 41 +++++++++++++++++++--------- tests/RetryableOperationCacheTest.cc | 24 +++++++++++----- 6 files changed, 58 insertions(+), 31 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index c94434eb..762d2b83 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -598,6 +598,7 @@ void ClientImpl::closeAsync(CloseCallback callback) { state_ = Closing; memoryLimitController_.close(); + lookupServicePtr_->close(); auto producers = producers_.move(); auto consumers = consumers_.move(); diff --git a/lib/LookupService.h b/lib/LookupService.h index 74011210..35f47308 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -86,6 +86,8 @@ class LookupService { const std::string& version = "") = 0; virtual ~LookupService() {} + + virtual void close() {} }; typedef std::shared_ptr LookupServicePtr; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 639eccf1..b8e3e0d7 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -28,7 +28,7 @@ namespace pulsar { class RetryableLookupService : public LookupService { private: - friend class PulsarFriend; + friend class LookupServiceTest; struct PassKey { explicit PassKey() {} }; @@ -38,6 +38,13 @@ class RetryableLookupService : public LookupService { explicit RetryableLookupService(PassKey, Args&&... args) : RetryableLookupService(std::forward(args)...) {} + void close() override { + lookupCache_->clear(); + partitionLookupCache_->clear(); + namespaceLookupCache_->clear(); + getSchemaCache_->clear(); + } + template static std::shared_ptr create(Args&&... args) { return std::make_shared(PassKey{}, std::forward(args)...); @@ -67,11 +74,6 @@ class RetryableLookupService : public LookupService { }); } - size_t getNumberOfPendingTasks() const { - return lookupCache_->size() + partitionLookupCache_->size() + namespaceLookupCache_->size() + - getSchemaCache_->size(); - } - private: const std::shared_ptr lookupService_; RetryableOperationCachePtr lookupCache_; diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index b9d93494..70fa9140 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -34,6 +34,8 @@ using RetryableOperationCachePtr = std::shared_ptr>; template class RetryableOperationCache : public std::enable_shared_from_this> { + friend class LookupServiceTest; + friend class RetryableOperationCacheTest; struct PassKey { explicit PassKey() {} }; @@ -102,11 +104,6 @@ class RetryableOperationCache : public std::enable_shared_from_this lock{mutex_}; - return operations_.size(); - } - private: ExecutorServiceProviderPtr executorProvider_; const int timeoutSeconds_; diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 62f2aaf7..aff8409b 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -37,8 +37,6 @@ #include "lib/RetryableLookupService.h" #include "lib/TimeUtils.h" -using namespace pulsar; - DECLARE_LOG_OBJECT() static std::string binaryLookupUrl = "pulsar://localhost:6650"; @@ -46,6 +44,32 @@ static std::string httpLookupUrl = "http://localhost:8080"; extern std::string unique_str(); +namespace pulsar { + +class LookupServiceTest : public ::testing::TestWithParam { + public: + void SetUp() override { client_ = Client{GetParam()}; } + void TearDown() override { client_.close(); } + + template + static bool isEmpty(const RetryableOperationCache& cache) { + std::lock_guard lock{cache.mutex_}; + return cache.operations_.empty(); + } + + static size_t isEmpty(const RetryableLookupService& service) { + return isEmpty(*service.lookupCache_) && isEmpty(*service.partitionLookupCache_) && + isEmpty(*service.namespaceLookupCache_) && isEmpty(*service.getSchemaCache_); + } + + protected: + Client client_{httpLookupUrl}; +}; + +} // namespace pulsar + +using namespace pulsar; + TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared(1); AuthenticationPtr authData = AuthFactory::Disabled(); @@ -159,7 +183,7 @@ TEST(LookupServiceTest, testRetry) { ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr)); LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics"); - ASSERT_EQ(lookupService->getNumberOfPendingTasks(), 0); + ASSERT_TRUE(LookupServiceTest::isEmpty(*lookupService)); } TEST(LookupServiceTest, testTimeout) { @@ -203,18 +227,9 @@ TEST(LookupServiceTest, testTimeout) { ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr)); afterMethod("getTopicsOfNamespaceAsync"); - ASSERT_EQ(lookupService->getNumberOfPendingTasks(), 0); + ASSERT_TRUE(LookupServiceTest::isEmpty(*lookupService)); } -class LookupServiceTest : public ::testing::TestWithParam { - public: - void SetUp() override { client_ = Client{GetParam()}; } - void TearDown() override { client_.close(); } - - protected: - Client client_{httpLookupUrl}; -}; - TEST_P(LookupServiceTest, basicGetNamespaceTopics) { Result result; diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index ddd879e8..9652b402 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -23,7 +23,7 @@ #include "lib/RetryableOperationCache.h" -using namespace pulsar; +namespace pulsar { using IntFuture = Future; @@ -67,20 +67,30 @@ class RetryableOperationCacheTest : public ::testing::Test { futures_.clear(); } + template + size_t getSize(const RetryableOperationCache& cache) { + std::lock_guard lock{cache.mutex_}; + return cache.operations_.size(); + } + ExecutorServiceProviderPtr provider_; std::vector futures_; }; +} // namespace pulsar + +using namespace pulsar; + TEST_F(RetryableOperationCacheTest, testRetry) { auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); for (int i = 0; i < 10; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{i * 100})); } - ASSERT_EQ(cache->size(), 10); + ASSERT_EQ(getSize(*cache), 10); for (int i = 0; i < 10; i++) { ASSERT_EQ(wait(futures_[i]), i * 100); } - ASSERT_EQ(cache->size(), 0); + ASSERT_EQ(getSize(*cache), 0); } TEST_F(RetryableOperationCacheTest, testCache) { @@ -89,11 +99,11 @@ TEST_F(RetryableOperationCacheTest, testCache) { for (int i = 0; i < 100; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i % numKeys), CountdownFunc{i * 100})); } - ASSERT_EQ(cache->size(), numKeys); + ASSERT_EQ(getSize(*cache), numKeys); for (int i = 0; i < 100; i++) { ASSERT_EQ(wait(futures_[i]), (i % numKeys) * 100); } - ASSERT_EQ(cache->size(), 0); + ASSERT_EQ(getSize(*cache), 0); } TEST_F(RetryableOperationCacheTest, testTimeout) { @@ -112,7 +122,7 @@ TEST_F(RetryableOperationCacheTest, testClear) { for (int i = 0; i < 10; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100})); } - ASSERT_EQ(cache->size(), 10); + ASSERT_EQ(getSize(*cache), 10); cache->clear(); for (auto&& future : futures_) { int value; @@ -120,5 +130,5 @@ TEST_F(RetryableOperationCacheTest, testClear) { ASSERT_EQ(ResultOk, future.get(value)); ASSERT_EQ(value, 0); } - ASSERT_EQ(cache->size(), 0); + ASSERT_EQ(getSize(*cache), 0); } From a497f7d4e99bb5333dfd68ddaf3858d2cb93b586 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 6 Jul 2023 19:20:26 +0800 Subject: [PATCH 3/4] Fix segfault when the pending RetryableOperation is cancelled --- lib/RetryableOperation.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index d26cd9c8..8a7b4710 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -65,7 +65,7 @@ class RetryableOperation : public std::enable_shared_from_thiscancel(ec); } From 25be277c8c3fb2e1d6e78c7aa94401fbe9d0a4b1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 6 Jul 2023 22:34:29 +0800 Subject: [PATCH 4/4] Fix failed RetryableOperationCacheTest.testClear --- tests/RetryableOperationCacheTest.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index 9652b402..ea1eb695 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -126,8 +126,8 @@ TEST_F(RetryableOperationCacheTest, testClear) { cache->clear(); for (auto&& future : futures_) { int value; - // All cancelled futures complete with the default int value - ASSERT_EQ(ResultOk, future.get(value)); + // All cancelled futures complete with ResultDisconnected and the default int value + ASSERT_EQ(ResultDisconnected, future.get(value)); ASSERT_EQ(value, 0); } ASSERT_EQ(getSize(*cache), 0);