From 6fc8b2129d266baa69eb508bb4a359cd8aca0eec Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 6 Apr 2023 16:46:04 +0800 Subject: [PATCH] [feat] Support set producer access mode for C client. --- include/pulsar/c/producer_configuration.h | 21 +++++++++++++++++++++ lib/c/c_ProducerConfiguration.cc | 10 ++++++++++ tests/c/c_ProducerConfigurationTest.cc | 4 ++++ 3 files changed, 35 insertions(+) diff --git a/include/pulsar/c/producer_configuration.h b/include/pulsar/c/producer_configuration.h index f8f74c25..f5566945 100644 --- a/include/pulsar/c/producer_configuration.h +++ b/include/pulsar/c/producer_configuration.h @@ -78,6 +78,21 @@ typedef enum pulsar_ProducerSend } pulsar_producer_crypto_failure_action; +typedef enum +{ + // By default multiple producers can publish on a topic. + pulsar_ProducerAccessModeShared = 0, + // Require exclusive access for producer. + // Fail immediately if there's already a producer connected. + pulsar_ProducerAccessModeExclusive = 1, + // Producer creation is pending until it can acquire exclusive access. + pulsar_ProducerAccessModeWaitForExclusive = 2, + // Acquire exclusive access for the producer. + // Any existing producer will be removed and invalidated immediately. + pulsar_ProducerAccessModeExclusiveWithFencing = 3 + +} pulsar_producer_access_mode; + typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t; typedef struct _pulsar_crypto_key_reader pulsar_crypto_key_reader; @@ -210,6 +225,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_chunking_enabled(pulsar_pro PULSAR_PUBLIC int pulsar_producer_configuration_is_chunking_enabled(pulsar_producer_configuration_t *conf); +PULSAR_PUBLIC pulsar_producer_access_mode +pulsar_producer_configuration_get_access_mode(pulsar_producer_configuration_t *conf); + +PULSAR_PUBLIC void pulsar_producer_configuration_set_access_mode(pulsar_producer_configuration_t *conf, + pulsar_producer_access_mode accessMode); + // const CryptoKeyReaderPtr getCryptoKeyReader() const; // ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader); // diff --git a/lib/c/c_ProducerConfiguration.cc b/lib/c/c_ProducerConfiguration.cc index 868eddf3..76e2e2d2 100644 --- a/lib/c/c_ProducerConfiguration.cc +++ b/lib/c/c_ProducerConfiguration.cc @@ -232,3 +232,13 @@ void pulsar_producer_configuration_set_chunking_enabled(pulsar_producer_configur int pulsar_producer_configuration_is_chunking_enabled(pulsar_producer_configuration_t *conf) { return conf->conf.isChunkingEnabled(); } + +pulsar_producer_access_mode pulsar_producer_configuration_get_access_mode( + pulsar_producer_configuration_t *conf) { + return (pulsar_producer_access_mode)conf->conf.getAccessMode(); +} + +void pulsar_producer_configuration_set_access_mode(pulsar_producer_configuration_t *conf, + pulsar_producer_access_mode accessMode) { + conf->conf.setAccessMode((pulsar::ProducerConfiguration::ProducerAccessMode)accessMode); +} diff --git a/tests/c/c_ProducerConfigurationTest.cc b/tests/c/c_ProducerConfigurationTest.cc index 16142272..91043264 100644 --- a/tests/c/c_ProducerConfigurationTest.cc +++ b/tests/c/c_ProducerConfigurationTest.cc @@ -25,4 +25,8 @@ TEST(C_ProducerConfigurationTest, testCApiConfig) { ASSERT_EQ(pulsar_producer_configuration_is_chunking_enabled(producer_conf), 0); pulsar_producer_configuration_set_chunking_enabled(producer_conf, 1); ASSERT_EQ(pulsar_producer_configuration_is_chunking_enabled(producer_conf), 1); + + pulsar_producer_configuration_set_access_mode(producer_conf, pulsar_ProducerAccessModeExclusive); + ASSERT_EQ(pulsar_producer_configuration_get_access_mode(producer_conf), + pulsar_ProducerAccessModeExclusive); }