Skip to content

Commit 4596ed5

Browse files
committed
fix seek arg check
1 parent 9cdf616 commit 4596ed5

2 files changed

Lines changed: 17 additions & 10 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,12 +1139,16 @@ void ConsumerImpl::clearReceiveQueue() {
11391139
if (seekStatus_ != SeekStatus::NOT_STARTED) {
11401140
// Flush the pending ACKs in case newly arrived messages are filtered out by the previous pending ACKs
11411141
ackGroupingTrackerPtr_->flushAndClean();
1142-
if (std::holds_alternative<MessageId>(lastSeekArg_)) {
1143-
startMessageId_ = std::get<MessageId>(lastSeekArg_);
1142+
if (lastSeekArg_.has_value()) {
1143+
if (std::holds_alternative<MessageId>(lastSeekArg_.value())) {
1144+
startMessageId_ = std::get<MessageId>(lastSeekArg_.value());
1145+
} else {
1146+
// Invalidate startMessageId_ so that `isPrior` checks will be skipped, and
1147+
// `hasMessageAvailableAsync` won't use `startMessageId_` in compare.
1148+
startMessageId_ = std::nullopt;
1149+
}
11441150
} else {
1145-
// Invalidate startMessageId_ so that `isPrior` checks will be skipped, and
1146-
// `hasMessageAvailableAsync` won't use `startMessageId_` in compare.
1147-
startMessageId_ = std::nullopt;
1151+
LOG_ERROR(getName() << "SeekStatus is not NOT_STARTED but lastSeekArg_ is not set");
11481152
}
11491153
return;
11501154
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
@@ -1597,7 +1601,7 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
15971601
(startMessageId_.value_or(MessageId::earliest()) == MessageId::latest() ||
15981602
// If there is a previous seek operation by timestamp, the start message id will be incorrect, so
15991603
// we cannot compare the start position with the last position.
1600-
std::holds_alternative<SeekTimestampType>(lastSeekArg_));
1604+
(lastSeekArg_.has_value() && std::holds_alternative<SeekTimestampType>(lastSeekArg_.value())));
16011605
}
16021606
if (compareMarkDeletePosition) {
16031607
auto self = get_shared_this_ptr();
@@ -1621,7 +1625,10 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
16211625
bool lastSeekIsByTimestamp = false;
16221626
{
16231627
LockGuard lock{self->mutex_};
1624-
lastSeekIsByTimestamp = std::holds_alternative<SeekTimestampType>(self->lastSeekArg_);
1628+
if (self->lastSeekArg_.has_value() &&
1629+
std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
1630+
lastSeekIsByTimestamp = true;
1631+
}
16251632
}
16261633
if (self->config_.isStartMessageIdInclusive() && !lastSeekIsByTimestamp) {
16271634
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
@@ -1798,8 +1805,8 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17981805
LOG_INFO(getName() << "Seek successfully");
17991806
ackGroupingTrackerPtr_->flushAndClean();
18001807
incomingMessages_.clear();
1801-
if (std::holds_alternative<MessageId>(lastSeekArg_)) {
1802-
startMessageId_ = std::get<MessageId>(lastSeekArg_);
1808+
if (lastSeekArg_.has_value() && std::holds_alternative<MessageId>(lastSeekArg_.value())) {
1809+
startMessageId_ = std::get<MessageId>(lastSeekArg_.value());
18031810
}
18041811
if (!seekCallback_.has_value()) {
18051812
LOG_ERROR(getName() << "Seek callback is not set");

lib/ConsumerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ class ConsumerImpl : public ConsumerImplBase {
270270

271271
SeekStatus seekStatus_{SeekStatus::NOT_STARTED};
272272
optional<ResultCallback> seekCallback_;
273-
SeekArg lastSeekArg_;
273+
optional<SeekArg> lastSeekArg_;
274274

275275
class ChunkedMessageCtx {
276276
public:

0 commit comments

Comments
 (0)