From 1a4ee3da36f03760413b6812a43946dcfb7ceac3 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 28 Dec 2022 19:52:39 +0800 Subject: [PATCH 1/4] [fix] Redeliver messages that can't be decrypted. --- lib/ConsumerImpl.cc | 4 ++ lib/ConsumerImpl.h | 1 + tests/ConsumerTest.cc | 86 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index b1a620ac..90f3d132 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -687,6 +687,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const } else { LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to " "consume encrypted message"); + auto messageId = MessageIdBuilder::from(msg.message_id()).build(); + unAckedMessageTrackerPtr_->add(messageId); } return false; } @@ -707,6 +709,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError); } else { LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message"); + auto messageId = MessageIdBuilder::from(msg.message_id()).build(); + unAckedMessageTrackerPtr_->add(messageId); } return false; } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 29d5d0bf..c255bc01 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -315,6 +315,7 @@ class ConsumerImpl : public ConsumerImplBase { // these two declared friend to access setNegativeAcknowledgeEnabledForTesting friend class MultiTopicsConsumerImpl; + FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker); diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index f3c8abbe..06c6bc94 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -27,6 +27,7 @@ #include #include "HttpHelper.h" +#include "NoOpsCryptoKeyReader.h" #include "PulsarFriend.h" #include "lib/ClientConnection.h" #include "lib/Future.h" @@ -895,6 +896,91 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) { ASSERT_GE(elapsed.seconds(), operationTimeout); } +TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { + ClientConfiguration config; + Client client(lookupUrl); + std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr)); + std::string subName = "sub-test"; + + // TODO remove ../ + std::string PUBLIC_CERT_FILE_PATH = "../../test-conf//public-key.client-rsa.pem"; + std::string PRIVATE_CERT_FILE_PATH = "../../test-conf//private-key.client-rsa.pem"; + std::shared_ptr keyReader = + std::make_shared(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH); + + ProducerConfiguration conf; + conf.setCompressionType(CompressionLZ4); + conf.addEncryptionKey("client-rsa.pem"); + conf.setCryptoKeyReader(keyReader); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer)); + + ConsumerConfiguration consConfig1; + consConfig1.setCryptoKeyReader(keyReader); + consConfig1.setConsumerType(pulsar::ConsumerShared); + consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL); + Consumer consumer1; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1)); + + ConsumerConfiguration consConfig2; + consConfig2.setCryptoKeyReader(std::make_shared()); + consConfig2.setConsumerType(pulsar::ConsumerShared); + consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL); + consConfig2.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer2; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2)); + auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2); + consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + 100, 100, PulsarFriend::getClientImplPtr(client), static_cast(*consumer2ImplPtr))); + + ConsumerConfiguration consConfig3; + consConfig3.setConsumerType(pulsar::ConsumerShared); + consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL); + consConfig3.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer3; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3)); + auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3); + consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + 100, 100, PulsarFriend::getClientImplPtr(client), static_cast(*consumer3ImplPtr))); + + int numberOfMessages = 20; + std::string msgContent = "msg-content"; + Message msg; + for (int i = 0; i < numberOfMessages; i++) { + std::stringstream stream; + stream << msgContent << i; + msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + // Consuming from consumer 2 and 3 + // no message should be returned since they can't decrypt the message + ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000)); + ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000)); + + // All messages would be received by consumer 1 + std::unordered_set receivedMsgs; + for (int i = 0; i < numberOfMessages; i++) { + ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000)); + ASSERT_EQ(ResultOk, consumer1.acknowledge(msg)); + receivedMsgs.insert(msg.getDataAsString()); + } + + ASSERT_EQ(receivedMsgs.size(), numberOfMessages); + for (int i = 0; i < numberOfMessages; i++) { + std::stringstream expected; + expected << msgContent << i; + ASSERT_TRUE(receivedMsgs.count(expected.str())); + } + + // Consuming from consumer 2 and 3 again just to be sure + // no message should be returned since they can't decrypt the message + ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000)); + ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000)); + + ASSERT_EQ(ResultOk, client.close()); +} + class ConsumerSeekTest : public ::testing::TestWithParam { public: void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); } From 75b830f7a790d8c5418afdf5bdca9c8739654196 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 29 Dec 2022 09:45:25 +0800 Subject: [PATCH 2/4] Fix unit test. --- tests/ConsumerTest.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 06c6bc94..666ef71e 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -902,9 +902,8 @@ TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr)); std::string subName = "sub-test"; - // TODO remove ../ - std::string PUBLIC_CERT_FILE_PATH = "../../test-conf//public-key.client-rsa.pem"; - std::string PRIVATE_CERT_FILE_PATH = "../../test-conf//private-key.client-rsa.pem"; + std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem"; + std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem"; std::shared_ptr keyReader = std::make_shared(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH); From b8214aaf35e787d7185210f0f8faba9cde48988f Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 16 Feb 2023 17:51:55 +0800 Subject: [PATCH 3/4] Fix code reviews. --- tests/ConsumerTest.cc | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 666ef71e..5a198c5d 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -902,8 +902,8 @@ TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr)); std::string subName = "sub-test"; - std::string PUBLIC_CERT_FILE_PATH = "../test-conf//public-key.client-rsa.pem"; - std::string PRIVATE_CERT_FILE_PATH = "../test-conf//private-key.client-rsa.pem"; + std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem"; + std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem"; std::shared_ptr keyReader = std::make_shared(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH); @@ -946,9 +946,7 @@ TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { std::string msgContent = "msg-content"; Message msg; for (int i = 0; i < numberOfMessages; i++) { - std::stringstream stream; - stream << msgContent << i; - msg = MessageBuilder().setContent(stream.str()).build(); + msg = MessageBuilder().setContent(msgContent + std::to_string(i)).build(); ASSERT_EQ(ResultOk, producer.send(msg)); } @@ -958,19 +956,22 @@ TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000)); // All messages would be received by consumer 1 - std::unordered_set receivedMsgs; + std::set valuesSent; for (int i = 0; i < numberOfMessages; i++) { - ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000)); - ASSERT_EQ(ResultOk, consumer1.acknowledge(msg)); - receivedMsgs.insert(msg.getDataAsString()); + auto value = msgContent + std::to_string(i); + valuesSent.emplace(value); + msg = MessageBuilder().setContent(value).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); } - ASSERT_EQ(receivedMsgs.size(), numberOfMessages); + // All messages would be received by consumer 1 + std::set valuesReceived; for (int i = 0; i < numberOfMessages; i++) { - std::stringstream expected; - expected << msgContent << i; - ASSERT_TRUE(receivedMsgs.count(expected.str())); + ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000)); + ASSERT_EQ(ResultOk, consumer1.acknowledge(msg)); + valuesReceived.emplace(msg.getDataAsString()); } + ASSERT_EQ(valuesSent, valuesReceived); // Consuming from consumer 2 and 3 again just to be sure // no message should be returned since they can't decrypt the message From dbd519c74b3e1b108975631253e8853864d11100 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 22 Feb 2023 17:55:30 +0800 Subject: [PATCH 4/4] Just send once loop msg --- tests/ConsumerTest.cc | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 5a198c5d..ef70d5f7 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -944,9 +944,12 @@ TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { int numberOfMessages = 20; std::string msgContent = "msg-content"; + std::set valuesSent; Message msg; for (int i = 0; i < numberOfMessages; i++) { - msg = MessageBuilder().setContent(msgContent + std::to_string(i)).build(); + auto value = msgContent + std::to_string(i); + valuesSent.emplace(value); + msg = MessageBuilder().setContent(value).build(); ASSERT_EQ(ResultOk, producer.send(msg)); } @@ -955,15 +958,6 @@ TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000)); ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000)); - // All messages would be received by consumer 1 - std::set valuesSent; - for (int i = 0; i < numberOfMessages; i++) { - auto value = msgContent + std::to_string(i); - valuesSent.emplace(value); - msg = MessageBuilder().setContent(value).build(); - ASSERT_EQ(ResultOk, producer.send(msg)); - } - // All messages would be received by consumer 1 std::set valuesReceived; for (int i = 0; i < numberOfMessages; i++) {