diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index a50bcc9e69..a437fa5ce4 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -468,6 +468,7 @@ SqsTemplate.builder().configure(options -> options.acknowledgementMode(TemplateA ``` If an error occurs during acknowledgement, a `SqsAcknowledgementException` is thrown, containing both the messages that were successfully acknowledged and those which failed. +See <<sqs-acknowledgement-result-callback>> for details on inspecting partial failure results and the fail-safe correlation behavior. To acknowledge messages received with `MANUAL` acknowledgement, the `Acknowledgement#acknowledge` and `Acknowledgement#acknowledgeAsync` methods can be used. @@ -2018,9 +2019,10 @@ NOTE: PARALLEL is the default for FIFO because ordering is guaranteed for proces This assures no messages from a given `MessageGroup` will be polled until the previous batch is acknowledged. Implementations of this interface will be executed after an acknowledgement execution completes with either success or failure. +[[sqs-acknowledgement-result-callback]] ==== Acknowledgement Result Callback -The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. +The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. ```java public interface AcknowledgementResultCallback { @@ -2048,6 +2050,21 @@ public interface AsyncAcknowledgementResultCallback { } ``` +If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`. 
+Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed. + +```java +@Override +public void onFailure(Collection<Message<T>> messages, Throwable t) { + if (t instanceof SqsAcknowledgementException ex) { + Collection<Message<?>> failedMessages = ex.getFailedAcknowledgementMessages(); + // retry only failedMessages + } +} +``` + +NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification. + ```java @Bean public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index dc99d180d5..08b4b36047 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -22,9 +22,11 @@ import io.awspring.cloud.sqs.listener.QueueAttributesAware; import io.awspring.cloud.sqs.listener.SqsAsyncClientAware; import io.awspring.cloud.sqs.listener.SqsHeaders; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.UUID; +import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.stream.Collectors; @@ -34,8 +36,10 @@ import org.springframework.util.Assert; import org.springframework.util.StopWatch; import software.amazon.awssdk.services.sqs.SqsAsyncClient; 
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; /** * {@link AcknowledgementExecutor} implementation for SQS queues. Handle the messages deletion, usually requested by an @@ -95,12 +99,61 @@ private CompletableFuture deleteMessages(Collection> messagesTo StopWatch watch = new StopWatch(); watch.start(); return CompletableFutures.exceptionallyCompose(this.sqsAsyncClient - .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)) - .thenRun(() -> {}), - t -> CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, t))) + .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)).thenCompose( + response -> handleDeleteMessageBatchResponse(messagesToAck, response)), + t -> toAcknowledgementFailure(messagesToAck, t)) .whenComplete((v, t) -> logAckResult(messagesToAck, t, watch)); } + private CompletableFuture handleDeleteMessageBatchResponse(Collection> messagesToAck, + DeleteMessageBatchResponse response) { + if (!response.failed().isEmpty()) { + return CompletableFutures.failedFuture(createPartialFailureException(messagesToAck, response)); + } + return CompletableFuture.completedFuture(null); + } + + private CompletableFuture toAcknowledgementFailure(Collection> messagesToAck, Throwable throwable) { + Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null ? 
throwable.getCause() + : throwable; + if (cause instanceof SqsAcknowledgementException) { + return CompletableFutures.failedFuture(cause); + } + return CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, cause)); + } + + private SqsAcknowledgementException createPartialFailureException(Collection> messages, + DeleteMessageBatchResponse response) { + Set messageIds = messages.stream().map(MessageHeaderUtils::getId).collect(Collectors.toSet()); + Set failedIds = response.failed().stream() + .map(BatchResultErrorEntry::id) + .collect(Collectors.toSet()); + + if (!messageIds.containsAll(failedIds)) { + logger.warn("Could not correlate all acknowledgement failure ids in queue {}: {}", this.queueName, + failedIds); + return new SqsAcknowledgementException("Could not correlate acknowledgement failure ids: " + failedIds, + Collections.emptyList(), messages.stream().map(msg -> (Message) msg).collect(Collectors.toList()), + this.queueUrl, null); + } + + List> successfulMessages = new ArrayList<>(); + List> failedMessages = new ArrayList<>(); + + for(Message msg : messages) { + if(failedIds.contains(MessageHeaderUtils.getId(msg))) { + failedMessages.add(msg); + } else { + successfulMessages.add(msg); + } + } + + logger.warn("Some messages could not be acknowledged in queue {}: {}", this.queueName, failedIds); + + return new SqsAcknowledgementException("Error acknowledging messages " + failedIds, successfulMessages, + failedMessages, this.queueUrl, null); + } + private DeleteMessageBatchRequest createDeleteMessageBatchRequest(Collection> messagesToAck) { return DeleteMessageBatchRequest .builder() @@ -113,7 +166,7 @@ private DeleteMessageBatchRequestEntry toDeleteMessageEntry(Message message) return DeleteMessageBatchRequestEntry .builder() .receiptHandle(MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER)) - .id(UUID.randomUUID().toString()) + .id(MessageHeaderUtils.getId(message)) .build(); } // @formatter:on diff 
--git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java index 1ed71e4817..f7828231a4 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java @@ -28,6 +28,7 @@ import io.awspring.cloud.sqs.listener.SqsHeaders; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.junit.jupiter.api.Test; @@ -38,8 +39,10 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; /** * Tests for {@link SqsAcknowledgementExecutor}. 
@@ -58,15 +61,23 @@ class SqsAcknowledgementExecutorTests { @Mock Message message; + @Mock + Message secondMessage; + String queueName = "sqsAcknowledgementExecutorTestsQueueName"; String queueUrl = "sqsAcknowledgementExecutorTestsQueueUrl"; String receiptHandle = "sqsAcknowledgementExecutorTestsQueueReceiptHandle"; + String secondReceiptHandle = "sqsAcknowledgementExecutorTestsQueueSecondReceiptHandle"; + MessageHeaders messageHeaders = new MessageHeaders( Collections.singletonMap(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, receiptHandle)); + MessageHeaders secondMessageHeaders = new MessageHeaders( + Collections.singletonMap(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, secondReceiptHandle)); + @Test void shouldDeleteMessages() throws Exception { Collection> messages = Collections.singletonList(message); @@ -74,7 +85,7 @@ void shouldDeleteMessages() throws Exception { given(queueAttributes.getQueueName()).willReturn(queueName); given(queueAttributes.getQueueUrl()).willReturn(queueUrl); given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) - .willReturn(CompletableFuture.completedFuture(null)); + .willReturn(CompletableFuture.completedFuture(DeleteMessageBatchResponse.builder().build())); SqsAcknowledgementExecutor executor = new SqsAcknowledgementExecutor<>(); executor.setSqsAsyncClient(sqsAsyncClient); @@ -127,4 +138,68 @@ void shouldWrapIfErrorIsThrown() { .extracting(SqsAcknowledgementException::getQueue).isEqualTo(queueUrl); } + @Test + void shouldWrapPartialBatchFailure() { + Message failedMessage = message; + Message successfulMessage = secondMessage; + MessageHeaders failedMessageHeaders = messageHeaders; + MessageHeaders successfulMessageHeaders = secondMessageHeaders; + Collection> messagesToAck = List.of(failedMessage, successfulMessage); + + given(failedMessage.getHeaders()).willReturn(failedMessageHeaders); + given(successfulMessage.getHeaders()).willReturn(successfulMessageHeaders); + 
given(queueAttributes.getQueueName()).willReturn(queueName); + given(queueAttributes.getQueueUrl()).willReturn(queueUrl); + + BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id(failedMessageHeaders.getId().toString()) + .code("ReceiptHandleIsInvalid").message("Receipt handle expired").build(); + + DeleteMessageBatchResponse partialFailureResponse = DeleteMessageBatchResponse.builder().failed(failedEntry) + .build(); + + given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .willReturn(CompletableFuture.completedFuture(partialFailureResponse)); + + SqsAcknowledgementExecutor executor = new SqsAcknowledgementExecutor<>(); + executor.setSqsAsyncClient(sqsAsyncClient); + executor.setQueueAttributes(queueAttributes); + + assertThatThrownBy(() -> executor.execute(messagesToAck).join()).isInstanceOf(CompletionException.class) + .getCause().isInstanceOf(SqsAcknowledgementException.class) + .asInstanceOf(type(SqsAcknowledgementException.class)).satisfies(ex -> { + assertThat(ex.getFailedAcknowledgementMessages()).containsExactly(failedMessage); + assertThat(ex.getSuccessfullyAcknowledgedMessages()).containsExactly(successfulMessage); + }); + } + + @Test + void shouldTreatAllMessagesAsFailedIfAwsFailureIdCannotBeCorrelated() { + Collection> messagesToAck = List.of(message, secondMessage); + + given(message.getHeaders()).willReturn(messageHeaders); + given(secondMessage.getHeaders()).willReturn(secondMessageHeaders); + given(queueAttributes.getQueueName()).willReturn(queueName); + given(queueAttributes.getQueueUrl()).willReturn(queueUrl); + + BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id("unknown-id") + .code("ReceiptHandleIsInvalid").message("Receipt handle expired").build(); + + DeleteMessageBatchResponse partialFailureResponse = DeleteMessageBatchResponse.builder().failed(failedEntry) + .build(); + + given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + 
.willReturn(CompletableFuture.completedFuture(partialFailureResponse)); + + SqsAcknowledgementExecutor executor = new SqsAcknowledgementExecutor<>(); + executor.setSqsAsyncClient(sqsAsyncClient); + executor.setQueueAttributes(queueAttributes); + + assertThatThrownBy(() -> executor.execute(messagesToAck).join()).isInstanceOf(CompletionException.class) + .getCause().isInstanceOf(SqsAcknowledgementException.class) + .asInstanceOf(type(SqsAcknowledgementException.class)).satisfies(ex -> { + assertThat(ex.getSuccessfullyAcknowledgedMessages()).isEmpty(); + assertThat(ex.getFailedAcknowledgementMessages()).containsExactlyInAnyOrder(message, secondMessage); + }); + } + }