diff --git a/include/pulsar/ReaderConfiguration.h b/include/pulsar/ReaderConfiguration.h index 4f6464f7..f72abf67 100644 --- a/include/pulsar/ReaderConfiguration.h +++ b/include/pulsar/ReaderConfiguration.h @@ -263,6 +263,21 @@ class PULSAR_PUBLIC ReaderConfiguration { */ ReaderConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action); + /** + * Set the reader to include the startMessageId or given position of any reset operation like + * Reader::seek. + * + * Default: false + * + * @param startMessageIdInclusive whether to include the reset position + */ + ReaderConfiguration& setStartMessageIdInclusive(bool startMessageIdInclusive); + + /** + * The associated getter of setStartMessageIdInclusive + */ + bool isStartMessageIdInclusive() const; + /** * Check whether the message has a specific property attached. * diff --git a/lib/ReaderConfiguration.cc b/lib/ReaderConfiguration.cc index 3ba7fedd..f1dba5eb 100644 --- a/lib/ReaderConfiguration.cc +++ b/lib/ReaderConfiguration.cc @@ -120,6 +120,13 @@ ReaderConfiguration& ReaderConfiguration::setCryptoFailureAction(ConsumerCryptoF return *this; } +ReaderConfiguration& ReaderConfiguration::setStartMessageIdInclusive(bool startMessageIdInclusive) { + impl_->startMessageIdInclusive = startMessageIdInclusive; + return *this; +} + +bool ReaderConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; } + bool ReaderConfiguration::hasProperty(const std::string& name) const { const auto& properties = impl_->properties; return properties.find(name) != properties.cend(); diff --git a/lib/ReaderConfigurationImpl.h b/lib/ReaderConfigurationImpl.h index 6f38c29b..c92397c4 100644 --- a/lib/ReaderConfigurationImpl.h +++ b/lib/ReaderConfigurationImpl.h @@ -38,6 +38,7 @@ struct ReaderConfigurationImpl { CryptoKeyReaderPtr cryptoKeyReader; ConsumerCryptoFailureAction cryptoFailureAction; std::map properties; + bool startMessageIdInclusive{false}; }; } // namespace pulsar #endif /* LIB_READERCONFIGURATIONIMPL_H_ */ diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index d6a8f1d5..f41106e4 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -59,6 +59,7 @@ void ReaderImpl::start(const MessageId& startMessageId, consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader()); consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction()); consumerConf.setProperties(readerConf_.getProperties()); + consumerConf.setStartMessageIdInclusive(readerConf_.isStartMessageIdInclusive()); if (readerConf_.getReaderName().length() > 0) { consumerConf.setConsumerName(readerConf_.getReaderName()); diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index afef384d..ac2fa234 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -700,4 +700,56 @@ TEST_P(ReaderTest, testReceiveAfterSeek) { client.close(); } -INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false)); \ No newline at end of file +TEST(ReaderSeekTest, testSeekForMessageId) { + Client client(serviceUrl); + + const std::string topic = "test-seek-for-message-id-" + std::to_string(time(nullptr)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + Reader readerExclusive; + ASSERT_EQ(ResultOk, + client.createReader(topic, MessageId::latest(), ReaderConfiguration(), readerExclusive)); + + Reader readerInclusive; + ASSERT_EQ(ResultOk, + client.createReader(topic, MessageId::latest(), + ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive)); + + const auto numMessages = 100; + MessageId seekMessageId; + + int r = (rand() % (numMessages - 1)); + for (int i = 0; i < numMessages; i++) { + MessageId id; + ASSERT_EQ(ResultOk, + producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id)); + + if (i == r) { + seekMessageId = id; + } + } + + LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r); + + readerExclusive.seek(seekMessageId); + Message msg0; + ASSERT_EQ(ResultOk, readerExclusive.readNext(msg0, 3000)); + + readerInclusive.seek(seekMessageId); + Message msg1; + ASSERT_EQ(ResultOk, readerInclusive.readNext(msg1, 3000)); + + LOG_INFO("readerExclusive received " << msg0.getDataAsString() << " from " << msg0.getMessageId()); + LOG_INFO("readerInclusive received " << msg1.getDataAsString() << " from " << msg1.getMessageId()); + + ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1)); + ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r)); + + readerExclusive.close(); + readerInclusive.close(); + producer.close(); +} + +INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));