Skip to content

Commit 78bb0be

Browse files
committed
Fix the wrong backoff computation when retrying
### Motivation All the retryable operations share the same `Backoff` object in `RetryableLookupService`, so if the reconnection happens for some times, the delay of retrying will keeps the maximum value (30 seconds). ### Modifications Refactor the design of the `RetryableLookupService`: - Add a `RetryableOperation` class to represent a retryable operation, each instance has its own `Backoff` object. The operation could only be executed once. - Add a `RetryableOperationCache` class to represent a map that maps a specific name to its associated operation. It's an optimization that if an operation (e.g. find the owner topic of topic A) was not complete while the same operation was executed, the future would be reused. - In `RetryableLookupService`, just maintain some caches for different operations. - Add `RetryableOperationCacheTest` to verify the behaviors.
1 parent 94cf3fc commit 78bb0be

7 files changed

Lines changed: 403 additions & 119 deletions

lib/RetryableLookupService.h

Lines changed: 21 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,15 @@
1818
*/
1919
#pragma once
2020

21-
#include <algorithm>
22-
#include <memory>
23-
24-
#include "Backoff.h"
25-
#include "ExecutorService.h"
26-
#include "LogUtils.h"
2721
#include "LookupDataResult.h"
2822
#include "LookupService.h"
29-
#include "SynchronizedHashMap.h"
23+
#include "NamespaceName.h"
24+
#include "RetryableOperationCache.h"
3025
#include "TopicName.h"
3126

3227
namespace pulsar {
3328

34-
class RetryableLookupService : public LookupService,
35-
public std::enable_shared_from_this<RetryableLookupService> {
29+
class RetryableLookupService : public LookupService {
3630
private:
3731
friend class PulsarFriend;
3832
struct PassKey {
@@ -50,117 +44,50 @@ class RetryableLookupService : public LookupService,
5044
}
5145

5246
LookupResultFuture getBroker(const TopicName& topicName) override {
53-
return executeAsync<LookupResult>("get-broker-" + topicName.toString(),
54-
[this, topicName] { return lookupService_->getBroker(topicName); });
47+
return lookupCache_->run("get-broker-" + topicName.toString(),
48+
[this, topicName] { return lookupService_->getBroker(topicName); });
5549
}
5650

5751
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
58-
return executeAsync<LookupDataResultPtr>(
52+
return partitionLookupCache_->run(
5953
"get-partition-metadata-" + topicName->toString(),
6054
[this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); });
6155
}
6256

6357
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
6458
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override {
65-
return executeAsync<NamespaceTopicsPtr>(
59+
return namespaceLookupCache_->run(
6660
"get-topics-of-namespace-" + nsName->toString(),
6761
[this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
6862
}
6963

7064
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override {
71-
return executeAsync<SchemaInfo>("get-schema" + topicName->toString(), [this, topicName, version] {
65+
return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] {
7266
return lookupService_->getSchema(topicName, version);
7367
});
7468
}
7569

76-
template <typename T>
77-
Future<Result, T> executeAsync(const std::string& key, std::function<Future<Result, T>()> f) {
78-
Promise<Result, T> promise;
79-
executeAsyncImpl(key, f, promise, timeout_);
80-
return promise.getFuture();
70+
size_t getNumberOfPendingTasks() const {
71+
return lookupCache_->size() + partitionLookupCache_->size() + namespaceLookupCache_->size() +
72+
getSchemaCache_->size();
8173
}
8274

8375
private:
8476
const std::shared_ptr<LookupService> lookupService_;
85-
const TimeDuration timeout_;
86-
Backoff backoff_;
87-
const ExecutorServiceProviderPtr executorProvider_;
88-
89-
SynchronizedHashMap<std::string, DeadlineTimerPtr> backoffTimers_;
77+
RetryableOperationCachePtr<LookupResult> lookupCache_;
78+
RetryableOperationCachePtr<LookupDataResultPtr> partitionLookupCache_;
79+
RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_;
80+
RetryableOperationCachePtr<SchemaInfo> getSchemaCache_;
9081

9182
RetryableLookupService(std::shared_ptr<LookupService> lookupService, int timeoutSeconds,
9283
ExecutorServiceProviderPtr executorProvider)
9384
: lookupService_(lookupService),
94-
timeout_(boost::posix_time::seconds(timeoutSeconds)),
95-
backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
96-
boost::posix_time::milliseconds(0)),
97-
executorProvider_(executorProvider) {}
98-
99-
std::weak_ptr<RetryableLookupService> weak_from_this() noexcept { return shared_from_this(); }
100-
101-
// NOTE: Set the visibility to fix compilation error in GCC 6
102-
template <typename T>
103-
#ifndef _WIN32
104-
__attribute__((visibility("hidden")))
105-
#endif
106-
void
107-
executeAsyncImpl(const std::string& key, std::function<Future<Result, T>()> f, Promise<Result, T> promise,
108-
TimeDuration remainingTime) {
109-
auto weakSelf = weak_from_this();
110-
f().addListener([this, weakSelf, key, f, promise, remainingTime](Result result, const T& value) {
111-
auto self = weakSelf.lock();
112-
if (!self) {
113-
return;
114-
}
115-
116-
if (result == ResultOk) {
117-
backoffTimers_.remove(key);
118-
promise.setValue(value);
119-
} else if (result == ResultRetryable) {
120-
if (remainingTime.total_milliseconds() <= 0) {
121-
backoffTimers_.remove(key);
122-
promise.setFailed(ResultTimeout);
123-
return;
124-
}
125-
126-
DeadlineTimerPtr timerPtr;
127-
try {
128-
timerPtr = executorProvider_->get()->createDeadlineTimer();
129-
} catch (const std::runtime_error& e) {
130-
LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what());
131-
promise.setFailed(ResultConnectError);
132-
return;
133-
}
134-
auto it = backoffTimers_.emplace(key, timerPtr);
135-
auto& timer = *(it.first->second);
136-
auto delay = std::min(backoff_.next(), remainingTime);
137-
timer.expires_from_now(delay);
138-
139-
auto nextRemainingTime = remainingTime - delay;
140-
LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds()
141-
<< " ms, remaining time: " << nextRemainingTime.total_milliseconds()
142-
<< " ms");
143-
timer.async_wait([this, weakSelf, key, f, promise,
144-
nextRemainingTime](const boost::system::error_code& ec) {
145-
auto self = weakSelf.lock();
146-
if (!self || ec) {
147-
if (self && ec != boost::asio::error::operation_aborted) {
148-
LOG_ERROR("The timer for " << key << " failed: " << ec.message());
149-
}
150-
// The lookup service has been destructed or the timer has been cancelled
151-
promise.setFailed(ResultTimeout);
152-
return;
153-
}
154-
executeAsyncImpl(key, f, promise, nextRemainingTime);
155-
});
156-
} else {
157-
backoffTimers_.remove(key);
158-
promise.setFailed(result);
159-
}
160-
});
161-
}
162-
163-
DECLARE_LOG_OBJECT()
85+
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeoutSeconds)),
86+
partitionLookupCache_(
87+
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeoutSeconds)),
88+
namespaceLookupCache_(
89+
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeoutSeconds)),
90+
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, timeoutSeconds)) {}
16491
};
16592

