From c79075d3dc1600be6e16b1ae477b4a7507f019d9 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 19 Jan 2023 11:14:00 +0800 Subject: [PATCH] [feat] Support Dead Letter Topic. --- include/pulsar/ConsumerConfiguration.h | 39 +++ include/pulsar/DeadLetterPolicy.h | 71 ++++ include/pulsar/DeadLetterPolicyBuilder.h | 94 ++++++ include/pulsar/ProducerConfiguration.h | 7 +- lib/Commands.cc | 6 +- lib/Commands.h | 3 +- lib/ConsumerConfiguration.cc | 6 + lib/ConsumerConfigurationImpl.h | 1 + lib/ConsumerImpl.cc | 174 +++++++++- lib/ConsumerImpl.h | 14 + lib/DeadLetterPolicyBuilder.cc | 55 ++++ lib/DeadLetterPolicyImpl.cc | 40 +++ lib/DeadLetterPolicyImpl.h | 32 ++ lib/MessageIdImpl.h | 25 ++ lib/MultiTopicsConsumerImpl.cc | 18 +- lib/NegativeAcksTracker.cc | 2 +- lib/ProducerConfigurationImpl.h | 1 + lib/ProducerImpl.cc | 10 +- tests/ConsumerConfigurationTest.cc | 16 + tests/DeadLetterPolicyTest.cc | 45 +++ tests/DeadLetterQueueTest.cc | 392 +++++++++++++++++++++++ tests/PulsarFriend.h | 6 + 22 files changed, 1043 insertions(+), 14 deletions(-) create mode 100644 include/pulsar/DeadLetterPolicy.h create mode 100644 include/pulsar/DeadLetterPolicyBuilder.h create mode 100644 lib/DeadLetterPolicyBuilder.cc create mode 100644 lib/DeadLetterPolicyImpl.cc create mode 100644 lib/DeadLetterPolicyImpl.h create mode 100644 tests/DeadLetterPolicyTest.cc create mode 100644 tests/DeadLetterQueueTest.cc diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 8759108f..1977415d 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -35,11 +35,13 @@ #include #include "BatchReceivePolicy.h" +#include "DeadLetterPolicy.h" namespace pulsar { class Consumer; class PulsarWrapper; +class PulsarFriend; /// Callback definition for non-data operation typedef std::vector Messages; @@ -408,6 +410,42 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ const BatchReceivePolicy& getBatchReceivePolicy() const; + /** + * Set dead letter policy for consumer + * + * By default, some messages are redelivered many times, even to the extent that they can never be + * stopped. By using the dead letter mechanism, messages have the max redelivery count, when they + * exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged + * automatically. + * + * You can enable the dead letter mechanism by setting the dead letter policy. + * Example: + * + *
+     * * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+     *                       .maxRedeliverCount(10)
+     *                       .build();
+     * 
+ * Default dead letter topic name is {TopicName}-{Subscription}-DLQ. + * To set a custom dead letter topic name + *
+     * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
+     *                       .deadLetterTopic("dlq-topic")
+     *                       .maxRedeliverCount(10)
+     *                       .initialSubscriptionName("init-sub-name")
+     *                       .build();
+     * 
+ * @param deadLetterPolicy Default value is empty + */ + void setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy); + + /** + * Get dead letter policy. + * + * @return dead letter policy + */ + const DeadLetterPolicy& getDeadLetterPolicy() const; + /** * Set whether the subscription status should be replicated. * The default value is `false`. @@ -581,6 +619,7 @@ class PULSAR_PUBLIC ConsumerConfiguration { bool isBatchIndexAckEnabled() const; friend class PulsarWrapper; + friend class PulsarFriend; private: std::shared_ptr impl_; diff --git a/include/pulsar/DeadLetterPolicy.h b/include/pulsar/DeadLetterPolicy.h new file mode 100644 index 00000000..f0d29fb3 --- /dev/null +++ b/include/pulsar/DeadLetterPolicy.h @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef DEAD_LETTER_POLICY_HPP_ +#define DEAD_LETTER_POLICY_HPP_ + +#include + +#include +#include + +namespace pulsar { + +struct DeadLetterPolicyImpl; + +/** + * Configuration for the "dead letter queue" feature in consumer. + * + * see @DeadLetterPolicyBuilder + */ +class PULSAR_PUBLIC DeadLetterPolicy { + public: + DeadLetterPolicy(); + + /** + * Get dead letter topic + * + * @return + */ + const std::string& getDeadLetterTopic() const; + + /** + * Get max redeliver count + * + * @return + */ + int getMaxRedeliverCount() const; + + /** + * Get initial subscription name + * + * @return + */ + const std::string& getInitialSubscriptionName() const; + + private: + friend class DeadLetterPolicyBuilder; + + typedef std::shared_ptr DeadLetterPolicyImplPtr; + DeadLetterPolicyImplPtr impl_; + + explicit DeadLetterPolicy(const DeadLetterPolicyImplPtr& impl); +}; +} // namespace pulsar + +#endif /* DEAD_LETTER_POLICY_HPP_ */ diff --git a/include/pulsar/DeadLetterPolicyBuilder.h b/include/pulsar/DeadLetterPolicyBuilder.h new file mode 100644 index 00000000..ba7d55e5 --- /dev/null +++ b/include/pulsar/DeadLetterPolicyBuilder.h @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef DEAD_LETTER_POLICY_BUILD_HPP_ +#define DEAD_LETTER_POLICY_BUILD_HPP_ + +#include +#include + +#include + +namespace pulsar { + +struct DeadLetterPolicyImpl; + +/** + * The builder to build a DeadLetterPolicyBuilder + * + * Example of building DeadLetterPolicy: + * + * ```c++ + * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder() + * .deadLetterTopic("dlq-topic") + * .maxRedeliverCount(10) + * .initialSubscriptionName("init-sub-name") + * .build(); + * ``` + */ +class PULSAR_PUBLIC DeadLetterPolicyBuilder { + public: + DeadLetterPolicyBuilder(); + + /** + * Set dead letter topic. + * + * @param deadLetterTopic Name of the dead topic where the failing messages are sent. + * The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ" + * + * @return + */ + DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic); + + /** + * Set max redeliver count + * + * @param maxRedeliverCount Maximum number of times that a message is redelivered before being sent + * to the dead letter queue. + * - The maxRedeliverCount must be greater than 0. + * - The default value is {INT_MAX} (DLQ is not enabled) + * + * @return + */ + DeadLetterPolicyBuilder& maxRedeliverCount(int maxRedeliverCount); + + /** + * Set initial subscription name + * + * @param initialSubscriptionName Name of the initial subscription name of the dead letter topic. + * If this field is not set, the initial subscription for the dead letter topic is not created. + * If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer + * fails to be created. + * + * @return + */ + DeadLetterPolicyBuilder& initialSubscriptionName(const std::string& initialSubscriptionName); + + /** + * Build DeadLetterPolicy. + * + * @return + */ + DeadLetterPolicy build(); + + private: + std::shared_ptr impl_; +}; +} // namespace pulsar + +#endif /* DEAD_LETTER_POLICY_BUILD_HPP_ */ diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 873e1383..67550cfa 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -532,11 +532,12 @@ class PULSAR_PUBLIC ProducerConfiguration { */ ProducerAccessMode getAccessMode() const; - friend class PulsarWrapper; - private: - struct Impl; std::shared_ptr impl_; + + friend class PulsarWrapper; + friend class ConsumerImpl; + friend class ProducerImpl; }; } // namespace pulsar #endif /* PULSAR_PRODUCERCONFIGURATION_H_ */ diff --git a/lib/Commands.cc b/lib/Commands.cc index 8d8b5f3f..db993bd1 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -387,7 +387,8 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, boost::optional topicEpoch) { + ProducerAccessMode accessMode, boost::optional topicEpoch, + const std::string& initialSubscriptionName) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); CommandProducer* producer = cmd.mutable_producer(); @@ -401,6 +402,9 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId if (topicEpoch) { producer->set_topic_epoch(topicEpoch.value()); } + if (!initialSubscriptionName.empty()) { + producer->set_initial_subscription_name(initialSubscriptionName); + } for (std::map::const_iterator it = metadata.begin(); it != metadata.end(); it++) { diff --git a/lib/Commands.h b/lib/Commands.h index 6b552c67..e7746d26 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -111,7 +111,8 @@ class Commands { const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, boost::optional topicEpoch); + ProducerAccessMode accessMode, boost::optional topicEpoch, + const std::string& initialSubscriptionName); static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet, CommandAck_AckType ackType, CommandAck_ValidationError validationError); diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index 1893ce38..800be575 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -294,4 +294,10 @@ ConsumerConfiguration& ConsumerConfiguration::setBatchIndexAckEnabled(bool enabl bool ConsumerConfiguration::isBatchIndexAckEnabled() const { return impl_->batchIndexAckEnabled; } +void ConsumerConfiguration::setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy) { + impl_->deadLetterPolicy = deadLetterPolicy; +} + +const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { return impl_->deadLetterPolicy; } + } // namespace pulsar diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index 1ee5613f..8b64b0d7 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -46,6 +46,7 @@ struct ConsumerConfigurationImpl { bool readCompacted{false}; InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest}; BatchReceivePolicy batchReceivePolicy{}; + DeadLetterPolicy deadLetterPolicy; int patternAutoDiscoveryPeriod{60}; bool replicateSubscriptionStateEnabled{false}; std::map properties; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index ccd082b6..f577d38a 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -18,6 +18,7 @@ */ #include "ConsumerImpl.h" +#include #include #include @@ -39,6 +40,7 @@ #include "MessageIdUtil.h" #include "MessageImpl.h" #include "MessagesImpl.h" +#include "ProducerConfigurationImpl.h" #include "PulsarApi.pb.h" #include "TimeUtils.h" #include "TopicName.h" @@ -117,6 +119,21 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, msgCrypto_ = std::make_shared(consumerStr_, false); } + // Config dlq + auto deadLetterPolicy = conf.getDeadLetterPolicy(); + if (deadLetterPolicy.getMaxRedeliverCount() > 0) { + auto deadLetterPolicyBuilder = + DeadLetterPolicyBuilder() + .maxRedeliverCount(deadLetterPolicy.getMaxRedeliverCount()) + .initialSubscriptionName(deadLetterPolicy.getInitialSubscriptionName()); + if (deadLetterPolicy.getDeadLetterTopic().empty()) { + deadLetterPolicyBuilder.deadLetterTopic(topic + "-" + subscriptionName + DLQ_GROUP_TOPIC_SUFFIX); + } else { + deadLetterPolicyBuilder.deadLetterTopic(deadLetterPolicy.getDeadLetterTopic()); + } + deadLetterPolicy_ = deadLetterPolicyBuilder.build(); + } + checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); } @@ -242,6 +259,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r Lock lock(mutex_); setCnx(cnx); incomingMessages_.clear(); + possibleSendToDeadLetterTopicMessages_.clear(); state_ = Ready; backoff_.reset(); // Complicated logic since we don't have a isLocked() function for mutex @@ -467,6 +485,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: return; } + auto redeliveryCount = msg.redelivery_count(); const bool isMessageUndecryptable = metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() && config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME; @@ -533,6 +552,14 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: << startMessageId.value()); return; } + if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector{m}); + if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) { + redeliverUnacknowledgedMessages({m.getMessageId()}); + increaseAvailablePermits(cnx); + return; + } + } executeNotifyCallback(m); } @@ -644,6 +671,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection int skippedMessages = 0; auto acker = BatchMessageAckerImpl::create(batchSize); + std::vector possibleToDeadLetter; for (int i = 0; i < batchSize; i++) { // This is a cheap copy since message contains only one shared pointer (impl_) Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker); @@ -651,6 +679,14 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection msg.impl_->setTopicName(batchedMessage.getTopicName()); msg.impl_->convertPayloadToKeyValue(config_.getSchema()); + if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) { + possibleToDeadLetter.emplace_back(msg); + if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) { + skippedMessages++; + continue; + } + } + if (startMessageId) { const MessageId& msgId = msg.getMessageId(); @@ -676,6 +712,13 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection executeNotifyCallback(msg); } + if (!possibleToDeadLetter.empty()) { + possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter); + if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) { + redeliverUnacknowledgedMessages({batchedMessage.getMessageId()}); + } + } + if (skippedMessages > 0) { increaseAvailablePermits(cnx, skippedMessages); } @@ -1080,6 +1123,7 @@ std::pair ConsumerImpl::prepareIndividualAck(const MessageId& m consumerStatsBasePtr_->messageAcknowledged(ResultOk, CommandAck_AckType_Individual, (batchSize > 0) ? batchSize : 1); unAckedMessageTrackerPtr_->remove(messageId); + possibleSendToDeadLetterTopicMessages_.remove(messageId); return std::make_pair(discardBatch(messageId), true); } else if (config_.isBatchIndexAckEnabled()) { return std::make_pair(messageId, true); @@ -1169,6 +1213,7 @@ const std::string& ConsumerImpl::getName() const { return consumerStr_; } void ConsumerImpl::shutdown() { incomingMessages_.clear(); + possibleSendToDeadLetterTopicMessages_.clear(); resetCnx(); auto client = client_.lock(); if (client) { @@ -1228,7 +1273,29 @@ void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set& me redeliverUnacknowledgedMessages(); return; } - redeliverMessages(messageIds); + + ClientConnectionPtr cnx = getCnx().lock(); + if (cnx) { + if (cnx->getServerProtocolVersion() >= proto::v2) { + auto needRedeliverMsgs = std::make_shared>(); + auto needCallBack = std::make_shared>(messageIds.size()); + auto self = get_shared_this_ptr(); + // TODO Support MAX_REDELIVER_UNACKNOWLEDGED Avoid redelivering too many messages + for (const auto& msgId : messageIds) { + processPossibleToDLQ(msgId, + [self, needRedeliverMsgs, &msgId, needCallBack](bool processSuccess) { + if (!processSuccess) { + needRedeliverMsgs->emplace(msgId); + } + if (--(*needCallBack) == 0 && !needRedeliverMsgs->empty()) { + self->redeliverMessages(*needRedeliverMsgs); + } + }); + } + } + } else { + LOG_WARN("Connection not ready for Consumer - " << getConsumerId()); + } } void ConsumerImpl::redeliverMessages(const std::set& messageIds) { @@ -1545,4 +1612,109 @@ void ConsumerImpl::cancelTimers() noexcept { checkExpiredChunkedTimer_->cancel(ec); } +void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) { + auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId); + if (!messages) { + cb(false); + return; + } + + // Initialize deadLetterProducer_ + if (!deadLetterProducer_) { + std::lock_guard createLock(createProducerLock_); + if (!deadLetterProducer_) { + deadLetterProducer_ = std::make_shared>(); + ProducerConfiguration producerConfiguration; + producerConfiguration.setSchema(config_.getSchema()); + producerConfiguration.setBlockIfQueueFull(false); + producerConfiguration.impl_->initialSubscriptionName = + deadLetterPolicy_.getInitialSubscriptionName(); + ClientImplPtr client = client_.lock(); + if (client) { + auto self = get_shared_this_ptr(); + client->createProducerAsync( + deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration, + [self](Result res, Producer producer) { + if (res == ResultOk) { + self->deadLetterProducer_->setValue(producer); + } else { + LOG_ERROR("Dead letter producer create exception with topic: " + << self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res); + self->deadLetterProducer_.reset(); + } + }); + } else { + LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer."); + return; + } + } + } + + for (const auto& message : messages.value()) { + std::weak_ptr weakSelf{get_shared_this_ptr()}; + deadLetterProducer_->getFuture().addListener([weakSelf, message, messageId, cb](Result res, + Producer producer) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + auto originMessageId = message.getMessageId(); + std::stringstream originMessageIdStr; + originMessageIdStr << originMessageId; + MessageBuilder msgBuilder; + msgBuilder.setAllocatedContent(const_cast(message.getData()), message.getLength()) + .setProperties(message.getProperties()) + .setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str()) + .setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName()); + if (message.hasPartitionKey()) { + msgBuilder.setPartitionKey(message.getPartitionKey()); + } + if (message.hasOrderingKey()) { + msgBuilder.setOrderingKey(message.getOrderingKey()); + } + producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId, messageId, cb]( + Result res, const MessageId& messageIdInDLQ) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (res == ResultOk) { + if (self->state_ != Ready) { + LOG_WARN( + "Send to the DLQ successfully, but consumer is not ready. ignore acknowledge : " + << self->state_); + cb(false); + return; + } + self->possibleSendToDeadLetterTopicMessages_.remove(messageId); + self->acknowledgeAsync(originMessageId, [weakSelf, originMessageId, cb](Result result) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (result != ResultOk) { + LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" + << self->consumerName_ << "} Failed to acknowledge the message {" + << originMessageId + << "} of the original topic but send to the DLQ successfully : " + << result); + cb(false); + } else { + LOG_DEBUG("Send msg:" << originMessageId + << "to DLQ success and acknowledge success."); + cb(true); + } + }); + } else { + LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" + << self->consumerName_ << "} Failed to send DLQ message to {" + << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id " + << "{" << originMessageId << "} : " << res); + cb(false); + } + }); + }); + } +} + } /* namespace pulsar */ diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 4832f4e6..0c5ae444 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -31,11 +31,13 @@ #include "CompressionCodec.h" #include "ConsumerImplBase.h" #include "MapCache.h" +#include "MessageIdImpl.h" #include "NegativeAcksTracker.h" #include "Synchronized.h" #include "TestUtil.h" #include "TimeUtils.h" #include "UnboundedBlockingQueue.h" +#include "lib/SynchronizedHashMap.h" namespace pulsar { class UnAckedMessageTrackerInterface; @@ -45,6 +47,7 @@ class MessageCrypto; class GetLastMessageIdResponse; typedef std::shared_ptr MessageCryptoPtr; typedef std::shared_ptr BackoffPtr; +typedef std::function ProcessDLQCallBack; class AckGroupingTracker; using AckGroupingTrackerPtr = std::shared_ptr; @@ -65,6 +68,10 @@ enum ConsumerTopicType Partitioned }; +const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC"; +const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; +const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; + class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, @@ -182,9 +189,11 @@ class ConsumerImpl : public ConsumerImplBase { boost::optional clearReceiveQueue(); void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp, ResultCallback callback); + void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb); std::mutex mutexForReceiveWithZeroQueueSize; const ConsumerConfiguration config_; + DeadLetterPolicy deadLetterPolicy_; const std::string subscription_; std::string originalSubscriptionName_; const bool isPersistent_; @@ -215,6 +224,10 @@ class ConsumerImpl : public ConsumerImplBase { MessageCryptoPtr msgCrypto_; const bool readCompacted_; + SynchronizedHashMap> possibleSendToDeadLetterTopicMessages_; + std::shared_ptr> deadLetterProducer_; + std::mutex createProducerLock_; + // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe mutable std::mutex mutexForMessageId_; MessageId lastDequedMessageId_{MessageId::earliest()}; @@ -319,6 +332,7 @@ class ConsumerImpl : public ConsumerImplBase { FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker); + FRIEND_TEST(DeadLetterQueueTest, testAutoSetDLQTopicName); }; } /* namespace pulsar */ diff --git a/lib/DeadLetterPolicyBuilder.cc b/lib/DeadLetterPolicyBuilder.cc new file mode 100644 index 00000000..58284726 --- /dev/null +++ b/lib/DeadLetterPolicyBuilder.cc @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "DeadLetterPolicyImpl.h" +#include "stdexcept" + +using namespace pulsar; + +namespace pulsar { + +DeadLetterPolicyBuilder::DeadLetterPolicyBuilder() : impl_(std::make_shared()) {} + +DeadLetterPolicyBuilder& DeadLetterPolicyBuilder::deadLetterTopic(const std::string& deadLetterTopic) { + impl_->deadLetterTopic = deadLetterTopic; + return *this; +} + +DeadLetterPolicyBuilder& DeadLetterPolicyBuilder::maxRedeliverCount(int maxRedeliverCount) { + impl_->maxRedeliverCount = maxRedeliverCount; + return *this; +} + +DeadLetterPolicyBuilder& DeadLetterPolicyBuilder::initialSubscriptionName( + const std::string& initialSubscriptionName) { + impl_->initialSubscriptionName = initialSubscriptionName; + return *this; +} + +DeadLetterPolicy DeadLetterPolicyBuilder::build() { + if (impl_->maxRedeliverCount <= 0) { + throw std::invalid_argument("maxRedeliverCount must be > 0."); + } + return DeadLetterPolicy(impl_); +} + +} // namespace pulsar diff --git a/lib/DeadLetterPolicyImpl.cc b/lib/DeadLetterPolicyImpl.cc new file mode 100644 index 00000000..ae6bdf2f --- /dev/null +++ b/lib/DeadLetterPolicyImpl.cc @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "DeadLetterPolicyImpl.h" + +#include + +using namespace pulsar; + +namespace pulsar { + +DeadLetterPolicy::DeadLetterPolicy() : impl_(std::make_shared()) {} + +const std::string& DeadLetterPolicy::getDeadLetterTopic() const { return impl_->deadLetterTopic; } + +int DeadLetterPolicy::getMaxRedeliverCount() const { return impl_->maxRedeliverCount; } + +const std::string& DeadLetterPolicy::getInitialSubscriptionName() const { + return impl_->initialSubscriptionName; +} + +DeadLetterPolicy::DeadLetterPolicy(const DeadLetterPolicy::DeadLetterPolicyImplPtr& impl) : impl_(impl) {} + +} // namespace pulsar diff --git a/lib/DeadLetterPolicyImpl.h b/lib/DeadLetterPolicyImpl.h new file mode 100644 index 00000000..38e9bc30 --- /dev/null +++ b/lib/DeadLetterPolicyImpl.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +namespace pulsar { + +struct DeadLetterPolicyImpl { + std::string deadLetterTopic; + int maxRedeliverCount{INT_MAX}; + std::string initialSubscriptionName; +}; + +} // namespace pulsar diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h index 0aa96648..70892fa0 100644 --- a/lib/MessageIdImpl.h +++ b/lib/MessageIdImpl.h @@ -19,11 +19,36 @@ #pragma once +#include #include #include #include "BitSet.h" +namespace std { + +template <> +struct hash { + std::size_t operator()(const pulsar::MessageId& msgId) const { + using boost::hash_combine; + using boost::hash_value; + + // Start with a hash value of 0 . + std::size_t seed = 0; + + // Modify 'seed' by XORing and bit-shifting in + // one member of 'Key' after the other: + hash_combine(seed, hash_value(msgId.ledgerId())); + hash_combine(seed, hash_value(msgId.entryId())); + hash_combine(seed, hash_value(msgId.batchIndex())); + hash_combine(seed, hash_value(msgId.partition())); + + // Return the result. + return seed; + } +}; +} // namespace std + namespace pulsar { class MessageIdImpl { diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index a0135667..a5676d3b 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -771,10 +771,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::setredeliverUnacknowledgedMessages(messageIds); - }); + std::unordered_map> topicToMessageId; + for (const MessageId& messageId : messageIds) { + auto topicName = messageId.getTopicName(); + topicToMessageId[topicName].emplace(messageId); + } + + for (const auto& kv : topicToMessageId) { + auto optConsumer = consumers_.find(kv.first); + if (optConsumer) { + optConsumer.value()->redeliverUnacknowledgedMessages(kv.second); + } else { + LOG_ERROR("Message of topic: " << kv.first << " not in consumers"); + } + } } int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 6ff322df..9dcca20f 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -80,7 +80,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) { } if (!messagesToRedeliver.empty()) { - consumer_.redeliverMessages(messagesToRedeliver); + consumer_.redeliverUnacknowledgedMessages(messagesToRedeliver); } scheduleTimer(); } diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index 83723f61..4c8fcdb1 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -50,6 +50,7 @@ struct ProducerConfigurationImpl { std::map properties; bool chunkingEnabled{false}; ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared}; + std::string initialSubscriptionName; }; } // namespace pulsar diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index cdcf14ef..a319946c 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -34,6 +34,7 @@ #include "MessageCrypto.h" #include "MessageImpl.h" #include "OpSendMsg.h" +#include "ProducerConfigurationImpl.h" #include "PulsarApi.pb.h" #include "TimeUtils.h" #include "TopicName.h" @@ -148,10 +149,11 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); - SharedBuffer cmd = Commands::newProducer( - topic_, producerId_, producerName_, requestId, conf_.getProperties(), conf_.getSchema(), epoch_, - userProvidedProducerName_, conf_.isEncryptionEnabled(), - static_cast(conf_.getAccessMode()), topicEpoch); + SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId, + conf_.getProperties(), conf_.getSchema(), epoch_, + userProvidedProducerName_, conf_.isEncryptionEnabled(), + static_cast(conf_.getAccessMode()), + topicEpoch, conf_.impl_->initialSubscriptionName); cnx->sendRequestWithId(cmd, requestId) .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, std::placeholders::_1, std::placeholders::_2)); diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc index 0d826446..9b91722e 100644 --- a/tests/ConsumerConfigurationTest.cc +++ b/tests/ConsumerConfigurationTest.cc @@ -20,10 +20,14 @@ #include #include +#include + #include "NoOpsCryptoKeyReader.h" DECLARE_LOG_OBJECT() +#include + #include "../lib/Future.h" #include "../lib/Utils.h" @@ -321,3 +325,15 @@ TEST(ConsumerConfigurationTest, testResetAckTimeOut) { config.setUnAckedMessagesTimeoutMs(0); ASSERT_EQ(0, config.getUnAckedMessagesTimeoutMs()); } + +TEST(ConsumerConfigurationTest, testDeadLetterPolicy) { + ConsumerConfiguration config; + auto dlqPolicy = config.getDeadLetterPolicy(); + ASSERT_TRUE(dlqPolicy.getDeadLetterTopic().empty()); + ASSERT_EQ(dlqPolicy.getMaxRedeliverCount(), INT_MAX); + ASSERT_TRUE(dlqPolicy.getInitialSubscriptionName().empty()); + + config.setDeadLetterPolicy(DeadLetterPolicyBuilder().maxRedeliverCount(10).build()); + auto dlqPolicy2 = config.getDeadLetterPolicy(); + ASSERT_EQ(dlqPolicy2.getMaxRedeliverCount(), 10); +} diff --git a/tests/DeadLetterPolicyTest.cc b/tests/DeadLetterPolicyTest.cc new file mode 100644 index 00000000..4c412830 --- /dev/null +++ b/tests/DeadLetterPolicyTest.cc @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include + +using namespace pulsar; + +TEST(DeadLetterPolicy, testDeadLetterPolicy) { + // test default value. + DeadLetterPolicy deadLetterPolicy; + ASSERT_EQ(deadLetterPolicy.getMaxRedeliverCount(), INT_MAX); + ASSERT_TRUE(deadLetterPolicy.getDeadLetterTopic().empty()); + ASSERT_TRUE(deadLetterPolicy.getInitialSubscriptionName().empty()); + + // test don't allowed max redeliver count less than 0. + ASSERT_THROW(DeadLetterPolicyBuilder().maxRedeliverCount(-1).build(), std::invalid_argument); + + // test create DeadLetterPolicy by builder. + deadLetterPolicy = DeadLetterPolicyBuilder() + .maxRedeliverCount(10) + .deadLetterTopic("topic-subscription-DLQ") + .initialSubscriptionName("init-DLQ-subscription") + .build(); + ASSERT_EQ(deadLetterPolicy.getMaxRedeliverCount(), 10); + ASSERT_EQ(deadLetterPolicy.getDeadLetterTopic(), "topic-subscription-DLQ"); + ASSERT_EQ(deadLetterPolicy.getInitialSubscriptionName(), "init-DLQ-subscription"); +} diff --git a/tests/DeadLetterQueueTest.cc b/tests/DeadLetterQueueTest.cc new file mode 100644 index 00000000..1a747cc3 --- /dev/null +++ b/tests/DeadLetterQueueTest.cc @@ -0,0 +1,392 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include + +#include "HttpHelper.h" +#include "PulsarFriend.h" +#include "lib/ConsumerConfigurationImpl.h" +#include "lib/LogUtils.h" +#include "lib/MessageIdUtil.h" +#include "lib/UnAckedMessageTrackerEnabled.h" +#include "lib/Utils.h" + +static const std::string lookupUrl = "pulsar://localhost:6650"; +static const std::string adminUrl = "http://localhost:8080/"; + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +TEST(DeadLetterQueueTest, testDLQWithSchema) { + Client client(lookupUrl); + const std::string topic = "testDLQWithSchema-" + std::to_string(time(nullptr)); + const std::string subName = "test-sub"; + + static const std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + SchemaInfo schemaInfo(JSON, "test-json", jsonSchema); + + auto dlqPolicy = DeadLetterPolicyBuilder() + .maxRedeliverCount(3) + .deadLetterTopic(topic + subName + "-DLQ") + .initialSubscriptionName("init-sub") + .build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(ConsumerType::ConsumerShared); + consumerConfig.setSchema(schemaInfo); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists. + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared); + dlqConsumerConfig.setSchema(schemaInfo); + Consumer deadLetterConsumer; + ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig, + deadLetterConsumer)); + + Producer producer; + ProducerConfiguration producerConfig; + producerConfig.setSchema(schemaInfo); + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer)); + std::string data = "{\"re\":2.1,\"im\":1.23}"; + const int num = 10; + for (int i = 0; i < num; ++i) { + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build())); + } + + // nack all msg. + Message msg; + for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + consumer.negativeAcknowledge(msg); + } + + // assert dlq msg. + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000)); + ASSERT_FALSE(msg.getDataAsString().empty()); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic)); + ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200)); + + client.close(); +} + +// If the user never receives this message, the message should not be delivered to the DLQ. +TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) { + Client client(lookupUrl); + const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr)); + const std::string subName = "dlq-sub"; + auto dlqPolicy = + DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + // set ack timeout is 10 ms. + PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConfig, 10); + consumerConfig.setConsumerType(ConsumerType::ConsumerShared); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + producer.send(MessageBuilder().setContent("msg").build()); + + // Wait a while, message should not be send to DLQ + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg)); + client.close(); +} + +TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) { + Client client(lookupUrl); + const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr)); + const std::string subName = "dlq-sub"; + const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ"; + auto dlqPolicy = + DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(ConsumerType::ConsumerShared); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); + ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic); + + client.close(); +} + +class DeadLetterQueueTest : public ::testing::TestWithParam> { + public: + void SetUp() override { + bool isProducerBatch = std::get<0>(GetParam()); + bool isMultiConsumer = std::get<1>(GetParam()); + ConsumerType consumerType = std::get<2>(GetParam()); + + std::string testSuiteName = testing::UnitTest::GetInstance()->current_test_info()->name(); + std::replace(testSuiteName.begin(), testSuiteName.end(), '/', '_'); + topic_ = testSuiteName + std::to_string(time(nullptr)); + subName_ = "test-sub"; + dlqTopic_ = topic_ + "-" + subName_ + "-DLQ"; + + if (isMultiConsumer) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/" + topic_ + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + + producerConf_.setBatchingEnabled(isProducerBatch); + consumerConf_.setConsumerType(consumerType); + consumerConf_.setDeadLetterPolicy( + DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic_).build()); + } + + void TearDown() override { client_.close(); } + + protected: + Client client_{lookupUrl}; + ProducerConfiguration producerConf_; + ConsumerConfiguration consumerConf_; + std::string topic_; + std::string subName_; + std::string dlqTopic_; +}; + +TEST_P(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck) { + Client client(lookupUrl); + + Consumer consumer; + PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConf_, 200); + consumerConf_.setNegativeAckRedeliveryDelayMs(100); + ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer)); + const int num = 100; + Message msg; + for (int i = 0; i < num; ++i) { + msg = MessageBuilder() + .setContent(std::to_string(i)) + .setPartitionKey("p-key") + .setOrderingKey("o-key") + .setProperty("pk-1", "pv-1") + .build(); + producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); }); + } + + // receive messages and don't ack. + for (int i = 0; i < consumerConf_.getDeadLetterPolicy().getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + // Randomly specify some messages manually negativeAcknowledge. + if (rand() % 2 == 0) { + consumer.negativeAcknowledge(msg); + } + } + + // assert dlq msg. + Consumer deadLetterQueueConsumer; + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, subName_, dlqConsumerConfig, deadLetterQueueConsumer)); + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg)); + ASSERT_FALSE(msg.getDataAsString().empty()); + ASSERT_EQ(msg.getPartitionKey(), "p-key"); + ASSERT_EQ(msg.getOrderingKey(), "o-key"); + ASSERT_EQ(msg.getProperty("pk-1"), "pv-1"); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_)); + ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + + ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200)); +} + +TEST_P(DeadLetterQueueTest, testSendDLQTriggerByRedeliverUnacknowledgedMessages) { + Client client(lookupUrl); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer)); + + const int num = 10; + Message msg; + for (int i = 0; i < num; ++i) { + msg = MessageBuilder() + .setContent(std::to_string(i)) + .setPartitionKey("p-key") + .setOrderingKey("o-key") + .setProperty("pk-1", "pv-1") + .build(); + producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); }); + } + + // nack all msg. + for (int i = 1; i <= consumerConf_.getDeadLetterPolicy().getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + if (i % num == 0) { + consumer.redeliverUnacknowledgedMessages(); + } + } + + // assert dlq msg. + Consumer deadLetterQueueConsumer; + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, subName_, dlqConsumerConfig, deadLetterQueueConsumer)); + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg)); + ASSERT_FALSE(msg.getDataAsString().empty()); + ASSERT_EQ(msg.getPartitionKey(), "p-key"); + ASSERT_EQ(msg.getOrderingKey(), "o-key"); + ASSERT_EQ(msg.getProperty("pk-1"), "pv-1"); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_)); + ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200)); +} + +TEST_P(DeadLetterQueueTest, testSendDLQTriggerByNegativeAcknowledge) { + Client client(lookupUrl); + + Consumer consumer; + consumerConf_.setNegativeAckRedeliveryDelayMs(100); + ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer)); + + const int num = 10; + Message msg; + for (int i = 0; i < num; ++i) { + msg = MessageBuilder() + .setContent(std::to_string(i)) + .setPartitionKey("p-key") + .setOrderingKey("o-key") + .setProperty("pk-1", "pv-1") + .build(); + producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); }); + } + + // nack all msg. + for (int i = 0; i < consumerConf_.getDeadLetterPolicy().getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + consumer.negativeAcknowledge(msg); + } + + // assert dlq msg. + Consumer deadLetterQueueConsumer; + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer)); + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg)); + ASSERT_FALSE(msg.getDataAsString().empty()); + ASSERT_EQ(msg.getPartitionKey(), "p-key"); + ASSERT_EQ(msg.getOrderingKey(), "o-key"); + ASSERT_EQ(msg.getProperty("pk-1"), "pv-1"); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_)); + ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200)); +} + +TEST_P(DeadLetterQueueTest, testInitSubscription) { + Client client(lookupUrl); + + const std::string dlqInitSub = "dlq-init-sub"; + auto dlqPolicy = DeadLetterPolicyBuilder() + .maxRedeliverCount(3) + .initialSubscriptionName(dlqInitSub) + .deadLetterTopic(dlqTopic_) + .build(); + consumerConf_.setDeadLetterPolicy(dlqPolicy); + consumerConf_.setNegativeAckRedeliveryDelayMs(100); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic_, subName_, consumerConf_, consumer)); + + Consumer deadLetterQueueConsumer; + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, subName_, dlqConsumerConfig, deadLetterQueueConsumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic_, producerConf_, producer)); + + const int num = 10; + Message msg; + for (int i = 0; i < num; ++i) { + msg = MessageBuilder().setContent(std::to_string(i)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + // nack all msg. + for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + consumer.negativeAcknowledge(msg); + } + + // Use this subscription to ensure that messages are sent to the DLQ. + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg)); + ASSERT_FALSE(msg.getDataAsString().empty()); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_)); + ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + + // If there is no initial subscription, then the subscription will not receive the DLQ messages sent + // before the subscription. + Consumer initDLQConsumer; + ConsumerConfiguration initDLQConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionLatest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic_, dlqInitSub, initDLQConsumerConfig, initDLQConsumer)); + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, initDLQConsumer.receive(msg, 1000)); + ASSERT_FALSE(msg.getDataAsString().empty()); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic_)); + ASSERT_FALSE(msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + ASSERT_EQ(ResultTimeout, initDLQConsumer.receive(msg, 200)); +} + +INSTANTIATE_TEST_SUITE_P(Pulsar, DeadLetterQueueTest, + testing::Combine(testing::Values(true, false), testing::Values(true, false), + testing::Values(ConsumerType::ConsumerShared, + ConsumerType::ConsumerKeyShared)), + [](const testing::TestParamInfo &info) { + return "isBatch_" + std::to_string(std::get<0>(info.param)) + "_isMultiTopics_" + + std::to_string(std::get<1>(info.param)) + "_subType_" + + std::to_string(std::get<2>(info.param)); + }); + +} // namespace pulsar diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index fb6bb591..aa7737d0 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -23,6 +23,7 @@ #include "lib/ClientConnection.h" #include "lib/ClientImpl.h" +#include "lib/ConsumerConfigurationImpl.h" #include "lib/ConsumerImpl.h" #include "lib/MessageImpl.h" #include "lib/MultiTopicsConsumerImpl.h" @@ -181,6 +182,11 @@ class PulsarFriend { static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } static std::shared_ptr getMessageIdImpl(MessageId& msgId) { return msgId.impl_; } + + static void setConsumerUnAckMessagesTimeoutMs(const ConsumerConfiguration& consumerConfiguration, + long unAckedMessagesTimeoutMs) { + consumerConfiguration.impl_->unAckedMessagesTimeoutMs = unAckedMessagesTimeoutMs; + } }; } // namespace pulsar