Skip to content

Commit 18dcdd5

Browse files
author
A
authored
Use boost::optional instead self-written Optional class (#138)
Fixes #134 ### Motivation Switch to use boost::optional instead self-written Optional class ### Modifications Remove class Optional in Utils.h and change all usage to boost::optional
1 parent 7a7b3aa commit 18dcdd5

14 files changed

Lines changed: 84 additions & 120 deletions

lib/ClientConnection.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <pulsar/MessageIdBuilder.h>
2222

23+
#include <boost/optional.hpp>
2324
#include <fstream>
2425

2526
#include "Commands.h"
@@ -1093,9 +1094,9 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
10931094
data.schemaVersion = producerSuccess.schema_version();
10941095
}
10951096
if (producerSuccess.has_topic_epoch()) {
1096-
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
1097+
data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
10971098
} else {
1098-
data.topicEpoch = Optional<uint64_t>::empty();
1099+
data.topicEpoch = boost::none;
10991100
}
11001101
requestData.promise.setValue(data);
11011102
requestData.timer->cancel();

lib/ClientConnection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <boost/asio/ip/tcp.hpp>
3030
#include <boost/asio/ssl/stream.hpp>
3131
#include <boost/asio/strand.hpp>
32+
#include <boost/optional.hpp>
3233
#include <deque>
3334
#include <functional>
3435
#include <memory>
@@ -40,7 +41,6 @@
4041
#include "LookupDataResult.h"
4142
#include "SharedBuffer.h"
4243
#include "UtilAllocator.h"
43-
#include "Utils.h"
4444

4545
namespace pulsar {
4646

@@ -83,7 +83,7 @@ struct ResponseData {
8383
std::string producerName;
8484
int64_t lastSequenceId;
8585
std::string schemaVersion;
86-
Optional<uint64_t> topicEpoch;
86+
boost::optional<uint64_t> topicEpoch;
8787
};
8888

8989
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;

lib/Commands.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
296296
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
297297
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
298298
const std::string& consumerName, SubscriptionMode subscriptionMode,
299-
Optional<MessageId> startMessageId, bool readCompacted,
299+
boost::optional<MessageId> startMessageId, bool readCompacted,
300300
const std::map<std::string, std::string>& metadata,
301301
const std::map<std::string, std::string>& subscriptionProperties,
302302
const SchemaInfo& schemaInfo,
@@ -323,7 +323,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
323323
subscribe->set_allocated_schema(getSchema(schemaInfo));
324324
}
325325

326-
if (startMessageId.is_present()) {
326+
if (startMessageId) {
327327
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
328328
messageIdData.set_ledgerid(startMessageId.value().ledgerId());
329329
messageIdData.set_entryid(startMessageId.value().entryId());
@@ -383,7 +383,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
383383
const std::map<std::string, std::string>& metadata,
384384
const SchemaInfo& schemaInfo, uint64_t epoch,
385385
bool userProvidedProducerName, bool encrypted,
386-
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
386+
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch) {
387387
BaseCommand cmd;
388388
cmd.set_type(BaseCommand::PRODUCER);
389389
CommandProducer* producer = cmd.mutable_producer();
@@ -394,7 +394,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
394394
producer->set_user_provided_producer_name(userProvidedProducerName);
395395
producer->set_encrypted(encrypted);
396396
producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
397-
if (topicEpoch.is_present()) {
397+
if (topicEpoch) {
398398
producer->set_topic_epoch(topicEpoch.value());
399399
}
400400

lib/Commands.h

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
#include <pulsar/Schema.h>
2626
#include <pulsar/defines.h>
2727

28+
#include <boost/optional.hpp>
2829
#include <set>
2930

3031
#include "ProtoApiEnums.h"
3132
#include "SharedBuffer.h"
32-
#include "Utils.h"
3333

3434
using namespace pulsar;
3535

@@ -89,16 +89,14 @@ class Commands {
8989
uint64_t sequenceId, ChecksumType checksumType,
9090
const proto::MessageMetadata& metadata, const SharedBuffer& payload);
9191

92-
static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
93-
uint64_t consumerId, uint64_t requestId,
94-
CommandSubscribe_SubType subType, const std::string& consumerName,
95-
SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
96-
bool readCompacted, const std::map<std::string, std::string>& metadata,
97-
const std::map<std::string, std::string>& subscriptionProperties,
98-
const SchemaInfo& schemaInfo,
99-
CommandSubscribe_InitialPosition subscriptionInitialPosition,
100-
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
101-
int priorityLevel = 0);
92+
static SharedBuffer newSubscribe(
93+
const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
94+
CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
95+
boost::optional<MessageId> startMessageId, bool readCompacted,
96+
const std::map<std::string, std::string>& metadata,
97+
const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
98+
CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
99+
KeySharedPolicy keySharedPolicy, int priorityLevel = 0);
102100

103101
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
104102

@@ -107,7 +105,7 @@ class Commands {
107105
const std::map<std::string, std::string>& metadata,
108106
const SchemaInfo& schemaInfo, uint64_t epoch,
109107
bool userProvidedProducerName, bool encrypted,
110-
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
108+
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);
111109

112110
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
113111
CommandAck_AckType ackType, CommandAck_ValidationError validationError);

lib/ConsumerImpl.cc

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
5454
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
5555
bool hasParent /* = false by default */,
5656
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
57-
Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
57+
Commands::SubscriptionMode subscriptionMode,
58+
boost::optional<MessageId> startMessageId)
5859
: ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
5960
listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
6061
waitingForZeroQueueSizeMessage(false),
@@ -191,9 +192,8 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
191192
Lock lockForMessageId(mutexForMessageId_);
192193
// Update startMessageId so that we can discard messages after delivery restarts
193194
const auto startMessageId = clearReceiveQueue();
194-
const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
195-
? startMessageId
196-
: Optional<MessageId>::empty();
195+
const auto subscribeMessageId =
196+
(subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
197197
startMessageId_ = startMessageId;
198198
lockForMessageId.unlock();
199199

@@ -373,11 +373,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
373373
});
374374
}
375375

