From b649150e39bcc1894bb457b6e65c0c151b795700 Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Sat, 5 Jul 2025 11:31:56 +0100 Subject: [PATCH 1/5] Add support for Confluenct Kafka 8.x --- docs/modules/kafka.md | 24 +- packages/modules/kafka/Dockerfile | 2 +- .../kafka/src/kafka-container-7.test.ts | 248 ++++++++++++++++++ .../kafka/src/kafka-container-latest.test.ts | 148 +++++++++++ packages/modules/kafka/src/kafka-container.ts | 9 +- packages/modules/kafka/src/test-helper.ts | 34 +++ packages/modules/kafka/tsconfig.build.json | 3 +- 7 files changed, 460 insertions(+), 8 deletions(-) create mode 100644 packages/modules/kafka/src/kafka-container-7.test.ts create mode 100644 packages/modules/kafka/src/kafka-container-latest.test.ts create mode 100644 packages/modules/kafka/src/test-helper.ts diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index c8cd48a57..8ee2088ff 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -8,20 +8,34 @@ npm install @testcontainers/kafka --save-dev ``` -## Examples +## Kafka 8.x + +### Examples + + +[Connect to Kafka:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:connect + + + +[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:ssl + + +## Kafka 7.x + +### Examples -[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectBuiltInZK +[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectBuiltInZK -[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectProvidedZK +[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectProvidedZK -[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:ssl +[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:ssl -[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectKraft +[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectKraft diff --git a/packages/modules/kafka/Dockerfile b/packages/modules/kafka/Dockerfile index 8afe540be..f80590ea7 100644 --- a/packages/modules/kafka/Dockerfile +++ b/packages/modules/kafka/Dockerfile @@ -1 +1 @@ -FROM confluentinc/cp-kafka:7.9.1 +FROM confluentinc/cp-kafka:8.0.0 diff --git a/packages/modules/kafka/src/kafka-container-7.test.ts b/packages/modules/kafka/src/kafka-container-7.test.ts new file mode 100644 index 000000000..92cd7698c --- /dev/null +++ b/packages/modules/kafka/src/kafka-container-7.test.ts @@ -0,0 +1,248 @@ +import fs from "fs"; +import path from "path"; +import { GenericContainer, Network } from "testcontainers"; +import { KafkaContainer } from "./kafka-container"; +import { testPubSub } from "./test-helper"; + +const IMAGE = "confluentinc/cp-kafka:7.9.1"; + +describe("KafkaContainer", { timeout: 240_000 }, () => { + // connectBuiltInZK { + it("should connect using in-built zoo-keeper", async () => { + const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + }); + // } + + it("should connect using in-built zoo-keeper and custom images", async () => { + const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + }); + + it("should connect using in-built zoo-keeper and custom network", async () => { + const network = await new Network().start(); + + const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + await network.stop(); + }); + + // connectProvidedZK { + it("should connect using provided zoo-keeper and network", async () => { + const network = await new Network().start(); + + const zooKeeperHost = "zookeeper"; + const zooKeeperPort = 2181; + const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4") + .withNetwork(network) + .withNetworkAliases(zooKeeperHost) + .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) + .withExposedPorts(zooKeeperPort) + .start(); + + const kafkaContainer = await new KafkaContainer(IMAGE) + .withNetwork(network) + .withZooKeeper(zooKeeperHost, zooKeeperPort) + .withExposedPorts(9093) + .start(); + + await testPubSub(kafkaContainer); + + await zookeeperContainer.stop(); + await kafkaContainer.stop(); + await network.stop(); + }); + // } + + it("should be reusable", async () => { + const originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); + const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); + + expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId()); + + await originalKafkaContainer.stop(); + }); + + describe.each([ + { + name: "and zookpeer enabled", + configure: () => ({}), + }, + { + name: "and kraft enabled", + configure: (kafkaContainer: KafkaContainer) => kafkaContainer.withKraft(), + }, + ])("when SASL SSL config listener provided $name", ({ configure }) => { + const certificatesDir = path.resolve(__dirname, "..", "test-certs"); + + // ssl { + it(`should connect locally`, async () => { + const kafkaContainer = await new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({ + port: 9096, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", + }, + }, + keystore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), + passphrase: "serverKeystorePassword", + }, + truststore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), + passphrase: "serverTruststorePassword", + }, + }); + configure(kafkaContainer); + const startedKafkaContainer = await kafkaContainer.start(); + + await testPubSub(startedKafkaContainer, { + brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`], + sasl: { + username: "app-user", + password: "userPassword", + mechanism: "scram-sha-512", + }, + ssl: { + ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))], + }, + }); + await startedKafkaContainer.stop(); + }); + // } + + it(`should connect within Docker network`, async () => { + const network = await new Network().start(); + + const kafkaContainer = await new KafkaContainer(IMAGE) + .withNetwork(network) + .withNetworkAliases("kafka") + .withSaslSslListener({ + port: 9094, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", + }, + }, + keystore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), + passphrase: "serverKeystorePassword", + }, + truststore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), + passphrase: "serverTruststorePassword", + }, + }) + .start(); + + const kafkaCliContainer = await new GenericContainer(IMAGE) + .withNetwork(network) + .withCommand(["bash", "-c", "sleep infinity"]) + .withCopyFilesToContainer([ + { + source: path.resolve(certificatesDir, "kafka.client.truststore.pem"), + target: "/truststore.pem", + }, + ]) + .withCopyContentToContainer([ + { + content: ` + security.protocol=SASL_SSL + ssl.truststore.location=/truststore.pem + ssl.truststore.type=PEM + ssl.endpoint.identification.algorithm= + sasl.mechanism=SCRAM-SHA-512 + sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\ + username="app-user" \\ + password="userPassword"; + `, + target: "/etc/kafka/consumer.properties", + }, + ]) + .start(); + + await kafkaCliContainer.exec( + "kafka-topics --create --topic test-topic --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties" + ); + const { output, exitCode } = await kafkaCliContainer.exec( + "kafka-topics --list --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties" + ); + + expect(exitCode).toBe(0); + expect(output).toContain("test-topic"); + + await kafkaCliContainer.stop(); + await kafkaContainer.stop(); + }); + }); + + // connectKraft { + it("should connect using kraft", async () => { + const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withExposedPorts(9093).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + }); + // } + + it("should throw an error when using kraft and and confluence platfom below 7.0.0", async () => { + expect(() => new KafkaContainer("confluentinc/cp-kafka:6.2.14").withKraft()).toThrow( + "Provided Confluent Platform's version 6.2.14 is not supported in Kraft mode (must be 7.0.0 or above)" + ); + }); + + it("should connect using kraft and custom network", async () => { + const network = await new Network().start(); + const kafkaContainer = await new KafkaContainer(IMAGE) + .withKraft() + .withNetwork(network) + .withExposedPorts(9093) + .start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + await network.stop(); + }); + + it("should throw an error when using kraft wit sasl and confluence platfom below 7.5.0", async () => { + const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0") + .withKraft() + .withExposedPorts(9093) + .withSaslSslListener({ + port: 9094, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", + }, + }, + keystore: { + content: "fake", + passphrase: "serverKeystorePassword", + }, + truststore: { + content: "fake", + passphrase: "serverTruststorePassword", + }, + }); + await expect(() => kafkaContainer.start()).rejects.toThrow( + "Provided Confluent Platform's version 7.4.0 is not supported in Kraft mode with sasl (must be 7.5.0 or above)" + ); + }); +}); diff --git a/packages/modules/kafka/src/kafka-container-latest.test.ts b/packages/modules/kafka/src/kafka-container-latest.test.ts new file mode 100644 index 000000000..d88c6c394 --- /dev/null +++ b/packages/modules/kafka/src/kafka-container-latest.test.ts @@ -0,0 +1,148 @@ +import fs from "fs"; +import path from "path"; +import { GenericContainer, Network } from "testcontainers"; +import { getImage } from "../../../testcontainers/src/utils/test-helper"; +import { KafkaContainer, SaslSslListenerOptions } from "./kafka-container"; +import { testPubSub } from "./test-helper"; + +const IMAGE = getImage(__dirname); + +describe("KafkaContainer", { timeout: 240_000 }, () => { + const certificatesDir = path.resolve(__dirname, "..", "test-certs"); + + // connect { + it("should connect", async () => { + const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + }); + // } + + it("should connect with custom network", async () => { + const network = await new Network().start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + await network.stop(); + }); + + it("should be reusable", async () => { + const originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); + const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); + + expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId()); + + await originalKafkaContainer.stop(); + }); + + // ssl { + it(`should connect with SASL`, async () => { + const saslConfig: SaslSslListenerOptions = { + port: 9096, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", + }, + }, + keystore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), + passphrase: "serverKeystorePassword", + }, + truststore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), + passphrase: "serverTruststorePassword", + }, + }; + + const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener(saslConfig); + const startedKafkaContainer = await kafkaContainer.start(); + + await testPubSub(startedKafkaContainer, { + brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`], + sasl: { + username: "app-user", + password: "userPassword", + mechanism: "scram-sha-512", + }, + ssl: { + ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))], + }, + }); + await startedKafkaContainer.stop(); + }); + // } + + it(`should connect with SASL in custom network`, async () => { + const network = await new Network().start(); + + const saslConfig: SaslSslListenerOptions = { + port: 9096, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", + }, + }, + keystore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), + passphrase: "serverKeystorePassword", + }, + truststore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), + passphrase: "serverTruststorePassword", + }, + }; + + const kafkaContainer = await new KafkaContainer(IMAGE) + .withNetwork(network) + .withNetworkAliases("kafka") + .withSaslSslListener(saslConfig) + .start(); + + const kafkaCliContainer = await new GenericContainer(IMAGE) + .withNetwork(network) + .withCommand(["bash", "-c", "sleep infinity"]) + .withCopyFilesToContainer([ + { + source: path.resolve(certificatesDir, "kafka.client.truststore.pem"), + target: "/truststore.pem", + }, + ]) + .withCopyContentToContainer([ + { + content: ` + security.protocol=SASL_SSL + ssl.truststore.location=/truststore.pem + ssl.truststore.type=PEM + ssl.endpoint.identification.algorithm= + sasl.mechanism=SCRAM-SHA-512 + sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\ + username="app-user" \\ + password="userPassword"; + `, + target: "/etc/kafka/consumer.properties", + }, + ]) + .start(); + + await kafkaCliContainer.exec( + "kafka-topics --create --topic test-topic --bootstrap-server kafka:9096 --command-config /etc/kafka/consumer.properties" + ); + const { output, exitCode } = await kafkaCliContainer.exec( + "kafka-topics --list --bootstrap-server kafka:9096 --command-config /etc/kafka/consumer.properties" + ); + + expect(exitCode).toBe(0); + expect(output).toContain("test-topic"); + + await kafkaCliContainer.stop(); + await kafkaContainer.stop(); + }); +}); diff --git a/packages/modules/kafka/src/kafka-container.ts b/packages/modules/kafka/src/kafka-container.ts index 8b711e70e..573a00373 100644 --- a/packages/modules/kafka/src/kafka-container.ts +++ b/packages/modules/kafka/src/kafka-container.ts @@ -4,6 +4,7 @@ import { Content, GenericContainer, getContainerRuntimeClient, + ImageName, InspectResult, RandomUuid, StartedTestContainer, @@ -25,7 +26,7 @@ const WAIT_FOR_SCRIPT_MESSAGE = "Waiting for script..."; const MIN_KRAFT_VERSION = "7.0.0"; const MIN_KRAFT_SASL_VERSION = "7.5.0"; -interface SaslSslListenerOptions { +export interface SaslSslListenerOptions { sasl: SaslOptions; port: number; keystore: PKCS12CertificateStore; @@ -64,6 +65,12 @@ export class KafkaContainer extends GenericContainer { constructor(image: string) { super(image); + + const parsedImage = ImageName.fromString(image); + if (parsedImage.image === "confluentinc/cp-kafka" && parsedImage.tag.startsWith("8.")) { + this.withKraft(); + } + this.withExposedPorts(KAFKA_PORT).withStartupTimeout(180_000).withEnvironment({ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT", KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER", diff --git a/packages/modules/kafka/src/test-helper.ts b/packages/modules/kafka/src/test-helper.ts new file mode 100644 index 000000000..f3232d29e --- /dev/null +++ b/packages/modules/kafka/src/test-helper.ts @@ -0,0 +1,34 @@ +import { Kafka, KafkaConfig, logLevel } from "kafkajs"; +import { StartedTestContainer } from "testcontainers"; + +export async function testPubSub(kafkaContainer: StartedTestContainer, additionalConfig: Partial = {}) { + const kafka = new Kafka({ + logLevel: logLevel.NOTHING, + brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`], + ...additionalConfig, + }); + + const producer = kafka.producer(); + await producer.connect(); + + const consumer = kafka.consumer({ groupId: "test-group" }); + await consumer.connect(); + + await producer.send({ + topic: "test-topic", + messages: [{ value: "test message" }], + }); + + await consumer.subscribe({ topic: "test-topic", fromBeginning: true }); + + const consumedMessage = await new Promise((resolve) => { + consumer.run({ + eachMessage: async ({ message }) => resolve(message.value?.toString()), + }); + }); + + expect(consumedMessage).toBe("test message"); + + await consumer.disconnect(); + await producer.disconnect(); +} diff --git a/packages/modules/kafka/tsconfig.build.json b/packages/modules/kafka/tsconfig.build.json index ff7390b10..ee782d3d0 100644 --- a/packages/modules/kafka/tsconfig.build.json +++ b/packages/modules/kafka/tsconfig.build.json @@ -2,7 +2,8 @@ "extends": "./tsconfig.json", "exclude": [ "build", - "src/**/*.test.ts" + "src/**/*.test.ts", + "src/test-helper.ts" ], "references": [ { From 8a16e45e8511e1ac5a73428036ae091802c40034 Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Sat, 5 Jul 2025 11:40:23 +0100 Subject: [PATCH 2/5] Delete old test --- .../modules/kafka/src/kafka-container.test.ts | 281 ------------------ 1 file changed, 281 deletions(-) delete mode 100644 packages/modules/kafka/src/kafka-container.test.ts diff --git a/packages/modules/kafka/src/kafka-container.test.ts b/packages/modules/kafka/src/kafka-container.test.ts deleted file mode 100644 index 658b6c1d4..000000000 --- a/packages/modules/kafka/src/kafka-container.test.ts +++ /dev/null @@ -1,281 +0,0 @@ -import fs from "fs"; -import { Kafka, KafkaConfig, logLevel } from "kafkajs"; -import path from "path"; -import { GenericContainer, Network, StartedTestContainer } from "testcontainers"; -import { getImage } from "../../../testcontainers/src/utils/test-helper"; -import { KafkaContainer } from "./kafka-container"; - -const IMAGE = getImage(__dirname); - -describe("KafkaContainer", { timeout: 240_000 }, () => { - // connectBuiltInZK { - it("should connect using in-built zoo-keeper", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); - - await testPubSub(kafkaContainer); - - await kafkaContainer.stop(); - }); - // } - - it("should connect using in-built zoo-keeper and custom images", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); - - await testPubSub(kafkaContainer); - - await kafkaContainer.stop(); - }); - - it("should connect using in-built zoo-keeper and custom network", async () => { - const network = await new Network().start(); - - const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start(); - - await testPubSub(kafkaContainer); - - await kafkaContainer.stop(); - await network.stop(); - }); - - // connectProvidedZK { - it("should connect using provided zoo-keeper and network", async () => { - const network = await new Network().start(); - - const zooKeeperHost = "zookeeper"; - const zooKeeperPort = 2181; - const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4") - .withNetwork(network) - .withNetworkAliases(zooKeeperHost) - .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() }) - .withExposedPorts(zooKeeperPort) - .start(); - - const kafkaContainer = await new KafkaContainer(IMAGE) - .withNetwork(network) - .withZooKeeper(zooKeeperHost, zooKeeperPort) - .withExposedPorts(9093) - .start(); - - await testPubSub(kafkaContainer); - - await zookeeperContainer.stop(); - await kafkaContainer.stop(); - await network.stop(); - }); - // } - - it("should be reusable", async () => { - const originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); - const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); - - expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId()); - - await originalKafkaContainer.stop(); - }); - - describe.each([ - { - name: "and zookpeer enabled", - configure: () => ({}), - }, - { - name: "and kraft enabled", - configure: (kafkaContainer: KafkaContainer) => kafkaContainer.withKraft(), - }, - ])("when SASL SSL config listener provided $name", ({ configure }) => { - const certificatesDir = path.resolve(__dirname, "..", "test-certs"); - - // ssl { - it(`should connect locally`, async () => { - const kafkaContainer = await new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({ - port: 9096, - sasl: { - mechanism: "SCRAM-SHA-512", - user: { - name: "app-user", - password: "userPassword", - }, - }, - keystore: { - content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), - passphrase: "serverKeystorePassword", - }, - truststore: { - content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), - passphrase: "serverTruststorePassword", - }, - }); - configure(kafkaContainer); - const startedKafkaContainer = await kafkaContainer.start(); - - await testPubSub(startedKafkaContainer, { - brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`], - sasl: { - username: "app-user", - password: "userPassword", - mechanism: "scram-sha-512", - }, - ssl: { - ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))], - }, - }); - await startedKafkaContainer.stop(); - }); - // } - - it(`should connect within Docker network`, async () => { - const network = await new Network().start(); - - const kafkaContainer = await new KafkaContainer(IMAGE) - .withNetwork(network) - .withNetworkAliases("kafka") - .withSaslSslListener({ - port: 9094, - sasl: { - mechanism: "SCRAM-SHA-512", - user: { - name: "app-user", - password: "userPassword", - }, - }, - keystore: { - content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), - passphrase: "serverKeystorePassword", - }, - truststore: { - content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), - passphrase: "serverTruststorePassword", - }, - }) - .start(); - - const kafkaCliContainer = await new GenericContainer(IMAGE) - .withNetwork(network) - .withCommand(["bash", "-c", "sleep infinity"]) - .withCopyFilesToContainer([ - { - source: path.resolve(certificatesDir, "kafka.client.truststore.pem"), - target: "/truststore.pem", - }, - ]) - .withCopyContentToContainer([ - { - content: ` - security.protocol=SASL_SSL - ssl.truststore.location=/truststore.pem - ssl.truststore.type=PEM - ssl.endpoint.identification.algorithm= - sasl.mechanism=SCRAM-SHA-512 - sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\ - username="app-user" \\ - password="userPassword"; - `, - target: "/etc/kafka/consumer.properties", - }, - ]) - .start(); - - await kafkaCliContainer.exec( - "kafka-topics --create --topic test-topic --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties" - ); - const { output, exitCode } = await kafkaCliContainer.exec( - "kafka-topics --list --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties" - ); - - expect(exitCode).toBe(0); - expect(output).toContain("test-topic"); - - await kafkaCliContainer.stop(); - await kafkaContainer.stop(); - }); - }); - - // connectKraft { - it("should connect using kraft", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withExposedPorts(9093).start(); - - await testPubSub(kafkaContainer); - - await kafkaContainer.stop(); - }); - // } - - it("should throw an error when using kraft and and confluence platfom below 7.0.0", async () => { - expect(() => new KafkaContainer("confluentinc/cp-kafka:6.2.14").withKraft()).toThrow( - "Provided Confluent Platform's version 6.2.14 is not supported in Kraft mode (must be 7.0.0 or above)" - ); - }); - - it("should connect using kraft and custom network", async () => { - const network = await new Network().start(); - const kafkaContainer = await new KafkaContainer(IMAGE) - .withKraft() - .withNetwork(network) - .withExposedPorts(9093) - .start(); - - await testPubSub(kafkaContainer); - - await kafkaContainer.stop(); - await network.stop(); - }); - - it("should throw an error when using kraft wit sasl and confluence platfom below 7.5.0", async () => { - const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0") - .withKraft() - .withExposedPorts(9093) - .withSaslSslListener({ - port: 9094, - sasl: { - mechanism: "SCRAM-SHA-512", - user: { - name: "app-user", - password: "userPassword", - }, - }, - keystore: { - content: "fake", - passphrase: "serverKeystorePassword", - }, - truststore: { - content: "fake", - passphrase: "serverTruststorePassword", - }, - }); - await expect(() => kafkaContainer.start()).rejects.toThrow( - "Provided Confluent Platform's version 7.4.0 is not supported in Kraft mode with sasl (must be 7.5.0 or above)" - ); - }); - - const testPubSub = async (kafkaContainer: StartedTestContainer, additionalConfig: Partial = {}) => { - const kafka = new Kafka({ - logLevel: logLevel.NOTHING, - brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`], - ...additionalConfig, - }); - - const producer = kafka.producer(); - await producer.connect(); - - const consumer = kafka.consumer({ groupId: "test-group" }); - await consumer.connect(); - - await producer.send({ - topic: "test-topic", - messages: [{ value: "test message" }], - }); - - await consumer.subscribe({ topic: "test-topic", fromBeginning: true }); - - const consumedMessage = await new Promise((resolve) => { - consumer.run({ - eachMessage: async ({ message }) => resolve(message.value?.toString()), - }); - }); - - expect(consumedMessage).toBe("test message"); - - await consumer.disconnect(); - await producer.disconnect(); - }; -}); From 9ae820f470346ce1ef9c44a0b0b15e9f5448fb29 Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Sat, 5 Jul 2025 16:36:57 +0100 Subject: [PATCH 3/5] Refactor --- package-lock.json | 94 +++++++++++-------- packages/modules/kafka/package.json | 1 + packages/modules/kafka/src/kafka-container.ts | 30 ++---- 3 files changed, 66 insertions(+), 59 deletions(-) diff --git a/package-lock.json b/package-lock.json index be58cebb2..d79d8b762 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5308,10 +5308,11 @@ } }, "node_modules/@npmcli/arborist/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -5419,10 +5420,11 @@ } }, "node_modules/@npmcli/map-workspaces/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -5528,10 +5530,11 @@ } }, "node_modules/@npmcli/package-json/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -7277,10 +7280,11 @@ } }, "node_modules/@tufjs/models/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -8003,9 +8007,9 @@ } }, "node_modules/@typescript-eslint/typescript-estree/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -8603,9 +8607,10 @@ } }, "node_modules/archiver-utils/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -9211,10 +9216,11 @@ "license": "MIT" }, "node_modules/brace-expansion": { - "version": "1.1.11", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", - "integrity": "sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==", + "version": "1.1.12", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.12.tgz", + "integrity": "sha512-9T9UjW3r0UW5c1Q7GTwllptXwhvYmEzFhzMfZ9H7FQWt+uZePjZPjBP/W1ZEyZ1twGWom5/56TF4lPcqjnDHcg==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" @@ -9389,9 +9395,9 @@ } }, "node_modules/cacache/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -10212,6 +10218,12 @@ "integrity": "sha512-L3sHRo1pXXEqX8VU28kfgUY+YGsk09hPqZiZmLacNib6XNTCM8ubYeT7ryXQw8asB1sKgcU5lkB7ONug08aB8w==", "dev": true }, + "node_modules/compare-versions": { + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/compare-versions/-/compare-versions-6.1.1.tgz", + "integrity": "sha512-4hm4VPpIecmlg59CHXnRDnqGplJFrbLG4aFEl5vl6cK1u76ws3LLvX7ikFnTDl5vo39sjWD6AaDPYodJp/NNHg==", + "license": "MIT" + }, "node_modules/component-emitter": { "version": "1.3.1", "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.1.tgz", @@ -12968,10 +12980,11 @@ } }, "node_modules/ignore-walk/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -14794,10 +14807,11 @@ } }, "node_modules/make-fetch-happen/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -17662,9 +17676,9 @@ } }, "node_modules/read-package-json/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -17730,9 +17744,10 @@ } }, "node_modules/readdir-glob/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -19640,9 +19655,9 @@ } }, "node_modules/test-exclude/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -21571,6 +21586,7 @@ "version": "11.1.0", "license": "MIT", "dependencies": { + "compare-versions": "^6.1.1", "testcontainers": "^11.1.0" }, "devDependencies": { diff --git a/packages/modules/kafka/package.json b/packages/modules/kafka/package.json index 9b2b39823..88647bbc4 100644 --- a/packages/modules/kafka/package.json +++ b/packages/modules/kafka/package.json @@ -32,6 +32,7 @@ "kafkajs": "^2.2.4" }, "dependencies": { + "compare-versions": "^6.1.1", "testcontainers": "^11.1.0" } } diff --git a/packages/modules/kafka/src/kafka-container.ts b/packages/modules/kafka/src/kafka-container.ts index 573a00373..0cbfa98b8 100644 --- a/packages/modules/kafka/src/kafka-container.ts +++ b/packages/modules/kafka/src/kafka-container.ts @@ -1,10 +1,10 @@ +import { satisfies } from "compare-versions"; import { AbstractStartedContainer, BoundPorts, Content, GenericContainer, getContainerRuntimeClient, - ImageName, InspectResult, RandomUuid, StartedTestContainer, @@ -66,8 +66,7 @@ export class KafkaContainer extends GenericContainer { constructor(image: string) { super(image); - const parsedImage = ImageName.fromString(image); - if (parsedImage.image === "confluentinc/cp-kafka" && parsedImage.tag.startsWith("8.")) { + if (satisfies(this.imageName.tag, ">=8.0.0")) { this.withKraft(); } @@ -149,7 +148,11 @@ export class KafkaContainer extends GenericContainer { } public override async start(): Promise { - if (this.mode === KafkaMode.KRAFT && this.saslSslConfig && this.isLessThanCP(7, 5)) { + if ( + this.mode === KafkaMode.KRAFT && + this.saslSslConfig && + satisfies(this.imageName.tag, `<${MIN_KRAFT_SASL_VERSION}`) + ) { throw new Error( `Provided Confluent Platform's version ${this.imageName.tag} is not supported in Kraft mode with sasl (must be ${MIN_KRAFT_SASL_VERSION} or above)` ); @@ -166,7 +169,7 @@ export class KafkaContainer extends GenericContainer { // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname command += `export KAFKA_ADVERTISED_LISTENERS=${advertisedListeners}\n`; - if (this.mode !== KafkaMode.KRAFT || this.isLessThanCP(7, 4)) { + if (this.mode !== KafkaMode.KRAFT || satisfies(this.imageName.tag, "<7.4.0")) { // Optimization: skip the checks command += "echo '' > /etc/confluent/docker/ensure \n"; } @@ -174,7 +177,7 @@ export class KafkaContainer extends GenericContainer { if (this.saslSslConfig) { command += this.commandKraftCreateUser(this.saslSslConfig); } - if (this.isLessThanCP(7, 4)) { + if (satisfies(this.imageName.tag, "<7.4.0")) { command += this.commandKraft(); } } else if (this.mode === KafkaMode.EMBEDDED_ZOOKEEPER) { @@ -268,26 +271,13 @@ export class KafkaContainer extends GenericContainer { } private verifyMinKraftVersion() { - if (this.isLessThanCP(7)) { + if (satisfies(this.imageName.tag, `<${MIN_KRAFT_VERSION}`)) { throw new Error( `Provided Confluent Platform's version ${this.imageName.tag} is not supported in Kraft mode (must be ${MIN_KRAFT_VERSION} or above)` ); } } - private isLessThanCP(max: number, min = 0, patch = 0): boolean { - if (this.imageName.tag === "latest") { - return false; - } - const parts = this.imageName.tag.split("."); - return !( - parts.length > 2 && - (Number(parts[0]) > max || - (Number(parts[0]) === max && - (Number(parts[1]) > min || (Number(parts[1]) === min && Number(parts[2]) >= patch)))) - ); - } - private commandKraftCreateUser(saslOptions: SaslSslListenerOptions): string { return ( "echo 'kafka-storage format --ignore-formatted " + From 1005718f60bf4eb0135c7ed3ac1d8ddfa864d637 Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Sat, 5 Jul 2025 16:40:40 +0100 Subject: [PATCH 4/5] Update doc --- docs/modules/kafka.md | 2 +- packages/modules/kafka/src/kafka-container-latest.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index 8ee2088ff..bdadf1f1b 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -13,7 +13,7 @@ npm install @testcontainers/kafka --save-dev ### Examples -[Connect to Kafka:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:connect +[Connect to Kafka:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:connectKafkaLatest diff --git a/packages/modules/kafka/src/kafka-container-latest.test.ts b/packages/modules/kafka/src/kafka-container-latest.test.ts index d88c6c394..fefc3e186 100644 --- a/packages/modules/kafka/src/kafka-container-latest.test.ts +++ b/packages/modules/kafka/src/kafka-container-latest.test.ts @@ -10,7 +10,7 @@ const IMAGE = getImage(__dirname); describe("KafkaContainer", { timeout: 240_000 }, () => { const certificatesDir = path.resolve(__dirname, "..", "test-certs"); - // connect { + // connectKafkaLatest { it("should connect", async () => { const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); From ea0851c2b3479066f5f3e229af46ab4bbba56f6f Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Sat, 5 Jul 2025 16:45:40 +0100 Subject: [PATCH 5/5] Remove redundant `withExposedPorts` --- .../kafka/src/kafka-container-7.test.ts | 52 ++++++++----------- .../kafka/src/kafka-container-latest.test.ts | 4 +- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/packages/modules/kafka/src/kafka-container-7.test.ts b/packages/modules/kafka/src/kafka-container-7.test.ts index 92cd7698c..e4df2f51a 100644 --- a/packages/modules/kafka/src/kafka-container-7.test.ts +++ b/packages/modules/kafka/src/kafka-container-7.test.ts @@ -9,7 +9,7 @@ const IMAGE = "confluentinc/cp-kafka:7.9.1"; describe("KafkaContainer", { timeout: 240_000 }, () => { // connectBuiltInZK { it("should connect using in-built zoo-keeper", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).start(); await testPubSub(kafkaContainer); @@ -18,7 +18,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { // } it("should connect using in-built zoo-keeper and custom images", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).start(); await testPubSub(kafkaContainer); @@ -28,7 +28,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { it("should connect using in-built zoo-keeper and custom network", async () => { const network = await new Network().start(); - const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).start(); await testPubSub(kafkaContainer); @@ -52,7 +52,6 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { const kafkaContainer = await new KafkaContainer(IMAGE) .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) - .withExposedPorts(9093) .start(); await testPubSub(kafkaContainer); @@ -191,7 +190,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { // connectKraft { it("should connect using kraft", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().start(); await testPubSub(kafkaContainer); @@ -207,11 +206,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { it("should connect using kraft and custom network", async () => { const network = await new Network().start(); - const kafkaContainer = await new KafkaContainer(IMAGE) - .withKraft() - .withNetwork(network) - .withExposedPorts(9093) - .start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withNetwork(network).start(); await testPubSub(kafkaContainer); @@ -220,27 +215,24 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { }); it("should throw an error when using kraft wit sasl and confluence platfom below 7.5.0", async () => { - const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0") - .withKraft() - .withExposedPorts(9093) - .withSaslSslListener({ - port: 9094, - sasl: { - mechanism: "SCRAM-SHA-512", - user: { - name: "app-user", - password: "userPassword", - }, - }, - keystore: { - content: "fake", - passphrase: "serverKeystorePassword", - }, - truststore: { - content: "fake", - passphrase: "serverTruststorePassword", + const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0").withKraft().withSaslSslListener({ + port: 9094, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", }, - }); + }, + keystore: { + content: "fake", + passphrase: "serverKeystorePassword", + }, + truststore: { + content: "fake", + passphrase: "serverTruststorePassword", + }, + }); await expect(() => kafkaContainer.start()).rejects.toThrow( "Provided Confluent Platform's version 7.4.0 is not supported in Kraft mode with sasl (must be 7.5.0 or above)" ); diff --git a/packages/modules/kafka/src/kafka-container-latest.test.ts b/packages/modules/kafka/src/kafka-container-latest.test.ts index fefc3e186..96a7fb041 100644 --- a/packages/modules/kafka/src/kafka-container-latest.test.ts +++ b/packages/modules/kafka/src/kafka-container-latest.test.ts @@ -12,7 +12,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { // connectKafkaLatest { it("should connect", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).start(); await testPubSub(kafkaContainer); @@ -22,7 +22,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { it("should connect with custom network", async () => { const network = await new Network().start(); - const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).start(); await testPubSub(kafkaContainer);