diff --git a/.github/scripts/update_sdk_version.sh b/.github/scripts/update_sdk_version.sh
index b2e1033be9..a87f88e3b4 100755
--- a/.github/scripts/update_sdk_version.sh
+++ b/.github/scripts/update_sdk_version.sh
@@ -15,6 +15,8 @@ mvn versions:set-property -Dproperty=dapr.sdk.version -DnewVersion=$DAPR_JAVA_SD
# BOMs are standalone (no parent), so versions:set skips them — update explicitly.
mvn versions:set -DnewVersion=$DAPR_JAVA_SDK_VERSION -f sdk-bom/pom.xml
mvn versions:set-property -Dproperty=dapr.sdk.version -DnewVersion=$DAPR_JAVA_SDK_VERSION -f sdk-bom/pom.xml
+# Install dapr-sdk-bom locally so dapr-spring-bom's import can resolve when loaded with -f (no reactor).
+mvn install -N -DskipTests -f sdk-bom/pom.xml
mvn versions:set -DnewVersion=$DAPR_JAVA_SDK_VERSION -f dapr-spring/dapr-spring-bom/pom.xml
mvn versions:set-property -Dproperty=dapr.sdk.version -DnewVersion=$DAPR_JAVA_SDK_VERSION -f dapr-spring/dapr-spring-bom/pom.xml
mvn versions:set-property -Dproperty=dapr.sdk.alpha.version -DnewVersion=$DAPR_JAVA_SDK_ALPHA_VERSION -f sdk-tests/pom.xml
diff --git a/dapr-spring/dapr-spring-6-data/pom.xml b/dapr-spring/dapr-spring-6-data/pom.xml
index 46411f07c3..64aa7dd5a6 100644
--- a/dapr-spring/dapr-spring-6-data/pom.xml
+++ b/dapr-spring/dapr-spring-6-data/pom.xml
@@ -7,7 +7,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-bom/pom.xml b/dapr-spring/dapr-spring-bom/pom.xml
index d15820a210..80efd43dd5 100644
--- a/dapr-spring/dapr-spring-bom/pom.xml
+++ b/dapr-spring/dapr-spring-bom/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-bom
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
pom
dapr-spring-bom
Dapr Spring Bill of Materials (BOM). Import this POM to manage versions
@@ -45,7 +45,7 @@
true
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
diff --git a/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml b/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml
index 5ed01dc328..75c3fa124e 100644
--- a/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml
+++ b/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml
index 5d13434f14..ee3e5817da 100644
--- a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml
+++ b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-observation/pom.xml b/dapr-spring/dapr-spring-boot-observation/pom.xml
index c78b98cc94..4f008b0139 100644
--- a/dapr-spring/dapr-spring-boot-observation/pom.xml
+++ b/dapr-spring/dapr-spring-boot-observation/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-properties/pom.xml b/dapr-spring/dapr-spring-boot-properties/pom.xml
index 548c277621..516bbba839 100644
--- a/dapr-spring/dapr-spring-boot-properties/pom.xml
+++ b/dapr-spring/dapr-spring-boot-properties/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml
index 47be20f17d..0ebc19c340 100644
--- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml
+++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml
index 4744517a8d..0f31067c64 100644
--- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml
+++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml
index 93ef0935fc..88f711eebe 100644
--- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml
+++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml
index a1c5a284f6..c77d843c2a 100644
--- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml
+++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../../pom.xml
diff --git a/dapr-spring/dapr-spring-boot-tests/pom.xml b/dapr-spring/dapr-spring-boot-tests/pom.xml
index c08d82ac84..7de18a4c7d 100644
--- a/dapr-spring/dapr-spring-boot-tests/pom.xml
+++ b/dapr-spring/dapr-spring-boot-tests/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-data/pom.xml b/dapr-spring/dapr-spring-data/pom.xml
index c6f9ac4e23..1b6d9d5d91 100644
--- a/dapr-spring/dapr-spring-data/pom.xml
+++ b/dapr-spring/dapr-spring-data/pom.xml
@@ -7,7 +7,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-messaging/pom.xml b/dapr-spring/dapr-spring-messaging/pom.xml
index 94fb975fd0..11d842335a 100644
--- a/dapr-spring/dapr-spring-messaging/pom.xml
+++ b/dapr-spring/dapr-spring-messaging/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/dapr-spring-workflows/pom.xml b/dapr-spring/dapr-spring-workflows/pom.xml
index 14f2da176e..efb05e7063 100644
--- a/dapr-spring/dapr-spring-workflows/pom.xml
+++ b/dapr-spring/dapr-spring-workflows/pom.xml
@@ -6,7 +6,7 @@
io.dapr.spring
dapr-spring-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/dapr-spring/pom.xml b/dapr-spring/pom.xml
index d2f35baaab..813357f49e 100644
--- a/dapr-spring/pom.xml
+++ b/dapr-spring/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/durabletask-client/pom.xml b/durabletask-client/pom.xml
index cdceed2aa1..5e5107473d 100644
--- a/durabletask-client/pom.xml
+++ b/durabletask-client/pom.xml
@@ -6,7 +6,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/examples/pom.xml b/examples/pom.xml
index 88119859aa..a8c33fd32a 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
index 0f10980988..969138738e 100644
--- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
+++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
@@ -108,6 +108,38 @@ client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPaylo
.blockLast();
```
+### Subscription with a Dead-Letter Topic
+
+For workloads that need to forward unprocessable messages to a dead-letter topic,
+use the listener-based `subscribeToEvents` overload that accepts a
+`deadLetterTopic` argument. When the listener returns `Status.DROP`, the Dapr
+runtime publishes the message to the configured dead-letter topic:
+
+```java
+var listener = new SubscriptionListener() {
+ @Override
+ public Mono onEvent(CloudEvent event) {
+ if (shouldRejectMessage(event)) {
+ return Mono.just(Status.DROP); // forwarded to deadLetterTopicName
+ }
+ return Mono.just(Status.SUCCESS);
+ }
+
+ @Override
+ public void onError(RuntimeException exception) {
+ System.out.println("Subscriber got exception: " + exception.getMessage());
+ }
+};
+
+try (var subscription = client.subscribeToEvents(
+ PUBSUB_NAME, topicName, deadLetterTopicName, listener, TypeRef.STRING)) {
+ subscription.awaitTermination();
+}
+```
+
+See [SubscriberWithDeadLetter.java](SubscriberWithDeadLetter.java) for a complete example
+that consumes the dead-letter topic in the same process.
+
### Subscription Lifecycle
The examples use `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
@@ -138,6 +170,13 @@ Or run the CloudEvent Subscriber example:
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberCloudEvent
```
+Or run the dead-letter Subscriber example, which routes messages whose payload
+contains "fail" to a dead-letter topic and consumes both topics:
+
+```bash
+dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberWithDeadLetter
+```
+
Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side:
4.1.132.Final
2.21.2
diff --git a/sdk-springboot/pom.xml b/sdk-springboot/pom.xml
index d2ea1706e0..a7ceb25eeb 100644
--- a/sdk-springboot/pom.xml
+++ b/sdk-springboot/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml
index 050123ce6b..db50656522 100644
--- a/sdk-tests/pom.xml
+++ b/sdk-tests/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java
index 334bc5232a..9b0b78ef2f 100644
--- a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java
@@ -46,6 +46,8 @@ public class PubSubStreamIT extends BaseIT {
private static final String TOPIC_NAME_FLUX = "stream-topic-flux";
private static final String TOPIC_NAME_CLOUDEVENT = "stream-topic-cloudevent";
private static final String TOPIC_NAME_RAWPAYLOAD = "stream-topic-rawpayload";
+ private static final String TOPIC_NAME_DLQ = "stream-topic-dlq";
+ private static final String TOPIC_NAME_DLQ_DEADLETTER = "stream-topic-dlq-deadletter";
private static final String PUBSUB_NAME = "messagebus";
private final List runs = new ArrayList<>();
@@ -261,4 +263,77 @@ public void testPubSubRawPayload() throws Exception {
disposable.dispose();
}
}
+
+ @Test
+ public void testPubSubDeadLetterTopic() throws Exception {
+ final DaprRun daprRun = closeLater(startDaprApp(
+ this.getClass().getSimpleName() + "-dlq",
+ 60000));
+
+ var runId = UUID.randomUUID().toString();
+ try (DaprClient client = daprRun.newDaprClient();
+ DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) {
+
+ // Subscribe to the dead-letter topic first so we don't miss any messages.
+ Set deadLetterMessageIds = Collections.synchronizedSet(new HashSet<>());
+ var deadLetterListener = new SubscriptionListener() {
+ @Override
+ public Mono onEvent(CloudEvent event) {
+ if (event.getData() != null && event.getData().contains(runId)) {
+ deadLetterMessageIds.add(event.getId());
+ System.out.println("Received dead-letter message ID: " + event.getId());
+ }
+ return Mono.just(Status.SUCCESS);
+ }
+
+ @Override
+ public void onError(RuntimeException exception) {
+ System.err.println("Dead-letter subscription error: " + exception.getMessage());
+ }
+ };
+
+ // Subscribe to the main topic with a listener that always DROPs, which should
+ // forward the messages to the dead-letter topic.
+ var mainListener = new SubscriptionListener() {
+ @Override
+ public Mono onEvent(CloudEvent event) {
+ if (event.getData() != null && event.getData().contains(runId)) {
+ System.out.println("Dropping message ID: " + event.getId());
+ return Mono.just(Status.DROP);
+ }
+ return Mono.just(Status.DROP);
+ }
+
+ @Override
+ public void onError(RuntimeException exception) {
+ System.err.println("Main subscription error: " + exception.getMessage());
+ }
+ };
+
+ try (var deadLetterSubscription = previewClient.subscribeToEvents(
+ PUBSUB_NAME, TOPIC_NAME_DLQ_DEADLETTER, deadLetterListener, TypeRef.STRING);
+ var mainSubscription = previewClient.subscribeToEvents(
+ PUBSUB_NAME, TOPIC_NAME_DLQ, TOPIC_NAME_DLQ_DEADLETTER, mainListener, TypeRef.STRING)) {
+
+ // Publish messages to the main topic.
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ String message = String.format("DLQ message #%d for run %s", i, runId);
+ client.publishEvent(PUBSUB_NAME, TOPIC_NAME_DLQ, message).block();
+ }
+
+ callWithRetry(() -> {
+ var count = deadLetterMessageIds.size();
+ System.out.println(
+ String.format("Got %d dead-letter messages out of %d for topic %s.",
+ count, NUM_MESSAGES, TOPIC_NAME_DLQ_DEADLETTER));
+ assertEquals(NUM_MESSAGES, deadLetterMessageIds.size());
+ }, 120000);
+
+ mainSubscription.close();
+ mainSubscription.awaitTermination();
+ deadLetterSubscription.close();
+ deadLetterSubscription.awaitTermination();
+ }
+ }
+ }
}
diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml
index f0fd66fd11..f94870078a 100644
--- a/sdk-workflows/pom.xml
+++ b/sdk-workflows/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/sdk/pom.xml b/sdk/pom.xml
index 43c0f625e3..349eaa27ff 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index e5c95ec0d7..3adb3bc37f 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -490,14 +490,29 @@ public Mono> publishEvents(BulkPublishRequest requ
@Override
public Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener listener, TypeRef type) {
- DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
+ return subscribeToEvents(pubsubName, topic, null, listener, type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Subscription subscribeToEvents(
+ String pubsubName,
+ String topic,
+ String deadLetterTopic,
+ SubscriptionListener listener,
+ TypeRef type) {
+ DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
- .setPubsubName(pubsubName)
- .build();
+ .setPubsubName(pubsubName);
+ if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
+ initialRequestBuilder.setDeadLetterTopic(deadLetterTopic);
+ }
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
- .setInitialRequest(initialRequest)
+ .setInitialRequest(initialRequestBuilder.build())
.build();
return buildSubscription(listener, type, request);
}
@@ -525,7 +540,7 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef
*/
@Override
public Flux subscribeToTopic(String pubsubName, String topic, TypeRef type) {
- return subscribeToTopic(pubsubName, topic, type, null);
+ return subscribeToTopic(pubsubName, topic, null, type, null);
}
/**
@@ -533,6 +548,27 @@ public Flux subscribeToTopic(String pubsubName, String topic, TypeRef
*/
@Override
public Flux subscribeToTopic(String pubsubName, String topic, TypeRef type, Map metadata) {
+ return subscribeToTopic(pubsubName, topic, null, type, metadata);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Flux subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef type) {
+ return subscribeToTopic(pubsubName, topic, deadLetterTopic, type, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Flux subscribeToTopic(
+ String pubsubName,
+ String topic,
+ String deadLetterTopic,
+ TypeRef type,
+ Map metadata) {
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
@@ -542,6 +578,10 @@ public Flux subscribeToTopic(String pubsubName, String topic, TypeRef
initialRequestBuilder.putAllMetadata(metadata);
}
+ if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
+ initialRequestBuilder.setDeadLetterTopic(deadLetterTopic);
+ }
+
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setInitialRequest(initialRequestBuilder.build())
diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
index bb5ae5e1fa..d0d2e3ea66 100644
--- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
+++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
@@ -292,6 +292,25 @@ Mono> publishEvents(String pubsubName, String topicNa
Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener listener, TypeRef type);
+ /**
+ * Subscribe to pubsub via streaming with a dead-letter topic.
+ * @param pubsubName Name of the pubsub component.
+ * @param topic Name of the topic to subscribe to.
+ * @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
+ * @param listener Callback methods to process events.
+ * @param type Type for object deserialization.
+ * @param Type of object deserialization.
+ * @return An active subscription.
+ * @deprecated Use {@link #subscribeToTopic(String, String, String, TypeRef)} instead for a more reactive approach.
+ */
+ @Deprecated
+ Subscription subscribeToEvents(
+ String pubsubName,
+ String topic,
+ String deadLetterTopic,
+ SubscriptionListener listener,
+ TypeRef type);
+
/**
* Subscribe to pubsub events via streaming using Project Reactor Flux.
*
@@ -352,6 +371,36 @@ Subscription subscribeToEvents(
*/
Flux subscribeToTopic(String pubsubName, String topic, TypeRef type, Map metadata);
+ /**
+ * Subscribe to pubsub events via streaming using Project Reactor Flux with a dead-letter topic.
+ *
+ * @param pubsubName Name of the pubsub component.
+ * @param topic Name of the topic to subscribe to.
+ * @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
+ * @param type Type for object deserialization.
+ * @return A Flux of deserialized event payloads.
+ * @param Type of the event payload.
+ */
+ Flux subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef type);
+
+ /**
+ * Subscribe to pubsub events via streaming using Project Reactor Flux with metadata and dead-letter topic.
+ *
+ * @param pubsubName Name of the pubsub component.
+ * @param topic Name of the topic to subscribe to.
+ * @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable).
+ * @param type Type for object deserialization.
+ * @param metadata Subscription metadata (e.g., {"rawPayload": "true"}).
+ * @return A Flux of deserialized event payloads.
+ * @param Type of the event payload.
+ */
+ Flux subscribeToTopic(
+ String pubsubName,
+ String topic,
+ String deadLetterTopic,
+ TypeRef type,
+ Map metadata);
+
/*
* Converse with an LLM.
*
diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
index ab82896dd1..b8d1f3f6b2 100644
--- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
@@ -95,6 +95,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -948,6 +949,253 @@ public void onCompleted() {
assertEquals("true", capturedMetadata.get().get("rawPayload"));
}
+ @Test
+ public void subscribeToEventsWithDeadLetterTopicTest() throws Exception {
+ var pubsubName = "pubsubName";
+ var topicName = "topicName";
+ var deadLetterTopic = "topicName-DLQ";
+ var started = new Semaphore(0);
+ var capturedInitial = new AtomicReference();
+ var gotInitial = new Semaphore(0);
+
+ doAnswer((Answer>) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[0];
+
+ new Thread(() -> {
+ try {
+ started.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
+ observer.onCompleted();
+ }).start();
+
+ return new StreamObserver<>() {
+ @Override
+ public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) {
+ if (request.hasInitialRequest()) {
+ capturedInitial.set(request.getInitialRequest());
+ gotInitial.release();
+ }
+ started.release();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ };
+ }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
+
+ var subscription = previewClient.subscribeToEvents(
+ pubsubName,
+ topicName,
+ deadLetterTopic,
+ new SubscriptionListener() {
+ @Override
+ public Mono onEvent(CloudEvent event) {
+ return Mono.just(Status.SUCCESS);
+ }
+
+ @Override
+ public void onError(RuntimeException exception) {
+ }
+ },
+ TypeRef.STRING);
+
+ gotInitial.acquire();
+ subscription.close();
+
+ assertNotNull(capturedInitial.get());
+ assertEquals(pubsubName, capturedInitial.get().getPubsubName());
+ assertEquals(topicName, capturedInitial.get().getTopic());
+ assertTrue(capturedInitial.get().hasDeadLetterTopic());
+ assertEquals(deadLetterTopic, capturedInitial.get().getDeadLetterTopic());
+ }
+
+ @Test
+ public void subscribeToEventsWithoutDeadLetterTopicDoesNotSetFieldTest() throws Exception {
+ var pubsubName = "pubsubName";
+ var topicName = "topicName";
+ var started = new Semaphore(0);
+ var capturedInitial = new AtomicReference();
+ var gotInitial = new Semaphore(0);
+
+ doAnswer((Answer>) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[0];
+
+ new Thread(() -> {
+ try {
+ started.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
+ observer.onCompleted();
+ }).start();
+
+ return new StreamObserver<>() {
+ @Override
+ public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) {
+ if (request.hasInitialRequest()) {
+ capturedInitial.set(request.getInitialRequest());
+ gotInitial.release();
+ }
+ started.release();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ };
+ }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
+
+ var subscription = previewClient.subscribeToEvents(
+ pubsubName,
+ topicName,
+ new SubscriptionListener() {
+ @Override
+ public Mono onEvent(CloudEvent event) {
+ return Mono.just(Status.SUCCESS);
+ }
+
+ @Override
+ public void onError(RuntimeException exception) {
+ }
+ },
+ TypeRef.STRING);
+
+ gotInitial.acquire();
+ subscription.close();
+
+ assertNotNull(capturedInitial.get());
+ assertFalse(capturedInitial.get().hasDeadLetterTopic());
+ }
+
+ @Test
+ public void subscribeToTopicWithDeadLetterTopicTest() throws Exception {
+ var pubsubName = "pubsubName";
+ var topicName = "topicName";
+ var deadLetterTopic = "topicName-DLQ";
+ var started = new Semaphore(0);
+ var capturedInitial = new AtomicReference();
+
+ doAnswer((Answer>) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[0];
+
+ new Thread(() -> {
+ try {
+ started.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
+ observer.onCompleted();
+ }).start();
+
+ return new StreamObserver<>() {
+ @Override
+ public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) {
+ if (request.hasInitialRequest()) {
+ capturedInitial.set(request.getInitialRequest());
+ }
+ started.release();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ };
+ }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
+
+ var disposable = previewClient.subscribeToTopic(pubsubName, topicName, deadLetterTopic, TypeRef.STRING)
+ .subscribe();
+
+ // Wait briefly for the initial request to be captured, then dispose.
+ for (int i = 0; i < 50 && capturedInitial.get() == null; i++) {
+ Thread.sleep(20);
+ }
+ disposable.dispose();
+
+ assertNotNull(capturedInitial.get());
+ assertEquals(pubsubName, capturedInitial.get().getPubsubName());
+ assertEquals(topicName, capturedInitial.get().getTopic());
+ assertTrue(capturedInitial.get().hasDeadLetterTopic());
+ assertEquals(deadLetterTopic, capturedInitial.get().getDeadLetterTopic());
+ }
+
+ @Test
+ public void subscribeToTopicWithDeadLetterTopicAndMetadataTest() throws Exception {
+ var pubsubName = "pubsubName";
+ var topicName = "topicName";
+ var deadLetterTopic = "topicName-DLQ";
+ var started = new Semaphore(0);
+ var capturedInitial = new AtomicReference();
+
+ doAnswer((Answer>) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[0];
+
+ new Thread(() -> {
+ try {
+ started.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
+ observer.onCompleted();
+ }).start();
+
+ return new StreamObserver<>() {
+ @Override
+ public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) {
+ if (request.hasInitialRequest()) {
+ capturedInitial.set(request.getInitialRequest());
+ }
+ started.release();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ };
+ }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
+
+ Map metadata = Map.of("rawPayload", "true");
+ var disposable = previewClient
+ .subscribeToTopic(pubsubName, topicName, deadLetterTopic, TypeRef.STRING, metadata)
+ .subscribe();
+
+ for (int i = 0; i < 50 && capturedInitial.get() == null; i++) {
+ Thread.sleep(20);
+ }
+ disposable.dispose();
+
+ assertNotNull(capturedInitial.get());
+ assertTrue(capturedInitial.get().hasDeadLetterTopic());
+ assertEquals(deadLetterTopic, capturedInitial.get().getDeadLetterTopic());
+ assertEquals("true", capturedInitial.get().getMetadataMap().get("rawPayload"));
+ }
+
@Test
public void converseShouldThrowIllegalArgumentExceptionWhenComponentNameIsNull() throws Exception {
List inputs = new ArrayList<>();
diff --git a/spring-boot-4-examples/consumer-app/pom.xml b/spring-boot-4-examples/consumer-app/pom.xml
index b2ae3dfb2e..27eed8e81e 100644
--- a/spring-boot-4-examples/consumer-app/pom.xml
+++ b/spring-boot-4-examples/consumer-app/pom.xml
@@ -5,7 +5,7 @@
io.dapr
spring-boot-4-examples
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/pom.xml b/spring-boot-4-examples/pom.xml
index 42e018a81a..77761bd15a 100644
--- a/spring-boot-4-examples/pom.xml
+++ b/spring-boot-4-examples/pom.xml
@@ -6,7 +6,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/producer-app/pom.xml b/spring-boot-4-examples/producer-app/pom.xml
index 3cdeb81f1c..24eddeab8c 100644
--- a/spring-boot-4-examples/producer-app/pom.xml
+++ b/spring-boot-4-examples/producer-app/pom.xml
@@ -6,7 +6,7 @@
io.dapr
spring-boot-4-examples
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml b/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml
index 4fcc0645e7..d3fc5bcf60 100644
--- a/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml
+++ b/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml
@@ -6,7 +6,7 @@
io.dapr
sb4-multi-app
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/workflows/multi-app/pom.xml b/spring-boot-4-examples/workflows/multi-app/pom.xml
index 3788daa606..0c047ef1f0 100644
--- a/spring-boot-4-examples/workflows/multi-app/pom.xml
+++ b/spring-boot-4-examples/workflows/multi-app/pom.xml
@@ -6,7 +6,7 @@
io.dapr
sb4-workflows
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml b/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml
index 960ff3dfc1..21d33025c3 100644
--- a/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml
+++ b/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml
@@ -6,7 +6,7 @@
io.dapr
sb4-multi-app
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml b/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml
index 0edda5993b..9d727f54ed 100644
--- a/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml
+++ b/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml
@@ -6,7 +6,7 @@
io.dapr
sb4-multi-app
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/workflows/patterns/pom.xml b/spring-boot-4-examples/workflows/patterns/pom.xml
index cdf0a4652c..5b149ac3cc 100644
--- a/spring-boot-4-examples/workflows/patterns/pom.xml
+++ b/spring-boot-4-examples/workflows/patterns/pom.xml
@@ -6,7 +6,7 @@
io.dapr
sb4-workflows
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-examples/workflows/pom.xml b/spring-boot-4-examples/workflows/pom.xml
index e74c2392a5..2d21a3febf 100644
--- a/spring-boot-4-examples/workflows/pom.xml
+++ b/spring-boot-4-examples/workflows/pom.xml
@@ -6,7 +6,7 @@
io.dapr
spring-boot-4-examples
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-4-sdk-tests/pom.xml b/spring-boot-4-sdk-tests/pom.xml
index f7533bed93..d823aa5aaa 100644
--- a/spring-boot-4-sdk-tests/pom.xml
+++ b/spring-boot-4-sdk-tests/pom.xml
@@ -6,7 +6,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/consumer-app/pom.xml b/spring-boot-examples/consumer-app/pom.xml
index f551c05496..caa841fad4 100644
--- a/spring-boot-examples/consumer-app/pom.xml
+++ b/spring-boot-examples/consumer-app/pom.xml
@@ -5,7 +5,7 @@
io.dapr
spring-boot-examples
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/pom.xml b/spring-boot-examples/pom.xml
index 5be4902938..25bffa0e0f 100644
--- a/spring-boot-examples/pom.xml
+++ b/spring-boot-examples/pom.xml
@@ -6,7 +6,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/producer-app/pom.xml b/spring-boot-examples/producer-app/pom.xml
index 4b542f6a20..77c886cc2c 100644
--- a/spring-boot-examples/producer-app/pom.xml
+++ b/spring-boot-examples/producer-app/pom.xml
@@ -6,7 +6,7 @@
io.dapr
spring-boot-examples
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml b/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml
index 5a1a22fe55..97769ee927 100644
--- a/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml
+++ b/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml
@@ -6,7 +6,7 @@
io.dapr
multi-app
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/multi-app/pom.xml b/spring-boot-examples/workflows/multi-app/pom.xml
index 5c05771355..46f51a2a17 100644
--- a/spring-boot-examples/workflows/multi-app/pom.xml
+++ b/spring-boot-examples/workflows/multi-app/pom.xml
@@ -6,7 +6,7 @@
io.dapr
workflows
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/multi-app/worker-one/pom.xml b/spring-boot-examples/workflows/multi-app/worker-one/pom.xml
index 5572d6f3da..20221ce441 100644
--- a/spring-boot-examples/workflows/multi-app/worker-one/pom.xml
+++ b/spring-boot-examples/workflows/multi-app/worker-one/pom.xml
@@ -6,7 +6,7 @@
io.dapr
multi-app
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/multi-app/worker-two/pom.xml b/spring-boot-examples/workflows/multi-app/worker-two/pom.xml
index 6175187e34..eacf792486 100644
--- a/spring-boot-examples/workflows/multi-app/worker-two/pom.xml
+++ b/spring-boot-examples/workflows/multi-app/worker-two/pom.xml
@@ -6,7 +6,7 @@
io.dapr
multi-app
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/patterns/pom.xml b/spring-boot-examples/workflows/patterns/pom.xml
index c3ec262f68..507c2bac23 100644
--- a/spring-boot-examples/workflows/patterns/pom.xml
+++ b/spring-boot-examples/workflows/patterns/pom.xml
@@ -6,7 +6,7 @@
io.dapr
workflows
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/pom.xml b/spring-boot-examples/workflows/pom.xml
index c79b064d26..b4c613f181 100644
--- a/spring-boot-examples/workflows/pom.xml
+++ b/spring-boot-examples/workflows/pom.xml
@@ -6,7 +6,7 @@
io.dapr
spring-boot-examples
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml b/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml
index 9f53cf3dd0..f2ddddba1b 100644
--- a/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml
+++ b/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml
@@ -6,7 +6,7 @@
io.dapr
versioning
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml b/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml
index a441cb678a..306d4b78a4 100644
--- a/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml
+++ b/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml
@@ -6,7 +6,7 @@
io.dapr
versioning
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml b/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml
index 937f4e794d..3dbec298c7 100644
--- a/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml
+++ b/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml
@@ -6,7 +6,7 @@
io.dapr
versioning
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml b/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml
index b16d36736f..e2838a3f27 100644
--- a/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml
+++ b/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml
@@ -6,7 +6,7 @@
io.dapr
versioning
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/versioning/pom.xml b/spring-boot-examples/workflows/versioning/pom.xml
index e3a9813380..2888cb1dd1 100644
--- a/spring-boot-examples/workflows/versioning/pom.xml
+++ b/spring-boot-examples/workflows/versioning/pom.xml
@@ -6,7 +6,7 @@
io.dapr
workflows
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml b/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml
index 6ff543a2b1..9fbb8db6a0 100644
--- a/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml
+++ b/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml
@@ -6,7 +6,7 @@
io.dapr
versioning
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml
diff --git a/testcontainers-dapr/pom.xml b/testcontainers-dapr/pom.xml
index f76b23efa6..caf86ad0bd 100644
--- a/testcontainers-dapr/pom.xml
+++ b/testcontainers-dapr/pom.xml
@@ -5,7 +5,7 @@
io.dapr
dapr-sdk-parent
- 1.18.0-SNAPSHOT
+ 1.18.0-rc-1
../pom.xml