Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/pulsar/c/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *);

typedef void (*pulsar_receive_callback)(pulsar_result result, pulsar_message_t *msg, void *ctx);

typedef void (*pulsar_batch_receive_callback)(pulsar_result result, pulsar_messages_t *msgs, void *ctx);

/**
* @return the topic this consumer is subscribed to
*/
Expand Down Expand Up @@ -119,6 +121,17 @@ PULSAR_PUBLIC void pulsar_consumer_receive_async(pulsar_consumer_t *consumer,
PULSAR_PUBLIC pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *consumer,
pulsar_messages_t **msgs);

/**
* Async batch receiving messages.
*
* @param callback
* 1. When the result in the callback is `ResultOk`, `msgs` in the callback will point to the memory that
* is allocated internally. You have to call `pulsar_messages_free` to free it.
* 2. If the result in the callback is not `ResultOk`, `msgs` in the callback will be nullptr.
*/
PULSAR_PUBLIC void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer,
pulsar_batch_receive_callback callback, void *ctx);

/**
* Acknowledge the reception of a single message.
*
Expand Down
25 changes: 25 additions & 0 deletions include/pulsar/c/consumer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ typedef enum
pulsar_consumer_regex_sub_mode_AllTopics = 2
} pulsar_consumer_regex_subscription_mode;

typedef struct {
// Max num message, if less than 0, it means no limit.
int maxNumMessage;
// Max num bytes, if less than 0, it means no limit.
long maxNumBytes;
// If less than 0, it means no limit.
long timeoutMs;
} pulsar_consumer_batch_receive_policy_t;

/// Callback definition for MessageListener
typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx);

Expand Down Expand Up @@ -328,6 +337,22 @@ PULSAR_PUBLIC pulsar_consumer_regex_subscription_mode
pulsar_consumer_configuration_get_regex_subscription_mode(
pulsar_consumer_configuration_t *consumer_configuration);

/**
* Set batch receive policy.
*
* The default value: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
* @param consumer_configuration the consumer conf object.
* @param maxNumMessage default: Max num message, if less than 0, it means no limit.
* @param maxNumBytes Max num bytes, if less than 0, it means no limit.
* @param timeoutMs If less than 0, it means no limit.
*/
PULSAR_PUBLIC void pulsar_consumer_configuration_set_batch_receive_policy(
pulsar_consumer_configuration_t *consumer_configuration,
const pulsar_consumer_batch_receive_policy_t *batch_receive_policy);

PULSAR_PUBLIC pulsar_consumer_batch_receive_policy_t pulsar_consumer_configuration_get_batch_receive_policy(
pulsar_consumer_configuration_t *consumer_configuration);

// const CryptoKeyReaderPtr getCryptoKeyReader()
//
// const;
Expand Down
17 changes: 17 additions & 0 deletions lib/c/c_Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,23 @@ pulsar_result pulsar_consumer_batch_receive(pulsar_consumer_t *consumer, pulsar_
return (pulsar_result)res;
}

void pulsar_consumer_batch_receive_async(pulsar_consumer_t *consumer, pulsar_batch_receive_callback callback,
void *ctx) {
consumer->consumer.batchReceiveAsync([callback, ctx](pulsar::Result result, pulsar::Messages messages) {
if (callback) {
pulsar_messages_t *msgs = nullptr;
if (result == pulsar::ResultOk) {
msgs = new pulsar_messages_t;
msgs->messages.resize(messages.size());
for (size_t i = 0; i < messages.size(); i++) {
msgs->messages[i].message = messages[i];
}
}
callback((pulsar_result)result, msgs, ctx);
}
});
}

static void handle_receive_callback(pulsar::Result result, pulsar::Message message,
pulsar_receive_callback callback, void *ctx) {
if (callback) {
Expand Down
17 changes: 17 additions & 0 deletions lib/c/c_ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,20 @@ pulsar_consumer_regex_subscription_mode pulsar_consumer_configuration_get_regex_
return (pulsar_consumer_regex_subscription_mode)
consumer_configuration->consumerConfiguration.getRegexSubscriptionMode();
}

void pulsar_consumer_configuration_set_batch_receive_policy(
pulsar_consumer_configuration_t *consumer_configuration,
const pulsar_consumer_batch_receive_policy_t *batch_receive_policy_t) {
pulsar::BatchReceivePolicy batchReceivePolicy(batch_receive_policy_t->maxNumMessage,
batch_receive_policy_t->maxNumBytes,
batch_receive_policy_t->timeoutMs);
consumer_configuration->consumerConfiguration.setBatchReceivePolicy(batchReceivePolicy);
}

pulsar_consumer_batch_receive_policy_t pulsar_consumer_configuration_get_batch_receive_policy(
pulsar_consumer_configuration_t *consumer_configuration) {
pulsar::BatchReceivePolicy batchReceivePolicy =
consumer_configuration->consumerConfiguration.getBatchReceivePolicy();
return {batchReceivePolicy.getMaxNumMessages(), batchReceivePolicy.getMaxNumBytes(),
batchReceivePolicy.getTimeoutMs()};
}
10 changes: 10 additions & 0 deletions tests/c/c_ConsumerConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,14 @@ TEST(C_ConsumerConfigurationTest, testCApiConfig) {
consumer_conf, pulsar_consumer_regex_sub_mode_NonPersistentOnly);
ASSERT_EQ(pulsar_consumer_configuration_get_regex_subscription_mode(consumer_conf),
pulsar_consumer_regex_sub_mode_NonPersistentOnly);

pulsar_consumer_batch_receive_policy_t batch_receive_policy{10, 1000, 1000};
pulsar_consumer_configuration_set_batch_receive_policy(consumer_conf, &batch_receive_policy);
pulsar_consumer_batch_receive_policy_t get_batch_receive_policy =
pulsar_consumer_configuration_get_batch_receive_policy(consumer_conf);
ASSERT_EQ(get_batch_receive_policy.maxNumMessage, 10);
ASSERT_EQ(get_batch_receive_policy.maxNumBytes, 1000);
ASSERT_EQ(get_batch_receive_policy.timeoutMs, 1000);

pulsar_consumer_configuration_free(consumer_conf);
}
52 changes: 47 additions & 5 deletions tests/c/c_ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,38 @@
#include <string.h>
#include <time.h>

