From c006b9545a4dc92613f2bc0445a70f71ed373a16 Mon Sep 17 00:00:00 2001 From: joyoungjae Date: Sun, 25 Jan 2026 21:28:41 +0900 Subject: [PATCH 01/13] Fix : Add looging for SQS Partial Acknowledgement --- .../SqsAcknowledgementExecutor.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index dc99d180d5..3e8c2dce58 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -38,7 +38,8 @@ import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; /** - * {@link AcknowledgementExecutor} implementation for SQS queues. Handle the messages deletion, usually requested by an + * {@link AcknowledgementExecutor} implementation for SQS queues. Handle the + * messages deletion, usually requested by an * {@link ExecutingAcknowledgementProcessor}. * * @author Tomaz Fernandes @@ -75,8 +76,7 @@ public CompletableFuture execute(Collection> messagesToAck) { logger.debug("Executing acknowledgement for {} messages", messagesToAck.size()); Assert.notEmpty(messagesToAck, () -> "empty collection sent to acknowledge in queue " + this.queueName); return deleteMessages(messagesToAck); - } - catch (Exception e) { + } catch (Exception e) { return CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, e)); } } @@ -96,8 +96,15 @@ private CompletableFuture deleteMessages(Collection> messagesTo watch.start(); return CompletableFutures.exceptionallyCompose(this.sqsAsyncClient .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)) - .thenRun(() -> {}), - t -> CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, t))) + .thenCompose(response -> { + if (!response.failed().isEmpty()) { + logger.warn("Some messages could not be acknowledged in queue {}", this.queueName); + return CompletableFutures.failedFuture( + createAcknowledgementException(messagesToAck, null)); + } + return CompletableFuture.completedFuture(null); + }), + t -> CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, t))) .whenComplete((v, t) -> logAckResult(messagesToAck, t, watch)); } @@ -129,8 +136,7 @@ private void logAckResult(Collection> messagesToAck, Throwable t, Sto logger.error("Error acknowledging in queue {} messages {} in {}ms", this.queueName, MessageHeaderUtils.getId(messagesToAck), totalTimeMillis, t instanceof CompletionException ? t.getCause() : t); - } - else { + } else { logger.trace("Done acknowledging in queue {} messages: {} in {}ms", this.queueName, MessageHeaderUtils.getId(messagesToAck), totalTimeMillis); } From fde00e361b34c51679b8977dff30f966b17413b5 Mon Sep 17 00:00:00 2001 From: co2plant Date: Sun, 25 Jan 2026 21:37:20 +0900 Subject: [PATCH 02/13] Style : Modify code style to suit the characteristics of the existing repository --- .../acknowledgement/SqsAcknowledgementExecutor.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index 3e8c2dce58..280c7b9745 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -38,8 +38,7 @@ import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; /** - * {@link AcknowledgementExecutor} implementation for SQS queues. Handle the - * messages deletion, usually requested by an + * {@link AcknowledgementExecutor} implementation for SQS queues. Handle the messages deletion, usually requested by an * {@link ExecutingAcknowledgementProcessor}. * * @author Tomaz Fernandes @@ -74,9 +73,11 @@ public void setSqsAsyncClient(SqsAsyncClient sqsAsyncClient) { public CompletableFuture execute(Collection> messagesToAck) { try { logger.debug("Executing acknowledgement for {} messages", messagesToAck.size()); - Assert.notEmpty(messagesToAck, () -> "empty collection sent to acknowledge in queue " + this.queueName); + Assert.notEmpty(messagesToAck, + () -> "empty collection sent to acknowledge in queue " + this.queueName); return deleteMessages(messagesToAck); - } catch (Exception e) { + } + catch (Exception e) { return CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, e)); } } @@ -136,7 +137,8 @@ private void logAckResult(Collection> messagesToAck, Throwable t, Sto logger.error("Error acknowledging in queue {} messages {} in {}ms", this.queueName, MessageHeaderUtils.getId(messagesToAck), totalTimeMillis, t instanceof CompletionException ? t.getCause() : t); - } else { + } + else { logger.trace("Done acknowledging in queue {} messages: {} in {}ms", this.queueName, MessageHeaderUtils.getId(messagesToAck), totalTimeMillis); } From e12bd5f16db520e34fec6525e7c8f2f6026f6572 Mon Sep 17 00:00:00 2001 From: co2plant Date: Sun, 25 Jan 2026 21:44:41 +0900 Subject: [PATCH 03/13] Fix : Test code changes due to added logging - Changed null to DeleteMessageBatchResponse --- .../acknowledgement/SqsAcknowledgementExecutorTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java index 1ed71e4817..e1164d822e 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java @@ -40,6 +40,7 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; /** * Tests for {@link SqsAcknowledgementExecutor}. @@ -74,7 +75,7 @@ void shouldDeleteMessages() throws Exception { given(queueAttributes.getQueueName()).willReturn(queueName); given(queueAttributes.getQueueUrl()).willReturn(queueUrl); given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) - .willReturn(CompletableFuture.completedFuture(null)); + .willReturn(CompletableFuture.completedFuture(DeleteMessageBatchResponse.builder().build())); SqsAcknowledgementExecutor executor = new SqsAcknowledgementExecutor<>(); executor.setSqsAsyncClient(sqsAsyncClient); From de0e709d6d3168b25fee702dd5ed9c0854c1e84c Mon Sep 17 00:00:00 2001 From: co2plant Date: Sun, 25 Jan 2026 22:25:49 +0900 Subject: [PATCH 04/13] Test : Add Test for partialBatchFailure --- .../SqsAcknowledgementExecutorTests.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java index e1164d822e..5d952078ba 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java @@ -38,6 +38,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; @@ -128,4 +129,28 @@ void shouldWrapIfErrorIsThrown() { .extracting(SqsAcknowledgementException::getQueue).isEqualTo(queueUrl); } + @Test + void shouldWrapPartialBatchFailure() { + Collection> messages = Collections.singletonList(message); + given(message.getHeaders()).willReturn(messageHeaders); + given(queueAttributes.getQueueName()).willReturn(queueName); + given(queueAttributes.getQueueUrl()).willReturn(queueUrl); + + BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id("test-id").code("ReceiptHandleIsInvalid") + .message("Receipt handle expired").build(); + + DeleteMessageBatchResponse partialFailureResponse = DeleteMessageBatchResponse.builder().failed(failedEntry) + .build(); + + given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .willReturn(CompletableFuture.completedFuture(partialFailureResponse)); + + SqsAcknowledgementExecutor executor = new SqsAcknowledgementExecutor<>(); + executor.setSqsAsyncClient(sqsAsyncClient); + executor.setQueueAttributes(queueAttributes); + + assertThatThrownBy(() -> executor.execute(messages).join()).isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(SqsAcknowledgementException.class); + } + } From 78d3b265fa8fc31594ecf26aefb649f0f16c9544 Mon Sep 17 00:00:00 2001 From: co2plant Date: Sun, 25 Jan 2026 22:47:17 +0900 Subject: [PATCH 05/13] Fix : Added logging of failed ID list --- .../acknowledgement/SqsAcknowledgementExecutor.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index 280c7b9745..e9aa5b044b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -34,6 +34,7 @@ import org.springframework.util.Assert; import org.springframework.util.StopWatch; import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; @@ -73,8 +74,7 @@ public void setSqsAsyncClient(SqsAsyncClient sqsAsyncClient) { public CompletableFuture execute(Collection> messagesToAck) { try { logger.debug("Executing acknowledgement for {} messages", messagesToAck.size()); - Assert.notEmpty(messagesToAck, - () -> "empty collection sent to acknowledge in queue " + this.queueName); + Assert.notEmpty(messagesToAck, () -> "empty collection sent to acknowledge in queue " + this.queueName); return deleteMessages(messagesToAck); } catch (Exception e) { @@ -99,7 +99,9 @@ private CompletableFuture deleteMessages(Collection> messagesTo .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)) .thenCompose(response -> { if (!response.failed().isEmpty()) { - logger.warn("Some messages could not be acknowledged in queue {}", this.queueName); + logger.warn("Some messages could not be acknowledged in queue {}: {}", + this.queueName, + response.failed().stream().map(BatchResultErrorEntry::id).toList()); return CompletableFutures.failedFuture( createAcknowledgementException(messagesToAck, null)); } From b27810952e176e250f7238a47a3daafb34edc87b Mon Sep 17 00:00:00 2001 From: co2plant Date: Mon, 26 Jan 2026 01:11:33 +0900 Subject: [PATCH 06/13] Fix: Added success/failure lists to SqsAcknowledgementException DeleteMessageBatch response - Correlate items using message IDs for accurate mapping - Pass different lists to the SqsAcknowledgementException constructor to handle partial failures --- .../SqsAcknowledgementExecutor.java | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index e9aa5b044b..11742ffc26 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -22,9 +22,11 @@ import io.awspring.cloud.sqs.listener.QueueAttributesAware; import io.awspring.cloud.sqs.listener.SqsAsyncClientAware; import io.awspring.cloud.sqs.listener.SqsHeaders; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.UUID; +import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.stream.Collectors; @@ -99,11 +101,31 @@ private CompletableFuture deleteMessages(Collection> messagesTo .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)) .thenCompose(response -> { if (!response.failed().isEmpty()) { + Set failedIds = response.failed().stream() + .map(BatchResultErrorEntry::id) + .collect(Collectors.toSet()); + logger.warn("Some messages could not be acknowledged in queue {}: {}", - this.queueName, - response.failed().stream().map(BatchResultErrorEntry::id).toList()); + this.queueName, failedIds); + + List> successfulMessages = new ArrayList<>(); + List> failedMessages = new ArrayList<>(); + + for(Message msg : messagesToAck) { + if(failedIds.contains(MessageHeaderUtils.getId(msg))) { + failedMessages.add(msg); + } else { + successfulMessages.add(msg); + } + } + return CompletableFutures.failedFuture( - createAcknowledgementException(messagesToAck, null)); + new SqsAcknowledgementException( + "Error acknowledging messages " + failedIds, + successfulMessages, + failedMessages, + this.queueUrl, + null)); } return CompletableFuture.completedFuture(null); }), @@ -123,7 +145,7 @@ private DeleteMessageBatchRequestEntry toDeleteMessageEntry(Message message) return DeleteMessageBatchRequestEntry .builder() .receiptHandle(MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER)) - .id(UUID.randomUUID().toString()) + .id(MessageHeaderUtils.getId(message)) .build(); } // @formatter:on From 1730ba5bbb5dffe8a8a8150cc7c853269ee54494 Mon Sep 17 00:00:00 2001 From: co2plant Date: Mon, 26 Jan 2026 01:52:01 +0900 Subject: [PATCH 07/13] Refactoring: Extract the createPartialFailureException helper method - Extract partial error handling logic from the deleteMessages method - Improved code readability and maintainability - placed the calling method at the top and the helper method at the bottom --- .../SqsAcknowledgementExecutor.java | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index 11742ffc26..3c610c1ff5 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -39,6 +39,7 @@ import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; /** * {@link AcknowledgementExecutor} implementation for SQS queues. Handle the messages deletion, usually requested by an @@ -101,31 +102,8 @@ private CompletableFuture deleteMessages(Collection> messagesTo .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)) .thenCompose(response -> { if (!response.failed().isEmpty()) { - Set failedIds = response.failed().stream() - .map(BatchResultErrorEntry::id) - .collect(Collectors.toSet()); - - logger.warn("Some messages could not be acknowledged in queue {}: {}", - this.queueName, failedIds); - - List> successfulMessages = new ArrayList<>(); - List> failedMessages = new ArrayList<>(); - - for(Message msg : messagesToAck) { - if(failedIds.contains(MessageHeaderUtils.getId(msg))) { - failedMessages.add(msg); - } else { - successfulMessages.add(msg); - } - } - - return CompletableFutures.failedFuture( - new SqsAcknowledgementException( - "Error acknowledging messages " + failedIds, - successfulMessages, - failedMessages, - this.queueUrl, - null)); + return CompletableFutures. failedFuture( + createPartialFailureException(messagesToAck, response)); } return CompletableFuture.completedFuture(null); }), @@ -133,6 +111,33 @@ private CompletableFuture deleteMessages(Collection> messagesTo .whenComplete((v, t) -> logAckResult(messagesToAck, t, watch)); } + private SqsAcknowledgementException createPartialFailureException(Collection> messages, DeleteMessageBatchResponse response){ + Set failedIds = response.failed().stream() + .map(BatchResultErrorEntry::id) + .collect(Collectors.toSet()); + + List> successfulMessages = new ArrayList<>(); + List> failedMessages = new ArrayList<>(); + + for(Message msg : messages) { + if(failedIds.contains(MessageHeaderUtils.getId(msg))) { + failedMessages.add(msg); + } else { + successfulMessages.add(msg); + } + } + + logger.warn("Some messages could not be acknowledged in queue {}: {}", + this.queueName, failedIds); + + return new SqsAcknowledgementException( + "Error acknowledging messages " + failedIds, + successfulMessages, + failedMessages, + this.queueUrl, + null); + } + private DeleteMessageBatchRequest createDeleteMessageBatchRequest(Collection> messagesToAck) { return DeleteMessageBatchRequest .builder() From 4bdcd0430c8a14cc8b09c1c4bfc9aae38f379e81 Mon Sep 17 00:00:00 2001 From: co2plant Date: Mon, 16 Feb 2026 22:18:51 +0900 Subject: [PATCH 08/13] Refactoring: Separate the batch response processing deletion part from SqsAcknowledgementExecutor. --- .../SqsAcknowledgementExecutor.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index 3c610c1ff5..4d59de7b5e 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -100,17 +100,19 @@ private CompletableFuture deleteMessages(Collection> messagesTo watch.start(); return CompletableFutures.exceptionallyCompose(this.sqsAsyncClient .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)) - .thenCompose(response -> { - if (!response.failed().isEmpty()) { - return CompletableFutures. failedFuture( - createPartialFailureException(messagesToAck, response)); - } - return CompletableFuture.completedFuture(null); - }), + .thenCompose(response -> handleDeleteBatchResponse(messagesToAck, response)), t -> CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, t))) .whenComplete((v, t) -> logAckResult(messagesToAck, t, watch)); } + private CompletableFuture handleDeleteBatchResponse(Collection> messagesToAck, DeleteMessageBatchResponse response){ + if(!response.failed().isEmpty()) { + return CompletableFutures. failedFuture( + createPartialFailureException(messagesToAck, response)); + } + return CompletableFuture.completedFuture(null); + } + private SqsAcknowledgementException createPartialFailureException(Collection> messages, DeleteMessageBatchResponse response){ Set failedIds = response.failed().stream() .map(BatchResultErrorEntry::id) From fc089194bba107c4d95fda93d8c4e7571604fb3c Mon Sep 17 00:00:00 2001 From: co2plant Date: Thu, 19 Feb 2026 00:43:16 +0900 Subject: [PATCH 09/13] Fix: improve partial SQS acknowledgement failure handling - handle DeleteMessageBatch partial failures in a dedicated method - map AWS failed entry IDs to original message IDs - populate SqsAcknowledgementException with successful/failed message lists - avoid wrapping SqsAcknowledgementException twice - extend SqsAcknowledgementExecutorTests to assert partial failure mapping --- .../SqsAcknowledgementExecutor.java | 23 +++++++++----- .../SqsAcknowledgementExecutorTests.java | 31 +++++++++++++++---- 2 files changed, 41 insertions(+), 13 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index 4d59de7b5e..80bdc52f32 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -99,20 +99,29 @@ private CompletableFuture deleteMessages(Collection> messagesTo StopWatch watch = new StopWatch(); watch.start(); return CompletableFutures.exceptionallyCompose(this.sqsAsyncClient - .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)) - .thenCompose(response -> handleDeleteBatchResponse(messagesToAck, response)), - t -> CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, t))) + .deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)).thenCompose( + response -> handleDeleteMessageBatchResponse(messagesToAck, response)), + t -> toAcknowledgementFailure(messagesToAck, t)) .whenComplete((v, t) -> logAckResult(messagesToAck, t, watch)); } - private CompletableFuture handleDeleteBatchResponse(Collection> messagesToAck, DeleteMessageBatchResponse response){ - if(!response.failed().isEmpty()) { - return CompletableFutures. failedFuture( - createPartialFailureException(messagesToAck, response)); + private CompletableFuture handleDeleteMessageBatchResponse(Collection> messagesToAck, + DeleteMessageBatchResponse response) { + if (!response.failed().isEmpty()) { + return CompletableFutures.failedFuture(createPartialFailureException(messagesToAck, response)); } return CompletableFuture.completedFuture(null); } + private CompletableFuture toAcknowledgementFailure(Collection> messagesToAck, Throwable throwable) { + Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null ? throwable.getCause() + : throwable; + if (cause instanceof SqsAcknowledgementException) { + return CompletableFutures.failedFuture(cause); + } + return CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, cause)); + } + private SqsAcknowledgementException createPartialFailureException(Collection> messages, DeleteMessageBatchResponse response){ Set failedIds = response.failed().stream() .map(BatchResultErrorEntry::id) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java index 5d952078ba..0f41503bc2 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java @@ -28,6 +28,7 @@ import io.awspring.cloud.sqs.listener.SqsHeaders; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.junit.jupiter.api.Test; @@ -60,15 +61,23 @@ class SqsAcknowledgementExecutorTests { @Mock Message message; + @Mock + Message secondMessage; + String queueName = "sqsAcknowledgementExecutorTestsQueueName"; String queueUrl = "sqsAcknowledgementExecutorTestsQueueUrl"; String receiptHandle = "sqsAcknowledgementExecutorTestsQueueReceiptHandle"; + String secondReceiptHandle = "sqsAcknowledgementExecutorTestsQueueSecondReceiptHandle"; + MessageHeaders messageHeaders = new MessageHeaders( Collections.singletonMap(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, receiptHandle)); + MessageHeaders secondMessageHeaders = new MessageHeaders( + Collections.singletonMap(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, secondReceiptHandle)); + @Test void shouldDeleteMessages() throws Exception { Collection> messages = Collections.singletonList(message); @@ -131,13 +140,19 @@ void shouldWrapIfErrorIsThrown() { @Test void shouldWrapPartialBatchFailure() { - Collection> messages = Collections.singletonList(message); - given(message.getHeaders()).willReturn(messageHeaders); + Message failedMessage = message; + Message successfulMessage = secondMessage; + MessageHeaders failedMessageHeaders = messageHeaders; + MessageHeaders successfulMessageHeaders = secondMessageHeaders; + Collection> messagesToAck = List.of(failedMessage, successfulMessage); + + given(failedMessage.getHeaders()).willReturn(failedMessageHeaders); + given(successfulMessage.getHeaders()).willReturn(successfulMessageHeaders); given(queueAttributes.getQueueName()).willReturn(queueName); given(queueAttributes.getQueueUrl()).willReturn(queueUrl); - BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id("test-id").code("ReceiptHandleIsInvalid") - .message("Receipt handle expired").build(); + BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id(failedMessageHeaders.getId().toString()) + .code("ReceiptHandleIsInvalid").message("Receipt handle expired").build(); DeleteMessageBatchResponse partialFailureResponse = DeleteMessageBatchResponse.builder().failed(failedEntry) .build(); @@ -149,8 +164,12 @@ void shouldWrapPartialBatchFailure() { executor.setSqsAsyncClient(sqsAsyncClient); executor.setQueueAttributes(queueAttributes); - assertThatThrownBy(() -> executor.execute(messages).join()).isInstanceOf(CompletionException.class) - .hasCauseInstanceOf(SqsAcknowledgementException.class); + assertThatThrownBy(() -> executor.execute(messagesToAck).join()).isInstanceOf(CompletionException.class) + .getCause().isInstanceOf(SqsAcknowledgementException.class) + .asInstanceOf(type(SqsAcknowledgementException.class)).satisfies(ex -> { + assertThat(ex.getFailedAcknowledgementMessages()).containsExactly(failedMessage); + assertThat(ex.getSuccessfullyAcknowledgedMessages()).containsExactly(successfulMessage); + }); } } From e3fa4a7a46225cee7d573fa8533a3d5b10d625a3 Mon Sep 17 00:00:00 2001 From: co2plant Date: Thu, 19 Feb 2026 00:43:34 +0900 Subject: [PATCH 10/13] Docs: document partial acknowledgement failure handling in callbacks - fix callback interface name to AsyncAcknowledgementResultCallback - explain that onFailure receives SqsAcknowledgementException on partial failures - add an example using successful/failed acknowledgement message lists for retry --- docs/src/main/asciidoc/sqs.adoc | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index a50bcc9e69..c69cfcc356 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -2020,7 +2020,7 @@ Implementations of this interface will be executed after an acknowledgement exec ==== Acknowledgement Result Callback -The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. +The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. ```java public interface AcknowledgementResultCallback { @@ -2048,6 +2048,19 @@ public interface AsyncAcknowledgementResultCallback { } ``` +If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`. +Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed. + +```java +@Override +public void onFailure(Collection> messages, Throwable t) { + if (t instanceof SqsAcknowledgementException ex) { + Collection> failedMessages = ex.getFailedAcknowledgementMessages(); + // retry only failedMessages + } +} +``` + ```java @Bean public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) { From 38e50ea7cd4e36c85fb9fa351ffa78517b31faa5 Mon Sep 17 00:00:00 2001 From: co2plant Date: Sun, 22 Feb 2026 22:03:29 +0900 Subject: [PATCH 11/13] Fix: add fail-safe handling for uncorrelated SQS acknowledgement failure ids --- .../SqsAcknowledgementExecutor.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java index 80bdc52f32..08b4b36047 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutor.java @@ -122,10 +122,20 @@ private CompletableFuture toAcknowledgementFailure(Collection> return CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, cause)); } - private SqsAcknowledgementException createPartialFailureException(Collection> messages, DeleteMessageBatchResponse response){ + private SqsAcknowledgementException createPartialFailureException(Collection> messages, + DeleteMessageBatchResponse response) { + Set messageIds = messages.stream().map(MessageHeaderUtils::getId).collect(Collectors.toSet()); Set failedIds = response.failed().stream() - .map(BatchResultErrorEntry::id) - .collect(Collectors.toSet()); + .map(BatchResultErrorEntry::id) + .collect(Collectors.toSet()); + + if (!messageIds.containsAll(failedIds)) { + logger.warn("Could not correlate all acknowledgement failure ids in queue {}: {}", this.queueName, + failedIds); + return new SqsAcknowledgementException("Could not correlate acknowledgement failure ids: " + failedIds, + Collections.emptyList(), messages.stream().map(msg -> (Message) msg).collect(Collectors.toList()), + this.queueUrl, null); + } List> successfulMessages = new ArrayList<>(); List> failedMessages = new ArrayList<>(); @@ -138,15 +148,10 @@ private SqsAcknowledgementException createPartialFailureException(Collection> messagesToAck) { From 932f0900b379d7a8a177645b0d27c5a675d77d1c Mon Sep 17 00:00:00 2001 From: co2plant Date: Sun, 22 Feb 2026 22:03:45 +0900 Subject: [PATCH 12/13] Test: add coverage for uncorrelated acknowledgement failure ids --- .../SqsAcknowledgementExecutorTests.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java index 0f41503bc2..f7828231a4 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/acknowledgement/SqsAcknowledgementExecutorTests.java @@ -172,4 +172,34 @@ void shouldWrapPartialBatchFailure() { }); } + @Test + void shouldTreatAllMessagesAsFailedIfAwsFailureIdCannotBeCorrelated() { + Collection> messagesToAck = List.of(message, secondMessage); + + given(message.getHeaders()).willReturn(messageHeaders); + given(secondMessage.getHeaders()).willReturn(secondMessageHeaders); + given(queueAttributes.getQueueName()).willReturn(queueName); + given(queueAttributes.getQueueUrl()).willReturn(queueUrl); + + BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id("unknown-id") + .code("ReceiptHandleIsInvalid").message("Receipt handle expired").build(); + + DeleteMessageBatchResponse partialFailureResponse = DeleteMessageBatchResponse.builder().failed(failedEntry) + .build(); + + given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .willReturn(CompletableFuture.completedFuture(partialFailureResponse)); + + SqsAcknowledgementExecutor executor = new SqsAcknowledgementExecutor<>(); + executor.setSqsAsyncClient(sqsAsyncClient); + executor.setQueueAttributes(queueAttributes); + + assertThatThrownBy(() -> executor.execute(messagesToAck).join()).isInstanceOf(CompletionException.class) + .getCause().isInstanceOf(SqsAcknowledgementException.class) + .asInstanceOf(type(SqsAcknowledgementException.class)).satisfies(ex -> { + assertThat(ex.getSuccessfullyAcknowledgedMessages()).isEmpty(); + assertThat(ex.getFailedAcknowledgementMessages()).containsExactlyInAnyOrder(message, secondMessage); + }); + } + } From 7b71da165a3c78efad843ce1db03d20f83e96ea1 Mon Sep 17 00:00:00 2001 From: co2plant Date: Sun, 22 Feb 2026 22:06:25 +0900 Subject: [PATCH 13/13] Docs: document fail-safe correlation behavior in acknowledgement result callbacks --- docs/src/main/asciidoc/sqs.adoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index c69cfcc356..a437fa5ce4 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -468,6 +468,7 @@ SqsTemplate.builder().configure(options -> options.acknowledgementMode(TemplateA ``` If an error occurs during acknowledgement, a `SqsAcknowledgementException` is thrown, containing both the messages that were successfully acknowledged and those which failed. +See <> for details on inspecting partial failure results and the fail-safe correlation behavior. To acknowledge messages received with `MANUAL` acknowledgement, the `Acknowledgement#acknowledge` and `Acknowledgement#acknowledgeAsync` methods can be used. @@ -2018,6 +2019,7 @@ NOTE: PARALLEL is the default for FIFO because ordering is guaranteed for proces This assures no messages from a given `MessageGroup` will be polled until the previous batch is acknowledged. Implementations of this interface will be executed after an acknowledgement execution completes with either success or failure. +[[sqs-acknowledgement-result-callback]] ==== Acknowledgement Result Callback The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. @@ -2061,6 +2063,8 @@ public void onFailure(Collection> messages, Throwable t) { } ``` +NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification. + ```java @Bean public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {