diff --git a/include/pulsar/TableView.h b/include/pulsar/TableView.h index 2c2898b3..d0c278a7 100644 --- a/include/pulsar/TableView.h +++ b/include/pulsar/TableView.h @@ -51,7 +51,7 @@ class PULSAR_PUBLIC TableView { * TableView view; * std::string value; * while (true) { - * if (view.retrieveValue("key")) { + * if (view.retrieveValue("key", value)) { * std::cout << "value is updated to: " << value; * } else { * // sleep for a while or print the message that value is not updated diff --git a/include/pulsar/c/client.h b/include/pulsar/c/client.h index 3a53c017..5fdd2875 100644 --- a/include/pulsar/c/client.h +++ b/include/pulsar/c/client.h @@ -30,6 +30,8 @@ #include #include #include +#include +#include #include #ifdef __cplusplus @@ -47,6 +49,7 @@ typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_pro typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer, void *ctx); typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void *ctx); +typedef void (*pulsar_table_view_callback)(pulsar_result result, pulsar_table_view_t *tableView, void *ctx); typedef void (*pulsar_get_partitions_callback)(pulsar_result result, pulsar_string_list_t *partitions, void *ctx); @@ -172,6 +175,54 @@ PULSAR_PUBLIC void pulsar_client_create_reader_async(pulsar_client_t *client, co const pulsar_message_id_t *startMessageId, pulsar_reader_configuration_t *conf, pulsar_reader_callback callback, void *ctx); +/** + * Create a table view with given {@code table_view_configuration} for specified topic. + * + * The TableView provides a key-value map view of a compacted topic. Messages without keys will + * be ignored. + * + * NOTE: + * When the result in the callback is `ResultOk`, `*c_tableView` will point to the memory that + * is allocated internally. You have to call `pulsar_table_view_free` to free it. + * + * Example: + * ```c + * pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create(); + * pulsar_table_view_configuration_set_subscription_name(table_view_conf, sub_name); + * pulsar_table_view_t *table_view; + * pulsar_result result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view); + * + * // do something... + * + * pulsar_table_view_close(table_view); + * pulsar_table_view_free(table_view); + * pulsar_table_view_configuration_free(table_view_conf); + * + * ``` + * + * @param topic The name of the topic. + * @param conf The {@code table_view_configuration} pointer. + * @param c_tableView The pointer of the table_view pointer + * @return Returned when the table_view is successfully linked to the topic and the map is built from a + * message that already exists. + */ +PULSAR_PUBLIC pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_t **c_tableView); + +/** + * Async Create a table view with given {@code table_view_configuration} for specified topic. + * @param topic The name of the topic. + * @param conf The {@code table_view_configuration} pointer. + * @param callback + * 1. When the result in the callback is `ResultOk`, `tableView` in the callback will point to the memory that + * is allocated internally. You have to call `pulsar_table_view_free` to free it. + * 2. If the result in the callback is not `ResultOk`, `tableView` in the callback will be nullptr. + * @param ctx + */ +PULSAR_PUBLIC void pulsar_client_create_table_view_async(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_callback callback, void *ctx); PULSAR_PUBLIC pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic, pulsar_string_list_t **partitions); diff --git a/include/pulsar/c/table_view.h b/include/pulsar/c/table_view.h new file mode 100644 index 00000000..87eed3e2 --- /dev/null +++ b/include/pulsar/c/table_view.h @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include + +typedef struct _pulsar_table_view pulsar_table_view_t; + +typedef void (*pulsar_table_view_action)(const char *key, const void *value, size_t value_size, void *ctx); +typedef void (*pulsar_result_callback)(pulsar_result, void *); + +/** + * Move the latest value associated with the key. + * + * NOTE: + * 1. Once the value has been retrieved successfully, + * the associated value will be removed from the table view until next time the value is updated. + * 2. Once the value has been retrieved successfully, `*value` will point to the memory that is allocated + * internally. You have to call `free(value)` to free it. + * + * Example: + * + * ```c + * pulsar_table_view_t *table_view; + * void* value; + * size_t value_size; + * while (true) { + * if (pulsar_table_view_retrieve_value(table_view, "key", &value, &value_size)) { + * for (size_t i = 0; i < value_size; i++) { + * printf("0x%02x%c", ((char*) value)[i], (i + 1 == value_size) ? '\n': ' '); + * } + * } else { + * // sleep for a while or print the message that value is not updated + * } + * } + * free(value); + * ``` + * + * @param table_view + * @param key + * @param value the value associated with the key + * @return true if there is an associated value of the key, otherwise false + */ +PULSAR_PUBLIC bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, + void **value, size_t *value_size); + +/** + * It's similar with `pulsar_table_view_retrieve_value` except the associated value not will be removed from + * the table view. + * + * NOTE: + * Once the value has been get successfully, `*value` will point to the memory that is allocated internally. + * You have to call `free(value)` to free it. + * + * @param table_view + * @param key + * @param value the value associated with the key + * @return true if there is an associated value of the key, otherwise false + */ +PULSAR_PUBLIC bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, void **value, + size_t *value_size); + +/** + * Check if the key exists in the table view. + * @param table_view + * @param key + * @return true if the key exists in the table view + */ +PULSAR_PUBLIC bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char *key); + +/** + * Get the size of the elements. + * @param table_view + * @return + */ +PULSAR_PUBLIC int pulsar_table_view_size(pulsar_table_view_t *table_view); + +/** + * Performs the given action for each entry in this map until all entries have been processed or the + * action throws an exception. + */ +PULSAR_PUBLIC void pulsar_table_view_for_each(pulsar_table_view_t *table_view, + pulsar_table_view_action action, void *ctx); + +/** + * Performs the given action for each entry in this map until all entries have been processed and + * register the callback, which will be called each time a key-value pair is updated. + */ +PULSAR_PUBLIC void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, + pulsar_table_view_action action, void *ctx); + +/** + * Free the table view. + * @param table_view + */ +PULSAR_PUBLIC void pulsar_table_view_free(pulsar_table_view_t *table_view); + +/** + * Close the table view and stop the broker to push more messages + * @param table_view + * @return + */ +PULSAR_PUBLIC pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view); + +/** + * Async close the table view and stop the broker to push more messages + * @param table_view + * @param callback + * @param ctx + */ +PULSAR_PUBLIC void pulsar_table_view_close_async(pulsar_table_view_t *table_view, + pulsar_result_callback callback, void *ctx); + +#ifdef __cplusplus +} +#endif diff --git a/include/pulsar/c/table_view_configuration.h b/include/pulsar/c/table_view_configuration.h new file mode 100644 index 00000000..26389b6a --- /dev/null +++ b/include/pulsar/c/table_view_configuration.h @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "producer_configuration.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _pulsar_table_view_configuration pulsar_table_view_configuration_t; + +PULSAR_PUBLIC pulsar_table_view_configuration_t *pulsar_table_view_configuration_create(); + +PULSAR_PUBLIC void pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf); + +PULSAR_PUBLIC void pulsar_table_view_configuration_set_schema_info( + pulsar_table_view_configuration_t *table_view_configuration_t, pulsar_schema_type schemaType, + const char *name, const char *schema, pulsar_string_map_t *properties); + +PULSAR_PUBLIC void pulsar_table_view_configuration_set_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t, const char *subscription_name); + +PULSAR_PUBLIC const char *pulsar_table_view_configuration_get_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t); + +#ifdef __cplusplus +} +#endif diff --git a/lib/c/c_Client.cc b/lib/c/c_Client.cc index dd1b71e4..9f275382 100644 --- a/lib/c/c_Client.cc +++ b/lib/c/c_Client.cc @@ -192,6 +192,34 @@ void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topi std::bind(&handle_reader_callback, std::placeholders::_1, std::placeholders::_2, callback, ctx)); } +pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_t **c_tableView) { + pulsar::TableView tableView; + pulsar::Result res = client->client->createTableView(topic, conf->tableViewConfiguration, tableView); + if (res == pulsar::ResultOk) { + (*c_tableView) = new pulsar_table_view_t; + (*c_tableView)->tableView = std::move(tableView); + return pulsar_result_Ok; + } + return (pulsar_result)res; +} + +void pulsar_client_create_table_view_async(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_callback callback, void *ctx) { + client->client->createTableViewAsync(topic, conf->tableViewConfiguration, + [callback, ctx](pulsar::Result result, pulsar::TableView tableView) { + if (result == pulsar::ResultOk) { + auto *c_tableView = new pulsar_table_view_t; + c_tableView->tableView = std::move(tableView); + callback((pulsar_result)result, c_tableView, ctx); + } else { + callback((pulsar_result)result, NULL, ctx); + } + }); +} + pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic, pulsar_string_list_t **partitions) { std::vector partitionsList; diff --git a/lib/c/c_TableView.cc b/lib/c/c_TableView.cc new file mode 100644 index 00000000..520b0558 --- /dev/null +++ b/lib/c/c_TableView.cc @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "c_structs.h" + +static void *malloc_and_copy(const char *s, size_t slen) { + void *result = (void *)malloc(slen); + if (result == NULL) { + abort(); + } + memcpy(result, s, slen); + return result; +} + +bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, void **value, + size_t *value_size) { + std::string v; + bool result = table_view->tableView.retrieveValue(key, v); + if (result) { + *value = malloc_and_copy(v.c_str(), v.size()); + *value_size = v.size(); + } + return result; +} + +bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, void **value, + size_t *value_size) { + std::string v; + bool result = table_view->tableView.getValue(key, v); + if (result) { + *value = malloc_and_copy(v.c_str(), v.size()); + *value_size = v.size(); + } + return result; +} + +bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char *key) { + return table_view->tableView.containsKey(key); +} + +int pulsar_table_view_size(pulsar_table_view_t *table_view) { return table_view->tableView.size(); } + +void pulsar_table_view_for_each(pulsar_table_view_t *table_view, pulsar_table_view_action action, void *ctx) { + table_view->tableView.forEach([action, ctx](const std::string &key, const std::string &value) { + if (action) { + action(key.c_str(), value.c_str(), value.size(), ctx); + } + }); +} + +void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, pulsar_table_view_action action, + void *ctx) { + table_view->tableView.forEachAndListen([action, ctx](const std::string &key, const std::string &value) { + if (action) { + action(key.c_str(), value.c_str(), value.size(), ctx); + } + }); +} + +void pulsar_table_view_free(pulsar_table_view_t *table_view) { delete table_view; } + +pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view) { + return (pulsar_result)table_view->tableView.close(); +} + +void pulsar_table_view_close_async(pulsar_table_view_t *table_view, pulsar_result_callback callback, + void *ctx) { + table_view->tableView.closeAsync( + [callback, ctx](pulsar::Result result) { return handle_result_callback(result, callback, ctx); }); +} diff --git a/lib/c/c_TableViewConfiguration.cc b/lib/c/c_TableViewConfiguration.cc new file mode 100644 index 00000000..3f8132a3 --- /dev/null +++ b/lib/c/c_TableViewConfiguration.cc @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include "c_structs.h" + +pulsar_table_view_configuration_t *pulsar_table_view_configuration_create() { + auto *table_view_configuration_t = new pulsar_table_view_configuration_t; + table_view_configuration_t->tableViewConfiguration = pulsar::TableViewConfiguration(); + return table_view_configuration_t; +} + +void pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf) { delete conf; } + +void pulsar_table_view_configuration_set_schema_info( + pulsar_table_view_configuration_t *table_view_configuration_t, pulsar_schema_type schemaType, + const char *name, const char *schema, pulsar_string_map_t *properties) { + auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map); + table_view_configuration_t->tableViewConfiguration.schemaInfo = schemaInfo; +} + +void pulsar_table_view_configuration_set_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t, const char *subscription_name) { + table_view_configuration_t->tableViewConfiguration.subscriptionName = subscription_name; +} + +const char *pulsar_table_view_configuration_get_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t) { + return table_view_configuration_t->tableViewConfiguration.subscriptionName.c_str(); +} diff --git a/lib/c/c_structs.h b/lib/c/c_structs.h index 1e9c376a..6f15998e 100644 --- a/lib/c/c_structs.h +++ b/lib/c/c_structs.h @@ -48,6 +48,14 @@ struct _pulsar_consumer_configuration { pulsar::ConsumerConfiguration consumerConfiguration; }; +struct _pulsar_table_view { + pulsar::TableView tableView; +}; + +struct _pulsar_table_view_configuration { + pulsar::TableViewConfiguration tableViewConfiguration; +}; + struct _pulsar_reader { pulsar::Reader reader; }; diff --git a/tests/c/c_TableViewTest.cc b/tests/c/c_TableViewTest.cc new file mode 100644 index 00000000..4b7c5322 --- /dev/null +++ b/tests/c/c_TableViewTest.cc @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include + +#include + +static const char *lookup_url = "pulsar://localhost:6650"; + +struct tv_create_result { + pulsar_result result; + pulsar_table_view_t *tableView; +}; + +static void create_tv_callback(pulsar_result result, pulsar_table_view_t *tableView, void *ctx) { + std::promise *create_promise = (std::promise *)ctx; + create_promise->set_value({result, tableView}); +} + +TEST(c_TableViewTest, testCreateTableViewAsync) { + const char *topic_name = "persistent://public/default/test-create-tv-async"; + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + // Create table view. + pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create(); + pulsar_table_view_configuration_set_subscription_name(table_view_conf, "sub-name"); + std::promise create_promise; + std::future create_future = create_promise.get_future(); + pulsar_client_create_table_view_async(client, topic_name, table_view_conf, create_tv_callback, + &create_promise); + tv_create_result tvResult = create_future.get(); + ASSERT_EQ(pulsar_result_Ok, tvResult.result); + + pulsar_table_view_free(tvResult.tableView); + pulsar_client_close(client); +} + +struct tv_action_ctx { + char *expect_data; + int size; + int expect_size; + std::promise *listen_promise; +}; + +static void tv_action(const char *key, const void *value, size_t value_size, void *ctx) { + tv_action_ctx *context = (tv_action_ctx *)ctx; + context->size++; + ASSERT_EQ(memcmp(value, context->expect_data, value_size), 0); + if (context->size == context->expect_size) { + context->listen_promise->set_value(true); + } +} + +TEST(c_TableViewTest, testSimpleTableView) { + srand(time(NULL)); + char topic_name[64]; + snprintf(topic_name, 64, "persistent://public/default/test-table-view-%d", rand()); + const char *sub_name = "my-sub-name"; + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + // Send messages + const int num = 10; + const char *key1 = "key1"; + const char *key2 = "key2"; + size_t data_size = 4; + char *data = (char *)malloc(data_size); + data[0] = 0x01; + data[1] = 0x00; + data[2] = 0x02; + data[3] = 0x00; + for (int i = 0; i < num; i++) { + pulsar_message_t *message = pulsar_message_create(); + if (i % 2 == 0) { + pulsar_message_set_partition_key(message, key1); + } else { + pulsar_message_set_partition_key(message, key2); + } + pulsar_message_set_content(message, data, data_size); + pulsar_result res = pulsar_producer_send(producer, message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + } + + // Create table view. + pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create(); + pulsar_table_view_configuration_set_subscription_name(table_view_conf, sub_name); + pulsar_table_view_t *table_view; + result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view); + ASSERT_EQ(pulsar_result_Ok, result); + + // test get value + void *v1; + size_t v1_size; + ASSERT_EQ(pulsar_table_view_size(table_view), 2); + ASSERT_TRUE(pulsar_table_view_get_value(table_view, "key1", &v1, &v1_size)); + ASSERT_EQ(v1_size, data_size); + ASSERT_EQ(memcmp(v1, data, data_size), 0); + free(v1); + + // test for each. + tv_action_ctx ctx; + ctx.expect_data = data; + ctx.size = 0; + pulsar_table_view_for_each(table_view, tv_action, &ctx); + ASSERT_EQ(ctx.size, 2); + + // test for each and listen + std::promise listen_promise; + std::future listen_future = listen_promise.get_future(); + tv_action_ctx ctx2; + ctx2.expect_data = data; + ctx2.size = 0; + ctx2.expect_size = 3; + ctx2.listen_promise = &listen_promise; + pulsar_table_view_for_each_add_listen(table_view, tv_action, &ctx2); + ASSERT_EQ(ctx.size, 2); + // send more message. + pulsar_message_t *message = pulsar_message_create(); + pulsar_message_set_partition_key(message, "key3"); + pulsar_message_set_content(message, data, data_size); + pulsar_result res = pulsar_producer_send(producer, message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + // wait for message. + ASSERT_TRUE(listen_future.get()); + ASSERT_EQ(ctx2.size, ctx2.expect_size); + + // test retrieve value + void *v2; + size_t v2_size; + ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2, &v2_size)); + ASSERT_EQ(v2_size, data_size); + ASSERT_EQ(memcmp(v2, data, data_size), 0); + free(v2); + + // test table view size + ASSERT_FALSE(pulsar_table_view_contain_key(table_view, "key2")); + ASSERT_EQ(pulsar_table_view_size(table_view), 2); + + free(data); + pulsar_producer_close(producer); + pulsar_table_view_close(table_view); + pulsar_client_close(client); + pulsar_table_view_free(table_view); + pulsar_table_view_configuration_free(table_view_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); + pulsar_client_configuration_free(conf); +}