Skip to content

Commit 94b70bf

Browse files
committed
Fix crash due to asio object lifetime and thread safety issue
1 parent 0c6a7c0 commit 94b70bf

9 files changed

Lines changed: 346 additions & 308 deletions

.github/workflows/ci-pr-validation.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,15 @@ jobs:
260260
Pop-Location
261261
}
262262
263+
- name: Ensure vcpkg has full history(windows)
264+
if: runner.os == 'Windows'
265+
shell: pwsh
266+
run: |
267+
$isShallow = (git -C "${{ env.VCPKG_ROOT }}" rev-parse --is-shallow-repository).Trim()
268+
if ($isShallow -eq "true") {
269+
git -C "${{ env.VCPKG_ROOT }}" fetch --unshallow
270+
}
271+
263272
- name: remove system vcpkg(windows)
264273
if: runner.os == 'Windows'
265274
run: rm -rf "$VCPKG_INSTALLATION_ROOT"

lib/ClientConnection.cc

Lines changed: 250 additions & 284 deletions
Large diffs are not rendered by default.

lib/ClientConnection.h

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include <any>
2626
#include <atomic>
2727
#include <cstdint>
28+
#include <future>
29+
#include <optional>
2830
#ifdef USE_ASIO
2931
#include <asio/bind_executor.hpp>
3032
#include <asio/io_context.hpp>
@@ -156,11 +158,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
156158
* Close the connection.
157159
*
158160
* @param result all pending futures will complete with this result
159-
* @param detach remove it from the pool if it's true
160-
*
161-
* `detach` should only be false when the connection pool is closed.
162161
*/
163-
void close(Result result = ResultConnectError, bool detach = true);
162+
const std::future<void>& close(Result result = ResultConnectError);
164163

165164
bool isClosed() const;
166165

@@ -193,7 +192,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
193192

194193
const std::string& brokerAddress() const;
195194

196-
const std::string& cnxString() const;
195+
// Returns a copy of the connection description string.
// The shared pointer is loaded atomically first, and the snapshot keeps the
// string alive while it is copied into the return value.
// NOTE(review): dereferences the pointer unconditionally - presumably
// cnxStringPtr_ is always non-null by the time this is called; confirm.
auto cnxString() const {
    const auto snapshot = std::atomic_load(&cnxStringPtr_);
    return *snapshot;
}
197196

198197
int getServerProtocolVersion() const;
199198

@@ -219,28 +218,51 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
219218
mockingRequests_.store(true, std::memory_order_release);
220219
}
221220

222-
void handleKeepAliveTimeout();
221+
void handleKeepAliveTimeout(const ASIO_ERROR& ec);
223222

224223
private:
225224
struct PendingRequestData {
226225
Promise<Result, ResponseData> promise;
227226
DeadlineTimerPtr timer;
228227
std::shared_ptr<std::atomic_bool> hasGotResponse{std::make_shared<std::atomic_bool>(false)};
228+
229+
void fail(Result result) {
230+
cancelTimer(*timer);
231+
;
232+
promise.setFailed(result);
233+
}
229234
};
230235

231236
struct LookupRequestData {
232237
LookupDataResultPromisePtr promise;
233238
DeadlineTimerPtr timer;
239+
240+
void fail(Result result) {
241+
cancelTimer(*timer);
242+
;
243+
promise->setFailed(result);
244+
}
234245
};
235246

236247
struct LastMessageIdRequestData {
237248
GetLastMessageIdResponsePromisePtr promise;
238249
DeadlineTimerPtr timer;
250+
251+
void fail(Result result) {
252+
cancelTimer(*timer);
253+
;
254+
promise->setFailed(result);
255+
}
239256
};
240257

241258
struct GetSchemaRequest {
242259
Promise<Result, SchemaInfo> promise;
243260
DeadlineTimerPtr timer;
261+
262+
void fail(Result result) {
263+
cancelTimer(*timer);
264+
promise.setFailed(result);
265+
}
244266
};
245267

246268
/*
@@ -297,26 +319,26 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
297319
}
298320

299321
template <typename ConstBufferSequence, typename WriteHandler>
300-
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) {
322+
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler&& handler) {
301323
if (isClosed()) {
302324
return;
303325
}
304326
if (tlsSocket_) {
305-
ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, handler));
327+
ASIO::async_write(*tlsSocket_, buffers, std::forward<WriteHandler>(handler));
306328
} else {
307329
ASIO::async_write(*socket_, buffers, handler);
308330
}
309331
}
310332

311333
template <typename MutableBufferSequence, typename ReadHandler>
312-
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler handler) {
334+
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler&& handler) {
313335
if (isClosed()) {
314336
return;
315337
}
316338
if (tlsSocket_) {
317-
tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, handler));
339+
tlsSocket_->async_read_some(buffers, std::forward<ReadHandler>(handler));
318340
} else {
319-
socket_->async_receive(buffers, handler);
341+
socket_->async_receive(buffers, std::forward<ReadHandler>(handler));
320342
}
321343
}
322344

@@ -337,7 +359,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
337359
*/
338360
SocketPtr socket_;
339361
TlsSocketPtr tlsSocket_;
340-
ASIO::strand<ASIO::io_context::executor_type> strand_;
341362

