diff --git a/.gitignore b/.gitignore
index 6a39895e..22414fa0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,10 @@ dump.rdb
.apt_generated
artifacts
.vscode
+
+# Gradle
+.gradle/
+build/
+!gradle/wrapper/gradle-wrapper.jar
+!**/src/main/**/build/
+!**/src/test/**/build/
diff --git a/batch-producer-consumer/build.gradle.kts b/batch-producer-consumer/build.gradle.kts
new file mode 100644
index 00000000..1d9b041e
--- /dev/null
+++ b/batch-producer-consumer/build.gradle.kts
@@ -0,0 +1,12 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.spring.kafka)
+ testImplementation(libs.spring.cloud.stream.test.binder)
+ testImplementation(libs.spring.kafka.test)
+}
diff --git a/batch-producer-consumer/pom.xml b/batch-producer-consumer/pom.xml
index 06517775..c945c9c2 100644
--- a/batch-producer-consumer/pom.xml
+++ b/batch-producer-consumer/pom.xml
@@ -16,7 +16,6 @@
Sample app using stream functions with batch producer and consumer using kafka
17
- 2023.0.0-SNAPSHOT
@@ -48,18 +47,6 @@
test
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
@@ -79,69 +66,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
-
diff --git a/build.gradle.kts b/build.gradle.kts
new file mode 100644
index 00000000..32b58a70
--- /dev/null
+++ b/build.gradle.kts
@@ -0,0 +1,66 @@
+plugins {
+ alias(libs.plugins.spring.boot) apply false
+ alias(libs.plugins.spring.dependency.management) apply false
+ alias(libs.plugins.kotlin.jvm) apply false
+ alias(libs.plugins.kotlin.spring) apply false
+}
+
+val aggregatorModules = setOf(
+ "confluent-schema-registry-integration",
+ "function-based-stream-app-samples",
+ "image-thumbnail-samples",
+ "image-thumbnail-stream-sample",
+ "couchbase-stream-applications",
+ "kafka-e2e-kotlin-sample",
+ "kafka-security-samples",
+ "kafka-streams-samples",
+ "kinesis-samples",
+ "multi-functions-samples",
+ "multi-binder-samples",
+ "partitioning-samples",
+ "kafka-partitioning",
+ "rabbit-partitioning",
+ "routing-samples",
+ "spring-cloud-stream-schema-registry-integration",
+ "testing-samples",
+ "transaction-kafka-samples",
+)
+
+allprojects {
+ group = "io.spring.cloud.stream.sample"
+ version = "0.0.1-SNAPSHOT"
+}
+
+subprojects {
+ if (name in aggregatorModules) return@subprojects
+
+ apply(plugin = "java")
+ apply(plugin = "io.spring.dependency-management")
+
+ val catalog = rootProject.the().named("libs")
+
+ the().apply {
+ imports {
+ mavenBom("org.springframework.boot:spring-boot-dependencies:${catalog.findVersion("spring-boot").get()}")
+ mavenBom(catalog.findLibrary("spring-cloud-dependencies").get().get().toString())
+ }
+ }
+
+ configure {
+ toolchain {
+ languageVersion = JavaLanguageVersion.of(17)
+ }
+ }
+
+ dependencies {
+ "implementation"(catalog.findLibrary("spring-boot-starter").get())
+ "implementation"(catalog.findLibrary("spring-boot-starter-actuator").get())
+ "implementation"(catalog.findLibrary("spring-boot-starter-webmvc").get())
+ "implementation"(catalog.findLibrary("spring-boot-jackson2").get())
+ "testImplementation"(catalog.findLibrary("spring-boot-starter-test").get())
+ }
+
+ tasks.withType {
+ useJUnitPlatform()
+ }
+}
diff --git a/confluent-schema-registry-integration/build.gradle.kts b/confluent-schema-registry-integration/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/confluent-schema-registry-integration/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/confluent-schema-registry-integration/confluent-schema-registry-integration-consumer/build.gradle.kts b/confluent-schema-registry-integration/confluent-schema-registry-integration-consumer/build.gradle.kts
new file mode 100644
index 00000000..12c31319
--- /dev/null
+++ b/confluent-schema-registry-integration/confluent-schema-registry-integration-consumer/build.gradle.kts
@@ -0,0 +1,22 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.avro.gradle)
+}
+
+avro {
+ setStringType("String")
+}
+
+tasks.named("generateAvroJava") {
+ source("src/main/resources/avro")
+}
+
+dependencies {
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.avro)
+ implementation(libs.confluent.kafka.avro.serializer) {
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ implementation(libs.confluent.kafka.schema.registry.client)
+}
diff --git a/confluent-schema-registry-integration/confluent-schema-registry-integration-producer1/build.gradle.kts b/confluent-schema-registry-integration/confluent-schema-registry-integration-producer1/build.gradle.kts
new file mode 100644
index 00000000..12c31319
--- /dev/null
+++ b/confluent-schema-registry-integration/confluent-schema-registry-integration-producer1/build.gradle.kts
@@ -0,0 +1,22 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.avro.gradle)
+}
+
+avro {
+ setStringType("String")
+}
+
+tasks.named("generateAvroJava") {
+ source("src/main/resources/avro")
+}
+
+dependencies {
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.avro)
+ implementation(libs.confluent.kafka.avro.serializer) {
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ implementation(libs.confluent.kafka.schema.registry.client)
+}
diff --git a/confluent-schema-registry-integration/confluent-schema-registry-integration-producer2/build.gradle.kts b/confluent-schema-registry-integration/confluent-schema-registry-integration-producer2/build.gradle.kts
new file mode 100644
index 00000000..12c31319
--- /dev/null
+++ b/confluent-schema-registry-integration/confluent-schema-registry-integration-producer2/build.gradle.kts
@@ -0,0 +1,22 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.avro.gradle)
+}
+
+avro {
+ setStringType("String")
+}
+
+tasks.named("generateAvroJava") {
+ source("src/main/resources/avro")
+}
+
+dependencies {
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.avro)
+ implementation(libs.confluent.kafka.avro.serializer) {
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ implementation(libs.confluent.kafka.schema.registry.client)
+}
diff --git a/confluent-schema-registry-integration/pom.xml b/confluent-schema-registry-integration/pom.xml
index 02148987..dcbc120f 100644
--- a/confluent-schema-registry-integration/pom.xml
+++ b/confluent-schema-registry-integration/pom.xml
@@ -55,7 +55,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
org.springframework.cloud
@@ -88,68 +88,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
diff --git a/function-based-stream-app-samples/build.gradle.kts b/function-based-stream-app-samples/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/function-based-stream-app-samples/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/build.gradle.kts b/function-based-stream-app-samples/couchbase-stream-applications/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/function-based-stream-app-samples/couchbase-stream-applications/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/build.gradle.kts b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/build.gradle.kts
new file mode 100644
index 00000000..5e1d55d4
--- /dev/null
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/build.gradle.kts
@@ -0,0 +1,26 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+tasks.named("bootJar") {
+ enabled = false
+}
+
+tasks.named("jar") {
+ enabled = true
+}
+
+dependencies {
+ implementation(libs.couchbase.java.client)
+ compileOnly(libs.spring.boot.configuration.processor)
+ annotationProcessor(libs.spring.boot.configuration.processor)
+ implementation(libs.spring.cloud.fn.config.common)
+ testImplementation(libs.reactor.test)
+ testImplementation(libs.testcontainers.couchbase)
+ testImplementation(libs.testcontainers.junit.jupiter)
+}
+
+// Tests require running Couchbase container; Maven also skips tests for this module.
+tasks.named("compileTestJava") { enabled = false }
+tasks.named("processTestResources") { enabled = false }
+tasks.withType { enabled = false }
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
index 4a60fd53..e8287bd9 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
@@ -42,14 +42,27 @@
org.testcontainers
- couchbase
+ testcontainers-couchbase
test
org.testcontainers
- junit-jupiter
+ testcontainers-junit-jupiter
test
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java
index 8bae6e9b..b14b4d8e 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java
@@ -16,7 +16,7 @@
package io.spring.example.couchbase.consumer;
-import javax.validation.constraints.NotNull;
+import jakarta.validation.constraints.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.expression.Expression;
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
index 18fb618b..ac3790c9 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
@@ -21,7 +21,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
-import javax.annotation.PreDestroy;
+import jakarta.annotation.PreDestroy;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc
index 8ab130fe..5c3cb03c 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc
@@ -4,19 +4,4 @@ The **$$couchbase$$** $$sink$$ has the following options:
//tag::configuration-properties[]
-Properties grouped by prefix:
-
-
-=== couchbase.consumer
-
-$$bucket-expression$$:: $$A SpEL expression to specify the bucket.$$ *($$Expression$$, default: `$$$$`)*
-$$collection-expression$$:: $$A SpEL expression to specify the collection.$$ *($$Expression$$, default: `$$$$`)*
-$$key-expression$$:: $$A SpEL expression to specify the key.$$ *($$Expression$$, default: `$$$$`)*
-$$value-expression$$:: $$A SpEL expression to specify the value (default is payload).$$ *($$Expression$$, default: `$$$$`)*
-
-=== spring.couchbase
-
-$$connection-string$$:: $$Connection string used to locate the Couchbase cluster.$$ *($$String$$, default: `$$$$`)*
-$$password$$:: $$Cluster password.$$ *($$String$$, default: `$$$$`)*
-$$username$$:: $$Cluster username.$$ *($$String$$, default: `$$$$`)*
//end::configuration-properties[]
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/build.gradle.kts b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/build.gradle.kts
new file mode 100644
index 00000000..4c9a12e6
--- /dev/null
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/build.gradle.kts
@@ -0,0 +1,19 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(project(":function-based-stream-app-samples:couchbase-stream-applications:couchbase-consumer"))
+ implementation(libs.spring.cloud.stream)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ testImplementation(libs.spring.cloud.stream.test.binder)
+ testImplementation(libs.testcontainers.couchbase)
+ testImplementation(libs.testcontainers.kafka)
+ testImplementation(libs.testcontainers.junit.jupiter)
+ testImplementation(libs.spring.cloud.stream.apps.test.support)
+}
+
+// Tests require running Couchbase container; Maven also skips tests for this module.
+tasks.named("compileTestJava") { enabled = false }
+tasks.named("processTestResources") { enabled = false }
+tasks.withType { enabled = false }
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
index b8f09b4d..b3629aec 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
@@ -39,24 +39,22 @@
org.springframework.cloud
- spring-cloud-stream
- test-jar
- test-binder
+ spring-cloud-stream-test-binder
test
org.testcontainers
- couchbase
+ testcontainers-couchbase
test
org.testcontainers
- kafka
+ testcontainers-kafka
test
org.testcontainers
- junit-jupiter
+ testcontainers-junit-jupiter
test
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/pom.xml
index 618ed324..299fce7a 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ function-based-stream-app-samples
+ 0.0.1-SNAPSHOT
+ ..
io.spring.example
couchbase-stream-applications
@@ -16,11 +16,7 @@
Demo Stream Applications project for Couchhbase
- 8
- 2020.0.2
- 1.86
- 3.0.10
- 1.15.1
+ 3.7.5
@@ -36,48 +32,9 @@
java-client
${couchbase-java-client.version}
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
- org.testcontainers
- testcontainers-bom
- ${test-containers.version}
- pom
- import
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
-
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
-
-
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/build.gradle.kts b/function-based-stream-app-samples/image-thumbnail-samples/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/function-based-stream-app-samples/image-thumbnail-samples/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/build.gradle.kts b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/build.gradle.kts
new file mode 100644
index 00000000..a1d9f75d
--- /dev/null
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/build.gradle.kts
@@ -0,0 +1,21 @@
+plugins {
+ `java-library`
+}
+
+java {
+ toolchain {
+ languageVersion = JavaLanguageVersion.of(17)
+ }
+}
+
+dependencies {
+ implementation(libs.assertj.core)
+ testImplementation(libs.junit.jupiter)
+ testImplementation(libs.spring.core)
+ // Align junit-platform-launcher with BOM-managed junit-platform-engine version
+ testRuntimeOnly("org.junit.platform:junit-platform-launcher")
+}
+
+tasks.withType {
+ useJUnitPlatform()
+}
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml
index 25d70301..e4ae7d62 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ image-thumbnail-samples
+ 0.0.1-SNAPSHOT
+ ..
io.spring.cloud.stream.sample
image-thumbnail-processor
@@ -14,10 +14,6 @@
image-thumbnail-processor
Thumbnail processor
-
- 8
-
-
org.junit.jupiter
@@ -38,4 +34,17 @@
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/build.gradle.kts b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/build.gradle.kts
new file mode 100644
index 00000000..0181a228
--- /dev/null
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/build.gradle.kts
@@ -0,0 +1,16 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.jib)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream)
+ implementation(project(":function-based-stream-app-samples:image-thumbnail-samples:image-thumbnail-processor"))
+ implementation(libs.spring.cloud.fn.file.consumer)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.spring.cloud.stream.apps.composite.function.support)
+ testImplementation(libs.spring.cloud.stream.test.binder)
+ testImplementation(libs.awaitility)
+ testImplementation(libs.reactor.test)
+}
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml
index 5519d430..6d18670e 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ image-thumbnail-samples
+ 0.0.1-SNAPSHOT
+ ../..
io.spring.cloud.stream.sample
image-thumbnail-sink
@@ -14,8 +14,6 @@
image-thumbnail-sink
Thumbnail Sink Application
- 8
- 2020.0.2
demo
@@ -46,12 +44,6 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
org.awaitility
@@ -60,9 +52,7 @@
org.springframework.cloud
- spring-cloud-stream
- test-jar
- test-binder
+ spring-cloud-stream-test-binder
test
@@ -70,17 +60,6 @@
reactor-test
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
@@ -109,46 +88,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestonl
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
- false
-
-
-
+
\ No newline at end of file
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/src/test/java/io/spring/example/image/thumbnail/sink/ThumbnailSinkIntegrationTests.java b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/src/test/java/io/spring/example/image/thumbnail/sink/ThumbnailSinkIntegrationTests.java
index d6fe30a3..4138cddf 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/src/test/java/io/spring/example/image/thumbnail/sink/ThumbnailSinkIntegrationTests.java
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/src/test/java/io/spring/example/image/thumbnail/sink/ThumbnailSinkIntegrationTests.java
@@ -51,9 +51,9 @@ void test() throws IOException {
.run(context -> {
InputDestination inputDestination = context.getBean(InputDestination.class);
inputDestination.send(MessageBuilder.withPayload(data).build());
+ await().timeout(Duration.ofSeconds(10))
+ .until(() -> Files.exists(Paths.get(tempDir.getAbsolutePath(), "thumbnail-1.jpg")));
});
- await().timeout(Duration.ofSeconds(10))
- .until(() -> Files.exists(Paths.get(tempDir.getAbsolutePath(), "thumbnail-1.jpg")));
}
private byte[] readAllBytes(Resource resource) throws IOException {
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/build.gradle.kts b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/build.gradle.kts
new file mode 100644
index 00000000..de561d39
--- /dev/null
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/build.gradle.kts
@@ -0,0 +1,10 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(libs.spring.cloud.fn.http.request)
+ implementation(libs.spring.boot.starter.thymeleaf)
+ developmentOnly(libs.spring.boot.devtools)
+ implementation(project(":function-based-stream-app-samples:image-thumbnail-samples:image-thumbnail-processor"))
+}
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml
index a94d7e6b..9f54b59f 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ image-thumbnail-samples
+ 0.0.1-SNAPSHOT
+ ..
io.spring.cloud.stream.sample
image-thumbnail-web
@@ -14,10 +14,6 @@
image-thumbnail-web
Demo project for Spring Boot
-
- 8
-
-
org.springframework.cloud.fn
@@ -44,12 +40,6 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
@@ -62,47 +52,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/pom.xml
index a08cd26c..f6d58a03 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/pom.xml
@@ -2,6 +2,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ function-based-stream-app-samples
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
image-thumbnail-samples
0.0.1-SNAPSHOT
@@ -9,10 +15,6 @@
Samples using HTTP Request Function
pom
-
- 8
-
-
image-thumbnail-web
image-thumbnail-stream-sample/image-thumbnail-sink
diff --git a/function-based-stream-app-samples/pom.xml b/function-based-stream-app-samples/pom.xml
index e04fe1da..6829b9d3 100644
--- a/function-based-stream-app-samples/pom.xml
+++ b/function-based-stream-app-samples/pom.xml
@@ -2,6 +2,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
function-based-stream-app-samples
0.0.1-SNAPSHOT
@@ -9,10 +15,6 @@
Samples using Functions and Function-based stream applications
pom
-
- 8
-
-
image-thumbnail-samples
couchbase-stream-applications
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 00000000..01715a11
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,4 @@
+org.gradle.daemon=true
+org.gradle.parallel=true
+org.gradle.caching=true
+org.gradle.jvmargs=-Xmx4g -XX:+UseG1GC
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
new file mode 100644
index 00000000..e4adbd64
--- /dev/null
+++ b/gradle/libs.versions.toml
@@ -0,0 +1,105 @@
+[versions]
+spring-boot = "4.0.0"
+spring-dependency-management = "1.1.7"
+kotlin = "2.2.0"
+spring-cloud = "2025.1.0"
+avro = "1.12.0"
+confluent = "7.9.0"
+jib = "2.0.0"
+spring-cloud-fn = "1.0.0-SNAPSHOT"
+spring-cloud-fn-config = "1.0.0-RC1"
+spring-cloud-stream-apps-support = "3.0.0"
+spring-cloud-stream-apps-composite = "3.0.0-SNAPSHOT"
+kinesis-binder = "2.2.0"
+
+[libraries]
+# Spring Boot starters (version managed by plugin)
+spring-boot-starter = { group = "org.springframework.boot", name = "spring-boot-starter" }
+spring-boot-starter-actuator = { group = "org.springframework.boot", name = "spring-boot-starter-actuator" }
+spring-boot-starter-webmvc = { group = "org.springframework.boot", name = "spring-boot-starter-webmvc" }
+spring-boot-starter-webflux = { group = "org.springframework.boot", name = "spring-boot-starter-webflux" }
+spring-boot-starter-security = { group = "org.springframework.boot", name = "spring-boot-starter-security" }
+spring-boot-starter-data-jpa = { group = "org.springframework.boot", name = "spring-boot-starter-data-jpa" }
+spring-boot-starter-data-rest = { group = "org.springframework.boot", name = "spring-boot-starter-data-rest" }
+spring-boot-starter-thymeleaf = { group = "org.springframework.boot", name = "spring-boot-starter-thymeleaf" }
+spring-boot-starter-test = { group = "org.springframework.boot", name = "spring-boot-starter-test" }
+spring-boot-devtools = { group = "org.springframework.boot", name = "spring-boot-devtools" }
+spring-boot-configuration-processor = { group = "org.springframework.boot", name = "spring-boot-configuration-processor" }
+spring-boot-jackson2 = { group = "org.springframework.boot", name = "spring-boot-jackson2" }
+
+# Spring Cloud BOM
+spring-cloud-dependencies = { group = "org.springframework.cloud", name = "spring-cloud-dependencies", version.ref = "spring-cloud" }
+
+# Spring Cloud Stream
+spring-cloud-stream = { group = "org.springframework.cloud", name = "spring-cloud-stream" }
+spring-cloud-stream-binder-kafka = { group = "org.springframework.cloud", name = "spring-cloud-stream-binder-kafka" }
+spring-cloud-stream-binder-kafka-streams = { group = "org.springframework.cloud", name = "spring-cloud-stream-binder-kafka-streams" }
+spring-cloud-stream-binder-rabbit = { group = "org.springframework.cloud", name = "spring-cloud-stream-binder-rabbit" }
+spring-cloud-stream-binder-kinesis = { group = "org.springframework.cloud", name = "spring-cloud-stream-binder-kinesis", version.ref = "kinesis-binder" }
+spring-cloud-stream-test-support = { group = "org.springframework.cloud", name = "spring-cloud-stream-test-support" }
+spring-cloud-stream-test-binder = { group = "org.springframework.cloud", name = "spring-cloud-stream-test-binder" }
+spring-cloud-stream-schema-registry-client = { group = "org.springframework.cloud", name = "spring-cloud-stream-schema-registry-client" }
+spring-cloud-starter-stream-kafka = { group = "org.springframework.cloud", name = "spring-cloud-starter-stream-kafka" }
+spring-cloud-function-context = { group = "org.springframework.cloud", name = "spring-cloud-function-context" }
+
+# Spring Cloud Functions
+spring-cloud-fn-http-request = { group = "org.springframework.cloud.fn", name = "http-request-function", version.ref = "spring-cloud-fn" }
+spring-cloud-fn-file-consumer = { group = "org.springframework.cloud.fn", name = "file-consumer", version.ref = "spring-cloud-fn" }
+spring-cloud-fn-config-common = { group = "org.springframework.cloud.fn", name = "config-common", version.ref = "spring-cloud-fn-config" }
+spring-cloud-stream-apps-composite-function-support = { group = "org.springframework.cloud.stream.app", name = "stream-applications-composite-function-support", version.ref = "spring-cloud-stream-apps-composite" }
+spring-cloud-stream-apps-test-support = { group = "org.springframework.cloud.stream.app", name = "stream-applications-test-support", version.ref = "spring-cloud-stream-apps-support" }
+
+# Spring Integration
+spring-integration-jdbc = { group = "org.springframework.integration", name = "spring-integration-jdbc" }
+spring-integration-test = { group = "org.springframework.integration", name = "spring-integration-test" }
+
+# Spring Kafka
+spring-kafka = { group = "org.springframework.kafka", name = "spring-kafka" }
+spring-kafka-test = { group = "org.springframework.kafka", name = "spring-kafka-test" }
+
+# Kafka
+kafka-streams = { group = "org.apache.kafka", name = "kafka-streams" }
+kafka-streams-test-utils = { group = "org.apache.kafka", name = "kafka-streams-test-utils" }
+
+# Avro / Confluent
+avro = { group = "org.apache.avro", name = "avro", version.ref = "avro" }
+confluent-kafka-avro-serializer = { group = "io.confluent", name = "kafka-avro-serializer", version.ref = "confluent" }
+confluent-kafka-schema-registry-client = { group = "io.confluent", name = "kafka-schema-registry-client", version.ref = "confluent" }
+confluent-kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-serde", version.ref = "confluent" }
+
+# Kotlin
+kotlin-reflect = { group = "org.jetbrains.kotlin", name = "kotlin-reflect" }
+kotlin-stdlib = { group = "org.jetbrains.kotlin", name = "kotlin-stdlib" }
+jackson-module-kotlin = { group = "com.fasterxml.jackson.module", name = "jackson-module-kotlin" }
+
+# Micrometer / Observability
+micrometer-registry-prometheus = { group = "io.micrometer", name = "micrometer-registry-prometheus" }
+
+# Databases
+h2 = { group = "com.h2database", name = "h2" }
+hsqldb = { group = "org.hsqldb", name = "hsqldb" }
+mariadb-java-client = { group = "org.mariadb.jdbc", name = "mariadb-java-client" }
+couchbase-java-client = { group = "com.couchbase.client", name = "java-client" }
+
+# Testcontainers 2.x (Boot-managed)
+testcontainers-couchbase = { group = "org.testcontainers", name = "testcontainers-couchbase" }
+testcontainers-kafka = { group = "org.testcontainers", name = "testcontainers-kafka" }
+testcontainers-junit-jupiter = { group = "org.testcontainers", name = "testcontainers-junit-jupiter" }
+
+# Test
+assertj-core = { group = "org.assertj", name = "assertj-core", version = "3.27.3" }
+junit-jupiter = { group = "org.junit.jupiter", name = "junit-jupiter", version = "5.11.4" }
+awaitility = { group = "org.awaitility", name = "awaitility" }
+reactor-test = { group = "io.projectreactor", name = "reactor-test" }
+lombok = { group = "org.projectlombok", name = "lombok" }
+
+# Spring Framework
+spring-core = { group = "org.springframework", name = "spring-core" }
+
+[plugins]
+spring-boot = { id = "org.springframework.boot", version.ref = "spring-boot" }
+spring-dependency-management = { id = "io.spring.dependency-management", version.ref = "spring-dependency-management" }
+kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
+kotlin-spring = { id = "org.jetbrains.kotlin.plugin.spring", version.ref = "kotlin" }
+avro-gradle = { id = "com.github.davidmc24.gradle.plugin.avro", version = "1.9.1" }
+jib = { id = "com.google.cloud.tools.jib", version.ref = "jib" }
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 00000000..1b33c55b
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 00000000..ca025c83
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,7 @@
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
+networkTimeout=10000
+validateDistributionUrl=true
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
new file mode 100755
index 00000000..23d15a93
--- /dev/null
+++ b/gradlew
@@ -0,0 +1,251 @@
+#!/bin/sh
+
+#
+# Copyright © 2015-2021 the original authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+##############################################################################
+#
+# Gradle start up script for POSIX generated by Gradle.
+#
+# Important for running:
+#
+# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+# noncompliant, but you have some other compliant shell such as ksh or
+# bash, then to run this script, type that shell name before the whole
+# command line, like:
+#
+# ksh Gradle
+#
+# Busybox and similar reduced shells will NOT work, because this script
+# requires all of these POSIX shell features:
+# * functions;
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
+#
+# Important for patching:
+#
+# (2) This script targets any POSIX shell, so it avoids extensions provided
+# by Bash, Ksh, etc; in particular arrays are avoided.
+#
+# The "traditional" practice of packing multiple parameters into a
+# space-separated string is a well documented source of bugs and security
+# problems, so this is (mostly) avoided, by progressively accumulating
+# options in "$@", and eventually passing that to Java.
+#
+# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+# see the in-line comments for details.
+#
+# There are tweaks for specific operating systems such as AIX, CygWin,
+# Darwin, MinGW, and NonStop.
+#
+# (3) This script is generated from the Groovy template
+# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# within the Gradle project.
+#
+# You can find Gradle at https://github.com/gradle/gradle/.
+#
+##############################################################################
+
+# Attempt to set APP_HOME
+
+# Resolve links: $0 may be a link
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+ APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
+ [ -h "$app_path" ]
+do
+ ls=$( ls -ld "$app_path" )
+ link=${ls#*' -> '}
+ case $link in #(
+ /*) app_path=$link ;; #(
+ *) app_path=$APP_HOME$link ;;
+ esac
+done
+
+# This is normally unused
+# shellcheck disable=SC2034
+APP_BASE_NAME=${0##*/}
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD=maximum
+
+warn () {
+ echo "$*"
+} >&2
+
+die () {
+ echo
+ echo "$*"
+ echo
+ exit 1
+} >&2
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+nonstop=false
+case "$( uname )" in #(
+ CYGWIN* ) cygwin=true ;; #(
+ Darwin* ) darwin=true ;; #(
+ MSYS* | MINGW* ) msys=true ;; #(
+ NONSTOP* ) nonstop=true ;;
+esac
+
+CLASSPATH="\\\"\\\""
+
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD=$JAVA_HOME/jre/sh/java
+ else
+ JAVACMD=$JAVA_HOME/bin/java
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD=java
+ if ! command -v java >/dev/null 2>&1
+ then
+ die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+fi
+
+# Increase the maximum file descriptors if we can.
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+ case $MAX_FD in #(
+ max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ MAX_FD=$( ulimit -H -n ) ||
+ warn "Could not query maximum file descriptor limit"
+ esac
+ case $MAX_FD in #(
+ '' | soft) :;; #(
+ *)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ ulimit -n "$MAX_FD" ||
+ warn "Could not set maximum file descriptor limit to $MAX_FD"
+ esac
+fi
+
+# Collect all arguments for the java command, stacking in reverse order:
+# * args from the command line
+# * the main class name
+# * -classpath
+# * -D...appname settings
+# * --module-path (only if needed)
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
+
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if "$cygwin" || "$msys" ; then
+ APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+ CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+ JAVACMD=$( cygpath --unix "$JAVACMD" )
+
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ for arg do
+ if
+ case $arg in #(
+ -*) false ;; # don't mess with options #(
+ /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
+ [ -e "$t" ] ;; #(
+ *) false ;;
+ esac
+ then
+ arg=$( cygpath --path --ignore --mixed "$arg" )
+ fi
+ # Roll the args list around exactly as many times as the number of
+ # args, so each arg winds up back in the position where it started, but
+ # possibly modified.
+ #
+ # NB: a `for` loop captures its iteration list before it begins, so
+ # changing the positional parameters here affects neither the number of
+ # iterations, nor the values presented in `arg`.
+ shift # remove old arg
+ set -- "$@" "$arg" # push replacement arg
+ done
+fi
+
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Collect all arguments for the java command:
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+# and any embedded shellness will be escaped.
+# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+# treated as '${Hostname}' itself on the command line.
+
+set -- \
+ "-Dorg.gradle.appname=$APP_BASE_NAME" \
+ -classpath "$CLASSPATH" \
+ -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
+ "$@"
+
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
+
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+# set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor command substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
+
+eval "set -- $(
+ printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+ xargs -n1 |
+ sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+ tr '\n' ' '
+ )" '"$@"'
+
+exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
new file mode 100644
index 00000000..5eed7ee8
--- /dev/null
+++ b/gradlew.bat
@@ -0,0 +1,94 @@
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+@rem SPDX-License-Identifier: Apache-2.0
+@rem
+
+@if "%DEBUG%"=="" @echo off
+@rem ##########################################################################
+@rem
+@rem Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+set DIRNAME=%~dp0
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
+
+@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if %ERRORLEVEL% equ 0 goto execute
+
+echo. 1>&2
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto execute
+
+echo. 1>&2
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
+
+goto fail
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=
+
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
+
+:end
+@rem End local scope for the variables with windows NT shell
+if %ERRORLEVEL% equ 0 goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/kafka-batch-sample/build.gradle.kts b/kafka-batch-sample/build.gradle.kts
new file mode 100644
index 00000000..89c63351
--- /dev/null
+++ b/kafka-batch-sample/build.gradle.kts
@@ -0,0 +1,12 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.spring.kafka)
+ testImplementation(libs.spring.cloud.stream.test.support)
+ testImplementation(libs.spring.kafka.test)
+}
diff --git a/kafka-batch-sample/pom.xml b/kafka-batch-sample/pom.xml
index 3e193f0e..6d3de590 100644
--- a/kafka-batch-sample/pom.xml
+++ b/kafka-batch-sample/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.3.3.RELEASE
-
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
com.example
kafka-batch-sample
@@ -14,10 +14,6 @@
kafka-batch-sample
Demo project for Spring Boot
-
- 1.8
- Hoxton.SR8
-
@@ -37,16 +33,10 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -56,18 +46,6 @@
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
diff --git a/kafka-batch-sample/src/main/resources/application.yml b/kafka-batch-sample/src/main/resources/application.yml
index 8696e7bc..3d6f4c26 100644
--- a/kafka-batch-sample/src/main/resources/application.yml
+++ b/kafka-batch-sample/src/main/resources/application.yml
@@ -17,7 +17,9 @@ spring:
---
spring:
- profiles: transactional
+ config:
+ activate:
+ on-profile: transactional
kafka:
consumer:
properties:
@@ -32,7 +34,9 @@ spring:
---
spring:
- profiles: batch-produce
+ config:
+ activate:
+ on-profile: batch-produce
cloud:
stream:
bindings:
diff --git a/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java b/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java
index 38e0aed1..44f31bfe 100644
--- a/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java
+++ b/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java
@@ -2,8 +2,10 @@
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
+@EmbeddedKafka(topics = {"batch-in", "batch-out"}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class KafkaBatchSampleApplicationTests {
@Test
diff --git a/kafka-binder-native-app/build.gradle.kts b/kafka-binder-native-app/build.gradle.kts
new file mode 100644
index 00000000..990bb214
--- /dev/null
+++ b/kafka-binder-native-app/build.gradle.kts
@@ -0,0 +1,11 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(libs.spring.cloud.stream)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.spring.kafka)
+ testImplementation(libs.spring.cloud.stream.test.binder)
+ testImplementation(libs.spring.kafka.test)
+}
diff --git a/kafka-binder-native-app/pom.xml b/kafka-binder-native-app/pom.xml
index 4d1ab0ed..c47f23a1 100644
--- a/kafka-binder-native-app/pom.xml
+++ b/kafka-binder-native-app/pom.xml
@@ -71,69 +71,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
-
-
diff --git a/kafka-e2e-kotlin-sample/build.gradle.kts b/kafka-e2e-kotlin-sample/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/kafka-e2e-kotlin-sample/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/kafka-e2e-kotlin-sample/customer-service/build.gradle.kts b/kafka-e2e-kotlin-sample/customer-service/build.gradle.kts
new file mode 100644
index 00000000..1df9aa80
--- /dev/null
+++ b/kafka-e2e-kotlin-sample/customer-service/build.gradle.kts
@@ -0,0 +1,39 @@
+plugins {
+ alias(libs.plugins.kotlin.jvm)
+ alias(libs.plugins.kotlin.spring)
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.avro.gradle)
+}
+
+kotlin {
+ compilerOptions {
+ freeCompilerArgs.addAll("-Xjsr305=strict")
+ }
+}
+
+avro {
+ setStringType("String")
+}
+
+tasks.named("generateAvroJava") {
+ source("src/main/resources/avro")
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.kotlin.reflect)
+ implementation(libs.kotlin.stdlib)
+ implementation(libs.jackson.module.kotlin)
+ implementation(libs.confluent.kafka.streams.avro.serde) {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ implementation(libs.confluent.kafka.avro.serializer)
+ implementation(libs.confluent.kafka.schema.registry.client)
+ implementation(libs.avro)
+ implementation(libs.spring.cloud.starter.stream.kafka)
+
+ implementation(libs.spring.boot.starter.webflux)
+ implementation(libs.spring.cloud.stream)
+ testImplementation(libs.reactor.test)
+ testImplementation(libs.spring.cloud.stream.test.support)
+}
diff --git a/kafka-e2e-kotlin-sample/customer-service/pom.xml b/kafka-e2e-kotlin-sample/customer-service/pom.xml
index 642b83cc..ba24a731 100644
--- a/kafka-e2e-kotlin-sample/customer-service/pom.xml
+++ b/kafka-e2e-kotlin-sample/customer-service/pom.xml
@@ -9,32 +9,17 @@
Customer Service
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-e2e-kotlin-sample
+ 0.0.1-SNAPSHOT
+ ..
- 1.8
- 1.3.31
1.11.3
5.2.1
- 2020.0.2
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
io.confluent
@@ -64,19 +49,18 @@
org.springframework.cloud
- spring-cloud-starter-stream-kafka
+ spring-cloud-stream-binder-kafka
org.springframework.cloud
- spring-cloud-stream-schema
- 3.0.0.BUILD-SNAPSHOT
+ spring-cloud-stream-schema-registry-client
org.springframework.boot
spring-boot-starter-webflux
- com.fasterxml.jackson.module
+ tools.jackson.module
jackson-module-kotlin
@@ -85,7 +69,7 @@
org.jetbrains.kotlin
- kotlin-stdlib-jdk8
+ kotlin-stdlib
org.springframework.cloud
@@ -104,7 +88,7 @@
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -117,7 +101,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -169,30 +153,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
diff --git a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt
index 02096656..f0bfdd03 100644
--- a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt
+++ b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt
@@ -18,13 +18,10 @@ package kafka.e2e.customer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
-import org.springframework.cloud.stream.annotation.EnableBinding
-import org.springframework.cloud.stream.messaging.Source
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
-@EnableBinding(Source::class)
@SpringBootApplication
class CustomerServiceApplication
diff --git a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt
index 3f244f95..5f86471f 100644
--- a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt
+++ b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt
@@ -17,19 +17,19 @@
package kafka.e2e.customer.service
import kafka.e2e.customer.Customer
-import org.springframework.cloud.stream.messaging.Source
+import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Service
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
@Service
-class CustomerServiceImpl(private val customerKafkaProducer: Source) : CustomerService {
+class CustomerServiceImpl(private val streamBridge: StreamBridge) : CustomerService {
override fun save(customer: Customer) {
- val message = MessageBuilder.withPayload(customer).setHeader(KafkaHeaders.MESSAGE_KEY, customer.getId()).build()
- customerKafkaProducer.output().send(message)
+ val message = MessageBuilder.withPayload(customer).setHeader(KafkaHeaders.KEY, customer.getId()).build()
+ streamBridge.send("output", message)
}
}
diff --git a/kafka-e2e-kotlin-sample/order-service/build.gradle.kts b/kafka-e2e-kotlin-sample/order-service/build.gradle.kts
new file mode 100644
index 00000000..1df9aa80
--- /dev/null
+++ b/kafka-e2e-kotlin-sample/order-service/build.gradle.kts
@@ -0,0 +1,39 @@
+plugins {
+ alias(libs.plugins.kotlin.jvm)
+ alias(libs.plugins.kotlin.spring)
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.avro.gradle)
+}
+
+kotlin {
+ compilerOptions {
+ freeCompilerArgs.addAll("-Xjsr305=strict")
+ }
+}
+
+avro {
+ setStringType("String")
+}
+
+tasks.named("generateAvroJava") {
+ source("src/main/resources/avro")
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.kotlin.reflect)
+ implementation(libs.kotlin.stdlib)
+ implementation(libs.jackson.module.kotlin)
+ implementation(libs.confluent.kafka.streams.avro.serde) {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ implementation(libs.confluent.kafka.avro.serializer)
+ implementation(libs.confluent.kafka.schema.registry.client)
+ implementation(libs.avro)
+ implementation(libs.spring.cloud.starter.stream.kafka)
+
+ implementation(libs.spring.boot.starter.webflux)
+ implementation(libs.spring.cloud.stream)
+ testImplementation(libs.reactor.test)
+ testImplementation(libs.spring.cloud.stream.test.support)
+}
diff --git a/kafka-e2e-kotlin-sample/order-service/pom.xml b/kafka-e2e-kotlin-sample/order-service/pom.xml
index 36e0858c..9b56f12e 100644
--- a/kafka-e2e-kotlin-sample/order-service/pom.xml
+++ b/kafka-e2e-kotlin-sample/order-service/pom.xml
@@ -9,32 +9,17 @@
Order Service
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-e2e-kotlin-sample
+ 0.0.1-SNAPSHOT
+ ..
- 1.8
- 1.3.31
1.11.3
5.2.1
- 2020.0.2
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
io.confluent
@@ -64,19 +49,18 @@
org.springframework.cloud
- spring-cloud-starter-stream-kafka
+ spring-cloud-stream-binder-kafka
org.springframework.cloud
- spring-cloud-stream-schema
- 3.0.0.BUILD-SNAPSHOT
+ spring-cloud-stream-schema-registry-client
org.springframework.boot
spring-boot-starter-webflux
- com.fasterxml.jackson.module
+ tools.jackson.module
jackson-module-kotlin
@@ -85,7 +69,7 @@
org.jetbrains.kotlin
- kotlin-stdlib-jdk8
+ kotlin-stdlib
org.springframework.cloud
@@ -104,7 +88,7 @@
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -157,30 +141,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
diff --git a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt
index ec304559..2764ec0d 100644
--- a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt
+++ b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt
@@ -18,15 +18,10 @@ package kafka.e2e.order
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
-import org.springframework.cloud.stream.annotation.EnableBinding
-import org.springframework.cloud.stream.messaging.Source
-import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
-@EnableSchemaRegistryClient
-@EnableBinding(Source::class)
@SpringBootApplication
class OrderServiceApplication
diff --git a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt
index d61515ed..ddfbcfef 100644
--- a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt
+++ b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt
@@ -18,20 +18,21 @@ package kafka.e2e.order.producer
import kafka.e2e.order.OrderCreatedEvent
import kafka.e2e.order.dto.Order
-import org.springframework.cloud.stream.messaging.Source
+import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
@Component
-class OrderProducer(private val source: Source) {
+class OrderProducer(private val streamBridge: StreamBridge) {
fun publishOrderCreatedEvent(order: Order) {
- source.output().send(MessageBuilder.withPayload(OrderCreatedEvent(order.id, order.productId, order.customerId))
- .setHeader(KafkaHeaders.MESSAGE_KEY, order.id).build())
+ val message = MessageBuilder.withPayload(OrderCreatedEvent(order.id, order.productId, order.customerId))
+ .setHeader(KafkaHeaders.KEY, order.id).build()
+ streamBridge.send("output", message)
}
}
diff --git a/kafka-e2e-kotlin-sample/pom.xml b/kafka-e2e-kotlin-sample/pom.xml
index 0036d4c5..763c84b9 100644
--- a/kafka-e2e-kotlin-sample/pom.xml
+++ b/kafka-e2e-kotlin-sample/pom.xml
@@ -1,6 +1,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
kafka-e2e-kotlin-sample
0.0.1-SNAPSHOT
diff --git a/kafka-e2e-kotlin-sample/shipping-service/build.gradle.kts b/kafka-e2e-kotlin-sample/shipping-service/build.gradle.kts
new file mode 100644
index 00000000..3e3664cd
--- /dev/null
+++ b/kafka-e2e-kotlin-sample/shipping-service/build.gradle.kts
@@ -0,0 +1,39 @@
+plugins {
+ alias(libs.plugins.kotlin.jvm)
+ alias(libs.plugins.kotlin.spring)
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.avro.gradle)
+}
+
+kotlin {
+ compilerOptions {
+ freeCompilerArgs.addAll("-Xjsr305=strict")
+ }
+}
+
+avro {
+ setStringType("String")
+}
+
+tasks.named("generateAvroJava") {
+ source("src/main/resources/avro")
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.kotlin.reflect)
+ implementation(libs.kotlin.stdlib)
+ implementation(libs.jackson.module.kotlin)
+ implementation(libs.confluent.kafka.streams.avro.serde) {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ implementation(libs.confluent.kafka.avro.serializer)
+ implementation(libs.confluent.kafka.schema.registry.client)
+ implementation(libs.avro)
+ implementation(libs.spring.cloud.stream.binder.kafka.streams)
+
+ implementation(libs.spring.boot.starter.webflux)
+ implementation(libs.spring.cloud.stream)
+ testImplementation(libs.reactor.test)
+ testImplementation(libs.spring.cloud.stream.test.support)
+}
diff --git a/kafka-e2e-kotlin-sample/shipping-service/pom.xml b/kafka-e2e-kotlin-sample/shipping-service/pom.xml
index 8beacb2f..719ac8d4 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/pom.xml
+++ b/kafka-e2e-kotlin-sample/shipping-service/pom.xml
@@ -9,32 +9,17 @@
Shipping Service
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-e2e-kotlin-sample
+ 0.0.1-SNAPSHOT
+ ..
- 1.8
- 1.3.31
1.11.3
5.2.1
- 2020.0.2
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
io.confluent
@@ -68,15 +53,14 @@
org.springframework.cloud
- spring-cloud-stream-schema
- 3.0.0.BUILD-SNAPSHOT
+ spring-cloud-stream-schema-registry-client
org.springframework.boot
spring-boot-starter-webflux
- com.fasterxml.jackson.module
+ tools.jackson.module
jackson-module-kotlin
@@ -85,7 +69,7 @@
org.jetbrains.kotlin
- kotlin-stdlib-jdk8
+ kotlin-stdlib
org.springframework.cloud
@@ -104,7 +88,7 @@
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -117,7 +101,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -169,30 +153,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt
index ae6c2ddb..9d74e1ab 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt
+++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt
@@ -16,15 +16,12 @@
package kafka.e2e.shipping
-import kafka.e2e.shipping.stream.ShippingKStreamProcessor
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
-import org.springframework.cloud.stream.annotation.EnableBinding
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
-@EnableBinding(ShippingKStreamProcessor::class)
@SpringBootApplication
class ShippingServiceApplication
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt
index a06773a0..0d8c4b8e 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt
+++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt
@@ -27,46 +27,46 @@ import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.*
import org.apache.kafka.streams.state.KeyValueStore
-import org.springframework.cloud.stream.annotation.Input
-import org.springframework.cloud.stream.annotation.StreamListener
+import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
-import org.springframework.messaging.handler.annotation.SendTo
+import java.util.function.BiFunction
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {
- @StreamListener
- @SendTo("output")
- fun process(@Input("input") input: KStream, @Input("order") orderEvent: KStream): KStream {
+ @Bean
+ fun process(): BiFunction, KStream, KStream> {
+ return BiFunction { input, orderEvent ->
- val serdeConfig = mapOf(
- AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")
+ val serdeConfig = mapOf(
+ AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")
- val intSerde = Serdes.IntegerSerde()
- val customerSerde = SpecificAvroSerde()
- customerSerde.configure(serdeConfig, false)
- val orderCreatedSerde = SpecificAvroSerde()
- orderCreatedSerde.configure(serdeConfig, false)
- val orderShippedSerde = SpecificAvroSerde()
- orderShippedSerde.configure(serdeConfig, false)
+ val intSerde = Serdes.IntegerSerde()
+ val customerSerde = SpecificAvroSerde()
+ customerSerde.configure(serdeConfig, false)
+ val orderCreatedSerde = SpecificAvroSerde()
+ orderCreatedSerde.configure(serdeConfig, false)
+ val orderShippedSerde = SpecificAvroSerde()
+ orderShippedSerde.configure(serdeConfig, false)
- val stateStore: Materialized> =
- Materialized.`as`>("customer-store")
+ val stateStore: Materialized> =
+ Materialized.`as`>("customer-store")
- val customerTable: KTable = input.groupByKey()
- .reduce({ _, y -> y }, stateStore)
+ val customerTable: KTable = input.groupByKey()
+ .reduce({ _, y -> y }, stateStore)
- return (orderEvent.filter { _, value -> value.schema.name == "OrderCreatedEvent" }
- .map { key, value -> KeyValue(key, OrderCreatedEvent(value.get("id") as Int, value.get("productId") as Int, value.get("customerId") as Int)) }
- .selectKey { _, value -> value.customerId } as KStream)
- .join(customerTable, { orderIt, customer ->
- OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
- }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
- .selectKey { _, value -> value.id }
+ (orderEvent.filter { _, value -> value.schema.name == "OrderCreatedEvent" }
+ .map { key, value -> KeyValue(key, OrderCreatedEvent(value.get("id") as Int, value.get("productId") as Int, value.get("customerId") as Int)) }
+ .selectKey { _, value -> value.customerId } as KStream)
+ .join(customerTable, { orderIt, customer ->
+ OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
+ }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
+ .selectKey { _, value -> value.id }
+ }
}
}
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamProcessor.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamProcessor.kt
deleted file mode 100644
index 0c8d2edc..00000000
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamProcessor.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2019 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.e2e.shipping.stream
-
-import kafka.e2e.customer.Customer
-import kafka.e2e.order.OrderCreatedEvent
-import kafka.e2e.shipping.OrderShippedEvent
-import org.apache.kafka.streams.kstream.KStream
-import org.springframework.cloud.stream.annotation.Input
-import org.springframework.cloud.stream.annotation.Output
-
-/**
- * @author José A. Íñigo
- */
-interface ShippingKStreamProcessor {
-
- @Input("input")
- fun input(): KStream
-
- @Input("order")
- fun order(): KStream
-
- @Output("output")
- fun output(): KStream
-
-}
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml
index 4d4e68b3..5eb1e36f 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml
+++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml
@@ -3,6 +3,8 @@ spring:
name: shipping-service
cloud:
stream:
+ function:
+ definition: process
kafka:
streams:
binder:
@@ -17,21 +19,21 @@ spring:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
- input:
+ process-in-0:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
- order:
+ process-in-1:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
- output:
+ process-out-0:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
- input:
+ process-in-0:
destination: customer
- order:
+ process-in-1:
destination: order
- output:
+ process-out-0:
destination: order
server:
diff --git a/kafka-native-serialization/build.gradle.kts b/kafka-native-serialization/build.gradle.kts
new file mode 100644
index 00000000..6d52d04a
--- /dev/null
+++ b/kafka-native-serialization/build.gradle.kts
@@ -0,0 +1,12 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.spring.kafka)
+ testImplementation(libs.spring.kafka.test)
+
+}
diff --git a/kafka-native-serialization/pom.xml b/kafka-native-serialization/pom.xml
index 659fd5a5..a98124dc 100644
--- a/kafka-native-serialization/pom.xml
+++ b/kafka-native-serialization/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
com.example
kafka-native-serialization
@@ -14,10 +14,6 @@
kafka-native-serialization
Demo project for Spring Boot
-
- 1.8
- 3.1.2
-
@@ -26,7 +22,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
org.springframework.cloud
@@ -51,25 +47,8 @@
spring-kafka-test
test
-
- org.junit.vintage
- junit-vintage-engine
- test
-
-
-
-
- org.springframework.cloud
- spring-cloud-stream-dependencies
- ${spring-cloud-stream.version}
- pom
- import
-
-
-
-
@@ -79,55 +58,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
-
diff --git a/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java b/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java
index 0212eca0..40965bd0 100644
--- a/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java
+++ b/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java
@@ -24,46 +24,41 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
-import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
+@EmbeddedKafka(topics = "topic2", count = 1,
+ bootstrapServersProperty = "spring.cloud.stream.kafka.binder.brokers")
@SpringBootTest
-@RunWith(SpringRunner.class)
public class KafkaNativeSerializationApplicationTests {
private static final String INPUT_TOPIC = "topic1";
private static final String OUTPUT_TOPIC = "topic2";
private static final String GROUP_NAME = "nativeSerializationTest";
- @ClassRule
- public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, OUTPUT_TOPIC);
-
- @BeforeClass
- public static void setup() {
- System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
- }
+ @Autowired
+ EmbeddedKafkaBroker embeddedKafka;
@Test
public void testSendReceive() {
- Map senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
+ Map senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put("value.serializer", StringSerializer.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(INPUT_TOPIC);
template.sendDefault("foo");
- Map consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka.getEmbeddedKafka());
+ Map consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put("value.deserializer", MyJsonDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
diff --git a/kafka-security-samples/build.gradle.kts b/kafka-security-samples/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/kafka-security-samples/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/kafka-security-samples/kafka-ssl-demo/build.gradle.kts b/kafka-security-samples/kafka-ssl-demo/build.gradle.kts
new file mode 100644
index 00000000..89c63351
--- /dev/null
+++ b/kafka-security-samples/kafka-ssl-demo/build.gradle.kts
@@ -0,0 +1,12 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.spring.kafka)
+ testImplementation(libs.spring.cloud.stream.test.support)
+ testImplementation(libs.spring.kafka.test)
+}
diff --git a/kafka-security-samples/kafka-ssl-demo/pom.xml b/kafka-security-samples/kafka-ssl-demo/pom.xml
index 2652d33a..51b509f3 100644
--- a/kafka-security-samples/kafka-ssl-demo/pom.xml
+++ b/kafka-security-samples/kafka-ssl-demo/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-security-samples
+ 0.0.1-SNAPSHOT
+ ..
kafka.ssl.demo
kafka-ssl-demo
@@ -14,22 +14,6 @@
kafka-ssl-demo
Demo project for Spring Boot
-
- 3.1.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-stream-dependencies
- ${spring-cloud-stream.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -48,16 +32,10 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
diff --git a/kafka-security-samples/pom.xml b/kafka-security-samples/pom.xml
index 8f0de228..4f5d1234 100644
--- a/kafka-security-samples/pom.xml
+++ b/kafka-security-samples/pom.xml
@@ -2,6 +2,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
kafka-security-samples
0.0.1-SNAPSHOT
diff --git a/kafka-streams-samples/build.gradle.kts b/kafka-streams-samples/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/kafka-streams-samples/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/kafka-streams-samples/kafka-streams-aggregate/build.gradle.kts b/kafka-streams-samples/kafka-streams-aggregate/build.gradle.kts
new file mode 100644
index 00000000..cecd3909
--- /dev/null
+++ b/kafka-streams-samples/kafka-streams-aggregate/build.gradle.kts
@@ -0,0 +1,11 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream.binder.kafka.streams)
+ testImplementation(libs.spring.kafka.test)
+ testImplementation(libs.kafka.streams.test.utils)
+
+}
diff --git a/kafka-streams-samples/kafka-streams-aggregate/pom.xml b/kafka-streams-samples/kafka-streams-aggregate/pom.xml
index 165fa67b..d653d57a 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/pom.xml
+++ b/kafka-streams-samples/kafka-streams-aggregate/pom.xml
@@ -11,27 +11,12 @@
Demo project for Spring Boot
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-streams-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
@@ -65,12 +50,7 @@
org.springframework.boot
- spring-boot-starter-web
-
-
- org.junit.vintage
- junit-vintage-engine
- test
+ spring-boot-starter-webmvc
@@ -84,55 +64,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
diff --git a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java
index 15dde85c..50e4dc0b 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java
+++ b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java
@@ -18,7 +18,6 @@
import java.util.function.Consumer;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -52,8 +51,7 @@ public static class KafkaStreamsAggregateSampleApplication {
@Bean
public Consumer> aggregate() {
- ObjectMapper mapper = new ObjectMapper();
- Serde domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );
+ Serde domainEventSerde = new JsonSerde<>(DomainEvent.class);
return input -> input
.groupBy(
diff --git a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java
index c3b6d43f..51d5f440 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java
+++ b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java
@@ -16,7 +16,6 @@
package kafka.streams.table.join;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -34,8 +33,7 @@ public class Producers {
public static void main(String... args) {
- ObjectMapper mapper = new ObjectMapper();
- Serde domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);
+ Serde domainEventSerde = new JsonSerde<>(DomainEvent.class);
Map props = new HashMap<>();
diff --git a/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java b/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java
index e6ba52c8..dde33b08 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java
+++ b/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java
@@ -1,43 +1,39 @@
package kafka.streams.table.join;
+import java.time.Duration;
import java.util.Map;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.apache.kafka.streams.KafkaStreams;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
-import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
-@RunWith(SpringRunner.class)
+@EmbeddedKafka(topics = "foobar", count = 1,
+ bootstrapServersProperty = "spring.cloud.stream.kafka.streams.binder.brokers")
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class KafkaStreamsAggregateSampleTests {
- @ClassRule
- public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "foobar");
-
- private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
+ @Autowired
+ EmbeddedKafkaBroker embeddedKafka;
@Autowired
StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@@ -45,26 +41,15 @@ public class KafkaStreamsAggregateSampleTests {
@LocalServerPort
int randomServerPort;
- @Before
+ @BeforeEach
public void before() {
streamsBuilderFactoryBean.setCloseTimeout(0);
}
- @BeforeClass
- public static void setUp() {
- System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
- }
-
- @AfterClass
- public static void tearDown() {
- System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
- }
-
@Test
public void testKafkaStreamsWordCountProcessor() throws Exception {
Map senderProps = KafkaTestUtils.producerProps(embeddedKafka);
- ObjectMapper mapper = new ObjectMapper();
- Serde domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);
+ Serde domainEventSerde = new JsonSerde<>(DomainEvent.class);
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, domainEventSerde.serializer().getClass());
@@ -72,7 +57,6 @@ public void testKafkaStreamsWordCountProcessor() throws Exception {
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
-
KafkaTemplate template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foobar");
@@ -81,13 +65,23 @@ public void testKafkaStreamsWordCountProcessor() throws Exception {
ddEvent.setEventType("create-domain-event");
template.sendDefault("", ddEvent);
- Thread.sleep(1000);
+
+ // Wait for Kafka Streams to reach RUNNING state before querying the state store
+ await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofMillis(500)).until(() -> {
+ KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
+ return kafkaStreams != null && kafkaStreams.state() == KafkaStreams.State.RUNNING;
+ });
+
RestTemplate restTemplate = new RestTemplate();
String fooResourceUrl
= "http://localhost:" + randomServerPort + "/events";
- ResponseEntity response
- = restTemplate.getForEntity(fooResourceUrl, String.class);
- assertThat(response.getBody()).contains("create-domain-event");
+
+ // Poll for the result since the state store may need time to process the record
+ await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofMillis(500)).untilAsserted(() -> {
+ ResponseEntity response
+ = restTemplate.getForEntity(fooResourceUrl, String.class);
+ assertThat(response.getBody()).contains("create-domain-event");
+ });
}
finally {
pf.destroy();
diff --git a/kafka-streams-samples/kafka-streams-avro/build.gradle.kts b/kafka-streams-samples/kafka-streams-avro/build.gradle.kts
new file mode 100644
index 00000000..92349e35
--- /dev/null
+++ b/kafka-streams-samples/kafka-streams-avro/build.gradle.kts
@@ -0,0 +1,26 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+ alias(libs.plugins.avro.gradle)
+}
+
+avro {
+ setStringType("String")
+}
+
+tasks.named("generateAvroJava") {
+ source("src/main/resources/avro")
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.kafka.streams)
+ implementation(libs.spring.cloud.stream)
+ implementation(libs.spring.cloud.stream.binder.kafka.streams)
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.avro)
+ implementation(libs.confluent.kafka.avro.serializer) {
+ exclude(group = "org.slf4j", module = "slf4j-api")
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ }
+ implementation(libs.confluent.kafka.streams.avro.serde)
+}
diff --git a/kafka-streams-samples/kafka-streams-avro/pom.xml b/kafka-streams-samples/kafka-streams-avro/pom.xml
index 1ffa3316..997d09f7 100644
--- a/kafka-streams-samples/kafka-streams-avro/pom.xml
+++ b/kafka-streams-samples/kafka-streams-avro/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-streams-samples
+ 0.0.1-SNAPSHOT
+ ..
com.example
kafka-streams-avro
@@ -14,9 +14,7 @@
kafka-streams-avro
Demo project for Spring Boot
- 8
- 2020.0.2
- 5.2.0
+ 7.9.0
1.11.3
@@ -31,7 +29,6 @@
org.springframework.cloud
spring-cloud-stream-binder-kafka-streams
- 3.1.3-SNAPSHOT
@@ -46,10 +43,8 @@
org.springframework.cloud
- spring-cloud-stream
+ spring-cloud-stream-test-binder
test
- test-binder
- test-jar
org.apache.avro
@@ -78,18 +73,6 @@
${confluent.version}
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
diff --git a/kafka-streams-samples/kafka-streams-branching/build.gradle.kts b/kafka-streams-samples/kafka-streams-branching/build.gradle.kts
new file mode 100644
index 00000000..cecd3909
--- /dev/null
+++ b/kafka-streams-samples/kafka-streams-branching/build.gradle.kts
@@ -0,0 +1,11 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream.binder.kafka.streams)
+ testImplementation(libs.spring.kafka.test)
+ testImplementation(libs.kafka.streams.test.utils)
+
+}
diff --git a/kafka-streams-samples/kafka-streams-branching/pom.xml b/kafka-streams-samples/kafka-streams-branching/pom.xml
index 690c8d1a..0fb480f9 100644
--- a/kafka-streams-samples/kafka-streams-branching/pom.xml
+++ b/kafka-streams-samples/kafka-streams-branching/pom.xml
@@ -11,27 +11,12 @@
Demo project for Spring Boot
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-streams-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
@@ -65,12 +50,7 @@
org.springframework.boot
- spring-boot-starter-web
-
-
- org.junit.vintage
- junit-vintage-engine
- test
+ spring-boot-starter-webmvc
@@ -83,54 +63,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java b/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java
index 5d3e87f8..4c0c8bec 100644
--- a/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java
+++ b/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java
@@ -19,12 +19,14 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
+import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -43,19 +45,29 @@ public static class WordCountProcessorApplication {
@SuppressWarnings("unchecked")
public Function, KStream, WordCount>[]> process() {
- Predicate
-
- org.junit.vintage
- junit-vintage-engine
- test
-
-
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+
diff --git a/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java b/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java
index 1cec6021..5248c103 100644
--- a/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java
+++ b/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java
@@ -24,7 +24,6 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageHandler;
@@ -40,7 +39,7 @@ public class JdbcSink {
@Bean
public IntegrationFlow jdbcConsumerFlow() {
- return IntegrationFlows.from(Consumer.class, (gateway) -> gateway.beanName("jdbcConsumer"))
+ return IntegrationFlow.from(Consumer.class, (gateway) -> gateway.beanName("jdbcConsumer"))
.handle(jdbcHandler(null))
.get();
}
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java
index e5437556..faf0cbb1 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java
@@ -16,7 +16,7 @@
package org.springframework.cloud.stream.testing.processor;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java
index 7c808159..88954045 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java
@@ -16,36 +16,24 @@
package org.springframework.cloud.stream.testing.processor;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertThat;
-import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesMessageThat;
-import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat;
-import static org.springframework.integration.test.matcher.PayloadAndHeaderMatcher.sameExceptIgnorableHeaders;
+import static org.assertj.core.api.Assertions.assertThat;
-import java.util.concurrent.BlockingQueue;
-
-import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.test.binder.MessageCollector;
-import org.springframework.http.MediaType;
-import org.springframework.integration.support.MessageBuilder;
+import org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration;
+import org.springframework.cloud.stream.binder.test.InputDestination;
+import org.springframework.cloud.stream.binder.test.OutputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.util.MimeType;
/**
* The Spring Boot-base test-case to demonstrate how can we test Spring Cloud Stream applications
@@ -54,7 +42,8 @@
* @author Artem Bilan
*
*/
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
+ classes = { ToUpperCaseProcessor.class, TestChannelBinderConfiguration.class })
@ImportAutoConfiguration(exclude = {
KafkaAutoConfiguration.class,
KafkaMetricsAutoConfiguration.class,
@@ -65,48 +54,37 @@
class ToUpperCaseProcessorTests {
@Autowired
- @Qualifier("uppercaseFunction-in-0")
- private MessageChannel input;
-
- @Autowired
- @Qualifier("uppercaseFunction-out-0")
- private MessageChannel output;
+ private InputDestination input;
@Autowired
- private MessageCollector collector;
+ private OutputDestination output;
@Test
- @SuppressWarnings("unchecked")
void testMessages() {
this.input.send(new GenericMessage<>("odd"));
- this.input.send(new GenericMessage<>("even"));
- this.input.send(new GenericMessage<>("odd meets even"));
- this.input.send(new GenericMessage<>("nothing but the best test"));
-
- BlockingQueue> messages = this.collector.forChannel(this.output);
+ Message result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("ODD");
- assertThat(messages, receivesPayloadThat(is("ODD")));
- assertThat(messages, receivesPayloadThat(is("EVEN")));
- assertThat(messages, receivesPayloadThat(is("ODD MEETS EVEN")));
- assertThat(messages, receivesPayloadThat(not("nothing but the best test")));
-
- Message testMessage =
- MessageBuilder.withPayload("headers")
- .setHeader("odd", "even")
- .build();
-
- input.send(testMessage);
-
- Message expected =
- MessageBuilder.withPayload("HEADERS")
- .copyHeaders(testMessage.getHeaders())
- .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
- .build();
+ this.input.send(new GenericMessage<>("even"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("EVEN");
- Matcher> sameExceptIgnorableHeaders =
- (Matcher>) (Matcher>) sameExceptIgnorableHeaders(expected, "accept");
+ this.input.send(new GenericMessage<>("odd meets even"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("ODD MEETS EVEN");
- assertThat(messages, receivesMessageThat(sameExceptIgnorableHeaders));
+ this.input.send(new GenericMessage<>("nothing but the best test"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isNotEqualTo("nothing but the best test");
+
+ this.input.send(new GenericMessage<>("headers"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("HEADERS");
}
}
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java
index 1ea61e23..b8d65a84 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java
@@ -27,11 +27,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
+import org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration;
import org.springframework.cloud.stream.testing.processor.ToUpperCaseProcessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -58,7 +57,6 @@
classes = ToUpperCaseProcessor.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ImportAutoConfiguration(exclude = {
- TestSupportBinderAutoConfiguration.class,
DataSourceAutoConfiguration.class,
TransactionAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class })
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java
index 27059ecd..0dc76093 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java
@@ -30,11 +30,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.SpyBean;
+import org.springframework.test.context.bean.override.mockito.MockitoSpyBean;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.test.mock.MockIntegration;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -66,7 +66,7 @@ class JdbcSinkTests {
@Autowired
private JdbcTemplate jdbcTemplate;
- @SpyBean(name = "jdbcHandler")
+ @MockitoSpyBean(name = "jdbcHandler")
private MessageHandler jdbcMessageHandler;
@Test
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java
index d642dc3f..92eb9d58 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java
@@ -17,26 +17,21 @@
package org.springframework.cloud.stream.testing.source;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat;
-
-import java.util.concurrent.BlockingQueue;
+import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration;
+import org.springframework.cloud.stream.binder.test.OutputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
/**
@@ -48,7 +43,8 @@
*/
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
- properties = "spring.cloud.stream.poller.fixed-delay=1")
+ properties = "spring.cloud.stream.poller.fixed-delay=1",
+ classes = { OddEvenSource.class, TestChannelBinderConfiguration.class })
@ImportAutoConfiguration(exclude = {
KafkaAutoConfiguration.class,
KafkaMetricsAutoConfiguration.class,
@@ -59,20 +55,25 @@
class OddEvenSourceTests {
@Autowired
- @Qualifier("oddEvenSupplier-out-0")
- MessageChannel outputDestination;
-
- @Autowired
- MessageCollector collector;
+ private OutputDestination output;
@Test
void testMessages() {
- BlockingQueue> messages = this.collector.forChannel(this.outputDestination);
+ Message result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("odd");
+
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("even");
+
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("odd");
- assertThat(messages, receivesPayloadThat(is("odd")));
- assertThat(messages, receivesPayloadThat(is("even")));
- assertThat(messages, receivesPayloadThat(is("odd")));
- assertThat(messages, receivesPayloadThat(is("even")));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("even");
}
}
diff --git a/transaction-kafka-samples/build.gradle.kts b/transaction-kafka-samples/build.gradle.kts
new file mode 100644
index 00000000..d959a664
--- /dev/null
+++ b/transaction-kafka-samples/build.gradle.kts
@@ -0,0 +1 @@
+// Aggregator module — no build configuration needed
diff --git a/transaction-kafka-samples/pom.xml b/transaction-kafka-samples/pom.xml
index 66b2789e..17858610 100644
--- a/transaction-kafka-samples/pom.xml
+++ b/transaction-kafka-samples/pom.xml
@@ -1,6 +1,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
transaction-kafka-samples
0.0.1-SNAPSHOT
diff --git a/transaction-kafka-samples/transaction-http-source/build.gradle.kts b/transaction-kafka-samples/transaction-http-source/build.gradle.kts
new file mode 100644
index 00000000..758aa57a
--- /dev/null
+++ b/transaction-kafka-samples/transaction-http-source/build.gradle.kts
@@ -0,0 +1,9 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ testImplementation(libs.spring.kafka.test)
+}
diff --git a/transaction-kafka-samples/transaction-http-source/pom.xml b/transaction-kafka-samples/transaction-http-source/pom.xml
index d884ffdc..7487913e 100644
--- a/transaction-kafka-samples/transaction-http-source/pom.xml
+++ b/transaction-kafka-samples/transaction-http-source/pom.xml
@@ -9,28 +9,12 @@
Transaction http source
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ transaction-kafka-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -56,7 +40,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -69,54 +53,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/transaction-kafka-samples/transaction-logger-sink/build.gradle.kts b/transaction-kafka-samples/transaction-logger-sink/build.gradle.kts
new file mode 100644
index 00000000..758aa57a
--- /dev/null
+++ b/transaction-kafka-samples/transaction-logger-sink/build.gradle.kts
@@ -0,0 +1,9 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ testImplementation(libs.spring.kafka.test)
+}
diff --git a/transaction-kafka-samples/transaction-logger-sink/pom.xml b/transaction-kafka-samples/transaction-logger-sink/pom.xml
index 1fa3dc26..d41a580a 100644
--- a/transaction-kafka-samples/transaction-logger-sink/pom.xml
+++ b/transaction-kafka-samples/transaction-logger-sink/pom.xml
@@ -9,28 +9,12 @@
Schema Registry Consumer
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ transaction-kafka-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -56,7 +40,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -69,54 +53,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/transaction-kafka-samples/transaction-spring-data-processor/build.gradle.kts b/transaction-kafka-samples/transaction-spring-data-processor/build.gradle.kts
new file mode 100644
index 00000000..c1030df7
--- /dev/null
+++ b/transaction-kafka-samples/transaction-spring-data-processor/build.gradle.kts
@@ -0,0 +1,11 @@
+plugins {
+ alias(libs.plugins.spring.boot)
+}
+
+dependencies {
+ implementation(platform(libs.spring.cloud.dependencies))
+ implementation(libs.spring.cloud.stream.binder.kafka)
+ implementation(libs.spring.boot.starter.data.jpa)
+ runtimeOnly(libs.mariadb.java.client)
+ testImplementation(libs.spring.kafka.test)
+}
diff --git a/transaction-kafka-samples/transaction-spring-data-processor/pom.xml b/transaction-kafka-samples/transaction-spring-data-processor/pom.xml
index e01d7512..4b02861e 100644
--- a/transaction-kafka-samples/transaction-spring-data-processor/pom.xml
+++ b/transaction-kafka-samples/transaction-spring-data-processor/pom.xml
@@ -10,28 +10,12 @@
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ transaction-kafka-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -57,7 +41,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
org.springframework.boot
@@ -66,7 +50,7 @@
org.mariadb.jdbc
mariadb-java-client
- 1.1.9
+ 3.5.1
runtime
@@ -80,54 +64,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java
index 51d76405..fdb79294 100644
--- a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java
+++ b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java
@@ -18,14 +18,14 @@
import java.util.Objects;
import java.util.StringJoiner;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.UniqueConstraint;
-import javax.validation.constraints.Max;
+import jakarta.persistence.Column;
+import jakarta.persistence.Entity;
+import jakarta.persistence.GeneratedValue;
+import jakarta.persistence.GenerationType;
+import jakarta.persistence.Id;
+import jakarta.persistence.Table;
+import jakarta.persistence.UniqueConstraint;
+import jakarta.validation.constraints.Max;
import org.hibernate.validator.constraints.Length;
diff --git a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java
index c0a28ece..4f35756f 100644
--- a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java
+++ b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java
@@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
-import javax.transaction.Transactional;
+import jakarta.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;