#include <future>

static const char *lookup_url = "pulsar://localhost:6650";

struct batch_receive_ctx {
pulsar_consumer_t *consumer;
std::promise<pulsar_result> *promise;
int expect_receive_num;
};

static void batch_receive_callback(pulsar_result async_result, pulsar_messages_t *msgs, void *ctx) {
struct batch_receive_ctx *receive_ctx = (struct batch_receive_ctx *)ctx;
receive_ctx->promise->set_value(async_result);
if (async_result == pulsar_result_Ok) {
ASSERT_EQ(pulsar_messages_size(msgs), receive_ctx->expect_receive_num);
for (int i = 0; i < pulsar_messages_size(msgs); i++) {
pulsar_message_t *msg = pulsar_messages_get(msgs, i);
size_t length = pulsar_message_get_length(msg);
char *str = (char *)malloc(pulsar_message_get_length(msg));
strncpy(str, (const char *)pulsar_message_get_data(msg), length);

char expected_str[128];
snprintf(expected_str, sizeof(expected_str), "msg-%d", 10 + i);
printf("%d received: %s (%zd), expected: %s (%zd)\n", i, str, strlen(str), expected_str,
strlen(expected_str));
ASSERT_EQ(strcmp(str, expected_str), 0);
free(str);
}
pulsar_messages_free(msgs);
}
}

TEST(c_ConsumerTest, testBatchReceive) {
pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
Expand All @@ -39,11 +69,16 @@ TEST(c_ConsumerTest, testBatchReceive) {

pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
pulsar_consumer_t *consumer;

const int batch_receive_max_size = 10;
pulsar_consumer_batch_receive_policy_t batch_receive_policy{batch_receive_max_size, -1, -1};
pulsar_consumer_configuration_set_batch_receive_policy(consumer_conf, &batch_receive_policy);

result = pulsar_client_subscribe(client, topic, "sub", consumer_conf, &consumer);
ASSERT_EQ(pulsar_result_Ok, result);

const int num_messages = 10;
for (int i = 0; i < num_messages; i++) {
// Sending two more messages proves that the batch_receive_policy works.
for (int i = 0; i < batch_receive_max_size * 2; i++) {
pulsar_message_t *msg = pulsar_message_create();
char buf[128];
snprintf(buf, sizeof(buf), "msg-%d", i);
Expand All @@ -54,8 +89,8 @@ TEST(c_ConsumerTest, testBatchReceive) {

pulsar_messages_t *msgs = NULL;
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_batch_receive(consumer, &msgs));
ASSERT_EQ(pulsar_messages_size(msgs), num_messages);
for (int i = 0; i < num_messages; i++) {
ASSERT_EQ(pulsar_messages_size(msgs), batch_receive_max_size);
for (int i = 0; i < batch_receive_max_size; i++) {
pulsar_message_t *msg = pulsar_messages_get(msgs, i);
size_t length = pulsar_message_get_length(msg);
char *str = (char *)malloc(pulsar_message_get_length(msg));
Expand All @@ -69,9 +104,16 @@ TEST(c_ConsumerTest, testBatchReceive) {

free(str);
}
pulsar_messages_free(msgs);

std::promise<pulsar_result> receive_promise;
std::future<pulsar_result> receive_future = receive_promise.get_future();
struct batch_receive_ctx batch_receive_ctx = {consumer, &receive_promise, batch_receive_max_size};
pulsar_consumer_batch_receive_async(consumer, batch_receive_callback, &batch_receive_ctx);
pulsar_client_close(client);
ASSERT_EQ(pulsar_result_Ok, receive_future.get());

pulsar_client_close(client);
pulsar_messages_free(msgs);
pulsar_consumer_free(consumer);
pulsar_consumer_configuration_free(consumer_conf);
pulsar_producer_free(producer);
Expand Down