Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- **Calendar exclusion uses execution date**: `isExcludedDate` checks the job's execution date (not `time.Now()`), preventing incorrect exclusions on re-runs for previous days. Supports both `YYYY-MM-DD` and composite `YYYY-MM-DDTHH` date formats.
- **Atomic lock reset**: `ResetTriggerLock` uses single DynamoDB `UpdateItem` with `attribute_exists(PK)` condition, eliminating the race window between delete and create.
- **Lock release on SFN start failure**: Both rerun and job-failure retry paths release the trigger lock if `StartExecution` fails, preventing permanently stuck pipelines.
- **Terminal trigger status on calendar exclusion**: `handleJobFailure` sets `FAILED_FINAL` instead of leaving the lock in `RUNNING` state to silently expire via TTL.
- **ASL CompleteTrigger failure path**: New `CheckSLAForCompleteTriggerFailure` → `CancelSLAOnCompleteTriggerFailure` → `CompleteTriggerFailed` states ensure SLA schedules are cancelled before entering terminal Fail state.
- **Event ordering**: `RERUN_ACCEPTED` is published only after `ResetTriggerLock` confirms the lock was atomically reset.
- **New events**: `BASELINE_CAPTURE_FAILED` (baseline capture error), `PIPELINE_EXCLUDED` (calendar exclusion in sensor, rerun, job-failure, and post-run drift paths), `RERUN_ACCEPTED` (audit trail for accepted reruns).
- **publishEvent error logging**: All 17 `publishEvent` call sites across stream-router and orchestrator now log errors at WARN level instead of silently discarding them.
- **SLA monitor error wrapping**: `createOneTimeSchedule` wraps errors with schedule name context via `fmt.Errorf`.
- **HTTP response body sanitization**: Error response bodies truncated to 512 bytes with control characters stripped.
Expand Down
54 changes: 53 additions & 1 deletion deploy/statemachine.asl.json
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,62 @@
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.errorInfo",
"Next": "CheckCancelSLA"
"Next": "CheckSLAForCompleteTriggerFailure"
}
]
},
"CheckSLAForCompleteTriggerFailure": {
"Type": "Choice",
"Comment": "Cancel SLA before failing if SLA was configured; otherwise fail immediately",
"Choices": [
{
"Variable": "$.config.sla",
"IsPresent": true,
"Next": "CancelSLAOnCompleteTriggerFailure"
}
],
"Default": "CompleteTriggerFailed"
},
"CancelSLAOnCompleteTriggerFailure": {
"Type": "Task",
"Comment": "Best-effort SLA cancellation before marking CompleteTrigger as failed",
"Resource": "${sla_monitor_arn}",
"Parameters": {
"mode": "cancel",
"pipelineId.$": "$.pipelineId",
"scheduleId.$": "$.scheduleId",
"date.$": "$.date",
"deadline.$": "$.config.sla.deadline",
"expectedDuration.$": "$.config.sla.expectedDuration"
},
"ResultPath": "$.slaResult",
"Next": "CompleteTriggerFailed",
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.TooManyRequestsException",
"States.TaskFailed"
],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.slaErrorInfo",
"Next": "CompleteTriggerFailed"
}
]
},
"CompleteTriggerFailed": {
"Type": "Fail",
"Error": "CompleteTriggerFailed",
"Cause": "Failed to set trigger to terminal status after retries"
},
"CheckCancelSLA": {
"Type": "Choice",
"Comment": "Cancel SLA schedules if SLA was configured, otherwise finish",
Expand Down
51 changes: 50 additions & 1 deletion deploy/statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func TestASL_TopLevelStatesExist(t *testing.T) {
"CancelSLASchedules",
"InfraFailure",
"Done",
"CheckSLAForCompleteTriggerFailure",
"CancelSLAOnCompleteTriggerFailure",
"CompleteTriggerFailed",
}
for _, name := range expected {
_, ok := asl.States[name]
Expand Down Expand Up @@ -335,10 +338,15 @@ func TestASL_JobPollingFlow(t *testing.T) {
}
assert.True(t, foundComplete, "IsJobDone should route to CompleteTrigger")

// CompleteTrigger → CheckCancelSLA
// CompleteTrigger → CheckCancelSLA (happy path)
var complete taskState
require.NoError(t, json.Unmarshal(asl.States["CompleteTrigger"], &complete))
assert.Equal(t, "CheckCancelSLA", complete.Next)

// CompleteTrigger Catch routes to CheckSLAForCompleteTriggerFailure (not Done)
require.NotEmpty(t, complete.Catch)
assert.Equal(t, "CheckSLAForCompleteTriggerFailure", complete.Catch[0].Next,
"CompleteTrigger catch must route to CheckSLAForCompleteTriggerFailure, not Done")
}

func TestASL_CancelSLAFlow(t *testing.T) {
Expand Down Expand Up @@ -442,3 +450,44 @@ func TestASL_JobPollExhaustionFlow(t *testing.T) {
require.NotEmpty(t, exhausted.Catch)
assert.Equal(t, "InjectTimeoutEvent", exhausted.Catch[0].Next)
}

// TestASL_CompleteTriggerFailurePath walks the failure branch hanging off
// CompleteTrigger: Catch → CheckSLAForCompleteTriggerFailure →
// CancelSLAOnCompleteTriggerFailure → CompleteTriggerFailed.
func TestASL_CompleteTriggerFailurePath(t *testing.T) {
	asl := loadASL(t)

	// CompleteTrigger's Catch must divert into the SLA-aware failure path.
	var triggerTask taskState
	require.NoError(t, json.Unmarshal(asl.States["CompleteTrigger"], &triggerTask))
	require.NotEmpty(t, triggerTask.Catch)
	assert.Equal(t, "CheckSLAForCompleteTriggerFailure", triggerTask.Catch[0].Next,
		"CompleteTrigger catch must route to CheckSLAForCompleteTriggerFailure")

	// The Choice state branches on whether an SLA is configured.
	rawChoice, present := asl.States["CheckSLAForCompleteTriggerFailure"]
	require.True(t, present, "CheckSLAForCompleteTriggerFailure state must exist")
	var branch choiceState
	require.NoError(t, json.Unmarshal(rawChoice, &branch))
	assert.Equal(t, "Choice", branch.Type)
	require.NotEmpty(t, branch.Choices)
	// SLA configured → cancel first; otherwise fail directly.
	assert.Equal(t, "CancelSLAOnCompleteTriggerFailure", branch.Choices[0].Next)
	assert.Equal(t, "CompleteTriggerFailed", branch.Default)

	// The cancellation Task ends at CompleteTriggerFailed on both its success
	// edge (Next) and its error edge (Catch).
	rawCancel, present := asl.States["CancelSLAOnCompleteTriggerFailure"]
	require.True(t, present, "CancelSLAOnCompleteTriggerFailure state must exist")
	var cancel taskState
	require.NoError(t, json.Unmarshal(rawCancel, &cancel))
	assert.Equal(t, "Task", cancel.Type)
	assert.Equal(t, "CompleteTriggerFailed", cancel.Next)
	require.NotEmpty(t, cancel.Catch)
	assert.Equal(t, "CompleteTriggerFailed", cancel.Catch[0].Next)

	// The terminal state is a Fail.
	rawFail, present := asl.States["CompleteTriggerFailed"]
	require.True(t, present, "CompleteTriggerFailed state must exist")
	var terminal stateBase
	require.NoError(t, json.Unmarshal(rawFail, &terminal))
	assert.Equal(t, "Fail", terminal.Type)
}
57 changes: 54 additions & 3 deletions internal/lambda/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2128,7 +2128,10 @@ func TestE2E_CalendarExclusionFullSkip(t *testing.T) {

assert.Equal(t, 0, countSFNExecutions(sfnM))
assertNoTriggerLock(t, mock, "pipe-cal1", "stream", today)
assert.Empty(t, collectEventTypes(eb))
// Phase 2: PIPELINE_EXCLUDED event is published when the sensor trigger
// is suppressed by a calendar exclusion.
evts := collectEventTypes(eb)
assert.Equal(t, []string{string(types.EventPipelineExcluded)}, evts)
})
}

