2323#include < pulsar/MessageIdBuilder.h>
2424
2525#include < algorithm>
26+ #include < utility>
2627
2728#include " AckGroupingTracker.h"
2829#include " AckGroupingTrackerDisabled.h"
@@ -235,19 +236,28 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
235236
236237 // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
237238 // sending the subscribe request.
238- cnx->registerConsumer (consumerId_, get_shared_this_ptr ());
239- LOG_DEBUG (cnx->cnxString () << " Registered consumer " << consumerId_);
239+ optional<MessageId> subscribeMessageId;
240+ bool duringSeek = false ;
241+ {
242+ std::lock_guard<std::mutex> lock (mutex_);
243+ setCnx (cnx);
244+ cnx->registerConsumer (consumerId_, get_shared_this_ptr ());
245+ LOG_DEBUG (cnx->cnxString () << " Registered consumer " << consumerId_);
240246
241- if (hasPendingSeek_.load (std::memory_order_acquire)) {
247+ {
248+ std::lock_guard<std::mutex> lock (mutexForMessageId_);
249+ clearReceiveQueue ();
250+ subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
251+ ? startMessageId_.get ()
252+ : std::nullopt ;
253+ }
254+
255+ duringSeek = seekCallback_.has_value ();
256+ }
257+ if (duringSeek) {
242258 ackGroupingTrackerPtr_->flushAndClean ();
243259 }
244260
245- Lock lockForMessageId (mutexForMessageId_);
246- clearReceiveQueue ();
247- const auto subscribeMessageId =
248- (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get () : std::nullopt ;
249- lockForMessageId.unlock ();
250-
251261 unAckedMessageTrackerPtr_->clear ();
252262
253263 ClientImplPtr client = client_.lock ();
@@ -269,7 +279,6 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
269279 } else {
270280 promise.setFailed (handleResult);
271281 }
272- completeSeekCallback (ResultOk);
273282 });
274283
275284 return promise.getFuture ();
@@ -1130,7 +1139,11 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
11301139 * `startMessageId_` is updated so that we can discard messages after delivery restarts.
11311140 */
11321141void ConsumerImpl::clearReceiveQueue () {
1133- if (hasPendingSeek_.load (std::memory_order_acquire)) {
1142+ // NOTE: This method must be called with `mutex_` held for thread safety where
1143+ if (seekCallback_.has_value ()) {
1144+ executor_->postWork (
1145+ [callback{std::exchange (seekCallback_, std::nullopt ).value ()}] { callback (ResultOk); });
1146+
11341147 if (hasSoughtByTimestamp ()) {
11351148 // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
11361149 // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
@@ -1733,9 +1746,16 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17331746 callback (ResultNotConnected);
17341747 return ;
17351748 }
1736-
1737- auto expected = false ;
1738- if (!hasPendingSeek_.compare_exchange_strong (expected, true )) {
1749+ bool hasPendingSeek = false ;
1750+ {
1751+ std::lock_guard<std::mutex> lock (mutex_);
1752+ if (seekCallback_.has_value ()) {
1753+ hasPendingSeek = true ;
1754+ } else {
1755+ seekCallback_ = std::move (callback);
1756+ }
1757+ }
1758+ if (hasPendingSeek) {
17391759 LOG_ERROR (getName () << " attempted to seek " << seekArg << " when there is a pending seek" );
17401760 callback (ResultNotAllowedError);
17411761 return ;
@@ -1748,37 +1768,46 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17481768 seekMessageId_ = *boost::get<MessageId>(&seekArg);
17491769 hasSoughtByTimestamp_.store (false , std::memory_order_release);
17501770 }
1751- seekCallback_ = std::move (callback);
17521771 LOG_INFO (getName () << " Seeking subscription to " << seekArg);
17531772
17541773 auto weakSelf = weak_from_this ();
17551774
17561775 cnx->sendRequestWithId (seek, requestId, " SEEK" )
1757- .addListener ([this , weakSelf, callback, originalSeekMessageId](Result result,
1758- const ResponseData& responseData) {
1776+ .addListener ([this , weakSelf, originalSeekMessageId](Result result,
1777+ const ResponseData& responseData) {
17591778 auto self = weakSelf.lock ();
17601779 if (!self) {
1761- callback (result);
17621780 return ;
17631781 }
17641782 if (result == ResultOk) {
17651783 LOG_INFO (getName () << " Seek successfully" );
17661784 ackGroupingTrackerPtr_->flushAndClean ();
17671785 incomingMessages_.clear ();
1768- Lock lock (mutexForMessageId_);
1769- lastDequedMessageId_ = MessageId::earliest ();
1770- lock.unlock ();
1786+ {
1787+ std::lock_guard<std::mutex> lock (mutexForMessageId_);
1788+ lastDequedMessageId_ = MessageId::earliest ();
1789+ }
17711790
1791+ std::lock_guard<std::mutex> lock (mutex_);
17721792 if (!getCnx ().expired ()) {
17731793 if (!hasSoughtByTimestamp ()) {
17741794 startMessageId_ = seekMessageId_.get ();
17751795 }
1776- completeSeekCallback (result);
1796+ if (!seekCallback_.has_value ()) {
1797+ LOG_ERROR (getName () << " Seek callback is not set" );
1798+ return ;
1799+ }
1800+ executor_->postWork (
1801+ [self, callback{std::exchange (seekCallback_, std::nullopt ).value ()}]() {
1802+ callback (ResultOk);
1803+ });
17771804 } // else: complete the seek future after connection is established
17781805 } else {
17791806 LOG_ERROR (getName () << " Failed to seek: " << result);
17801807 seekMessageId_ = originalSeekMessageId;
1781- completeSeekCallback (result);
1808+ executor_->postWork ([self, callback{std::exchange (seekCallback_, std::nullopt ).value ()}]() {
1809+ callback (ResultOk);
1810+ });
17821811 }
17831812 });
17841813}
0 commit comments