Skip to content

Commit dd12c31

Browse files
committed
Add concurrent waitForCondition example, and add examples to cloud tests
1 parent e8fe528 commit dd12c31

6 files changed

Lines changed: 207 additions & 13 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.wait;
4+
5+
import java.util.stream.IntStream;
6+
import software.amazon.lambda.durable.DurableContext;
7+
import software.amazon.lambda.durable.DurableHandler;
8+
import software.amazon.lambda.durable.config.MapConfig;
9+
import software.amazon.lambda.durable.config.WaitForConditionConfig;
10+
import software.amazon.lambda.durable.model.WaitForConditionResult;
11+
12+
/**
13+
* Example demonstrating concurrent waitForCondition operations using map.
14+
*
15+
* <p>Runs many (totalOperations) waitForCondition operations concurrently (maxConcurrency). Each operation:
16+
*
17+
* <ol>
18+
* <li>Uses attempt count as state (replay-safe).
19+
* <li>Fails and retries until the attempt count reaches the given threshold, and then succeeds
20+
* </ol>
21+
*/
22+
public class ConcurrentWaitForConditionExample extends DurableHandler<ConcurrentWaitForConditionExample.Input, String> {
23+
24+
public record Input(int threshold, int totalOperations, int maxConcurrency) {}
25+
26+
@Override
27+
public String handleRequest(Input input, DurableContext context) {
28+
var items = IntStream.range(0, input.totalOperations()).boxed().toList();
29+
30+
var config = MapConfig.builder().maxConcurrency(input.maxConcurrency()).build();
31+
32+
var result = context.map(
33+
"concurrent-wait-for-conditions",
34+
items,
35+
String.class,
36+
(item, index, ctx) -> {
37+
var conditionConfig = WaitForConditionConfig.<Integer>builder()
38+
.initialState(1)
39+
.build();
40+
// Poll until the counter reaches the input threshold
41+
var count = ctx.waitForCondition(
42+
"condition-" + index,
43+
Integer.class,
44+
(callCount, stepCtx) -> {
45+
if (callCount >= input.threshold()) {
46+
return WaitForConditionResult.stopPolling(callCount);
47+
}
48+
return WaitForConditionResult.continuePolling(callCount + 1);
49+
},
50+
conditionConfig);
51+
return String.valueOf(count);
52+
},
53+
config);
54+
55+
return String.join(" | ", result.results());
56+
}
57+
}

examples/src/main/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExample.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,29 @@
1010
/**
1111
* Example demonstrating the waitForCondition operation.
1212
*
13-
* <p>This example simulates waiting for an order to ship, by repeatedly calling a check function.
13+
* <p>This handler polls a condition function until it signals completion:
14+
*
15+
* <ol>
16+
* <li>The attempt count is used as a state (replay safe)
17+
* <li>Fails and retries until the attempt count reaches the given threshold, and then succeeds
18+
* </ol>
1419
*/
1520
public class WaitForConditionExample extends DurableHandler<Integer, Integer> {
1621

1722
@Override
18-
public Integer handleRequest(Integer input, DurableContext context) {
19-
// Poll the shipment status until the order is shipped.
20-
// The check function simulates an order shipment (0 -> 1 -> 2 -> 3 -> 4)
23+
public Integer handleRequest(Integer threshold, DurableContext context) {
24+
// Poll until the counter reaches the input threshold
2125
return context.waitForCondition(
22-
"wait-for-shipment",
26+
"wait-for-condition",
2327
Integer.class,
2428
(callCount, stepCtx) -> {
25-
// Simulate checking shipment status from an external service
26-
if (callCount >= 3) {
27-
// Order has shipped — stop polling
28-
return WaitForConditionResult.stopPolling(callCount + 1);
29+
if (callCount >= threshold) {
30+
// Condition met, stop polling
31+
return WaitForConditionResult.stopPolling(callCount);
2932
}
30-
// Order still processing — continue polling
33+
// Condition not met, keep polling
3134
return WaitForConditionResult.continuePolling(callCount + 1);
3235
},
33-
WaitForConditionConfig.<Integer>builder().initialState(1).build()); // Order pending - initial status
36+
WaitForConditionConfig.<Integer>builder().initialState(1).build());
3437
}
3538
}

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.junit.jupiter.api.Assertions.*;
66
import static software.amazon.lambda.durable.TypeToken.get;
77

8+
import java.time.Duration;
89
import java.util.HashMap;
910
import java.util.List;
1011
import java.util.Map;
@@ -25,6 +26,7 @@
2526
import software.amazon.lambda.durable.examples.step.ManyAsyncStepsExample;
2627
import software.amazon.lambda.durable.examples.types.ApprovalRequest;
2728
import software.amazon.lambda.durable.examples.types.GreetingRequest;
29+
import software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample;
2830
import software.amazon.lambda.durable.model.ExecutionStatus;
2931
import software.amazon.lambda.durable.testing.CloudDurableTestRunner;
3032

@@ -594,4 +596,41 @@ void testComplexMapExample() {
594596
assertTrue(output.contains("healthy"));
595597
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
596598
}
599+
600+
@Test
601+
void testWaitForConditionExample() {
602+
var runner = CloudDurableTestRunner.create(
603+
arn("wait-for-condition-example"), Integer.class, Integer.class, lambdaClient);
604+
var result = runner.run(3);
605+
606+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
607+
assertEquals(3, result.getResult(Integer.class));
608+
}
609+
610+
@Test
611+
void testConcurrentWaitForConditionExample() {
612+
var runner = CloudDurableTestRunner.create(
613+
arn("concurrent-wait-for-condition-example"),
614+
ConcurrentWaitForConditionExample.Input.class,
615+
String.class,
616+
lambdaClient);
617+
var result = runner.run(new ConcurrentWaitForConditionExample.Input(3, 100, 50));
618+
619+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
620+
621+
// Verify each operation finished with 3 attempts
622+
var allOperationsOutput = result.getResult(String.class);
623+
var operationOutputs = allOperationsOutput.split(" \\| ");
624+
assertEquals(100, operationOutputs.length);
625+
for (var operationOutput : operationOutputs) {
626+
assertEquals("3", operationOutput);
627+
}
628+
629+
// Verify each operation completes in under 30 seconds (extra time for flakiness)
630+
for (var operationResult : result.getOperations()) {
631+
assertTrue(
632+
operationResult.getDuration().compareTo(Duration.ofSeconds(30)) < 0,
633+
"Operation took " + operationResult.getDuration().toSeconds() + "s, expected < 30s");
634+
}
635+
}
597636
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.wait;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
7+
import org.junit.jupiter.api.Test;
8+
import software.amazon.lambda.durable.model.ExecutionStatus;
9+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
10+
11+
class ConcurrentWaitForConditionExampleTest {
12+
13+
@Test
14+
void testConcurrentWaitForConditionExample() {
15+
var handler = new ConcurrentWaitForConditionExample();
16+
var runner = LocalDurableTestRunner.create(ConcurrentWaitForConditionExample.Input.class, handler);
17+
18+
var result = runner.runUntilComplete(new ConcurrentWaitForConditionExample.Input(3, 100, 50));
19+
20+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
21+
22+
var allOperationsOutput = result.getResult(String.class);
23+
var operationResults = allOperationsOutput.split(" \\| ");
24+
assertEquals(100, operationResults.length);
25+
for (var operationResult : operationResults) {
26+
assertEquals("3", operationResult);
27+
}
28+
}
29+
}

examples/src/test/java/software/amazon/lambda/durable/examples/wait/WaitForConditionExampleTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ void testWaitForConditionExample() {
1515
var handler = new WaitForConditionExample();
1616
var runner = LocalDurableTestRunner.create(Integer.class, handler);
1717

18-
var result = runner.runUntilComplete(123);
18+
var result = runner.runUntilComplete(3);
1919

2020
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
21-
assertEquals(4, result.getResult(Integer.class));
21+
assertEquals(3, result.getResult(Integer.class));
2222
}
2323
}

