diff --git a/batch-producer-consumer/pom.xml b/batch-producer-consumer/pom.xml
index 06517775..c945c9c2 100644
--- a/batch-producer-consumer/pom.xml
+++ b/batch-producer-consumer/pom.xml
@@ -16,7 +16,6 @@
Sample app using stream functions with batch producer and consumer using kafka
17
- 2023.0.0-SNAPSHOT
@@ -48,18 +47,6 @@
test
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
@@ -79,69 +66,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
-
diff --git a/confluent-schema-registry-integration/pom.xml b/confluent-schema-registry-integration/pom.xml
index 02148987..dcbc120f 100644
--- a/confluent-schema-registry-integration/pom.xml
+++ b/confluent-schema-registry-integration/pom.xml
@@ -55,7 +55,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
org.springframework.cloud
@@ -88,68 +88,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
index 4a60fd53..e8287bd9 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/pom.xml
@@ -42,14 +42,27 @@
org.testcontainers
- couchbase
+ testcontainers-couchbase
test
org.testcontainers
- junit-jupiter
+ testcontainers-junit-jupiter
test
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java
index 8bae6e9b..b14b4d8e 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/main/java/io/spring/example/couchbase/consumer/CouchbaseConsumerProperties.java
@@ -16,7 +16,7 @@
package io.spring.example.couchbase.consumer;
-import javax.validation.constraints.NotNull;
+import jakarta.validation.constraints.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.expression.Expression;
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
index 18fb618b..ac3790c9 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-consumer/src/test/java/io/spring/example/couchbase/consumer/CouchbaseConsumerTests.java
@@ -21,7 +21,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
-import javax.annotation.PreDestroy;
+import jakarta.annotation.PreDestroy;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc
index 8ab130fe..5c3cb03c 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/README.adoc
@@ -4,19 +4,4 @@ The **$$couchbase$$** $$sink$$ has the following options:
//tag::configuration-properties[]
-Properties grouped by prefix:
-
-
-=== couchbase.consumer
-
-$$bucket-expression$$:: $$A SpEL expression to specify the bucket.$$ *($$Expression$$, default: `$$$$`)*
-$$collection-expression$$:: $$A SpEL expression to specify the collection.$$ *($$Expression$$, default: `$$$$`)*
-$$key-expression$$:: $$A SpEL expression to specify the key.$$ *($$Expression$$, default: `$$$$`)*
-$$value-expression$$:: $$A SpEL expression to specify the value (default is payload).$$ *($$Expression$$, default: `$$$$`)*
-
-=== spring.couchbase
-
-$$connection-string$$:: $$Connection string used to locate the Couchbase cluster.$$ *($$String$$, default: `$$$$`)*
-$$password$$:: $$Cluster password.$$ *($$String$$, default: `$$$$`)*
-$$username$$:: $$Cluster username.$$ *($$String$$, default: `$$$$`)*
//end::configuration-properties[]
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
index b8f09b4d..b3629aec 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/couchbase-sink/pom.xml
@@ -39,24 +39,22 @@
org.springframework.cloud
- spring-cloud-stream
- test-jar
- test-binder
+ spring-cloud-stream-test-binder
test
org.testcontainers
- couchbase
+ testcontainers-couchbase
test
org.testcontainers
- kafka
+ testcontainers-kafka
test
org.testcontainers
- junit-jupiter
+ testcontainers-junit-jupiter
test
diff --git a/function-based-stream-app-samples/couchbase-stream-applications/pom.xml b/function-based-stream-app-samples/couchbase-stream-applications/pom.xml
index 618ed324..299fce7a 100644
--- a/function-based-stream-app-samples/couchbase-stream-applications/pom.xml
+++ b/function-based-stream-app-samples/couchbase-stream-applications/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ function-based-stream-app-samples
+ 0.0.1-SNAPSHOT
+ ..
io.spring.example
couchbase-stream-applications
@@ -16,11 +16,7 @@
Demo Stream Applications project for Couchhbase
- 8
- 2020.0.2
- 1.86
- 3.0.10
- 1.15.1
+ 3.7.5
@@ -36,48 +32,9 @@
java-client
${couchbase-java-client.version}
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
- org.testcontainers
- testcontainers-bom
- ${test-containers.version}
- pom
- import
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
-
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
-
-
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml
index 25d70301..e4ae7d62 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-processor/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ image-thumbnail-samples
+ 0.0.1-SNAPSHOT
+ ..
io.spring.cloud.stream.sample
image-thumbnail-processor
@@ -14,10 +14,6 @@
image-thumbnail-processor
Thumbnail processor
-
- 8
-
-
org.junit.jupiter
@@ -38,4 +34,17 @@
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml
index 5519d430..6d18670e 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-stream-sample/image-thumbnail-sink/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ image-thumbnail-samples
+ 0.0.1-SNAPSHOT
+ ../..
io.spring.cloud.stream.sample
image-thumbnail-sink
@@ -14,8 +14,6 @@
image-thumbnail-sink
Thumbnail Sink Application
- 8
- 2020.0.2
demo
@@ -46,12 +44,6 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
org.awaitility
@@ -60,9 +52,7 @@
org.springframework.cloud
- spring-cloud-stream
- test-jar
- test-binder
+ spring-cloud-stream-test-binder
test
@@ -70,17 +60,6 @@
reactor-test
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
@@ -109,46 +88,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestonl
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
- false
-
-
-
+
\ No newline at end of file
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml
index a94d7e6b..9f54b59f 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/image-thumbnail-web/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ image-thumbnail-samples
+ 0.0.1-SNAPSHOT
+ ..
io.spring.cloud.stream.sample
image-thumbnail-web
@@ -14,10 +14,6 @@
image-thumbnail-web
Demo project for Spring Boot
-
- 8
-
-
org.springframework.cloud.fn
@@ -44,12 +40,6 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
@@ -62,47 +52,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
diff --git a/function-based-stream-app-samples/image-thumbnail-samples/pom.xml b/function-based-stream-app-samples/image-thumbnail-samples/pom.xml
index a08cd26c..f6d58a03 100644
--- a/function-based-stream-app-samples/image-thumbnail-samples/pom.xml
+++ b/function-based-stream-app-samples/image-thumbnail-samples/pom.xml
@@ -2,6 +2,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ function-based-stream-app-samples
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
image-thumbnail-samples
0.0.1-SNAPSHOT
@@ -9,10 +15,6 @@
Samples using HTTP Request Function
pom
-
- 8
-
-
image-thumbnail-web
image-thumbnail-stream-sample/image-thumbnail-sink
diff --git a/function-based-stream-app-samples/pom.xml b/function-based-stream-app-samples/pom.xml
index e04fe1da..6829b9d3 100644
--- a/function-based-stream-app-samples/pom.xml
+++ b/function-based-stream-app-samples/pom.xml
@@ -2,6 +2,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
function-based-stream-app-samples
0.0.1-SNAPSHOT
@@ -9,10 +15,6 @@
Samples using Functions and Function-based stream applications
pom
-
- 8
-
-
image-thumbnail-samples
couchbase-stream-applications
diff --git a/kafka-batch-sample/pom.xml b/kafka-batch-sample/pom.xml
index 3e193f0e..6d3de590 100644
--- a/kafka-batch-sample/pom.xml
+++ b/kafka-batch-sample/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.3.3.RELEASE
-
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
com.example
kafka-batch-sample
@@ -14,10 +14,6 @@
kafka-batch-sample
Demo project for Spring Boot
-
- 1.8
- Hoxton.SR8
-
@@ -37,16 +33,10 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -56,18 +46,6 @@
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
diff --git a/kafka-batch-sample/src/main/resources/application.yml b/kafka-batch-sample/src/main/resources/application.yml
index 8696e7bc..3d6f4c26 100644
--- a/kafka-batch-sample/src/main/resources/application.yml
+++ b/kafka-batch-sample/src/main/resources/application.yml
@@ -17,7 +17,9 @@ spring:
---
spring:
- profiles: transactional
+ config:
+ activate:
+ on-profile: transactional
kafka:
consumer:
properties:
@@ -32,7 +34,9 @@ spring:
---
spring:
- profiles: batch-produce
+ config:
+ activate:
+ on-profile: batch-produce
cloud:
stream:
bindings:
diff --git a/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java b/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java
index 38e0aed1..44f31bfe 100644
--- a/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java
+++ b/kafka-batch-sample/src/test/java/com/example/demo/KafkaBatchSampleApplicationTests.java
@@ -2,8 +2,10 @@
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
+@EmbeddedKafka(topics = {"batch-in", "batch-out"}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class KafkaBatchSampleApplicationTests {
@Test
diff --git a/kafka-binder-native-app/pom.xml b/kafka-binder-native-app/pom.xml
index 4d1ab0ed..c47f23a1 100644
--- a/kafka-binder-native-app/pom.xml
+++ b/kafka-binder-native-app/pom.xml
@@ -71,69 +71,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
-
-
diff --git a/kafka-e2e-kotlin-sample/customer-service/pom.xml b/kafka-e2e-kotlin-sample/customer-service/pom.xml
index 642b83cc..ba24a731 100644
--- a/kafka-e2e-kotlin-sample/customer-service/pom.xml
+++ b/kafka-e2e-kotlin-sample/customer-service/pom.xml
@@ -9,32 +9,17 @@
Customer Service
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-e2e-kotlin-sample
+ 0.0.1-SNAPSHOT
+ ..
- 1.8
- 1.3.31
1.11.3
5.2.1
- 2020.0.2
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
io.confluent
@@ -64,19 +49,18 @@
org.springframework.cloud
- spring-cloud-starter-stream-kafka
+ spring-cloud-stream-binder-kafka
org.springframework.cloud
- spring-cloud-stream-schema
- 3.0.0.BUILD-SNAPSHOT
+ spring-cloud-stream-schema-registry-client
org.springframework.boot
spring-boot-starter-webflux
- com.fasterxml.jackson.module
+ tools.jackson.module
jackson-module-kotlin
@@ -85,7 +69,7 @@
org.jetbrains.kotlin
- kotlin-stdlib-jdk8
+ kotlin-stdlib
org.springframework.cloud
@@ -104,7 +88,7 @@
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -117,7 +101,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -169,30 +153,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
diff --git a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt
index 02096656..f0bfdd03 100644
--- a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt
+++ b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/CustomerServiceApplication.kt
@@ -18,13 +18,10 @@ package kafka.e2e.customer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
-import org.springframework.cloud.stream.annotation.EnableBinding
-import org.springframework.cloud.stream.messaging.Source
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
-@EnableBinding(Source::class)
@SpringBootApplication
class CustomerServiceApplication
diff --git a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt
index 3f244f95..5f86471f 100644
--- a/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt
+++ b/kafka-e2e-kotlin-sample/customer-service/src/main/kotlin/kafka/e2e/customer/service/CustomerServiceImpl.kt
@@ -17,19 +17,19 @@
package kafka.e2e.customer.service
import kafka.e2e.customer.Customer
-import org.springframework.cloud.stream.messaging.Source
+import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Service
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
@Service
-class CustomerServiceImpl(private val customerKafkaProducer: Source) : CustomerService {
+class CustomerServiceImpl(private val streamBridge: StreamBridge) : CustomerService {
override fun save(customer: Customer) {
- val message = MessageBuilder.withPayload(customer).setHeader(KafkaHeaders.MESSAGE_KEY, customer.getId()).build()
- customerKafkaProducer.output().send(message)
+ val message = MessageBuilder.withPayload(customer).setHeader(KafkaHeaders.KEY, customer.getId()).build()
+ streamBridge.send("output", message)
}
}
diff --git a/kafka-e2e-kotlin-sample/order-service/pom.xml b/kafka-e2e-kotlin-sample/order-service/pom.xml
index 36e0858c..9b56f12e 100644
--- a/kafka-e2e-kotlin-sample/order-service/pom.xml
+++ b/kafka-e2e-kotlin-sample/order-service/pom.xml
@@ -9,32 +9,17 @@
Order Service
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-e2e-kotlin-sample
+ 0.0.1-SNAPSHOT
+ ..
- 1.8
- 1.3.31
1.11.3
5.2.1
- 2020.0.2
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
io.confluent
@@ -64,19 +49,18 @@
org.springframework.cloud
- spring-cloud-starter-stream-kafka
+ spring-cloud-stream-binder-kafka
org.springframework.cloud
- spring-cloud-stream-schema
- 3.0.0.BUILD-SNAPSHOT
+ spring-cloud-stream-schema-registry-client
org.springframework.boot
spring-boot-starter-webflux
- com.fasterxml.jackson.module
+ tools.jackson.module
jackson-module-kotlin
@@ -85,7 +69,7 @@
org.jetbrains.kotlin
- kotlin-stdlib-jdk8
+ kotlin-stdlib
org.springframework.cloud
@@ -104,7 +88,7 @@
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -157,30 +141,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
diff --git a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt
index ec304559..2764ec0d 100644
--- a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt
+++ b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/OrderServiceApplication.kt
@@ -18,15 +18,10 @@ package kafka.e2e.order
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
-import org.springframework.cloud.stream.annotation.EnableBinding
-import org.springframework.cloud.stream.messaging.Source
-import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
-@EnableSchemaRegistryClient
-@EnableBinding(Source::class)
@SpringBootApplication
class OrderServiceApplication
diff --git a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt
index d61515ed..ddfbcfef 100644
--- a/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt
+++ b/kafka-e2e-kotlin-sample/order-service/src/main/kotlin/kafka/e2e/order/producer/OrderProducer.kt
@@ -18,20 +18,21 @@ package kafka.e2e.order.producer
import kafka.e2e.order.OrderCreatedEvent
import kafka.e2e.order.dto.Order
-import org.springframework.cloud.stream.messaging.Source
+import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
@Component
-class OrderProducer(private val source: Source) {
+class OrderProducer(private val streamBridge: StreamBridge) {
fun publishOrderCreatedEvent(order: Order) {
- source.output().send(MessageBuilder.withPayload(OrderCreatedEvent(order.id, order.productId, order.customerId))
- .setHeader(KafkaHeaders.MESSAGE_KEY, order.id).build())
+ val message = MessageBuilder.withPayload(OrderCreatedEvent(order.id, order.productId, order.customerId))
+ .setHeader(KafkaHeaders.KEY, order.id).build()
+ streamBridge.send("output", message)
}
}
diff --git a/kafka-e2e-kotlin-sample/pom.xml b/kafka-e2e-kotlin-sample/pom.xml
index 0036d4c5..763c84b9 100644
--- a/kafka-e2e-kotlin-sample/pom.xml
+++ b/kafka-e2e-kotlin-sample/pom.xml
@@ -1,6 +1,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
kafka-e2e-kotlin-sample
0.0.1-SNAPSHOT
diff --git a/kafka-e2e-kotlin-sample/shipping-service/pom.xml b/kafka-e2e-kotlin-sample/shipping-service/pom.xml
index 8beacb2f..719ac8d4 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/pom.xml
+++ b/kafka-e2e-kotlin-sample/shipping-service/pom.xml
@@ -9,32 +9,17 @@
Shipping Service
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-e2e-kotlin-sample
+ 0.0.1-SNAPSHOT
+ ..
- 1.8
- 1.3.31
1.11.3
5.2.1
- 2020.0.2
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
io.confluent
@@ -68,15 +53,14 @@
org.springframework.cloud
- spring-cloud-stream-schema
- 3.0.0.BUILD-SNAPSHOT
+ spring-cloud-stream-schema-registry-client
org.springframework.boot
spring-boot-starter-webflux
- com.fasterxml.jackson.module
+ tools.jackson.module
jackson-module-kotlin
@@ -85,7 +69,7 @@
org.jetbrains.kotlin
- kotlin-stdlib-jdk8
+ kotlin-stdlib
org.springframework.cloud
@@ -104,7 +88,7 @@
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
@@ -117,7 +101,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -169,30 +153,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt
index ae6c2ddb..9d74e1ab 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt
+++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/ShippingServiceApplication.kt
@@ -16,15 +16,12 @@
package kafka.e2e.shipping
-import kafka.e2e.shipping.stream.ShippingKStreamProcessor
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
-import org.springframework.cloud.stream.annotation.EnableBinding
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
-@EnableBinding(ShippingKStreamProcessor::class)
@SpringBootApplication
class ShippingServiceApplication
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt
index a06773a0..0d8c4b8e 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt
+++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt
@@ -27,46 +27,46 @@ import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.*
import org.apache.kafka.streams.state.KeyValueStore
-import org.springframework.cloud.stream.annotation.Input
-import org.springframework.cloud.stream.annotation.StreamListener
+import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
-import org.springframework.messaging.handler.annotation.SendTo
+import java.util.function.BiFunction
/**
- * @author José A. Íñigo
+ * @author Jose A. Inigo
*/
@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {
- @StreamListener
- @SendTo("output")
- fun process(@Input("input") input: KStream, @Input("order") orderEvent: KStream): KStream {
+ @Bean
+ fun process(): BiFunction, KStream, KStream> {
+ return BiFunction { input, orderEvent ->
- val serdeConfig = mapOf(
- AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")
+ val serdeConfig = mapOf(
+ AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")
- val intSerde = Serdes.IntegerSerde()
- val customerSerde = SpecificAvroSerde()
- customerSerde.configure(serdeConfig, false)
- val orderCreatedSerde = SpecificAvroSerde()
- orderCreatedSerde.configure(serdeConfig, false)
- val orderShippedSerde = SpecificAvroSerde()
- orderShippedSerde.configure(serdeConfig, false)
+ val intSerde = Serdes.IntegerSerde()
+ val customerSerde = SpecificAvroSerde()
+ customerSerde.configure(serdeConfig, false)
+ val orderCreatedSerde = SpecificAvroSerde()
+ orderCreatedSerde.configure(serdeConfig, false)
+ val orderShippedSerde = SpecificAvroSerde()
+ orderShippedSerde.configure(serdeConfig, false)
- val stateStore: Materialized> =
- Materialized.`as`>("customer-store")
+ val stateStore: Materialized> =
+ Materialized.`as`>("customer-store")
- val customerTable: KTable = input.groupByKey()
- .reduce({ _, y -> y }, stateStore)
+ val customerTable: KTable = input.groupByKey()
+ .reduce({ _, y -> y }, stateStore)
- return (orderEvent.filter { _, value -> value.schema.name == "OrderCreatedEvent" }
- .map { key, value -> KeyValue(key, OrderCreatedEvent(value.get("id") as Int, value.get("productId") as Int, value.get("customerId") as Int)) }
- .selectKey { _, value -> value.customerId } as KStream)
- .join(customerTable, { orderIt, customer ->
- OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
- }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
- .selectKey { _, value -> value.id }
+ (orderEvent.filter { _, value -> value.schema.name == "OrderCreatedEvent" }
+ .map { key, value -> KeyValue(key, OrderCreatedEvent(value.get("id") as Int, value.get("productId") as Int, value.get("customerId") as Int)) }
+ .selectKey { _, value -> value.customerId } as KStream)
+ .join(customerTable, { orderIt, customer ->
+ OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
+ }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
+ .selectKey { _, value -> value.id }
+ }
}
}
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamProcessor.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamProcessor.kt
deleted file mode 100644
index 0c8d2edc..00000000
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamProcessor.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2019 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.e2e.shipping.stream
-
-import kafka.e2e.customer.Customer
-import kafka.e2e.order.OrderCreatedEvent
-import kafka.e2e.shipping.OrderShippedEvent
-import org.apache.kafka.streams.kstream.KStream
-import org.springframework.cloud.stream.annotation.Input
-import org.springframework.cloud.stream.annotation.Output
-
-/**
- * @author José A. Íñigo
- */
-interface ShippingKStreamProcessor {
-
- @Input("input")
- fun input(): KStream
-
- @Input("order")
- fun order(): KStream
-
- @Output("output")
- fun output(): KStream
-
-}
diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml
index 4d4e68b3..5eb1e36f 100644
--- a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml
+++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml
@@ -3,6 +3,8 @@ spring:
name: shipping-service
cloud:
stream:
+ function:
+ definition: process
kafka:
streams:
binder:
@@ -17,21 +19,21 @@ spring:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
- input:
+ process-in-0:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
- order:
+ process-in-1:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
- output:
+ process-out-0:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
- input:
+ process-in-0:
destination: customer
- order:
+ process-in-1:
destination: order
- output:
+ process-out-0:
destination: order
server:
diff --git a/kafka-native-serialization/pom.xml b/kafka-native-serialization/pom.xml
index 659fd5a5..a98124dc 100644
--- a/kafka-native-serialization/pom.xml
+++ b/kafka-native-serialization/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
com.example
kafka-native-serialization
@@ -14,10 +14,6 @@
kafka-native-serialization
Demo project for Spring Boot
-
- 1.8
- 3.1.2
-
@@ -26,7 +22,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
org.springframework.cloud
@@ -51,25 +47,8 @@
spring-kafka-test
test
-
- org.junit.vintage
- junit-vintage-engine
- test
-
-
-
-
- org.springframework.cloud
- spring-cloud-stream-dependencies
- ${spring-cloud-stream.version}
- pom
- import
-
-
-
-
@@ -79,55 +58,4 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
-
diff --git a/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java b/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java
index 0212eca0..40965bd0 100644
--- a/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java
+++ b/kafka-native-serialization/src/test/java/com/example/kafkanativeserialization/KafkaNativeSerializationApplicationTests.java
@@ -24,46 +24,41 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
-import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
+@EmbeddedKafka(topics = "topic2", count = 1,
+ bootstrapServersProperty = "spring.cloud.stream.kafka.binder.brokers")
@SpringBootTest
-@RunWith(SpringRunner.class)
public class KafkaNativeSerializationApplicationTests {
private static final String INPUT_TOPIC = "topic1";
private static final String OUTPUT_TOPIC = "topic2";
private static final String GROUP_NAME = "nativeSerializationTest";
- @ClassRule
- public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, OUTPUT_TOPIC);
-
- @BeforeClass
- public static void setup() {
- System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
- }
+ @Autowired
+ EmbeddedKafkaBroker embeddedKafka;
@Test
public void testSendReceive() {
- Map senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
+ Map senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put("value.serializer", StringSerializer.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(INPUT_TOPIC);
template.sendDefault("foo");
- Map consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka.getEmbeddedKafka());
+ Map consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put("value.deserializer", MyJsonDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
diff --git a/kafka-security-samples/kafka-ssl-demo/pom.xml b/kafka-security-samples/kafka-ssl-demo/pom.xml
index 2652d33a..51b509f3 100644
--- a/kafka-security-samples/kafka-ssl-demo/pom.xml
+++ b/kafka-security-samples/kafka-ssl-demo/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-security-samples
+ 0.0.1-SNAPSHOT
+ ..
kafka.ssl.demo
kafka-ssl-demo
@@ -14,22 +14,6 @@
kafka-ssl-demo
Demo project for Spring Boot
-
- 3.1.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-stream-dependencies
- ${spring-cloud-stream.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -48,16 +32,10 @@
org.springframework.boot
spring-boot-starter-test
test
-
-
- org.junit.vintage
- junit-vintage-engine
-
-
org.springframework.cloud
- spring-cloud-stream-test-support
+ spring-cloud-stream-test-binder
test
diff --git a/kafka-security-samples/pom.xml b/kafka-security-samples/pom.xml
index 8f0de228..4f5d1234 100644
--- a/kafka-security-samples/pom.xml
+++ b/kafka-security-samples/pom.xml
@@ -2,6 +2,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
kafka-security-samples
0.0.1-SNAPSHOT
diff --git a/kafka-streams-samples/kafka-streams-aggregate/pom.xml b/kafka-streams-samples/kafka-streams-aggregate/pom.xml
index 165fa67b..d653d57a 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/pom.xml
+++ b/kafka-streams-samples/kafka-streams-aggregate/pom.xml
@@ -11,27 +11,12 @@
Demo project for Spring Boot
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-streams-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
@@ -65,12 +50,7 @@
org.springframework.boot
- spring-boot-starter-web
-
-
- org.junit.vintage
- junit-vintage-engine
- test
+ spring-boot-starter-webmvc
@@ -84,55 +64,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
diff --git a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java
index 15dde85c..50e4dc0b 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java
+++ b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/KafkaStreamsAggregateSample.java
@@ -18,7 +18,6 @@
import java.util.function.Consumer;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -52,8 +51,7 @@ public static class KafkaStreamsAggregateSampleApplication {
@Bean
public Consumer> aggregate() {
- ObjectMapper mapper = new ObjectMapper();
- Serde domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );
+ Serde domainEventSerde = new JsonSerde<>(DomainEvent.class);
return input -> input
.groupBy(
diff --git a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java
index c3b6d43f..51d5f440 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java
+++ b/kafka-streams-samples/kafka-streams-aggregate/src/main/java/kafka/streams/table/join/Producers.java
@@ -16,7 +16,6 @@
package kafka.streams.table.join;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -34,8 +33,7 @@ public class Producers {
public static void main(String... args) {
- ObjectMapper mapper = new ObjectMapper();
- Serde domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);
+ Serde domainEventSerde = new JsonSerde<>(DomainEvent.class);
Map props = new HashMap<>();
diff --git a/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java b/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java
index e6ba52c8..dde33b08 100644
--- a/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java
+++ b/kafka-streams-samples/kafka-streams-aggregate/src/test/java/kafka/streams/table/join/KafkaStreamsAggregateSampleTests.java
@@ -1,43 +1,39 @@
package kafka.streams.table.join;
+import java.time.Duration;
import java.util.Map;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.apache.kafka.streams.KafkaStreams;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
-import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
-@RunWith(SpringRunner.class)
+@EmbeddedKafka(topics = "foobar", count = 1,
+ bootstrapServersProperty = "spring.cloud.stream.kafka.streams.binder.brokers")
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class KafkaStreamsAggregateSampleTests {
- @ClassRule
- public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "foobar");
-
- private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
+ @Autowired
+ EmbeddedKafkaBroker embeddedKafka;
@Autowired
StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@@ -45,26 +41,15 @@ public class KafkaStreamsAggregateSampleTests {
@LocalServerPort
int randomServerPort;
- @Before
+ @BeforeEach
public void before() {
streamsBuilderFactoryBean.setCloseTimeout(0);
}
- @BeforeClass
- public static void setUp() {
- System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
- }
-
- @AfterClass
- public static void tearDown() {
- System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
- }
-
@Test
public void testKafkaStreamsWordCountProcessor() throws Exception {
Map senderProps = KafkaTestUtils.producerProps(embeddedKafka);
- ObjectMapper mapper = new ObjectMapper();
- Serde domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);
+ Serde domainEventSerde = new JsonSerde<>(DomainEvent.class);
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, domainEventSerde.serializer().getClass());
@@ -72,7 +57,6 @@ public void testKafkaStreamsWordCountProcessor() throws Exception {
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
-
KafkaTemplate template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foobar");
@@ -81,13 +65,23 @@ public void testKafkaStreamsWordCountProcessor() throws Exception {
ddEvent.setEventType("create-domain-event");
template.sendDefault("", ddEvent);
- Thread.sleep(1000);
+
+ // Wait for Kafka Streams to reach RUNNING state before querying the state store
+ await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofMillis(500)).until(() -> {
+ KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
+ return kafkaStreams != null && kafkaStreams.state() == KafkaStreams.State.RUNNING;
+ });
+
RestTemplate restTemplate = new RestTemplate();
String fooResourceUrl
= "http://localhost:" + randomServerPort + "/events";
- ResponseEntity response
- = restTemplate.getForEntity(fooResourceUrl, String.class);
- assertThat(response.getBody()).contains("create-domain-event");
+
+ // Poll for the result since the state store may need time to process the record
+ await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofMillis(500)).untilAsserted(() -> {
+ ResponseEntity response
+ = restTemplate.getForEntity(fooResourceUrl, String.class);
+ assertThat(response.getBody()).contains("create-domain-event");
+ });
}
finally {
pf.destroy();
diff --git a/kafka-streams-samples/kafka-streams-avro/pom.xml b/kafka-streams-samples/kafka-streams-avro/pom.xml
index 1ffa3316..997d09f7 100644
--- a/kafka-streams-samples/kafka-streams-avro/pom.xml
+++ b/kafka-streams-samples/kafka-streams-avro/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-streams-samples
+ 0.0.1-SNAPSHOT
+ ..
com.example
kafka-streams-avro
@@ -14,9 +14,7 @@
kafka-streams-avro
Demo project for Spring Boot
- 8
- 2020.0.2
- 5.2.0
+ 7.9.0
1.11.3
@@ -31,7 +29,6 @@
org.springframework.cloud
spring-cloud-stream-binder-kafka-streams
- 3.1.3-SNAPSHOT
@@ -46,10 +43,8 @@
org.springframework.cloud
- spring-cloud-stream
+ spring-cloud-stream-test-binder
test
- test-binder
- test-jar
org.apache.avro
@@ -78,18 +73,6 @@
${confluent.version}
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
diff --git a/kafka-streams-samples/kafka-streams-branching/pom.xml b/kafka-streams-samples/kafka-streams-branching/pom.xml
index 690c8d1a..0fb480f9 100644
--- a/kafka-streams-samples/kafka-streams-branching/pom.xml
+++ b/kafka-streams-samples/kafka-streams-branching/pom.xml
@@ -11,27 +11,12 @@
Demo project for Spring Boot
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ kafka-streams-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
@@ -65,12 +50,7 @@
org.springframework.boot
- spring-boot-starter-web
-
-
- org.junit.vintage
- junit-vintage-engine
- test
+ spring-boot-starter-webmvc
@@ -83,54 +63,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java b/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java
index 5d3e87f8..4c0c8bec 100644
--- a/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java
+++ b/kafka-streams-samples/kafka-streams-branching/src/main/java/kafka/streams/branching/KafkaStreamsBranchingSample.java
@@ -19,12 +19,14 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
+import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -43,19 +45,29 @@ public static class WordCountProcessorApplication {
@SuppressWarnings("unchecked")
public Function, KStream, WordCount>[]> process() {
- Predicate
-
- org.junit.vintage
- junit-vintage-engine
- test
-
-
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+
diff --git a/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java b/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java
index 1cec6021..5248c103 100644
--- a/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java
+++ b/testing-samples/testing-demo/src/main/java/org/springframework/cloud/stream/testing/sink/JdbcSink.java
@@ -24,7 +24,6 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageHandler;
@@ -40,7 +39,7 @@ public class JdbcSink {
@Bean
public IntegrationFlow jdbcConsumerFlow() {
- return IntegrationFlows.from(Consumer.class, (gateway) -> gateway.beanName("jdbcConsumer"))
+ return IntegrationFlow.from(Consumer.class, (gateway) -> gateway.beanName("jdbcConsumer"))
.handle(jdbcHandler(null))
.get();
}
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java
index e5437556..faf0cbb1 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/NaiveToUpperCaseTests.java
@@ -16,7 +16,7 @@
package org.springframework.cloud.stream.testing.processor;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java
index 7c808159..88954045 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/ToUpperCaseProcessorTests.java
@@ -16,36 +16,24 @@
package org.springframework.cloud.stream.testing.processor;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertThat;
-import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesMessageThat;
-import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat;
-import static org.springframework.integration.test.matcher.PayloadAndHeaderMatcher.sameExceptIgnorableHeaders;
+import static org.assertj.core.api.Assertions.assertThat;
-import java.util.concurrent.BlockingQueue;
-
-import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.test.binder.MessageCollector;
-import org.springframework.http.MediaType;
-import org.springframework.integration.support.MessageBuilder;
+import org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration;
+import org.springframework.cloud.stream.binder.test.InputDestination;
+import org.springframework.cloud.stream.binder.test.OutputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.util.MimeType;
/**
* The Spring Boot-base test-case to demonstrate how can we test Spring Cloud Stream applications
@@ -54,7 +42,8 @@
* @author Artem Bilan
*
*/
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
+ classes = { ToUpperCaseProcessor.class, TestChannelBinderConfiguration.class })
@ImportAutoConfiguration(exclude = {
KafkaAutoConfiguration.class,
KafkaMetricsAutoConfiguration.class,
@@ -65,48 +54,37 @@
class ToUpperCaseProcessorTests {
@Autowired
- @Qualifier("uppercaseFunction-in-0")
- private MessageChannel input;
-
- @Autowired
- @Qualifier("uppercaseFunction-out-0")
- private MessageChannel output;
+ private InputDestination input;
@Autowired
- private MessageCollector collector;
+ private OutputDestination output;
@Test
- @SuppressWarnings("unchecked")
void testMessages() {
this.input.send(new GenericMessage<>("odd"));
- this.input.send(new GenericMessage<>("even"));
- this.input.send(new GenericMessage<>("odd meets even"));
- this.input.send(new GenericMessage<>("nothing but the best test"));
-
- BlockingQueue> messages = this.collector.forChannel(this.output);
+ Message result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("ODD");
- assertThat(messages, receivesPayloadThat(is("ODD")));
- assertThat(messages, receivesPayloadThat(is("EVEN")));
- assertThat(messages, receivesPayloadThat(is("ODD MEETS EVEN")));
- assertThat(messages, receivesPayloadThat(not("nothing but the best test")));
-
- Message testMessage =
- MessageBuilder.withPayload("headers")
- .setHeader("odd", "even")
- .build();
-
- input.send(testMessage);
-
- Message expected =
- MessageBuilder.withPayload("HEADERS")
- .copyHeaders(testMessage.getHeaders())
- .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
- .build();
+ this.input.send(new GenericMessage<>("even"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("EVEN");
- Matcher> sameExceptIgnorableHeaders =
- (Matcher>) (Matcher>) sameExceptIgnorableHeaders(expected, "accept");
+ this.input.send(new GenericMessage<>("odd meets even"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("ODD MEETS EVEN");
- assertThat(messages, receivesMessageThat(sameExceptIgnorableHeaders));
+ this.input.send(new GenericMessage<>("nothing but the best test"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isNotEqualTo("nothing but the best test");
+
+ this.input.send(new GenericMessage<>("headers"));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("HEADERS");
}
}
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java
index 1ea61e23..b8d65a84 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/processor/integration/ToUpperCaseProcessorIntTests.java
@@ -27,11 +27,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
+import org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration;
import org.springframework.cloud.stream.testing.processor.ToUpperCaseProcessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -58,7 +57,6 @@
classes = ToUpperCaseProcessor.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ImportAutoConfiguration(exclude = {
- TestSupportBinderAutoConfiguration.class,
DataSourceAutoConfiguration.class,
TransactionAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class })
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java
index 27059ecd..0dc76093 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/sink/JdbcSinkTests.java
@@ -30,11 +30,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.SpyBean;
+import org.springframework.test.context.bean.override.mockito.MockitoSpyBean;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.test.mock.MockIntegration;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -66,7 +66,7 @@ class JdbcSinkTests {
@Autowired
private JdbcTemplate jdbcTemplate;
- @SpyBean(name = "jdbcHandler")
+ @MockitoSpyBean(name = "jdbcHandler")
private MessageHandler jdbcMessageHandler;
@Test
diff --git a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java
index d642dc3f..92eb9d58 100644
--- a/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java
+++ b/testing-samples/testing-demo/src/test/java/org/springframework/cloud/stream/testing/source/OddEvenSourceTests.java
@@ -17,26 +17,21 @@
package org.springframework.cloud.stream.testing.source;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat;
-
-import java.util.concurrent.BlockingQueue;
+import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
+import org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.kafka.autoconfigure.metrics.KafkaMetricsAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration;
+import org.springframework.cloud.stream.binder.test.OutputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
/**
@@ -48,7 +43,8 @@
*/
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
- properties = "spring.cloud.stream.poller.fixed-delay=1")
+ properties = "spring.cloud.stream.poller.fixed-delay=1",
+ classes = { OddEvenSource.class, TestChannelBinderConfiguration.class })
@ImportAutoConfiguration(exclude = {
KafkaAutoConfiguration.class,
KafkaMetricsAutoConfiguration.class,
@@ -59,20 +55,25 @@
class OddEvenSourceTests {
@Autowired
- @Qualifier("oddEvenSupplier-out-0")
- MessageChannel outputDestination;
-
- @Autowired
- MessageCollector collector;
+ private OutputDestination output;
@Test
void testMessages() {
- BlockingQueue> messages = this.collector.forChannel(this.outputDestination);
+ Message result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("odd");
+
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("even");
+
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("odd");
- assertThat(messages, receivesPayloadThat(is("odd")));
- assertThat(messages, receivesPayloadThat(is("even")));
- assertThat(messages, receivesPayloadThat(is("odd")));
- assertThat(messages, receivesPayloadThat(is("even")));
+ result = this.output.receive(5000);
+ assertThat(result).isNotNull();
+ assertThat(new String(result.getPayload())).isEqualTo("even");
}
}
diff --git a/transaction-kafka-samples/pom.xml b/transaction-kafka-samples/pom.xml
index 66b2789e..17858610 100644
--- a/transaction-kafka-samples/pom.xml
+++ b/transaction-kafka-samples/pom.xml
@@ -1,6 +1,12 @@
4.0.0
+
+ io.spring.cloud.stream.sample
+ spring-cloud-stream-samples-parent
+ 0.0.1-SNAPSHOT
+ ..
+
io.spring.cloud.stream.sample
transaction-kafka-samples
0.0.1-SNAPSHOT
diff --git a/transaction-kafka-samples/transaction-http-source/pom.xml b/transaction-kafka-samples/transaction-http-source/pom.xml
index d884ffdc..7487913e 100644
--- a/transaction-kafka-samples/transaction-http-source/pom.xml
+++ b/transaction-kafka-samples/transaction-http-source/pom.xml
@@ -9,28 +9,12 @@
Transaction http source
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ transaction-kafka-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -56,7 +40,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -69,54 +53,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/transaction-kafka-samples/transaction-logger-sink/pom.xml b/transaction-kafka-samples/transaction-logger-sink/pom.xml
index 1fa3dc26..d41a580a 100644
--- a/transaction-kafka-samples/transaction-logger-sink/pom.xml
+++ b/transaction-kafka-samples/transaction-logger-sink/pom.xml
@@ -9,28 +9,12 @@
Schema Registry Consumer
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ transaction-kafka-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -56,7 +40,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
@@ -69,54 +53,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/transaction-kafka-samples/transaction-spring-data-processor/pom.xml b/transaction-kafka-samples/transaction-spring-data-processor/pom.xml
index e01d7512..4b02861e 100644
--- a/transaction-kafka-samples/transaction-spring-data-processor/pom.xml
+++ b/transaction-kafka-samples/transaction-spring-data-processor/pom.xml
@@ -10,28 +10,12 @@
- org.springframework.boot
- spring-boot-starter-parent
- 2.4.4
-
+ io.spring.cloud.stream.sample
+ transaction-kafka-samples
+ 0.0.1-SNAPSHOT
+ ..
-
- 2020.0.2
-
-
-
-
-
- org.springframework.cloud
- spring-cloud-dependencies
- ${spring-cloud.version}
- pom
- import
-
-
-
-
org.springframework.cloud
@@ -57,7 +41,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webmvc
org.springframework.boot
@@ -66,7 +50,7 @@
org.mariadb.jdbc
mariadb-java-client
- 1.1.9
+ 3.5.1
runtime
@@ -80,54 +64,5 @@
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
-
- true
-
-
- false
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/libs-milestone-local
-
- false
-
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/libs-release-local
-
- false
-
-
-
+
diff --git a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java
index 51d76405..fdb79294 100644
--- a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java
+++ b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/Person.java
@@ -18,14 +18,14 @@
import java.util.Objects;
import java.util.StringJoiner;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-import javax.persistence.UniqueConstraint;
-import javax.validation.constraints.Max;
+import jakarta.persistence.Column;
+import jakarta.persistence.Entity;
+import jakarta.persistence.GeneratedValue;
+import jakarta.persistence.GenerationType;
+import jakarta.persistence.Id;
+import jakarta.persistence.Table;
+import jakarta.persistence.UniqueConstraint;
+import jakarta.validation.constraints.Max;
import org.hibernate.validator.constraints.Length;
diff --git a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java
index c0a28ece..4f35756f 100644
--- a/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java
+++ b/transaction-kafka-samples/transaction-spring-data-processor/src/main/java/sample/processor/ProcessorApplication.java
@@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
-import javax.transaction.Transactional;
+import jakarta.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;