diff --git a/index.d.ts b/index.d.ts
index 8e6fb85..61ade63 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -18,8 +18,12 @@
*/
///
-export interface ClientConfig {
- serviceUrl: string;
+export type ClientConfig = ClientConfigBase & (
+ { serviceUrl: string; serviceUrlProvider?: never } |
+ { serviceUrl?: never; serviceUrlProvider: AutoClusterFailoverConfig }
+);
+
+export interface ClientConfigBase {
authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
operationTimeoutSeconds?: number;
ioThreads?: number;
@@ -37,6 +41,20 @@ export interface ClientConfig {
connectionTimeoutMs?: number;
}
+export type ServiceInfo = string | {
+ serviceUrl: string;
+ authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
+ tlsTrustCertsFilePath?: string;
+};
+
+export interface AutoClusterFailoverConfig {
+ primary: ServiceInfo;
+ secondary: ServiceInfo[];
+ checkIntervalMs?: number;
+ failoverThreshold?: number;
+ switchBackThreshold?: number;
+}
+
export class Client {
static setLogHandler(logHandler: (level: LogLevel, file: string, line: number, message: string) => void): void;
constructor(config: ClientConfig);
diff --git a/src/Client.cc b/src/Client.cc
index 6d09cd7..581b62c 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -23,12 +23,28 @@
#include "Producer.h"
#include "Reader.h"
#include "ThreadSafeDeferred.h"
+#include
+#include
+#include
+#include
#include
#include
#include
#include "pulsar/ClientConfiguration.h"
+#include
+#include
+#include
+#include
+#include
+#include
static const std::string CFG_SERVICE_URL = "serviceUrl";
+static const std::string CFG_SERVICE_URL_PROVIDER = "serviceUrlProvider";
+static const std::string CFG_PRIMARY = "primary";
+static const std::string CFG_SECONDARY = "secondary";
+static const std::string CFG_CHECK_INTERVAL_MS = "checkIntervalMs";
+static const std::string CFG_FAILOVER_THRESHOLD = "failoverThreshold";
+static const std::string CFG_SWITCH_BACK_THRESHOLD = "switchBackThreshold";
static const std::string CFG_AUTH = "authentication";
static const std::string CFG_AUTH_PROP = "binding";
static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
@@ -52,6 +68,194 @@ struct _pulsar_client_configuration {
pulsar::ClientConfiguration conf;
};
+struct _pulsar_client {
+ std::unique_ptr client;
+};
+
+struct _pulsar_authentication {
+ pulsar::AuthenticationPtr auth;
+};
+
+static bool IsPresent(const Napi::Value &value) { return !value.IsUndefined() && !value.IsNull(); }
+
+static std::optional BuildAuthenticationPtr(
+ const Napi::Object &authObject, std::vector &authRefs) {
+ Napi::Env env = authObject.Env();
+
+ if (!authObject.Has(CFG_AUTH_PROP) || !authObject.Get(CFG_AUTH_PROP).IsObject()) {
+ Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
+ .ThrowAsJavaScriptException();
+ return std::nullopt;
+ }
+
+ Napi::Object binding = authObject.Get(CFG_AUTH_PROP).As();
+ authRefs.emplace_back(Napi::Persistent(binding));
+ Authentication *auth = Authentication::Unwrap(authRefs.back().Value());
+
+ if (auth == nullptr || auth->GetCAuthentication() == nullptr) {
+ Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
+ .ThrowAsJavaScriptException();
+ return std::nullopt;
+ }
+
+ return auth->GetCAuthentication()->auth;
+}
+
+static std::optional BuildServiceInfo(const Napi::Value &value,
+ const std::string &fieldName,
+ std::vector &authRefs,
+ const pulsar::AuthenticationPtr &defaultAuth,
+ const std::optional &defaultTls) {
+ Napi::Env env = value.Env();
+
+ if (value.IsString()) {
+ std::string serviceUrl = value.ToString().Utf8Value();
+ if (serviceUrl.empty()) {
+ Napi::Error::New(env, fieldName + " service URL must be a non-empty string")
+ .ThrowAsJavaScriptException();
+ return std::nullopt;
+ }
+ return pulsar::ServiceInfo(serviceUrl, defaultAuth, defaultTls);
+ }
+
+ if (!value.IsObject()) {
+ Napi::Error::New(env, fieldName + " must be a service URL string or service info object")
+ .ThrowAsJavaScriptException();
+ return std::nullopt;
+ }
+
+ Napi::Object serviceInfo = value.As();
+ if (!serviceInfo.Has(CFG_SERVICE_URL) || !serviceInfo.Get(CFG_SERVICE_URL).IsString() ||
+ serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
+ Napi::Error::New(env, fieldName + ".serviceUrl is required and must be a non-empty string")
+ .ThrowAsJavaScriptException();
+ return std::nullopt;
+ }
+
+ std::string serviceUrl = serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value();
+ pulsar::AuthenticationPtr authentication = defaultAuth;
+ std::optional tlsTrustCertsFilePath = defaultTls;
+
+ if (serviceInfo.Has(CFG_AUTH) && IsPresent(serviceInfo.Get(CFG_AUTH))) {
+ if (!serviceInfo.Get(CFG_AUTH).IsObject()) {
+ Napi::Error::New(env, fieldName + ".authentication must be a Pulsar authentication object")
+ .ThrowAsJavaScriptException();
+ return std::nullopt;
+ }
+
+ auto auth = BuildAuthenticationPtr(serviceInfo.Get(CFG_AUTH).As(), authRefs);
+ if (!auth.has_value()) {
+ return std::nullopt;
+ }
+ authentication = auth.value();
+ }
+
+ if (serviceInfo.Has(CFG_TLS_TRUST_CERT) && IsPresent(serviceInfo.Get(CFG_TLS_TRUST_CERT))) {
+ if (!serviceInfo.Get(CFG_TLS_TRUST_CERT).IsString()) {
+ Napi::Error::New(env, fieldName + ".tlsTrustCertsFilePath must be a string")
+ .ThrowAsJavaScriptException();
+ return std::nullopt;
+ }
+ tlsTrustCertsFilePath = serviceInfo.Get(CFG_TLS_TRUST_CERT).ToString().Utf8Value();
+ }
+
+ return pulsar::ServiceInfo(serviceUrl, authentication, tlsTrustCertsFilePath);
+}
+
+static bool SetPositiveUint32(const Napi::Object &config, const std::string &fieldName, uint32_t &target) {
+ if (!config.Has(fieldName) || !IsPresent(config.Get(fieldName))) {
+ return true;
+ }
+
+ Napi::Env env = config.Env();
+ if (!config.Get(fieldName).IsNumber()) {
+ Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
+ .ThrowAsJavaScriptException();
+ return false;
+ }
+
+ int64_t value = config.Get(fieldName).ToNumber().Int64Value();
+ if (value <= 0 || value > UINT32_MAX) {
+ Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
+ .ThrowAsJavaScriptException();
+ return false;
+ }
+
+ target = static_cast(value);
+ return true;
+}
+
+static std::unique_ptr BuildServiceInfoProvider(
+ const Napi::Object &clientConfig, std::vector &authRefs,
+ const pulsar::AuthenticationPtr &defaultAuth, const std::optional &defaultTls) {
+ Napi::Value providerValue = clientConfig.Get(CFG_SERVICE_URL_PROVIDER);
+ Napi::Env env = clientConfig.Env();
+
+ if (!providerValue.IsObject()) {
+ Napi::Error::New(env, "serviceUrlProvider must be an object").ThrowAsJavaScriptException();
+ return nullptr;
+ }
+
+ Napi::Object providerConfig = providerValue.As();
+ if (!providerConfig.Has(CFG_PRIMARY) || !IsPresent(providerConfig.Get(CFG_PRIMARY))) {
+ Napi::Error::New(env, "serviceUrlProvider.primary is required").ThrowAsJavaScriptException();
+ return nullptr;
+ }
+
+ auto primary = BuildServiceInfo(providerConfig.Get(CFG_PRIMARY), "serviceUrlProvider.primary", authRefs,
+ defaultAuth, defaultTls);
+ if (!primary.has_value()) {
+ return nullptr;
+ }
+
+ if (!providerConfig.Has(CFG_SECONDARY) || !providerConfig.Get(CFG_SECONDARY).IsArray()) {
+ Napi::Error::New(env, "serviceUrlProvider.secondary is required and must be an array")
+ .ThrowAsJavaScriptException();
+ return nullptr;
+ }
+
+ Napi::Array secondaryConfig = providerConfig.Get(CFG_SECONDARY).As();
+ if (secondaryConfig.Length() == 0) {
+ Napi::Error::New(env, "serviceUrlProvider.secondary must contain at least one service")
+ .ThrowAsJavaScriptException();
+ return nullptr;
+ }
+
+ std::vector secondary;
+ secondary.reserve(secondaryConfig.Length());
+ for (uint32_t i = 0; i < secondaryConfig.Length(); i++) {
+ auto serviceInfo =
+ BuildServiceInfo(secondaryConfig.Get(i), "serviceUrlProvider.secondary[" + std::to_string(i) + "]",
+ authRefs, defaultAuth, defaultTls);
+ if (!serviceInfo.has_value()) {
+ return nullptr;
+ }
+ secondary.emplace_back(std::move(serviceInfo.value()));
+ }
+
+ pulsar::AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary.value()),
+ std::move(secondary));
+
+ uint32_t checkIntervalMs = static_cast(autoClusterFailoverConfig.checkInterval.count());
+ if (!SetPositiveUint32(providerConfig, CFG_CHECK_INTERVAL_MS, checkIntervalMs)) {
+ return nullptr;
+ }
+ autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);
+
+ if (!SetPositiveUint32(providerConfig, CFG_FAILOVER_THRESHOLD,
+ autoClusterFailoverConfig.failoverThreshold)) {
+ return nullptr;
+ }
+
+ if (!SetPositiveUint32(providerConfig, CFG_SWITCH_BACK_THRESHOLD,
+ autoClusterFailoverConfig.switchBackThreshold)) {
+ return nullptr;
+ }
+
+ return std::unique_ptr(
+ new pulsar::AutoClusterFailover(std::move(autoClusterFailoverConfig)));
+}
+
void Client::SetLogHandler(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
@@ -103,16 +307,34 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info)
Napi::HandleScope scope(env);
Napi::Object clientConfig = info[0].As();
- if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString() ||
- clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
+ bool hasServiceUrlProvider =
+ clientConfig.Has(CFG_SERVICE_URL_PROVIDER) && IsPresent(clientConfig.Get(CFG_SERVICE_URL_PROVIDER));
+ bool hasServiceUrl = clientConfig.Has(CFG_SERVICE_URL) && IsPresent(clientConfig.Get(CFG_SERVICE_URL));
+ if (hasServiceUrlProvider && hasServiceUrl) {
+ Napi::Error::New(env, "Only one of serviceUrl or serviceUrlProvider can be configured")
+ .ThrowAsJavaScriptException();
+ return;
+ }
+
+ if (!hasServiceUrlProvider && !hasServiceUrl) {
+ Napi::Error::New(env,
+ "Service URL is required and must be specified as a string unless serviceUrlProvider "
+ "is configured")
+ .ThrowAsJavaScriptException();
+ return;
+ }
+
+ if (hasServiceUrl && (!clientConfig.Get(CFG_SERVICE_URL).IsString() ||
+ clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty())) {
Napi::Error::New(env, "Service URL is required and must be specified as a string")
.ThrowAsJavaScriptException();
return;
}
- Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
this->cClientConfig = std::shared_ptr(pulsar_client_configuration_create(),
pulsar_client_configuration_free);
+ pulsar::AuthenticationPtr defaultAuthentication = pulsar::AuthFactory::Disabled();
+ std::optional defaultTlsTrustCertsFilePath = std::nullopt;
// The logger can only be set once per process, so we will take control of it
if (clientConfig.Has(CFG_LOG_LEVEL) && clientConfig.Get(CFG_LOG_LEVEL).IsNumber()) {
@@ -145,9 +367,12 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info)
if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
- this->authRef_ = Napi::Persistent(obj.Get(CFG_AUTH_PROP).As());
- Authentication *auth = Authentication::Unwrap(this->authRef_.Value());
- pulsar_client_configuration_set_auth(cClientConfig.get(), auth->GetCAuthentication());
+ auto auth = BuildAuthenticationPtr(obj, this->authRefs_);
+ if (!auth.has_value()) {
+ return;
+ }
+ cClientConfig.get()->conf.setAuth(auth.value());
+ defaultAuthentication = auth.value();
}
}
@@ -190,6 +415,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info)
Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig.get(),
tlsTrustCertsFilePath.Utf8Value().c_str());
+ defaultTlsTrustCertsFilePath = tlsTrustCertsFilePath.Utf8Value();
}
if (clientConfig.Has(CFG_TLS_CERT_FILE) && clientConfig.Get(CFG_TLS_CERT_FILE).IsString()) {
@@ -227,8 +453,23 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info)
}
try {
- this->cClient = std::shared_ptr(
- pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
+ if (hasServiceUrlProvider) {
+ std::unique_ptr serviceInfoProvider = BuildServiceInfoProvider(
+ clientConfig, this->authRefs_, defaultAuthentication, defaultTlsTrustCertsFilePath);
+ if (serviceInfoProvider == nullptr) {
+ return;
+ }
+
+ std::unique_ptr rawClient(new pulsar_client_t);
+ rawClient->client.reset(new pulsar::Client(
+ pulsar::Client::create(std::move(serviceInfoProvider), cClientConfig.get()->conf)));
+ this->cClient = std::shared_ptr(rawClient.release(),
+ [](pulsar_client_t *client) { delete client; });
+ } else {
+ Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
+ this->cClient = std::shared_ptr(
+ pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
+ }
} catch (const std::exception &e) {
Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
}
diff --git a/src/Client.h b/src/Client.h
index b761d36..6ea9a15 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -22,6 +22,7 @@
#include
#include
+#include
struct LogMessage {
pulsar_logger_level_t level;
@@ -54,7 +55,7 @@ class Client : public Napi::ObjectWrap {
std::shared_ptr cClient;
std::shared_ptr cClientConfig;
pulsar_logger_level_t logLevel = pulsar_logger_level_t::pulsar_INFO;
- Napi::ObjectReference authRef_;
+ std::vector authRefs_;
Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Napi::Value Subscribe(const Napi::CallbackInfo &info);
diff --git a/tests/client.test.js b/tests/client.test.js
index d97763e..f871b0d 100644
--- a/tests/client.test.js
+++ b/tests/client.test.js
@@ -26,9 +26,11 @@ const baseUrl = 'http://localhost:8080';
describe('Client', () => {
describe('CreateFailedByUrlSetIncorrect', () => {
test('No Set Url', async () => {
+ const expectedError = 'Service URL is required and must be specified as a string '
+ + 'unless serviceUrlProvider is configured';
await expect(() => new Pulsar.Client({
operationTimeoutSeconds: 30,
- })).toThrow('Service URL is required and must be specified as a string');
+ })).toThrow(expectedError);
});
test('Set empty url', async () => {
@@ -51,6 +53,17 @@ const baseUrl = 'http://localhost:8080';
operationTimeoutSeconds: 30,
})).toThrow('Service URL is required and must be specified as a string');
});
+
+ test('Set both service url and service url provider', async () => {
+ await expect(() => new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ serviceUrlProvider: {
+ primary: 'pulsar://localhost:6650',
+ secondary: ['pulsar://localhost:6651'],
+ },
+ operationTimeoutSeconds: 30,
+ })).toThrow('Only one of serviceUrl or serviceUrlProvider can be configured');
+ });
});
describe('test getPartitionsForTopic', () => {
test('GetPartitions for empty topic', async () => {
diff --git a/tests/failover_client_test.test.js b/tests/failover_client_test.test.js
new file mode 100644
index 0000000..c834b5b
--- /dev/null
+++ b/tests/failover_client_test.test.js
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+const childProcess = require('child_process');
+const fs = require('fs');
+const http = require('http');
+const os = require('os');
+const path = require('path');
+const { promisify } = require('util');
+const Pulsar = require('../index');
+
+const execFile = promisify(childProcess.execFile);
+const dockerImage = process.env.PULSAR_TEST_IMAGE || 'apachepulsar/pulsar:latest';
+const testRunId = `${Date.now()}-${process.pid}`;
+const tempRoot = path.join(os.tmpdir(), `pulsar-node-failover-client-test-${testRunId}`);
+const startedContainers = [];
+
+jest.setTimeout(180000);
+
+const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+
+const docker = async (args, options = {}) => {
+ const { stdout } = await execFile('docker', args, {
+ maxBuffer: 1024 * 1024,
+ ...options,
+ });
+ return stdout.trim();
+};
+
+const waitForHttpOk = async (url, timeoutMs = 60000) => {
+ const deadline = Date.now() + timeoutMs;
+
+ while (Date.now() < deadline) {
+ try {
+ await new Promise((resolve, reject) => {
+ const request = http.get(url, (response) => {
+ response.resume();
+ if (response.statusCode >= 200 && response.statusCode < 300) {
+ resolve();
+ } else {
+ reject(new Error(`Unexpected status code ${response.statusCode}`));
+ }
+ });
+ request.on('error', reject);
+ request.setTimeout(2000, () => {
+ request.destroy(new Error('Timed out waiting for Pulsar HTTP endpoint'));
+ });
+ });
+ return;
+ } catch (e) {
+ await delay(1000);
+ }
+ }
+
+ throw new Error(`Timed out waiting for ${url}`);
+};
+
+const writeStandaloneConfig = (clusterName, webPort, brokerPort) => {
+ const clusterDir = path.join(tempRoot, clusterName);
+ fs.mkdirSync(clusterDir, { recursive: true });
+ fs.chmodSync(tempRoot, 0o755);
+ fs.chmodSync(clusterDir, 0o755);
+
+ const sourceConfig = fs.readFileSync(path.join(__dirname, 'conf', 'standalone.conf'), 'utf8');
+ const config = sourceConfig
+ .replace(/^brokerServicePort=.*$/m, `brokerServicePort=${brokerPort}`)
+ .replace(/^brokerServicePortTls=.*$/m, `brokerServicePortTls=${brokerPort + 100}`)
+ .replace(/^webServicePort=.*$/m, `webServicePort=${webPort}`)
+ .replace(/^webServicePortTls=.*$/m, `webServicePortTls=${webPort + 100}`)
+ .replace(/^advertisedListeners=.*$/m, 'advertisedListeners=');
+
+ const configPath = path.join(clusterDir, 'standalone.conf');
+ fs.writeFileSync(configPath, config);
+ fs.chmodSync(configPath, 0o644);
+
+ ['server.crt', 'server.key'].forEach((fileName) => {
+ const targetPath = path.join(clusterDir, fileName);
+ fs.copyFileSync(path.join(__dirname, 'certificate', fileName), targetPath);
+ fs.chmodSync(targetPath, 0o644);
+ });
+
+ return clusterDir;
+};
+
+const getStandaloneLogs = async (containerId) => {
+ const { stdout, stderr } = await execFile('docker', [
+ 'exec',
+ containerId,
+ 'bash',
+ '-lc',
+ 'cat logs/pulsar-standalone-*.out logs/pulsar-standalone-*.log 2>/dev/null || true',
+ ], { maxBuffer: 1024 * 1024 }).catch((e) => e);
+
+ return `${stdout || ''}${stderr || ''}`.trim();
+};
+
+const startStandaloneCluster = async ({ clusterName, webPort, brokerPort }) => {
+ const confDir = writeStandaloneConfig(clusterName, webPort, brokerPort);
+ const containerId = await docker([
+ 'run',
+ '-i',
+ '-p',
+ `${webPort}:${webPort}`,
+ '-p',
+ `${brokerPort}:${brokerPort}`,
+ '--rm',
+ '--detach',
+ dockerImage,
+ 'sleep',
+ '3600',
+ ]);
+ startedContainers.push(containerId);
+
+ await docker(['cp', confDir, `${containerId}:/pulsar/test-conf`]);
+ await docker([
+ 'exec',
+ '-i',
+ containerId,
+ 'env',
+ 'PULSAR_STANDALONE_CONF=test-conf/standalone.conf',
+ 'PULSAR_STANDALONE_USE_ZOOKEEPER=1',
+ 'bin/pulsar-daemon',
+ 'start',
+ 'standalone',
+ '--no-functions-worker',
+ '--no-stream-storage',
+ '--bookkeeper-dir',
+ `data/bookkeeper-${clusterName}`,
+ ]);
+ try {
+ await waitForHttpOk(`http://localhost:${webPort}/admin/v2/clusters`);
+ } catch (e) {
+ const logs = await getStandaloneLogs(containerId);
+ throw new Error(`${e.message}\nStandalone logs for ${clusterName}:\n${logs}`);
+ }
+
+ return { containerId, serviceUrl: `pulsar://localhost:${brokerPort}` };
+};
+
+const stopContainer = async (containerId) => {
+ await docker(['kill', containerId]).catch(() => {});
+};
+
+const sendAndReceive = async (client, topic, payload) => {
+ let consumer;
+ let producer;
+
+ try {
+ producer = await client.createProducer({ topic });
+ await producer.send({ data: Buffer.from(payload) });
+
+ consumer = await client.subscribe({
+ topic,
+ subscription: `sub-${testRunId}-${Math.random().toString(36).slice(2)}`,
+ subscriptionInitialPosition: 'Earliest',
+ });
+
+ const message = await consumer.receive(10000);
+ expect(message.getData().toString()).toBe(payload);
+ await consumer.acknowledge(message);
+ } finally {
+ if (producer) {
+ await producer.close().catch(() => {});
+ }
+ if (consumer) {
+ await consumer.close().catch(() => {});
+ }
+ }
+};
+
+const retryUntil = async (operation, timeoutMs = 60000) => {
+ const deadline = Date.now() + timeoutMs;
+ let lastError;
+
+ while (Date.now() < deadline) {
+ try {
+ return await operation();
+ } catch (e) {
+ lastError = e;
+ await delay(1000);
+ }
+ }
+
+ throw lastError;
+};
+
+describe('failoverClientTest', () => {
+ let primaryCluster;
+ let secondaryCluster;
+ let client;
+
+ beforeAll(async () => {
+ primaryCluster = await startStandaloneCluster({
+ clusterName: 'primary',
+ webPort: 18080,
+ brokerPort: 16650,
+ });
+ secondaryCluster = await startStandaloneCluster({
+ clusterName: 'secondary',
+ webPort: 18081,
+ brokerPort: 16651,
+ });
+
+ client = new Pulsar.Client({
+ serviceUrlProvider: {
+ primary: primaryCluster.serviceUrl,
+ secondary: [secondaryCluster.serviceUrl],
+ checkIntervalMs: 1000,
+ failoverThreshold: 1,
+ switchBackThreshold: 1,
+ },
+ operationTimeoutSeconds: 30,
+ connectionTimeoutMs: 1000,
+ });
+ });
+
+ afterAll(async () => {
+ if (client) {
+ await client.close().catch(() => {});
+ }
+
+ await Promise.all(startedContainers.map(stopContainer));
+ fs.rmSync(tempRoot, { force: true, recursive: true });
+ });
+
+ test('continues producing and consuming after the primary cluster stops', async () => {
+ expect(client).toBeDefined();
+
+ await retryUntil(() => sendAndReceive(
+ client,
+ `persistent://public/default/failover-primary-${testRunId}`,
+ 'message-before-failover',
+ ));
+
+ await stopContainer(primaryCluster.containerId);
+
+ await retryUntil(() => sendAndReceive(
+ client,
+ `persistent://public/default/failover-secondary-${testRunId}`,
+ 'message-after-failover',
+ ));
+ });
+});