Skip to content

Commit fca9000

Browse files
committed
fix deadlock
1 parent 9f99e82 commit fca9000

4 files changed

Lines changed: 19 additions & 11 deletions

File tree

lib/ClientConnection.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,8 +1322,12 @@ void ClientConnection::close(Result result, bool detach) {
13221322
if (err) {
13231323
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
13241324
}
1325-
} // else: there is an ongoing connect operation, the socket will be closed after the operation
1326-
// succeeds in `handleTcpConnected`
1325+
} else {
1326+
// There is an ongoing connect operation, the socket will be closed after the operation succeeds
1327+
// in `handleTcpConnected`
1328+
LOG_WARN(cnxString_
1329+
<< "Socket is still connecting, it will be closed once the connection attempt finishes");
1330+
}
13271331
}
13281332
state_ = Disconnected;
13291333
if (tlsSocket_) {

lib/ClientConnection.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,12 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
404404
typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
405405
PendingGetSchemaMap pendingGetSchemaRequests_;
406406

407-
mutable std::mutex mutex_;
408-
typedef std::unique_lock<std::mutex> Lock;
407+
// Most accesses to this class happen in the event loop thread, but the `close` method could be called in
408+
// a different thread by `ConnectionPool`, so we have to guard the state of the connection with a mutex.
409+
// However, `close` could be called when a mutex is held, so we have to use a recursive mutex to avoid
410+
// deadlock.
411+
mutable std::recursive_mutex mutex_;
412+
typedef std::unique_lock<std::recursive_mutex> Lock;
409413

410414
// Pending buffers to write on the socket
411415
std::deque<std::any> pendingWriteBuffers_;

lib/MockServer.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
4242
}
4343

4444
void setRequestDelay(std::initializer_list<typename RequestDelayType::value_type> delays) {
45-
std::lock_guard<std::mutex> lock(mutex_);
45+
std::lock_guard lock(mutex_);
4646
for (auto&& delay : delays) {
4747
requestDelays_[delay.first] = delay.second;
4848
}
@@ -53,15 +53,15 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
5353
if (!connection) {
5454
return false;
5555
}
56-
std::lock_guard<std::mutex> lock(mutex_);
56+
std::lock_guard lock(mutex_);
5757
if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) {
5858
// Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers
5959
if (request == "SEEK") {
6060
schedule(connection, "CLOSE_CONSUMER" + std::to_string(requestId),
6161
requestDelays_["CLOSE_CONSUMER"], [connection] {
6262
std::vector<uint64_t> consumerIds;
6363
{
64-
std::lock_guard<std::mutex> lock{connection->mutex_};
64+
std::lock_guard lock{connection->mutex_};
6565
for (auto&& kv : connection->consumers_) {
6666
if (auto consumer = kv.second.lock()) {
6767
consumerIds.push_back(consumer->getConsumerId());
@@ -95,7 +95,7 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
9595

9696
// Return the number of pending timers cancelled
9797
auto close() {
98-
std::lock_guard<std::mutex> lock(mutex_);
98+
std::lock_guard lock(mutex_);
9999
auto result = pendingTimers_.size();
100100
for (auto&& kv : pendingTimers_) {
101101
try {
@@ -124,7 +124,7 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
124124
auto self = shared_from_this();
125125
timer->async_wait([this, self, key, connection, task{std::move(task)}](const auto& ec) {
126126
{
127-
std::lock_guard<std::mutex> lock(mutex_);
127+
std::lock_guard lock(mutex_);
128128
pendingTimers_.erase(key);
129129
}
130130
if (ec) {

tests/PulsarFriend.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class PulsarFriend {
146146

147147
static std::vector<ProducerImplPtr> getProducers(const ClientConnection& cnx) {
148148
std::vector<ProducerImplPtr> producers;
149-
std::lock_guard<std::mutex> lock(cnx.mutex_);
149+
std::lock_guard lock(cnx.mutex_);
150150
for (const auto& kv : cnx.producers_) {
151151
producers.emplace_back(kv.second.lock());
152152
}
@@ -155,7 +155,7 @@ class PulsarFriend {
155155

156156
static std::vector<ConsumerImplPtr> getConsumers(const ClientConnection& cnx) {
157157
std::vector<ConsumerImplPtr> consumers;
158-
std::lock_guard<std::mutex> lock(cnx.mutex_);
158+
std::lock_guard lock(cnx.mutex_);
159159
for (const auto& kv : cnx.consumers_) {
160160
consumers.emplace_back(kv.second.lock());
161161
}

0 commit comments

Comments
 (0)