Skip to content

Commit 8cc19a0

Browse files
committed
Support Dead Letter Topic.
1 parent 2018a06 commit 8cc19a0

25 files changed

Lines changed: 995 additions & 16 deletions

include/pulsar/ConsumerConfiguration.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <memory>
3535

3636
#include "BatchReceivePolicy.h"
37+
#include "DeadLetterPolicy.h"
3738

3839
namespace pulsar {
3940

@@ -398,6 +399,20 @@ class PULSAR_PUBLIC ConsumerConfiguration {
398399
*/
399400
const BatchReceivePolicy& getBatchReceivePolicy() const;
400401

402+
/**
403+
* Set dead letter policy.
404+
*
405+
* @param deadLetterPolicy thd default is empty
406+
*/
407+
void setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy);
408+
409+
/**
410+
* Get dead letter policy.
411+
*
412+
* @return dead letter policy
413+
*/
414+
const DeadLetterPolicy& getDeadLetterPolicy() const;
415+
401416
/**
402417
* Set whether the subscription status should be replicated.
403418
* The default value is `false`.

include/pulsar/DeadLetterPolicy.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef DEAD_LETTER_POLICY_HPP_
20+
#define DEAD_LETTER_POLICY_HPP_
21+
22+
#include <pulsar/defines.h>
23+
24+
#include <memory>
25+
#include <string>
26+
27+
namespace pulsar {
28+
29+
struct DeadLetterPolicyImpl;
30+
31+
/**
32+
* Configuration for the "dead letter queue" feature in consumer.
33+
*
34+
* see @DeadLetterPolicyBuilder
35+
*/
36+
class PULSAR_PUBLIC DeadLetterPolicy {
37+
public:
38+
DeadLetterPolicy();
39+
40+
/**
41+
* Get dead letter topic
42+
*
43+
* @return
44+
*/
45+
std::string getDeadLetterTopic() const;
46+
47+
/**
48+
* Get max redeliver count
49+
*
50+
* @return
51+
*/
52+
int getMaxRedeliverCount() const;
53+
54+
/**
55+
* Get initial subscription name
56+
*
57+
* @return
58+
*/
59+
std::string getInitialSubscriptionName() const;
60+
61+
private:
62+
friend class DeadLetterPolicyBuilder;
63+
64+
typedef std::shared_ptr<DeadLetterPolicyImpl> DeadLetterPolicyImplPtr;
65+
DeadLetterPolicyImplPtr impl_;
66+
67+
explicit DeadLetterPolicy(const DeadLetterPolicyImplPtr& impl);
68+
};
69+
} // namespace pulsar
70+
71+
#endif /* DEAD_LETTER_POLICY_HPP_ */
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
20+
#define DEAD_LETTER_POLICY_BUILD_HPP_
21+
22+
#include <pulsar/DeadLetterPolicy.h>
23+
#include <pulsar/defines.h>
24+
25+
#include <memory>
26+
27+
namespace pulsar {
28+
29+
struct DeadLetterPolicyImpl;
30+
31+
/**
32+
* The builder to build a DeadLetterPolicyBuilder
33+
*
34+
* Example of building DeadLetterPolicy:
35+
*
36+
* ```c++
37+
* DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
38+
* .deadLetterTopic("dlq-topic")
39+
* .maxRedeliverCount(10)
40+
* .initialSubscriptionName("init-sub-name")
41+
* .build();
42+
* ```
43+
*/
44+
class PULSAR_PUBLIC DeadLetterPolicyBuilder {
45+
public:
46+
DeadLetterPolicyBuilder();
47+
48+
/**
49+
* Set dead letter topic
50+
*
51+
* @return
52+
*/
53+
DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic);
54+
55+
/**
56+
* Set max redeliver count
57+
*
58+
* @return
59+
*/
60+
DeadLetterPolicyBuilder& maxRedeliverCount(int maxRedeliverCount);
61+
62+
/**
63+
* Set initial subscription name
64+
*
65+
* @return
66+
*/
67+
DeadLetterPolicyBuilder& initialSubscriptionName(const std::string& initialSubscriptionName);
68+
69+
/**
70+
* Build DeadLetterPolicy.
71+
*
72+
* @return
73+
*/
74+
DeadLetterPolicy build();
75+
76+
private:
77+
std::shared_ptr<DeadLetterPolicyImpl> impl_;
78+
};
79+
} // namespace pulsar
80+
81+
#endif /* DEAD_LETTER_POLICY_BUILD_HPP_ */

include/pulsar/ProducerConfiguration.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,18 @@ class PULSAR_PUBLIC ProducerConfiguration {
532532
*/
533533
ProducerAccessMode getAccessMode() const;
534534

535+
/**
536+
* Use this configuration to automatically create an initial subscription when creating a topic.
537+
*
538+
* If this field is not set, the initial subscription is not created.
539+
*/
540+
ProducerConfiguration& setInitialSubscriptionName(const std::string& initialSubscriptionName);
541+
542+
/**
543+
* Get initial subscription name.
544+
*/
545+
const std::string& getInitialSubscriptionName() const;
546+
535547
friend class PulsarWrapper;
536548

537549
private:

lib/Commands.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,8 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
385385
const std::map<std::string, std::string>& metadata,
386386
const SchemaInfo& schemaInfo, uint64_t epoch,
387387
bool userProvidedProducerName, bool encrypted,
388-
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch) {
388+
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
389+
std::string initialSubscriptionName) {
389390
BaseCommand cmd;
390391
cmd.set_type(BaseCommand::PRODUCER);
391392
CommandProducer* producer = cmd.mutable_producer();
@@ -399,6 +400,9 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
399400
if (topicEpoch) {
400401
producer->set_topic_epoch(topicEpoch.value());
401402
}
403+
if (!initialSubscriptionName.empty()) {
404+
producer->set_initial_subscription_name(initialSubscriptionName);
405+
}
402406

403407
for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
404408
it++) {

lib/Commands.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ class Commands {
110110
const std::map<std::string, std::string>& metadata,
111111
const SchemaInfo& schemaInfo, uint64_t epoch,
112112
bool userProvidedProducerName, bool encrypted,
113-
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);
113+
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
114+
std::string initialSubscriptionName);
114115

115116
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
116117
CommandAck_AckType ackType, CommandAck_ValidationError validationError);

lib/ConsumerConfiguration.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,4 +287,10 @@ const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
287287
return impl_->batchReceivePolicy;
288288
}
289289

290+
void ConsumerConfiguration::setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy) {
291+
impl_->deadLetterPolicy = deadLetterPolicy;
292+
}
293+
294+
const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { return impl_->deadLetterPolicy; }
295+
290296
} // namespace pulsar

lib/ConsumerConfigurationImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ struct ConsumerConfigurationImpl {
4646
bool readCompacted{false};
4747
InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
4848
BatchReceivePolicy batchReceivePolicy{};
49+
DeadLetterPolicy deadLetterPolicy;
4950
int patternAutoDiscoveryPeriod{60};
5051
bool replicateSubscriptionStateEnabled{false};
5152
std::map<std::string, std::string> properties;

0 commit comments

Comments
 (0)