From 53fbcc40b5e4d2964eacf5a0323a3178a0837d77 Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 6 Feb 2026 01:36:59 +0800 Subject: [PATCH 1/2] feat: add support for OAUTHBEARER token refresh callback --- docs/oauth_bearer_callback.md | 211 +++++++++++++++++++++++++++++++ examples/oauth_example.cpp | 79 ++++++++++++ include/cppkafka/configuration.h | 17 +++ src/configuration.cpp | 19 +++ 4 files changed, 326 insertions(+) create mode 100644 docs/oauth_bearer_callback.md create mode 100644 examples/oauth_example.cpp diff --git a/docs/oauth_bearer_callback.md b/docs/oauth_bearer_callback.md new file mode 100644 index 00000000..c791ea64 --- /dev/null +++ b/docs/oauth_bearer_callback.md @@ -0,0 +1,211 @@ +# OAuth Bearer Token Refresh Callback + +## Overview + +The OAuth bearer token refresh callback allows you to implement custom OAuth bearer token generation and refresh logic in cppkafka. This is particularly useful for authentication mechanisms like AWS MSK IAM, Azure Event Hubs, or any custom OAuth implementation. + +## API + +### Setting the Callback + +```cpp +using OAuthBearerTokenRefreshCallback = std::function; + +Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback); +``` + +### Getting the Callback + +```cpp +const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() const; +``` + +## Usage + +### Basic Setup + +1. **Create a callback function** that generates or fetches OAuth tokens: + +```cpp +void my_oauth_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { + // Parse config if needed + // Generate token + std::string token = generate_my_token(); + int64_t expiry_ms = get_token_expiry(); + std::string principal = get_principal(); + + // Set the token + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + expiry_ms, + principal.c_str(), + nullptr, 0, + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } +} +``` + +2. **Configure Kafka with OAUTHBEARER**: + +```cpp +Configuration config = { + {"metadata.broker.list", "broker:9092"}, + {"group.id", "my-consumer"}, + {"sasl.mechanism", "OAUTHBEARER"}, + {"security.protocol", "SASL_SSL"} +}; +``` + +3. **Set the callback**: + +```cpp +config.set_oauthbearer_token_refresh_callback(my_oauth_callback); +``` + +4. **Create consumer or producer**: + +```cpp +Consumer consumer(config); +// or +Producer producer(config); +``` + +## Callback Parameters + +### KafkaHandleBase& handle +The Kafka handle (consumer or producer) requesting token refresh. Use `handle.get_handle()` to get the underlying `rd_kafka_t*` pointer for calling librdkafka functions. + +### const std::string& oauthbearer_config +The value of the `sasl.oauthbearer.config` configuration property. You can use this to pass custom parameters to your callback. + +## Callback Responsibilities + +Your callback must either: + +1. **Successfully set a token** using `rd_kafka_oauthbearer_set_token()`, or +2. **Report failure** using `rd_kafka_oauthbearer_set_token_failure()` + +Failure to do either will result in authentication hanging. + +## Complete Example: AWS MSK IAM + +```cpp +#include +#include + +void aws_msk_token_callback(KafkaHandleBase& handle, const std::string& config) { + try { + // Get AWS credentials + auto provider = Aws::Auth::DefaultAWSCredentialsProviderChain(); + auto credentials = provider.GetAWSCredentials(); + + // Generate MSK IAM token (simplified) + std::string token = generate_msk_iam_token(credentials, "us-east-1"); + int64_t expiry_ms = current_time_ms() + 300000; // 5 minutes + + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + expiry_ms, + credentials.GetAWSAccessKeyId().c_str(), + nullptr, 0, + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } + } catch (const std::exception& e) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what()); + } +} + +int main() { + Configuration config = { + {"metadata.broker.list", "b-1.mycluster.kafka.us-east-1.amazonaws.com:9098"}, + {"security.protocol", "SASL_SSL"}, + {"sasl.mechanism", "OAUTHBEARER"}, + {"sasl.oauthbearer.config", "region=us-east-1"} + }; + + config.set_oauthbearer_token_refresh_callback(aws_msk_token_callback); + + Consumer consumer(config); + consumer.subscribe({"my-topic"}); + + // Process messages... +} +``` + +## When is the Callback Invoked? + +The callback is invoked: + +1. **On initial connection** - Before the first authentication attempt +2. **Before token expiry** - Automatically when the current token is about to expire +3. **On authentication failure** - If the broker rejects the current token + +## Thread Safety + +The callback may be invoked from librdkafka's internal threads. Ensure your callback is thread-safe if it accesses shared resources. + +## Error Handling + +Always handle errors in your callback: + +```cpp +void safe_oauth_callback(KafkaHandleBase& handle, const std::string& config) { + try { + // Token generation logic + std::string token = generate_token(); + + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + expiry_ms, + principal.c_str(), + nullptr, 0, + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } + } catch (const std::exception& e) { + // Always report failures + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), e.what()); + } catch (...) { + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), + "Unknown error generating token"); + } +} +``` + +## Background Token Refresh + +For background token refresh (useful for long-lived consumers with low traffic): + +```cpp +// Enable SASL queue for background callbacks +rd_kafka_conf_enable_sasl_queue(config.get_handle(), 1); + +// Enable background SASL callbacks (if supported) +rd_kafka_sasl_background_callbacks_enable(consumer.get_handle()); +``` + +This ensures tokens are refreshed even when the consumer is idle. + +## See Also + +- [librdkafka OAuth documentation](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#authentication) +- [AWS MSK IAM authentication](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html) +- [OAuth 2.0 Bearer Token Usage](https://tools.ietf.org/html/rfc6750) \ No newline at end of file diff --git a/examples/oauth_example.cpp b/examples/oauth_example.cpp new file mode 100644 index 00000000..844a756b --- /dev/null +++ b/examples/oauth_example.cpp @@ -0,0 +1,79 @@ +/* + * OAuth Bearer Token Refresh Callback Example + * + * This example demonstrates how to use the OAuth bearer token refresh callback + * in cppkafka. This is useful for authentication mechanisms like AWS MSK IAM. + */ + +#include +#include + +using namespace cppkafka; + +// Example token refresh callback +void oauth_token_refresh_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { + std::cout << "OAuth token refresh requested" << std::endl; + std::cout << "Config: " << oauthbearer_config << std::endl; + + // In a real implementation, you would: + // 1. Parse the oauthbearer_config to get any necessary parameters + // 2. Generate or fetch a new OAuth token + // 3. Call rd_kafka_oauthbearer_set_token() with the new token + // or rd_kafka_oauthbearer_set_token_failure() if token generation fails + + // Example (simplified): + std::string token = "your-generated-token"; + int64_t token_expiry_ms = 3600000; // 1 hour from now + std::string principal = "your-principal"; + + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_oauthbearer_set_token( + handle.get_handle(), + token.c_str(), + token_expiry_ms, + principal.c_str(), + nullptr, 0, // no extensions + errstr, sizeof(errstr) + ); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + std::cerr << "Failed to set OAuth token: " << errstr << std::endl; + rd_kafka_oauthbearer_set_token_failure(handle.get_handle(), errstr); + } else { + std::cout << "OAuth token set successfully" << std::endl; + } +} + +int main() { + // Create configuration + Configuration config = { + {"metadata.broker.list", "localhost:9092"}, + {"group.id", "example-consumer"}, + {"sasl.mechanism", "OAUTHBEARER"}, + {"security.protocol", "SASL_SSL"} + }; + + // Set the OAuth bearer token refresh callback + config.set_oauthbearer_token_refresh_callback(oauth_token_refresh_callback); + + // Create consumer + Consumer consumer(config); + + // Subscribe to topics + consumer.subscribe({"test-topic"}); + + std::cout << "Consumer created with OAuth callback" << std::endl; + std::cout << "The callback will be invoked when token refresh is needed" << std::endl; + + // Poll for messages (the callback will be triggered as needed) + while (true) { + Message msg = consumer.poll(); + if (msg) { + if (!msg.get_error()) { + std::cout << "Received message: " << msg.get_payload() << std::endl; + } + } + } + + return 0; +} \ No newline at end of file diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index c97f5a83..6cdf4dbf 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -80,6 +80,8 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { using StatsCallback = std::function; using SocketCallback = std::function; using BackgroundEventCallback = std::function; + using OAuthBearerTokenRefreshCallback = std::function; using ConfigurationBase::set; using ConfigurationBase::get; @@ -144,6 +146,15 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { */ Configuration& set_socket_callback(SocketCallback callback); + /** + * Sets the OAuth bearer token refresh callback (invokes rd_kafka_conf_set_oauthbearer_token_refresh_cb) + * + * This callback is triggered when the SASL/OAUTHBEARER token needs to be refreshed. + * The callback should generate a new token and call rd_kafka_oauthbearer_set_token() + * or rd_kafka_oauthbearer_set_token_failure() on the rd_kafka_t handle. + */ + Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback); + #if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION /** * Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb) @@ -223,6 +234,11 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { */ const BackgroundEventCallback& get_background_event_callback() const; + /** + * Gets the OAuth bearer token refresh callback + */ + const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() const; + /** * Gets the default topic configuration */ @@ -249,6 +265,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { StatsCallback stats_callback_; SocketCallback socket_callback_; BackgroundEventCallback background_event_callback_; + OAuthBearerTokenRefreshCallback oauthbearer_token_refresh_callback_; }; } // cppkafka diff --git a/src/configuration.cpp b/src/configuration.cpp index 5a59c517..1565d2f6 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -109,6 +109,14 @@ void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, v (*handle, Event{event_ptr}); } +void oauthbearer_token_refresh_callback_proxy(rd_kafka_t*, const char* oauthbearer_config, void* opaque) { + KafkaHandleBase* handle = static_cast(opaque); + string config = oauthbearer_config ? oauthbearer_config : ""; + CallbackInvoker + ("oauthbearer_token_refresh", handle->get_configuration().get_oauthbearer_token_refresh_callback(), handle) + (*handle, config); +} + // Configuration Configuration::Configuration() @@ -184,6 +192,12 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) { return *this; } +Configuration& Configuration::set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback) { + oauthbearer_token_refresh_callback_ = move(callback); + rd_kafka_conf_set_oauthbearer_token_refresh_cb(handle_.get(), &oauthbearer_token_refresh_callback_proxy); + return *this; +} + #if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) { background_event_callback_ = move(callback); @@ -264,6 +278,11 @@ Configuration::get_background_event_callback() const { return background_event_callback_; } +const Configuration::OAuthBearerTokenRefreshCallback& +Configuration::get_oauthbearer_token_refresh_callback() const { + return oauthbearer_token_refresh_callback_; +} + const optional& Configuration::get_default_topic_configuration() const { return default_topic_config_; } From 8d1ef67c956870b70c0da6b57da1892d5747b005 Mon Sep 17 00:00:00 2001 From: Arbin Date: Thu, 26 Feb 2026 16:31:39 +0800 Subject: [PATCH 2/2] Change OAuthBearerTokenRefreshCallback parameter to pointer Change the oauthbearer_config parameter from 'const std::string&' to 'const std::string*' to maintain semantic consistency with librdkafka. This allows the callback to distinguish between: - Config not set (nullptr) - Config set to empty string ('') Updated files: - include/cppkafka/configuration.h: Changed callback signature - src/configuration.cpp: Modified proxy to pass nullptr instead of empty string - examples/oauth_example.cpp: Updated example to check for nullptr - docs/oauth_bearer_callback.md: Updated all documentation and examples Addresses PR review comment from antaljanosbenjamin. --- docs/oauth_bearer_callback.md | 36 +++++++++++++++++++++++++------- examples/oauth_example.cpp | 8 +++++-- include/cppkafka/configuration.h | 2 +- src/configuration.cpp | 9 ++++++-- 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/docs/oauth_bearer_callback.md b/docs/oauth_bearer_callback.md index c791ea64..17c64554 100644 --- a/docs/oauth_bearer_callback.md +++ b/docs/oauth_bearer_callback.md @@ -10,7 +10,7 @@ The OAuth bearer token refresh callback allows you to implement custom OAuth bea ```cpp using OAuthBearerTokenRefreshCallback = std::function; + const std::string* oauthbearer_config)>; Configuration& set_oauthbearer_token_refresh_callback(OAuthBearerTokenRefreshCallback callback); ``` @@ -28,8 +28,13 @@ const OAuthBearerTokenRefreshCallback& get_oauthbearer_token_refresh_callback() 1. **Create a callback function** that generates or fetches OAuth tokens: ```cpp -void my_oauth_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { - // Parse config if needed +void my_oauth_callback(KafkaHandleBase& handle, const std::string* oauthbearer_config) { + // Check if config is provided + if (oauthbearer_config) { + // Parse config if needed + // Use *oauthbearer_config to access the value + } + // Generate token std::string token = generate_my_token(); int64_t expiry_ms = get_token_expiry(); @@ -82,8 +87,8 @@ Producer producer(config); ### KafkaHandleBase& handle The Kafka handle (consumer or producer) requesting token refresh. Use `handle.get_handle()` to get the underlying `rd_kafka_t*` pointer for calling librdkafka functions. -### const std::string& oauthbearer_config -The value of the `sasl.oauthbearer.config` configuration property. You can use this to pass custom parameters to your callback. +### const std::string* oauthbearer_config +A pointer to the value of the `sasl.oauthbearer.config` configuration property. If the configuration property is not set, this will be `nullptr`. You can use this to pass custom parameters to your callback. Always check for `nullptr` before dereferencing. ## Callback Responsibilities @@ -100,14 +105,24 @@ Failure to do either will result in authentication hanging. #include #include -void aws_msk_token_callback(KafkaHandleBase& handle, const std::string& config) { +void aws_msk_token_callback(KafkaHandleBase& handle, const std::string* config) { try { + // Parse region from config if provided, otherwise use default + std::string region = "us-east-1"; + if (config && !config->empty()) { + // Parse config (e.g., "region=us-east-1") + // Simplified parsing shown here + if (config->find("region=") == 0) { + region = config->substr(7); + } + } + // Get AWS credentials auto provider = Aws::Auth::DefaultAWSCredentialsProviderChain(); auto credentials = provider.GetAWSCredentials(); // Generate MSK IAM token (simplified) - std::string token = generate_msk_iam_token(credentials, "us-east-1"); + std::string token = generate_msk_iam_token(credentials, region); int64_t expiry_ms = current_time_ms() + 300000; // 5 minutes char errstr[512]; @@ -162,8 +177,13 @@ The callback may be invoked from librdkafka's internal threads. Ensure your call Always handle errors in your callback: ```cpp -void safe_oauth_callback(KafkaHandleBase& handle, const std::string& config) { +void safe_oauth_callback(KafkaHandleBase& handle, const std::string* config) { try { + // Check if config is provided + if (config) { + // Use *config to access the configuration string + } + // Token generation logic std::string token = generate_token(); diff --git a/examples/oauth_example.cpp b/examples/oauth_example.cpp index 844a756b..23f1278a 100644 --- a/examples/oauth_example.cpp +++ b/examples/oauth_example.cpp @@ -11,9 +11,13 @@ using namespace cppkafka; // Example token refresh callback -void oauth_token_refresh_callback(KafkaHandleBase& handle, const std::string& oauthbearer_config) { +void oauth_token_refresh_callback(KafkaHandleBase& handle, const std::string* oauthbearer_config) { std::cout << "OAuth token refresh requested" << std::endl; - std::cout << "Config: " << oauthbearer_config << std::endl; + if (oauthbearer_config) { + std::cout << "Config: " << *oauthbearer_config << std::endl; + } else { + std::cout << "Config: not set" << std::endl; + } // In a real implementation, you would: // 1. Parse the oauthbearer_config to get any necessary parameters diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 6cdf4dbf..32be4967 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -81,7 +81,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase { using SocketCallback = std::function; using BackgroundEventCallback = std::function; using OAuthBearerTokenRefreshCallback = std::function; + const std::string* oauthbearer_config)>; using ConfigurationBase::set; using ConfigurationBase::get; diff --git a/src/configuration.cpp b/src/configuration.cpp index 1565d2f6..d87f8761 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -111,10 +111,15 @@ void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, v void oauthbearer_token_refresh_callback_proxy(rd_kafka_t*, const char* oauthbearer_config, void* opaque) { KafkaHandleBase* handle = static_cast(opaque); - string config = oauthbearer_config ? oauthbearer_config : ""; + const string* config_ptr = nullptr; + string config_value; + if (oauthbearer_config) { + config_value = oauthbearer_config; + config_ptr = &config_value; + } CallbackInvoker ("oauthbearer_token_refresh", handle->get_configuration().get_oauthbearer_token_refresh_callback(), handle) - (*handle, config); + (*handle, config_ptr); } // Configuration