342363
const std::string logicalAddress_;
343364
/*
@@ -350,7 +371,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
350371
ClientConfiguration::ProxyProtocol proxyProtocol_;
351372

352373
// Represents both endpoints of the TCP connection, e.g. [client:1234 -> server:6650]
353-
std::string cnxString_;
374+
std::shared_ptr<std::string> cnxStringPtr_;
354375

355376
/*
356377
* indicates if async connection establishment failed
@@ -419,6 +440,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
419440
const std::string clientVersion_;
420441
ConnectionPool& pool_;
421442
const size_t poolIndex_;
443+
std::optional<std::future<void>> closeFuture_;
422444

423445
friend class PulsarFriend;
424446
friend class ConsumerTest;

lib/ConnectionPool.cc

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,43 @@ bool ConnectionPool::close() {
5454
return false;
5555
}
5656

57+
std::vector<ClientConnectionPtr> connectionsToClose;
58+
// ClientConnection::close() will remove the connection from the pool, which is not allowed when iterating
59+
// over a map, so we store the connections to close in a vector first and don't iterate the pool when
60+
// closing the connections.
5761
std::unique_lock<std::recursive_mutex> lock(mutex_);
62+
connectionsToClose.reserve(pool_.size());
63+
for (auto&& kv : pool_) {
64+
connectionsToClose.emplace_back(kv.second);
65+
}
66+
pool_.clear();
67+
lock.unlock();
5868

59-
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
60-
auto& cnx = cnxIt->second;
69+
for (auto&& cnx : connectionsToClose) {
6170
if (cnx) {
62-
// The 2nd argument is false because removing a value during the iteration will cause segfault
63-
cnx->close(ResultDisconnected, false);
71+
// Close with a fatal error so that the client does not retry
72+
auto& future = cnx->close(ResultAlreadyClosed);
73+
using namespace std::chrono_literals;
74+
if (auto status = future.wait_for(5s); status != std::future_status::ready) {
75+
LOG_WARN("Connection close timed out for " << cnx.get()->cnxString());
76+
}
77+
if (cnx.use_count() > 1) {
78+
// There are some asynchronous operations that hold the reference on the connection, we should
79+
// wait for them to finish. Otherwise, `io_context::stop()` will be called in
80+
// `ClientImpl::shutdown()` when closing the `ExecutorServiceProvider`. Then
81+
// `io_context::run()` will return and the `io_context` object will be destroyed. In this
82+
// case, if there is any pending handler, it will crash.
83+
for (int i = 0; i < 500 && cnx.use_count() > 1; i++) {
84+
std::this_thread::sleep_for(10ms);
85+
}
86+
if (cnx.use_count() > 1) {
87+
LOG_WARN("Connection still has " << (cnx.use_count() - 1)
88+
<< " references after waiting for 5 seconds for "
89+
<< cnx.get()->cnxString());
90+
}
91+
}
6492
}
6593
}
66-
pool_.clear();
6794
return true;
6895
}
6996

lib/ExecutorService.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,6 @@ void ExecutorService::close(long timeoutMs) {
125125
}
126126
}
127127

128-
void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, std::move(task)); }
129-
130128
/////////////////////
131129

132130
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)

lib/ExecutorService.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323

2424
#include <atomic>
2525
#ifdef USE_ASIO
26+
#include <asio/dispatch.hpp>
2627
#include <asio/io_context.hpp>
2728
#include <asio/ip/tcp.hpp>
29+
#include <asio/post.hpp>
2830
#include <asio/ssl.hpp>
2931
#else
32+
#include <boost/asio/dispatch.hpp>
3033
#include <boost/asio/io_context.hpp>
3134
#include <boost/asio/ip/tcp.hpp>
35+
#include <boost/asio/post.hpp>
3236
#include <boost/asio/ssl.hpp>
3337
#endif
3438
#include <chrono>
@@ -62,7 +66,19 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
6266
TcpResolverPtr createTcpResolver();
6367
// throws std::runtime_error if failed
6468
DeadlineTimerPtr createDeadlineTimer();
65-
void postWork(std::function<void(void)> task);
69+
70+
// Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop
71+
// queue and executed later.
72+
template <typename T>
73+
void postWork(T &&task) {
74+
ASIO::post(io_context_, std::forward<T>(task));
75+
}
76+
77+
// Unlike `postWork`, if the caller is already running in the event loop, the task is executed immediately
78+
template <typename T>
79+
void dispatch(T &&task) {
80+
ASIO::dispatch(io_context_, std::forward<T>(task));
81+
}
6682

6783
// See TimeoutProcessor for the semantics of the parameter.
6884
void close(long timeoutMs = 3000);

lib/PeriodicTask.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
5353

5454
void stop() noexcept;
5555

56-
void setCallback(CallbackType callback) noexcept { callback_ = callback; }
56+
// Installs the callback stored in callback_.
// The parameter is taken by value and then moved into the member, so callers
// may pass either an lvalue (copied once at the call site) or an rvalue
// (moved). An rvalue-reference parameter would reject lvalue callbacks.
void setCallback(CallbackType callback) noexcept { callback_ = std::move(callback); }
5757

5858
State getState() const noexcept { return state_; }
5959
int getPeriodMs() const noexcept { return periodMs_; }

tests/ClientTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ TEST(ClientTest, testConnectionClose) {
413413
LOG_INFO("Connection refcnt: " << cnx.use_count() << " before close");
414414
auto executor = PulsarFriend::getExecutor(*cnx);
415415
// Simulate the close() happens in the event loop
416-
executor->postWork([cnx, &client, numConnections] {
416+
executor->dispatch([cnx, &client, numConnections] {
417417
cnx->close();
418418
ASSERT_EQ(PulsarFriend::getConnections(client).size(), numConnections - 1);
419419
LOG_INFO("Connection refcnt: " << cnx.use_count() << " after close");

tests/MultiTopicsConsumerTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
166166
future.wait_for(std::chrono::milliseconds(100));
167167
std::this_thread::sleep_for(std::chrono::milliseconds(100));
168168

169-
connection->handleKeepAliveTimeout();
169+
connection->handleKeepAliveTimeout(ASIO_SUCCESS);
170170
ASSERT_EQ(ResultDisconnected, future.get());
171171

172172
mockServer->close();

0 commit comments

Comments
 (0)