Skip to content

Commit 728175b

Browse files
authored
Add failure category plumbing for pipeline retry loops (#14)
* [self-healing]: add failure category plumbing for pipeline retry loops Run-checker now classifies failures from all trigger types (Glue, SFN, EMR, EMR Serverless, Databricks) into FailureCategory (TRANSIENT, TIMEOUT, PERMANENT) and passes them through to the orchestrator. IsRetryable defaults an empty category to retryable so that missing classification never silently disables retries. The orchestrator logResult action defaults empty failureCategory to TRANSIENT for backward compatibility with ASL templates that don't pass it yet. checkReadiness returns "not_ready" with poll metadata (pollAdvised, failedTraits) instead of bare "skip", enabling ASL readiness polling. * fix diagram alignment
1 parent c8ecfc5 commit 728175b

19 files changed

Lines changed: 341 additions & 83 deletions

File tree

cmd/lambda/orchestrator/main.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,11 @@ func checkReadiness(_ context.Context, _ *intlambda.Deps, req intlambda.Orchestr
355355
if !allPass {
356356
return intlambda.OrchestratorResponse{
357357
Action: req.Action,
358-
Result: "skip",
358+
Result: "not_ready",
359359
Payload: map[string]interface{}{
360-
"reason": "not ready",
361-
"blocking": blocking,
360+
"pollAdvised": true,
361+
"failedTraits": blocking,
362+
"message": fmt.Sprintf("blocked by %d traits", len(blocking)),
362363
},
363364
}, nil
364365
}
@@ -408,6 +409,9 @@ func logResult(ctx context.Context, d *intlambda.Deps, req intlambda.Orchestrato
408409
}
409410

410411
if status == string(types.RunFailed) {
412+
if failureCategory == "" {
413+
failureCategory = string(types.FailureTransient)
414+
}
411415
entry.FailureMessage = message
412416
entry.FailureCategory = types.FailureCategory(failureCategory)
413417

cmd/lambda/orchestrator/main_test.go

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,10 @@ func TestCheckReadiness_HasBlocking(t *testing.T) {
206206
},
207207
})
208208
require.NoError(t, err)
209-
assert.Equal(t, "skip", resp.Result)
210-
blocking := resp.Payload["blocking"].([]string)
211-
assert.Contains(t, blocking, "schema")
209+
assert.Equal(t, "not_ready", resp.Result)
210+
assert.Equal(t, true, resp.Payload["pollAdvised"])
211+
failedTraits := resp.Payload["failedTraits"].([]string)
212+
assert.Contains(t, failedTraits, "schema")
212213
}
213214

