Skip to content

Commit bd26df6

Browse files
authored
IGNITE-22141 C++ Client: Implement Partition Awareness (#7966)
1 parent 4d034f4 commit bd26df6

31 files changed

Lines changed: 1677 additions & 108 deletions

.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsDebLinux : BuildType({
2020
""".trimIndent()
2121

2222
params {
23-
param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
2423
param("PATH__CMAKE_BUILD_DIRECTORY", "%PATH__WORKING_DIR%/cmake-build-debug")
2524
param("PATH__ODBC_TEST_RESULTS", "%PATH__WORKING_DIR%/odbc_tests_results.xml")
2625
text("PATH__WORKING_DIR", "%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp", display = ParameterDisplay.HIDDEN, allowEmpty = true)

.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsRpmLinux : BuildType({
2020
""".trimIndent()
2121

2222
params {
23-
param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
2423
param("PATH__CMAKE_BUILD_DIRECTORY", "%PATH__WORKING_DIR%/cmake-build-debug")
2524
param("PATH__ODBC_TEST_RESULTS", "%PATH__WORKING_DIR%/odbc_tests_results.xml")
2625
text("PATH__WORKING_DIR", "%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp", display = ParameterDisplay.HIDDEN, allowEmpty = true)

.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsTgzLinux : BuildType({
2020
""".trimIndent()
2121

2222
params {
23-
param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
2423
param("PATH__CMAKE_BUILD_DIRECTORY", "%PATH__WORKING_DIR%/cmake-build-debug")
2524
param("PATH__ODBC_TEST_RESULTS", "%PATH__WORKING_DIR%/odbc_tests_results.xml")
2625
text("PATH__WORKING_DIR", "%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp", display = ParameterDisplay.HIDDEN, allowEmpty = true)

.teamcity/test/platform_tests/PlatformCppTestsLinux.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ object PlatformCppTestsLinux : BuildType({
2424
""".trimIndent()
2525

2626
params {
27-
param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
2827
param("PATH__CMAKE_BUILD_DIRECTORY", "%PATH__WORKING_DIR%/cmake-build-debug")
2928
param("PATH__CLIENT_TEST_RESULTS", "%PATH__WORKING_DIR%/cpp_client_tests_results.xml")
3029
param("PATH__UNIT_TESTS_RESULT", "%PATH__WORKING_DIR%/cpp_unit_test_results.xml")

.teamcity/test/platform_tests/PlatformCppTestsWindows.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ object PlatformCppTestsWindows : BuildType({
2727
""".trimIndent()
2828

2929
params {
30-
hiddenText("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
3130
hiddenText("PATH__CMAKE_BUILD_DIRECTORY", """%PATH__WORKING_DIR%\cmake-build-debug""")
3231
hiddenText("PATH__CLIENT_TEST_RESULTS", """%PATH__CMAKE_BUILD_DIRECTORY%\cpp_client_tests_results.xml""")
3332
hiddenText("PATH__ODBC_TEST_RESULTS", """%PATH__CMAKE_BUILD_DIRECTORY%\odbc_tests_results.xml""")

modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ void cluster_connection::stop() {
9292

9393
void cluster_connection::on_connection_success(const end_point &addr, uint64_t id) {
9494
m_logger->log_info("Established connection with remote host " + addr.to_string());
95-
m_logger->log_debug("Connection ID: " + std::to_string(id));
95+
96+
if (m_logger->is_debug_enabled())
97+
m_logger->log_debug("Connection ID: " + std::to_string(id));
9698

9799
auto connection = node_connection::make_new(
98100
id, m_pool, weak_from_this(), m_logger, m_configuration, m_timer_thread);
@@ -214,6 +216,16 @@ void cluster_connection::on_observable_timestamp_changed(std::int64_t timestamp)
214216
}
215217
}
216218

219+
void cluster_connection::on_partition_assignment_changed(std::int64_t timestamp) {
220+
auto expected = m_assignment_timestamp.load();
221+
while (expected < timestamp) {
222+
auto success = m_assignment_timestamp.compare_exchange_weak(expected, timestamp);
223+
if (success)
224+
return;
225+
expected = m_assignment_timestamp.load();
226+
}
227+
}
228+
217229
void cluster_connection::remove_client(uint64_t id) {
218230
[[maybe_unused]] std::unique_lock<std::recursive_mutex> lock(m_connections_mutex);
219231

@@ -262,9 +274,28 @@ std::shared_ptr<node_connection> cluster_connection::get_random_connected_channe
262274
return std::next(m_connections.begin(), idx)->second;
263275
}
264276

277+
std::shared_ptr<node_connection> cluster_connection::get_channel(
278+
const std::optional<std::string>& preferred_node_name) {
279+
280+
if (preferred_node_name) {
281+
std::unique_lock lock(m_connections_mutex);
282+
for (auto& [id, conn] : m_connections) {
283+
if (conn->get_node_name() == *preferred_node_name) {
284+
return conn;
285+
}
286+
}
287+
}
288+
289+
return get_random_connected_channel();
290+
}
291+
265292
std::pair<std::shared_ptr<node_connection>, std::int64_t> cluster_connection::perform_request_handler(
266-
const operation_function_type &op_func, transaction_impl *tx, const writer_function_type &wr,
267-
const std::shared_ptr<response_handler> &handler) {
293+
const operation_function_type &op_func,
294+
transaction_impl *tx,
295+
const writer_function_type &wr,
296+
const std::shared_ptr<response_handler> &handler,
297+
const std::optional<std::string>& preferred_node_name) {
298+
268299
if (tx) {
269300
auto channel = tx->get_connection();
270301
if (!channel)
@@ -279,7 +310,7 @@ std::pair<std::shared_ptr<node_connection>, std::int64_t> cluster_connection::pe
279310
}
280311

281312
while (true) {
282-
auto channel = get_random_connected_channel();
313+
auto channel = get_channel(preferred_node_name);
283314
if (!channel)
284315
throw ignite_error(error::code::CONNECTION, "No nodes connected");
285316

@@ -290,10 +321,15 @@ std::pair<std::shared_ptr<node_connection>, std::int64_t> cluster_connection::pe
290321
}
291322
}
292323

293-
void cluster_connection::perform_request_raw(protocol::client_operation op, transaction_impl *tx,
294-
const writer_function_type &wr, ignite_callback<bytes_view> callback) {
324+
void cluster_connection::perform_request_raw(
325+
protocol::client_operation op,
326+
transaction_impl *tx,
327+
const writer_function_type &wr,
328+
ignite_callback<bytes_view> callback,
329+
const std::optional<std::string>& preferred_node_name) {
295330
auto handler = std::make_shared<response_handler_raw>(std::move(callback));
296-
perform_request_handler(static_op(op), tx, wr, std::move(handler));
331+
332+
perform_request_handler(static_op(op), tx, wr, std::move(handler), preferred_node_name);
297333
}
298334

299335
} // namespace ignite::detail

modules/platforms/cpp/ignite/client/detail/cluster_connection.h

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,15 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
106106
* @param tx Transaction.
107107
* @param wr Request writer function.
108108
* @param handler Request handler.
109+
* @param preferred_node_name Name of preferred node.
109110
* @return A connection used to perform request and the request ID.
110111
*/
111-
std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request_handler(const operation_function_type &op_func,
112-
transaction_impl *tx, const writer_function_type &wr, const std::shared_ptr<response_handler> &handler);
112+
std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request_handler(
113+
const operation_function_type &op_func,
114+
transaction_impl *tx,
115+
const writer_function_type &wr,
116+
const std::shared_ptr<response_handler> &handler,
117+
const std::optional<std::string>& preferred_node_name = std::nullopt);
113118

114119
/**
115120
* Perform request raw.
@@ -119,9 +124,14 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
119124
* @param tx Transaction.
120125
* @param wr Request writer function.
121126
* @param callback Callback to call on a result.
127+
* @param preferred_node_name Name of preferred node.
122128
*/
123-
void perform_request_raw(protocol::client_operation op, transaction_impl *tx,
124-
const writer_function_type &wr, ignite_callback<bytes_view> callback);
129+
void perform_request_raw(
130+
protocol::client_operation op,
131+
transaction_impl *tx,
132+
const writer_function_type &wr,
133+
ignite_callback<bytes_view> callback,
134+
const std::optional<std::string>& preferred_node_name = std::nullopt);
125135

126136
/**
127137
* Perform request raw.
@@ -151,13 +161,15 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
151161
* @param wr Request writer function.
152162
* @param rd Response reader function.
153163
* @param callback Callback to call on a result.
164+
* @param preferred_node_name Name of preferred node.
154165
*/
155166
template<typename T>
156167
std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request(protocol::client_operation op,
157168
transaction_impl *tx, const writer_function_type &wr,
158-
reader_function_type<T> rd, ignite_callback<T> callback) {
169+
reader_function_type<T> rd, ignite_callback<T> callback,
170+
const std::optional<std::string>& preferred_node_name = std::nullopt) {
159171
auto handler = std::make_shared<response_handler_reader<T>>(std::move(rd), std::move(callback));
160-
return perform_request_handler(static_op(op), tx, wr, std::move(handler));
172+
return perform_request_handler(static_op(op), tx, wr, std::move(handler), preferred_node_name);
161173
}
162174

163175
/**
@@ -189,7 +201,7 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
189201
void perform_request(protocol::client_operation op, const writer_function_type &wr,
190202
std::function<T(protocol::reader &, std::shared_ptr<node_connection>)> rd, ignite_callback<T> callback) {
191203
auto handler = std::make_shared<response_handler_reader_connection<T>>(std::move(rd), std::move(callback));
192-
perform_request_handler(static_op(op), nullptr, wr, std::move(handler));
204+
perform_request_handler(static_op(op), nullptr, wr, std::move(handler), std::nullopt);
193205
}
194206

195207
/**
@@ -261,9 +273,13 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
261273
* @return A connection used to perform request and the request ID.
262274
*/
263275
template<typename T>
264-
std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request_wr(protocol::client_operation op,
265-
transaction_impl *tx, const writer_function_type &wr, ignite_callback<T> callback) {
266-
return perform_request<T>(op, tx, wr, [](protocol::reader &) {}, std::move(callback));
276+
std::pair<std::shared_ptr<node_connection>, std::int64_t> perform_request_wr(
277+
protocol::client_operation op,
278+
transaction_impl *tx,
279+
const writer_function_type &wr,
280+
ignite_callback<T> callback,
281+
const std::optional<std::string>& preferred_node_name = std::nullopt) {
282+
return perform_request<T>(op, tx, wr, [](protocol::reader &) {}, std::move(callback), preferred_node_name);
267283
}
268284

269285
/**
@@ -273,6 +289,20 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
273289
*/
274290
std::int64_t get_observable_timestamp() const { return m_observable_timestamp.load(); }
275291

292+
/**
293+
* Get assignment timestamp.
294+
*
295+
* @return Assignment timestamp.
296+
*/
297+
std::int64_t get_assignment_timestamp() const { return m_assignment_timestamp.load(); }
298+
299+
/**
300+
* Get logger.
301+
*
302+
* @return Logger.
303+
*/
304+
[[nodiscard]] std::shared_ptr<ignite_logger> get_logger() const { return m_logger; }
305+
276306
/**
277307
* @param op Operation code to return.
278308
* @return A function that always returns the same operation.
@@ -289,6 +319,14 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
289319
*/
290320
std::shared_ptr<node_connection> get_random_connected_channel();
291321

322+
/**
323+
* Get connection according to provided preference otherwise returns random node connection.
324+
*
325+
* @param preferred_node_name Name of preferred node.
326+
* @return Node connection.
327+
*/
328+
std::shared_ptr<node_connection> get_channel(const std::optional<std::string> &preferred_node_name);
329+
292330
/**
293331
* Constructor.
294332
*
@@ -342,6 +380,13 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
342380
*/
343381
void on_observable_timestamp_changed(std::int64_t timestamp) override;
344382

383+
/**
384+
* Handle partition assignment change.
385+
*
386+
* @param timestamp Assignment timestamp.
387+
*/
388+
void on_partition_assignment_changed(std::int64_t timestamp) override;
389+
345390
/**
346391
* Remove client.
347392
*
@@ -404,6 +449,9 @@ class cluster_connection : public std::enable_shared_from_this<cluster_connectio
404449
/** Observable timestamp. */
405450
std::atomic_int64_t m_observable_timestamp{0};
406451

452+
/** Partition assignment timestamp. */
453+
std::atomic_int64_t m_assignment_timestamp{0};
454+
407455
/** Timer thread. */
408456
std::shared_ptr<thread_timer> m_timer_thread;
409457
};

modules/platforms/cpp/ignite/client/detail/connection_event_handler.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ class connection_event_handler {
4444
* @param timestamp Timestamp.
4545
*/
4646
virtual void on_observable_timestamp_changed(std::int64_t timestamp) = 0;
47+
48+
/**
49+
* Handle partition assignment change.
50+
*
51+
* @param timestamp Assignment timestamp.
52+
*/
53+
virtual void on_partition_assignment_changed(std::int64_t timestamp) = 0;
4754
};
4855

4956
} // namespace ignite::detail

modules/platforms/cpp/ignite/client/detail/node_connection.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,12 @@ void node_connection::process_message(bytes_view msg) {
6969

7070
auto req_id = reader.read_int64();
7171
auto flags = reader.read_int32();
72+
auto event_handler = m_event_handler.lock();
7273
if (test_flag(flags, protocol::response_flag::PARTITION_ASSIGNMENT_CHANGED)) {
7374
auto assignment_ts = reader.read_int64();
74-
UNUSED_VALUE assignment_ts;
75+
if (event_handler) {
76+
event_handler->on_partition_assignment_changed(assignment_ts);
77+
}
7578
}
7679

7780
auto observable_timestamp = reader.read_int64();
@@ -180,6 +183,8 @@ ignite_result<void> node_connection::process_handshake_rsp(bytes_view msg) {
180183
std::chrono::milliseconds(response.idle_timeout_ms));
181184

182185
m_protocol_context = response.context;
186+
m_node_id = response.node_id;
187+
m_node_name = response.node_name;
183188
m_handshake_complete = true;
184189

185190
if (m_heartbeat_interval.count()) {

modules/platforms/cpp/ignite/client/detail/node_connection.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,20 @@ class node_connection : public std::enable_shared_from_this<node_connection> {
232232
*/
233233
std::shared_ptr<ignite_logger> get_logger() const { return m_logger; }
234234

235+
/**
236+
* Cancels waiting for over-due responses.
237+
*/
235238
void handle_timeouts();
236239

240+
/**
241+
* Name of the node this connection is tethered to.
242+
*
243+
* @return Name of the node.
244+
*/
245+
const std::string& get_node_name() const {
246+
return m_node_name;
247+
}
248+
237249
private:
238250
/**
239251
* Constructor.
@@ -334,6 +346,12 @@ class node_connection : public std::enable_shared_from_this<node_connection> {
334346

335347
/** Timer thread. */
336348
std::weak_ptr<thread_timer> m_timer_thread;
349+
350+
/** Node id. */
351+
uuid m_node_id{};
352+
353+
/** Name of the node this connection is tethered to. */
354+
std::string m_node_name{};
337355
};
338356

339357
} // namespace ignite::detail

0 commit comments

Comments
 (0)