diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java index e92720bfb5a5..90ea28d60447 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java @@ -21,8 +21,10 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.supplyAsync; +import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -30,6 +32,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -38,6 +41,7 @@ import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -263,10 +267,16 @@ public void testWriteBatchesWithTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // due to added delay, batches are timed out on arrival of every 3rd msg - verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1], entries[2])); - verify(sqs).sendMessageBatch(request("queue", entries[3], entries[4])); + // Nominally batches time out on arrival of every 3rd message ([0,1,2], [3,4]). The exact + // grouping depends on wall clock time and is unreliable on loaded machines (#38946), so + // verify timing-independent invariants instead: expired batches are flushed on append, so + // with >= 100ms between messages no batch can exceed 3 entries before exceeding the 150ms + // timeout, forcing at least 2 batches for 5 messages. + Map> requests = captureBatchRequests(atLeast(2)); + assertThat(requests.keySet()).containsExactly("queue"); + assertMessageBodies(requestsFor(requests, "queue"), range(0, 5)); + assertThat(requestsFor(requests, "queue")) + .allSatisfy(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(3)); } @Test @@ -285,11 +295,16 @@ public void testWriteBatchesWithStrictTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // using strict timeouts batches, batches are timed out by a separate thread - verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1])); - verify(sqs).sendMessageBatch(request("queue", entries[2], entries[3])); - verify(sqs).sendMessageBatch(request("queue", entries[4])); + // Nominally the separate timeout thread flushes [0,1], [2,3], [4]. The exact grouping + // depends on wall clock time and is unreliable on loaded machines (#38946), so verify + // timing-independent invariants instead. Expired batches are also flushed on append + // (independently of the timeout thread), so no batch can exceed 3 entries and 5 messages + // require at least 2 batches. + Map> requests = captureBatchRequests(atLeast(2)); + assertThat(requests.keySet()).containsExactly("queue"); + assertMessageBodies(requestsFor(requests, "queue"), range(0, 5)); + assertThat(requestsFor(requests, "queue")) + .allSatisfy(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(3)); } @Test @@ -337,11 +352,18 @@ public void testWriteBatchesToDynamicWithTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // due to added delay, dynamic batches are timed out on arrival of every 2nd msg (per batch) - verify(sqs).sendMessageBatch(request("even", entries[0], entries[2])); - verify(sqs).sendMessageBatch(request("uneven", entries[1], entries[3])); - verify(sqs).sendMessageBatch(request("even", entries[4])); + // Nominally dynamic batches are timed out on arrival of every 2nd message per batch + // ([0,2], [1,3], [4]). The exact grouping depends on wall clock time and is unreliable on + // loaded machines (#38946), so verify timing-independent invariants instead: per queue, + // consecutive messages are >= 200ms apart, so expired batches are flushed on append once + // reaching 2 entries, requiring at least 3 batches overall. + Map> requests = captureBatchRequests(atLeast(3)); + assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven"); + assertMessageBodies(requestsFor(requests, "even"), range(0, 5).filter(i -> i % 2 == 0)); + assertMessageBodies(requestsFor(requests, "uneven"), range(0, 5).filter(i -> i % 2 == 1)); + requests.values().stream() + .flatMap(List::stream) + .forEach(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(2)); } @Test @@ -360,14 +382,19 @@ public void testWriteBatchesToDynamicWithStrictTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // using strict timeouts batches, batches are timed out by a separate thread before any 2nd - // entry - verify(sqs).sendMessageBatch(request("even", entries[0])); - verify(sqs).sendMessageBatch(request("uneven", entries[1])); - verify(sqs).sendMessageBatch(request("even", entries[2])); - verify(sqs).sendMessageBatch(request("uneven", entries[3])); - verify(sqs).sendMessageBatch(request("even", entries[4])); + // Nominally the separate timeout thread flushes every batch before a 2nd entry arrives + // (5 singleton batches). The exact grouping depends on wall clock time and is unreliable + // on loaded machines (#38946), so verify timing-independent invariants instead. Expired + // batches are also flushed on append (independently of the timeout thread) and per queue + // consecutive messages are >= 200ms apart, so no batch can exceed 2 entries and at least + // 3 batches are required overall. + Map> requests = captureBatchRequests(atLeast(3)); + assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven"); + assertMessageBodies(requestsFor(requests, "even"), range(0, 5).filter(i -> i % 2 == 0)); + assertMessageBodies(requestsFor(requests, "uneven"), range(0, 5).filter(i -> i % 2 == 1)); + requests.values().stream() + .flatMap(List::stream) + .forEach(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(2)); } @Test @@ -406,6 +433,32 @@ private SendMessageBatchRequest anyRequest() { return any(); } + /** Captures all batch requests, verifying the given mode, and groups them by queue url. */ + private Map> captureBatchRequests( + org.mockito.verification.VerificationMode mode) { + ArgumentCaptor captor = + ArgumentCaptor.forClass(SendMessageBatchRequest.class); + verify(sqs, mode).sendMessageBatch(captor.capture()); + return captor.getAllValues().stream().collect(groupingBy(SendMessageBatchRequest::queueUrl)); + } + + /** Returns the (non-null) batch requests for a queue, failing if there were none. */ + private List requestsFor( + Map> requests, String queue) { + return checkStateNotNull(requests.get(queue), "no batch requests for queue %s", queue); + } + + /** Asserts that the requests contain exactly the expected message bodies, each exactly once. */ + private void assertMessageBodies(List requests, IntStream expectedMsgs) { + assertThat( + requests.stream() + .flatMap(req -> req.entries().stream()) + .map(SendMessageBatchRequestEntry::messageBody) + .collect(toList())) + .containsExactlyInAnyOrderElementsOf( + expectedMsgs.mapToObj(Integer::toString).collect(toList())); + } + private SendMessageBatchRequest request(String queue, SendMessageBatchRequestEntry... entries) { return SendMessageBatchRequest.builder() .queueUrl(queue)