Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <pulsar/Reader.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/TableView.h>
#include <pulsar/defines.h>

#include <string>
Expand All @@ -37,6 +38,7 @@ namespace pulsar {
typedef std::function<void(Result, Producer)> CreateProducerCallback;
typedef std::function<void(Result, Consumer)> SubscribeCallback;
typedef std::function<void(Result, Reader)> ReaderCallback;
typedef std::function<void(Result, TableView)> TableViewCallback;
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;

Expand Down Expand Up @@ -301,6 +303,36 @@ class PULSAR_PUBLIC Client {
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, ReaderCallback callback);

/**
* Create a table view with given {@code TableViewConfiguration} for specified topic.
*
* The TableView provides a key-value map view of a compacted topic. Messages without keys will
* be ignored.
*
* @param topic the name of the topic.
* @param conf The {@code TableViewConfiguration} object
* @param tableView The {@code TableView} object
* @return Returned when the TableView is successfully linked to the topic and the map is built from a
* message that already exists
*/
Result createTableView(const std::string& topic, const TableViewConfiguration& conf,
TableView& tableView);

/**
* Async create a table view with given {@code TableViewConfiguration} for specified topic.
*
* The TableView provides a key-value map view of a compacted topic. Messages without keys will
* be ignored.
*
* @param topic the name of the topic.
* @param conf The {@code TableViewConfiguration} object
* @param callBack
* The callback that is triggered when the TableView is successfully linked to the topic and the map is
* built from a message that already exists
*/
void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
TableViewCallback callBack);

/**
* Get the list of partitions for a given topic.
*
Expand Down
9 changes: 9 additions & 0 deletions include/pulsar/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class PulsarFriend;
class ReaderImpl;

typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
typedef std::function<void(Result result, const Message& message)> ReadNextCallback;

/**
* A Reader can be used to scan through all the messages currently available in a topic.
Expand Down Expand Up @@ -68,6 +69,13 @@ class PULSAR_PUBLIC Reader {
*/
Result readNext(Message& msg, int timeoutMs);

/**
* Read asynchronously the next message in the topic.
*
* @param callback
*/
void readNextAsync(ReadNextCallback callback);

/**
* Close the reader and stop the broker to push more messages
*
Expand Down Expand Up @@ -156,6 +164,7 @@ class PULSAR_PUBLIC Reader {
friend class PulsarFriend;
friend class PulsarWrapper;
friend class ReaderImpl;
friend class TableViewImpl;
friend class ReaderTest;
};
} // namespace pulsar
Expand Down
1 change: 1 addition & 0 deletions include/pulsar/ReaderConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class PULSAR_PUBLIC ReaderConfiguration {
* Set the internal subscription name.
*
* @param internal subscriptionName
* Default value is reader-{random string}.
*/
void setInternalSubscriptionName(std::string internalSubscriptionName);

Expand Down
129 changes: 129 additions & 0 deletions include/pulsar/TableView.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef TABEL_VIEW_HPP_
#define TABEL_VIEW_HPP_

#include <pulsar/Result.h>
#include <pulsar/TableViewConfiguration.h>
#include <pulsar/defines.h>

#include <functional>
#include <unordered_map>

namespace pulsar {

class TableViewImpl;

typedef std::function<void(Result result)> ResultCallback;
typedef std::function<void(const std::string& key, const std::string& value)> TableViewAction;
/**
*
*/
class PULSAR_PUBLIC TableView {
public:
/**
* Construct an uninitialized tableView object
*/
TableView();

/**
* Move the latest value associated with the key.
*
* Example:
*
* ```c++
* TableView view;
* std::string value;
* while (true) {
* if (view.retrieveValue("key")) {
* std::cout << "value is updated to: " << value;
* } else {
* // sleep for a while or print the message that value is not updated
* }
* }
* ```
*
* @param key
* @param value the value associated with the key
* @return true if there is an associated value of the key, otherwise false
*
* NOTE: Once the value has been retrieved successfully, the associated value
* will be removed from the table view until next time the value is updated.
*/
bool retrieveValue(const std::string& key, std::string& value);

/**
* It's similar with retrievedValue except the value is copied into `value`.
*
* @param key
* @param value the value associated with the key
* @return Whether the key exists in the table view.
*/
bool getValue(const std::string& key, std::string& value) const;

/**
* Check if the key exists in the table view.
*
* @return true if the key exists in the table view
*/
bool containsKey(const std::string& key) const;

/**
* Move the table view data into the unordered map.
*/
std::unordered_map<std::string, std::string> snapshot();

/**
* Get the size of the elements.
*/
std::size_t size() const;

/**
* Performs the given action for each entry in this map until all entries have been processed or the
* action throws an exception.
*/
void forEach(TableViewAction action);

/**
* Performs the given action for each entry in this map until all entries have been processed and
* register the callback, which will be called each time a key-value pair is updated.
*/
void forEachAndListen(TableViewAction action);

/**
* Asynchronously close the tableview and stop the broker to push more messages
*/
void closeAsync(ResultCallback callback);

/**
* Close the consumer and stop the broker to push more messages
*/
Result close();

private:
typedef std::shared_ptr<TableViewImpl> TableViewImplPtr;
TableViewImplPtr impl_;
explicit TableView(TableViewImplPtr);

friend class PulsarFriend;
friend class ClientImpl;
};
} // namespace pulsar