examples/template.yaml

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,56 @@ Resources:
518518
DockerContext: ../
519519
DockerTag: durable-examples
520520

521+
WaitForConditionExampleFunction:
522+
Type: AWS::Serverless::Function
523+
Properties:
524+
PackageType: Image
525+
FunctionName: !Join
526+
- ''
527+
- - 'wait-for-condition-example'
528+
- !Ref FunctionNameSuffix
529+
ImageConfig:
530+
Command: ["software.amazon.lambda.durable.examples.wait.WaitForConditionExample::handleRequest"]
531+
DurableConfig:
532+
ExecutionTimeout: 300
533+
RetentionPeriodInDays: 7
534+
Policies:
535+
- Statement:
536+
- Effect: Allow
537+
Action:
538+
- lambda:CheckpointDurableExecutions
539+
- lambda:GetDurableExecutionState
540+
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:wait-for-condition-example${FunctionNameSuffix}"
541+
Metadata:
542+
Dockerfile: !Ref DockerFile
543+
DockerContext: ../
544+
DockerTag: durable-examples
545+
546+
ConcurrentWaitForConditionExampleFunction:
547+
Type: AWS::Serverless::Function
548+
Properties:
549+
PackageType: Image
550+
FunctionName: !Join
551+
- ''
552+
- - 'concurrent-wait-for-condition-example'
553+
- !Ref FunctionNameSuffix
554+
ImageConfig:
555+
Command: ["software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample::handleRequest"]
556+
DurableConfig:
557+
ExecutionTimeout: 300
558+
RetentionPeriodInDays: 7
559+
Policies:
560+
- Statement:
561+
- Effect: Allow
562+
Action:
563+
- lambda:CheckpointDurableExecutions
564+
- lambda:GetDurableExecutionState
565+
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:concurrent-wait-for-condition-example${FunctionNameSuffix}"
566+
Metadata:
567+
Dockerfile: !Ref DockerFile
568+
DockerContext: ../
569+
DockerTag: durable-examples
570+
521571
Outputs:
522572
NoopExampleFunction:
523573
Description: Noop Example Function ARN
@@ -679,3 +729,19 @@ Outputs:
679729
Description: Complex Map Example Function Name
680730
Value: !Ref ComplexMapExampleFunction
681731

732+
WaitForConditionExampleFunction:
733+
Description: Wait For Condition Example Function ARN
734+
Value: !GetAtt WaitForConditionExampleFunction.Arn
735+
736+
WaitForConditionExampleFunctionName:
737+
Description: Wait For Condition Example Function Name
738+
Value: !Ref WaitForConditionExampleFunction
739+
740+
ConcurrentWaitForConditionExampleFunction:
741+
Description: Concurrent Wait For Condition Example Function ARN
742+
Value: !GetAtt ConcurrentWaitForConditionExampleFunction.Arn
743+
744+
ConcurrentWaitForConditionExampleFunctionName:
745+
Description: Concurrent Wait For Condition Example Function Name
746+
Value: !Ref ConcurrentWaitForConditionExampleFunction
747+

0 commit comments

Comments
 (0)