376-
Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
377-
const proto::MessageMetadata& metadata,
378-
const MessageId& messageId,
379-
const proto::MessageIdData& messageIdData,
380-
const ClientConnectionPtr& cnx) {
376+
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
377+
const proto::MessageMetadata& metadata,
378+
const MessageId& messageId,
379+
const proto::MessageIdData& messageIdData,
380+
const ClientConnectionPtr& cnx) {
381381
const auto chunkId = metadata.chunk_id();
382382
const auto uuid = metadata.uuid();
383383
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
@@ -422,14 +422,14 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
422422
lock.unlock();
423423
increaseAvailablePermits(cnx);
424424
trackMessage(messageId);
425-
return Optional<SharedBuffer>::empty();
425+
return boost::none;
426426
}
427427

428428
chunkedMsgCtx.appendChunk(messageId, payload);
429429
if (!chunkedMsgCtx.isCompleted()) {
430430
lock.unlock();
431431
increaseAvailablePermits(cnx);
432-
return Optional<SharedBuffer>::empty();
432+
return boost::none;
433433
}
434434

435435
LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
@@ -438,9 +438,9 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
438438
auto wholePayload = chunkedMsgCtx.getBuffer();
439439
chunkedMessageCache_.remove(uuid);
440440
if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
441-
return Optional<SharedBuffer>::of(wholePayload);
441+
return wholePayload;
442442
} else {
443-
return Optional<SharedBuffer>::empty();
443+
return boost::none;
444444
}
445445
}
446446

