From db14774c061c8fe8e0335eaea9f2d6797976b0e2 Mon Sep 17 00:00:00 2001
From: Hai Nguyen <3423575+haiphucnguyen@users.noreply.github.com>
Date: Fri, 27 Jun 2025 18:29:42 -0700
Subject: [PATCH 1/2] Update
---
gradle.properties | 2 +-
gradle/libs.versions.toml | 1 +
modules/kafka/build.gradle | 1 +
.../kafka/EnableKafkaContainer.java | 32 ++++++++++
.../kafka/KafkaContainerExtension.java | 48 +++++++++++++++
.../kafka/KafkaContainerProvider.java | 60 ++++++++++++++++++-
.../ai/OllamaContainerProvider.java | 2 +-
.../testcontainers/ContainerType.java | 5 +-
8 files changed, 147 insertions(+), 4 deletions(-)
create mode 100644 modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java
create mode 100644 modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerExtension.java
diff --git a/gradle.properties b/gradle.properties
index 5e7100a..7fff23a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -2,5 +2,5 @@
# https://docs.gradle.org/current/userguide/build_environment.html#sec:gradle_configuration_properties
org.gradle.configuration-cache=true
-version=0.9.2
+version=0.9.3
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 487a71f..1e00689 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -29,6 +29,7 @@ testcontainers-jdbc = { group = "org.testcontainers", name = "jdbc", version.ref
testcontainers-mysql = { group = "org.testcontainers", name = "mysql", version.ref = "testcontainers" }
testcontainers-postgresql = { group = "org.testcontainers", name = "postgresql", version.ref = "testcontainers" }
testcontainers-ollama = { group = "org.testcontainers", name = "ollama", version.ref = "testcontainers" }
+testcontainers-kafka = { group="org.testcontainers", name ="kafka", version.ref = "testcontainers"}
slf4j-api = { group = "org.slf4j", name = "slf4j-api", version.ref = "slf4j" }
logback-classic = { group = "ch.qos.logback", name = "logback-classic", version.ref = "logback" }
spring-test = { group = "org.springframework", name = "spring-test" }
diff --git a/modules/kafka/build.gradle b/modules/kafka/build.gradle
index 9e5d0e4..da75082 100644
--- a/modules/kafka/build.gradle
+++ b/modules/kafka/build.gradle
@@ -9,6 +9,7 @@ repositories {
dependencies {
api(project(":spring-testcontainers"))
implementation(platform(libs.spring.bom))
+ implementation(libs.testcontainers.kafka)
testImplementation(platform(libs.junit.bom))
testImplementation(libs.junit.jupiter)
testImplementation(libs.junit.platform.launcher)
diff --git a/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java
new file mode 100644
index 0000000..16d91e4
--- /dev/null
+++ b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java
@@ -0,0 +1,32 @@
+package io.flowinquiry.testcontainers.kafka;
+
+/**
+ * Annotation that enables and configures a Kafka container for integration testing. When applied to
+ * a test class, this annotation triggers the creation and management of a Kafka container instance
+ * that can be used during tests.
+ *
+ *
Example usage:
+ *
+ *
{@code
+ * @EnableKafkaContainer
+ * public class KafkaIntegrationTest {
+ * // Test methods that require Kafka
+ * }
+ * }
+ */
+public @interface EnableKafkaContainer {
+
+ /**
+ * Specifies the version of the Kafka container image to use.
+ *
+ * @return the Kafka version, defaults to "3.9.1"
+ */
+ String version() default "3.9.1";
+
+ /**
+ * Specifies the Docker image name for the Kafka container.
+ *
+ * @return the Docker image name, defaults to "apache/kafka"
+ */
+ String dockerImage() default "apache/kafka";
+}
diff --git a/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerExtension.java b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerExtension.java
new file mode 100644
index 0000000..9219811
--- /dev/null
+++ b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerExtension.java
@@ -0,0 +1,48 @@
+package io.flowinquiry.testcontainers.kafka;
+
+import static io.flowinquiry.testcontainers.ContainerType.KAFKA;
+import static io.flowinquiry.testcontainers.ServiceLoaderContainerFactory.getProvider;
+
+import io.flowinquiry.testcontainers.ContainerLifecycleExtension;
+import io.flowinquiry.testcontainers.SpringAwareContainerProvider;
+import org.testcontainers.containers.GenericContainer;
+
+/**
+ * JUnit Jupiter extension that manages the lifecycle of Kafka containers for integration tests.
+ * This extension works in conjunction with the {@link EnableKafkaContainer} annotation to
+ * automatically start and stop Kafka containers before and after test execution.
+ *
+ * The extension is automatically registered when a test class is annotated with {@link
+ * EnableKafkaContainer}. It handles container initialization, startup, and shutdown, making the
+ * Kafka instance available during test execution.
+ */
+public class KafkaContainerExtension extends ContainerLifecycleExtension {
+
+ /**
+ * Resolves the {@link EnableKafkaContainer} annotation from the test class.
+ *
+ * @param testClass the test class to examine for the annotation
+ * @return the resolved annotation instance, or null if not present
+ */
+ @Override
+ protected EnableKafkaContainer getResolvedAnnotation(Class> testClass) {
+ return testClass.getAnnotation(EnableKafkaContainer.class);
+ }
+
+ /**
+ * Initializes a Kafka container provider based on the configuration in the annotation. This
+ * method locates the appropriate provider for Kafka containers and initializes it with the
+ * settings from the annotation.
+ *
+ * @param annotation the annotation containing Kafka container configuration
+ * @return a configured Kafka container provider
+ */
+ @Override
+ protected SpringAwareContainerProvider>
+ initProvider(EnableKafkaContainer annotation) {
+ return getProvider(
+ annotation,
+ p -> p.getContainerType() == KAFKA,
+ (prov, ann) -> prov.initContainerInstance(ann));
+ }
+}
diff --git a/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerProvider.java b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerProvider.java
index d2e16ce..5c2e178 100644
--- a/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerProvider.java
+++ b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/KafkaContainerProvider.java
@@ -1,3 +1,61 @@
package io.flowinquiry.testcontainers.kafka;
-public class KafkaContainerProvider {}
+import static io.flowinquiry.testcontainers.ContainerType.KAFKA;
+
+import io.flowinquiry.testcontainers.ContainerType;
+import io.flowinquiry.testcontainers.SpringAwareContainerProvider;
+import java.util.Properties;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.PropertiesPropertySource;
+import org.testcontainers.kafka.KafkaContainer;
+
+/**
+ * Provider for Kafka containers that integrates with Spring test environments. This class manages
+ * the creation, configuration, and lifecycle of Kafka containers for integration testing with
+ * Spring applications.
+ *
+ * The provider is automatically discovered through Java's ServiceLoader mechanism and is used by
+ * the {@link KafkaContainerExtension} when a test class is annotated with {@link
+ * EnableKafkaContainer}.
+ */
+public class KafkaContainerProvider
+ extends SpringAwareContainerProvider {
+
+ /**
+ * Creates and configures a Kafka container instance. The container is configured with the Docker
+ * image and version specified in the {@link EnableKafkaContainer} annotation.
+ *
+ * @return a configured Kafka container instance
+ */
+ @Override
+ protected KafkaContainer createContainer() {
+ return new KafkaContainer(dockerImage + ":" + version);
+ }
+
+ /**
+ * Returns the type of container managed by this provider.
+ *
+ * @return the KAFKA container type
+ */
+ @Override
+ public ContainerType getContainerType() {
+ return KAFKA;
+ }
+
+ /**
+ * Applies Kafka-specific configuration to the Spring environment. This method sets the bootstrap
+ * servers property in the Spring environment, allowing Spring Kafka clients to automatically
+ * connect to the test container.
+ *
+ * @param environment the Spring environment to configure
+ */
+ @Override
+ public void applyTo(ConfigurableEnvironment environment) {
+ Properties props = new Properties();
+ props.put("spring.kafka.bootstrap-servers", container.getBootstrapServers());
+
+ environment
+ .getPropertySources()
+ .addFirst(new PropertiesPropertySource("testcontainers.kafka", props));
+ }
+}
diff --git a/modules/ollama/src/main/java/io/flowinquiry/testcontainers/ai/OllamaContainerProvider.java b/modules/ollama/src/main/java/io/flowinquiry/testcontainers/ai/OllamaContainerProvider.java
index 65b999c..f2b9f9c 100644
--- a/modules/ollama/src/main/java/io/flowinquiry/testcontainers/ai/OllamaContainerProvider.java
+++ b/modules/ollama/src/main/java/io/flowinquiry/testcontainers/ai/OllamaContainerProvider.java
@@ -110,6 +110,6 @@ public void applyTo(ConfigurableEnvironment environment) {
environment
.getPropertySources()
- .addFirst(new PropertiesPropertySource("testcontainers", props));
+ .addFirst(new PropertiesPropertySource("testcontainers.ollama", props));
}
}
diff --git a/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerType.java b/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerType.java
index 7ff56f9..e0cc872 100644
--- a/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerType.java
+++ b/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerType.java
@@ -18,5 +18,8 @@ public enum ContainerType {
MYSQL,
/** Ollama Container */
- OLLAMA;
+ OLLAMA,
+
+ /** Kafka Container */
+ KAFKA;
}
From 229c681c19e3a001b4bc7b7a3aae06c02372b19a Mon Sep 17 00:00:00 2001
From: Hai Nguyen <3423575+haiphucnguyen@users.noreply.github.com>
Date: Fri, 27 Jun 2025 20:49:25 -0700
Subject: [PATCH 2/2] Update
---
README.md | 27 +++++--
examples/springboot-kafka/build.gradle.kts | 34 ++++++++
.../examples/kafka/KafkaDemoApp.java | 18 +++++
.../examples/kafka/config/KafkaConfig.java | 23 ++++++
.../kafka/consumer/MessageConsumer.java | 60 +++++++++++++++
.../examples/kafka/model/Message.java | 39 ++++++++++
.../kafka/producer/MessageProducer.java | 47 +++++++++++
.../src/main/resources/application.yml | 10 +++
.../examples/kafka/KafkaDemoAppTest.java | 58 ++++++++++++++
gradle/libs.versions.toml | 2 +-
.../jdbc/JdbcContainerExtension.java | 77 +++++++++++++++++++
modules/kafka/build.gradle | 3 -
.../kafka/EnableKafkaContainer.java | 13 ++++
modules/mysql/build.gradle.kts | 3 -
modules/ollama/build.gradle.kts | 3 -
settings.gradle.kts | 1 +
.../ContainerContextCustomizerFactory.java | 12 ++-
17 files changed, 414 insertions(+), 16 deletions(-)
create mode 100644 examples/springboot-kafka/build.gradle.kts
create mode 100644 examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoApp.java
create mode 100644 examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/config/KafkaConfig.java
create mode 100644 examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/consumer/MessageConsumer.java
create mode 100644 examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/model/Message.java
create mode 100644 examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/producer/MessageProducer.java
create mode 100644 examples/springboot-kafka/src/main/resources/application.yml
create mode 100644 examples/springboot-kafka/src/test/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoAppTest.java
diff --git a/README.md b/README.md
index 3bf22a5..8bd95e5 100644
--- a/README.md
+++ b/README.md
@@ -24,11 +24,12 @@ Spring-TestContainers provides out-of-the-box support for the following containe
Spring-TestContainers provides out-of-the-box support for the following containers. You can enable each one via a dedicated annotation in your test classes:
-| Container | Annotation | Example Usage | Notes |
-|----------------|-----------------------------------|-----------------------------------------------------|--------------------------------|
-| **PostgreSQL** | `@EnablePostgresContainer` | `@EnablePostgresContainer(version = "15")` | Uses `PostgreSQLContainer` |
-| **MySQL** | `@EnableMySQLContainer` | `@EnableMySQLContainer(version = "8")` | Uses `MySQLContainer` |
-| **Ollama (AI)**| `@EnableOllamaContainer` | `@EnableOllamaContainer(model = "llama2")` | Starts Ollama with auto-pull |
+| Container | Annotation | Example Usage | Notes |
+|------------|----------------------------|--------------------------------------------|------------------------------|
+| **PostgreSQL** | `@EnablePostgresContainer` | `@EnablePostgresContainer(version = "15")` | Uses `PostgreSQLContainer` |
+| **MySQL** | `@EnableMySQLContainer` | `@EnableMySQLContainer(version = "8")` | Uses `MySQLContainer` |
+| **Kafka** | `@EnableKafkaContainer` | `@EnableKafkaContainer(version = "3.9.1")` | Use `KafkaContainer` |
+| **Ollama (AI)** | `@EnableOllamaContainer` | `@EnableOllamaContainer(model = "llama2")` | Starts Ollama with auto-pull |
## Comparison: TestContainers with Spring vs Spring-TestContainers
@@ -152,6 +153,7 @@ Add the core library along with the database module(s) you plan to use. Each dat
// Add one or more of the following database modules
testImplementation("io.flowinquiry.testcontainers:postgresql:") // PostgreSQL support
testImplementation("io.flowinquiry.testcontainers:mysql:") // MySQL support
+testImplementation("io.flowinquiry.testcontainers:kafka:")
testImplementation("io.flowinquiry.testcontainers:ollama:") // Ollama support
```
@@ -178,6 +180,14 @@ testImplementation("io.flowinquiry.testcontainers:ollama:
+
+ io.flowinquiry.testcontainers
+ kafka
+
+ test
+
+
io.flowinquiry.testcontainers
@@ -228,6 +238,7 @@ Currently, the following containers are supported:
- PostgreSQL
- MySQL
+- Kafka
- Ollama
## Documentation
@@ -253,6 +264,12 @@ The project includes several example modules demonstrating how to use Spring-Tes
These examples provide a good starting point for integrating Spring-TestContainers into your own projects.
+### [springboot-kafka](examples/springboot-kafka)
+
+* Spring Boot applications having Kafka producer and consumer
+
+* Show how to integrate kafka container with minimal configuration
+
### [springboot-ollama](examples/springboot-ollama)
* Spring Boot applications using Spring AI and Ollama
diff --git a/examples/springboot-kafka/build.gradle.kts b/examples/springboot-kafka/build.gradle.kts
new file mode 100644
index 0000000..5ef8755
--- /dev/null
+++ b/examples/springboot-kafka/build.gradle.kts
@@ -0,0 +1,34 @@
+plugins {
+ id("buildlogic.java-application-conventions")
+ alias(libs.plugins.spring.dependency.management)
+}
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ implementation(project(":spring-testcontainers"))
+ implementation(project(":modules:kafka"))
+ implementation(platform(libs.spring.bom))
+ implementation(platform(libs.spring.boot.bom))
+ implementation(libs.slf4j.api)
+ implementation(libs.logback.classic)
+ implementation(libs.spring.boot.starter)
+ implementation("org.springframework.kafka:spring-kafka")
+ implementation(libs.spring.boot.autoconfigure)
+ implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
+ testImplementation(platform(libs.junit.bom))
+ testImplementation(libs.junit.jupiter)
+ testImplementation(libs.junit.platform.launcher)
+ testImplementation(libs.spring.boot.starter.test)
+ testImplementation("org.springframework.kafka:spring-kafka-test")
+}
+
+tasks.test {
+ useJUnitPlatform()
+}
+
+application {
+ mainClass.set("io.flowinquiry.testcontainers.examples.kafka.KafkaDemoApp")
+}
diff --git a/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoApp.java b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoApp.java
new file mode 100644
index 0000000..85e8895
--- /dev/null
+++ b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoApp.java
@@ -0,0 +1,18 @@
+package io.flowinquiry.testcontainers.examples.kafka;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.kafka.annotation.EnableKafka;
+
+/**
+ * Spring Boot application for the Kafka demo. This application demonstrates how to use Kafka with
+ * Spring Boot and Testcontainers.
+ */
+@SpringBootApplication
+@EnableKafka
+public class KafkaDemoApp {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaDemoApp.class, args);
+ }
+}
diff --git a/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/config/KafkaConfig.java b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/config/KafkaConfig.java
new file mode 100644
index 0000000..f1a027a
--- /dev/null
+++ b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/config/KafkaConfig.java
@@ -0,0 +1,23 @@
+package io.flowinquiry.testcontainers.examples.kafka.config;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.TopicBuilder;
+
+/** Configuration class for Kafka. Defines the Kafka topic and other Kafka-related beans. */
+@Configuration
+public class KafkaConfig {
+
+ public static final String TOPIC_NAME = "test-topic";
+
+ /**
+ * Creates a Kafka topic.
+ *
+ * @return the Kafka topic
+ */
+ @Bean
+ public NewTopic testTopic() {
+ return TopicBuilder.name(TOPIC_NAME).partitions(1).replicas(1).build();
+ }
+}
diff --git a/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/consumer/MessageConsumer.java b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/consumer/MessageConsumer.java
new file mode 100644
index 0000000..d291d79
--- /dev/null
+++ b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/consumer/MessageConsumer.java
@@ -0,0 +1,60 @@
+package io.flowinquiry.testcontainers.examples.kafka.consumer;
+
+import io.flowinquiry.testcontainers.examples.kafka.config.KafkaConfig;
+import io.flowinquiry.testcontainers.examples.kafka.model.Message;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+/** Service for consuming messages from Kafka. */
+@Service
+public class MessageConsumer {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);
+
+ private final List receivedMessages = new ArrayList<>();
+ private CountDownLatch latch = new CountDownLatch(1);
+
+ /**
+ * Receives messages from the Kafka topic.
+ *
+ * @param message the received message
+ */
+ @KafkaListener(topics = KafkaConfig.TOPIC_NAME, groupId = "test-group")
+ public void receive(Message message) {
+ log.info("Received message: {}", message);
+ receivedMessages.add(message);
+ latch.countDown();
+ }
+
+ /**
+ * Gets the list of received messages.
+ *
+ * @return the list of received messages
+ */
+ public List getReceivedMessages() {
+ return new ArrayList<>(receivedMessages);
+ }
+
+ /**
+ * Gets the latch that counts down when a message is received.
+ *
+ * @return the latch
+ */
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ /**
+ * Resets the latch to the specified count.
+ *
+ * @param count the count to reset the latch to
+ */
+ public void resetLatch(int count) {
+ latch = new CountDownLatch(count);
+ }
+}
diff --git a/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/model/Message.java b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/model/Message.java
new file mode 100644
index 0000000..2ba73bc
--- /dev/null
+++ b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/model/Message.java
@@ -0,0 +1,39 @@
+package io.flowinquiry.testcontainers.examples.kafka.model;
+
+import java.time.LocalDateTime;
+
+/** A simple message model class for Kafka messages. */
+public class Message {
+
+ private String content;
+ private LocalDateTime timestamp;
+
+ // Default constructor required for JSON deserialization
+ public Message() {}
+
+ public Message(String content) {
+ this.content = content;
+ this.timestamp = LocalDateTime.now();
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ public void setContent(String content) {
+ this.content = content;
+ }
+
+ public LocalDateTime getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(LocalDateTime timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "Message{" + "content='" + content + '\'' + ", timestamp=" + timestamp + '}';
+ }
+}
diff --git a/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/producer/MessageProducer.java b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/producer/MessageProducer.java
new file mode 100644
index 0000000..bf0f578
--- /dev/null
+++ b/examples/springboot-kafka/src/main/java/io/flowinquiry/testcontainers/examples/kafka/producer/MessageProducer.java
@@ -0,0 +1,47 @@
+package io.flowinquiry.testcontainers.examples.kafka.producer;
+
+import io.flowinquiry.testcontainers.examples.kafka.config.KafkaConfig;
+import io.flowinquiry.testcontainers.examples.kafka.model.Message;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+
+/** Service for producing messages to Kafka. */
+@Service
+public class MessageProducer {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);
+
+ private final KafkaTemplate kafkaTemplate;
+
+ public MessageProducer(KafkaTemplate kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ /**
+ * Sends a message to the Kafka topic.
+ *
+ * @param message the message to send
+ * @return a CompletableFuture that will be completed when the send operation completes
+ */
+ public CompletableFuture> sendMessage(Message message) {
+ log.info("Sending message: {}", message);
+ CompletableFuture> future =
+ kafkaTemplate.send(KafkaConfig.TOPIC_NAME, message);
+
+ future.whenComplete(
+ (result, ex) -> {
+ if (ex == null) {
+ log.info("Message sent successfully: {}", message);
+ log.info("Offset: {}", result.getRecordMetadata().offset());
+ } else {
+ log.error("Failed to send message: {}", message, ex);
+ }
+ });
+
+ return future;
+ }
+}
diff --git a/examples/springboot-kafka/src/main/resources/application.yml b/examples/springboot-kafka/src/main/resources/application.yml
new file mode 100644
index 0000000..f82447b
--- /dev/null
+++ b/examples/springboot-kafka/src/main/resources/application.yml
@@ -0,0 +1,10 @@
+spring:
+ kafka:
+ producer:
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ consumer:
+ value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ properties:
+ spring.json.trusted.packages: io.flowinquiry.testcontainers.examples.kafka.model
\ No newline at end of file
diff --git a/examples/springboot-kafka/src/test/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoAppTest.java b/examples/springboot-kafka/src/test/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoAppTest.java
new file mode 100644
index 0000000..69651dc
--- /dev/null
+++ b/examples/springboot-kafka/src/test/java/io/flowinquiry/testcontainers/examples/kafka/KafkaDemoAppTest.java
@@ -0,0 +1,58 @@
+package io.flowinquiry.testcontainers.examples.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.flowinquiry.testcontainers.examples.kafka.consumer.MessageConsumer;
+import io.flowinquiry.testcontainers.examples.kafka.model.Message;
+import io.flowinquiry.testcontainers.examples.kafka.producer.MessageProducer;
+import io.flowinquiry.testcontainers.kafka.EnableKafkaContainer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+@EnableKafkaContainer
+public class KafkaDemoAppTest {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaDemoAppTest.class);
+
+ @Autowired private MessageProducer producer;
+ @Autowired private MessageConsumer consumer;
+
+ @Test
+ public void testKafkaMessaging() throws Exception {
+ // Create a test message
+ Message message = new Message("Test message content");
+
+ // Send the message
+ log.info("Sending test message");
+ producer.sendMessage(message);
+
+ // Wait for the consumer to receive the message
+ log.info("Waiting for consumer to receive the message");
+ boolean messageReceived = consumer.getLatch().await(10, TimeUnit.SECONDS);
+
+ // Verify that the message was received
+ assertTrue(messageReceived, "Message should be received within timeout");
+
+ // Get the received messages
+ List receivedMessages = consumer.getReceivedMessages();
+
+ // Verify that exactly one message was received
+ assertEquals(1, receivedMessages.size(), "Should receive exactly one message");
+
+ // Verify the content of the received message
+ Message receivedMessage = receivedMessages.get(0);
+ assertNotNull(receivedMessage, "Received message should not be null");
+ assertEquals(
+ "Test message content", receivedMessage.getContent(), "Message content should match");
+
+ log.info("Test completed successfully");
+ }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 1e00689..dc620aa 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -8,7 +8,7 @@ spotless = "7.0.3"
logback = "1.5.18"
slf4j = "2.0.17"
spring = "6.2.7"
-spring-boot = "3.5.0"
+spring-boot = "3.5.3"
spring-dependency-management="1.1.7"
postgresql = "42.7.2"
mysql="8.0.33"
diff --git a/modules/jdbc/src/main/java/io/flowinquiry/testcontainers/jdbc/JdbcContainerExtension.java b/modules/jdbc/src/main/java/io/flowinquiry/testcontainers/jdbc/JdbcContainerExtension.java
index 87b9be6..8e16bfe 100644
--- a/modules/jdbc/src/main/java/io/flowinquiry/testcontainers/jdbc/JdbcContainerExtension.java
+++ b/modules/jdbc/src/main/java/io/flowinquiry/testcontainers/jdbc/JdbcContainerExtension.java
@@ -10,8 +10,46 @@
import java.util.Set;
import org.testcontainers.containers.GenericContainer;
+/**
+ * JUnit Jupiter extension that manages the lifecycle of JDBC database containers.
+ *
+ * This extension is responsible for detecting and processing database-specific annotations (like
+ * {@code @EnablePostgreSQL}, {@code @EnableMySQL}, etc.) that are meta-annotated with {@link
+ * EnableJdbcContainer}. It handles the creation, initialization, and cleanup of database containers
+ * for integration tests.
+ *
+ *
The extension works by:
+ *
+ *
+ * - Detecting database-specific annotations on test classes
+ *
- Resolving the actual {@link EnableJdbcContainer} configuration from these annotations
+ *
- Initializing the appropriate container provider based on the database type
+ *
- Managing the container lifecycle through the JUnit Jupiter extension mechanism
+ *
+ *
+ * This extension prevents direct use of {@link EnableJdbcContainer} and enforces the use of
+ * database-specific annotations instead.
+ *
+ * @see EnableJdbcContainer
+ * @see ContainerLifecycleExtension
+ * @see SpringAwareContainerProvider
+ */
public class JdbcContainerExtension extends ContainerLifecycleExtension {
+ /**
+ * Resolves the {@link EnableJdbcContainer} annotation from the test class.
+ *
+ * This method prevents direct use of {@link EnableJdbcContainer} and instead looks for
+ * database-specific annotations (like {@code @EnablePostgreSQL}, {@code @EnableMySQL}, etc.) that
+ * are meta-annotated with {@link EnableJdbcContainer}.
+ *
+ *
If a direct {@link EnableJdbcContainer} annotation is found, an exception is thrown.
+ *
+ * @param testClass the test class to examine for database-specific annotations
+ * @return the resolved {@link EnableJdbcContainer} configuration, or null if no relevant
+ * annotation is found
+ * @throws IllegalStateException if {@link EnableJdbcContainer} is used directly on the test class
+ */
@Override
protected EnableJdbcContainer getResolvedAnnotation(Class> testClass) {
if (testClass.isAnnotationPresent(EnableJdbcContainer.class)) {
@@ -35,6 +73,16 @@ protected EnableJdbcContainer getResolvedAnnotation(Class> testClass) {
return null;
}
+ /**
+ * Initializes a container provider based on the configuration in the annotation.
+ *
+ *
This method uses the {@link ServiceLoaderContainerFactory} to locate and initialize the
+ * appropriate container provider for the specified database type. The provider is selected based
+ * on the {@link ContainerType} specified in the {@link EnableJdbcContainer} annotation.
+ *
+ * @param enableJdbcContainer the annotation containing the database container configuration
+ * @return a configured container provider for the specified database type
+ */
@Override
protected SpringAwareContainerProvider>
initProvider(EnableJdbcContainer enableJdbcContainer) {
@@ -44,6 +92,18 @@ protected EnableJdbcContainer getResolvedAnnotation(Class> testClass) {
(prov, ann) -> prov.initContainerInstance(ann));
}
+ /**
+ * Recursively searches for an annotation that is meta-annotated with the target annotation.
+ *
+ * This method implements a depth-first search through the annotation hierarchy to find the
+ * nearest annotation that is meta-annotated with the target annotation. It uses a visited set to
+ * prevent infinite recursion in case of circular annotation references.
+ *
+ * @param candidate the annotation to check
+ * @param target the target meta-annotation to look for
+ * @param visited a set of already visited annotation types to prevent infinite recursion
+ * @return the found annotation that is meta-annotated with the target, or null if none is found
+ */
private Annotation findNearestAnnotationWith(
Annotation candidate, Class extends Annotation> target, Set> visited) {
Class extends Annotation> candidateType = candidate.annotationType();
@@ -65,6 +125,23 @@ private Annotation findNearestAnnotationWith(
return null;
}
+ /**
+ * Builds a resolved {@link EnableJdbcContainer} configuration from a source annotation.
+ *
+ * This method extracts configuration values from a database-specific annotation (like
+ * {@code @EnablePostgreSQL} or {@code @EnableMySQL}) and combines them with the meta-annotation
+ * information to create a complete {@link EnableJdbcContainer} configuration.
+ *
+ *
The method uses reflection to extract the 'version' and 'dockerImage' attributes from the
+ * source annotation and combines them with the database type from the meta-annotation.
+ *
+ * @param sourceAnnotation the database-specific annotation from the test class
+ * @param meta the {@link EnableJdbcContainer} meta-annotation from the database-specific
+ * annotation
+ * @return a resolved {@link EnableJdbcContainer} configuration
+ * @throws IllegalStateException if the required attributes cannot be extracted from the source
+ * annotation
+ */
private EnableJdbcContainer buildResolvedJdbcConfig(
Annotation sourceAnnotation, EnableJdbcContainer meta) {
try {
diff --git a/modules/kafka/build.gradle b/modules/kafka/build.gradle
index da75082..2a09f83 100644
--- a/modules/kafka/build.gradle
+++ b/modules/kafka/build.gradle
@@ -10,9 +10,6 @@ dependencies {
api(project(":spring-testcontainers"))
implementation(platform(libs.spring.bom))
implementation(libs.testcontainers.kafka)
- testImplementation(platform(libs.junit.bom))
- testImplementation(libs.junit.jupiter)
- testImplementation(libs.junit.platform.launcher)
}
test {
diff --git a/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java
index 16d91e4..a356a01 100644
--- a/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java
+++ b/modules/kafka/src/main/java/io/flowinquiry/testcontainers/kafka/EnableKafkaContainer.java
@@ -1,5 +1,14 @@
package io.flowinquiry.testcontainers.kafka;
+import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import org.junit.jupiter.api.extension.ExtendWith;
+
/**
* Annotation that enables and configures a Kafka container for integration testing. When applied to
* a test class, this annotation triggers the creation and management of a Kafka container instance
@@ -14,6 +23,10 @@
* }
* }
*/
+@Target({ANNOTATION_TYPE, TYPE})
+@Retention(RUNTIME)
+@Documented
+@ExtendWith(KafkaContainerExtension.class)
public @interface EnableKafkaContainer {
/**
diff --git a/modules/mysql/build.gradle.kts b/modules/mysql/build.gradle.kts
index 2fc47ce..7e3e9fb 100644
--- a/modules/mysql/build.gradle.kts
+++ b/modules/mysql/build.gradle.kts
@@ -11,9 +11,6 @@ dependencies {
implementation(platform(libs.spring.bom))
implementation(libs.testcontainers.jdbc)
implementation(libs.testcontainers.mysql)
- testImplementation(platform(libs.junit.bom))
- testImplementation(libs.junit.jupiter)
- testImplementation(libs.junit.platform.launcher)
}
tasks.test {
diff --git a/modules/ollama/build.gradle.kts b/modules/ollama/build.gradle.kts
index 4d7c05d..5cb6d08 100644
--- a/modules/ollama/build.gradle.kts
+++ b/modules/ollama/build.gradle.kts
@@ -11,9 +11,6 @@ dependencies {
implementation(platform(libs.spring.bom))
implementation(libs.testcontainers.ollama)
implementation(libs.spring.context)
- testImplementation(platform(libs.junit.bom))
- testImplementation(libs.junit.jupiter)
- testImplementation(libs.junit.platform.launcher)
}
tasks.test {
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 9c111b8..36581c5 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -34,5 +34,6 @@ include("examples:springboot-postgresql")
include("examples:spring-postgresql")
include("examples:springboot-mysql")
include("examples:springboot-ollama")
+include("examples:springboot-kafka")
include("modules:kafka")
include("modules:jdbc")
\ No newline at end of file
diff --git a/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerContextCustomizerFactory.java b/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerContextCustomizerFactory.java
index 177da9a..f1ae908 100644
--- a/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerContextCustomizerFactory.java
+++ b/spring-testcontainers/src/main/java/io/flowinquiry/testcontainers/ContainerContextCustomizerFactory.java
@@ -1,6 +1,8 @@
package io.flowinquiry.testcontainers;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.test.context.ContextConfigurationAttributes;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.ContextCustomizerFactory;
@@ -19,6 +21,9 @@
*/
public class ContainerContextCustomizerFactory implements ContextCustomizerFactory {
+ private static final Logger log =
+ LoggerFactory.getLogger(ContainerContextCustomizerFactory.class);
+
/**
* Creates a context customizer for the specified test class.
*
@@ -36,7 +41,12 @@ public ContextCustomizer createContextCustomizer(
Class> testClass, List configAttributes) {
return (context, mergedConfig) -> {
SpringAwareContainerProvider, ?> provider = ContainerRegistry.get(testClass);
- provider.applyTo(context.getEnvironment());
+ if (provider != null) {
+ provider.applyTo(context.getEnvironment());
+ } else {
+ throw new IllegalStateException(
+ "Can not file the associate provider for test class " + testClass.getName());
+ }
};
}
}