diff --git a/index.d.ts b/index.d.ts index 8e6fb85..61ade63 100644 --- a/index.d.ts +++ b/index.d.ts @@ -18,8 +18,12 @@ */ /// -export interface ClientConfig { - serviceUrl: string; +export type ClientConfig = ClientConfigBase & ( + { serviceUrl: string; serviceUrlProvider?: never } | + { serviceUrl?: never; serviceUrlProvider: AutoClusterFailoverConfig } +); + +export interface ClientConfigBase { authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic; operationTimeoutSeconds?: number; ioThreads?: number; @@ -37,6 +41,20 @@ export interface ClientConfig { connectionTimeoutMs?: number; } +export type ServiceInfo = string | { + serviceUrl: string; + authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic; + tlsTrustCertsFilePath?: string; +}; + +export interface AutoClusterFailoverConfig { + primary: ServiceInfo; + secondary: ServiceInfo[]; + checkIntervalMs?: number; + failoverThreshold?: number; + switchBackThreshold?: number; +} + export class Client { static setLogHandler(logHandler: (level: LogLevel, file: string, line: number, message: string) => void): void; constructor(config: ClientConfig); diff --git a/src/Client.cc b/src/Client.cc index 6d09cd7..581b62c 100644 --- a/src/Client.cc +++ b/src/Client.cc @@ -23,12 +23,28 @@ #include "Producer.h" #include "Reader.h" #include "ThreadSafeDeferred.h" +#include +#include +#include +#include #include #include #include #include "pulsar/ClientConfiguration.h" +#include +#include +#include +#include +#include +#include static const std::string CFG_SERVICE_URL = "serviceUrl"; +static const std::string CFG_SERVICE_URL_PROVIDER = "serviceUrlProvider"; +static const std::string CFG_PRIMARY = "primary"; +static const std::string CFG_SECONDARY = "secondary"; +static const std::string CFG_CHECK_INTERVAL_MS = "checkIntervalMs"; +static const std::string CFG_FAILOVER_THRESHOLD = "failoverThreshold"; +static const std::string CFG_SWITCH_BACK_THRESHOLD = "switchBackThreshold"; static const std::string CFG_AUTH = "authentication"; static const std::string CFG_AUTH_PROP = "binding"; static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds"; @@ -52,6 +68,194 @@ struct _pulsar_client_configuration { pulsar::ClientConfiguration conf; }; +struct _pulsar_client { + std::unique_ptr client; +}; + +struct _pulsar_authentication { + pulsar::AuthenticationPtr auth; +}; + +static bool IsPresent(const Napi::Value &value) { return !value.IsUndefined() && !value.IsNull(); } + +static std::optional BuildAuthenticationPtr( + const Napi::Object &authObject, std::vector &authRefs) { + Napi::Env env = authObject.Env(); + + if (!authObject.Has(CFG_AUTH_PROP) || !authObject.Get(CFG_AUTH_PROP).IsObject()) { + Napi::Error::New(env, "Authentication must be a Pulsar authentication object") + .ThrowAsJavaScriptException(); + return std::nullopt; + } + + Napi::Object binding = authObject.Get(CFG_AUTH_PROP).As(); + authRefs.emplace_back(Napi::Persistent(binding)); + Authentication *auth = Authentication::Unwrap(authRefs.back().Value()); + + if (auth == nullptr || auth->GetCAuthentication() == nullptr) { + Napi::Error::New(env, "Authentication must be a Pulsar authentication object") + .ThrowAsJavaScriptException(); + return std::nullopt; + } + + return auth->GetCAuthentication()->auth; +} + +static std::optional BuildServiceInfo(const Napi::Value &value, + const std::string &fieldName, + std::vector &authRefs, + const pulsar::AuthenticationPtr &defaultAuth, + const std::optional &defaultTls) { + Napi::Env env = value.Env(); + + if (value.IsString()) { + std::string serviceUrl = value.ToString().Utf8Value(); + if (serviceUrl.empty()) { + Napi::Error::New(env, fieldName + " service URL must be a non-empty string") + .ThrowAsJavaScriptException(); + return std::nullopt; + } + return pulsar::ServiceInfo(serviceUrl, defaultAuth, defaultTls); + } + + if (!value.IsObject()) { + Napi::Error::New(env, fieldName + " must be a service URL string or service info object") + .ThrowAsJavaScriptException(); + return std::nullopt; + } + + Napi::Object serviceInfo = value.As(); + if (!serviceInfo.Has(CFG_SERVICE_URL) || !serviceInfo.Get(CFG_SERVICE_URL).IsString() || + serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) { + Napi::Error::New(env, fieldName + ".serviceUrl is required and must be a non-empty string") + .ThrowAsJavaScriptException(); + return std::nullopt; + } + + std::string serviceUrl = serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value(); + pulsar::AuthenticationPtr authentication = defaultAuth; + std::optional tlsTrustCertsFilePath = defaultTls; + + if (serviceInfo.Has(CFG_AUTH) && IsPresent(serviceInfo.Get(CFG_AUTH))) { + if (!serviceInfo.Get(CFG_AUTH).IsObject()) { + Napi::Error::New(env, fieldName + ".authentication must be a Pulsar authentication object") + .ThrowAsJavaScriptException(); + return std::nullopt; + } + + auto auth = BuildAuthenticationPtr(serviceInfo.Get(CFG_AUTH).As(), authRefs); + if (!auth.has_value()) { + return std::nullopt; + } + authentication = auth.value(); + } + + if (serviceInfo.Has(CFG_TLS_TRUST_CERT) && IsPresent(serviceInfo.Get(CFG_TLS_TRUST_CERT))) { + if (!serviceInfo.Get(CFG_TLS_TRUST_CERT).IsString()) { + Napi::Error::New(env, fieldName + ".tlsTrustCertsFilePath must be a string") + .ThrowAsJavaScriptException(); + return std::nullopt; + } + tlsTrustCertsFilePath = serviceInfo.Get(CFG_TLS_TRUST_CERT).ToString().Utf8Value(); + } + + return pulsar::ServiceInfo(serviceUrl, authentication, tlsTrustCertsFilePath); +} + +static bool SetPositiveUint32(const Napi::Object &config, const std::string &fieldName, uint32_t &target) { + if (!config.Has(fieldName) || !IsPresent(config.Get(fieldName))) { + return true; + } + + Napi::Env env = config.Env(); + if (!config.Get(fieldName).IsNumber()) { + Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number") + .ThrowAsJavaScriptException(); + return false; + } + + int64_t value = config.Get(fieldName).ToNumber().Int64Value(); + if (value <= 0 || value > UINT32_MAX) { + Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number") + .ThrowAsJavaScriptException(); + return false; + } + + target = static_cast(value); + return true; +} + +static std::unique_ptr BuildServiceInfoProvider( + const Napi::Object &clientConfig, std::vector &authRefs, + const pulsar::AuthenticationPtr &defaultAuth, const std::optional &defaultTls) { + Napi::Value providerValue = clientConfig.Get(CFG_SERVICE_URL_PROVIDER); + Napi::Env env = clientConfig.Env(); + + if (!providerValue.IsObject()) { + Napi::Error::New(env, "serviceUrlProvider must be an object").ThrowAsJavaScriptException(); + return nullptr; + } + + Napi::Object providerConfig = providerValue.As(); + if (!providerConfig.Has(CFG_PRIMARY) || !IsPresent(providerConfig.Get(CFG_PRIMARY))) { + Napi::Error::New(env, "serviceUrlProvider.primary is required").ThrowAsJavaScriptException(); + return nullptr; + } + + auto primary = BuildServiceInfo(providerConfig.Get(CFG_PRIMARY), "serviceUrlProvider.primary", authRefs, + defaultAuth, defaultTls); + if (!primary.has_value()) { + return nullptr; + } + + if (!providerConfig.Has(CFG_SECONDARY) || !providerConfig.Get(CFG_SECONDARY).IsArray()) { + Napi::Error::New(env, "serviceUrlProvider.secondary is required and must be an array") + .ThrowAsJavaScriptException(); + return nullptr; + } + + Napi::Array secondaryConfig = providerConfig.Get(CFG_SECONDARY).As(); + if (secondaryConfig.Length() == 0) { + Napi::Error::New(env, "serviceUrlProvider.secondary must contain at least one service") + .ThrowAsJavaScriptException(); + return nullptr; + } + + std::vector secondary; + secondary.reserve(secondaryConfig.Length()); + for (uint32_t i = 0; i < secondaryConfig.Length(); i++) { + auto serviceInfo = + BuildServiceInfo(secondaryConfig.Get(i), "serviceUrlProvider.secondary[" + std::to_string(i) + "]", + authRefs, defaultAuth, defaultTls); + if (!serviceInfo.has_value()) { + return nullptr; + } + secondary.emplace_back(std::move(serviceInfo.value())); + } + + pulsar::AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary.value()), + std::move(secondary)); + + uint32_t checkIntervalMs = static_cast(autoClusterFailoverConfig.checkInterval.count()); + if (!SetPositiveUint32(providerConfig, CFG_CHECK_INTERVAL_MS, checkIntervalMs)) { + return nullptr; + } + autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs); + + if (!SetPositiveUint32(providerConfig, CFG_FAILOVER_THRESHOLD, + autoClusterFailoverConfig.failoverThreshold)) { + return nullptr; + } + + if (!SetPositiveUint32(providerConfig, CFG_SWITCH_BACK_THRESHOLD, + autoClusterFailoverConfig.switchBackThreshold)) { + return nullptr; + } + + return std::unique_ptr( + new pulsar::AutoClusterFailover(std::move(autoClusterFailoverConfig))); +} + void Client::SetLogHandler(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); Napi::HandleScope scope(env); @@ -103,16 +307,34 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info) Napi::HandleScope scope(env); Napi::Object clientConfig = info[0].As(); - if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString() || - clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) { + bool hasServiceUrlProvider = + clientConfig.Has(CFG_SERVICE_URL_PROVIDER) && IsPresent(clientConfig.Get(CFG_SERVICE_URL_PROVIDER)); + bool hasServiceUrl = clientConfig.Has(CFG_SERVICE_URL) && IsPresent(clientConfig.Get(CFG_SERVICE_URL)); + if (hasServiceUrlProvider && hasServiceUrl) { + Napi::Error::New(env, "Only one of serviceUrl or serviceUrlProvider can be configured") + .ThrowAsJavaScriptException(); + return; + } + + if (!hasServiceUrlProvider && !hasServiceUrl) { + Napi::Error::New(env, + "Service URL is required and must be specified as a string unless serviceUrlProvider " + "is configured") + .ThrowAsJavaScriptException(); + return; + } + + if (hasServiceUrl && (!clientConfig.Get(CFG_SERVICE_URL).IsString() || + clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty())) { Napi::Error::New(env, "Service URL is required and must be specified as a string") .ThrowAsJavaScriptException(); return; } - Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString(); this->cClientConfig = std::shared_ptr(pulsar_client_configuration_create(), pulsar_client_configuration_free); + pulsar::AuthenticationPtr defaultAuthentication = pulsar::AuthFactory::Disabled(); + std::optional defaultTlsTrustCertsFilePath = std::nullopt; // The logger can only be set once per process, so we will take control of it if (clientConfig.Has(CFG_LOG_LEVEL) && clientConfig.Get(CFG_LOG_LEVEL).IsNumber()) { @@ -145,9 +367,12 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info) if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) { Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject(); if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) { - this->authRef_ = Napi::Persistent(obj.Get(CFG_AUTH_PROP).As()); - Authentication *auth = Authentication::Unwrap(this->authRef_.Value()); - pulsar_client_configuration_set_auth(cClientConfig.get(), auth->GetCAuthentication()); + auto auth = BuildAuthenticationPtr(obj, this->authRefs_); + if (!auth.has_value()) { + return; + } + cClientConfig.get()->conf.setAuth(auth.value()); + defaultAuthentication = auth.value(); } } @@ -190,6 +415,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info) Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString(); pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig.get(), tlsTrustCertsFilePath.Utf8Value().c_str()); + defaultTlsTrustCertsFilePath = tlsTrustCertsFilePath.Utf8Value(); } if (clientConfig.Has(CFG_TLS_CERT_FILE) && clientConfig.Get(CFG_TLS_CERT_FILE).IsString()) { @@ -227,8 +453,23 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap(info) } try { - this->cClient = std::shared_ptr( - pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free); + if (hasServiceUrlProvider) { + std::unique_ptr serviceInfoProvider = BuildServiceInfoProvider( + clientConfig, this->authRefs_, defaultAuthentication, defaultTlsTrustCertsFilePath); + if (serviceInfoProvider == nullptr) { + return; + } + + std::unique_ptr rawClient(new pulsar_client_t); + rawClient->client.reset(new pulsar::Client( + pulsar::Client::create(std::move(serviceInfoProvider), cClientConfig.get()->conf))); + this->cClient = std::shared_ptr(rawClient.release(), + [](pulsar_client_t *client) { delete client; }); + } else { + Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString(); + this->cClient = std::shared_ptr( + pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free); + } } catch (const std::exception &e) { Napi::Error::New(env, e.what()).ThrowAsJavaScriptException(); } diff --git a/src/Client.h b/src/Client.h index b761d36..6ea9a15 100644 --- a/src/Client.h +++ b/src/Client.h @@ -22,6 +22,7 @@ #include #include +#include struct LogMessage { pulsar_logger_level_t level; @@ -54,7 +55,7 @@ class Client : public Napi::ObjectWrap { std::shared_ptr cClient; std::shared_ptr cClientConfig; pulsar_logger_level_t logLevel = pulsar_logger_level_t::pulsar_INFO; - Napi::ObjectReference authRef_; + std::vector authRefs_; Napi::Value CreateProducer(const Napi::CallbackInfo &info); Napi::Value Subscribe(const Napi::CallbackInfo &info); diff --git a/tests/client.test.js b/tests/client.test.js index d97763e..f871b0d 100644 --- a/tests/client.test.js +++ b/tests/client.test.js @@ -26,9 +26,11 @@ const baseUrl = 'http://localhost:8080'; describe('Client', () => { describe('CreateFailedByUrlSetIncorrect', () => { test('No Set Url', async () => { + const expectedError = 'Service URL is required and must be specified as a string ' + + 'unless serviceUrlProvider is configured'; await expect(() => new Pulsar.Client({ operationTimeoutSeconds: 30, - })).toThrow('Service URL is required and must be specified as a string'); + })).toThrow(expectedError); }); test('Set empty url', async () => { @@ -51,6 +53,17 @@ const baseUrl = 'http://localhost:8080'; operationTimeoutSeconds: 30, })).toThrow('Service URL is required and must be specified as a string'); }); + + test('Set both service url and service url provider', async () => { + await expect(() => new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + serviceUrlProvider: { + primary: 'pulsar://localhost:6650', + secondary: ['pulsar://localhost:6651'], + }, + operationTimeoutSeconds: 30, + })).toThrow('Only one of serviceUrl or serviceUrlProvider can be configured'); + }); }); describe('test getPartitionsForTopic', () => { test('GetPartitions for empty topic', async () => { diff --git a/tests/failover_client_test.test.js b/tests/failover_client_test.test.js new file mode 100644 index 0000000..c834b5b --- /dev/null +++ b/tests/failover_client_test.test.js @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const childProcess = require('child_process'); +const fs = require('fs'); +const http = require('http'); +const os = require('os'); +const path = require('path'); +const { promisify } = require('util'); +const Pulsar = require('../index'); + +const execFile = promisify(childProcess.execFile); +const dockerImage = process.env.PULSAR_TEST_IMAGE || 'apachepulsar/pulsar:latest'; +const testRunId = `${Date.now()}-${process.pid}`; +const tempRoot = path.join(os.tmpdir(), `pulsar-node-failover-client-test-${testRunId}`); +const startedContainers = []; + +jest.setTimeout(180000); + +const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +const docker = async (args, options = {}) => { + const { stdout } = await execFile('docker', args, { + maxBuffer: 1024 * 1024, + ...options, + }); + return stdout.trim(); +}; + +const waitForHttpOk = async (url, timeoutMs = 60000) => { + const deadline = Date.now() + timeoutMs; + + while (Date.now() < deadline) { + try { + await new Promise((resolve, reject) => { + const request = http.get(url, (response) => { + response.resume(); + if (response.statusCode >= 200 && response.statusCode < 300) { + resolve(); + } else { + reject(new Error(`Unexpected status code ${response.statusCode}`)); + } + }); + request.on('error', reject); + request.setTimeout(2000, () => { + request.destroy(new Error('Timed out waiting for Pulsar HTTP endpoint')); + }); + }); + return; + } catch (e) { + await delay(1000); + } + } + + throw new Error(`Timed out waiting for ${url}`); +}; + +const writeStandaloneConfig = (clusterName, webPort, brokerPort) => { + const clusterDir = path.join(tempRoot, clusterName); + fs.mkdirSync(clusterDir, { recursive: true }); + fs.chmodSync(tempRoot, 0o755); + fs.chmodSync(clusterDir, 0o755); + + const sourceConfig = fs.readFileSync(path.join(__dirname, 'conf', 'standalone.conf'), 'utf8'); + const config = sourceConfig + .replace(/^brokerServicePort=.*$/m, `brokerServicePort=${brokerPort}`) + .replace(/^brokerServicePortTls=.*$/m, `brokerServicePortTls=${brokerPort + 100}`) + .replace(/^webServicePort=.*$/m, `webServicePort=${webPort}`) + .replace(/^webServicePortTls=.*$/m, `webServicePortTls=${webPort + 100}`) + .replace(/^advertisedListeners=.*$/m, 'advertisedListeners='); + + const configPath = path.join(clusterDir, 'standalone.conf'); + fs.writeFileSync(configPath, config); + fs.chmodSync(configPath, 0o644); + + ['server.crt', 'server.key'].forEach((fileName) => { + const targetPath = path.join(clusterDir, fileName); + fs.copyFileSync(path.join(__dirname, 'certificate', fileName), targetPath); + fs.chmodSync(targetPath, 0o644); + }); + + return clusterDir; +}; + +const getStandaloneLogs = async (containerId) => { + const { stdout, stderr } = await execFile('docker', [ + 'exec', + containerId, + 'bash', + '-lc', + 'cat logs/pulsar-standalone-*.out logs/pulsar-standalone-*.log 2>/dev/null || true', + ], { maxBuffer: 1024 * 1024 }).catch((e) => e); + + return `${stdout || ''}${stderr || ''}`.trim(); +}; + +const startStandaloneCluster = async ({ clusterName, webPort, brokerPort }) => { + const confDir = writeStandaloneConfig(clusterName, webPort, brokerPort); + const containerId = await docker([ + 'run', + '-i', + '-p', + `${webPort}:${webPort}`, + '-p', + `${brokerPort}:${brokerPort}`, + '--rm', + '--detach', + dockerImage, + 'sleep', + '3600', + ]); + startedContainers.push(containerId); + + await docker(['cp', confDir, `${containerId}:/pulsar/test-conf`]); + await docker([ + 'exec', + '-i', + containerId, + 'env', + 'PULSAR_STANDALONE_CONF=test-conf/standalone.conf', + 'PULSAR_STANDALONE_USE_ZOOKEEPER=1', + 'bin/pulsar-daemon', + 'start', + 'standalone', + '--no-functions-worker', + '--no-stream-storage', + '--bookkeeper-dir', + `data/bookkeeper-${clusterName}`, + ]); + try { + await waitForHttpOk(`http://localhost:${webPort}/admin/v2/clusters`); + } catch (e) { + const logs = await getStandaloneLogs(containerId); + throw new Error(`${e.message}\nStandalone logs for ${clusterName}:\n${logs}`); + } + + return { containerId, serviceUrl: `pulsar://localhost:${brokerPort}` }; +}; + +const stopContainer = async (containerId) => { + await docker(['kill', containerId]).catch(() => {}); +}; + +const sendAndReceive = async (client, topic, payload) => { + let consumer; + let producer; + + try { + producer = await client.createProducer({ topic }); + await producer.send({ data: Buffer.from(payload) }); + + consumer = await client.subscribe({ + topic, + subscription: `sub-${testRunId}-${Math.random().toString(36).slice(2)}`, + subscriptionInitialPosition: 'Earliest', + }); + + const message = await consumer.receive(10000); + expect(message.getData().toString()).toBe(payload); + await consumer.acknowledge(message); + } finally { + if (producer) { + await producer.close().catch(() => {}); + } + if (consumer) { + await consumer.close().catch(() => {}); + } + } +}; + +const retryUntil = async (operation, timeoutMs = 60000) => { + const deadline = Date.now() + timeoutMs; + let lastError; + + while (Date.now() < deadline) { + try { + return await operation(); + } catch (e) { + lastError = e; + await delay(1000); + } + } + + throw lastError; +}; + +describe('failoverClientTest', () => { + let primaryCluster; + let secondaryCluster; + let client; + + beforeAll(async () => { + primaryCluster = await startStandaloneCluster({ + clusterName: 'primary', + webPort: 18080, + brokerPort: 16650, + }); + secondaryCluster = await startStandaloneCluster({ + clusterName: 'secondary', + webPort: 18081, + brokerPort: 16651, + }); + + client = new Pulsar.Client({ + serviceUrlProvider: { + primary: primaryCluster.serviceUrl, + secondary: [secondaryCluster.serviceUrl], + checkIntervalMs: 1000, + failoverThreshold: 1, + switchBackThreshold: 1, + }, + operationTimeoutSeconds: 30, + connectionTimeoutMs: 1000, + }); + }); + + afterAll(async () => { + if (client) { + await client.close().catch(() => {}); + } + + await Promise.all(startedContainers.map(stopContainer)); + fs.rmSync(tempRoot, { force: true, recursive: true }); + }); + + test('continues producing and consuming after the primary cluster stops', async () => { + expect(client).toBeDefined(); + + await retryUntil(() => sendAndReceive( + client, + `persistent://public/default/failover-primary-${testRunId}`, + 'message-before-failover', + )); + + await stopContainer(primaryCluster.containerId); + + await retryUntil(() => sendAndReceive( + client, + `persistent://public/default/failover-secondary-${testRunId}`, + 'message-after-failover', + )); + }); +});