From 33a7c9a4a0159895f5d54410a05519b703f68f60 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 27 Jun 2023 19:14:34 +0800 Subject: [PATCH 1/5] feat: Support table view for C client. --- include/pulsar/TableView.h | 2 +- include/pulsar/c/client.h | 51 +++++++ include/pulsar/c/table_view.h | 147 ++++++++++++++++++++ include/pulsar/c/table_view_configuration.h | 48 +++++++ lib/c/c_Client.cc | 28 ++++ lib/c/c_TableView.cc | 95 +++++++++++++ lib/c/c_TableViewConfiguration.cc | 46 ++++++ lib/c/c_structs.h | 8 ++ tests/c/c_TableViewTest.cc | 92 ++++++++++++ 9 files changed, 516 insertions(+), 1 deletion(-) create mode 100644 include/pulsar/c/table_view.h create mode 100644 include/pulsar/c/table_view_configuration.h create mode 100644 lib/c/c_TableView.cc create mode 100644 lib/c/c_TableViewConfiguration.cc create mode 100644 tests/c/c_TableViewTest.cc diff --git a/include/pulsar/TableView.h b/include/pulsar/TableView.h index 2c2898b3..d0c278a7 100644 --- a/include/pulsar/TableView.h +++ b/include/pulsar/TableView.h @@ -51,7 +51,7 @@ class PULSAR_PUBLIC TableView { * TableView view; * std::string value; * while (true) { - * if (view.retrieveValue("key")) { + * if (view.retrieveValue("key", value)) { * std::cout << "value is updated to: " << value; * } else { * // sleep for a while or print the message that value is not updated diff --git a/include/pulsar/c/client.h b/include/pulsar/c/client.h index 3a53c017..5fdd2875 100644 --- a/include/pulsar/c/client.h +++ b/include/pulsar/c/client.h @@ -30,6 +30,8 @@ #include #include #include +#include +#include #include #ifdef __cplusplus @@ -47,6 +49,7 @@ typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_pro typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer, void *ctx); typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void *ctx); +typedef void (*pulsar_table_view_callback)(pulsar_result result, pulsar_table_view_t *tableView, void *ctx); typedef void (*pulsar_get_partitions_callback)(pulsar_result result, pulsar_string_list_t *partitions, void *ctx); @@ -172,6 +175,54 @@ PULSAR_PUBLIC void pulsar_client_create_reader_async(pulsar_client_t *client, co const pulsar_message_id_t *startMessageId, pulsar_reader_configuration_t *conf, pulsar_reader_callback callback, void *ctx); +/** + * Create a table view with given {@code table_view_configuration} for specified topic. + * + * The TableView provides a key-value map view of a compacted topic. Messages without keys will + * be ignored. + * + * NOTE: + * When the result in the callback is `ResultOk`, `*c_tableView` will point to the memory that + * is allocated internally. You have to call `pulsar_table_view_free` to free it. + * + * Example: + * ```c + * pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create(); + * pulsar_table_view_configuration_set_subscription_name(table_view_conf, sub_name); + * pulsar_table_view_t *table_view; + * pulsar_result result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view); + * + * // do something... + * + * pulsar_table_view_close(table_view); + * pulsar_table_view_free(table_view); + * pulsar_table_view_configuration_free(table_view_conf); + * + * ``` + * + * @param topic The name of the topic. + * @param conf The {@code table_view_configuration} pointer. + * @param c_tableView The pointer of the table_view pointer + * @return Returned when the table_view is successfully linked to the topic and the map is built from a + * message that already exists. + */ +PULSAR_PUBLIC pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_t **c_tableView); + +/** + * Async Create a table view with given {@code table_view_configuration} for specified topic. + * @param topic The name of the topic. + * @param conf The {@code table_view_configuration} pointer. + * @param callback + * 1. When the result in the callback is `ResultOk`, `tableView` in the callback will point to the memory that + * is allocated internally. You have to call `pulsar_table_view_free` to free it. + * 2. If the result in the callback is not `ResultOk`, `tableView` in the callback will be nullptr. + * @param ctx + */ +PULSAR_PUBLIC void pulsar_client_create_table_view_async(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_callback callback, void *ctx); PULSAR_PUBLIC pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic, pulsar_string_list_t **partitions); diff --git a/include/pulsar/c/table_view.h b/include/pulsar/c/table_view.h new file mode 100644 index 00000000..b64816c6 --- /dev/null +++ b/include/pulsar/c/table_view.h @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include + +typedef struct _pulsar_table_view pulsar_table_view_t; + +typedef void (*pulsar_table_view_action)(const char *key, const char *value, void *ctx); +typedef void (*pulsar_result_callback)(pulsar_result, void *); + +/** + * Move the latest value associated with the key. + * + * NOTE: + * 1. Once the value has been retrieved successfully, + * the associated value will be removed from the table view until next time the value is updated. + * 2. Once the value has been retrieved successfully, `*value` will point to the memory that is allocated + * internally. You have to call `delete value` to free it. + * + * Example: + * + * ```c + * pulsar_table_view_t *table_view; + * char *value; + * while (true) { + * if (pulsar_table_view_retrieve_value(table_view, "key", &value)) { + * printf("value is update to: %s", value); + * } else { + * // sleep for a while or print the message that value is not updated + * } + * } + * delete value; + * ``` + * + * @param table_view + * @param key + * @param value the value associated with the key + * @return true if there is an associated value of the key, otherwise false + */ +PULSAR_PUBLIC bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, + char **value); + +/** + * It's similar with `pulsar_table_view_retrieve_value` except the associated value not will be removed from + * the table view. + * + * NOTE: + * Once the value has been get successfully, `*value` will point to the memory that is allocated internally. + * You have to call `delete value` to free it. + * + * @param table_view + * @param key + * @param value the value associated with the key + * @return true if there is an associated value of the key, otherwise false + */ +PULSAR_PUBLIC bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, + char **value); + +/** + * Check if the key exists in the table view. + * @param table_view + * @param key + * @return true if the key exists in the table view + */ +PULSAR_PUBLIC bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char *key); + +/** + * Get the size of the elements. + * @param table_view + * @return + */ +PULSAR_PUBLIC int pulsar_table_view_size(pulsar_table_view_t *table_view); + +/** + * Performs the given action for each entry in this map until all entries have been processed or the + * action throws an exception. + */ +PULSAR_PUBLIC void pulsar_table_view_for_each(pulsar_table_view_t *table_view, + pulsar_table_view_action action, void *ctx); + +/** + * Performs the given action for each entry in this map until all entries have been processed and + * register the callback, which will be called each time a key-value pair is updated. + */ +PULSAR_PUBLIC void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, + pulsar_table_view_action action, void *ctx); + +/** + * Move the table view data into the pulsar_string_map_t. + * + * @param table_view + * @return *string_map `string_map` will point to the memory that is allocated internally. + * You have to call `pulsar_string_map_free` to free it. + */ +PULSAR_PUBLIC pulsar_string_map_t *pulsar_table_view_snapshot(pulsar_table_view_t *table_view); + +/** + * Free the table view. + * @param table_view + */ +PULSAR_PUBLIC void pulsar_table_view_free(pulsar_table_view_t *table_view); + +/** + * Close the table view and stop the broker to push more messages + * @param table_view + * @return + */ +PULSAR_PUBLIC pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view); + +/** + * Async close the table view and stop the broker to push more messages + * @param table_view + * @param callback + * @param ctx + */ +PULSAR_PUBLIC void pulsar_table_view_close_async(pulsar_table_view_t *table_view, + pulsar_result_callback callback, void *ctx); + +#ifdef __cplusplus +} +#endif diff --git a/include/pulsar/c/table_view_configuration.h b/include/pulsar/c/table_view_configuration.h new file mode 100644 index 00000000..26389b6a --- /dev/null +++ b/include/pulsar/c/table_view_configuration.h @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "producer_configuration.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _pulsar_table_view_configuration pulsar_table_view_configuration_t; + +PULSAR_PUBLIC pulsar_table_view_configuration_t *pulsar_table_view_configuration_create(); + +PULSAR_PUBLIC void pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf); + +PULSAR_PUBLIC void pulsar_table_view_configuration_set_schema_info( + pulsar_table_view_configuration_t *table_view_configuration_t, pulsar_schema_type schemaType, + const char *name, const char *schema, pulsar_string_map_t *properties); + +PULSAR_PUBLIC void pulsar_table_view_configuration_set_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t, const char *subscription_name); + +PULSAR_PUBLIC const char *pulsar_table_view_configuration_get_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t); + +#ifdef __cplusplus +} +#endif diff --git a/lib/c/c_Client.cc b/lib/c/c_Client.cc index dd1b71e4..9f275382 100644 --- a/lib/c/c_Client.cc +++ b/lib/c/c_Client.cc @@ -192,6 +192,34 @@ void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topi std::bind(&handle_reader_callback, std::placeholders::_1, std::placeholders::_2, callback, ctx)); } +pulsar_result pulsar_client_create_table_view(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_t **c_tableView) { + pulsar::TableView tableView; + pulsar::Result res = client->client->createTableView(topic, conf->tableViewConfiguration, tableView); + if (res == pulsar::ResultOk) { + (*c_tableView) = new pulsar_table_view_t; + (*c_tableView)->tableView = std::move(tableView); + return pulsar_result_Ok; + } + return (pulsar_result)res; +} + +void pulsar_client_create_table_view_async(pulsar_client_t *client, const char *topic, + pulsar_table_view_configuration_t *conf, + pulsar_table_view_callback callback, void *ctx) { + client->client->createTableViewAsync(topic, conf->tableViewConfiguration, + [callback, ctx](pulsar::Result result, pulsar::TableView tableView) { + if (result == pulsar::ResultOk) { + auto *c_tableView = new pulsar_table_view_t; + c_tableView->tableView = std::move(tableView); + callback((pulsar_result)result, c_tableView, ctx); + } else { + callback((pulsar_result)result, NULL, ctx); + } + }); +} + pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic, pulsar_string_list_t **partitions) { std::vector partitionsList; diff --git a/lib/c/c_TableView.cc b/lib/c/c_TableView.cc new file mode 100644 index 00000000..eb075455 --- /dev/null +++ b/lib/c/c_TableView.cc @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "c_structs.h" +#include "cstring" + +char *str_malloc_and_copy(const char *s) { + size_t slen = strlen(s); + char *result = (char *)malloc(slen + 1); + if (result == NULL) { + return NULL; + } + memcpy(result, s, slen + 1); + return result; +} + +bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, char **value) { + std::string v; + bool result = table_view->tableView.retrieveValue(key, v); + if (result) { + *value = str_malloc_and_copy(v.c_str()); + } + return result; +} + +bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, char **value) { + std::string v; + bool result = table_view->tableView.getValue(key, v); + if (result) { + *value = str_malloc_and_copy(v.c_str()); + } + return result; +} + +bool pulsar_table_view_contain_key(pulsar_table_view_t *table_view, const char *key) { + return table_view->tableView.containsKey(key); +} + +int pulsar_table_view_size(pulsar_table_view_t *table_view) { return table_view->tableView.size(); } + +void pulsar_table_view_for_each(pulsar_table_view_t *table_view, pulsar_table_view_action action, void *ctx) { + table_view->tableView.forEach([action, ctx](const std::string &key, const std::string &value) { + if (action) { + action(key.c_str(), value.c_str(), ctx); + } + }); +} + +void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, pulsar_table_view_action action, + void *ctx) { + table_view->tableView.forEachAndListen([action, ctx](const std::string &key, const std::string &value) { + if (action) { + action(key.c_str(), value.c_str(), ctx); + } + }); +} + +pulsar_string_map_t *pulsar_table_view_snapshot(pulsar_table_view_t *table_view) { + auto map = pulsar_string_map_create(); + auto snapshot = table_view->tableView.snapshot(); + for (const auto &item : snapshot) { + map->map.emplace(item); + } + return map; +} + +void pulsar_table_view_free(pulsar_table_view_t *table_view) { delete table_view; } + +pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view) { + return (pulsar_result)table_view->tableView.close(); +} + +void pulsar_table_view_close_async(pulsar_table_view_t *table_view, pulsar_result_callback callback, + void *ctx) { + table_view->tableView.closeAsync( + [callback, ctx](pulsar::Result result) { return handle_result_callback(result, callback, ctx); }); +} diff --git a/lib/c/c_TableViewConfiguration.cc b/lib/c/c_TableViewConfiguration.cc new file mode 100644 index 00000000..3f8132a3 --- /dev/null +++ b/lib/c/c_TableViewConfiguration.cc @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include "c_structs.h" + +pulsar_table_view_configuration_t *pulsar_table_view_configuration_create() { + auto *table_view_configuration_t = new pulsar_table_view_configuration_t; + table_view_configuration_t->tableViewConfiguration = pulsar::TableViewConfiguration(); + return table_view_configuration_t; +} + +void pulsar_table_view_configuration_free(pulsar_table_view_configuration_t *conf) { delete conf; } + +void pulsar_table_view_configuration_set_schema_info( + pulsar_table_view_configuration_t *table_view_configuration_t, pulsar_schema_type schemaType, + const char *name, const char *schema, pulsar_string_map_t *properties) { + auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map); + table_view_configuration_t->tableViewConfiguration.schemaInfo = schemaInfo; +} + +void pulsar_table_view_configuration_set_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t, const char *subscription_name) { + table_view_configuration_t->tableViewConfiguration.subscriptionName = subscription_name; +} + +const char *pulsar_table_view_configuration_get_subscription_name( + pulsar_table_view_configuration_t *table_view_configuration_t) { + return table_view_configuration_t->tableViewConfiguration.subscriptionName.c_str(); +} diff --git a/lib/c/c_structs.h b/lib/c/c_structs.h index 1e9c376a..6f15998e 100644 --- a/lib/c/c_structs.h +++ b/lib/c/c_structs.h @@ -48,6 +48,14 @@ struct _pulsar_consumer_configuration { pulsar::ConsumerConfiguration consumerConfiguration; }; +struct _pulsar_table_view { + pulsar::TableView tableView; +}; + +struct _pulsar_table_view_configuration { + pulsar::TableViewConfiguration tableViewConfiguration; +}; + struct _pulsar_reader { pulsar::Reader reader; }; diff --git a/tests/c/c_TableViewTest.cc b/tests/c/c_TableViewTest.cc new file mode 100644 index 00000000..f7376761 --- /dev/null +++ b/tests/c/c_TableViewTest.cc @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include + +#include "pulsar/c/table_view.h" + +static const char *lookup_url = "pulsar://localhost:6650"; + +TEST(c_TableViewTest, testSimpleTableView) { + const char *topic_name = "persistent://public/default/test-table_view20"; + const char *sub_name = "my-sub-name"; + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + // Send messages + const int num = 10; + const char *key1 = "key1"; + const char *key2 = "key2"; + const char *value = "content"; + for (int i = 0; i < num; i++) { + pulsar_message_t *message = pulsar_message_create(); + if (i % 2 == 0) { + pulsar_message_set_partition_key(message, key1); + } else { + pulsar_message_set_partition_key(message, key2); + } + pulsar_message_set_content(message, value, strlen(value)); + pulsar_result res = pulsar_producer_send(producer, message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + } + + // Create table view. + pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create(); + pulsar_table_view_configuration_set_subscription_name(table_view_conf, sub_name); + pulsar_table_view_t *table_view; + result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view); + ASSERT_EQ(pulsar_result_Ok, result); + + char *v1; + ASSERT_EQ(pulsar_table_view_size(table_view), 2); + ASSERT_TRUE(pulsar_table_view_get_value(table_view, "key1", &v1)); + ASSERT_STREQ(v1, "content"); + delete v1; + + char *v2; + ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2)); + ASSERT_STREQ(v2, "content"); + delete v2; + + ASSERT_FALSE(pulsar_table_view_contain_key(table_view, "key2")); + ASSERT_EQ(pulsar_table_view_size(table_view), 1); + + pulsar_string_map_t *pMap = pulsar_table_view_snapshot(table_view); + ASSERT_EQ(pulsar_table_view_size(table_view), 0); + ASSERT_EQ(pulsar_string_map_size(pMap), 1); + pulsar_string_map_free(pMap); + + pulsar_producer_close(producer); + pulsar_table_view_close(table_view); + pulsar_client_close(client); + pulsar_table_view_free(table_view); + pulsar_table_view_configuration_free(table_view_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); + pulsar_client_configuration_free(conf); +} From c2471a27137dffae46e51903b8a6c2ddda6559f6 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 5 Jul 2023 17:38:34 +0800 Subject: [PATCH 2/5] Change char* type to void* --- include/pulsar/c/table_view.h | 24 ++++++++---------------- lib/c/c_TableView.cc | 34 ++++++++++++++-------------------- tests/c/c_TableViewTest.cc | 35 ++++++++++++++++++++--------------- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/include/pulsar/c/table_view.h b/include/pulsar/c/table_view.h index b64816c6..e2bdecb9 100644 --- a/include/pulsar/c/table_view.h +++ b/include/pulsar/c/table_view.h @@ -31,7 +31,8 @@ extern "C" { typedef struct _pulsar_table_view pulsar_table_view_t; -typedef void (*pulsar_table_view_action)(const char *key, const char *value, void *ctx); +typedef void (*pulsar_table_view_action)(const char *key, const void *value, const size_t value_size, + void *ctx); typedef void (*pulsar_result_callback)(pulsar_result, void *); /** @@ -41,7 +42,7 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *); * 1. Once the value has been retrieved successfully, * the associated value will be removed from the table view until next time the value is updated. * 2. Once the value has been retrieved successfully, `*value` will point to the memory that is allocated - * internally. You have to call `delete value` to free it. + * internally. You have to call `free(value)` to free it. * * Example: * @@ -55,7 +56,7 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *); * // sleep for a while or print the message that value is not updated * } * } - * delete value; + * free(value); * ``` * * @param table_view @@ -64,7 +65,7 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *); * @return true if there is an associated value of the key, otherwise false */ PULSAR_PUBLIC bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, - char **value); + void **value, size_t *value_size); /** * It's similar with `pulsar_table_view_retrieve_value` except the associated value not will be removed from @@ -72,15 +73,15 @@ PULSAR_PUBLIC bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_v * * NOTE: * Once the value has been get successfully, `*value` will point to the memory that is allocated internally. - * You have to call `delete value` to free it. + * You have to call `free(value)` to free it. * * @param table_view * @param key * @param value the value associated with the key * @return true if there is an associated value of the key, otherwise false */ -PULSAR_PUBLIC bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, - char **value); +PULSAR_PUBLIC bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, void **value, + size_t *value_size); /** * Check if the key exists in the table view. @@ -111,15 +112,6 @@ PULSAR_PUBLIC void pulsar_table_view_for_each(pulsar_table_view_t *table_view, PULSAR_PUBLIC void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, pulsar_table_view_action action, void *ctx); -/** - * Move the table view data into the pulsar_string_map_t. - * - * @param table_view - * @return *string_map `string_map` will point to the memory that is allocated internally. - * You have to call `pulsar_string_map_free` to free it. - */ -PULSAR_PUBLIC pulsar_string_map_t *pulsar_table_view_snapshot(pulsar_table_view_t *table_view); - /** * Free the table view. * @param table_view diff --git a/lib/c/c_TableView.cc b/lib/c/c_TableView.cc index eb075455..49f071b2 100644 --- a/lib/c/c_TableView.cc +++ b/lib/c/c_TableView.cc @@ -18,34 +18,37 @@ */ #include +#include #include "c_structs.h" -#include "cstring" -char *str_malloc_and_copy(const char *s) { - size_t slen = strlen(s); - char *result = (char *)malloc(slen + 1); +static void *malloc_and_copy(const char *s, size_t slen) { + void *result = (void *)malloc(slen); if (result == NULL) { return NULL; } - memcpy(result, s, slen + 1); + memcpy(result, s, slen); return result; } -bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, char **value) { +bool pulsar_table_view_retrieve_value(pulsar_table_view_t *table_view, const char *key, void **value, + size_t *value_size) { std::string v; bool result = table_view->tableView.retrieveValue(key, v); if (result) { - *value = str_malloc_and_copy(v.c_str()); + *value = malloc_and_copy(v.c_str(), v.size()); + *value_size = v.size(); } return result; } -bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, char **value) { +bool pulsar_table_view_get_value(pulsar_table_view_t *table_view, const char *key, void **value, + size_t *value_size) { std::string v; bool result = table_view->tableView.getValue(key, v); if (result) { - *value = str_malloc_and_copy(v.c_str()); + *value = malloc_and_copy(v.c_str(), v.size()); + *value_size = v.size(); } return result; } @@ -59,7 +62,7 @@ int pulsar_table_view_size(pulsar_table_view_t *table_view) { return table_view- void pulsar_table_view_for_each(pulsar_table_view_t *table_view, pulsar_table_view_action action, void *ctx) { table_view->tableView.forEach([action, ctx](const std::string &key, const std::string &value) { if (action) { - action(key.c_str(), value.c_str(), ctx); + action(key.c_str(), value.c_str(), value.size(), ctx); } }); } @@ -68,20 +71,11 @@ void pulsar_table_view_for_each_add_listen(pulsar_table_view_t *table_view, puls void *ctx) { table_view->tableView.forEachAndListen([action, ctx](const std::string &key, const std::string &value) { if (action) { - action(key.c_str(), value.c_str(), ctx); + action(key.c_str(), value.c_str(), value.size(), ctx); } }); } -pulsar_string_map_t *pulsar_table_view_snapshot(pulsar_table_view_t *table_view) { - auto map = pulsar_string_map_create(); - auto snapshot = table_view->tableView.snapshot(); - for (const auto &item : snapshot) { - map->map.emplace(item); - } - return map; -} - void pulsar_table_view_free(pulsar_table_view_t *table_view) { delete table_view; } pulsar_result pulsar_table_view_close(pulsar_table_view_t *table_view) { diff --git a/tests/c/c_TableViewTest.cc b/tests/c/c_TableViewTest.cc index f7376761..03904807 100644 --- a/tests/c/c_TableViewTest.cc +++ b/tests/c/c_TableViewTest.cc @@ -40,7 +40,12 @@ TEST(c_TableViewTest, testSimpleTableView) { const int num = 10; const char *key1 = "key1"; const char *key2 = "key2"; - const char *value = "content"; + size_t data_size = 4; + int *data = (int *)malloc(data_size); + data[0] = 0x01; + data[1] = 0x00; + data[2] = 0x02; + data[3] = 0x00; for (int i = 0; i < num; i++) { pulsar_message_t *message = pulsar_message_create(); if (i % 2 == 0) { @@ -48,7 +53,7 @@ TEST(c_TableViewTest, testSimpleTableView) { } else { pulsar_message_set_partition_key(message, key2); } - pulsar_message_set_content(message, value, strlen(value)); + pulsar_message_set_content(message, data, data_size); pulsar_result res = pulsar_producer_send(producer, message); ASSERT_EQ(pulsar_result_Ok, res); pulsar_message_free(message); @@ -61,25 +66,25 @@ TEST(c_TableViewTest, testSimpleTableView) { result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view); ASSERT_EQ(pulsar_result_Ok, result); - char *v1; + void *v1; + size_t v1_size; ASSERT_EQ(pulsar_table_view_size(table_view), 2); - ASSERT_TRUE(pulsar_table_view_get_value(table_view, "key1", &v1)); - ASSERT_STREQ(v1, "content"); - delete v1; + ASSERT_TRUE(pulsar_table_view_get_value(table_view, "key1", &v1, &v1_size)); + ASSERT_EQ(v1_size, data_size); + ASSERT_EQ(memcmp(v1, data, data_size), 0); + free(v1); - char *v2; - ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2)); - ASSERT_STREQ(v2, "content"); - delete v2; + void *v2; + size_t v2_size; + ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2, &v2_size)); + ASSERT_EQ(v2_size, data_size); + ASSERT_EQ(memcmp(v1, data, data_size), 0); + free(v2); ASSERT_FALSE(pulsar_table_view_contain_key(table_view, "key2")); ASSERT_EQ(pulsar_table_view_size(table_view), 1); - pulsar_string_map_t *pMap = pulsar_table_view_snapshot(table_view); - ASSERT_EQ(pulsar_table_view_size(table_view), 0); - ASSERT_EQ(pulsar_string_map_size(pMap), 1); - pulsar_string_map_free(pMap); - + free(data); pulsar_producer_close(producer); pulsar_table_view_close(table_view); pulsar_client_close(client); From 4b7fa6538abbc87582c6cc85e88a5039ccc1413d Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 6 Jul 2023 15:59:18 +0800 Subject: [PATCH 3/5] Fix code review & add more unit test. --- include/pulsar/c/table_view.h | 6 +-- lib/c/c_TableView.cc | 2 +- tests/c/c_TableViewTest.cc | 87 +++++++++++++++++++++++++++++++++-- 3 files changed, 86 insertions(+), 9 deletions(-) diff --git a/include/pulsar/c/table_view.h b/include/pulsar/c/table_view.h index e2bdecb9..e5933bc8 100644 --- a/include/pulsar/c/table_view.h +++ b/include/pulsar/c/table_view.h @@ -31,8 +31,7 @@ extern "C" { typedef struct _pulsar_table_view pulsar_table_view_t; -typedef void (*pulsar_table_view_action)(const char *key, const void *value, const size_t value_size, - void *ctx); +typedef void (*pulsar_table_view_action)(const char *key, const void *value, size_t value_size, void *ctx); typedef void (*pulsar_result_callback)(pulsar_result, void *); /** @@ -49,8 +48,9 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *); * ```c * pulsar_table_view_t *table_view; * char *value; + * size_t value_size; * while (true) { - * if (pulsar_table_view_retrieve_value(table_view, "key", &value)) { + * if (pulsar_table_view_retrieve_value(table_view, "key", &value, &value_size)) { * printf("value is update to: %s", value); * } else { * // sleep for a while or print the message that value is not updated diff --git a/lib/c/c_TableView.cc b/lib/c/c_TableView.cc index 49f071b2..520b0558 100644 --- a/lib/c/c_TableView.cc +++ b/lib/c/c_TableView.cc @@ -25,7 +25,7 @@ static void *malloc_and_copy(const char *s, size_t slen) { void *result = (void *)malloc(slen); if (result == NULL) { - return NULL; + abort(); } memcpy(result, s, slen); return result; diff --git a/tests/c/c_TableViewTest.cc b/tests/c/c_TableViewTest.cc index 03904807..3a4fb808 100644 --- a/tests/c/c_TableViewTest.cc +++ b/tests/c/c_TableViewTest.cc @@ -18,14 +18,60 @@ */ #include #include +#include #include -#include "pulsar/c/table_view.h" +#include static const char *lookup_url = "pulsar://localhost:6650"; +struct tv_create_result { + pulsar_result result; + pulsar_table_view_t *tableView; +}; + +static void create_tv_callback(pulsar_result result, pulsar_table_view_t *tableView, void *ctx) { + std::promise *create_promise = (std::promise *)ctx; + create_promise->set_value({result, tableView}); +} + +TEST(c_TableViewTest, testCreateTableViewAsync) { + const char *topic_name = "persistent://public/default/test-create-tv-async"; + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + // Create table view. + pulsar_table_view_configuration_t *table_view_conf = pulsar_table_view_configuration_create(); + pulsar_table_view_configuration_set_subscription_name(table_view_conf, "sub-name"); + std::promise create_promise; + std::future create_future = create_promise.get_future(); + pulsar_client_create_table_view_async(client, topic_name, table_view_conf, create_tv_callback, + &create_promise); + tv_create_result tvResult = create_future.get(); + ASSERT_EQ(pulsar_result_Ok, tvResult.result); + + pulsar_table_view_free(tvResult.tableView); + pulsar_client_close(client); +} + +struct tv_action_ctx { + char *expect_data; + int size; + int expect_size; + std::promise *listen_promise; +}; + +static void tv_action(const char *key, const void *value, size_t value_size, void *ctx) { + tv_action_ctx *context = (tv_action_ctx *)ctx; + context->size++; + ASSERT_EQ(memcmp(value, context->expect_data, value_size), 0); + if (context->size == context->expect_size) { + context->listen_promise->set_value(true); + } +} + TEST(c_TableViewTest, testSimpleTableView) { - const char *topic_name = "persistent://public/default/test-table_view20"; + const char *topic_name = "persistent://public/default/1test-table_view"; const char *sub_name = "my-sub-name"; pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); @@ -41,7 +87,7 @@ TEST(c_TableViewTest, testSimpleTableView) { const char *key1 = "key1"; const char *key2 = "key2"; size_t data_size = 4; - int *data = (int *)malloc(data_size); + char *data = (char *)malloc(data_size); data[0] = 0x01; data[1] = 0x00; data[2] = 0x02; @@ -66,6 +112,7 @@ TEST(c_TableViewTest, testSimpleTableView) { result = pulsar_client_create_table_view(client, topic_name, table_view_conf, &table_view); ASSERT_EQ(pulsar_result_Ok, result); + // test get value void *v1; size_t v1_size; ASSERT_EQ(pulsar_table_view_size(table_view), 2); @@ -74,15 +121,45 @@ TEST(c_TableViewTest, testSimpleTableView) { ASSERT_EQ(memcmp(v1, data, data_size), 0); free(v1); + // test for each. + tv_action_ctx ctx; + ctx.expect_data = data; + ctx.size = 0; + pulsar_table_view_for_each(table_view, tv_action, &ctx); + ASSERT_EQ(ctx.size, 2); + + // test for each and listen + std::promise listen_promise; + std::future listen_future = listen_promise.get_future(); + tv_action_ctx ctx2; + ctx2.expect_data = data; + ctx2.size = 0; + ctx2.expect_size = 3; + ctx2.listen_promise = &listen_promise; + pulsar_table_view_for_each_add_listen(table_view, tv_action, &ctx2); + ASSERT_EQ(ctx.size, 2); + // send more message. + pulsar_message_t *message = pulsar_message_create(); + pulsar_message_set_partition_key(message, "key3"); + pulsar_message_set_content(message, data, data_size); + pulsar_result res = pulsar_producer_send(producer, message); + ASSERT_EQ(pulsar_result_Ok, res); + pulsar_message_free(message); + // wait for message. + ASSERT_TRUE(listen_future.get()); + ASSERT_EQ(ctx2.size, ctx2.expect_size); + + // test retrieve value void *v2; size_t v2_size; ASSERT_TRUE(pulsar_table_view_retrieve_value(table_view, "key2", &v2, &v2_size)); ASSERT_EQ(v2_size, data_size); - ASSERT_EQ(memcmp(v1, data, data_size), 0); + ASSERT_EQ(memcmp(v2, data, data_size), 0); free(v2); + // test table view size ASSERT_FALSE(pulsar_table_view_contain_key(table_view, "key2")); - ASSERT_EQ(pulsar_table_view_size(table_view), 1); + ASSERT_EQ(pulsar_table_view_size(table_view), 2); free(data); pulsar_producer_close(producer); From f71dae9b165d19c063820c9d9cd0724a56e890ca Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 6 Jul 2023 16:28:14 +0800 Subject: [PATCH 4/5] Rand topic name. --- tests/c/c_TableViewTest.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/c/c_TableViewTest.cc b/tests/c/c_TableViewTest.cc index 3a4fb808..4b7c5322 100644 --- a/tests/c/c_TableViewTest.cc +++ b/tests/c/c_TableViewTest.cc @@ -71,7 +71,9 @@ static void tv_action(const char *key, const void *value, size_t value_size, voi } TEST(c_TableViewTest, testSimpleTableView) { - const char *topic_name = "persistent://public/default/1test-table_view"; + srand(time(NULL)); + char topic_name[64]; + snprintf(topic_name, 64, "persistent://public/default/test-table-view-%d", rand()); const char *sub_name = "my-sub-name"; pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); From 0fbf46f6d2d46e1faeecf4ac6c15c83f848762f4 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 6 Jul 2023 20:30:42 +0800 Subject: [PATCH 5/5] Change examples. --- include/pulsar/c/table_view.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/include/pulsar/c/table_view.h b/include/pulsar/c/table_view.h index e5933bc8..87eed3e2 100644 --- a/include/pulsar/c/table_view.h +++ b/include/pulsar/c/table_view.h @@ -47,11 +47,13 @@ typedef void (*pulsar_result_callback)(pulsar_result, void *); * * ```c * pulsar_table_view_t *table_view; - * char *value; + * void* value; * size_t value_size; * while (true) { * if (pulsar_table_view_retrieve_value(table_view, "key", &value, &value_size)) { - * printf("value is update to: %s", value); + * for (size_t i = 0; i < value_size; i++) { + * printf("0x%02x%c", ((char*) value)[i], (i + 1 == value_size) ? '\n': ' '); + * } * } else { * // sleep for a while or print the message that value is not updated * }