diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index c94434eb..762d2b83 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -598,6 +598,7 @@ void ClientImpl::closeAsync(CloseCallback callback) { state_ = Closing; memoryLimitController_.close(); + lookupServicePtr_->close(); auto producers = producers_.move(); auto consumers = consumers_.move(); diff --git a/lib/LookupService.h b/lib/LookupService.h index 74011210..35f47308 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -86,6 +86,8 @@ class LookupService { const std::string& version = "") = 0; virtual ~LookupService() {} + + virtual void close() {} }; typedef std::shared_ptr LookupServicePtr; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index c8fdaabe..b8e3e0d7 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -18,23 +18,17 @@ */ #pragma once -#include -#include - -#include "Backoff.h" -#include "ExecutorService.h" -#include "LogUtils.h" #include "LookupDataResult.h" #include "LookupService.h" -#include "SynchronizedHashMap.h" +#include "NamespaceName.h" +#include "RetryableOperationCache.h" #include "TopicName.h" namespace pulsar { -class RetryableLookupService : public LookupService, - public std::enable_shared_from_this { +class RetryableLookupService : public LookupService { private: - friend class PulsarFriend; + friend class LookupServiceTest; struct PassKey { explicit PassKey() {} }; @@ -44,123 +38,58 @@ class RetryableLookupService : public LookupService, explicit RetryableLookupService(PassKey, Args&&... args) : RetryableLookupService(std::forward(args)...) {} + void close() override { + lookupCache_->clear(); + partitionLookupCache_->clear(); + namespaceLookupCache_->clear(); + getSchemaCache_->clear(); + } + template static std::shared_ptr create(Args&&... args) { return std::make_shared(PassKey{}, std::forward(args)...); } LookupResultFuture getBroker(const TopicName& topicName) override { - return executeAsync("get-broker-" + topicName.toString(), - [this, topicName] { return lookupService_->getBroker(topicName); }); + return lookupCache_->run("get-broker-" + topicName.toString(), + [this, topicName] { return lookupService_->getBroker(topicName); }); } Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { - return executeAsync( + return partitionLookupCache_->run( "get-partition-metadata-" + topicName->toString(), [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); }); } Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override { - return executeAsync( + return namespaceLookupCache_->run( "get-topics-of-namespace-" + nsName->toString(), [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } Future getSchema(const TopicNamePtr& topicName, const std::string& version) override { - return executeAsync("get-schema" + topicName->toString(), [this, topicName, version] { + return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] { return lookupService_->getSchema(topicName, version); }); } - template - Future executeAsync(const std::string& key, std::function()> f) { - Promise promise; - executeAsyncImpl(key, f, promise, timeout_); - return promise.getFuture(); - } - private: const std::shared_ptr lookupService_; - const TimeDuration timeout_; - Backoff backoff_; - const ExecutorServiceProviderPtr executorProvider_; - - SynchronizedHashMap backoffTimers_; + RetryableOperationCachePtr lookupCache_; + RetryableOperationCachePtr partitionLookupCache_; + RetryableOperationCachePtr namespaceLookupCache_; + RetryableOperationCachePtr getSchemaCache_; RetryableLookupService(std::shared_ptr lookupService, int timeoutSeconds, ExecutorServiceProviderPtr executorProvider) : lookupService_(lookupService), - timeout_(boost::posix_time::seconds(timeoutSeconds)), - backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_, - boost::posix_time::milliseconds(0)), - executorProvider_(executorProvider) {} - - std::weak_ptr weak_from_this() noexcept { return shared_from_this(); } - - // NOTE: Set the visibility to fix compilation error in GCC 6 - template -#ifndef _WIN32 - __attribute__((visibility("hidden"))) -#endif - void - executeAsyncImpl(const std::string& key, std::function()> f, Promise promise, - TimeDuration remainingTime) { - auto weakSelf = weak_from_this(); - f().addListener([this, weakSelf, key, f, promise, remainingTime](Result result, const T& value) { - auto self = weakSelf.lock(); - if (!self) { - return; - } - - if (result == ResultOk) { - backoffTimers_.remove(key); - promise.setValue(value); - } else if (result == ResultRetryable) { - if (remainingTime.total_milliseconds() <= 0) { - backoffTimers_.remove(key); - promise.setFailed(ResultTimeout); - return; - } - - DeadlineTimerPtr timerPtr; - try { - timerPtr = executorProvider_->get()->createDeadlineTimer(); - } catch (const std::runtime_error& e) { - LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what()); - promise.setFailed(ResultConnectError); - return; - } - auto it = backoffTimers_.emplace(key, timerPtr); - auto& timer = *(it.first->second); - auto delay = std::min(backoff_.next(), remainingTime); - timer.expires_from_now(delay); - - auto nextRemainingTime = remainingTime - delay; - LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds() - << " ms, remaining time: " << nextRemainingTime.total_milliseconds() - << " ms"); - timer.async_wait([this, weakSelf, key, f, promise, - nextRemainingTime](const boost::system::error_code& ec) { - auto self = weakSelf.lock(); - if (!self || ec) { - if (self && ec != boost::asio::error::operation_aborted) { - LOG_ERROR("The timer for " << key << " failed: " << ec.message()); - } - // The lookup service has been destructed or the timer has been cancelled - promise.setFailed(ResultTimeout); - return; - } - executeAsyncImpl(key, f, promise, nextRemainingTime); - }); - } else { - backoffTimers_.remove(key); - promise.setFailed(result); - } - }); - } - - DECLARE_LOG_OBJECT() + lookupCache_(RetryableOperationCache::create(executorProvider, timeoutSeconds)), + partitionLookupCache_( + RetryableOperationCache::create(executorProvider, timeoutSeconds)), + namespaceLookupCache_( + RetryableOperationCache::create(executorProvider, timeoutSeconds)), + getSchemaCache_(RetryableOperationCache::create(executorProvider, timeoutSeconds)) {} }; } // namespace pulsar diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h new file mode 100644 index 00000000..8a7b4710 --- /dev/null +++ b/lib/RetryableOperation.h @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include + +#include "Backoff.h" +#include "ExecutorService.h" +#include "Future.h" +#include "LogUtils.h" + +namespace pulsar { + +template +class RetryableOperation : public std::enable_shared_from_this> { + struct PassKey { + explicit PassKey() {} + }; + + RetryableOperation(const std::string& name, std::function()>&& func, int timeoutSeconds, + DeadlineTimerPtr timer) + : name_(name), + func_(std::move(func)), + timeout_(boost::posix_time::seconds(timeoutSeconds)), + backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_, + boost::posix_time::milliseconds(0)), + timer_(timer) {} + + public: + template + explicit RetryableOperation(PassKey, Args&&... args) : RetryableOperation(std::forward(args)...) {} + + template + static std::shared_ptr> create(Args&&... args) { + return std::make_shared>(PassKey{}, std::forward(args)...); + } + + Future run() { + bool expected = false; + if (!started_.compare_exchange_strong(expected, true)) { + return promise_.getFuture(); + } + return runImpl(timeout_); + } + + void cancel() { + promise_.setFailed(ResultDisconnected); + boost::system::error_code ec; + timer_->cancel(ec); + } + + private: + const std::string name_; + std::function()> func_; + const TimeDuration timeout_; + Backoff backoff_; + Promise promise_; + std::atomic_bool started_{false}; + DeadlineTimerPtr timer_; + + Future runImpl(TimeDuration remainingTime) { + std::weak_ptr> weakSelf{this->shared_from_this()}; + func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (result == ResultOk) { + promise_.setValue(value); + return; + } + if (result != ResultRetryable) { + promise_.setFailed(result); + return; + } + if (remainingTime.total_milliseconds() <= 0) { + promise_.setFailed(ResultTimeout); + return; + } + + auto delay = std::min(backoff_.next(), remainingTime); + timer_->expires_from_now(delay); + + auto nextRemainingTime = remainingTime - delay; + LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds() + << " ms, remaining time: " << nextRemainingTime.total_milliseconds() + << " ms"); + timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (ec) { + if (ec == boost::asio::error::operation_aborted) { + LOG_DEBUG("Timer for " << name_ << " is cancelled"); + promise_.setFailed(ResultTimeout); + } else { + LOG_WARN("Timer for " << name_ << " failed: " << ec.message()); + } + } else { + LOG_DEBUG("Run operation " << name_ << ", remaining time: " + << nextRemainingTime.total_milliseconds() << " ms"); + runImpl(nextRemainingTime); + } + }); + }); + return promise_.getFuture(); + } + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h new file mode 100644 index 00000000..70fa9140 --- /dev/null +++ b/lib/RetryableOperationCache.h @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +#include "ExecutorService.h" +#include "RetryableOperation.h" + +namespace pulsar { + +template +class RetryableOperationCache; + +template +using RetryableOperationCachePtr = std::shared_ptr>; + +template +class RetryableOperationCache : public std::enable_shared_from_this> { + friend class LookupServiceTest; + friend class RetryableOperationCacheTest; + struct PassKey { + explicit PassKey() {} + }; + + RetryableOperationCache(ExecutorServiceProviderPtr executorProvider, int timeoutSeconds) + : executorProvider_(executorProvider), timeoutSeconds_(timeoutSeconds) {} + + using Self = RetryableOperationCache; + + public: + template + explicit RetryableOperationCache(PassKey, Args&&... args) + : RetryableOperationCache(std::forward(args)...) {} + + template + static std::shared_ptr create(Args&&... args) { + return std::make_shared(PassKey{}, std::forward(args)...); + } + + Future run(const std::string& key, std::function()>&& func) { + std::unique_lock lock{mutex_}; + auto it = operations_.find(key); + if (it == operations_.end()) { + DeadlineTimerPtr timer; + try { + timer = executorProvider_->get()->createDeadlineTimer(); + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what()); + Promise promise; + promise.setFailed(ResultConnectError); + return promise.getFuture(); + } + + auto operation = RetryableOperation::create(key, std::move(func), timeoutSeconds_, timer); + auto future = operation->run(); + operations_[key] = operation; + lock.unlock(); + + std::weak_ptr weakSelf{this->shared_from_this()}; + future.addListener([this, weakSelf, key, operation](Result, const T&) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + std::lock_guard lock{mutex_}; + operations_.erase(key); + operation->cancel(); + }); + + return future; + } else { + return it->second->run(); + } + } + + void clear() { + decltype(operations_) operations; + { + std::lock_guard lock{mutex_}; + operations.swap(operations_); + } + // cancel() could trigger the listener to erase the key from operations, so we should use a swap way + // to release the lock here + for (auto&& kv : operations) { + kv.second->cancel(); + } + } + + private: + ExecutorServiceProviderPtr executorProvider_; + const int timeoutSeconds_; + + std::unordered_map>> operations_; + mutable std::mutex mutex_; + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 7712c4e7..aff8409b 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -37,8 +37,6 @@ #include "lib/RetryableLookupService.h" #include "lib/TimeUtils.h" -using namespace pulsar; - DECLARE_LOG_OBJECT() static std::string binaryLookupUrl = "pulsar://localhost:6650"; @@ -46,6 +44,32 @@ static std::string httpLookupUrl = "http://localhost:8080"; extern std::string unique_str(); +namespace pulsar { + +class LookupServiceTest : public ::testing::TestWithParam { + public: + void SetUp() override { client_ = Client{GetParam()}; } + void TearDown() override { client_.close(); } + + template + static bool isEmpty(const RetryableOperationCache& cache) { + std::lock_guard lock{cache.mutex_}; + return cache.operations_.empty(); + } + + static size_t isEmpty(const RetryableLookupService& service) { + return isEmpty(*service.lookupCache_) && isEmpty(*service.partitionLookupCache_) && + isEmpty(*service.namespaceLookupCache_) && isEmpty(*service.getSchemaCache_); + } + + protected: + Client client_{httpLookupUrl}; +}; + +} // namespace pulsar + +using namespace pulsar; + TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared(1); AuthenticationPtr authData = AuthFactory::Disabled(); @@ -159,25 +183,7 @@ TEST(LookupServiceTest, testRetry) { ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr)); LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics"); - std::atomic_int retryCount{0}; - constexpr int totalRetryCount = 3; - auto future4 = lookupService->executeAsync("key", [&retryCount]() -> Future { - Promise promise; - if (++retryCount < totalRetryCount) { - LOG_INFO("Retry count: " << retryCount); - promise.setFailed(ResultRetryable); - } else { - LOG_INFO("Retry done with " << retryCount << " times"); - promise.setValue(100); - } - return promise.getFuture(); - }); - int customResult = 0; - ASSERT_EQ(ResultOk, future4.get(customResult)); - ASSERT_EQ(customResult, 100); - ASSERT_EQ(retryCount.load(), totalRetryCount); - - ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); + ASSERT_TRUE(LookupServiceTest::isEmpty(*lookupService)); } TEST(LookupServiceTest, testTimeout) { @@ -221,18 +227,9 @@ TEST(LookupServiceTest, testTimeout) { ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr)); afterMethod("getTopicsOfNamespaceAsync"); - ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); + ASSERT_TRUE(LookupServiceTest::isEmpty(*lookupService)); } -class LookupServiceTest : public ::testing::TestWithParam { - public: - void SetUp() override { client_ = Client{GetParam()}; } - void TearDown() override { client_.close(); } - - protected: - Client client_{httpLookupUrl}; -}; - TEST_P(LookupServiceTest, basicGetNamespaceTopics) { Result result; diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 5a3f820a..acbbf165 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -31,7 +31,6 @@ #include "lib/PartitionedProducerImpl.h" #include "lib/ProducerImpl.h" #include "lib/ReaderImpl.h" -#include "lib/RetryableLookupService.h" #include "lib/stats/ConsumerStatsImpl.h" #include "lib/stats/ProducerStatsImpl.h" @@ -181,10 +180,6 @@ class PulsarFriend { setServiceUrlIndex(client.impl_->serviceNameResolver_, index); } - static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) { - return lookupService.backoffTimers_.size(); - } - static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } static std::shared_ptr getMessageIdImpl(MessageId& msgId) { return msgId.impl_; } diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc new file mode 100644 index 00000000..ea1eb695 --- /dev/null +++ b/tests/RetryableOperationCacheTest.cc @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include + +#include "lib/RetryableOperationCache.h" + +namespace pulsar { + +using IntFuture = Future; + +static int wait(IntFuture future) { + int value; + auto result = future.get(value); + if (result != ResultOk) { + throw std::runtime_error(strResult(result)); + } + return value; +} + +class CountdownFunc { + const int result_; + const int totalRetryCount_ = 3; + std::atomic_int current_{0}; + + public: + CountdownFunc(int result, int totalRetryCount = 3) : result_(result), totalRetryCount_(totalRetryCount) {} + + CountdownFunc(const CountdownFunc& rhs) + : result_(rhs.result_), totalRetryCount_(rhs.totalRetryCount_), current_(rhs.current_.load()) {} + + IntFuture operator()() { + Promise promise; + if (++current_ < totalRetryCount_) { + promise.setFailed(ResultRetryable); + } else { + promise.setValue(result_); + } + return promise.getFuture(); + } +}; + +class RetryableOperationCacheTest : public ::testing::Test { + protected: + void SetUp() override { provider_ = std::make_shared(1); } + + void TearDown() override { + provider_->close(); + futures_.clear(); + } + + template + size_t getSize(const RetryableOperationCache& cache) { + std::lock_guard lock{cache.mutex_}; + return cache.operations_.size(); + } + + ExecutorServiceProviderPtr provider_; + std::vector futures_; +}; + +} // namespace pulsar + +using namespace pulsar; + +TEST_F(RetryableOperationCacheTest, testRetry) { + auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + for (int i = 0; i < 10; i++) { + futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{i * 100})); + } + ASSERT_EQ(getSize(*cache), 10); + for (int i = 0; i < 10; i++) { + ASSERT_EQ(wait(futures_[i]), i * 100); + } + ASSERT_EQ(getSize(*cache), 0); +} + +TEST_F(RetryableOperationCacheTest, testCache) { + auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + constexpr int numKeys = 5; + for (int i = 0; i < 100; i++) { + futures_.emplace_back(cache->run("key-" + std::to_string(i % numKeys), CountdownFunc{i * 100})); + } + ASSERT_EQ(getSize(*cache), numKeys); + for (int i = 0; i < 100; i++) { + ASSERT_EQ(wait(futures_[i]), (i % numKeys) * 100); + } + ASSERT_EQ(getSize(*cache), 0); +} + +TEST_F(RetryableOperationCacheTest, testTimeout) { + auto cache = RetryableOperationCache::create(provider_, 1 /* seconds */); + auto future = cache->run("key", CountdownFunc{0, 1000 /* retry count */}); + try { + wait(future); + FAIL(); + } catch (const std::runtime_error& e) { + ASSERT_STREQ(e.what(), strResult(ResultTimeout)); + } +} + +TEST_F(RetryableOperationCacheTest, testClear) { + auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + for (int i = 0; i < 10; i++) { + futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100})); + } + ASSERT_EQ(getSize(*cache), 10); + cache->clear(); + for (auto&& future : futures_) { + int value; + // All cancelled futures complete with ResultDisconnected and the default int value + ASSERT_EQ(ResultDisconnected, future.get(value)); + ASSERT_EQ(value, 0); + } + ASSERT_EQ(getSize(*cache), 0); +} diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc index 5d42173c..a645b9a0 100644 --- a/tests/TableViewTest.cc +++ b/tests/TableViewTest.cc @@ -23,7 +23,9 @@ #include #include "HttpHelper.h" +#include "LogUtils.h" #include "PulsarFriend.h" +#include "TopicName.h" #include "WaitUtils.h" using namespace pulsar;