diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index c189a200..f6881b2c 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -37,6 +38,7 @@ namespace pulsar { typedef std::function CreateProducerCallback; typedef std::function SubscribeCallback; typedef std::function ReaderCallback; +typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; @@ -301,6 +303,36 @@ class PULSAR_PUBLIC Client { void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, ReaderCallback callback); + /** + * Create a table view with given {@code TableViewConfiguration} for specified topic. + * + * The TableView provides a key-value map view of a compacted topic. Messages without keys will + * be ignored. + * + * @param topic the name of the topic. + * @param conf The {@code TableViewConfiguration} object + * @param tableView The {@code TableView} object + * @return Returned when the TableView is successfully linked to the topic and the map is built from a + * message that already exists + */ + Result createTableView(const std::string& topic, const TableViewConfiguration& conf, + TableView& tableView); + + /** + * Async create a table view with given {@code TableViewConfiguration} for specified topic. + * + * The TableView provides a key-value map view of a compacted topic. Messages without keys will + * be ignored. + * + * @param topic the name of the topic. + * @param conf The {@code TableViewConfiguration} object + * @param callBack + * The callback that is triggered when the TableView is successfully linked to the topic and the map is + * built from a message that already exists + */ + void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, + TableViewCallback callBack); + /** * Get the list of partitions for a given topic. * diff --git a/include/pulsar/Reader.h b/include/pulsar/Reader.h index 233da4ff..77690517 100644 --- a/include/pulsar/Reader.h +++ b/include/pulsar/Reader.h @@ -29,6 +29,7 @@ class PulsarFriend; class ReaderImpl; typedef std::function HasMessageAvailableCallback; +typedef std::function ReadNextCallback; /** * A Reader can be used to scan through all the messages currently available in a topic. @@ -68,6 +69,13 @@ class PULSAR_PUBLIC Reader { */ Result readNext(Message& msg, int timeoutMs); + /** + * Read asynchronously the next message in the topic. + * + * @param callback + */ + void readNextAsync(ReadNextCallback callback); + /** * Close the reader and stop the broker to push more messages * @@ -156,6 +164,7 @@ class PULSAR_PUBLIC Reader { friend class PulsarFriend; friend class PulsarWrapper; friend class ReaderImpl; + friend class TableViewImpl; friend class ReaderTest; }; } // namespace pulsar diff --git a/include/pulsar/ReaderConfiguration.h b/include/pulsar/ReaderConfiguration.h index 9ae8e1c4..4f6464f7 100644 --- a/include/pulsar/ReaderConfiguration.h +++ b/include/pulsar/ReaderConfiguration.h @@ -162,6 +162,7 @@ class PULSAR_PUBLIC ReaderConfiguration { * Set the internal subscription name. * * @param internal subscriptionName + * Default value is reader-{random string}. */ void setInternalSubscriptionName(std::string internalSubscriptionName); diff --git a/include/pulsar/TableView.h b/include/pulsar/TableView.h new file mode 100644 index 00000000..de5a7bcf --- /dev/null +++ b/include/pulsar/TableView.h @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef TABEL_VIEW_HPP_ +#define TABEL_VIEW_HPP_ + +#include +#include +#include + +#include +#include + +namespace pulsar { + +class TableViewImpl; + +typedef std::function ResultCallback; +typedef std::function TableViewAction; +/** + * + */ +class PULSAR_PUBLIC TableView { + public: + /** + * Construct an uninitialized tableView object + */ + TableView(); + + /** + * Move the latest value associated with the key. + * + * Example: + * + * ```c++ + * TableView view; + * std::string value; + * while (true) { + * if (view.retrieveValue("key")) { + * std::cout << "value is updated to: " << value; + * } else { + * // sleep for a while or print the message that value is not updated + * } + * } + * ``` + * + * @param key + * @param value the value associated with the key + * @return true if there is an associated value of the key, otherwise false + * + * NOTE: Once the value has been retrieved successfully, the associated value + * will be removed from the table view until next time the value is updated. + */ + bool retrieveValue(const std::string& key, std::string& value); + + /** + * It's similar with retrievedValue except the value is copied into `value`. + * + * @param key + * @param value the value associated with the key + * @return Whether the key exists in the table view. + */ + bool getValue(const std::string& key, std::string& value) const; + + /** + * Check if the key exists in the table view. + * + * @return true if the key exists in the table view + */ + bool containsKey(const std::string& key) const; + + /** + * Move the table view data into the unordered map. + */ + std::unordered_map snapshot(); + + /** + * Get the size of the elements. + */ + std::size_t size() const; + + /** + * Performs the given action for each entry in this map until all entries have been processed or the + * action throws an exception. + */ + void forEach(TableViewAction action); + + /** + * Performs the given action for each entry in this map until all entries have been processed and + * register the callback, which will be called each time a key-value pair is updated. + */ + void forEachAndListen(TableViewAction action); + + /** + * Asynchronously close the tableview and stop the broker to push more messages + */ + void closeAsync(ResultCallback callback); + + /** + * Close the consumer and stop the broker to push more messages + */ + Result close(); + + private: + typedef std::shared_ptr TableViewImplPtr; + TableViewImplPtr impl_; + explicit TableView(TableViewImplPtr); + + friend class PulsarFriend; + friend class ClientImpl; +}; +} // namespace pulsar + +#endif /* TABEL_VIEW_HPP_ */ diff --git a/include/pulsar/TableViewConfiguration.h b/include/pulsar/TableViewConfiguration.h new file mode 100644 index 00000000..55e242d7 --- /dev/null +++ b/include/pulsar/TableViewConfiguration.h @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_TABLEVIEW_CONFIGURATION_H_ +#define PULSAR_TABLEVIEW_CONFIGURATION_H_ + +#include +#include + +#include + +namespace pulsar { + +struct TableViewConfigurationImpl; + +/** + * Class specifying the configuration of a consumer. + */ +class PULSAR_PUBLIC TableViewConfiguration { + public: + TableViewConfiguration(); + ~TableViewConfiguration(); + TableViewConfiguration(const TableViewConfiguration&); + TableViewConfiguration& operator=(const TableViewConfiguration&); + + /** + * Create a new instance of TableViewConfiguration with the same + * initial settings as the current one. + */ + TableViewConfiguration clone() const; + + /** + * @return the schema information declared for this consumer + */ + const SchemaInfo& getSchemaInfo() const; + + /** + * Declare the schema of the data that this table view will be accepting. + * + * The schema will be checked against the schema of the topic, and the + * table view creation will fail if it's not compatible. + * + * @param schemaInfo the schema definition object + */ + TableViewConfiguration& setSchemaInfo(const SchemaInfo& schemaInfo); + + /** + * @return subscriptionName + */ + const std::string& getSubscriptionName() const; + + /** + * Set the internal consumer subscription name of the {@link TableView}. + * + * @param subscriptionName the name of the subscription to the topic. + * Default value is reader-{random string}. + */ + TableViewConfiguration& setSubscriptionName(const std::string subscriptionName); + + private: + std::shared_ptr impl_; +}; +} // namespace pulsar +#endif /* PULSAR_TABLEVIEW_CONFIGURATION_H_ */ diff --git a/lib/Client.cc b/lib/Client.cc index 03823fb6..e132189d 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -150,6 +150,20 @@ void Client::createReaderAsync(const std::string& topic, const MessageId& startM impl_->createReaderAsync(topic, startMessageId, conf, callback); } +Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf, + TableView& tableView) { + Promise promise; + createTableViewAsync(topic, conf, WaitForCallbackValue(promise)); + Future future = promise.getFuture(); + + return future.get(tableView); +} + +void Client::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, + TableViewCallback callBack) { + impl_->createTableViewAsync(topic, conf, callBack); +} + Result Client::getPartitionsForTopic(const std::string& topic, std::vector& partitions) { Promise > promise; getPartitionsForTopicAsync(topic, WaitForCallbackValue >(promise)); diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index a9c16536..bedcef33 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -24,7 +24,6 @@ #include "BinaryProtoLookupService.h" #include "ClientConfigurationImpl.h" -#include "Commands.h" #include "ConsumerImpl.h" #include "ExecutorService.h" #include "HTTPLookupService.h" @@ -35,6 +34,7 @@ #include "ProducerImpl.h" #include "ReaderImpl.h" #include "RetryableLookupService.h" +#include "TableViewImpl.h" #include "TimeUtils.h" #include "TopicName.h" @@ -226,6 +226,33 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st std::placeholders::_2, topicName, msgId, conf, callback)); } +void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, + TableViewCallback callback) { + TopicNamePtr topicName; + { + Lock lock(mutex_); + if (state_ != Open) { + lock.unlock(); + callback(ResultAlreadyClosed, TableView()); + return; + } else if (!(topicName = TopicName::get(topic))) { + lock.unlock(); + callback(ResultInvalidTopicName, TableView()); + return; + } + } + + TableViewImplPtr tableViewPtr = + std::make_shared(shared_from_this(), topicName->toString(), conf); + tableViewPtr->start().addListener([callback](Result result, TableViewImplPtr tableViewImplPtr) { + if (result == ResultOk) { + callback(result, TableView{tableViewImplPtr}); + } else { + callback(result, {}); + } + }); +} + void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata, TopicNamePtr topicName, MessageId startMessageId, ReaderConfiguration conf, ReaderCallback callback) { diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 8a393960..93d41c52 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -42,6 +42,9 @@ class ReaderImpl; typedef std::shared_ptr ReaderImplPtr; typedef std::weak_ptr ReaderImplWeakPtr; +class TableViewImpl; +typedef std::shared_ptr TableViewImplPtr; + class ConsumerImplBase; typedef std::weak_ptr ConsumerImplBaseWeakPtr; @@ -83,6 +86,9 @@ class ClientImpl : public std::enable_shared_from_this { void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, ReaderCallback callback); + void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, + TableViewCallback callback); + void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback); Future getConnection(const std::string& topic); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 6f2c211a..c2778fe5 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -826,7 +826,7 @@ Result ConsumerImpl::receive(Message& msg) { return res; } -void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { +void ConsumerImpl::receiveAsync(ReceiveCallback callback) { Message msg; // fail the callback if consumer is closing or closed diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 29d5d0bf..62ab06af 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -93,7 +93,7 @@ class ConsumerImpl : public ConsumerImplBase { const std::string& getTopic() const override; Result receive(Message& msg) override; Result receive(Message& msg, int timeout) override; - void receiveAsync(ReceiveCallback& callback) override; + void receiveAsync(ReceiveCallback callback) override; void unsubscribeAsync(ResultCallback callback) override; void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override; void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override; diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 5bc7e1b8..9cf63a38 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -51,7 +51,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this virtual const std::string& getSubscriptionName() const = 0; virtual Result receive(Message& msg) = 0; virtual Result receive(Message& msg, int timeout) = 0; - virtual void receiveAsync(ReceiveCallback& callback) = 0; + virtual void receiveAsync(ReceiveCallback callback) = 0; void batchReceiveAsync(BatchReceiveCallback callback); virtual void unsubscribeAsync(ResultCallback callback) = 0; virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index d9d38732..26a9c471 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -582,7 +582,7 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { } } -void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { +void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback callback) { Message msg; // fail the callback if consumer is closing or closed diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index da42b748..50cdecf3 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -65,7 +65,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { const std::string& getTopic() const override; Result receive(Message& msg) override; Result receive(Message& msg, int timeout) override; - void receiveAsync(ReceiveCallback& callback) override; + void receiveAsync(ReceiveCallback callback) override; void unsubscribeAsync(ResultCallback callback) override; void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override; void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override; diff --git a/lib/Reader.cc b/lib/Reader.cc index 261c0fac..c02fb2e5 100644 --- a/lib/Reader.cc +++ b/lib/Reader.cc @@ -49,6 +49,14 @@ Result Reader::readNext(Message& msg, int timeoutMs) { return impl_->readNext(msg, timeoutMs); } +void Reader::readNextAsync(ReadNextCallback callback) { + if (!impl_) { + return callback(ResultConsumerNotInitialized, {}); + } + + impl_->readNextAsync(callback); +} + Result Reader::close() { Promise promise; closeAsync(WaitForCallback(promise)); diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index a80e2e50..da1d95ec 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -111,6 +111,14 @@ Result ReaderImpl::readNext(Message& msg, int timeoutMs) { return res; } +void ReaderImpl::readNextAsync(ReceiveCallback callback) { + auto self = shared_from_this(); + consumer_->receiveAsync([self, callback](Result result, const Message& message) { + self->acknowledgeIfNecessary(result, message); + callback(result, message); + }); +} + void ReaderImpl::messageListener(Consumer consumer, const Message& msg) { readerListener_(Reader(shared_from_this()), msg); acknowledgeIfNecessary(ResultOk, msg); diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h index ed16c7d3..e216241d 100644 --- a/lib/ReaderImpl.h +++ b/lib/ReaderImpl.h @@ -67,6 +67,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this Result readNext(Message& msg); Result readNext(Message& msg, int timeoutMs); + void readNextAsync(ReceiveCallback callback); void closeAsync(ResultCallback callback); diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h index a274bd87..082aeaf4 100644 --- a/lib/SynchronizedHashMap.h +++ b/lib/SynchronizedHashMap.h @@ -122,7 +122,6 @@ class SynchronizedHashMap { return pairs; } - // This method is only used for test size_t size() const noexcept { Lock lock(mutex_); return data_.size(); diff --git a/lib/TableView.cc b/lib/TableView.cc new file mode 100644 index 00000000..d33ac255 --- /dev/null +++ b/lib/TableView.cc @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include "TableViewImpl.h" +#include "Utils.h" + +namespace pulsar { + +TableView::TableView() {} + +TableView::TableView(TableViewImplPtr impl) : impl_(impl) {} + +bool TableView::retrieveValue(const std::string& key, std::string& value) { + if (impl_) { + return impl_->retrieveValue(key, value); + } + return false; +} + +bool TableView::getValue(const std::string& key, std::string& value) const { + if (impl_) { + return impl_->getValue(key, value); + } + return false; +} + +bool TableView::containsKey(const std::string& key) const { + if (impl_) { + return impl_->containsKey(key); + } + return false; +} + +std::unordered_map TableView::snapshot() { + if (impl_) { + return impl_->snapshot(); + } + return {}; +} + +std::size_t TableView::size() const { return impl_->size(); } + +void TableView::forEach(TableViewAction action) { + if (impl_) { + impl_->forEach(action); + } +} + +void TableView::forEachAndListen(TableViewAction action) { + if (impl_) { + impl_->forEachAndListen(action); + } +} + +void TableView::closeAsync(ResultCallback callback) { + if (!impl_) { + callback(ResultConsumerNotInitialized); + return; + } + + impl_->closeAsync(callback); +} + +Result TableView::close() { + if (!impl_) { + return ResultConsumerNotInitialized; + } + Promise promise; + impl_->closeAsync(WaitForCallback(promise)); + Result result; + promise.getFuture().get(result); + return result; +} + +} // namespace pulsar diff --git a/lib/TableViewConfiguration.cc b/lib/TableViewConfiguration.cc new file mode 100644 index 00000000..244cb363 --- /dev/null +++ b/lib/TableViewConfiguration.cc @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include "TableViewConfigurationImpl.h" + +namespace pulsar { + +TableViewConfiguration::TableViewConfiguration() : impl_(std::make_shared()) {} + +TableViewConfiguration::~TableViewConfiguration() {} + +TableViewConfiguration::TableViewConfiguration(const TableViewConfiguration& x) : impl_(x.impl_) {} + +TableViewConfiguration& TableViewConfiguration::operator=(const TableViewConfiguration& x) { + impl_ = x.impl_; + return *this; +} + +TableViewConfiguration TableViewConfiguration::clone() const { + TableViewConfiguration newConf; + newConf.impl_.reset(new TableViewConfigurationImpl(*this->impl_)); + return newConf; +} + +const SchemaInfo& TableViewConfiguration::getSchemaInfo() const { return impl_->schemaInfo_; } + +TableViewConfiguration& TableViewConfiguration::setSchemaInfo(const SchemaInfo& schemaInfo) { + impl_->schemaInfo_ = schemaInfo; + return *this; +} + +const std::string& TableViewConfiguration::getSubscriptionName() const { return impl_->subscriptionName_; } + +TableViewConfiguration& TableViewConfiguration::setSubscriptionName(const std::string subscriptionName) { + impl_->subscriptionName_ = subscriptionName; + return *this; +} + +} // namespace pulsar diff --git a/lib/TableViewConfigurationImpl.h b/lib/TableViewConfigurationImpl.h new file mode 100644 index 00000000..7e27cb10 --- /dev/null +++ b/lib/TableViewConfigurationImpl.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef LIB_TABLEVIEW_CONFIGURATIONIMPL_H_ +#define LIB_TABLEVIEW_CONFIGURATIONIMPL_H_ + +#include + +namespace pulsar { + +struct TableViewConfigurationImpl { + SchemaInfo schemaInfo_; + std::string subscriptionName_; +}; +} // namespace pulsar + +#endif /* LIB_TABLEVIEW_CONFIGURATIONIMPL_H_ */ diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc new file mode 100644 index 00000000..a07a1f4e --- /dev/null +++ b/lib/TableViewImpl.cc @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TableViewImpl.h" + +#include "LogUtils.h" +#include "ReaderImpl.h" +#include "TimeUtils.h" + +namespace pulsar { + +DECLARE_LOG_OBJECT() + +TableViewImpl::TableViewImpl(const ClientImplPtr client, const std::string& topic, + const TableViewConfiguration& conf) + : client_(client), topic_(topic), conf_(conf) {} + +Future TableViewImpl::start() { + Promise promise; + TableViewImplPtr self = shared_from_this(); + + ReaderConfiguration readerConfiguration; + readerConfiguration.setSchema(conf_.getSchemaInfo()); + readerConfiguration.setReadCompacted(true); + readerConfiguration.setInternalSubscriptionName(conf_.getSubscriptionName()); + + ReaderCallback readerCallback = [self, promise](Result res, Reader reader) { + if (res == ResultOk) { + self->reader_ = reader.impl_; + self->readAllExistingMessages(promise, TimeUtils::currentTimeMillis(), 0); + } else { + promise.setFailed(res); + } + }; + client_->createReaderAsync(topic_, MessageId::earliest(), readerConfiguration, readerCallback); + return promise.getFuture(); +} + +bool TableViewImpl::retrieveValue(const std::string& key, std::string& value) { + auto optValue = data_.remove(key); + if (optValue) { + value = optValue.value(); + return true; + } + return false; +} + +bool TableViewImpl::getValue(const std::string& key, std::string& value) const { + auto optValue = data_.find(key); + if (optValue) { + value = optValue.value(); + return true; + } + return false; +} + +bool TableViewImpl::containsKey(const std::string& key) const { return data_.find(key) != boost::none; } + +std::unordered_map TableViewImpl::snapshot() { return data_.move(); } + +std::size_t TableViewImpl::size() const { return data_.size(); } + +void TableViewImpl::forEach(TableViewAction action) { data_.forEach(action); } + +void TableViewImpl::forEachAndListen(TableViewAction action) { + Lock lock(listenersMutex_); + data_.forEach(action); + listeners_.emplace_back(action); +} + +void TableViewImpl::closeAsync(ResultCallback callback) { + if (reader_) { + reader_->closeAsync([callback, this](Result result) { + reader_.reset(); + data_.clear(); + callback(result); + }); + } else { + callback(ResultConsumerNotInitialized); + } +} + +void TableViewImpl::handleMessage(const Message& msg) { + if (msg.hasPartitionKey()) { + LOG_DEBUG("Applying message from " << topic_ << " key=" << msg.getPartitionKey() + << " value=" << msg.getDataAsString()) + + Lock lock(listenersMutex_); + if (msg.getDataAsString().empty()) { + data_.remove(msg.getPartitionKey()); + } else { + data_.emplace(msg.getPartitionKey(), msg.getDataAsString()); + } + + for (const auto& listener : listeners_) { + try { + listener(msg.getPartitionKey(), msg.getDataAsString()); + } catch (const std::exception& exc) { + LOG_ERROR("Table view listener raised an exception: " << exc.what()); + } + } + } +} + +void TableViewImpl::readAllExistingMessages(Promise promise, long startTime, + long messagesRead) { + auto self = shared_from_this(); + reader_->hasMessageAvailableAsync( + [self, promise, startTime, messagesRead](Result result, bool hasMessage) { + if (result != ResultOk) { + promise.setFailed(result); + } + if (hasMessage) { + Message msg; + self->reader_->readNextAsync( + [self, promise, startTime, messagesRead](Result res, const Message& msg) { + if (res != ResultOk) { + promise.setFailed(res); + } else { + self->handleMessage(msg); + auto tmpMessagesRead = messagesRead + 1; + self->readAllExistingMessages(promise, startTime, tmpMessagesRead); + } + }); + } else { + long endTime = TimeUtils::currentTimeMillis(); + long durationMillis = endTime - startTime; + LOG_INFO("Started table view for " << self->topic_ << "Replayed: " << messagesRead + << " message in " << durationMillis << " millis"); + promise.setValue(self); + self->readTailMessage(); + } + }); +} + +void TableViewImpl::readTailMessage() { + auto self = shared_from_this(); + reader_->readNextAsync([self](Result result, const Message& msg) { + if (result == ResultOk) { + self->handleMessage(msg); + self->readTailMessage(); + } else { + LOG_INFO("Reader " << self->topic_ << " was interrupted: " << result); + } + }); +} + +} // namespace pulsar \ No newline at end of file diff --git a/lib/TableViewImpl.h b/lib/TableViewImpl.h new file mode 100644 index 00000000..54491b1c --- /dev/null +++ b/lib/TableViewImpl.h @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PULSAR_CPP_TABLEVIEW_IMPL_H +#define PULSAR_CPP_TABLEVIEW_IMPL_H + +#include + +#include "ClientImpl.h" +#include "SynchronizedHashMap.h" + +namespace pulsar { + +class TableViewImpl : public std::enable_shared_from_this { + public: + TableViewImpl(const ClientImplPtr client, const std::string& topic, const TableViewConfiguration& conf); + + ~TableViewImpl(){}; + + Future start(); + + bool retrieveValue(const std::string& key, std::string& value); + + bool getValue(const std::string& key, std::string& value) const; + + bool containsKey(const std::string& key) const; + + std::unordered_map snapshot(); + + std::size_t size() const; + + void forEach(TableViewAction action); + + void forEachAndListen(TableViewAction action); + + void closeAsync(ResultCallback callback); + + private: + using MutexType = std::mutex; + using Lock = std::lock_guard; + + const ClientImplPtr client_; + const std::string topic_; + const TableViewConfiguration conf_; + ReaderImplPtr reader_; + + MutexType listenersMutex_; + std::vector listeners_; + SynchronizedHashMap data_; + + void handleMessage(const Message& msg); + void readAllExistingMessages(Promise promise, long startTime, + long messagesRead); + void readTailMessage(); +}; +} // namespace pulsar + +#endif // PULSAR_CPP_TABLEVIEW_IMPL_H \ No newline at end of file diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index b88da998..ae6cddec 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -25,6 +25,7 @@ #include "HttpHelper.h" #include "PulsarFriend.h" +#include "WaitUtils.h" #include "lib/ClientConnection.h" #include "lib/Latch.h" #include "lib/LogUtils.h" @@ -39,7 +40,7 @@ static const std::string adminUrl = "http://localhost:8080/"; TEST(ReaderTest, testSimpleReader) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/test-simple-reader"; + std::string topicName = "persistent://public/default/test-simple-reader" + std::to_string(time(nullptr)); ReaderConfiguration readerConf; Reader reader; @@ -54,14 +55,63 @@ TEST(ReaderTest, testSimpleReader) { ASSERT_EQ(ResultOk, producer.send(msg)); } - for (int i = 0; i < 10; i++) { + int i = 0; + while (true) { Message msg; - ASSERT_EQ(ResultOk, reader.readNext(msg)); + bool has = false; + reader.hasMessageAvailable(has); + if (has) { + ASSERT_EQ(ResultOk, reader.readNext(msg)); + } else { + break; + } std::string content = msg.getDataAsString(); - std::string expected = "my-message-" + std::to_string(i); + std::string expected = "my-message-" + std::to_string(i++); ASSERT_EQ(expected, content); } + ASSERT_EQ(i, 10); + + producer.close(); + reader.close(); + client.close(); +} + +TEST(ReaderTest, testAsyncRead) { + Client client(serviceUrl); + + std::string topicName = "persistent://public/default/test-simple-reader" + std::to_string(time(nullptr)); + + ReaderConfiguration readerConf; + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + for (int i = 0; i < 10; i++) { + std::string content = "my-message-" + std::to_string(i); + Message msg = MessageBuilder().setContent(content).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + for (int i = 0; i < 10; i++) { + reader.readNextAsync([i](Result result, const Message& msg) { + ASSERT_EQ(ResultOk, result); + std::string content = msg.getDataAsString(); + std::string expected = "my-message-" + std::to_string(i); + ASSERT_EQ(expected, content); + }); + } + + waitUntil( + std::chrono::seconds(5), + [&]() { + bool hasMsg; + reader.hasMessageAvailable(hasMsg); + return !hasMsg; + }, + 1000); producer.close(); reader.close(); diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc new file mode 100644 index 00000000..1f76bac5 --- /dev/null +++ b/tests/TableViewTest.cc @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include + +#include "HttpHelper.h" +#include "PulsarFriend.h" +#include "WaitUtils.h" + +using namespace pulsar; + +static std::string lookupUrl = "pulsar://localhost:6650"; +static std::string adminUrl = "http://localhost:8080/"; + +DECLARE_LOG_OBJECT() + +TEST(TableViewTest, testCreateTableView) { + const std::string topic = "testCreateTableView" + std::to_string(time(nullptr)); + Client client(lookupUrl); + + static const std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + SchemaInfo schemaInfo(JSON, "test-json", jsonSchema); + ProducerConfiguration producerConfiguration; + producerConfiguration.setSchema(schemaInfo); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + // Create table view failed, The schema is not compatible + TableViewConfiguration tableViewConfiguration; + tableViewConfiguration.setSchemaInfo(SchemaInfo(AVRO, "", "")); + TableView tableView; + ASSERT_EQ(ResultIncompatibleSchema, client.createTableView(topic, tableViewConfiguration, tableView)); + ASSERT_EQ(ResultConsumerNotInitialized, tableView.close()); + + // Create table view success. + tableViewConfiguration.setSchemaInfo(schemaInfo); + ASSERT_EQ(ResultOk, client.createTableView(topic, tableViewConfiguration, tableView)); + ASSERT_EQ(ResultOk, tableView.close()); + + // Test async create and close the client during the process. + Latch latch(1); + client.createTableViewAsync( + topic, tableViewConfiguration, [&latch](Result result, const TableView& tableView) { + latch.countdown(); + ASSERT_TRUE(result == ResultDisconnected || result == ResultAlreadyClosed); + }); + client.close(); + latch.wait(); +} + +TEST(TableViewTest, testSimpleTableView) { + const std::string topic = "testTableView" + std::to_string(time(nullptr)); + Client client(lookupUrl); + + ProducerConfiguration producerConfiguration; + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + auto count = 20; + for (int i = 0; i < count; ++i) { + auto msg = MessageBuilder() + .setPartitionKey("key" + std::to_string(i)) + .setContent("value" + std::to_string(i)) + .build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + // Create table view and assert size. + TableViewConfiguration tableViewConfiguration; + TableView tableView; + ASSERT_EQ(ResultOk, client.createTableView(topic, tableViewConfiguration, tableView)); + ASSERT_EQ(tableView.size(), count); + + // Send some more messages, The 0 ~ count message key/value is duplicated send. + for (int i = 0; i < count * 2; ++i) { + auto msg = MessageBuilder() + .setPartitionKey("key" + std::to_string(i)) + .setContent("value" + std::to_string(i)) + .build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + waitUntil( + std::chrono::seconds(2), [&] { return tableView.size() == count * 2; }, 1000); + + // assert interfaces. + std::string value; + ASSERT_TRUE(tableView.getValue("key1", value)); + ASSERT_EQ(value, "value1"); + ASSERT_TRUE(tableView.retrieveValue("key1", value)); + ASSERT_EQ(value, "value1"); + ASSERT_FALSE(tableView.containsKey("key1")); + ASSERT_EQ(tableView.snapshot().size(), count * 2 - 1); + ASSERT_EQ(tableView.size(), 0); + + client.close(); +} + +TEST(TableViewTest, testPublishNullValue) { + const std::string topic = "testTableView" + std::to_string(time(nullptr)); + Client client(lookupUrl); + + ProducerConfiguration producerConfiguration; + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + auto count = 20; + for (int i = 0; i < count; ++i) { + auto msg = MessageBuilder() + .setPartitionKey("key" + std::to_string(i)) + .setContent("value" + std::to_string(i)) + .build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + // Create table view failed, The schema is not compatible + TableViewConfiguration tableViewConfiguration; + TableView tableView; + ASSERT_EQ(ResultOk, client.createTableView(topic, tableViewConfiguration, tableView)); + ASSERT_EQ(tableView.size(), count); + + // Set the v of k1 is empty + auto msg = MessageBuilder().setPartitionKey("key1").setContent("").build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + waitUntil( + std::chrono::seconds(2), [&] { return tableView.size() == count - 1; }, 1000); + + // assert interfaces. + std::string value; + ASSERT_TRUE(!tableView.containsKey("key1")); + ASSERT_TRUE(!tableView.getValue("key1", value)); + ASSERT_TRUE(value.empty()); + + client.close(); +} + +TEST(TableViewTest, testNotSupportNonPersistentTopic) { + const std::string topic = TopicDomain::NonPersistent + + "://public/default/testNotSupportNonPersistentTopic" + + std::to_string(time(nullptr)); + Client client(lookupUrl); + + // Create table view failed, The schema is not compatible + TableViewConfiguration tableViewConfiguration; + TableView tableView; + ASSERT_EQ(ResultNotAllowedError, client.createTableView(topic, tableViewConfiguration, tableView)); + client.close(); +} + +TEST(TableViewTest, testMultiTopicAndAutoUpdatePartitions) { + std::string uniqueTimeStr = std::to_string(time(nullptr)); + std::string topic = "persistent://public/default/testMultiTopicAndAutoUpdatePartitions" + uniqueTimeStr; + Client client(lookupUrl); + + std::string url = adminUrl + "admin/v2/persistent/public/default/testMultiTopicAndAutoUpdatePartitions" + + uniqueTimeStr + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + + ProducerConfiguration producerConfiguration; + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + auto count = 20; + for (int i = 0; i < count; ++i) { + auto msg = MessageBuilder() + .setPartitionKey("key" + std::to_string(i)) + .setContent("value" + std::to_string(i)) + .build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + TableViewConfiguration tableViewConfiguration; + TableView tableView; + // TODO need support multiReader first. + ASSERT_EQ(ResultOperationNotSupported, client.createTableView(topic, tableViewConfiguration, tableView)); + + client.close(); +}