Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
/// <reference types="node" />

export interface ClientConfig {
serviceUrl: string;
export type ClientConfig = ClientConfigBase & (
{ serviceUrl: string; serviceUrlProvider?: never } |
{ serviceUrl?: never; serviceUrlProvider: AutoClusterFailoverConfig }
);

export interface ClientConfigBase {
authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
operationTimeoutSeconds?: number;
ioThreads?: number;
Expand All @@ -37,6 +41,20 @@ export interface ClientConfig {
connectionTimeoutMs?: number;
}

export type ServiceInfo = string | {
serviceUrl: string;
authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
tlsTrustCertsFilePath?: string;
};

export interface AutoClusterFailoverConfig {
primary: ServiceInfo;
secondary: ServiceInfo[];
checkIntervalMs?: number;
failoverThreshold?: number;
switchBackThreshold?: number;
}

export class Client {
static setLogHandler(logHandler: (level: LogLevel, file: string, line: number, message: string) => void): void;
constructor(config: ClientConfig);
Expand Down
257 changes: 249 additions & 8 deletions src/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,28 @@
#include "Producer.h"
#include "Reader.h"
#include "ThreadSafeDeferred.h"
#include <pulsar/AutoClusterFailover.h>
#include <pulsar/Client.h>
#include <pulsar/ServiceInfo.h>
#include <pulsar/c/authentication.h>
#include <pulsar/c/client.h>
#include <pulsar/c/client_configuration.h>
#include <pulsar/c/result.h>
#include "pulsar/ClientConfiguration.h"
#include <chrono>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>

static const std::string CFG_SERVICE_URL = "serviceUrl";
static const std::string CFG_SERVICE_URL_PROVIDER = "serviceUrlProvider";
static const std::string CFG_PRIMARY = "primary";
static const std::string CFG_SECONDARY = "secondary";
static const std::string CFG_CHECK_INTERVAL_MS = "checkIntervalMs";
static const std::string CFG_FAILOVER_THRESHOLD = "failoverThreshold";
static const std::string CFG_SWITCH_BACK_THRESHOLD = "switchBackThreshold";
static const std::string CFG_AUTH = "authentication";
static const std::string CFG_AUTH_PROP = "binding";
static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
Expand All @@ -52,6 +68,194 @@ struct _pulsar_client_configuration {
pulsar::ClientConfiguration conf;
};

struct _pulsar_client {
std::unique_ptr<pulsar::Client> client;
};

struct _pulsar_authentication {
pulsar::AuthenticationPtr auth;
};

static bool IsPresent(const Napi::Value &value) { return !value.IsUndefined() && !value.IsNull(); }

static std::optional<pulsar::AuthenticationPtr> BuildAuthenticationPtr(
const Napi::Object &authObject, std::vector<Napi::ObjectReference> &authRefs) {
Napi::Env env = authObject.Env();

if (!authObject.Has(CFG_AUTH_PROP) || !authObject.Get(CFG_AUTH_PROP).IsObject()) {
Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

Napi::Object binding = authObject.Get(CFG_AUTH_PROP).As<Napi::Object>();
authRefs.emplace_back(Napi::Persistent(binding));
Authentication *auth = Authentication::Unwrap(authRefs.back().Value());

if (auth == nullptr || auth->GetCAuthentication() == nullptr) {
Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

return auth->GetCAuthentication()->auth;
}

static std::optional<pulsar::ServiceInfo> BuildServiceInfo(const Napi::Value &value,
const std::string &fieldName,
std::vector<Napi::ObjectReference> &authRefs,
const pulsar::AuthenticationPtr &defaultAuth,
const std::optional<std::string> &defaultTls) {
Napi::Env env = value.Env();

if (value.IsString()) {
std::string serviceUrl = value.ToString().Utf8Value();
if (serviceUrl.empty()) {
Napi::Error::New(env, fieldName + " service URL must be a non-empty string")
.ThrowAsJavaScriptException();
return std::nullopt;
}
return pulsar::ServiceInfo(serviceUrl, defaultAuth, defaultTls);
}

if (!value.IsObject()) {
Napi::Error::New(env, fieldName + " must be a service URL string or service info object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

Napi::Object serviceInfo = value.As<Napi::Object>();
if (!serviceInfo.Has(CFG_SERVICE_URL) || !serviceInfo.Get(CFG_SERVICE_URL).IsString() ||
serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
Napi::Error::New(env, fieldName + ".serviceUrl is required and must be a non-empty string")
.ThrowAsJavaScriptException();
return std::nullopt;
}

std::string serviceUrl = serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value();
pulsar::AuthenticationPtr authentication = defaultAuth;
std::optional<std::string> tlsTrustCertsFilePath = defaultTls;

if (serviceInfo.Has(CFG_AUTH) && IsPresent(serviceInfo.Get(CFG_AUTH))) {
if (!serviceInfo.Get(CFG_AUTH).IsObject()) {
Napi::Error::New(env, fieldName + ".authentication must be a Pulsar authentication object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

auto auth = BuildAuthenticationPtr(serviceInfo.Get(CFG_AUTH).As<Napi::Object>(), authRefs);
if (!auth.has_value()) {
return std::nullopt;
}
authentication = auth.value();
}

if (serviceInfo.Has(CFG_TLS_TRUST_CERT) && IsPresent(serviceInfo.Get(CFG_TLS_TRUST_CERT))) {
if (!serviceInfo.Get(CFG_TLS_TRUST_CERT).IsString()) {
Napi::Error::New(env, fieldName + ".tlsTrustCertsFilePath must be a string")
.ThrowAsJavaScriptException();
return std::nullopt;
}
tlsTrustCertsFilePath = serviceInfo.Get(CFG_TLS_TRUST_CERT).ToString().Utf8Value();
}

return pulsar::ServiceInfo(serviceUrl, authentication, tlsTrustCertsFilePath);
}

static bool SetPositiveUint32(const Napi::Object &config, const std::string &fieldName, uint32_t &target) {
if (!config.Has(fieldName) || !IsPresent(config.Get(fieldName))) {
return true;
}

Napi::Env env = config.Env();
if (!config.Get(fieldName).IsNumber()) {
Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
.ThrowAsJavaScriptException();
return false;
}

int64_t value = config.Get(fieldName).ToNumber().Int64Value();
if (value <= 0 || value > UINT32_MAX) {
Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
.ThrowAsJavaScriptException();
return false;
}

target = static_cast<uint32_t>(value);
return true;
}

static std::unique_ptr<pulsar::ServiceInfoProvider> BuildServiceInfoProvider(
const Napi::Object &clientConfig, std::vector<Napi::ObjectReference> &authRefs,
const pulsar::AuthenticationPtr &defaultAuth, const std::optional<std::string> &defaultTls) {
Napi::Value providerValue = clientConfig.Get(CFG_SERVICE_URL_PROVIDER);
Napi::Env env = clientConfig.Env();

if (!providerValue.IsObject()) {
Napi::Error::New(env, "serviceUrlProvider must be an object").ThrowAsJavaScriptException();
return nullptr;
}

Napi::Object providerConfig = providerValue.As<Napi::Object>();
if (!providerConfig.Has(CFG_PRIMARY) || !IsPresent(providerConfig.Get(CFG_PRIMARY))) {
Napi::Error::New(env, "serviceUrlProvider.primary is required").ThrowAsJavaScriptException();
return nullptr;
}

auto primary = BuildServiceInfo(providerConfig.Get(CFG_PRIMARY), "serviceUrlProvider.primary", authRefs,
defaultAuth, defaultTls);
if (!primary.has_value()) {
return nullptr;
}

if (!providerConfig.Has(CFG_SECONDARY) || !providerConfig.Get(CFG_SECONDARY).IsArray()) {
Napi::Error::New(env, "serviceUrlProvider.secondary is required and must be an array")
.ThrowAsJavaScriptException();
return nullptr;
}

Napi::Array secondaryConfig = providerConfig.Get(CFG_SECONDARY).As<Napi::Array>();
if (secondaryConfig.Length() == 0) {
Napi::Error::New(env, "serviceUrlProvider.secondary must contain at least one service")
.ThrowAsJavaScriptException();
return nullptr;
}

std::vector<pulsar::ServiceInfo> secondary;
secondary.reserve(secondaryConfig.Length());
for (uint32_t i = 0; i < secondaryConfig.Length(); i++) {
auto serviceInfo =
BuildServiceInfo(secondaryConfig.Get(i), "serviceUrlProvider.secondary[" + std::to_string(i) + "]",
authRefs, defaultAuth, defaultTls);
if (!serviceInfo.has_value()) {
return nullptr;
}
secondary.emplace_back(std::move(serviceInfo.value()));
}

pulsar::AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary.value()),
std::move(secondary));

uint32_t checkIntervalMs = static_cast<uint32_t>(autoClusterFailoverConfig.checkInterval.count());
if (!SetPositiveUint32(providerConfig, CFG_CHECK_INTERVAL_MS, checkIntervalMs)) {
return nullptr;
}
autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);

if (!SetPositiveUint32(providerConfig, CFG_FAILOVER_THRESHOLD,
autoClusterFailoverConfig.failoverThreshold)) {
return nullptr;
}

if (!SetPositiveUint32(providerConfig, CFG_SWITCH_BACK_THRESHOLD,
autoClusterFailoverConfig.switchBackThreshold)) {
return nullptr;
}

return std::unique_ptr<pulsar::ServiceInfoProvider>(
new pulsar::AutoClusterFailover(std::move(autoClusterFailoverConfig)));
}

void Client::SetLogHandler(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
Expand Down Expand Up @@ -103,16 +307,34 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
Napi::HandleScope scope(env);
Napi::Object clientConfig = info[0].As<Napi::Object>();

if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString() ||
clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
bool hasServiceUrlProvider =
clientConfig.Has(CFG_SERVICE_URL_PROVIDER) && IsPresent(clientConfig.Get(CFG_SERVICE_URL_PROVIDER));
bool hasServiceUrl = clientConfig.Has(CFG_SERVICE_URL) && IsPresent(clientConfig.Get(CFG_SERVICE_URL));
if (hasServiceUrlProvider && hasServiceUrl) {
Napi::Error::New(env, "Only one of serviceUrl or serviceUrlProvider can be configured")
.ThrowAsJavaScriptException();
return;
Comment thread
shibd marked this conversation as resolved.
}

if (!hasServiceUrlProvider && !hasServiceUrl) {
Napi::Error::New(env,
"Service URL is required and must be specified as a string unless serviceUrlProvider "
"is configured")
.ThrowAsJavaScriptException();
return;
}

if (hasServiceUrl && (!clientConfig.Get(CFG_SERVICE_URL).IsString() ||
clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty())) {
Napi::Error::New(env, "Service URL is required and must be specified as a string")
.ThrowAsJavaScriptException();
return;
}
Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();

this->cClientConfig = std::shared_ptr<pulsar_client_configuration_t>(pulsar_client_configuration_create(),
pulsar_client_configuration_free);
pulsar::AuthenticationPtr defaultAuthentication = pulsar::AuthFactory::Disabled();
std::optional<std::string> defaultTlsTrustCertsFilePath = std::nullopt;

// The logger can only be set once per process, so we will take control of it
if (clientConfig.Has(CFG_LOG_LEVEL) && clientConfig.Get(CFG_LOG_LEVEL).IsNumber()) {
Expand Down Expand Up @@ -145,9 +367,12 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
this->authRef_ = Napi::Persistent(obj.Get(CFG_AUTH_PROP).As<Napi::Object>());
Authentication *auth = Authentication::Unwrap(this->authRef_.Value());
pulsar_client_configuration_set_auth(cClientConfig.get(), auth->GetCAuthentication());
auto auth = BuildAuthenticationPtr(obj, this->authRefs_);
if (!auth.has_value()) {
return;
}
cClientConfig.get()->conf.setAuth(auth.value());
defaultAuthentication = auth.value();
}
}

Expand Down Expand Up @@ -190,6 +415,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig.get(),
tlsTrustCertsFilePath.Utf8Value().c_str());
defaultTlsTrustCertsFilePath = tlsTrustCertsFilePath.Utf8Value();
}

if (clientConfig.Has(CFG_TLS_CERT_FILE) && clientConfig.Get(CFG_TLS_CERT_FILE).IsString()) {
Expand Down Expand Up @@ -227,8 +453,23 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
}

try {
this->cClient = std::shared_ptr<pulsar_client_t>(
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
if (hasServiceUrlProvider) {
std::unique_ptr<pulsar::ServiceInfoProvider> serviceInfoProvider = BuildServiceInfoProvider(
clientConfig, this->authRefs_, defaultAuthentication, defaultTlsTrustCertsFilePath);
if (serviceInfoProvider == nullptr) {
return;
}

std::unique_ptr<pulsar_client_t> rawClient(new pulsar_client_t);
rawClient->client.reset(new pulsar::Client(
pulsar::Client::create(std::move(serviceInfoProvider), cClientConfig.get()->conf)));
this->cClient = std::shared_ptr<pulsar_client_t>(rawClient.release(),
[](pulsar_client_t *client) { delete client; });
} else {
Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
this->cClient = std::shared_ptr<pulsar_client_t>(
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
}
} catch (const std::exception &e) {
Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
}
Expand Down
3 changes: 2 additions & 1 deletion src/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <napi.h>
#include <pulsar/c/client.h>
#include <vector>

struct LogMessage {
pulsar_logger_level_t level;
Expand Down Expand Up @@ -54,7 +55,7 @@ class Client : public Napi::ObjectWrap<Client> {
std::shared_ptr<pulsar_client_t> cClient;
std::shared_ptr<pulsar_client_configuration_t> cClientConfig;
pulsar_logger_level_t logLevel = pulsar_logger_level_t::pulsar_INFO;
Napi::ObjectReference authRef_;
std::vector<Napi::ObjectReference> authRefs_;

Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Napi::Value Subscribe(const Napi::CallbackInfo &info);
Expand Down
15 changes: 14 additions & 1 deletion tests/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ const baseUrl = 'http://localhost:8080';
describe('Client', () => {
describe('CreateFailedByUrlSetIncorrect', () => {
test('No Set Url', async () => {
const expectedError = 'Service URL is required and must be specified as a string '
+ 'unless serviceUrlProvider is configured';
await expect(() => new Pulsar.Client({
operationTimeoutSeconds: 30,
})).toThrow('Service URL is required and must be specified as a string');
})).toThrow(expectedError);
});

test('Set empty url', async () => {
Expand All @@ -51,6 +53,17 @@ const baseUrl = 'http://localhost:8080';
operationTimeoutSeconds: 30,
})).toThrow('Service URL is required and must be specified as a string');
});

test('Set both service url and service url provider', async () => {
await expect(() => new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
serviceUrlProvider: {
primary: 'pulsar://localhost:6650',
secondary: ['pulsar://localhost:6651'],
},
operationTimeoutSeconds: 30,
})).toThrow('Only one of serviceUrl or serviceUrlProvider can be configured');
});
});
describe('test getPartitionsForTopic', () => {
test('GetPartitions for empty topic', async () => {
Expand Down
Loading
Loading