From 7fdd6882c5fcc929df6538a5e805a57b07c6707d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 15 Oct 2023 20:53:59 +0800 Subject: [PATCH] Fix topic not shown correctly in the consumer string ### Motivation `ConsumerImpl::getName()` returns a string that is used in logs to represent the consumer. However, the topic part does not show correctly: ``` ConsumerImpl:283 | [0x6000001c88b8, consumer-1, 0] Created consumer on broker [127.0.0.1:60399 -> 127.0.0.1:6650] ``` It's because after https://github.com/apache/pulsar-client-cpp/pull/218, the `ConsumerImpl::topic_` field becomes `std::shared_ptr` rather than a `std::string` but it is still used to construct the `consumerStr_`. ### Modifications Construct the `consumerStr_` using the `topic` argument in the constructor and make `consumerStr_` const because it is never changed. Now the logs will be like: ``` ConsumerImpl:280 | [persistent://public/default/my-topic, consumer-1, 0] Created consumer on broker [127.0.0.1:60647 -> 127.0.0.1:6650] ``` --- lib/ConsumerImpl.cc | 5 +---- lib/ConsumerImpl.h | 2 +- lib/MessageCrypto.cc | 2 +- lib/MessageCrypto.h | 2 +- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 52f04401..7a363bc1 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -84,6 +84,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2), consumerId_(client->newConsumerId()), consumerName_(config_.getConsumerName()), + consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "), messageListenerRunning_(true), negativeAcksTracker_(client, *this, conf), readCompacted_(conf.isReadCompacted()), @@ -92,10 +93,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), interceptors_(interceptors) { - std::stringstream consumerStrStream; - consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; - consumerStr_ = consumerStrStream.str(); - // Initialize un-ACKed messages OT tracker. if (conf.getUnAckedMessagesTimeoutMs() != 0) { if (conf.getTickDurationInMs() > 0) { diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 690d8fc4..7ca39799 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -215,7 +215,7 @@ class ConsumerImpl : public ConsumerImplBase { const int receiverQueueRefillThreshold_; uint64_t consumerId_; std::string consumerName_; - std::string consumerStr_; + const std::string consumerStr_; int32_t partitionIndex_ = -1; Promise consumerCreatedPromise_; std::atomic_bool messageListenerRunning_; diff --git a/lib/MessageCrypto.cc b/lib/MessageCrypto.cc index bab96d1e..920e3fde 100644 --- a/lib/MessageCrypto.cc +++ b/lib/MessageCrypto.cc @@ -28,7 +28,7 @@ namespace pulsar { DECLARE_LOG_OBJECT() -MessageCrypto::MessageCrypto(std::string& logCtx, bool keyGenNeeded) +MessageCrypto::MessageCrypto(const std::string& logCtx, bool keyGenNeeded) : dataKeyLen_(32), dataKey_(new unsigned char[dataKeyLen_]), tagLen_(16), diff --git a/lib/MessageCrypto.h b/lib/MessageCrypto.h index fd139c1e..90b891e2 100644 --- a/lib/MessageCrypto.h +++ b/lib/MessageCrypto.h @@ -49,7 +49,7 @@ class MessageCrypto { typedef std::map StringMap; typedef std::map> DataKeyCacheMap; - MessageCrypto(std::string& logCtx, bool keyGenNeeded); + MessageCrypto(const std::string& logCtx, bool keyGenNeeded); ~MessageCrypto(); /*