@@ -477,7 +477,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
477477
const auto& messageIdData = msg.message_id();
478478
auto messageId = MessageIdBuilder::from(messageIdData).build();
479479
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
480-
if (optionalPayload.is_present()) {
480+
if (optionalPayload) {
481481
payload = optionalPayload.value();
482482
} else {
483483
return;
@@ -512,7 +512,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
512512
m.impl_->convertPayloadToKeyValue(config_.getSchema());
513513

514514
const auto startMessageId = startMessageId_.get();
515-
if (isPersistent_ && startMessageId.is_present() &&
515+
if (isPersistent_ && startMessageId &&
516516
m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
517517
m.getMessageId().entryId() == startMessageId.value().entryId() &&
518518
isPriorEntryIndex(m.getMessageId().entryId())) {
@@ -637,7 +637,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
637637
msg.impl_->setTopicName(batchedMessage.getTopicName());
638638
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
639639

640-
if (startMessageId.is_present()) {
640+
if (startMessageId) {
641641
const MessageId& msgId = msg.getMessageId();
642642

643643
// If we are receiving a batch message, we need to discard messages that were prior
@@ -921,10 +921,10 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
921921
* was
922922
* not seen by the application
923923
*/
924-
Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
924+
boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
925925
bool expectedDuringSeek = true;
926926
if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
927-
return Optional<MessageId>::of(seekMessageId_.get());
927+
return seekMessageId_.get();
928928
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
929929
return startMessageId_.get();
930930
}
@@ -943,12 +943,12 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
943943
.ledgerId(nextMessageId.ledgerId())
944944
.entryId(nextMessageId.entryId() - 1)
945945
.build();
946-
return Optional<MessageId>::of(previousMessageId);
946+
return previousMessageId;
947947
} else if (lastDequedMessageId_ != MessageId::earliest()) {
948948
// If the queue was empty we need to restart from the message just after the last one that has been
949949
// dequeued
950950
// in the past
951-
return Optional<MessageId>::of(lastDequedMessageId_);
951+
return lastDequedMessageId_;
952952
} else {
953953
// No message was received or dequeued by this consumer. Next message would still be the
954954
// startMessageId

lib/ConsumerImpl.h

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <pulsar/Reader.h>
2323

24+
#include <boost/optional.hpp>
2425
#include <functional>
2526
#include <memory>
2627

@@ -71,7 +72,7 @@ class ConsumerImpl : public ConsumerImplBase {
7172
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false,
7273
const ConsumerTopicType consumerTopicType = NonPartitioned,
7374
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
74-
Optional<MessageId> startMessageId = Optional<MessageId>::empty());
75+
boost::optional<MessageId> startMessageId = boost::none);
7576
~ConsumerImpl();
7677
void setPartitionIndex(int partitionIndex);
7778
int getPartitionIndex();
@@ -193,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase {
193194
const DeadlineTimerPtr& timer,
194195
BrokerGetLastMessageIdCallback callback);
195196

196-
Optional<MessageId> clearReceiveQueue();
197+
boost::optional<MessageId> clearReceiveQueue();
197198
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
198199
ResultCallback callback);
199200

@@ -236,7 +237,7 @@ class ConsumerImpl : public ConsumerImplBase {
236237
MessageId lastMessageIdInBroker_{MessageId::earliest()};
237238

238239
std::atomic_bool duringSeek_{false};
239-
Synchronized<Optional<MessageId>> startMessageId_{Optional<MessageId>::empty()};
240+
Synchronized<boost::optional<MessageId>> startMessageId_;
240241
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
241242

242243
class ChunkedMessageCtx {
@@ -321,11 +322,11 @@ class ConsumerImpl : public ConsumerImplBase {
321322
* @return the concatenated payload if chunks are concatenated into a completed message payload
322323
* successfully, else Optional::empty()
323324
*/
324-
Optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
325-
const proto::MessageMetadata& metadata,
326-
const MessageId& messageId,
327-
const proto::MessageIdData& messageIdData,
328-
const ClientConnectionPtr& cnx);
325+
boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
326+
const proto::MessageMetadata& metadata,
327+
const MessageId& messageId,
328+
const proto::MessageIdData& messageIdData,
329+
const ClientConnectionPtr& cnx);
329330

330331
friend class PulsarFriend;
331332

lib/MultiTopicsConsumerImpl.cc

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
373373
for (int i = 0; i < numberPartitions; i++) {
374374
std::string topicPartitionName = topicName->getTopicPartitionName(i);
375375
auto optConsumer = consumers_.find(topicPartitionName);
376-
if (optConsumer.is_empty()) {
376+
if (!optConsumer) {
377377
LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
378378
callback(ResultUnknownError);
379379
continue;
@@ -400,7 +400,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
400400
LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
401401

402402
auto optConsumer = consumers_.remove(topicPartitionName);
403-
if (optConsumer.is_present()) {
403+
if (optConsumer) {
404404
optConsumer.value()->pauseMessageListener();
405405
}
406406

@@ -638,7 +638,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
638638
const std::string& topicPartitionName = msgId.getTopicName();
639639
auto optConsumer = consumers_.find(topicPartitionName);
640640

641-
if (optConsumer.is_present()) {
641+
if (optConsumer) {
642642
unAckedMessageTrackerPtr_->remove(msgId);
643643
optConsumer.value()->acknowledgeAsync(msgId, callback);
644644
} else {
@@ -674,7 +674,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
674674
};
675675
for (const auto& kv : topicToMessageId) {
676676
auto optConsumer = consumers_.find(kv.first);
677-
if (optConsumer.is_present()) {
677+
if (optConsumer) {
678678
unAckedMessageTrackerPtr_->remove(kv.second);
679679
optConsumer.value()->acknowledgeAsync(kv.second, cb);
680680
} else {
@@ -691,7 +691,7 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
691691
void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
692692
auto optConsumer = consumers_.find(msgId.getTopicName());
693693

694-
if (optConsumer.is_present()) {
694+
if (optConsumer) {
695695
unAckedMessageTrackerPtr_->remove(msgId);
696696
optConsumer.value()->negativeAcknowledge(msgId);
697697
}
@@ -868,9 +868,8 @@ bool MultiTopicsConsumerImpl::isConnected() const {
868868
return false;
869869
}
870870

871-
return consumers_
872-
.findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
873-
.is_empty();
871+
return !consumers_.findFirstValueIf(
872+
[](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); });
874873
}
875874

876875
uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {

lib/ProducerConfiguration.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,21 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat
3636
}
3737

3838
ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) {
39-
impl_->producerName = Optional<std::string>::of(producerName);
39+
impl_->producerName = boost::make_optional(producerName);
4040
return *this;
4141
}
4242

4343
const std::string& ProducerConfiguration::getProducerName() const {
44-
return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
44+
return !impl_->producerName ? emptyString : impl_->producerName.value();
4545
}
4646

4747
ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) {
48-
impl_->initialSequenceId = Optional<int64_t>::of(initialSequenceId);
48+
impl_->initialSequenceId = boost::make_optional(initialSequenceId);
4949
return *this;
5050
}
5151

5252
int64_t ProducerConfiguration::getInitialSequenceId() const {
53-
return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll;
53+
return !impl_->initialSequenceId ? -1ll : impl_->initialSequenceId.value();
5454
}
5555

5656
ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) {

0 commit comments

Comments
 (0)