From e13662cf2f22b291a148105168b2b1b5c2308ace Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 18 Apr 2023 07:53:06 +0800 Subject: [PATCH 1/4] Complete batch receive other feture --- include/pulsar/c/consumer.h | 13 ++++++ include/pulsar/c/consumer_configuration.h | 25 ++++++++++ lib/c/c_Consumer.cc | 15 ++++++ lib/c/c_ConsumerConfiguration.cc | 17 +++++++ tests/c/c_ConsumerConfigurationTest.cc | 10 ++++ tests/c/c_ConsumerTest.cc | 56 +++++++++++++++++++++-- 6 files changed, 131 insertions(+), 5 deletions(-) diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index 664608e5..d652529c 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -35,6 +35,8 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *); typedef void (*pulsar_receive_callback)(pulsar_result result, pulsar_message_t *msg, void *ctx); +typedef void (*pulsar_batch_receive_callback)(pulsar_result result, pulsar_messages_t *msg, void *ctx); + /** * @return the topic this consumer is subscribed to */ @@ -119,6 +121,17 @@ PULSAR_PUBLIC void pulsar_consumer_receive_async(pulsar_consumer_t *consumer, PULSAR_PUBLIC pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *consumer, pulsar_messages_t **msgs); +/** + * Async batch receiving messages. + * + * @param callback + * 1. When it's callback call result is `ResultOk`, `*msg` will point to the memory that + * is allocated internally. You have to call `pulsar_messages_free` to free it. + * 2. If callback call result is not `ResultOk`, `*msg` will is nullptr. + */ +PULSAR_PUBLIC void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, + pulsar_batch_receive_callback callback, void *ctx); + /** * Acknowledge the reception of a single message. * diff --git a/include/pulsar/c/consumer_configuration.h b/include/pulsar/c/consumer_configuration.h index 16124a8f..dea92f50 100644 --- a/include/pulsar/c/consumer_configuration.h +++ b/include/pulsar/c/consumer_configuration.h @@ -89,6 +89,15 @@ typedef enum pulsar_consumer_regex_sub_mode_AllTopics = 2 } pulsar_consumer_regex_subscription_mode; +typedef struct { + // Max num message, if less than 0, it means no limit. + int maxNumMessage; + // Max num bytes, if less than 0, it means no limit. + long maxNumBytes; + // If less than 0, it means no limit. + long timeoutMs; +} pulsar_consumer_batch_receive_policy_t; + /// Callback definition for MessageListener typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx); @@ -328,6 +337,22 @@ PULSAR_PUBLIC pulsar_consumer_regex_subscription_mode pulsar_consumer_configuration_get_regex_subscription_mode( pulsar_consumer_configuration_t *consumer_configuration); +/** + * Set batch receive policy. + * + * The default value: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100} + * @param consumer_configuration the consumer conf object. + * @param maxNumMessage default: Max num message, if less than 0, it means no limit. + * @param maxNumBytes Max num bytes, if less than 0, it means no limit. + * @param timeoutMs If less than 0, it means no limit. + */ +PULSAR_PUBLIC void pulsar_consumer_configuration_set_batch_receive_policy( + pulsar_consumer_configuration_t *consumer_configuration, + const pulsar_consumer_batch_receive_policy_t *batch_receive_policy); + +PULSAR_PUBLIC pulsar_consumer_batch_receive_policy_t pulsar_consumer_configuration_get_batch_receive_policy( + pulsar_consumer_configuration_t *consumer_configuration); + // const CryptoKeyReaderPtr getCryptoKeyReader() // // const; diff --git a/lib/c/c_Consumer.cc b/lib/c/c_Consumer.cc index b574e129..86c697ff 100644 --- a/lib/c/c_Consumer.cc +++ b/lib/c/c_Consumer.cc @@ -73,6 +73,21 @@ pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *consumer, pulsar_ return (pulsar_result)res; } +void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, pulsar_batch_receive_callback callback, + void *ctx) { + consumer->consumer.batchReceiveAsync([callback, ctx](pulsar::Result result, pulsar::Messages messages) { + pulsar_messages_t *msgs = nullptr; + if (callback && result == pulsar::ResultOk) { + msgs = new pulsar_messages_t; + msgs->messages.resize(messages.size()); + for (size_t i = 0; i < messages.size(); i++) { + msgs->messages[i].message = messages[i]; + } + } + callback((pulsar_result)result, msgs, ctx); + }); +} + static void handle_receive_callback(pulsar::Result result, pulsar::Message message, pulsar_receive_callback callback, void *ctx) { if (callback) { diff --git a/lib/c/c_ConsumerConfiguration.cc b/lib/c/c_ConsumerConfiguration.cc index 9cc3f201..6b808b65 100644 --- a/lib/c/c_ConsumerConfiguration.cc +++ b/lib/c/c_ConsumerConfiguration.cc @@ -250,3 +250,20 @@ pulsar_consumer_regex_subscription_mode pulsar_consumer_configuration_get_regex_ return (pulsar_consumer_regex_subscription_mode) consumer_configuration->consumerConfiguration.getRegexSubscriptionMode(); } + +void pulsar_consumer_configuration_set_batch_receive_policy( + pulsar_consumer_configuration_t *consumer_configuration, + const pulsar_consumer_batch_receive_policy_t *batch_receive_policy_t) { + pulsar::BatchReceivePolicy batchReceivePolicy(batch_receive_policy_t->maxNumMessage, + batch_receive_policy_t->maxNumBytes, + batch_receive_policy_t->timeoutMs); + consumer_configuration->consumerConfiguration.setBatchReceivePolicy(batchReceivePolicy); +} + +pulsar_consumer_batch_receive_policy_t pulsar_consumer_configuration_get_batch_receive_policy( + pulsar_consumer_configuration_t *consumer_configuration) { + pulsar::BatchReceivePolicy batchReceivePolicy = + consumer_configuration->consumerConfiguration.getBatchReceivePolicy(); + return {batchReceivePolicy.getMaxNumMessages(), batchReceivePolicy.getMaxNumBytes(), + batchReceivePolicy.getTimeoutMs()}; +} diff --git a/tests/c/c_ConsumerConfigurationTest.cc b/tests/c/c_ConsumerConfigurationTest.cc index 3984e6d4..629c6f37 100644 --- a/tests/c/c_ConsumerConfigurationTest.cc +++ b/tests/c/c_ConsumerConfigurationTest.cc @@ -42,4 +42,14 @@ TEST(C_ConsumerConfigurationTest, testCApiConfig) { consumer_conf, pulsar_consumer_regex_sub_mode_NonPersistentOnly); ASSERT_EQ(pulsar_consumer_configuration_get_regex_subscription_mode(consumer_conf), pulsar_consumer_regex_sub_mode_NonPersistentOnly); + + pulsar_consumer_batch_receive_policy_t batch_receive_policy{10, 1000, 1000}; + pulsar_consumer_configuration_set_batch_receive_policy(consumer_conf, &batch_receive_policy); + pulsar_consumer_batch_receive_policy_t get_batch_receive_policy = + pulsar_consumer_configuration_get_batch_receive_policy(consumer_conf); + ASSERT_EQ(get_batch_receive_policy.maxNumMessage, 10); + ASSERT_EQ(get_batch_receive_policy.maxNumBytes, 1000); + ASSERT_EQ(get_batch_receive_policy.timeoutMs, 1000); + + pulsar_consumer_configuration_free(consumer_conf); } diff --git a/tests/c/c_ConsumerTest.cc b/tests/c/c_ConsumerTest.cc index db352576..f4e5ce7a 100644 --- a/tests/c/c_ConsumerTest.cc +++ b/tests/c/c_ConsumerTest.cc @@ -23,8 +23,41 @@ #include #include +#include + static const char *lookup_url = "pulsar://localhost:6650"; +struct batch_receive_ctx { + pulsar_result result; + pulsar_consumer_t *consumer; + std::promise *promise; + int expect_receive_num; +}; + +static void batch_receive_callback(pulsar_result async_result, pulsar_messages_t *msgs, void *ctx) { + printf("todo callback.... \n"); + struct batch_receive_ctx *receive_ctx = (struct batch_receive_ctx *)ctx; + receive_ctx->result = async_result; + if (async_result == pulsar_result_Ok) { + ASSERT_EQ(pulsar_messages_size(msgs), receive_ctx->expect_receive_num); + for (int i = 0; i < pulsar_messages_size(msgs); i++) { + pulsar_message_t *msg = pulsar_messages_get(msgs, i); + size_t length = pulsar_message_get_length(msg); + char *str = (char *)malloc(pulsar_message_get_length(msg)); + strncpy(str, (const char *)pulsar_message_get_data(msg), length); + + char expected_str[128]; + snprintf(expected_str, sizeof(expected_str), "msg-%d", 10 + i); + printf("%d received: %s (%zd), expected: %s (%zd)\n", i, str, strlen(str), expected_str, + strlen(expected_str)); + ASSERT_EQ(strcmp(str, expected_str), 0); + free(str); + } + pulsar_messages_free(msgs); + receive_ctx->promise->set_value(); + } +} + TEST(c_ConsumerTest, testBatchReceive) { pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); pulsar_client_t *client = pulsar_client_create(lookup_url, conf); @@ -39,11 +72,16 @@ TEST(c_ConsumerTest, testBatchReceive) { pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); pulsar_consumer_t *consumer; + + const int batch_receive_max_size = 10; + pulsar_consumer_batch_receive_policy_t batch_receive_policy{batch_receive_max_size, -1, -1}; + pulsar_consumer_configuration_set_batch_receive_policy(consumer_conf, &batch_receive_policy); + result = pulsar_client_subscribe(client, topic, "sub", consumer_conf, &consumer); ASSERT_EQ(pulsar_result_Ok, result); - const int num_messages = 10; - for (int i = 0; i < num_messages; i++) { + // Sending two more messages proves that the batch_receive_policy works. + for (int i = 0; i < batch_receive_max_size * 2; i++) { pulsar_message_t *msg = pulsar_message_create(); char buf[128]; snprintf(buf, sizeof(buf), "msg-%d", i); @@ -54,8 +92,8 @@ TEST(c_ConsumerTest, testBatchReceive) { pulsar_messages_t *msgs = NULL; ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_batch_receive(consumer, &msgs)); - ASSERT_EQ(pulsar_messages_size(msgs), num_messages); - for (int i = 0; i < num_messages; i++) { + ASSERT_EQ(pulsar_messages_size(msgs), batch_receive_max_size); + for (int i = 0; i < batch_receive_max_size; i++) { pulsar_message_t *msg = pulsar_messages_get(msgs, i); size_t length = pulsar_message_get_length(msg); char *str = (char *)malloc(pulsar_message_get_length(msg)); @@ -69,9 +107,17 @@ TEST(c_ConsumerTest, testBatchReceive) { free(str); } + pulsar_messages_free(msgs); + + std::promise receive_promise; + std::future receive_future = receive_promise.get_future(); + struct batch_receive_ctx batch_receive_ctx = {pulsar_result_UnknownError, consumer, &receive_promise, + batch_receive_max_size}; + pulsar_consumer_batch_receive_async(consumer, batch_receive_callback, &batch_receive_ctx); + receive_future.get(); + ASSERT_EQ(pulsar_result_Ok, batch_receive_ctx.result); pulsar_client_close(client); - pulsar_messages_free(msgs); pulsar_consumer_free(consumer); pulsar_consumer_configuration_free(consumer_conf); pulsar_producer_free(producer); From 51947414936bf0af861ac227a2414313a46d6b90 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 19 Apr 2023 10:12:43 +0800 Subject: [PATCH 2/4] Fix code reviews. --- include/pulsar/c/consumer.h | 4 ++-- lib/c/c_Consumer.cc | 16 +++++++++------- tests/c/c_ConsumerTest.cc | 18 +++++++----------- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index d652529c..69712448 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -125,9 +125,9 @@ PULSAR_PUBLIC pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *con * Async batch receiving messages. * * @param callback - * 1. When it's callback call result is `ResultOk`, `*msg` will point to the memory that + * 1. When the result in the callback is `ResultOk`, `*msg` in the callback will point to the memory that * is allocated internally. You have to call `pulsar_messages_free` to free it. - * 2. If callback call result is not `ResultOk`, `*msg` will is nullptr. + * 2. If the result in the callback is not `ResultOk`, `*msg` in the callback will is nullptr. */ PULSAR_PUBLIC void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, pulsar_batch_receive_callback callback, void *ctx); diff --git a/lib/c/c_Consumer.cc b/lib/c/c_Consumer.cc index 86c697ff..0fc3c080 100644 --- a/lib/c/c_Consumer.cc +++ b/lib/c/c_Consumer.cc @@ -76,15 +76,17 @@ pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *consumer, pulsar_ void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, pulsar_batch_receive_callback callback, void *ctx) { consumer->consumer.batchReceiveAsync([callback, ctx](pulsar::Result result, pulsar::Messages messages) { - pulsar_messages_t *msgs = nullptr; - if (callback && result == pulsar::ResultOk) { - msgs = new pulsar_messages_t; - msgs->messages.resize(messages.size()); - for (size_t i = 0; i < messages.size(); i++) { - msgs->messages[i].message = messages[i]; + if (callback) { + pulsar_messages_t *msgs = nullptr; + if (result == pulsar::ResultOk) { + msgs = new pulsar_messages_t; + msgs->messages.resize(messages.size()); + for (size_t i = 0; i < messages.size(); i++) { + msgs->messages[i].message = messages[i]; + } } + callback((pulsar_result)result, msgs, ctx); } - callback((pulsar_result)result, msgs, ctx); }); } diff --git a/tests/c/c_ConsumerTest.cc b/tests/c/c_ConsumerTest.cc index f4e5ce7a..bebfd287 100644 --- a/tests/c/c_ConsumerTest.cc +++ b/tests/c/c_ConsumerTest.cc @@ -28,16 +28,14 @@ static const char *lookup_url = "pulsar://localhost:6650"; struct batch_receive_ctx { - pulsar_result result; pulsar_consumer_t *consumer; - std::promise *promise; + std::promise *promise; int expect_receive_num; }; static void batch_receive_callback(pulsar_result async_result, pulsar_messages_t *msgs, void *ctx) { - printf("todo callback.... \n"); struct batch_receive_ctx *receive_ctx = (struct batch_receive_ctx *)ctx; - receive_ctx->result = async_result; + receive_ctx->promise->set_value(async_result); if (async_result == pulsar_result_Ok) { ASSERT_EQ(pulsar_messages_size(msgs), receive_ctx->expect_receive_num); for (int i = 0; i < pulsar_messages_size(msgs); i++) { @@ -54,7 +52,6 @@ static void batch_receive_callback(pulsar_result async_result, pulsar_messages_t free(str); } pulsar_messages_free(msgs); - receive_ctx->promise->set_value(); } } @@ -109,13 +106,12 @@ TEST(c_ConsumerTest, testBatchReceive) { } pulsar_messages_free(msgs); - std::promise receive_promise; - std::future receive_future = receive_promise.get_future(); - struct batch_receive_ctx batch_receive_ctx = {pulsar_result_UnknownError, consumer, &receive_promise, - batch_receive_max_size}; + std::promise receive_promise; + std::future receive_future = receive_promise.get_future(); + struct batch_receive_ctx batch_receive_ctx = {consumer, &receive_promise, batch_receive_max_size}; pulsar_consumer_batch_receive_async(consumer, batch_receive_callback, &batch_receive_ctx); - receive_future.get(); - ASSERT_EQ(pulsar_result_Ok, batch_receive_ctx.result); + pulsar_client_close(client); + ASSERT_EQ(pulsar_result_Ok, receive_future.get()); pulsar_client_close(client); pulsar_consumer_free(consumer); From d0716afe5b86a149b26836079c5fc8751d95923f Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 19 Apr 2023 17:03:32 +0800 Subject: [PATCH 3/4] Change notes. --- include/pulsar/c/consumer.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index 69712448..28b8c5a2 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -35,7 +35,7 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *); typedef void (*pulsar_receive_callback)(pulsar_result result, pulsar_message_t *msg, void *ctx); -typedef void (*pulsar_batch_receive_callback)(pulsar_result result, pulsar_messages_t *msg, void *ctx); +typedef void (*pulsar_batch_receive_callback)(pulsar_result result, pulsar_messages_t *msgs, void *ctx); /** * @return the topic this consumer is subscribed to @@ -125,9 +125,9 @@ PULSAR_PUBLIC pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *con * Async batch receiving messages. * * @param callback - * 1. When the result in the callback is `ResultOk`, `*msg` in the callback will point to the memory that + * 1. When the result in the callback is `ResultOk`, `msgs` in the callback will point to the memory that * is allocated internally. You have to call `pulsar_messages_free` to free it. - * 2. If the result in the callback is not `ResultOk`, `*msg` in the callback will is nullptr. + * 2. If the result in the callback is not `ResultOk`, `msgs` in the callback will is nullptr. */ PULSAR_PUBLIC void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, pulsar_batch_receive_callback callback, void *ctx); From 35a0e8229279774dd2c8d5a6988eea2b18df8322 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 19 Apr 2023 17:58:47 +0800 Subject: [PATCH 4/4] Fix typo --- include/pulsar/c/consumer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index 28b8c5a2..b39c5c4a 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -127,7 +127,7 @@ PULSAR_PUBLIC pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *con * @param callback * 1. When the result in the callback is `ResultOk`, `msgs` in the callback will point to the memory that * is allocated internally. You have to call `pulsar_messages_free` to free it. - * 2. If the result in the callback is not `ResultOk`, `msgs` in the callback will is nullptr. + * 2. If the result in the callback is not `ResultOk`, `msgs` in the callback will be nullptr. */ PULSAR_PUBLIC void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, pulsar_batch_receive_callback callback, void *ctx);