Skip to content

Commit 01ea0b1

Browse files
committed
Use exception
1 parent 2e690cd commit 01ea0b1

6 files changed

Lines changed: 43 additions & 33 deletions

File tree

lib/ClientConnection.cc

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
189189
LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
190190
if (!authentication_) {
191191
LOG_ERROR("Invalid authentication plugin");
192-
close(ResultAuthenticationError, false);
193-
return;
192+
throw ResultException{ResultAuthenticationError};
194193
}
195194
if (clientConfiguration.isUseTls()) {
196195
#if BOOST_VERSION >= 105400
@@ -212,8 +211,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
212211
ctx.load_verify_file(trustCertFilePath);
213212
} else {
214213
LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
215-
close(ResultAuthenticationError, false);
216-
return;
214+
throw ResultException{ResultAuthenticationError};
217215
}
218216
} else {
219217
ctx.set_default_verify_paths();
@@ -230,13 +228,11 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
230228
tlsPrivateKey = authData->getTlsPrivateKey();
231229
if (!file_exists(tlsCertificates)) {
232230
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
233-
close(ResultAuthenticationError, false);
234-
return;
231+
throw ResultException{ResultAuthenticationError};
235232
}
236233
if (!file_exists(tlsCertificates)) {
237234
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
238-
close(ResultAuthenticationError, false);
239-
return;
235+
throw ResultException{ResultAuthenticationError};
240236
}
241237
ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem);
242238
ctx.use_certificate_file(tlsCertificates, boost::asio::ssl::context::pem);
@@ -1259,13 +1255,7 @@ void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_cod
12591255
startConsumerStatsTimer(consumerStatsRequests);
12601256
}
12611257

1262-
void ClientConnection::close(Result result, bool constructCompleted) {
1263-
if (!constructCompleted) {
1264-
state_ = Disconnected;
1265-
connectPromise_.setFailed(result);
1266-
return;
1267-
}
1268-
1258+
void ClientConnection::close(Result result) {
12691259
Lock lock(mutex_);
12701260
if (isClosed()) {
12711261
return;

lib/ClientConnection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <boost/asio/strand.hpp>
3333
#include <boost/optional.hpp>
3434
#include <deque>
35+
#include <exception>
3536
#include <functional>
3637
#include <memory>
3738
#include <string>
@@ -146,9 +147,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
146147
* Close the connection.
147148
*
148149
* @param result all pending futures will complete with this result
149-
* @param constructCompleted whether the construction is completed
150150
*/
151-
void close(Result result = ResultConnectError, bool constructCompleted = true);
151+
void close(Result result = ResultConnectError);
152152

153153
bool isClosed() const;
154154

lib/ConnectionPool.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "ClientConnection.h"
2525
#include "ExecutorService.h"
2626
#include "LogUtils.h"
27+
#include "ResultUtils.h"
2728

2829
using boost::asio::ip::tcp;
2930
namespace ssl = boost::asio::ssl;
@@ -54,20 +55,17 @@ bool ConnectionPool::close() {
5455
auto& cnx = cnxIt->second;
5556
if (cnx) {
5657
// The 2nd argument is false because removing a value during the iteration will cause segfault
57-
cnx->close(ResultDisconnected, false);
58+
cnx->close(ResultDisconnected);
5859
}
5960
}
6061
pool_.clear();
6162
return true;
6263
}
6364

64-
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
65-
const std::string& physicalAddress,
66-
size_t keySuffix) {
65+
auto ConnectionPool::getConnectionAsync(const std::string& logicalAddress, const std::string& physicalAddress,
66+
size_t keySuffix) -> ConnectionFuture {
6767
if (closed_) {
68-
Promise<Result, ClientConnectionWeakPtr> promise;
69-
promise.setFailed(ResultAlreadyClosed);
70-
return promise.getFuture();
68+
return ConnectionFuture::failure(ResultAlreadyClosed);
7169
}
7270

7371
std::unique_lock<std::recursive_mutex> lock(mutex_);
@@ -99,12 +97,12 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
9997
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
10098
clientConfiguration_, authentication_, clientVersion_, *this,
10199
keySuffix));
100+
} catch (const ResultException& e) {
101+
return ConnectionFuture::failure(e.result());
102102
} catch (const std::runtime_error& e) {
103103
lock.unlock();
104104
LOG_ERROR("Failed to create connection: " << e.what())
105-
Promise<Result, ClientConnectionWeakPtr> promise;
106-
promise.setFailed(ResultConnectError);
107-
return promise.getFuture();
105+
return ConnectionFuture::failure(ResultConnectError);
108106
}
109107

110108
LOG_INFO("Created connection for " << key);

lib/ConnectionPool.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class PULSAR_PUBLIC ConnectionPool {
5353

5454
void remove(const std::string& key, ClientConnection* value);
5555

56+
using ConnectionFuture = Future<Result, ClientConnectionWeakPtr>;
57+
5658
/**
5759
* Get a connection from the pool.
5860
* <p>
@@ -73,16 +75,15 @@ class PULSAR_PUBLIC ConnectionPool {
7375
* @param keySuffix the key suffix to choose which connection on the same broker
7476
* @return a future that will produce the ClientCnx object
7577
*/
76-
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
77-
const std::string& physicalAddress,
78-
size_t keySuffix);
78+
ConnectionFuture getConnectionAsync(const std::string& logicalAddress, const std::string& physicalAddress,
79+
size_t keySuffix);
7980

80-
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
81-
const std::string& physicalAddress) {
81+
ConnectionFuture getConnectionAsync(const std::string& logicalAddress,
82+
const std::string& physicalAddress) {
8283
return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex());
8384
}
8485

85-
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
86+
ConnectionFuture getConnectionAsync(const std::string& address) {
8687
return getConnectionAsync(address, address);
8788
}
8889

lib/Future.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ class Future {
116116

117117
Result get(Type &result) { return state_->get(result); }
118118

119+
static Future<Result, Type> failure(Result result);
120+
119121
private:
120122
InternalStatePtr<Result, Type> state_;
121123

@@ -144,6 +146,13 @@ class Promise {
144146
const InternalStatePtr<Result, Type> state_;
145147
};
146148

149+
template <typename Result, typename Type>
150+
inline Future<Result, Type> Future<Result, Type>::failure(Result result) {
151+
Promise<Result, Type> promise;
152+
promise.setFailed(result);
153+
return promise.getFuture();
154+
}
155+
147156
} // namespace pulsar
148157

149158
#endif

lib/ResultUtils.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,22 @@
2020

2121
#include <pulsar/Result.h>
2222

23+
#include <exception>
24+
2325
namespace pulsar {
2426

2527
inline bool isResultRetryable(Result result) {
2628
return result == ResultRetryable || result == ResultDisconnected;
2729
}
2830

31+
class ResultException : public std::exception {
32+
public:
33+
ResultException(Result result) : result_(result) {}
34+
Result result() const noexcept { return result_; }
35+
const char* what() const noexcept { return strResult(result_); }
36+
37+
private:
38+
const Result result_;
39+
};
40+
2941
} // namespace pulsar

0 commit comments

Comments
 (0)