diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index f32ed81f..6d18780f 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -31,6 +31,7 @@ #include "OpSendMsg.h" #include "ProducerImpl.h" #include "PulsarApi.pb.h" +#include "ResultUtils.h" #include "Url.h" #include "auth/InitialAuthData.h" #include "checksum/ChecksumProvider.h" @@ -205,7 +206,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: ctx.load_verify_file(trustCertFilePath); } else { LOG_ERROR(trustCertFilePath << ": No such trustCertFile"); - close(); + close(ResultAuthenticationError, false); return; } } else { @@ -215,7 +216,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: if (!authentication_) { LOG_ERROR("Invalid authentication plugin"); - close(); + close(ResultAuthenticationError, false); return; } @@ -229,12 +230,12 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: tlsPrivateKey = authData->getTlsPrivateKey(); if (!file_exists(tlsCertificates)) { LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); - close(); + close(ResultAuthenticationError, false); return; } if (!file_exists(tlsCertificates)) { LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); - close(); + close(ResultAuthenticationError, false); return; } ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem); @@ -660,7 +661,7 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b } else { LOG_ERROR(cnxString_ << "Read operation failed: " << err.message()); } - close(); + close(ResultDisconnected); } else if (bytesTransferred < minReadSize) { // Read the remaining part, use a slice of buffer to write on the next // region @@ -718,7 +719,7 @@ void ClientConnection::processIncomingBuffer() { proto::BaseCommand incomingCmd; if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) { LOG_ERROR(cnxString_ << "Error parsing protocol buffer command"); - close(); + close(ResultDisconnected); return; } @@ -742,7 +743,7 @@ void ClientConnection::processIncomingBuffer() { << incomingCmd.message().message_id().ledgerid() << ", entry id " << incomingCmd.message().message_id().entryid() << "] Error parsing broker entry metadata"); - close(); + close(ResultDisconnected); return; } incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize); @@ -760,7 +761,7 @@ void ClientConnection::processIncomingBuffer() { << incomingCmd.message().message_id().ledgerid() // << ", entry id " << incomingCmd.message().message_id().entryid() << "] Error parsing message metadata"); - close(); + close(ResultDisconnected); return; } @@ -991,7 +992,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { default: LOG_WARN(cnxString_ << "Received invalid message from server"); - close(); + close(ResultDisconnected); break; } } @@ -1133,7 +1134,7 @@ void ClientConnection::sendMessage(const std::shared_ptr& args) { void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) { if (err) { LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message()); - close(); + close(ResultDisconnected); } else { sendPendingCommands(); } @@ -1142,7 +1143,7 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh void ClientConnection::handleSendPair(const boost::system::error_code& err) { if (err) { LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message()); - close(); + close(ResultDisconnected); } else { sendPendingCommands(); } @@ -1247,7 +1248,7 @@ void ClientConnection::handleKeepAliveTimeout() { if (havePendingPingRequest_) { LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout"); - close(); + close(ResultDisconnected); } else { // Send keep alive probe to peer LOG_DEBUG(cnxString_ << "Sending ping message"); @@ -1287,7 +1288,14 @@ void ClientConnection::close(Result result, bool detach) { } state_ = Disconnected; - closeSocket(); + if (socket_) { + boost::system::error_code err; + socket_->shutdown(boost::asio::socket_base::shutdown_both, err); + socket_->close(err); + if (err) { + LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + } + } if (tlsSocket_) { boost::system::error_code err; tlsSocket_->lowest_layer().close(err); @@ -1326,7 +1334,7 @@ void ClientConnection::close(Result result, bool detach) { } lock.unlock(); - if (result != ResultDisconnected && result != ResultRetryable) { + if (!isResultRetryable(result)) { LOG_ERROR(cnxString_ << "Connection closed with " << result); } else { LOG_INFO(cnxString_ << "Connection disconnected"); @@ -1473,26 +1481,15 @@ Future ClientConnection::newGetSchema(const std::string& top return promise.getFuture(); } -void ClientConnection::closeSocket() { - boost::system::error_code err; - if (socket_) { - socket_->shutdown(boost::asio::socket_base::shutdown_both, err); - socket_->close(err); - if (err) { - LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); - } - } -} - void ClientConnection::checkServerError(ServerError error) { switch (error) { case proto::ServerError::ServiceNotReady: - closeSocket(); + close(ResultDisconnected); break; case proto::ServerError::TooManyRequests: // TODO: Implement maxNumberOfRejectedRequestPerConnection like // https://github.com/apache/pulsar/pull/274 - closeSocket(); + close(ResultDisconnected); break; default: break; @@ -1518,7 +1515,7 @@ void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendRe if (!producer->ackReceived(sequenceId, messageId)) { // If the producer fails to process the ack, we need to close the connection // to give it a chance to recover from there - close(); + close(ResultDisconnected); } } } else { @@ -1542,12 +1539,12 @@ void ClientConnection::handleSendError(const proto::CommandSendError& error) { if (!producer->removeCorruptMessage(sequenceId)) { // If the producer fails to remove corrupt msg, we need to close the // connection to give it a chance to recover from there - close(); + close(ResultDisconnected); } } } } else { - close(); + close(ResultDisconnected); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index e74a3ae4..30ea8d82 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -142,6 +142,16 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this= operationTimeut_)) { + if (isResultRetryable(result) && (TimeUtils::now() - startTimestamp >= operationTimeut_)) { return ResultTimeout; } else { return result; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index bdcc757d..4fd3c2c7 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -36,6 +36,7 @@ #include "OpSendMsg.h" #include "ProducerConfigurationImpl.h" #include "PulsarApi.pb.h" +#include "ResultUtils.h" #include "Semaphore.h" #include "TimeUtils.h" #include "TopicName.h" @@ -272,7 +273,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r } else { // Producer was not yet created, retry to connect to broker if it's possible result = convertToTimeoutIfNecessary(result, creationTimestamp_); - if (result == ResultRetryable) { + if (isResultRetryable(result)) { LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result)); scheduleReconnection(); } else { diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h new file mode 100644 index 00000000..b5ec6cdb --- /dev/null +++ b/lib/ResultUtils.h @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +namespace pulsar { + +inline bool isResultRetryable(Result result) { + return result == ResultRetryable || result == ResultDisconnected; +} + +} // namespace pulsar diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index 8bebb5b0..d026e424 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -29,6 +29,7 @@ #include "ExecutorService.h" #include "Future.h" #include "LogUtils.h" +#include "ResultUtils.h" namespace pulsar { @@ -95,7 +96,7 @@ class RetryableOperation : public std::enable_shared_from_this