Skip to content

Commit f9621ef

Browse files
committed
Hash thread id by index
1 parent f22f757 commit f9621ef

5 files changed

Lines changed: 7 additions & 10 deletions

File tree

lib/ClientImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
516516
}
517517
}
518518

519-
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, int key) {
519+
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
520520
Promise<Result, ClientConnectionPtr> promise;
521521

522522
const auto topicNamePtr = TopicName::get(topic);

lib/ClientImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
9595

9696
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
9797

98-
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, int key);
98+
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
9999

100100
void closeAsync(CloseCallback callback);
101101
void shutdown();

lib/ConnectionPool.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ bool ConnectionPool::close() {
6363

6464
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
6565
const std::string& physicalAddress,
66-
int keySuffix) {
66+
size_t keySuffix) {
6767
if (closed_) {
6868
Promise<Result, ClientConnectionWeakPtr> promise;
6969
promise.setFailed(ResultAlreadyClosed);
@@ -96,8 +96,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
9696
// No valid or pending connection found in the pool, creating a new one
9797
ClientConnectionPtr cnx;
9898
try {
99-
size_t index = executorIndex_++;
100-
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(index),
99+
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
101100
clientConfiguration_, authentication_, clientVersion_, *this));
102101
} catch (const std::runtime_error& e) {
103102
lock.unlock();

lib/ConnectionPool.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class PULSAR_PUBLIC ConnectionPool {
7575
*/
7676
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
7777
const std::string& physicalAddress,
78-
int keySuffix);
78+
size_t keySuffix);
7979

8080
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
8181
const std::string& physicalAddress) {
@@ -86,7 +86,7 @@ class PULSAR_PUBLIC ConnectionPool {
8686
return getConnectionAsync(address, address);
8787
}
8888

89-
int generateRandomIndex() { return randomDistribution_(randomEngine_); }
89+
size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }
9090

9191
private:
9292
ClientConfiguration clientConfiguration_;
@@ -97,8 +97,6 @@ class PULSAR_PUBLIC ConnectionPool {
9797
const std::string clientVersion_;
9898
mutable std::recursive_mutex mutex_;
9999
std::atomic_bool closed_{false};
100-
// Use a separated index so that connections will be distributed uniformly across the executors
101-
std::atomic_size_t executorIndex_{0};
102100

103101
std::uniform_int_distribution<> randomDistribution_;
104102
std::mt19937 randomEngine_;

lib/HandlerBase.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
9999

100100
protected:
101101
ClientImplWeakPtr client_;
102-
const int connectionKeySuffix_;
102+
const size_t connectionKeySuffix_;
103103
ExecutorServicePtr executor_;
104104
mutable std::mutex mutex_;
105105
std::mutex pendingReceiveMutex_;

0 commit comments

Comments
 (0)