214215
func TestCheckReadiness_OptionalFail(t *testing.T) {
@@ -301,6 +302,57 @@ func TestLogResult(t *testing.T) {
301302
assert.Equal(t, "proceed", resp.Result)
302303
}
303304

305+
func TestLogResult_FailedWithCategory(t *testing.T) {
306+
d := testDeps(t)
307+
seedPipeline(t, d, types.PipelineConfig{Name: "pipe-a"})
308+
309+
resp, err := handleOrchestrator(context.Background(), d, intlambda.OrchestratorRequest{
310+
Action: "logResult",
311+
PipelineID: "pipe-a",
312+
ScheduleID: "daily",
313+
Payload: map[string]interface{}{
314+
"status": string(types.RunFailed),
315+
"runID": "run-2",
316+
"message": "glue timeout",
317+
"failureCategory": string(types.FailureTimeout),
318+
},
319+
})
320+
require.NoError(t, err)
321+
assert.Equal(t, "proceed", resp.Result)
322+
assert.Equal(t, true, resp.Payload["retryable"])
323+
324+
// Verify category was persisted
325+
date := time.Now().UTC().Format("2006-01-02")
326+
entry, err := d.Provider.GetRunLog(context.Background(), "pipe-a", date, "daily")
327+
require.NoError(t, err)
328+
assert.Equal(t, types.FailureTimeout, entry.FailureCategory)
329+
}
330+
331+
func TestLogResult_FailedEmptyCategoryDefaultsTransient(t *testing.T) {
332+
d := testDeps(t)
333+
seedPipeline(t, d, types.PipelineConfig{Name: "pipe-a"})
334+
335+
resp, err := handleOrchestrator(context.Background(), d, intlambda.OrchestratorRequest{
336+
Action: "logResult",
337+
PipelineID: "pipe-a",
338+
ScheduleID: "daily",
339+
Payload: map[string]interface{}{
340+
"status": string(types.RunFailed),
341+
"runID": "run-3",
342+
"message": "unknown error",
343+
},
344+
})
345+
require.NoError(t, err)
346+
assert.Equal(t, "proceed", resp.Result)
347+
assert.Equal(t, true, resp.Payload["retryable"])
348+
349+
// Verify empty category defaulted to transient
350+
date := time.Now().UTC().Format("2006-01-02")
351+
entry, err := d.Provider.GetRunLog(context.Background(), "pipe-a", date, "daily")
352+
require.NoError(t, err)
353+
assert.Equal(t, types.FailureTransient, entry.FailureCategory)
354+
}
355+
304356
// --- releaseLock ---
305357

306358
func TestReleaseLock(t *testing.T) {

cmd/lambda/run-checker/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ func handleRunCheck(ctx context.Context, d *intlambda.Deps, req intlambda.RunChe
4242
}
4343

4444
return intlambda.RunCheckResponse{
45-
State: result.State,
46-
Message: result.Message,
45+
State: result.State,
46+
Message: result.Message,
47+
FailureCategory: result.FailureCategory,
4748
}, nil
4849
}
4950

cmd/lambda/run-checker/main_test.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -177,18 +177,19 @@ func TestHandleRunCheck_UnknownType(t *testing.T) {
177177

178178
func TestHandleRunCheck_GlueStatusMapping(t *testing.T) {
179179
tests := []struct {
180-
name string
181-
state gluetypes.JobRunState
182-
expected trigger.RunCheckState
180+
name string
181+
state gluetypes.JobRunState
182+
expected trigger.RunCheckState
183+
failureCategory types.FailureCategory
183184
}{
184-
{"succeeded", gluetypes.JobRunStateSucceeded, trigger.RunCheckSucceeded},
185-
{"failed", gluetypes.JobRunStateFailed, trigger.RunCheckFailed},
186-
{"timeout", gluetypes.JobRunStateTimeout, trigger.RunCheckFailed},
187-
{"stopped", gluetypes.JobRunStateStopped, trigger.RunCheckFailed},
188-
{"error", gluetypes.JobRunStateError, trigger.RunCheckFailed},
189-
{"running", gluetypes.JobRunStateRunning, trigger.RunCheckRunning},
190-
{"starting", gluetypes.JobRunStateStarting, trigger.RunCheckRunning},
191-
{"waiting", gluetypes.JobRunStateWaiting, trigger.RunCheckRunning},
185+
{"succeeded", gluetypes.JobRunStateSucceeded, trigger.RunCheckSucceeded, ""},
186+
{"failed", gluetypes.JobRunStateFailed, trigger.RunCheckFailed, types.FailureTransient},
187+
{"timeout", gluetypes.JobRunStateTimeout, trigger.RunCheckFailed, types.FailureTimeout},
188+
{"stopped", gluetypes.JobRunStateStopped, trigger.RunCheckFailed, types.FailureTransient},
189+
{"error", gluetypes.JobRunStateError, trigger.RunCheckFailed, types.FailureTransient},
190+
{"running", gluetypes.JobRunStateRunning, trigger.RunCheckRunning, ""},
191+
{"starting", gluetypes.JobRunStateStarting, trigger.RunCheckRunning, ""},
192+
{"waiting", gluetypes.JobRunStateWaiting, trigger.RunCheckRunning, ""},
192193
}
193194

194195
for _, tt := range tests {
@@ -215,6 +216,7 @@ func TestHandleRunCheck_GlueStatusMapping(t *testing.T) {
215216
require.NoError(t, err)
216217
assert.Equal(t, tt.expected, resp.State)
217218
assert.Equal(t, string(tt.state), resp.Message)
219+
assert.Equal(t, tt.failureCategory, resp.FailureCategory)
218220
})
219221
}
220222
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
---
2+
title: "Retry Loop ASL Pattern"
3+
weight: 10
4+
---
5+
6+
# Step Function Retry & Readiness Polling Pattern
7+
8+
This guide shows the recommended ASL patterns for implementing retry loops and readiness polling in your Step Function state machine.
9+
10+
## Prerequisites
11+
12+
These patterns require interlock v0.2.2+ which adds:
13+
14+
- `failureCategory` in run-checker responses (classifies failures as `TRANSIENT`, `TIMEOUT`, or `PERMANENT`)
15+
- `retryable` and `retryBackoffSeconds` in orchestrator `logResult` responses
16+
- `not_ready` result with `pollAdvised` from orchestrator `checkReadiness`
17+
18+
## Failure Retry Loop
19+
20+
When a job fails, the orchestrator's `logResult` action returns retry metadata. The ASL can use this to loop back and retry.
21+
22+
```json
23+
{
24+
"LogRunFailed": {
25+
"Type": "Task",
26+
"Resource": "${OrchestratorArn}",
27+
"Parameters": {
28+
"action": "logResult",
29+
"pipelineID.$": "$.pipelineID",
30+
"scheduleID.$": "$.scheduleID",
31+
"payload": {
32+
"status": "FAILED",
33+
"runID.$": "$.runID",
34+
"message.$": "$.failureMessage",
35+
"failureCategory.$": "$.failureCategory"
36+
}
37+
},
38+
"ResultPath": "$.logResult",
39+
"Next": "IsRetryable"
40+
},
41+
42+
"IsRetryable": {
43+
"Type": "Choice",
44+
"Choices": [
45+
{
46+
"Variable": "$.logResult.payload.retryable",
47+
"BooleanEquals": true,
48+
"Next": "WaitRetryBackoff"
49+
}
50+
],
51+
"Default": "ReleaseLockFailed"
52+
},
53+
54+
"WaitRetryBackoff": {
55+
"Type": "Wait",
56+
"SecondsPath": "$.logResult.payload.retryBackoffSeconds",
57+
"Next": "AcquireLock"
58+
}
59+
}
60+
```
61+
62+
### How it works
63+
64+
1. `LogRunFailed` calls the orchestrator with `failureCategory` from the run-checker
65+
2. The orchestrator computes `retryable` (based on category + attempt count + max attempts) and `retryBackoffSeconds`
66+
3. `IsRetryable` branches: if retryable, wait and loop back to `AcquireLock`; otherwise, proceed to final cleanup
67+
4. `WaitRetryBackoff` uses `SecondsPath` for dynamic exponential backoff
68+
69+
### Backward compatibility
70+
71+
If the ASL does not pass `failureCategory`, the orchestrator defaults it to `TRANSIENT`, making the failure retryable. This ensures existing deployments get retry behavior without ASL changes.
72+
73+
## Readiness Polling
74+
75+
When traits fail (data not ready), the orchestrator returns `not_ready` with poll metadata. The ASL can use this to wait and re-evaluate.
76+
77+
```json
78+
{
79+
"CheckReadiness": {
80+
"Type": "Task",
81+
"Resource": "${OrchestratorArn}",
82+
"Parameters": {
83+
"action": "checkReadiness",
84+
"pipelineID.$": "$.pipelineID",
85+
"payload": {
86+
"traitResults.$": "$.traitResults"
87+
}
88+
},
89+
"ResultPath": "$.readiness",
90+
"Next": "IsReady"
91+
},
92+
93+
"IsReady": {
94+
"Type": "Choice",
95+
"Choices": [
96+
{
97+
"Variable": "$.readiness.result",
98+
"StringEquals": "proceed",
99+
"Next": "TriggerJob"
100+
},
101+
{
102+
"Variable": "$.readiness.result",
103+
"StringEquals": "not_ready",
104+
"Next": "WaitReadiness"
105+
}
106+
],
107+
"Default": "HandleEvaluatorError"
108+
},
109+
110+
"WaitReadiness": {
111+
"Type": "Wait",
112+
"Seconds": 60,
113+
"Next": "AcquireLock"
114+
}
115+
}
116+
```
117+
118+
### How it works
119+
120+
1. `CheckReadiness` evaluates trait results and returns `proceed`, `not_ready`, or `error`
121+
2. `IsReady` branches on the result:
122+
- `proceed`: all required traits pass, trigger the job
123+
- `not_ready`: data not ready yet, wait and re-evaluate (loops back to `AcquireLock`)
124+
- `error`: evaluator infrastructure failure, handle separately
125+
3. `WaitReadiness` pauses before re-evaluation (use a fixed interval or compute dynamically)
126+
127+
### Backward compatibility
128+
129+
The previous `skip` result is replaced by `not_ready`. Existing ASL templates that check `result == "proceed"` with a default fallback will treat `not_ready` the same as `skip` — both hit the default path. No ASL changes are required for existing deployments to continue working.
130+
131+
## Complete Flow
132+
133+
The recommended state machine flow combining both patterns:
134+
135+
```
136+
AcquireLock → CheckRunLog → ResolvePipeline → EvaluateTraits → CheckReadiness
137+
138+
┌──────────┼──────────┐
139+
│ │ │
140+
proceed not_ready error
141+
│ │ │
142+
TriggerJob Wait(60s) Alert+Skip
143+
│ │
144+
PollStatus → AcquireLock
145+
146+
┌─────┼─────┐
147+
│ │
148+
succeeded failed
149+
│ │
150+
LogCompleted LogFailed
151+
152+
┌─────┼─────┐
153+
│ │
154+
retryable non-retryable
155+
│ │
156+
Wait(backoff) Cleanup
157+
158+
AcquireLock
159+
```

internal/lambda/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type RunCheckRequest struct {
9494

9595
// RunCheckResponse is the output of the run-checker Lambda.
9696
type RunCheckResponse struct {
97-
State trigger.RunCheckState `json:"state"`
98-
Message string `json:"message"`
97+
State trigger.RunCheckState `json:"state"`
98+
Message string `json:"message"`
99+
FailureCategory types.FailureCategory `json:"failureCategory,omitempty"`
99100
}

internal/schedule/retry.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,15 @@ func CalculateBackoff(policy types.RetryPolicy, attempt int) time.Duration {
4040
}
4141

4242
// IsRetryable returns whether a failure category should be retried.
43+
// An empty category defaults to retryable — better to retry once too many
44+
// than silently drop a run that could have recovered.
4345
func IsRetryable(policy types.RetryPolicy, category types.FailureCategory) bool {
4446
if category == types.FailurePermanent {
4547
return false
4648
}
49+
if category == "" {
50+
return true
51+
}
4752
if len(policy.RetryableFailures) == 0 {
4853
// Default: retry transient and timeout
4954
return category == types.FailureTransient || category == types.FailureTimeout

internal/schedule/retry_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ func TestIsRetryable(t *testing.T) {
6969
}
7070
}
7171

72+
func TestIsRetryable_EmptyCategory(t *testing.T) {
73+
policy := DefaultRetryPolicy()
74+
assert.True(t, IsRetryable(policy, ""))
75+
}
76+
77+
func TestIsRetryable_EmptyCategory_CustomPolicy(t *testing.T) {
78+
policy := types.RetryPolicy{
79+
RetryableFailures: []types.FailureCategory{types.FailureTransient},
80+
}
81+
assert.True(t, IsRetryable(policy, ""))
82+
}
83+
7284
func TestIsRetryable_EmptyPolicyDefaults(t *testing.T) {
7385
policy := types.RetryPolicy{}
7486

internal/trigger/databricks.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,15 @@ func (r *Runner) checkDatabricksStatus(ctx context.Context, metadata map[string]
134134
if resultState == "SUCCESS" {
135135
return StatusResult{State: RunCheckSucceeded, Message: msg}, nil
136136
}
137-
return StatusResult{State: RunCheckFailed, Message: msg}, nil
137+
return StatusResult{State: RunCheckFailed, Message: msg, FailureCategory: types.FailureTransient}, nil
138138
}
139139

140-
if lifeCycleState == "INTERNAL_ERROR" || lifeCycleState == "SKIPPED" {
141-
return StatusResult{State: RunCheckFailed, Message: msg}, nil
140+
if lifeCycleState == "SKIPPED" {
141+
return StatusResult{State: RunCheckFailed, Message: msg, FailureCategory: types.FailurePermanent}, nil
142+
}
143+
144+
if lifeCycleState == "INTERNAL_ERROR" {
145+
return StatusResult{State: RunCheckFailed, Message: msg, FailureCategory: types.FailureTransient}, nil
142146
}
143147

144148
return StatusResult{State: RunCheckRunning, Message: msg}, nil

0 commit comments

Comments
 (0)