diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 19d5055c..77a44e26 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1397,6 +1397,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me if (result == ResultOk) { LOG_INFO(getName() << "Seek successfully"); ackGroupingTrackerPtr_->flushAndClean(); + incomingMessages_.clear(); Lock lock(mutexForMessageId_); lastDequedMessageId_ = MessageId::earliest(); lock.unlock(); diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index e35a1f02..a77636ee 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -893,6 +893,14 @@ TEST_P(ConsumerSeekTest, testSeekForMessageId) { Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); + Consumer consumerExclusive; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive)); + + Consumer consumerInclusive; + ASSERT_EQ(ResultOk, + client.subscribe(topic, "sub-1", ConsumerConfiguration().setStartMessageIdInclusive(true), + consumerInclusive)); + const auto numMessages = 100; MessageId seekMessageId; @@ -909,16 +917,10 @@ TEST_P(ConsumerSeekTest, testSeekForMessageId) { LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r); - Consumer consumerExclusive; - ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive)); consumerExclusive.seek(seekMessageId); Message msg0; ASSERT_EQ(ResultOk, consumerExclusive.receive(msg0, 3000)); - Consumer consumerInclusive; - ASSERT_EQ(ResultOk, - client.subscribe(topic, "sub-1", ConsumerConfiguration().setStartMessageIdInclusive(true), - consumerInclusive)); consumerInclusive.seek(seekMessageId); Message msg1; ASSERT_EQ(ResultOk, consumerInclusive.receive(msg1, 3000));