Skip to content

Commit a1cf401

Browse files
authored
Fix StartMessageIdInclusive not work when reader reads from latest msg id (#386)
Fixes #385 ### Motivation The reader with `StartMessageIdInclusive` enabled should be able to reads messages from the latest message ID. ### Modifications - If `StartMessageIdInclusive` is enabled, the reader will seek and read the latest message in the topic.
1 parent 9ffd2ef commit a1cf401

2 files changed

Lines changed: 55 additions & 7 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1508,18 +1508,34 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
15081508

15091509
if (messageId == MessageId::latest()) {
15101510
lock.unlock();
1511-
getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
1511+
auto self = get_shared_this_ptr();
1512+
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
15121513
if (result != ResultOk) {
15131514
callback(result, {});
15141515
return;
15151516
}
1516-
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
1517-
// We only care about comparing ledger ids and entry ids as mark delete position doesn't have
1518-
// other ids such as batch index
1519-
callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(),
1520-
response.getLastMessageId()) < 0);
1517+
auto handleResponse = [self, response, callback] {
1518+
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
1519+
// We only care about comparing ledger ids and entry ids as mark delete position doesn't
1520+
// have other ids such as batch index
1521+
auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
1522+
response.getLastMessageId());
1523+
callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
1524+
: compareResult < 0);
1525+
} else {
1526+
callback(ResultOk, false);
1527+
}
1528+
};
1529+
if (self->config_.isStartMessageIdInclusive()) {
1530+
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
1531+
if (result != ResultOk) {
1532+
callback(result, {});
1533+
return;
1534+
}
1535+
handleResponse();
1536+
});
15211537
} else {
1522-
callback(ResultOk, false);
1538+
handleResponse();
15231539
}
15241540
});
15251541
} else {

tests/ReaderTest.cc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,4 +752,36 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
752752
producer.close();
753753
}
754754

755+
TEST(ReaderSeekTest, testStartAtLatestMessageId) {
756+
Client client(serviceUrl);
757+
758+
const std::string topic = "test-seek-latest-message-id-" + std::to_string(time(nullptr));
759+
760+
Producer producer;
761+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
762+
763+
MessageId id;
764+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), id));
765+
766+
Reader readerExclusive;
767+
ASSERT_EQ(ResultOk,
768+
client.createReader(topic, MessageId::latest(), ReaderConfiguration(), readerExclusive));
769+
770+
Reader readerInclusive;
771+
ASSERT_EQ(ResultOk,
772+
client.createReader(topic, MessageId::latest(),
773+
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
774+
775+
Message msg;
776+
bool hasMsgAvaliable = false;
777+
readerInclusive.hasMessageAvailable(hasMsgAvaliable);
778+
ASSERT_TRUE(hasMsgAvaliable);
779+
ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
780+
ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000));
781+
782+
readerExclusive.close();
783+
readerInclusive.close();
784+
producer.close();
785+
}
786+
755787
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)