#endif /* TABEL_VIEW_HPP_ */
79 changes: 79 additions & 0 deletions include/pulsar/TableViewConfiguration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_TABLEVIEW_CONFIGURATION_H_
#define PULSAR_TABLEVIEW_CONFIGURATION_H_

#include <pulsar/Schema.h>
#include <pulsar/defines.h>

#include <memory>

namespace pulsar {

struct TableViewConfigurationImpl;

/**
* Class specifying the configuration of a consumer.
*/
class PULSAR_PUBLIC TableViewConfiguration {
public:
TableViewConfiguration();
~TableViewConfiguration();
TableViewConfiguration(const TableViewConfiguration&);
TableViewConfiguration& operator=(const TableViewConfiguration&);

/**
* Create a new instance of TableViewConfiguration with the same
* initial settings as the current one.
*/
TableViewConfiguration clone() const;

/**
* @return the schema information declared for this consumer
*/
const SchemaInfo& getSchemaInfo() const;

/**
* Declare the schema of the data that this table view will be accepting.
*
* The schema will be checked against the schema of the topic, and the
* table view creation will fail if it's not compatible.
*
* @param schemaInfo the schema definition object
*/
TableViewConfiguration& setSchemaInfo(const SchemaInfo& schemaInfo);

/**
* @return subscriptionName
*/
const std::string& getSubscriptionName() const;

/**
* Set the internal consumer subscription name of the {@link TableView}.
*
* @param subscriptionName the name of the subscription to the topic.
* Default value is reader-{random string}.
*/
TableViewConfiguration& setSubscriptionName(const std::string subscriptionName);

private:
std::shared_ptr<TableViewConfigurationImpl> impl_;
};
} // namespace pulsar
#endif /* PULSAR_TABLEVIEW_CONFIGURATION_H_ */
14 changes: 14 additions & 0 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ void Client::createReaderAsync(const std::string& topic, const MessageId& startM
impl_->createReaderAsync(topic, startMessageId, conf, callback);
}

Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf,
TableView& tableView) {
Promise<Result, TableView> promise;
createTableViewAsync(topic, conf, WaitForCallbackValue<TableView>(promise));
Future<Result, TableView> future = promise.getFuture();

return future.get(tableView);
}

void Client::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
TableViewCallback callBack) {
impl_->createTableViewAsync(topic, conf, callBack);
}

Result Client::getPartitionsForTopic(const std::string& topic, std::vector<std::string>& partitions) {
Promise<Result, std::vector<std::string> > promise;
getPartitionsForTopicAsync(topic, WaitForCallbackValue<std::vector<std::string> >(promise));
Expand Down
29 changes: 28 additions & 1 deletion lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

#include "BinaryProtoLookupService.h"
#include "ClientConfigurationImpl.h"
#include "Commands.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "HTTPLookupService.h"
Expand All @@ -35,6 +34,7 @@
#include "ProducerImpl.h"
#include "ReaderImpl.h"
#include "RetryableLookupService.h"
#include "TableViewImpl.h"
#include "TimeUtils.h"
#include "TopicName.h"

Expand Down Expand Up @@ -226,6 +226,33 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
std::placeholders::_2, topicName, msgId, conf, callback));
}

void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
TableViewCallback callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, TableView());
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
callback(ResultInvalidTopicName, TableView());
return;
}
}

TableViewImplPtr tableViewPtr =
std::make_shared<TableViewImpl>(shared_from_this(), topicName->toString(), conf);
tableViewPtr->start().addListener([callback](Result result, TableViewImplPtr tableViewImplPtr) {
if (result == ResultOk) {
callback(result, TableView{tableViewImplPtr});
} else {
callback(result, {});
}
});
}

void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, MessageId startMessageId,
ReaderConfiguration conf, ReaderCallback callback) {
Expand Down
6 changes: 6 additions & 0 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class ReaderImpl;
typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;

class TableViewImpl;
typedef std::shared_ptr<TableViewImpl> TableViewImplPtr;

class ConsumerImplBase;
typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;

Expand Down Expand Up @@ -83,6 +86,9 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, ReaderCallback callback);

void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
TableViewCallback callback);

void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

Future<Result, ClientConnectionWeakPtr> getConnection(const std::string& topic);
Expand Down
Loading