Expand Down Expand Up @@ -2289,7 +2292,10 @@ func TestE2E_PostRunBeforeBaseline(t *testing.T) {
func TestE2E_RerunAfterTriggerTTLExpiry(t *testing.T) {
ctx := context.Background()

t.Run("TriggerDeleted_ManualRerunSucceeds", func(t *testing.T) {
// When the trigger lock row has been deleted by DynamoDB TTL, ResetTriggerLock
// returns (false, nil) and an INFRA_FAILURE event is published. No SFN is
// started because the lock cannot be atomically reset.
t.Run("TriggerDeleted_PublishesInfraFailure", func(t *testing.T) {
mock := newMockDDB()
tr := &mockTriggerRunner{}
sc := &mockStatusPoller{}
Expand All @@ -2314,7 +2320,52 @@ func TestE2E_RerunAfterTriggerTTLExpiry(t *testing.T) {

sfnBefore := countSFNExecutions(sfnM)
require.NoError(t, lambda.HandleStreamEvent(ctx, d, makeRerunRequestWithReasonE2E("pipe-ttl1", "manual")))
assert.Greater(t, countSFNExecutions(sfnM), sfnBefore, "rerun should start SFN even without existing trigger")
assert.Equal(t, sfnBefore, countSFNExecutions(sfnM), "no SFN when trigger lock row was deleted by TTL")

// Should have published an INFRA_FAILURE event.
eb.mu.Lock()
defer eb.mu.Unlock()
require.NotEmpty(t, eb.events, "expected INFRA_FAILURE event when trigger lock is missing")
found := false
for _, ev := range eb.events {
for _, entry := range ev.Entries {
if entry.DetailType != nil && *entry.DetailType == string(types.EventInfraFailure) {
found = true
}
}
}
assert.True(t, found, "INFRA_FAILURE event should be published when lock reset fails")
})

// Happy path: trigger lock exists, rerun proceeds atomically via ResetTriggerLock.
t.Run("TriggerExists_ManualRerunSucceeds", func(t *testing.T) {
mock := newMockDDB()
tr := &mockTriggerRunner{}
sc := &mockStatusPoller{}
d, sfnM, eb := buildE2EDeps(mock, tr, sc)

cfg := e2ePipeline("pipe-ttl2")
cfg.Job.MaxManualReruns = intPtr(2)
seedConfig(mock, cfg)

// Seed trigger lock — required for ResetTriggerLock to succeed.
mock.putRaw(testControlTable, map[string]ddbtypes.AttributeValue{
"PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("pipe-ttl2")},
"SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-07")},
"status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning},
})

// Seed a failed job event so circuit breaker allows the rerun.
oldTS := fmt.Sprintf("%d", time.Now().Add(-1*time.Hour).UnixMilli())
mock.putRaw("joblog", map[string]ddbtypes.AttributeValue{
"PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("pipe-ttl2")},
"SK": &ddbtypes.AttributeValueMemberS{Value: types.JobSK("stream", "2026-03-07", oldTS)},
"event": &ddbtypes.AttributeValueMemberS{Value: types.JobEventFail},
})

sfnBefore := countSFNExecutions(sfnM)
require.NoError(t, lambda.HandleStreamEvent(ctx, d, makeRerunRequestWithReasonE2E("pipe-ttl2", "manual")))
assert.Greater(t, countSFNExecutions(sfnM), sfnBefore, "rerun should start SFN when trigger lock exists")
assertAlertFormats(t, eb)
})
}
Expand Down
10 changes: 10 additions & 0 deletions internal/lambda/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Package lambda - export_test.go exposes unexported symbols for unit testing.
// This file is compiled ONLY during test builds (the _test.go suffix applies
// even to files in the non-_test package when placed here).
package lambda

import "github.com/dwsmith1983/interlock/pkg/types"

// IsExcludedDate re-exports isExcludedDate for white-box unit testing from
// the external test package (package lambda_test).
var IsExcludedDate func(cfg *types.PipelineConfig, dateStr string) bool = isExcludedDate
10 changes: 10 additions & 0 deletions internal/lambda/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ func (m *mockDDB) UpdateItem(_ context.Context, input *dynamodb.UpdateItemInput,
key := ddbItemKey(*input.TableName, pk, sk)

item, ok := m.items[key]

// Support attribute_exists(PK) condition: reject update if item doesn't exist.
if input.ConditionExpression != nil && *input.ConditionExpression == "attribute_exists(PK)" {
if !ok {
return nil, &ddbtypes.ConditionalCheckFailedException{
Message: strPtr("The conditional request failed"),
}
}
}

if !ok {
item = map[string]ddbtypes.AttributeValue{
"PK": &ddbtypes.AttributeValueMemberS{Value: pk},
Expand Down
4 changes: 4 additions & 0 deletions internal/lambda/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func handleCompleteTrigger(ctx context.Context, d *Deps, input OrchestratorInput
if err := capturePostRunBaseline(ctx, d, input.PipelineID, input.ScheduleID, input.Date); err != nil {
d.Logger.WarnContext(ctx, "failed to capture post-run baseline",
"pipeline", input.PipelineID, "error", err)
if pubErr := publishEvent(ctx, d, string(types.EventBaselineCaptureFailed), input.PipelineID, input.ScheduleID, input.Date,
fmt.Sprintf("baseline capture failed for %s: %v", input.PipelineID, err)); pubErr != nil {
d.Logger.WarnContext(ctx, "failed to publish baseline capture failure event", "error", pubErr)
}
}
}

Expand Down
108 changes: 108 additions & 0 deletions internal/lambda/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,114 @@ func TestCompleteTrigger_DateScopedBaseline(t *testing.T) {
}
}

// ---------------------------------------------------------------------------
// CompleteTrigger — baseline capture failure event tests
// ---------------------------------------------------------------------------

// TestCompleteTrigger_BaselineCaptureFailed_PublishesEvent verifies that when
// capturePostRunBaseline fails (sensor query error), a BASELINE_CAPTURE_FAILED
// event is published to EventBridge and the trigger is still marked COMPLETED.
// TestCompleteTrigger_BaselineCaptureFailed_PublishesEvent verifies that when
// capturePostRunBaseline fails (sensor query error), a BASELINE_CAPTURE_FAILED
// event is published to EventBridge and the trigger is still marked COMPLETED.
func TestCompleteTrigger_BaselineCaptureFailed_PublishesEvent(t *testing.T) {
	pipelineID := "pipe-baseline-fail"
	date := "2026-03-08T06"
	cfg := types.PipelineConfig{
		Pipeline: types.PipelineIdentity{ID: pipelineID},
		PostRun: &types.PostRunConfig{
			Rules: []types.ValidationRule{
				{Key: "audit-result", Check: types.CheckGTE, Field: "sensor_count", Value: float64(100)},
			},
		},
	}

	ddb := &mockDynamo{
		// Every GetItem call (handler config load and the one inside
		// capturePostRunBaseline) returns the pipeline config.
		getItemFn: func(_ context.Context, _ *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) {
			return &dynamodb.GetItemOutput{Item: configItem(pipelineID, cfg)}, nil
		},
		// Query for sensors fails — this causes capturePostRunBaseline to return an error.
		queryFn: func(_ context.Context, _ *dynamodb.QueryInput, _ ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) {
			return nil, fmt.Errorf("dynamodb: provisioned throughput exceeded")
		},
	}

	d := newTestDeps(ddb)
	out, err := lambda.HandleOrchestrator(context.Background(), d, lambda.OrchestratorInput{
		Mode:       "complete-trigger",
		PipelineID: pipelineID,
		ScheduleID: "stream",
		Date:       date,
		Event:      "success",
	})
	if err != nil {
		t.Fatalf("unexpected Lambda error: baseline capture failure should not fail the handler: %v", err)
	}

	// Trigger must still be marked COMPLETED — baseline failure is non-fatal.
	if out.Status != types.TriggerStatusCompleted {
		t.Errorf("status = %q, want %q — baseline failure must not affect trigger completion", out.Status, types.TriggerStatusCompleted)
	}

	// BASELINE_CAPTURE_FAILED event must be published to EventBridge.
	eb := d.EventBridge.(*mockEventBridge)
	eb.mu.Lock()
	defer eb.mu.Unlock()
	found := false
	for _, evt := range eb.events {
		if evt.Entries[0].DetailType != nil && *evt.Entries[0].DetailType == string(types.EventBaselineCaptureFailed) {
			found = true
			break
		}
	}
	if !found {
		t.Errorf("expected BASELINE_CAPTURE_FAILED event to be published; got %d events: %v", len(eb.events), eb.events)
	}
}

// TestCompleteTrigger_BaselineCaptureFailed_NoEvent_WhenNoPostRunConfig verifies
// that no BASELINE_CAPTURE_FAILED event is published when the pipeline has no
// PostRun config (capturePostRunBaseline returns nil early).
func TestCompleteTrigger_BaselineCaptureFailed_NoEvent_WhenNoPostRunConfig(t *testing.T) {
pipelineID := "pipe-no-postrun-fail"
date := "2026-03-08"
cfg := types.PipelineConfig{
Pipeline: types.PipelineIdentity{ID: pipelineID},
// No PostRun config — capturePostRunBaseline returns nil immediately.
}

ddb := &mockDynamo{
getItemFn: func(_ context.Context, _ *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) {
return &dynamodb.GetItemOutput{Item: configItem(pipelineID, cfg)}, nil
},
queryFn: func(_ context.Context, _ *dynamodb.QueryInput, _ ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) {
return nil, fmt.Errorf("dynamodb: should not be called")
},
}

d := newTestDeps(ddb)
_, err := lambda.HandleOrchestrator(context.Background(), d, lambda.OrchestratorInput{
Mode: "complete-trigger",
PipelineID: pipelineID,
ScheduleID: "stream",
Date: date,
Event: "success",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// No BASELINE_CAPTURE_FAILED event — capturePostRunBaseline returned nil (no PostRun).
eb := d.EventBridge.(*mockEventBridge)
eb.mu.Lock()
defer eb.mu.Unlock()
for _, evt := range eb.events {
if evt.Entries[0].DetailType != nil && *evt.Entries[0].DetailType == string(types.EventBaselineCaptureFailed) {
t.Error("BASELINE_CAPTURE_FAILED must NOT be published when pipeline has no PostRun config")
}
}
}

// ---------------------------------------------------------------------------
// Unknown mode test
// ---------------------------------------------------------------------------
Expand Down
Loading