Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.joda.time.Duration.millis;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -38,6 +41,7 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -263,10 +267,16 @@ public void testWriteBatchesWithTimeout() {

p.run().waitUntilFinish();

SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
// due to added delay, batches are timed out on arrival of every 3rd msg
verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1], entries[2]));
verify(sqs).sendMessageBatch(request("queue", entries[3], entries[4]));
// Nominally batches time out on arrival of every 3rd message ([0,1,2], [3,4]). The exact
// grouping depends on wall clock time and is unreliable on loaded machines (#38946), so
// verify timing-independent invariants instead: expired batches are flushed on append, so
// with >= 100ms between messages no batch can exceed 3 entries before exceeding the 150ms
// timeout, forcing at least 2 batches for 5 messages.
Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(2));
assertThat(requests.keySet()).containsExactly("queue");
assertMessageBodies(requestsFor(requests, "queue"), range(0, 5));
assertThat(requestsFor(requests, "queue"))
.allSatisfy(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(3));
}

@Test
Expand All @@ -285,11 +295,16 @@ public void testWriteBatchesWithStrictTimeout() {

p.run().waitUntilFinish();

SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
// using strict timeouts batches, batches are timed out by a separate thread
verify(sqs).sendMessageBatch(request("queue", entries[0], entries[1]));
verify(sqs).sendMessageBatch(request("queue", entries[2], entries[3]));
verify(sqs).sendMessageBatch(request("queue", entries[4]));
// Nominally the separate timeout thread flushes [0,1], [2,3], [4]. The exact grouping
// depends on wall clock time and is unreliable on loaded machines (#38946), so verify
// timing-independent invariants instead. Expired batches are also flushed on append
// (independently of the timeout thread), so no batch can exceed 3 entries and 5 messages
// require at least 2 batches.
Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(2));
assertThat(requests.keySet()).containsExactly("queue");
assertMessageBodies(requestsFor(requests, "queue"), range(0, 5));
assertThat(requestsFor(requests, "queue"))
.allSatisfy(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(3));
}

@Test
Expand Down Expand Up @@ -337,11 +352,18 @@ public void testWriteBatchesToDynamicWithTimeout() {

p.run().waitUntilFinish();

SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
// due to added delay, dynamic batches are timed out on arrival of every 2nd msg (per batch)
verify(sqs).sendMessageBatch(request("even", entries[0], entries[2]));
verify(sqs).sendMessageBatch(request("uneven", entries[1], entries[3]));
verify(sqs).sendMessageBatch(request("even", entries[4]));
// Nominally dynamic batches are timed out on arrival of every 2nd message per batch
// ([0,2], [1,3], [4]). The exact grouping depends on wall clock time and is unreliable on
// loaded machines (#38946), so verify timing-independent invariants instead: per queue,
// consecutive messages are >= 200ms apart, so expired batches are flushed on append once
// reaching 2 entries, requiring at least 3 batches overall.
Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(3));
assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
assertMessageBodies(requestsFor(requests, "even"), range(0, 5).filter(i -> i % 2 == 0));
assertMessageBodies(requestsFor(requests, "uneven"), range(0, 5).filter(i -> i % 2 == 1));
requests.values().stream()
.flatMap(List::stream)
.forEach(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(2));
}

@Test
Expand All @@ -360,14 +382,19 @@ public void testWriteBatchesToDynamicWithStrictTimeout() {

p.run().waitUntilFinish();

SendMessageBatchRequestEntry[] entries = entries(range(0, 5));
// using strict timeouts batches, batches are timed out by a separate thread before any 2nd
// entry
verify(sqs).sendMessageBatch(request("even", entries[0]));
verify(sqs).sendMessageBatch(request("uneven", entries[1]));
verify(sqs).sendMessageBatch(request("even", entries[2]));
verify(sqs).sendMessageBatch(request("uneven", entries[3]));
verify(sqs).sendMessageBatch(request("even", entries[4]));
// Nominally the separate timeout thread flushes every batch before a 2nd entry arrives
// (5 singleton batches). The exact grouping depends on wall clock time and is unreliable
// on loaded machines (#38946), so verify timing-independent invariants instead. Expired
// batches are also flushed on append (independently of the timeout thread) and per queue
// consecutive messages are >= 200ms apart, so no batch can exceed 2 entries and at least
// 3 batches are required overall.
Map<String, List<SendMessageBatchRequest>> requests = captureBatchRequests(atLeast(3));
assertThat(requests.keySet()).containsExactlyInAnyOrder("even", "uneven");
assertMessageBodies(requestsFor(requests, "even"), range(0, 5).filter(i -> i % 2 == 0));
assertMessageBodies(requestsFor(requests, "uneven"), range(0, 5).filter(i -> i % 2 == 1));
requests.values().stream()
.flatMap(List::stream)
.forEach(req -> assertThat(req.entries().size()).isLessThanOrEqualTo(2));
}

@Test
Expand Down Expand Up @@ -406,6 +433,32 @@ private SendMessageBatchRequest anyRequest() {
return any();
}

/** Captures all batch requests, verifying the given mode, and groups them by queue url. */
private Map<String, List<SendMessageBatchRequest>> captureBatchRequests(
org.mockito.verification.VerificationMode mode) {
ArgumentCaptor<SendMessageBatchRequest> captor =
ArgumentCaptor.forClass(SendMessageBatchRequest.class);
verify(sqs, mode).sendMessageBatch(captor.capture());
return captor.getAllValues().stream().collect(groupingBy(SendMessageBatchRequest::queueUrl));
}

/** Returns the (non-null) batch requests for a queue, failing if there were none. */
private List<SendMessageBatchRequest> requestsFor(
Map<String, List<SendMessageBatchRequest>> requests, String queue) {
return checkStateNotNull(requests.get(queue), "no batch requests for queue %s", queue);
}

/** Asserts that the requests contain exactly the expected message bodies, each exactly once. */
private void assertMessageBodies(List<SendMessageBatchRequest> requests, IntStream expectedMsgs) {
assertThat(
requests.stream()
.flatMap(req -> req.entries().stream())
.map(SendMessageBatchRequestEntry::messageBody)
.collect(toList()))
.containsExactlyInAnyOrderElementsOf(
expectedMsgs.mapToObj(Integer::toString).collect(toList()));
}

private SendMessageBatchRequest request(String queue, SendMessageBatchRequestEntry... entries) {
return SendMessageBatchRequest.builder()
.queueUrl(queue)
Expand Down
Loading