From d2cf7414e0d4ff587cd77de5bdbec23f4e4c1f0e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 27 Jun 2023 18:51:13 +0800 Subject: [PATCH 1/4] Fix retriable errors not handled well when subscribing Fixes https://github.com/apache/pulsar-client-cpp/issues/292 ### Motivation When a consumer fails to subscribe due to a retriable error, the time point comparison is wrong: https://github.com/apache/pulsar-client-cpp/blob/633f4bbe8c182128da09803172676b9d6af05057/lib/ConsumerImpl.cc#L321 `creationTimestamp_ + operationTimeut_` is the deadline, `TimeUtils::now()` is the current time, so we should use `>` instead of `<` here to compare them. Otherwise, if the consumer encountered a retriable error and the deadline is not exceeded, the consumer won't reconnect and will fail with `ResultRetryable`. ### Modifications Reverse the comparison between the deadline and the current time. When it times out, complete the future with `ResultTimeout` instead of the `result` itself, which is always `ResultRetryable`. Add `ConsumerTest.testRetrySubscribe` to verify this change. ### TODO Support configuring the operation timeout in milliseconds. 
--- lib/ConsumerImpl.cc | 5 ++++- tests/ConsumerTest.cc | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 666361b7..44ae3ab1 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -318,10 +318,13 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r scheduleReconnection(get_shared_this_ptr()); } else { // Consumer was not yet created, retry to connect to broker if it's possible - if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) { + if (isRetriableError(result) && (TimeUtils::now() - creationTimestamp_ < operationTimeut_)) { LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result)); scheduleReconnection(get_shared_this_ptr()); } else { + if (isRetriableError(result)) { + result = ResultTimeout; + } LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result)); consumerCreatedPromise_.setFailed(result); state_ = Failed; diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 98c11cf3..2d195fd8 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1321,4 +1321,18 @@ TEST(ConsumerTest, testNotSetSubscriptionName) { client.close(); } +TEST(ConsumerTest, testRetrySubscribe) { + Client client{lookupUrl}; + for (int i = 0; i < 10; i++) { + // "Subscription is fenced" error might happen here because the previous seek operation might not be + // done in broker, the consumer should retry until timeout + Consumer consumer; + ASSERT_EQ(client.subscribe("test-close-before-seek-done", "sub", consumer), ResultOk); + consumer.seekAsync(MessageId::earliest(), [](Result) {}); + consumer.close(); + } + // TODO: Currently it's hard to test the timeout error without configuring the operation timeout in + // milliseconds +} + } // namespace pulsar From 39cb33b054b4583d1a91047bbd896b674c8815a8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 28 Jun 2023 20:08:40 +0800 
Subject: [PATCH 2/4] Fix the same error for producer as well --- lib/ProducerImpl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 0b7f482d..f7af05b1 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -281,7 +281,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r scheduleReconnection(shared_from_this()); } else { // Producer was not yet created, retry to connect to broker if it's possible - if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) { + if (isRetriableError(result) && (TimeUtils::now() - creationTimestamp_ < operationTimeut_)) { LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result)); scheduleReconnection(shared_from_this()); } else { From cc9caf336f45fa2aeb2ee9a4fabee994ef0b5796 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 28 Jun 2023 20:14:45 +0800 Subject: [PATCH 3/4] Add result conversion --- lib/ProducerImpl.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index f7af05b1..c1f3d5d5 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -285,6 +285,9 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result)); scheduleReconnection(shared_from_this()); } else { + if (isRetriableError(result)) { + result = ResultTimeout; + } LOG_ERROR(getName() << "Failed to create producer: " << strResult(result)); failPendingMessages(result, false); state_ = Failed; From 9b80487964aab18419f969ebd4b04acde6675c73 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 28 Jun 2023 20:36:55 +0800 Subject: [PATCH 4/4] Reduce duplicated code --- lib/ConsumerImpl.cc | 8 +++----- lib/HandlerBase.cc | 10 ++++++++-- lib/HandlerBase.h | 7 ++----- lib/ProducerImpl.cc | 6 ++---- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git 
a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 44ae3ab1..0ee6a74d 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -318,13 +318,11 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r scheduleReconnection(get_shared_this_ptr()); } else { // Consumer was not yet created, retry to connect to broker if it's possible - if (isRetriableError(result) && (TimeUtils::now() - creationTimestamp_ < operationTimeut_)) { - LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result)); + result = convertToTimeoutIfNecessary(result, creationTimestamp_); + if (result == ResultRetryable) { + LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result)); scheduleReconnection(get_shared_this_ptr()); } else { - if (isRetriableError(result)) { - result = ResultTimeout; - } LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result)); consumerCreatedPromise_.setFailed(result); state_ = Failed; diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 501f5fc7..28b53176 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -138,8 +138,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con } } -bool HandlerBase::isRetriableError(Result result) { return result == ResultRetryable; } - void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { const auto state = handler->state_.load(); if (state == Pending || state == Ready) { @@ -164,4 +162,12 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBase } } +Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const { + if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= operationTimeut_)) { + return ResultTimeout; + } else { + return result; + } +} + } // namespace pulsar diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 3b12f098..e6a2fc6d 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -69,11 +69,6 @@ 
class HandlerBase { */ static void scheduleReconnection(HandlerBasePtr handler); - /* - * Should we retry in error that are transient - */ - bool isRetriableError(Result result); - /** * Do some cleanup work before changing `connection_` to `cnx`. * @@ -127,6 +122,8 @@ class HandlerBase { Backoff backoff_; uint64_t epoch_; + Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const; + private: DeadlineTimerPtr timer_; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index c1f3d5d5..597bfd37 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -281,13 +281,11 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r scheduleReconnection(shared_from_this()); } else { // Producer was not yet created, retry to connect to broker if it's possible - if (isRetriableError(result) && (TimeUtils::now() - creationTimestamp_ < operationTimeut_)) { + result = convertToTimeoutIfNecessary(result, creationTimestamp_); + if (result == ResultRetryable) { LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result)); scheduleReconnection(shared_from_this()); } else { - if (isRetriableError(result)) { - result = ResultTimeout; - } LOG_ERROR(getName() << "Failed to create producer: " << strResult(result)); failPendingMessages(result, false); state_ = Failed;