From 62a4c6d524eb9cd8552d5b212fe06cf42374c346 Mon Sep 17 00:00:00 2001 From: Tobias Kaymak Date: Mon, 15 Jun 2026 19:10:24 +0000 Subject: [PATCH] [aws2] Make SqsIOWriteBatchesTest timeout tests timing-independent The four timeout-related tests asserted the exact grouping of messages into SendMessageBatch calls. Those groupings depend on wall-clock timing (the per-message Thread.sleep delay racing the configured batch timeout), so on loaded CI runners batches form differently and the strict verify(...).sendMessageBatch(request(exact entries)) checks fail with Mockito ArgumentsAreDifferent. Rewrite the assertions to verify timing-independent invariants instead: all expected message bodies are sent exactly once, no batch exceeds the size implied by the timeout cadence, and at least the minimum number of batches is produced. This still exercises the timeout-driven flushing (both synchronous and the strict separate-thread variant) without depending on exact wall-clock behavior. Fixes #38946 --- .../io/aws2/sqs/SqsIOWriteBatchesTest.java | 97 ++++++++++++++----- 1 file changed, 75 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java index e92720bfb5a5..90ea28d60447 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sqs/SqsIOWriteBatchesTest.java @@ -21,8 +21,10 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.supplyAsync; +import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; +import static 
org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -30,6 +32,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -38,6 +41,7 @@ import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -263,10 +267,16 @@ public void testWriteBatchesWithTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // due to added delay, batches are timed out on arrival of every 3rd msg - verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1], entries[2])); - verify(sqs).sendMessageBatch(request("queue", entries[3], entries[4])); + // Nominally batches time out on arrival of every 3rd message ([0,1,2], [3,4]). The exact + // grouping depends on wall clock time and is unreliable on loaded machines (#38946), so + // verify timing-independent invariants instead: expired batches are flushed on append, so + // with >= 100ms between messages no batch can exceed 3 entries before exceeding the 150ms + // timeout, forcing at least 2 batches for 5 messages. 
+ Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(2)); + assertThat(requests.keySet()).containsExactly("queue"); + assertMessageBodies(requestsFor(requests, "queue"), range(0, 5)); + assertThat(requestsFor(requests, "queue")) + .allSatisfy(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(3)); } @Test @@ -285,11 +295,16 @@ public void testWriteBatchesWithStrictTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // using strict timeouts batches, batches are timed out by a separate thread - verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1])); - verify(sqs).sendMessageBatch(request("queue", entries[2], entries[3])); - verify(sqs).sendMessageBatch(request("queue", entries[4])); + // Nominally the separate timeout thread flushes [0,1], [2,3], [4]. The exact grouping + // depends on wall clock time and is unreliable on loaded machines (#38946), so verify + // timing-independent invariants instead. Expired batches are also flushed on append + // (independently of the timeout thread), so no batch can exceed 3 entries and 5 messages + // require at least 2 batches.
+ Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(2)); + assertThat(requests.keySet()).containsExactly("queue"); + assertMessageBodies(requestsFor(requests, "queue"), range(0, 5)); + assertThat(requestsFor(requests, "queue")) + .allSatisfy(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(3)); } @Test @@ -337,11 +352,18 @@ public void testWriteBatchesToDynamicWithTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // due to added delay, dynamic batches are timed out on arrival of every 2nd msg (per batch) - verify(sqs).sendMessageBatch(request("even", entries[0], entries[2])); - verify(sqs).sendMessageBatch(request("uneven", entries[1], entries[3])); - verify(sqs).sendMessageBatch(request("even", entries[4])); + // Nominally dynamic batches are timed out on arrival of every 2nd message per batch + // ([0,2], [1,3], [4]). The exact grouping depends on wall clock time and is unreliable on + // loaded machines (#38946), so verify timing-independent invariants instead: per queue, + // consecutive messages are >= 200ms apart, so expired batches are flushed on append once + // reaching 2 entries, requiring at least 3 batches overall.
+ Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(3)); + assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven"); + assertMessageBodies(requestsFor(requests, "even"), range(0, 5).filter(i -> i % 2 == 0)); + assertMessageBodies(requestsFor(requests, "uneven"), range(0, 5).filter(i -> i % 2 == 1)); + requests.values().stream() + .flatMap(List::stream) + .forEach(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(2)); } @Test @@ -360,14 +382,19 @@ public void testWriteBatchesToDynamicWithStrictTimeout() { p.run().waitUntilFinish(); - SendMessageBatchRequestEntry[] entries = entries(range(0, 5)); - // using strict timeouts batches, batches are timed out by a separate thread before any 2nd - // entry - verify(sqs).sendMessageBatch(request("even", entries[0])); - verify(sqs).sendMessageBatch(request("uneven", entries[1])); - verify(sqs).sendMessageBatch(request("even", entries[2])); - verify(sqs).sendMessageBatch(request("uneven", entries[3])); - verify(sqs).sendMessageBatch(request("even", entries[4])); + // Nominally the separate timeout thread flushes every batch before a 2nd entry arrives + // (5 singleton batches). The exact grouping depends on wall clock time and is unreliable + // on loaded machines (#38946), so verify timing-independent invariants instead. Expired + // batches are also flushed on append (independently of the timeout thread) and per queue + // consecutive messages are >= 200ms apart, so no batch can exceed 2 entries and at least + // 3 batches are required overall.
+ Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(3)); + assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven"); + assertMessageBodies(requestsFor(requests, "even"), range(0, 5).filter(i -> i % 2 == 0)); + assertMessageBodies(requestsFor(requests, "uneven"), range(0, 5).filter(i -> i % 2 == 1)); + requests.values().stream() + .flatMap(List::stream) + .forEach(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(2)); } @Test @@ -406,6 +433,32 @@ private SendMessageBatchRequest anyRequest() { return any(); } + /** Captures all batch requests, verifying the given mode, and groups them by queue url. */ + private Map<String, List<SendMessageBatchRequest>> captureBatchRequests( + org.mockito.verification.VerificationMode mode) { + ArgumentCaptor<SendMessageBatchRequest> captor = + ArgumentCaptor.forClass(SendMessageBatchRequest.class); + verify(sqs, mode).sendMessageBatch(captor.capture()); + return captor.getAllValues().stream().collect(groupingBy(SendMessageBatchRequest::queueUrl)); + } + + /** Returns the (non-null) batch requests for a queue, failing if there were none. */ + private List<SendMessageBatchRequest> requestsFor( + Map<String, List<SendMessageBatchRequest>> requests, String queue) { + return checkStateNotNull(requests.get(queue), "no batch requests for queue %s", queue); + } + + /** Asserts that the requests contain exactly the expected message bodies, each exactly once. */ + private void assertMessageBodies(List<SendMessageBatchRequest> requests, IntStream expectedMsgs) { + assertThat( + requests.stream() + .flatMap(req -> req.entries().stream()) + .map(SendMessageBatchRequestEntry::messageBody) + .collect(toList())) + .containsExactlyInAnyOrderElementsOf( + expectedMsgs.mapToObj(Integer::toString).collect(toList())); + } + private SendMessageBatchRequest request(String queue, SendMessageBatchRequestEntry... entries) { return SendMessageBatchRequest.builder() .queueUrl(queue)