diff --git a/.github/scripts/update_sdk_version.sh b/.github/scripts/update_sdk_version.sh index b2e1033be9..a87f88e3b4 100755 --- a/.github/scripts/update_sdk_version.sh +++ b/.github/scripts/update_sdk_version.sh @@ -15,6 +15,8 @@ mvn versions:set-property -Dproperty=dapr.sdk.version -DnewVersion=$DAPR_JAVA_SD # BOMs are standalone (no parent), so versions:set skips them — update explicitly. mvn versions:set -DnewVersion=$DAPR_JAVA_SDK_VERSION -f sdk-bom/pom.xml mvn versions:set-property -Dproperty=dapr.sdk.version -DnewVersion=$DAPR_JAVA_SDK_VERSION -f sdk-bom/pom.xml +# Install dapr-sdk-bom locally so dapr-spring-bom's import can resolve when loaded with -f (no reactor). +mvn install -N -DskipTests -f sdk-bom/pom.xml mvn versions:set -DnewVersion=$DAPR_JAVA_SDK_VERSION -f dapr-spring/dapr-spring-bom/pom.xml mvn versions:set-property -Dproperty=dapr.sdk.version -DnewVersion=$DAPR_JAVA_SDK_VERSION -f dapr-spring/dapr-spring-bom/pom.xml mvn versions:set-property -Dproperty=dapr.sdk.alpha.version -DnewVersion=$DAPR_JAVA_SDK_ALPHA_VERSION -f sdk-tests/pom.xml diff --git a/dapr-spring/dapr-spring-6-data/pom.xml b/dapr-spring/dapr-spring-6-data/pom.xml index 46411f07c3..64aa7dd5a6 100644 --- a/dapr-spring/dapr-spring-6-data/pom.xml +++ b/dapr-spring/dapr-spring-6-data/pom.xml @@ -7,7 +7,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-bom/pom.xml b/dapr-spring/dapr-spring-bom/pom.xml index d15820a210..80efd43dd5 100644 --- a/dapr-spring/dapr-spring-bom/pom.xml +++ b/dapr-spring/dapr-spring-bom/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-bom - 1.18.0-SNAPSHOT + 1.18.0-rc-1 pom dapr-spring-bom Dapr Spring Bill of Materials (BOM). Import this POM to manage versions @@ -45,7 +45,7 @@ true - 1.18.0-SNAPSHOT + 1.18.0-rc-1 diff --git a/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml b/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml index 5ed01dc328..75c3fa124e 100644 --- a/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml +++ b/dapr-spring/dapr-spring-boot-4-autoconfigure/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml index 5d13434f14..ee3e5817da 100644 --- a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml +++ b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-boot-observation/pom.xml b/dapr-spring/dapr-spring-boot-observation/pom.xml index c78b98cc94..4f008b0139 100644 --- a/dapr-spring/dapr-spring-boot-observation/pom.xml +++ b/dapr-spring/dapr-spring-boot-observation/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-boot-properties/pom.xml b/dapr-spring/dapr-spring-boot-properties/pom.xml index 548c277621..516bbba839 100644 --- a/dapr-spring/dapr-spring-boot-properties/pom.xml +++ b/dapr-spring/dapr-spring-boot-properties/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml index 47be20f17d..0ebc19c340 100644 --- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml +++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter-test/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../../pom.xml diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml index 4744517a8d..0f31067c64 100644 --- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml +++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-4-starter/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../../pom.xml diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml index 93ef0935fc..88f711eebe 100644 --- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml +++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter-test/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../../pom.xml diff --git a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml index a1c5a284f6..c77d843c2a 100644 --- a/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml +++ b/dapr-spring/dapr-spring-boot-starters/dapr-spring-boot-starter/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../../pom.xml diff --git a/dapr-spring/dapr-spring-boot-tests/pom.xml b/dapr-spring/dapr-spring-boot-tests/pom.xml index c08d82ac84..7de18a4c7d 100644 --- a/dapr-spring/dapr-spring-boot-tests/pom.xml +++ b/dapr-spring/dapr-spring-boot-tests/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-data/pom.xml b/dapr-spring/dapr-spring-data/pom.xml index c6f9ac4e23..1b6d9d5d91 100644 --- a/dapr-spring/dapr-spring-data/pom.xml +++ b/dapr-spring/dapr-spring-data/pom.xml @@ -7,7 +7,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-messaging/pom.xml b/dapr-spring/dapr-spring-messaging/pom.xml index 94fb975fd0..11d842335a 100644 --- a/dapr-spring/dapr-spring-messaging/pom.xml +++ b/dapr-spring/dapr-spring-messaging/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/dapr-spring-workflows/pom.xml b/dapr-spring/dapr-spring-workflows/pom.xml index 14f2da176e..efb05e7063 100644 --- a/dapr-spring/dapr-spring-workflows/pom.xml +++ b/dapr-spring/dapr-spring-workflows/pom.xml @@ -6,7 +6,7 @@ io.dapr.spring dapr-spring-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/dapr-spring/pom.xml b/dapr-spring/pom.xml index d2f35baaab..813357f49e 100644 --- a/dapr-spring/pom.xml +++ b/dapr-spring/pom.xml @@ -7,7 +7,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/durabletask-client/pom.xml b/durabletask-client/pom.xml index cdceed2aa1..5e5107473d 100644 --- a/durabletask-client/pom.xml +++ b/durabletask-client/pom.xml @@ -6,7 +6,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 88119859aa..a8c33fd32a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -7,7 +7,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md index 0f10980988..969138738e 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md @@ -108,6 +108,38 @@ client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPaylo .blockLast(); ``` +### Subscription with a Dead-Letter Topic + +For workloads that need to forward unprocessable messages to a dead-letter topic, +use the listener-based `subscribeToEvents` overload that accepts a +`deadLetterTopic` argument. When the listener returns `Status.DROP`, the Dapr +runtime publishes the message to the configured dead-letter topic: + +```java +var listener = new SubscriptionListener() { + @Override + public Mono onEvent(CloudEvent event) { + if (shouldRejectMessage(event)) { + return Mono.just(Status.DROP); // forwarded to deadLetterTopicName + } + return Mono.just(Status.SUCCESS); + } + + @Override + public void onError(RuntimeException exception) { + System.out.println("Subscriber got exception: " + exception.getMessage()); + } +}; + +try (var subscription = client.subscribeToEvents( + PUBSUB_NAME, topicName, deadLetterTopicName, listener, TypeRef.STRING)) { + subscription.awaitTermination(); +} +``` + +See [SubscriberWithDeadLetter.java](SubscriberWithDeadLetter.java) for a complete example +that consumes the dead-letter topic in the same process. + ### Subscription Lifecycle The examples use `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`. @@ -138,6 +170,13 @@ Or run the CloudEvent Subscriber example: dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberCloudEvent ``` +Or run the dead-letter Subscriber example, which routes messages whose payload +contains "fail" to a dead-letter topic and consumes both topics: + +```bash +dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberWithDeadLetter +``` + Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side: 4.1.132.Final 2.21.2 diff --git a/sdk-springboot/pom.xml b/sdk-springboot/pom.xml index d2ea1706e0..a7ceb25eeb 100644 --- a/sdk-springboot/pom.xml +++ b/sdk-springboot/pom.xml @@ -7,7 +7,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index 050123ce6b..db50656522 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -7,7 +7,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java index 334bc5232a..9b0b78ef2f 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java @@ -46,6 +46,8 @@ public class PubSubStreamIT extends BaseIT { private static final String TOPIC_NAME_FLUX = "stream-topic-flux"; private static final String TOPIC_NAME_CLOUDEVENT = "stream-topic-cloudevent"; private static final String TOPIC_NAME_RAWPAYLOAD = "stream-topic-rawpayload"; + private static final String TOPIC_NAME_DLQ = "stream-topic-dlq"; + private static final String TOPIC_NAME_DLQ_DEADLETTER = "stream-topic-dlq-deadletter"; private static final String PUBSUB_NAME = "messagebus"; private final List runs = new ArrayList<>(); @@ -261,4 +263,77 @@ public void testPubSubRawPayload() throws Exception { disposable.dispose(); } } + + @Test + public void testPubSubDeadLetterTopic() throws Exception { + final DaprRun daprRun = closeLater(startDaprApp( + this.getClass().getSimpleName() + "-dlq", + 60000)); + + var runId = UUID.randomUUID().toString(); + try (DaprClient client = daprRun.newDaprClient(); + DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { + + // Subscribe to the dead-letter topic first so we don't miss any messages. + Set deadLetterMessageIds = Collections.synchronizedSet(new HashSet<>()); + var deadLetterListener = new SubscriptionListener() { + @Override + public Mono onEvent(CloudEvent event) { + if (event.getData() != null && event.getData().contains(runId)) { + deadLetterMessageIds.add(event.getId()); + System.out.println("Received dead-letter message ID: " + event.getId()); + } + return Mono.just(Status.SUCCESS); + } + + @Override + public void onError(RuntimeException exception) { + System.err.println("Dead-letter subscription error: " + exception.getMessage()); + } + }; + + // Subscribe to the main topic with a listener that always DROPs, which should + // forward the messages to the dead-letter topic. + var mainListener = new SubscriptionListener() { + @Override + public Mono onEvent(CloudEvent event) { + if (event.getData() != null && event.getData().contains(runId)) { + System.out.println("Dropping message ID: " + event.getId()); + return Mono.just(Status.DROP); + } + return Mono.just(Status.DROP); + } + + @Override + public void onError(RuntimeException exception) { + System.err.println("Main subscription error: " + exception.getMessage()); + } + }; + + try (var deadLetterSubscription = previewClient.subscribeToEvents( + PUBSUB_NAME, TOPIC_NAME_DLQ_DEADLETTER, deadLetterListener, TypeRef.STRING); + var mainSubscription = previewClient.subscribeToEvents( + PUBSUB_NAME, TOPIC_NAME_DLQ, TOPIC_NAME_DLQ_DEADLETTER, mainListener, TypeRef.STRING)) { + + // Publish messages to the main topic. + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("DLQ message #%d for run %s", i, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME_DLQ, message).block(); + } + + callWithRetry(() -> { + var count = deadLetterMessageIds.size(); + System.out.println( + String.format("Got %d dead-letter messages out of %d for topic %s.", + count, NUM_MESSAGES, TOPIC_NAME_DLQ_DEADLETTER)); + assertEquals(NUM_MESSAGES, deadLetterMessageIds.size()); + }, 120000); + + mainSubscription.close(); + mainSubscription.awaitTermination(); + deadLetterSubscription.close(); + deadLetterSubscription.awaitTermination(); + } + } + } } diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml index f0fd66fd11..f94870078a 100644 --- a/sdk-workflows/pom.xml +++ b/sdk-workflows/pom.xml @@ -7,7 +7,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/sdk/pom.xml b/sdk/pom.xml index 43c0f625e3..349eaa27ff 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -7,7 +7,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index e5c95ec0d7..3adb3bc37f 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -490,14 +490,29 @@ public Mono> publishEvents(BulkPublishRequest requ @Override public Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type) { - DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = + return subscribeToEvents(pubsubName, topic, null, listener, type); + } + + /** + * {@inheritDoc} + */ + @Override + public Subscription subscribeToEvents( + String pubsubName, + String topic, + String deadLetterTopic, + SubscriptionListener listener, + TypeRef type) { + DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder = DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() .setTopic(topic) - .setPubsubName(pubsubName) - .build(); + .setPubsubName(pubsubName); + if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) { + initialRequestBuilder.setDeadLetterTopic(deadLetterTopic); + } DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request = DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() - .setInitialRequest(initialRequest) + .setInitialRequest(initialRequestBuilder.build()) .build(); return buildSubscription(listener, type, request); } @@ -525,7 +540,7 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef */ @Override public Flux subscribeToTopic(String pubsubName, String topic, TypeRef type) { - return subscribeToTopic(pubsubName, topic, type, null); + return subscribeToTopic(pubsubName, topic, null, type, null); } /** @@ -533,6 +548,27 @@ public Flux subscribeToTopic(String pubsubName, String topic, TypeRef */ @Override public Flux subscribeToTopic(String pubsubName, String topic, TypeRef type, Map metadata) { + return subscribeToTopic(pubsubName, topic, null, type, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Flux subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef type) { + return subscribeToTopic(pubsubName, topic, deadLetterTopic, type, null); + } + + /** + * {@inheritDoc} + */ + @Override + public Flux subscribeToTopic( + String pubsubName, + String topic, + String deadLetterTopic, + TypeRef type, + Map metadata) { DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder = DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() .setTopic(topic) @@ -542,6 +578,10 @@ public Flux subscribeToTopic(String pubsubName, String topic, TypeRef initialRequestBuilder.putAllMetadata(metadata); } + if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) { + initialRequestBuilder.setDeadLetterTopic(deadLetterTopic); + } + DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request = DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() .setInitialRequest(initialRequestBuilder.build()) diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index bb5ae5e1fa..d0d2e3ea66 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -292,6 +292,25 @@ Mono> publishEvents(String pubsubName, String topicNa Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + /** + * Subscribe to pubsub via streaming with a dead-letter topic. + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable). + * @param listener Callback methods to process events. + * @param type Type for object deserialization. + * @param Type of object deserialization. + * @return An active subscription. + * @deprecated Use {@link #subscribeToTopic(String, String, String, TypeRef)} instead for a more reactive approach. + */ + @Deprecated + Subscription subscribeToEvents( + String pubsubName, + String topic, + String deadLetterTopic, + SubscriptionListener listener, + TypeRef type); + /** * Subscribe to pubsub events via streaming using Project Reactor Flux. * @@ -352,6 +371,36 @@ Subscription subscribeToEvents( */ Flux subscribeToTopic(String pubsubName, String topic, TypeRef type, Map metadata); + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux with a dead-letter topic. + * + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable). + * @param type Type for object deserialization. + * @return A Flux of deserialized event payloads. + * @param Type of the event payload. + */ + Flux subscribeToTopic(String pubsubName, String topic, String deadLetterTopic, TypeRef type); + + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux with metadata and dead-letter topic. + * + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param deadLetterTopic Name of the dead-letter topic to forward failed messages to (null or empty to disable). + * @param type Type for object deserialization. + * @param metadata Subscription metadata (e.g., {"rawPayload": "true"}). + * @return A Flux of deserialized event payloads. + * @param Type of the event payload. + */ + Flux subscribeToTopic( + String pubsubName, + String topic, + String deadLetterTopic, + TypeRef type, + Map metadata); + /* * Converse with an LLM. * diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index ab82896dd1..b8d1f3f6b2 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -95,6 +95,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -948,6 +949,253 @@ public void onCompleted() { assertEquals("true", capturedMetadata.get().get("rawPayload")); } + @Test + public void subscribeToEventsWithDeadLetterTopicTest() throws Exception { + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var deadLetterTopic = "topicName-DLQ"; + var started = new Semaphore(0); + var capturedInitial = new AtomicReference(); + var gotInitial = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + observer.onCompleted(); + }).start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) { + if (request.hasInitialRequest()) { + capturedInitial.set(request.getInitialRequest()); + gotInitial.release(); + } + started.release(); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + var subscription = previewClient.subscribeToEvents( + pubsubName, + topicName, + deadLetterTopic, + new SubscriptionListener() { + @Override + public Mono onEvent(CloudEvent event) { + return Mono.just(Status.SUCCESS); + } + + @Override + public void onError(RuntimeException exception) { + } + }, + TypeRef.STRING); + + gotInitial.acquire(); + subscription.close(); + + assertNotNull(capturedInitial.get()); + assertEquals(pubsubName, capturedInitial.get().getPubsubName()); + assertEquals(topicName, capturedInitial.get().getTopic()); + assertTrue(capturedInitial.get().hasDeadLetterTopic()); + assertEquals(deadLetterTopic, capturedInitial.get().getDeadLetterTopic()); + } + + @Test + public void subscribeToEventsWithoutDeadLetterTopicDoesNotSetFieldTest() throws Exception { + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var started = new Semaphore(0); + var capturedInitial = new AtomicReference(); + var gotInitial = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + observer.onCompleted(); + }).start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) { + if (request.hasInitialRequest()) { + capturedInitial.set(request.getInitialRequest()); + gotInitial.release(); + } + started.release(); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + var subscription = previewClient.subscribeToEvents( + pubsubName, + topicName, + new SubscriptionListener() { + @Override + public Mono onEvent(CloudEvent event) { + return Mono.just(Status.SUCCESS); + } + + @Override + public void onError(RuntimeException exception) { + } + }, + TypeRef.STRING); + + gotInitial.acquire(); + subscription.close(); + + assertNotNull(capturedInitial.get()); + assertFalse(capturedInitial.get().hasDeadLetterTopic()); + } + + @Test + public void subscribeToTopicWithDeadLetterTopicTest() throws Exception { + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var deadLetterTopic = "topicName-DLQ"; + var started = new Semaphore(0); + var capturedInitial = new AtomicReference(); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + observer.onCompleted(); + }).start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) { + if (request.hasInitialRequest()) { + capturedInitial.set(request.getInitialRequest()); + } + started.release(); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + var disposable = previewClient.subscribeToTopic(pubsubName, topicName, deadLetterTopic, TypeRef.STRING) + .subscribe(); + + // Wait briefly for the initial request to be captured, then dispose. + for (int i = 0; i < 50 && capturedInitial.get() == null; i++) { + Thread.sleep(20); + } + disposable.dispose(); + + assertNotNull(capturedInitial.get()); + assertEquals(pubsubName, capturedInitial.get().getPubsubName()); + assertEquals(topicName, capturedInitial.get().getTopic()); + assertTrue(capturedInitial.get().hasDeadLetterTopic()); + assertEquals(deadLetterTopic, capturedInitial.get().getDeadLetterTopic()); + } + + @Test + public void subscribeToTopicWithDeadLetterTopicAndMetadataTest() throws Exception { + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var deadLetterTopic = "topicName-DLQ"; + var started = new Semaphore(0); + var capturedInitial = new AtomicReference(); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + observer.onCompleted(); + }).start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) { + if (request.hasInitialRequest()) { + capturedInitial.set(request.getInitialRequest()); + } + started.release(); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + Map metadata = Map.of("rawPayload", "true"); + var disposable = previewClient + .subscribeToTopic(pubsubName, topicName, deadLetterTopic, TypeRef.STRING, metadata) + .subscribe(); + + for (int i = 0; i < 50 && capturedInitial.get() == null; i++) { + Thread.sleep(20); + } + disposable.dispose(); + + assertNotNull(capturedInitial.get()); + assertTrue(capturedInitial.get().hasDeadLetterTopic()); + assertEquals(deadLetterTopic, capturedInitial.get().getDeadLetterTopic()); + assertEquals("true", capturedInitial.get().getMetadataMap().get("rawPayload")); + } + @Test public void converseShouldThrowIllegalArgumentExceptionWhenComponentNameIsNull() throws Exception { List inputs = new ArrayList<>(); diff --git a/spring-boot-4-examples/consumer-app/pom.xml b/spring-boot-4-examples/consumer-app/pom.xml index b2ae3dfb2e..27eed8e81e 100644 --- a/spring-boot-4-examples/consumer-app/pom.xml +++ b/spring-boot-4-examples/consumer-app/pom.xml @@ -5,7 +5,7 @@ io.dapr spring-boot-4-examples - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/pom.xml b/spring-boot-4-examples/pom.xml index 42e018a81a..77761bd15a 100644 --- a/spring-boot-4-examples/pom.xml +++ b/spring-boot-4-examples/pom.xml @@ -6,7 +6,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/producer-app/pom.xml b/spring-boot-4-examples/producer-app/pom.xml index 3cdeb81f1c..24eddeab8c 100644 --- a/spring-boot-4-examples/producer-app/pom.xml +++ b/spring-boot-4-examples/producer-app/pom.xml @@ -6,7 +6,7 @@ io.dapr spring-boot-4-examples - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml b/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml index 4fcc0645e7..d3fc5bcf60 100644 --- a/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml +++ b/spring-boot-4-examples/workflows/multi-app/orchestrator/pom.xml @@ -6,7 +6,7 @@ io.dapr sb4-multi-app - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/workflows/multi-app/pom.xml b/spring-boot-4-examples/workflows/multi-app/pom.xml index 3788daa606..0c047ef1f0 100644 --- a/spring-boot-4-examples/workflows/multi-app/pom.xml +++ b/spring-boot-4-examples/workflows/multi-app/pom.xml @@ -6,7 +6,7 @@ io.dapr sb4-workflows - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml b/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml index 960ff3dfc1..21d33025c3 100644 --- a/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml +++ b/spring-boot-4-examples/workflows/multi-app/worker-one/pom.xml @@ -6,7 +6,7 @@ io.dapr sb4-multi-app - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml b/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml index 0edda5993b..9d727f54ed 100644 --- a/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml +++ b/spring-boot-4-examples/workflows/multi-app/worker-two/pom.xml @@ -6,7 +6,7 @@ io.dapr sb4-multi-app - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/workflows/patterns/pom.xml b/spring-boot-4-examples/workflows/patterns/pom.xml index cdf0a4652c..5b149ac3cc 100644 --- a/spring-boot-4-examples/workflows/patterns/pom.xml +++ b/spring-boot-4-examples/workflows/patterns/pom.xml @@ -6,7 +6,7 @@ io.dapr sb4-workflows - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-examples/workflows/pom.xml b/spring-boot-4-examples/workflows/pom.xml index e74c2392a5..2d21a3febf 100644 --- a/spring-boot-4-examples/workflows/pom.xml +++ b/spring-boot-4-examples/workflows/pom.xml @@ -6,7 +6,7 @@ io.dapr spring-boot-4-examples - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-4-sdk-tests/pom.xml b/spring-boot-4-sdk-tests/pom.xml index f7533bed93..d823aa5aaa 100644 --- a/spring-boot-4-sdk-tests/pom.xml +++ b/spring-boot-4-sdk-tests/pom.xml @@ -6,7 +6,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/consumer-app/pom.xml b/spring-boot-examples/consumer-app/pom.xml index f551c05496..caa841fad4 100644 --- a/spring-boot-examples/consumer-app/pom.xml +++ b/spring-boot-examples/consumer-app/pom.xml @@ -5,7 +5,7 @@ io.dapr spring-boot-examples - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/pom.xml b/spring-boot-examples/pom.xml index 5be4902938..25bffa0e0f 100644 --- a/spring-boot-examples/pom.xml +++ b/spring-boot-examples/pom.xml @@ -6,7 +6,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/producer-app/pom.xml b/spring-boot-examples/producer-app/pom.xml index 4b542f6a20..77c886cc2c 100644 --- a/spring-boot-examples/producer-app/pom.xml +++ b/spring-boot-examples/producer-app/pom.xml @@ -6,7 +6,7 @@ io.dapr spring-boot-examples - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml b/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml index 5a1a22fe55..97769ee927 100644 --- a/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml +++ b/spring-boot-examples/workflows/multi-app/orchestrator/pom.xml @@ -6,7 +6,7 @@ io.dapr multi-app - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/multi-app/pom.xml b/spring-boot-examples/workflows/multi-app/pom.xml index 5c05771355..46f51a2a17 100644 --- a/spring-boot-examples/workflows/multi-app/pom.xml +++ b/spring-boot-examples/workflows/multi-app/pom.xml @@ -6,7 +6,7 @@ io.dapr workflows - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/multi-app/worker-one/pom.xml b/spring-boot-examples/workflows/multi-app/worker-one/pom.xml index 5572d6f3da..20221ce441 100644 --- a/spring-boot-examples/workflows/multi-app/worker-one/pom.xml +++ b/spring-boot-examples/workflows/multi-app/worker-one/pom.xml @@ -6,7 +6,7 @@ io.dapr multi-app - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/multi-app/worker-two/pom.xml b/spring-boot-examples/workflows/multi-app/worker-two/pom.xml index 6175187e34..eacf792486 100644 --- a/spring-boot-examples/workflows/multi-app/worker-two/pom.xml +++ b/spring-boot-examples/workflows/multi-app/worker-two/pom.xml @@ -6,7 +6,7 @@ io.dapr multi-app - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/patterns/pom.xml b/spring-boot-examples/workflows/patterns/pom.xml index c3ec262f68..507c2bac23 100644 --- a/spring-boot-examples/workflows/patterns/pom.xml +++ b/spring-boot-examples/workflows/patterns/pom.xml @@ -6,7 +6,7 @@ io.dapr workflows - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/pom.xml b/spring-boot-examples/workflows/pom.xml index c79b064d26..b4c613f181 100644 --- a/spring-boot-examples/workflows/pom.xml +++ b/spring-boot-examples/workflows/pom.xml @@ -6,7 +6,7 @@ io.dapr spring-boot-examples - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml b/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml index 9f53cf3dd0..f2ddddba1b 100644 --- a/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml +++ b/spring-boot-examples/workflows/versioning/full-version-worker-one/pom.xml @@ -6,7 +6,7 @@ io.dapr versioning - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml b/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml index a441cb678a..306d4b78a4 100644 --- a/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml +++ b/spring-boot-examples/workflows/versioning/full-version-worker-two/pom.xml @@ -6,7 +6,7 @@ io.dapr versioning - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml b/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml index 937f4e794d..3dbec298c7 100644 --- a/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml +++ b/spring-boot-examples/workflows/versioning/patch-version-worker-one/pom.xml @@ -6,7 +6,7 @@ io.dapr versioning - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml b/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml index b16d36736f..e2838a3f27 100644 --- a/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml +++ b/spring-boot-examples/workflows/versioning/patch-version-worker-two/pom.xml @@ -6,7 +6,7 @@ io.dapr versioning - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/versioning/pom.xml b/spring-boot-examples/workflows/versioning/pom.xml index e3a9813380..2888cb1dd1 100644 --- a/spring-boot-examples/workflows/versioning/pom.xml +++ b/spring-boot-examples/workflows/versioning/pom.xml @@ -6,7 +6,7 @@ io.dapr workflows - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml b/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml index 6ff543a2b1..9fbb8db6a0 100644 --- a/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml +++ b/spring-boot-examples/workflows/versioning/version-orchestrator/pom.xml @@ -6,7 +6,7 @@ io.dapr versioning - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml diff --git a/testcontainers-dapr/pom.xml b/testcontainers-dapr/pom.xml index f76b23efa6..caf86ad0bd 100644 --- a/testcontainers-dapr/pom.xml +++ b/testcontainers-dapr/pom.xml @@ -5,7 +5,7 @@ io.dapr dapr-sdk-parent - 1.18.0-SNAPSHOT + 1.18.0-rc-1 ../pom.xml