diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index b1a620ac..90f3d132 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -687,6 +687,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const } else { LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to " "consume encrypted message"); + auto messageId = MessageIdBuilder::from(msg.message_id()).build(); + unAckedMessageTrackerPtr_->add(messageId); } return false; } @@ -707,6 +709,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError); } else { LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message"); + auto messageId = MessageIdBuilder::from(msg.message_id()).build(); + unAckedMessageTrackerPtr_->add(messageId); } return false; } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 29d5d0bf..c255bc01 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -315,6 +315,7 @@ class ConsumerImpl : public ConsumerImplBase { // these two declared friend to access setNegativeAcknowledgeEnabledForTesting friend class MultiTopicsConsumerImpl; + FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker); diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index f3c8abbe..ef70d5f7 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -27,6 +27,7 @@ #include #include "HttpHelper.h" +#include "NoOpsCryptoKeyReader.h" #include "PulsarFriend.h" #include "lib/ClientConnection.h" #include "lib/Future.h" @@ -895,6 +896,85 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) { ASSERT_GE(elapsed.seconds(), operationTimeout); } +TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { + ClientConfiguration config; + Client client(lookupUrl); + std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr)); + std::string subName = "sub-test"; + + std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem"; + std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem"; + std::shared_ptr keyReader = + std::make_shared(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH); + + ProducerConfiguration conf; + conf.setCompressionType(CompressionLZ4); + conf.addEncryptionKey("client-rsa.pem"); + conf.setCryptoKeyReader(keyReader); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer)); + + ConsumerConfiguration consConfig1; + consConfig1.setCryptoKeyReader(keyReader); + consConfig1.setConsumerType(pulsar::ConsumerShared); + consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL); + Consumer consumer1; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1)); + + ConsumerConfiguration consConfig2; + consConfig2.setCryptoKeyReader(std::make_shared()); + consConfig2.setConsumerType(pulsar::ConsumerShared); + consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL); + consConfig2.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer2; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2)); + auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2); + consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + 100, 100, PulsarFriend::getClientImplPtr(client), static_cast(*consumer2ImplPtr))); + + ConsumerConfiguration consConfig3; + consConfig3.setConsumerType(pulsar::ConsumerShared); + consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL); + consConfig3.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer3; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3)); + auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3); + consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + 100, 100, PulsarFriend::getClientImplPtr(client), static_cast(*consumer3ImplPtr))); + + int numberOfMessages = 20; + std::string msgContent = "msg-content"; + std::set valuesSent; + Message msg; + for (int i = 0; i < numberOfMessages; i++) { + auto value = msgContent + std::to_string(i); + valuesSent.emplace(value); + msg = MessageBuilder().setContent(value).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + // Consuming from consumer 2 and 3 + // no message should be returned since they can't decrypt the message + ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000)); + ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000)); + + // All messages would be received by consumer 1 + std::set valuesReceived; + for (int i = 0; i < numberOfMessages; i++) { + ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000)); + ASSERT_EQ(ResultOk, consumer1.acknowledge(msg)); + valuesReceived.emplace(msg.getDataAsString()); + } + ASSERT_EQ(valuesSent, valuesReceived); + + // Consuming from consumer 2 and 3 again just to be sure + // no message should be returned since they can't decrypt the message + ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000)); + ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000)); + + ASSERT_EQ(ResultOk, client.close()); +} + class ConsumerSeekTest : public ::testing::TestWithParam { public: void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); }