From 1c9e8f041583c36e8d3f34e35daacb5681fdcebb Mon Sep 17 00:00:00 2001 From: Robert Barbey Date: Tue, 2 May 2023 10:06:25 +0200 Subject: [PATCH 1/5] Support e2e encryption for Reader --- include/pulsar/c/reader_configuration.h | 3 +++ lib/c/c_ReaderConfiguration.cc | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/include/pulsar/c/reader_configuration.h b/include/pulsar/c/reader_configuration.h index 66ce8ef9..d4e5601e 100644 --- a/include/pulsar/c/reader_configuration.h +++ b/include/pulsar/c/reader_configuration.h @@ -89,6 +89,9 @@ PULSAR_PUBLIC void pulsar_reader_configuration_set_read_compacted( PULSAR_PUBLIC int pulsar_reader_configuration_is_read_compacted(pulsar_reader_configuration_t *configuration); +PULSAR_PUBLIC void pulsar_reader_configuration_set_default_crypto_key_reader(pulsar_reader_configuration_t *configuration, + const char *public_key_path, const char *private_key_path); + #ifdef __cplusplus } #endif diff --git a/lib/c/c_ReaderConfiguration.cc b/lib/c/c_ReaderConfiguration.cc index fe05cf22..695d500e 100644 --- a/lib/c/c_ReaderConfiguration.cc +++ b/lib/c/c_ReaderConfiguration.cc @@ -86,3 +86,10 @@ void pulsar_reader_configuration_set_read_compacted(pulsar_reader_configuration_ int pulsar_reader_configuration_is_read_compacted(pulsar_reader_configuration_t *configuration) { return configuration->conf.isReadCompacted(); } + +void pulsar_reader_configuration_set_default_crypto_key_reader( + pulsar_reader_configuration_t *configuration, + const char *public_key_path, const char *private_key_path) { + std::shared_ptr keyReader = std::make_shared(public_key_path, private_key_path); + configuration->conf.setCryptoKeyReader(keyReader); +} From 6e4b91eae5f340e923415810f9e95ad4721159bc Mon Sep 17 00:00:00 2001 From: Robert Barbey Date: Tue, 2 May 2023 10:21:55 +0200 Subject: [PATCH 2/5] Support handling of crypto failure action in Reader --- include/pulsar/c/reader_configuration.h | 8 ++++++++ lib/c/c_ReaderConfiguration.cc | 13 +++++++++++++ 2 files changed, 21 insertions(+) diff --git a/include/pulsar/c/reader_configuration.h b/include/pulsar/c/reader_configuration.h index d4e5601e..12fc1f21 100644 --- a/include/pulsar/c/reader_configuration.h +++ b/include/pulsar/c/reader_configuration.h @@ -23,6 +23,8 @@ #include #include +#include "consumer_configuration.h" + #ifdef __cplusplus extern "C" { #endif @@ -92,6 +94,12 @@ PULSAR_PUBLIC int pulsar_reader_configuration_is_read_compacted(pulsar_reader_co PULSAR_PUBLIC void pulsar_reader_configuration_set_default_crypto_key_reader(pulsar_reader_configuration_t *configuration, const char *public_key_path, const char *private_key_path); +PULSAR_PUBLIC pulsar_consumer_crypto_failure_action pulsar_reader_configuration_get_crypto_failure_action(pulsar_reader_configuration_t *configuration); + +PULSAR_PUBLIC void pulsar_reader_configuration_set_crypto_failure_action( + pulsar_reader_configuration_t *configuration, + pulsar_consumer_crypto_failure_action crypto_failure_action); + #ifdef __cplusplus } #endif diff --git a/lib/c/c_ReaderConfiguration.cc b/lib/c/c_ReaderConfiguration.cc index 695d500e..37bada83 100644 --- a/lib/c/c_ReaderConfiguration.cc +++ b/lib/c/c_ReaderConfiguration.cc @@ -93,3 +93,16 @@ void pulsar_reader_configuration_set_default_crypto_key_reader( std::shared_ptr keyReader = std::make_shared(public_key_path, private_key_path); configuration->conf.setCryptoKeyReader(keyReader); } + +pulsar_consumer_crypto_failure_action pulsar_reader_configuration_get_crypto_failure_action( + pulsar_reader_configuration_t *configuration) { + return (pulsar_consumer_crypto_failure_action) + configuration->conf.getCryptoFailureAction(); +} + +void pulsar_reader_configuration_set_crypto_failure_action( + pulsar_reader_configuration_t *configuration, + pulsar_consumer_crypto_failure_action crypto_failure_action) { + configuration->conf.setCryptoFailureAction( + (pulsar::ConsumerCryptoFailureAction)crypto_failure_action); +} From 196fbef6941b1529a7138a6c19b87aeb613388d6 Mon Sep 17 00:00:00 2001 From: Robert Barbey Date: Thu, 4 May 2023 08:59:00 +0200 Subject: [PATCH 3/5] Fix formatting --- include/pulsar/c/reader_configuration.h | 7 ++++--- lib/c/c_ReaderConfiguration.cc | 15 +++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/include/pulsar/c/reader_configuration.h b/include/pulsar/c/reader_configuration.h index 12fc1f21..a409226d 100644 --- a/include/pulsar/c/reader_configuration.h +++ b/include/pulsar/c/reader_configuration.h @@ -91,10 +91,11 @@ PULSAR_PUBLIC void pulsar_reader_configuration_set_read_compacted( PULSAR_PUBLIC int pulsar_reader_configuration_is_read_compacted(pulsar_reader_configuration_t *configuration); -PULSAR_PUBLIC void pulsar_reader_configuration_set_default_crypto_key_reader(pulsar_reader_configuration_t *configuration, - const char *public_key_path, const char *private_key_path); +PULSAR_PUBLIC void pulsar_reader_configuration_set_default_crypto_key_reader( + pulsar_reader_configuration_t *configuration, const char *public_key_path, const char *private_key_path); -PULSAR_PUBLIC pulsar_consumer_crypto_failure_action pulsar_reader_configuration_get_crypto_failure_action(pulsar_reader_configuration_t *configuration); +PULSAR_PUBLIC pulsar_consumer_crypto_failure_action +pulsar_reader_configuration_get_crypto_failure_action(pulsar_reader_configuration_t *configuration); PULSAR_PUBLIC void pulsar_reader_configuration_set_crypto_failure_action( pulsar_reader_configuration_t *configuration, diff --git a/lib/c/c_ReaderConfiguration.cc b/lib/c/c_ReaderConfiguration.cc index 37bada83..b7aa9a92 100644 --- a/lib/c/c_ReaderConfiguration.cc +++ b/lib/c/c_ReaderConfiguration.cc @@ -87,22 +87,21 @@ int pulsar_reader_configuration_is_read_compacted(pulsar_reader_configuration_t return configuration->conf.isReadCompacted(); } -void pulsar_reader_configuration_set_default_crypto_key_reader( - pulsar_reader_configuration_t *configuration, - const char *public_key_path, const char *private_key_path) { - std::shared_ptr keyReader = std::make_shared(public_key_path, private_key_path); +void pulsar_reader_configuration_set_default_crypto_key_reader(pulsar_reader_configuration_t *configuration, + const char *public_key_path, + const char *private_key_path) { + std::shared_ptr keyReader = + std::make_shared(public_key_path, private_key_path); configuration->conf.setCryptoKeyReader(keyReader); } pulsar_consumer_crypto_failure_action pulsar_reader_configuration_get_crypto_failure_action( pulsar_reader_configuration_t *configuration) { - return (pulsar_consumer_crypto_failure_action) - configuration->conf.getCryptoFailureAction(); + return (pulsar_consumer_crypto_failure_action)configuration->conf.getCryptoFailureAction(); } void pulsar_reader_configuration_set_crypto_failure_action( pulsar_reader_configuration_t *configuration, pulsar_consumer_crypto_failure_action crypto_failure_action) { - configuration->conf.setCryptoFailureAction( - (pulsar::ConsumerCryptoFailureAction)crypto_failure_action); + configuration->conf.setCryptoFailureAction((pulsar::ConsumerCryptoFailureAction)crypto_failure_action); } From 0f212caa72cf211322cc24920403ec4382eb82b0 Mon Sep 17 00:00:00 2001 From: Robert Barbey Date: Thu, 4 May 2023 10:25:17 +0200 Subject: [PATCH 4/5] Add tests for reader configuration C API --- tests/c/c_ReaderConfigurationTest.cc | 66 ++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 tests/c/c_ReaderConfigurationTest.cc diff --git a/tests/c/c_ReaderConfigurationTest.cc b/tests/c/c_ReaderConfigurationTest.cc new file mode 100644 index 00000000..746e679d --- /dev/null +++ b/tests/c/c_ReaderConfigurationTest.cc @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include + +/* + ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive); + ASSERT_EQ(consumerConf.getReceiverQueueSize(), 1000); + ASSERT_EQ(consumerConf.isReadCompacted(), false); + ASSERT_EQ(consumerConf.getSchema().getName(), "BYTES"); + ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 0); + ASSERT_EQ(consumerConf.getTickDurationInMs(), 1000); + ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 100); + ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 1000); + ASSERT_EQ(consumerConf.getCryptoKeyReader().get(), nullptr); + ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL); + ASSERT_TRUE(consumerConf.getProperties().empty()); + ASSERT_TRUE(consumerConf.getConsumerName().empty()); + ASSERT_FALSE(consumerConf.hasMessageListener()); + */ + +TEST(C_ReaderConfigurationTest, testCApiConfig) { + pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create(); + + ASSERT_FALSE(pulsar_reader_configuration_has_reader_listener(reader_conf)); + + ASSERT_EQ(pulsar_reader_configuration_get_receiver_queue_size(reader_conf), 1000); + pulsar_reader_configuration_set_receiver_queue_size(reader_conf, 1729); + ASSERT_EQ(pulsar_reader_configuration_get_receiver_queue_size(reader_conf), 1729); + + ASSERT_STREQ(pulsar_reader_configuration_get_subscription_role_prefix(reader_conf), ""); + pulsar_reader_configuration_set_subscription_role_prefix(reader_conf, "prefix"); + ASSERT_STREQ(pulsar_reader_configuration_get_subscription_role_prefix(reader_conf), "prefix"); + + ASSERT_STREQ(pulsar_reader_configuration_get_reader_name(reader_conf), ""); + pulsar_reader_configuration_set_reader_name(reader_conf, "reader"); + ASSERT_STREQ(pulsar_reader_configuration_get_reader_name(reader_conf), "reader"); + + ASSERT_FALSE(pulsar_reader_configuration_is_read_compacted(reader_conf)); + pulsar_reader_configuration_set_read_compacted(reader_conf, true); + ASSERT_TRUE(pulsar_reader_configuration_is_read_compacted(reader_conf)); + + ASSERT_EQ(pulsar_reader_configuration_get_crypto_failure_action(reader_conf), pulsar_ConsumerFail); + pulsar_reader_configuration_set_crypto_failure_action(reader_conf, pulsar_ConsumerDiscard); + ASSERT_EQ(pulsar_reader_configuration_get_crypto_failure_action(reader_conf), pulsar_ConsumerDiscard); + + pulsar_reader_configuration_free(reader_conf); +} From f33bad7aea15eb1f6c6681fcfc6757490c8b9c20 Mon Sep 17 00:00:00 2001 From: Robert Barbey Date: Thu, 4 May 2023 11:02:47 +0200 Subject: [PATCH 5/5] Remove superfluous comment --- tests/c/c_ReaderConfigurationTest.cc | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/tests/c/c_ReaderConfigurationTest.cc b/tests/c/c_ReaderConfigurationTest.cc index 746e679d..fd21646b 100644 --- a/tests/c/c_ReaderConfigurationTest.cc +++ b/tests/c/c_ReaderConfigurationTest.cc @@ -21,22 +21,6 @@ #include -/* - ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive); - ASSERT_EQ(consumerConf.getReceiverQueueSize(), 1000); - ASSERT_EQ(consumerConf.isReadCompacted(), false); - ASSERT_EQ(consumerConf.getSchema().getName(), "BYTES"); - ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 0); - ASSERT_EQ(consumerConf.getTickDurationInMs(), 1000); - ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 100); - ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 1000); - ASSERT_EQ(consumerConf.getCryptoKeyReader().get(), nullptr); - ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL); - ASSERT_TRUE(consumerConf.getProperties().empty()); - ASSERT_TRUE(consumerConf.getConsumerName().empty()); - ASSERT_FALSE(consumerConf.hasMessageListener()); - */ - TEST(C_ReaderConfigurationTest, testCApiConfig) { pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create();