diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 666361b7..0ee6a74d 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -318,8 +318,9 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r scheduleReconnection(get_shared_this_ptr()); } else { // Consumer was not yet created, retry to connect to broker if it's possible - if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) { - LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result)); + result = convertToTimeoutIfNecessary(result, creationTimestamp_); + if (result == ResultRetryable) { + LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result)); scheduleReconnection(get_shared_this_ptr()); } else { LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result)); diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 501f5fc7..28b53176 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -138,8 +138,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con } } -bool HandlerBase::isRetriableError(Result result) { return result == ResultRetryable; } - void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { const auto state = handler->state_.load(); if (state == Pending || state == Ready) { @@ -164,4 +162,12 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBase } } +Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const { + if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= operationTimeut_)) { + return ResultTimeout; + } else { + return result; + } +} + } // namespace pulsar diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 3b12f098..e6a2fc6d 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -69,11 +69,6 @@ class HandlerBase { */ static void scheduleReconnection(HandlerBasePtr handler); - /* - * Should we retry in error that are transient - */ - bool isRetriableError(Result result); - /** * Do some cleanup work before changing `connection_` to `cnx`. * @@ -127,6 +122,8 @@ class HandlerBase { Backoff backoff_; uint64_t epoch_; + Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const; + private: DeadlineTimerPtr timer_; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 0b7f482d..597bfd37 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -281,7 +281,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r scheduleReconnection(shared_from_this()); } else { // Producer was not yet created, retry to connect to broker if it's possible - if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) { + result = convertToTimeoutIfNecessary(result, creationTimestamp_); + if (result == ResultRetryable) { LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result)); scheduleReconnection(shared_from_this()); } else { diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 98c11cf3..2d195fd8 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1321,4 +1321,18 @@ TEST(ConsumerTest, testNotSetSubscriptionName) { client.close(); } +TEST(ConsumerTest, testRetrySubscribe) { + Client client{lookupUrl}; + for (int i = 0; i < 10; i++) { + // "Subscription is fenced" error might happen here because the previous seek operation might not be + // done in broker, the consumer should retry until timeout + Consumer consumer; + ASSERT_EQ(client.subscribe("test-close-before-seek-done", "sub", consumer), ResultOk); + consumer.seekAsync(MessageId::earliest(), [](Result) {}); + consumer.close(); + } + // TODO: Currently it's hard to test the timeout error without configuring the operation timeout in + // milliseconds +} + } // namespace pulsar