16693
} // namespace pulsar

lib/RetryableOperation.h

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/Result.h>
22+
23+
#include <algorithm>
24+
#include <atomic>
25+
#include <functional>
26+
#include <memory>
27+
28+
#include "Backoff.h"
29+
#include "ExecutorService.h"
30+
#include "Future.h"
31+
#include "LogUtils.h"
32+
33+
namespace pulsar {
34+
35+
template <typename T>
36+
class RetryableOperation : public std::enable_shared_from_this<RetryableOperation<T>> {
37+
struct PassKey {
38+
explicit PassKey() {}
39+
};
40+
41+
RetryableOperation(const std::string& name, std::function<Future<Result, T>()>&& func, int timeoutSeconds,
42+
DeadlineTimerPtr timer)
43+
: name_(name),
44+
func_(std::move(func)),
45+
timeout_(boost::posix_time::seconds(timeoutSeconds)),
46+
backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
47+
boost::posix_time::milliseconds(0)),
48+
timer_(timer) {}
49+
50+
public:
51+
template <typename... Args>
52+
explicit RetryableOperation(PassKey, Args&&... args) : RetryableOperation(std::forward<Args>(args)...) {}
53+
54+
template <typename... Args>
55+
static std::shared_ptr<RetryableOperation<T>> create(Args&&... args) {
56+
return std::make_shared<RetryableOperation<T>>(PassKey{}, std::forward<Args>(args)...);
57+
}
58+
59+
Future<Result, T> run() {
60+
bool expected = false;
61+
if (!started_.compare_exchange_strong(expected, true)) {
62+
return promise_.getFuture();
63+
}
64+
return runImpl(timeout_);
65+
}
66+
67+
void cancel() {
68+
promise_.setValue(T{});
69+
boost::system::error_code ec;
70+
timer_->cancel(ec);
71+
}
72+
73+
private:
74+
const std::string name_;
75+
std::function<Future<Result, T>()> func_;
76+
const TimeDuration timeout_;
77+
Backoff backoff_;
78+
Promise<Result, T> promise_;
79+
std::atomic_bool started_{false};
80+
DeadlineTimerPtr timer_;
81+
82+
Future<Result, T> runImpl(TimeDuration remainingTime) {
83+
std::weak_ptr<RetryableOperation<T>> weakSelf{this->shared_from_this()};
84+
func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) {
85+
auto self = weakSelf.lock();
86+
if (!self) {
87+
return;
88+
}
89+
if (result == ResultOk) {
90+
promise_.setValue(value);
91+
return;
92+
}
93+
if (result != ResultRetryable) {
94+
promise_.setFailed(result);
95+
return;
96+
}
97+
if (remainingTime.total_milliseconds() <= 0) {
98+
promise_.setFailed(ResultTimeout);
99+
return;
100+
}
101+
102+
auto delay = std::min(backoff_.next(), remainingTime);
103+
timer_->expires_from_now(delay);
104+
105+
auto nextRemainingTime = remainingTime - delay;
106+
LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds()
107+
<< " ms, remaining time: " << nextRemainingTime.total_milliseconds()
108+
<< " ms");
109+
timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) {
110+
auto self = weakSelf.lock();
111+
if (!self) {
112+
return;
113+
}
114+
if (ec) {
115+
if (ec == boost::asio::error::operation_aborted) {
116+
LOG_DEBUG("Timer for " << name_ << " is cancelled");
117+
promise_.setFailed(ResultTimeout);
118+
} else {
119+
LOG_WARN("Timer for " << name_ << " failed: " << ec.message());
120+
}
121+
} else {
122+
LOG_DEBUG("Run operation " << name_ << ", remaining time: "
123+
<< nextRemainingTime.total_milliseconds() << " ms");
124+
runImpl(nextRemainingTime);
125+
}
126+
});
127+
});
128+
return promise_.getFuture();
129+
}
130+
131+
DECLARE_LOG_OBJECT()
132+
};
133+
134+
} // namespace pulsar

0 commit comments

Comments
 (0)