Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ typedef enum
pulsar_ProducerSend
} pulsar_producer_crypto_failure_action;

typedef enum
{
// By default multiple producers can publish on a topic.
pulsar_ProducerAccessModeShared = 0,
// Require exclusive access for producer.
// Fail immediately if there's already a producer connected.
pulsar_ProducerAccessModeExclusive = 1,
// Producer creation is pending until it can acquire exclusive access.
pulsar_ProducerAccessModeWaitForExclusive = 2,
// Acquire exclusive access for the producer.
// Any existing producer will be removed and invalidated immediately.
pulsar_ProducerAccessModeExclusiveWithFencing = 3

} pulsar_producer_access_mode;

typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;

typedef struct _pulsar_crypto_key_reader pulsar_crypto_key_reader;
Expand Down Expand Up @@ -210,6 +225,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_chunking_enabled(pulsar_pro

PULSAR_PUBLIC int pulsar_producer_configuration_is_chunking_enabled(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC pulsar_producer_access_mode
pulsar_producer_configuration_get_access_mode(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_access_mode(pulsar_producer_configuration_t *conf,
pulsar_producer_access_mode accessMode);

// const CryptoKeyReaderPtr getCryptoKeyReader() const;
// ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
//
Expand Down
10 changes: 10 additions & 0 deletions lib/c/c_ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,13 @@ void pulsar_producer_configuration_set_chunking_enabled(pulsar_producer_configur
int pulsar_producer_configuration_is_chunking_enabled(pulsar_producer_configuration_t *conf) {
return conf->conf.isChunkingEnabled();
}

pulsar_producer_access_mode pulsar_producer_configuration_get_access_mode(
pulsar_producer_configuration_t *conf) {
return (pulsar_producer_access_mode)conf->conf.getAccessMode();
}

void pulsar_producer_configuration_set_access_mode(pulsar_producer_configuration_t *conf,
pulsar_producer_access_mode accessMode) {
conf->conf.setAccessMode((pulsar::ProducerConfiguration::ProducerAccessMode)accessMode);
}
4 changes: 4 additions & 0 deletions tests/c/c_ProducerConfigurationTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ TEST(C_ProducerConfigurationTest, testCApiConfig) {
ASSERT_EQ(pulsar_producer_configuration_is_chunking_enabled(producer_conf), 0);
pulsar_producer_configuration_set_chunking_enabled(producer_conf, 1);
ASSERT_EQ(pulsar_producer_configuration_is_chunking_enabled(producer_conf), 1);

pulsar_producer_configuration_set_access_mode(producer_conf, pulsar_ProducerAccessModeExclusive);
ASSERT_EQ(pulsar_producer_configuration_get_access_mode(producer_conf),
pulsar_ProducerAccessModeExclusive);
}