diff --git a/CHANGELOG.md b/CHANGELOG.md index 00210a9..ccc31ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- **Calendar exclusion uses execution date**: `isExcludedDate` checks the job's execution date (not `time.Now()`), preventing incorrect exclusions on re-runs for previous days. Supports both `YYYY-MM-DD` and composite `YYYY-MM-DDTHH` date formats. +- **Atomic lock reset**: `ResetTriggerLock` uses single DynamoDB `UpdateItem` with `attribute_exists(PK)` condition, eliminating the race window between delete and create. +- **Lock release on SFN start failure**: Both rerun and job-failure retry paths release the trigger lock if `StartExecution` fails, preventing permanently stuck pipelines. +- **Terminal trigger status on calendar exclusion**: `handleJobFailure` sets `FAILED_FINAL` instead of leaving the lock in `RUNNING` state to silently expire via TTL. +- **ASL CompleteTrigger failure path**: New `CheckSLAForCompleteTriggerFailure` → `CancelSLAOnCompleteTriggerFailure` → `CompleteTriggerFailed` states ensure SLA schedules are cancelled before entering terminal Fail state. +- **Event ordering**: `RERUN_ACCEPTED` only publishes after `ResetTriggerLock` confirms lock atomicity. +- **New events**: `BASELINE_CAPTURE_FAILED` (baseline capture error), `PIPELINE_EXCLUDED` (calendar exclusion in sensor, rerun, job-failure, and post-run drift paths), `RERUN_ACCEPTED` (audit trail for accepted reruns). - **publishEvent error logging**: All 17 `publishEvent` call sites across stream-router and orchestrator now log errors at WARN level instead of silently discarding them. - **SLA monitor error wrapping**: `createOneTimeSchedule` wraps errors with schedule name context via `fmt.Errorf`. - **HTTP response body sanitization**: Error response bodies truncated to 512 bytes with control characters stripped. 
diff --git a/deploy/statemachine.asl.json b/deploy/statemachine.asl.json index 5b2991b..16fec85 100644 --- a/deploy/statemachine.asl.json +++ b/deploy/statemachine.asl.json @@ -369,10 +369,62 @@ { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", - "Next": "CheckCancelSLA" + "Next": "CheckSLAForCompleteTriggerFailure" } ] }, + "CheckSLAForCompleteTriggerFailure": { + "Type": "Choice", + "Comment": "Cancel SLA before failing if SLA was configured; otherwise fail immediately", + "Choices": [ + { + "Variable": "$.config.sla", + "IsPresent": true, + "Next": "CancelSLAOnCompleteTriggerFailure" + } + ], + "Default": "CompleteTriggerFailed" + }, + "CancelSLAOnCompleteTriggerFailure": { + "Type": "Task", + "Comment": "Best-effort SLA cancellation before marking CompleteTrigger as failed", + "Resource": "${sla_monitor_arn}", + "Parameters": { + "mode": "cancel", + "pipelineId.$": "$.pipelineId", + "scheduleId.$": "$.scheduleId", + "date.$": "$.date", + "deadline.$": "$.config.sla.deadline", + "expectedDuration.$": "$.config.sla.expectedDuration" + }, + "ResultPath": "$.slaResult", + "Next": "CompleteTriggerFailed", + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.TooManyRequestsException", + "States.TaskFailed" + ], + "IntervalSeconds": 2, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "ResultPath": "$.slaErrorInfo", + "Next": "CompleteTriggerFailed" + } + ] + }, + "CompleteTriggerFailed": { + "Type": "Fail", + "Error": "CompleteTriggerFailed", + "Cause": "Failed to set trigger to terminal status after retries" + }, "CheckCancelSLA": { "Type": "Choice", "Comment": "Cancel SLA schedules if SLA was configured, otherwise finish", diff --git a/deploy/statemachine_test.go b/deploy/statemachine_test.go index c513925..f44b00f 100644 --- a/deploy/statemachine_test.go +++ b/deploy/statemachine_test.go @@ -167,6 +167,9 @@ func TestASL_TopLevelStatesExist(t 
*testing.T) { "CancelSLASchedules", "InfraFailure", "Done", + "CheckSLAForCompleteTriggerFailure", + "CancelSLAOnCompleteTriggerFailure", + "CompleteTriggerFailed", } for _, name := range expected { _, ok := asl.States[name] @@ -335,10 +338,15 @@ func TestASL_JobPollingFlow(t *testing.T) { } assert.True(t, foundComplete, "IsJobDone should route to CompleteTrigger") - // CompleteTrigger → CheckCancelSLA + // CompleteTrigger → CheckCancelSLA (happy path) var complete taskState require.NoError(t, json.Unmarshal(asl.States["CompleteTrigger"], &complete)) assert.Equal(t, "CheckCancelSLA", complete.Next) + + // CompleteTrigger Catch routes to CheckSLAForCompleteTriggerFailure (not Done) + require.NotEmpty(t, complete.Catch) + assert.Equal(t, "CheckSLAForCompleteTriggerFailure", complete.Catch[0].Next, + "CompleteTrigger catch must route to CheckSLAForCompleteTriggerFailure, not Done") } func TestASL_CancelSLAFlow(t *testing.T) { @@ -442,3 +450,44 @@ func TestASL_JobPollExhaustionFlow(t *testing.T) { require.NotEmpty(t, exhausted.Catch) assert.Equal(t, "InjectTimeoutEvent", exhausted.Catch[0].Next) } + +func TestASL_CompleteTriggerFailurePath(t *testing.T) { + asl := loadASL(t) + + // CompleteTrigger Catch routes to CheckSLAForCompleteTriggerFailure + var complete taskState + require.NoError(t, json.Unmarshal(asl.States["CompleteTrigger"], &complete)) + require.NotEmpty(t, complete.Catch) + assert.Equal(t, "CheckSLAForCompleteTriggerFailure", complete.Catch[0].Next, + "CompleteTrigger catch must route to CheckSLAForCompleteTriggerFailure") + + // CheckSLAForCompleteTriggerFailure is a Choice state + raw, ok := asl.States["CheckSLAForCompleteTriggerFailure"] + require.True(t, ok, "CheckSLAForCompleteTriggerFailure state must exist") + var slaCheck choiceState + require.NoError(t, json.Unmarshal(raw, &slaCheck)) + assert.Equal(t, "Choice", slaCheck.Type) + // SLA present → CancelSLAOnCompleteTriggerFailure + require.NotEmpty(t, slaCheck.Choices) + assert.Equal(t, 
"CancelSLAOnCompleteTriggerFailure", slaCheck.Choices[0].Next) + // No SLA → CompleteTriggerFailed directly + assert.Equal(t, "CompleteTriggerFailed", slaCheck.Default) + + // CancelSLAOnCompleteTriggerFailure is a Task state that routes to CompleteTriggerFailed + raw, ok = asl.States["CancelSLAOnCompleteTriggerFailure"] + require.True(t, ok, "CancelSLAOnCompleteTriggerFailure state must exist") + var cancelTask taskState + require.NoError(t, json.Unmarshal(raw, &cancelTask)) + assert.Equal(t, "Task", cancelTask.Type) + assert.Equal(t, "CompleteTriggerFailed", cancelTask.Next) + // Catch swallows errors and still routes to CompleteTriggerFailed + require.NotEmpty(t, cancelTask.Catch) + assert.Equal(t, "CompleteTriggerFailed", cancelTask.Catch[0].Next) + + // CompleteTriggerFailed is a Fail terminal state + raw, ok = asl.States["CompleteTriggerFailed"] + require.True(t, ok, "CompleteTriggerFailed state must exist") + var failed stateBase + require.NoError(t, json.Unmarshal(raw, &failed)) + assert.Equal(t, "Fail", failed.Type) +} diff --git a/internal/lambda/e2e_test.go b/internal/lambda/e2e_test.go index c2be6f2..54360b8 100644 --- a/internal/lambda/e2e_test.go +++ b/internal/lambda/e2e_test.go @@ -2128,7 +2128,10 @@ func TestE2E_CalendarExclusionFullSkip(t *testing.T) { assert.Equal(t, 0, countSFNExecutions(sfnM)) assertNoTriggerLock(t, mock, "pipe-cal1", "stream", today) - assert.Empty(t, collectEventTypes(eb)) + // Phase 2: PIPELINE_EXCLUDED event is published when the sensor trigger + // is suppressed by a calendar exclusion. 
+ evts := collectEventTypes(eb) + assert.Equal(t, []string{string(types.EventPipelineExcluded)}, evts) }) } @@ -2289,7 +2292,10 @@ func TestE2E_PostRunBeforeBaseline(t *testing.T) { func TestE2E_RerunAfterTriggerTTLExpiry(t *testing.T) { ctx := context.Background() - t.Run("TriggerDeleted_ManualRerunSucceeds", func(t *testing.T) { + // When the trigger lock row has been deleted by DynamoDB TTL, ResetTriggerLock + // returns (false, nil) and an INFRA_FAILURE event is published. No SFN is + // started because the lock cannot be atomically reset. + t.Run("TriggerDeleted_PublishesInfraFailure", func(t *testing.T) { mock := newMockDDB() tr := &mockTriggerRunner{} sc := &mockStatusPoller{} @@ -2314,7 +2320,52 @@ func TestE2E_RerunAfterTriggerTTLExpiry(t *testing.T) { sfnBefore := countSFNExecutions(sfnM) require.NoError(t, lambda.HandleStreamEvent(ctx, d, makeRerunRequestWithReasonE2E("pipe-ttl1", "manual"))) - assert.Greater(t, countSFNExecutions(sfnM), sfnBefore, "rerun should start SFN even without existing trigger") + assert.Equal(t, sfnBefore, countSFNExecutions(sfnM), "no SFN when trigger lock row was deleted by TTL") + + // Should have published an INFRA_FAILURE event. + eb.mu.Lock() + defer eb.mu.Unlock() + require.NotEmpty(t, eb.events, "expected INFRA_FAILURE event when trigger lock is missing") + found := false + for _, ev := range eb.events { + for _, entry := range ev.Entries { + if entry.DetailType != nil && *entry.DetailType == string(types.EventInfraFailure) { + found = true + } + } + } + assert.True(t, found, "INFRA_FAILURE event should be published when lock reset fails") + }) + + // Happy path: trigger lock exists, rerun proceeds atomically via ResetTriggerLock. 
+ t.Run("TriggerExists_ManualRerunSucceeds", func(t *testing.T) { + mock := newMockDDB() + tr := &mockTriggerRunner{} + sc := &mockStatusPoller{} + d, sfnM, eb := buildE2EDeps(mock, tr, sc) + + cfg := e2ePipeline("pipe-ttl2") + cfg.Job.MaxManualReruns = intPtr(2) + seedConfig(mock, cfg) + + // Seed trigger lock — required for ResetTriggerLock to succeed. + mock.putRaw(testControlTable, map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("pipe-ttl2")}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-07")}, + "status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning}, + }) + + // Seed a failed job event so circuit breaker allows the rerun. + oldTS := fmt.Sprintf("%d", time.Now().Add(-1*time.Hour).UnixMilli()) + mock.putRaw("joblog", map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("pipe-ttl2")}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.JobSK("stream", "2026-03-07", oldTS)}, + "event": &ddbtypes.AttributeValueMemberS{Value: types.JobEventFail}, + }) + + sfnBefore := countSFNExecutions(sfnM) + require.NoError(t, lambda.HandleStreamEvent(ctx, d, makeRerunRequestWithReasonE2E("pipe-ttl2", "manual"))) + assert.Greater(t, countSFNExecutions(sfnM), sfnBefore, "rerun should start SFN when trigger lock exists") assertAlertFormats(t, eb) }) } diff --git a/internal/lambda/export_test.go b/internal/lambda/export_test.go new file mode 100644 index 0000000..738c420 --- /dev/null +++ b/internal/lambda/export_test.go @@ -0,0 +1,10 @@ +// Package lambda - export_test.go exposes unexported symbols for unit testing. +// This file is compiled ONLY during test builds (the _test.go suffix applies +// even to files in the non-_test package when placed here). 
+package lambda + +import "github.com/dwsmith1983/interlock/pkg/types" + +// IsExcludedDate re-exports isExcludedDate for white-box unit testing from +// the external test package (package lambda_test). +var IsExcludedDate func(cfg *types.PipelineConfig, dateStr string) bool = isExcludedDate diff --git a/internal/lambda/mock_test.go b/internal/lambda/mock_test.go index 256d801..29f4c5a 100644 --- a/internal/lambda/mock_test.go +++ b/internal/lambda/mock_test.go @@ -167,6 +167,16 @@ func (m *mockDDB) UpdateItem(_ context.Context, input *dynamodb.UpdateItemInput, key := ddbItemKey(*input.TableName, pk, sk) item, ok := m.items[key] + + // Support attribute_exists(PK) condition: reject update if item doesn't exist. + if input.ConditionExpression != nil && *input.ConditionExpression == "attribute_exists(PK)" { + if !ok { + return nil, &ddbtypes.ConditionalCheckFailedException{ + Message: strPtr("The conditional request failed"), + } + } + } + if !ok { item = map[string]ddbtypes.AttributeValue{ "PK": &ddbtypes.AttributeValueMemberS{Value: pk}, diff --git a/internal/lambda/orchestrator.go b/internal/lambda/orchestrator.go index 5f8412b..e28909a 100644 --- a/internal/lambda/orchestrator.go +++ b/internal/lambda/orchestrator.go @@ -320,6 +320,10 @@ func handleCompleteTrigger(ctx context.Context, d *Deps, input OrchestratorInput if err := capturePostRunBaseline(ctx, d, input.PipelineID, input.ScheduleID, input.Date); err != nil { d.Logger.WarnContext(ctx, "failed to capture post-run baseline", "pipeline", input.PipelineID, "error", err) + if pubErr := publishEvent(ctx, d, string(types.EventBaselineCaptureFailed), input.PipelineID, input.ScheduleID, input.Date, + fmt.Sprintf("baseline capture failed for %s: %v", input.PipelineID, err)); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish baseline capture failure event", "error", pubErr) + } } } diff --git a/internal/lambda/orchestrator_test.go b/internal/lambda/orchestrator_test.go index 8255504..0aa86b6 100644 --- 
a/internal/lambda/orchestrator_test.go +++ b/internal/lambda/orchestrator_test.go @@ -1382,6 +1382,114 @@ func TestCompleteTrigger_DateScopedBaseline(t *testing.T) { } } +// --------------------------------------------------------------------------- +// CompleteTrigger — baseline capture failure event tests +// --------------------------------------------------------------------------- + +// TestCompleteTrigger_BaselineCaptureFailed_PublishesEvent verifies that when +// capturePostRunBaseline fails (sensor query error), a BASELINE_CAPTURE_FAILED +// event is published to EventBridge and the trigger is still marked COMPLETED. +func TestCompleteTrigger_BaselineCaptureFailed_PublishesEvent(t *testing.T) { + pipelineID := "pipe-baseline-fail" + date := "2026-03-08T06" + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: pipelineID}, + PostRun: &types.PostRunConfig{ + Rules: []types.ValidationRule{ + {Key: "audit-result", Check: types.CheckGTE, Field: "sensor_count", Value: float64(100)}, + }, + }, + } + + callCount := 0 + ddb := &mockDynamo{ + // First GetItem call → config. Second GetItem call (from capturePostRunBaseline) → also config. + getItemFn: func(_ context.Context, input *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + return &dynamodb.GetItemOutput{Item: configItem(pipelineID, cfg)}, nil + }, + // Query for sensors fails — this causes capturePostRunBaseline to return an error. 
+ queryFn: func(_ context.Context, _ *dynamodb.QueryInput, _ ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) { + callCount++ + return nil, fmt.Errorf("dynamodb: provisioned throughput exceeded") + }, + } + + d := newTestDeps(ddb) + out, err := lambda.HandleOrchestrator(context.Background(), d, lambda.OrchestratorInput{ + Mode: "complete-trigger", + PipelineID: pipelineID, + ScheduleID: "stream", + Date: date, + Event: "success", + }) + if err != nil { + t.Fatalf("unexpected Lambda error: baseline capture failure should not fail the handler: %v", err) + } + + // Trigger must still be marked COMPLETED — baseline failure is non-fatal. + if out.Status != types.TriggerStatusCompleted { + t.Errorf("status = %q, want %q — baseline failure must not affect trigger completion", out.Status, types.TriggerStatusCompleted) + } + + // BASELINE_CAPTURE_FAILED event must be published to EventBridge. + eb := d.EventBridge.(*mockEventBridge) + eb.mu.Lock() + defer eb.mu.Unlock() + found := false + for _, evt := range eb.events { + if evt.Entries[0].DetailType != nil && *evt.Entries[0].DetailType == string(types.EventBaselineCaptureFailed) { + found = true + break + } + } + if !found { + t.Errorf("expected BASELINE_CAPTURE_FAILED event to be published; got %d events: %v", len(eb.events), eb.events) + } +} + +// TestCompleteTrigger_BaselineCaptureFailed_NoEvent_WhenNoPostRunConfig verifies +// that no BASELINE_CAPTURE_FAILED event is published when the pipeline has no +// PostRun config (capturePostRunBaseline returns nil early). +func TestCompleteTrigger_BaselineCaptureFailed_NoEvent_WhenNoPostRunConfig(t *testing.T) { + pipelineID := "pipe-no-postrun-fail" + date := "2026-03-08" + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: pipelineID}, + // No PostRun config — capturePostRunBaseline returns nil immediately. 
+ } + + ddb := &mockDynamo{ + getItemFn: func(_ context.Context, _ *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + return &dynamodb.GetItemOutput{Item: configItem(pipelineID, cfg)}, nil + }, + queryFn: func(_ context.Context, _ *dynamodb.QueryInput, _ ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) { + return nil, fmt.Errorf("dynamodb: should not be called") + }, + } + + d := newTestDeps(ddb) + _, err := lambda.HandleOrchestrator(context.Background(), d, lambda.OrchestratorInput{ + Mode: "complete-trigger", + PipelineID: pipelineID, + ScheduleID: "stream", + Date: date, + Event: "success", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // No BASELINE_CAPTURE_FAILED event — capturePostRunBaseline returned nil (no PostRun). + eb := d.EventBridge.(*mockEventBridge) + eb.mu.Lock() + defer eb.mu.Unlock() + for _, evt := range eb.events { + if evt.Entries[0].DetailType != nil && *evt.Entries[0].DetailType == string(types.EventBaselineCaptureFailed) { + t.Error("BASELINE_CAPTURE_FAILED must NOT be published when pipeline has no PostRun config") + } + } +} + // --------------------------------------------------------------------------- // Unknown mode test // --------------------------------------------------------------------------- diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index 4f33df5..573d7a2 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -190,22 +190,31 @@ func handleJobFailure(ctx context.Context, d *Deps, pipelineID, schedule, date, return nil } + // Calendar exclusion check: skip retry if the execution date is excluded. + // Mark trigger as terminal so the lock doesn't silently expire via TTL. 
+ if isExcludedDate(cfg, date) { + if err := d.Store.SetTriggerStatus(ctx, pipelineID, schedule, date, types.TriggerStatusFailedFinal); err != nil { + d.Logger.WarnContext(ctx, "failed to set trigger status after calendar exclusion", "error", err) + } + if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, schedule, date, + fmt.Sprintf("job failure retry skipped for %s: execution date %s excluded by calendar", pipelineID, date)); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) + } + return nil + } + // Under retry limit — write rerun record and restart the pipeline. attempt, err := d.Store.WriteRerun(ctx, pipelineID, schedule, date, "job-fail-retry", jobEvent) if err != nil { return fmt.Errorf("write rerun for %q: %w", pipelineID, err) } - if err := d.Store.ReleaseTriggerLock(ctx, pipelineID, schedule, date); err != nil { - return fmt.Errorf("release trigger lock for %q: %w", pipelineID, err) - } - - acquired, err := d.Store.AcquireTriggerLock(ctx, pipelineID, schedule, date, ResolveTriggerLockTTL()) + acquired, err := d.Store.ResetTriggerLock(ctx, pipelineID, schedule, date, ResolveTriggerLockTTL()) if err != nil { - return fmt.Errorf("re-acquire trigger lock for %q: %w", pipelineID, err) + return fmt.Errorf("reset trigger lock for %q: %w", pipelineID, err) } if !acquired { - d.Logger.Warn("failed to re-acquire trigger lock, skipping rerun", + d.Logger.Warn("failed to reset trigger lock, skipping rerun", "pipelineId", pipelineID, "schedule", schedule, "date", date) return nil } @@ -213,6 +222,9 @@ func handleJobFailure(ctx context.Context, d *Deps, pipelineID, schedule, date, // Use a unique execution name that includes the rerun attempt number. 
execName := truncateExecName(fmt.Sprintf("%s-%s-%s-rerun-%d", pipelineID, schedule, date, attempt)) if err := startSFNWithName(ctx, d, cfg, pipelineID, schedule, date, execName); err != nil { + if relErr := d.Store.ReleaseTriggerLock(ctx, pipelineID, schedule, date); relErr != nil { + d.Logger.Warn("failed to release lock after SFN start failure", "error", relErr) + } return fmt.Errorf("start SFN rerun for %q: %w", pipelineID, err) } @@ -255,6 +267,16 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even return nil } + // --- Calendar exclusion check (execution date) --- + if isExcludedDate(cfg, date) { + _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventRerunRejected, "", 0, "excluded by calendar") + if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, schedule, date, + fmt.Sprintf("rerun blocked for %s: execution date %s excluded by calendar", pipelineID, date)); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) + } + return nil + } + // Extract reason from stream record NewImage. Default to "manual". reason := "manual" if img := record.Change.NewImage; img != nil { @@ -330,36 +352,45 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even return nil } - // --- Acceptance: write rerun record FIRST (before lock release) --- + // --- Acceptance: write rerun record FIRST (before lock reset) --- if _, err := d.Store.WriteRerun(ctx, pipelineID, schedule, date, reason, ""); err != nil { return fmt.Errorf("write rerun for %q: %w", pipelineID, err) } - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, - types.JobEventRerunAccepted, "", 0, "") - // Delete date-scoped postrun-baseline so re-run captures fresh baseline. if cfg.PostRun != nil { _ = d.Store.DeleteSensor(ctx, pipelineID, "postrun-baseline#"+date) } - // Release existing lock and re-acquire for the new execution. 
- if err := d.Store.ReleaseTriggerLock(ctx, pipelineID, schedule, date); err != nil { - return fmt.Errorf("release trigger lock for %q: %w", pipelineID, err) - } - - acquired, err := d.Store.AcquireTriggerLock(ctx, pipelineID, schedule, date, ResolveTriggerLockTTL()) + // Atomically reset the trigger lock for the new execution. + acquired, err := d.Store.ResetTriggerLock(ctx, pipelineID, schedule, date, ResolveTriggerLockTTL()) if err != nil { - return fmt.Errorf("re-acquire trigger lock for %q: %w", pipelineID, err) + return fmt.Errorf("reset trigger lock for %q: %w", pipelineID, err) } if !acquired { - d.Logger.Warn("failed to re-acquire trigger lock, skipping rerun", + if pubErr := publishEvent(ctx, d, string(types.EventInfraFailure), pipelineID, schedule, date, + fmt.Sprintf("lock reset failed for rerun of %s, orphaned rerun record", pipelineID)); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "error", pubErr) + } + d.Logger.Warn("failed to reset trigger lock, orphaned rerun record", "pipelineId", pipelineID, "schedule", schedule, "date", date) return nil } + // Publish acceptance event only after lock atomicity is confirmed. 
+ _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + types.JobEventRerunAccepted, "", 0, "") + + if pubErr := publishEvent(ctx, d, string(types.EventRerunAccepted), pipelineID, schedule, date, + fmt.Sprintf("rerun accepted for %s (reason: %s)", pipelineID, reason)); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventRerunAccepted, "error", pubErr) + } + execName := truncateExecName(fmt.Sprintf("%s-%s-%s-%s-rerun-%d", pipelineID, schedule, date, reason, time.Now().Unix())) if err := startSFNWithName(ctx, d, cfg, pipelineID, schedule, date, execName); err != nil { + if relErr := d.Store.ReleaseTriggerLock(ctx, pipelineID, schedule, date); relErr != nil { + d.Logger.Warn("failed to release lock after SFN start failure", "error", relErr) + } return fmt.Errorf("start SFN rerun for %q: %w", pipelineID, err) } @@ -497,13 +528,19 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event return nil } - // Check calendar exclusions. + // Check calendar exclusions (wall-clock date). now := time.Now() if isExcluded(cfg, now) { d.Logger.Info("pipeline excluded by calendar", "pipelineId", pipelineID, "date", now.Format("2006-01-02"), ) + scheduleIDForEvent := resolveScheduleID(cfg) + dateForEvent := ResolveExecutionDate(sensorData) + if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, scheduleIDForEvent, dateForEvent, + fmt.Sprintf("sensor trigger suppressed for %s: wall-clock date excluded by calendar", pipelineID)); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) + } return nil } @@ -700,10 +737,20 @@ func handlePostRunCompleted(ctx context.Context, d *Deps, cfg *types.PipelineCon d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPostRunDrift, "error", err) } - // Trigger rerun via the existing circuit breaker path. 
- if writeErr := d.Store.WriteRerunRequest(ctx, pipelineID, scheduleID, date, "data-drift"); writeErr != nil { - d.Logger.WarnContext(ctx, "failed to write rerun request on post-run drift", - "pipelineId", pipelineID, "error", writeErr) + // Trigger rerun via the existing circuit breaker path only if the + // execution date is not excluded by the pipeline's calendar config. + if isExcludedDate(cfg, date) { + if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, scheduleID, date, + fmt.Sprintf("post-run drift rerun skipped for %s: execution date %s excluded by calendar", pipelineID, date)); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) + } + d.Logger.InfoContext(ctx, "post-run drift rerun skipped: execution date excluded by calendar", + "pipelineId", pipelineID, "date", date) + } else { + if writeErr := d.Store.WriteRerunRequest(ctx, pipelineID, scheduleID, date, "data-drift"); writeErr != nil { + d.Logger.WarnContext(ctx, "failed to write rerun request on post-run drift", + "pipelineId", pipelineID, "error", writeErr) + } } return nil } @@ -963,6 +1010,48 @@ func resolveScheduleID(cfg *types.PipelineConfig) string { return "stream" } +// isExcludedDate checks calendar exclusions against a job's execution date +// (not wall-clock time). dateStr supports "YYYY-MM-DD" and "YYYY-MM-DDTHH". +func isExcludedDate(cfg *types.PipelineConfig, dateStr string) bool { + excl := cfg.Schedule.Exclude + if excl == nil { + return false + } + if len(dateStr) < 10 { + return false // unparseable, safe default + } + datePortion := dateStr[:10] + + // Resolve the location to interpret the execution date in. 
+ loc := time.UTC + if cfg.Schedule.Timezone != "" { + if l, err := time.LoadLocation(cfg.Schedule.Timezone); err == nil { + loc = l + } + } + + // Parse the date as midnight in the configured timezone so that weekday + // and date-string comparisons reflect the local calendar date. + t, err := time.ParseInLocation("2006-01-02", datePortion, loc) + if err != nil { + return false // safe default + } + + if excl.Weekends { + day := t.Weekday() + if day == time.Saturday || day == time.Sunday { + return true + } + } + dateStr2 := t.Format("2006-01-02") + for _, d := range excl.Dates { + if d == dateStr2 { + return true + } + } + return false +} + // isExcluded checks whether the pipeline should be excluded from running // based on calendar exclusions (weekends and specific dates). func isExcluded(cfg *types.PipelineConfig, now time.Time) bool { diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index 0c11909..230ecec 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" + "strings" "testing" "time" @@ -360,10 +361,10 @@ func makeJobRecord(pipelineID, jobEvent string) events.DynamoDBEventRecord { // seedRerun inserts an existing RERUN# row into the mock rerun table // with reason "job-fail-retry" (the source used by handleJobFailure). 
-func seedRerun(mock *mockDDB, pipelineID, schedule, date string, attempt int) { +func seedRerun(mock *mockDDB, attempt int) { mock.putRaw("rerun", map[string]ddbtypes.AttributeValue{ - "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK(pipelineID)}, - "SK": &ddbtypes.AttributeValueMemberS{Value: types.RerunSK(schedule, date, attempt)}, + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("gold-revenue")}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.RerunSK("stream", "2026-03-01", attempt)}, "reason": &ddbtypes.AttributeValueMemberS{Value: "job-fail-retry"}, }) } @@ -393,6 +394,8 @@ func TestStreamRouter_JobFail_UnderRetryLimit_Reruns(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + // Seed trigger lock — required for ResetTriggerLock to succeed. + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // No existing reruns — first failure should trigger a rerun. record := makeJobRecord("gold-revenue", types.JobEventFail) @@ -423,8 +426,8 @@ func TestStreamRouter_JobFail_OverRetryLimit_Alerts(t *testing.T) { seedConfig(mock, cfg) // Seed 2 existing reruns — at limit, so next failure should be final. - seedRerun(mock, "gold-revenue", "stream", "2026-03-01", 0) - seedRerun(mock, "gold-revenue", "stream", "2026-03-01", 1) + seedRerun(mock, 0) + seedRerun(mock, 1) record := makeJobRecord("gold-revenue", types.JobEventFail) event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} @@ -475,6 +478,7 @@ func TestStreamRouter_JobTimeout_TreatedAsFailure(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Timeout event should be treated like a failure — triggers rerun. 
record := makeJobRecord("gold-revenue", types.JobEventTimeout) @@ -495,6 +499,7 @@ func TestStreamRouter_JobFail_DriftRerunsIgnored(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a drift rerun (should NOT count toward failure budget). mock.putRaw("rerun", map[string]ddbtypes.AttributeValue{ @@ -804,23 +809,12 @@ func TestResolveExecutionDate_NonNumericHour(t *testing.T) { // Rerun request helpers and tests // --------------------------------------------------------------------------- -// makeRerunRequestRecord builds a DynamoDB stream event record for a RERUN_REQUEST# write. -func makeRerunRequestRecord() events.DynamoDBEventRecord { - pk := types.PipelinePK("gold-revenue") - sk := types.RerunRequestSK("stream", "2026-03-01") - return events.DynamoDBEventRecord{ - EventName: "INSERT", - Change: events.DynamoDBStreamRecord{ - Keys: map[string]events.DynamoDBAttributeValue{ - "PK": events.NewStringAttribute(pk), - "SK": events.NewStringAttribute(sk), - }, - NewImage: map[string]events.DynamoDBAttributeValue{ - "PK": events.NewStringAttribute(pk), - "SK": events.NewStringAttribute(sk), - }, - }, - } +// makeDefaultRerunRequestRecord is a convenience alias used by tests written +// before Phase 3 introduced the parameterized makeRerunRequestRecordFull helper. +// It uses the fixed defaults: pipelineID="gold-revenue", schedule="stream", +// date="2026-03-01", reason="" (no explicit reason). +func makeDefaultRerunRequestRecord() events.DynamoDBEventRecord { + return makeRerunRequestRecordFull("") } // seedJobEvent inserts a job log record into the mock joblog table. @@ -861,11 +855,12 @@ func TestStreamRouter_RerunRequest_FailedJob_Allowed(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a failed job event. 
seedJobEvent(mock, "1709280000000", types.JobEventFail) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -884,6 +879,7 @@ func TestStreamRouter_RerunRequest_SuccessDataChanged_Allowed(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a successful job event with timestamp 1000000. seedJobEvent(mock, "1000000", types.JobEventSuccess) @@ -894,7 +890,7 @@ func TestStreamRouter_RerunRequest_SuccessDataChanged_Allowed(t *testing.T) { "status": "ready", }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -923,7 +919,7 @@ func TestStreamRouter_RerunRequest_SuccessDataUnchanged_Rejected(t *testing.T) { "status": "ready", }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -947,11 +943,12 @@ func TestStreamRouter_RerunRequest_InfraExhausted_Allowed(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed an infra-trigger-exhausted job event. 
seedJobEvent(mock, "1709280000000", types.JobEventInfraTriggerExhausted) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1182,9 +1179,10 @@ func TestRerun_NoJobRecord_Allowed(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // No job events seeded — GetLatestJobEvent returns nil → allow (never ran). - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1201,7 +1199,7 @@ func TestRerun_NoConfig_Skips(t *testing.T) { d, sfnMock, _ := testDeps(mock) // No config seeded for this pipeline. - record := makeRerunRequestRecord() // uses "gold-revenue" + record := makeDefaultRerunRequestRecord() // uses "gold-revenue" event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1245,11 +1243,12 @@ func TestRerun_TimeoutJob_Allowed(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a timeout job event. seedJobEvent(mock, "1709280000000", types.JobEventTimeout) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1267,11 +1266,12 @@ func TestRerun_UnknownJobEvent_Allowed(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a job event with an unknown event type. 
seedJobEvent(mock, "1709280000000", "some-future-event") - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1295,7 +1295,7 @@ func TestRerun_StartSFNError(t *testing.T) { // Make SFN client return an error. sfnMock.err = fmt.Errorf("SFN service error") - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} // Error is logged, HandleStreamEvent still returns nil. @@ -1317,11 +1317,12 @@ func TestSensorFreshness_NoSensors(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a successful job event — no sensors exist. seedJobEvent(mock, "1000000", types.JobEventSuccess) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1339,6 +1340,7 @@ func TestSensorFreshness_NoUpdatedAtField(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a successful job event. seedJobEvent(mock, "1000000", types.JobEventSuccess) @@ -1348,7 +1350,7 @@ func TestSensorFreshness_NoUpdatedAtField(t *testing.T) { "status": "ready", }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1366,6 +1368,7 @@ func TestSensorFreshness_FreshSensor_Float(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a successful job event with timestamp 1000000. 
seedJobEvent(mock, "1000000", types.JobEventSuccess) @@ -1375,7 +1378,7 @@ func TestSensorFreshness_FreshSensor_Float(t *testing.T) { "updatedAt": float64(2000000), }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1401,7 +1404,7 @@ func TestSensorFreshness_StaleSensor_Float(t *testing.T) { "updatedAt": float64(1000000), }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1423,6 +1426,7 @@ func TestSensorFreshness_FreshSensor_String(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a successful job event with timestamp 1000000. seedJobEvent(mock, "1000000", types.JobEventSuccess) @@ -1432,7 +1436,7 @@ func TestSensorFreshness_FreshSensor_String(t *testing.T) { "updatedAt": "2000000", }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1449,6 +1453,7 @@ func TestSensorFreshness_InvalidJobSK(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a successful job event with a short SK that has < 4 parts. // The SK format should be JOB#schedule#date#timestamp, but we create a malformed one. 
@@ -1463,7 +1468,7 @@ func TestSensorFreshness_InvalidJobSK(t *testing.T) { "updatedAt": float64(1), }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -1481,6 +1486,7 @@ func TestSensorFreshness_InvalidTimestamp(t *testing.T) { cfg := testJobConfig() seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a successful job event with non-numeric timestamp. mock.putRaw("joblog", map[string]ddbtypes.AttributeValue{ @@ -1494,7 +1500,7 @@ func TestSensorFreshness_InvalidTimestamp(t *testing.T) { "updatedAt": float64(1), }) - record := makeRerunRequestRecord() + record := makeDefaultRerunRequestRecord() event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} err := lambda.HandleStreamEvent(context.Background(), d, event) @@ -2561,6 +2567,7 @@ func TestRerun_DriftUnderLimit(t *testing.T) { cfg := testRerunLimitConfig(2, 1) seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // No existing drift reruns — under limit. // Seed a failed job so circuit breaker allows the rerun. @@ -2613,6 +2620,7 @@ func TestRerun_WritesRerunBeforeLockRelease(t *testing.T) { cfg := testRerunLimitConfig(3, 3) seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a failed job so circuit breaker allows the rerun. seedJobEvent(mock, "1709280000000", types.JobEventFail) @@ -2652,6 +2660,7 @@ func TestRerun_DeletesPostrunBaseline(t *testing.T) { }, } seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") // Seed a failed job so circuit breaker allows the rerun. 
seedJobEvent(mock, "1709280000000", types.JobEventFail)
 
@@ -2727,6 +2736,7 @@ func TestStreamRouter_JobFail_TransientUsesMaxRetries(t *testing.T) {
 	cfg.Job.MaxRetries = 3
 	cfg.Job.MaxCodeRetries = intPtr(0) // would block if category was checked wrong
 	seedConfig(mock, cfg)
+	seedTriggerLock(mock, "gold-revenue", "2026-03-01")
 
 	// Seed a joblog entry with TRANSIENT category.
 	mock.putRaw("joblog", map[string]ddbtypes.AttributeValue{
@@ -2756,6 +2766,7 @@ func TestStreamRouter_JobFail_EmptyCategoryUsesMaxRetries(t *testing.T) {
 	cfg.Job.MaxRetries = 3
 	cfg.Job.MaxCodeRetries = intPtr(0)
 	seedConfig(mock, cfg)
+	seedTriggerLock(mock, "gold-revenue", "2026-03-01")
 
 	// Seed a joblog entry WITHOUT category (backward compat).
 	mock.putRaw("joblog", map[string]ddbtypes.AttributeValue{
@@ -3054,3 +3065,626 @@ func TestPostRunSensor_NoPostRunConfig_GoesToTrigger(t *testing.T) {
 	require.NoError(t, err)
 	// No error, just silently ignored.
 }
+
+// ---------------------------------------------------------------------------
+// Phase 3: Atomic lock reset tests (ResetTriggerLock replaces delete+create)
+// ---------------------------------------------------------------------------
+
+// seedTriggerLockWithSchedule inserts a RUNNING trigger lock for the fixed
+// pipeline "gold-revenue", schedule "stream", and date "2026-03-01".
+func seedTriggerLockWithSchedule(mock *mockDDB) {
+	mock.putRaw(testControlTable, map[string]ddbtypes.AttributeValue{
+		"PK":     &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("gold-revenue")},
+		"SK":     &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-01")},
+		"status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning},
+		"ttl":    &ddbtypes.AttributeValueMemberN{Value: "9999999999"},
+	})
+}
+
+// makeJobRecordWithScheduleDate builds a JOB# stream record with an explicit
+// schedule and date encoded in the SK.
+func makeJobRecordWithScheduleDate(pipelineID, jobEvent, schedule, date string) events.DynamoDBEventRecord {
+	const timestamp = "1709312400"
+	pk := types.PipelinePK(pipelineID)
+	sk := types.JobSK(schedule, date, timestamp)
+
+	return events.DynamoDBEventRecord{
+		EventName: "INSERT",
+		Change: events.DynamoDBStreamRecord{
+			Keys: map[string]events.DynamoDBAttributeValue{
+				"PK": events.NewStringAttribute(pk),
+				"SK": events.NewStringAttribute(sk),
+			},
+			NewImage: map[string]events.DynamoDBAttributeValue{
+				"PK":    events.NewStringAttribute(pk),
+				"SK":    events.NewStringAttribute(sk),
+				"event": events.NewStringAttribute(jobEvent),
+			},
+		},
+	}
+}
+
+// makeRerunRequestRecordFull builds a RERUN_REQUEST# stream record for the
+// fixed pipeline "gold-revenue", schedule "stream", and date "2026-03-01".
+// Only the reason is parameterized; it is written to the NewImage as the
+// "reason" attribute when non-empty and omitted entirely when empty.
+func makeRerunRequestRecordFull(reason string) events.DynamoDBEventRecord {
+	pk := types.PipelinePK("gold-revenue")
+	sk := types.RerunRequestSK("stream", "2026-03-01")
+
+	newImage := map[string]events.DynamoDBAttributeValue{
+		"PK": events.NewStringAttribute(pk),
+		"SK": events.NewStringAttribute(sk),
+	}
+	if reason != "" {
+		newImage["reason"] = events.NewStringAttribute(reason)
+	}
+
+	return events.DynamoDBEventRecord{
+		EventName: "INSERT",
+		Change: events.DynamoDBStreamRecord{
+			Keys: map[string]events.DynamoDBAttributeValue{
+				"PK": events.NewStringAttribute(pk),
+				"SK": events.NewStringAttribute(sk),
+			},
+			NewImage: newImage,
+		},
+	}
+}
+
+// triggerLockExists returns true if the trigger lock row is present in the mock.
+func triggerLockExists(mock *mockDDB) bool { + mock.mu.Lock() + defer mock.mu.Unlock() + key := ddbItemKey(testControlTable, types.PipelinePK("gold-revenue"), types.TriggerSK("stream", "2026-03-01")) + _, ok := mock.items[key] + return ok +} + +// TestJobFailure_AtomicLockReset_Success verifies that a job failure under the +// retry limit uses ResetTriggerLock (atomic UpdateItem) instead of the +// delete+create pattern. The trigger row must still exist after the operation +// (it was updated in place, not deleted and re-created). +func TestJobFailure_AtomicLockReset_Success(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + const ( + pipeline = "gold-revenue" + schedule = "stream" + date = "2026-03-01" + ) + + cfg := testJobConfig() + seedConfig(mock, cfg) + + // Seed existing trigger lock — ResetTriggerLock requires it to exist. + seedTriggerLockWithSchedule(mock) + + record := makeJobRecordWithScheduleDate(pipeline, types.JobEventFail, schedule, date) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // SFN must have started for the rerun. + sfnMock.mu.Lock() + defer sfnMock.mu.Unlock() + require.Len(t, sfnMock.executions, 1, "expected one SFN execution for rerun") + assert.Contains(t, *sfnMock.executions[0].Name, "rerun-0") + + // The trigger lock row must still exist — ResetTriggerLock updates in place. + assert.True(t, triggerLockExists(mock), + "trigger lock row must persist after atomic reset (not deleted)") +} + +// TestJobFailure_LockResetFails_NoSFN verifies that when there is no trigger +// row for the pipeline/schedule/date (lock was never held or expired via TTL), +// ResetTriggerLock returns false and no SFN execution is started. 
+func TestJobFailure_LockResetFails_NoSFN(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + const ( + pipeline = "gold-revenue" + schedule = "stream" + date = "2026-03-01" + ) + + cfg := testJobConfig() + seedConfig(mock, cfg) + + // No trigger lock seeded — ResetTriggerLock should return (false, nil). + record := makeJobRecordWithScheduleDate(pipeline, types.JobEventFail, schedule, date) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + defer sfnMock.mu.Unlock() + assert.Empty(t, sfnMock.executions, "no SFN when trigger lock does not exist") +} + +// TestRerunRequest_AtomicLockReset verifies that a manual rerun request uses +// ResetTriggerLock instead of delete+create. +func TestRerunRequest_AtomicLockReset(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + cfg := testJobConfig() + seedConfig(mock, cfg) + + // Seed trigger lock so ResetTriggerLock can find it. + seedTriggerLockWithSchedule(mock) + + record := makeRerunRequestRecordFull("manual") + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // SFN must have started. + sfnMock.mu.Lock() + defer sfnMock.mu.Unlock() + require.Len(t, sfnMock.executions, 1, "expected one SFN execution for rerun") + + // Trigger lock row must still exist after reset. + assert.True(t, triggerLockExists(mock), + "trigger lock row must persist after atomic reset") +} + +// TestRerunRequest_LockResetFails_PublishesInfraFailure verifies that when the +// trigger lock row does not exist, ResetTriggerLock returns false and an +// INFRA_FAILURE event is published instead of starting a new SFN execution. 
+func TestRerunRequest_LockResetFails_PublishesInfraFailure(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + cfg := testJobConfig() + seedConfig(mock, cfg) + + // No trigger lock seeded — ResetTriggerLock returns (false, nil). + record := makeRerunRequestRecordFull("manual") + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // No SFN execution. + sfnMock.mu.Lock() + sfnExecs := len(sfnMock.executions) + sfnMock.mu.Unlock() + assert.Equal(t, 0, sfnExecs, "no SFN when lock reset fails") + + // Only INFRA_FAILURE — RERUN_ACCEPTED is emitted after lock reset, which + // did not succeed here. + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + require.Len(t, ebMock.events, 1, "expected one INFRA_FAILURE event") + assert.Equal(t, string(types.EventInfraFailure), *ebMock.events[0].Entries[0].DetailType) +} + +// TestJobFailure_SFNStartFails_ReleasesLock verifies that when the SFN +// StartExecution call fails after ResetTriggerLock succeeds, the trigger lock +// is released so the pipeline is not permanently stuck. +func TestJobFailure_SFNStartFails_ReleasesLock(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + const ( + pipeline = "gold-revenue" + schedule = "stream" + date = "2026-03-01" + ) + + cfg := testJobConfig() + seedConfig(mock, cfg) + + // Seed trigger lock. + seedTriggerLockWithSchedule(mock) + + // Make SFN fail. + sfnMock.err = fmt.Errorf("SFN unavailable") + + record := makeJobRecordWithScheduleDate(pipeline, types.JobEventFail, schedule, date) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + // HandleStreamEvent swallows per-record errors — the handler returns nil. + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // Trigger lock must be released after SFN failure (so next attempt can acquire it). 
+ assert.False(t, triggerLockExists(mock), + "trigger lock must be released after SFN start failure") +} + +// TestRerunRequest_SFNStartFails_ReleasesLock mirrors the above test for the +// rerun-request path. +func TestRerunRequest_SFNStartFails_ReleasesLock(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + cfg := testJobConfig() + seedConfig(mock, cfg) + + seedTriggerLockWithSchedule(mock) + + sfnMock.err = fmt.Errorf("SFN unavailable") + + record := makeRerunRequestRecordFull("manual") + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + assert.False(t, triggerLockExists(mock), + "trigger lock must be released after SFN start failure") +} + +// --------------------------------------------------------------------------- +// Phase 2: isExcludedDate unit tests (Task A / C1) +// --------------------------------------------------------------------------- + +func TestIsExcludedDate_NoExclusions(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{Exclude: nil}, + } + assert.False(t, lambda.IsExcludedDate(cfg, "2026-03-07"), "nil exclusion must not exclude") + assert.False(t, lambda.IsExcludedDate(cfg, "2026-03-08"), "nil exclusion must not exclude") +} + +func TestIsExcludedDate_WeekendSaturday(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Weekends: true}, + }, + } + // 2026-03-07 is a Saturday. 
+ assert.True(t, lambda.IsExcludedDate(cfg, "2026-03-07"), "Saturday must be excluded when Weekends=true") +} + +func TestIsExcludedDate_WeekendSunday(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Weekends: true}, + }, + } + // 2026-03-08 is a Sunday. + assert.True(t, lambda.IsExcludedDate(cfg, "2026-03-08"), "Sunday must be excluded when Weekends=true") +} + +func TestIsExcludedDate_WeekdayFriday(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Weekends: true}, + }, + } + // 2026-03-06 is a Friday. + assert.False(t, lambda.IsExcludedDate(cfg, "2026-03-06"), "Friday must NOT be excluded when Weekends=true") +} + +func TestIsExcludedDate_SpecificDate(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Dates: []string{"2026-12-25", "2026-01-01"}}, + }, + } + assert.True(t, lambda.IsExcludedDate(cfg, "2026-12-25"), "Christmas must be excluded") + assert.True(t, lambda.IsExcludedDate(cfg, "2026-01-01"), "New Year's Day must be excluded") + assert.False(t, lambda.IsExcludedDate(cfg, "2026-03-06"), "regular day must not be excluded") +} + +func TestIsExcludedDate_CompositePerHourDate(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Dates: []string{"2026-03-07"}}, + }, + } + // Per-hour composite date "2026-03-07T10" — date portion is 2026-03-07. 
+ assert.True(t, lambda.IsExcludedDate(cfg, "2026-03-07T10"), "composite date must match date portion") + assert.False(t, lambda.IsExcludedDate(cfg, "2026-03-06T10"), "non-excluded composite date must not be excluded") +} + +func TestIsExcludedDate_InvalidDate(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Weekends: true, Dates: []string{"2026-03-07"}}, + }, + } + // Unparseable date — safe default is false (do not exclude). + assert.False(t, lambda.IsExcludedDate(cfg, "not-a-date"), "invalid date must return false (safe default)") +} + +func TestIsExcludedDate_EmptyDate(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Weekends: true}, + }, + } + // Empty string is too short to parse — safe default is false. + assert.False(t, lambda.IsExcludedDate(cfg, ""), "empty date must return false (safe default)") +} + +func TestIsExcludedDate_ShortDate(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Exclude: &types.ExclusionConfig{Weekends: true}, + }, + } + assert.False(t, lambda.IsExcludedDate(cfg, "2026-03"), "short date string must return false (safe default)") +} + +func TestIsExcludedDate_Timezone(t *testing.T) { + cfg := &types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "p"}, + Schedule: types.ScheduleConfig{ + Timezone: "America/Los_Angeles", + Exclude: &types.ExclusionConfig{Weekends: true}, + }, + } + // 2026-03-07 is Saturday regardless of timezone. 
+ assert.True(t, lambda.IsExcludedDate(cfg, "2026-03-07"), "Saturday is excluded in any timezone") + assert.False(t, lambda.IsExcludedDate(cfg, "2026-03-06"), "Friday is not excluded") +} + +// --------------------------------------------------------------------------- +// Phase 2: Integration tests for handler calendar exclusion (Task B / C2) +// --------------------------------------------------------------------------- + +func TestRerunRequest_CalendarExclusion(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + cfg := testJobConfig() + cfg.Schedule.Exclude = &types.ExclusionConfig{Dates: []string{"2026-03-01"}} + seedConfig(mock, cfg) + + record := makeDefaultRerunRequestRecord() // schedule=stream, date=2026-03-01 + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "rerun must not start SFN for excluded execution date") + sfnMock.mu.Unlock() + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + require.NotEmpty(t, ebMock.events, "expected PIPELINE_EXCLUDED event") + assert.Equal(t, string(types.EventPipelineExcluded), *ebMock.events[0].Entries[0].DetailType) +} + +func TestRerunRequest_CalendarExclusion_WritesJobEvent(t *testing.T) { + mock := newMockDDB() + d, _, _ := testDeps(mock) + + cfg := testJobConfig() + cfg.Schedule.Exclude = &types.ExclusionConfig{Dates: []string{"2026-03-01"}} + seedConfig(mock, cfg) + + record := makeDefaultRerunRequestRecord() + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + mock.mu.Lock() + defer mock.mu.Unlock() + found := false + for key := range mock.items { + if strings.Contains(key, "joblog") && strings.Contains(key, "JOB#") { + found = true + break + } + } + assert.True(t, found, "expected a JOB# 
entry in the joblog table for calendar exclusion rejection") +} + +func TestRerunRequest_WeekendExclusion(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + cfg := testJobConfig() + cfg.Schedule.Exclude = &types.ExclusionConfig{Weekends: true} + seedConfig(mock, cfg) + + // 2026-03-07 is a Saturday. + pk := types.PipelinePK("gold-revenue") + sk := types.RerunRequestSK("stream", "2026-03-07") + record := events.DynamoDBEventRecord{ + EventName: "INSERT", + Change: events.DynamoDBStreamRecord{ + Keys: map[string]events.DynamoDBAttributeValue{ + "PK": events.NewStringAttribute(pk), + "SK": events.NewStringAttribute(sk), + }, + NewImage: map[string]events.DynamoDBAttributeValue{ + "PK": events.NewStringAttribute(pk), + "SK": events.NewStringAttribute(sk), + }, + }, + } + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "rerun on Saturday must be blocked by weekend exclusion") + sfnMock.mu.Unlock() + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + require.NotEmpty(t, ebMock.events) + assert.Equal(t, string(types.EventPipelineExcluded), *ebMock.events[0].Entries[0].DetailType) +} + +func TestJobFailure_CalendarExclusion(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + cfg := testJobConfig() + cfg.Schedule.Exclude = &types.ExclusionConfig{Dates: []string{"2026-03-01"}} + seedConfig(mock, cfg) + + record := makeJobRecord("gold-revenue", types.JobEventFail) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "job failure rerun must not start SFN for excluded execution date") + sfnMock.mu.Unlock() + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + 
require.NotEmpty(t, ebMock.events, "expected PIPELINE_EXCLUDED event") + assert.Equal(t, string(types.EventPipelineExcluded), *ebMock.events[0].Entries[0].DetailType) +} + +func TestJobFailure_CalendarExclusion_RetryLimitBeatsExclusion(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + cfg := testJobConfig() + cfg.Schedule.Exclude = &types.ExclusionConfig{Dates: []string{"2026-03-01"}} + seedConfig(mock, cfg) + + seedRerun(mock, 0) + seedRerun(mock, 1) + + record := makeJobRecord("gold-revenue", types.JobEventFail) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "no SFN when retries exhausted") + sfnMock.mu.Unlock() + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + require.NotEmpty(t, ebMock.events) + assert.Equal(t, string(types.EventRetryExhausted), *ebMock.events[0].Entries[0].DetailType, + "retry-limit check must run before calendar exclusion in handleJobFailure") +} + +func TestPostRunDrift_CalendarExclusion(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + cfg := postRunConfig() + cfg.Schedule.Exclude = &types.ExclusionConfig{Dates: []string{"2026-03-01"}} + seedConfig(mock, cfg) + seedTriggerWithStatus(mock, "gold-revenue", "2026-03-01", types.TriggerStatusCompleted) + + seedSensor(mock, "gold-revenue", "postrun-baseline#2026-03-01", map[string]interface{}{ + "sensor_count": float64(100), + }) + + record := makeSensorRecord("gold-revenue", "audit-result", map[string]events.DynamoDBAttributeValue{ + "data": events.NewMapAttribute(map[string]events.DynamoDBAttributeValue{ + "sensor_count": events.NewNumberAttribute("150"), + "date": events.NewStringAttribute("2026-03-01"), + }), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + err := lambda.HandleStreamEvent(context.Background(), d, event) 
+ require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + found := false + for _, evt := range ebMock.events { + if *evt.Entries[0].DetailType == string(types.EventPostRunDrift) { + found = true + break + } + } + assert.True(t, found, "POST_RUN_DRIFT event must be published even when execution date is excluded") + + // PIPELINE_EXCLUDED event must be published when drift rerun is skipped. + excludedFound := false + for _, evt := range ebMock.events { + if *evt.Entries[0].DetailType == string(types.EventPipelineExcluded) { + excludedFound = true + break + } + } + assert.True(t, excludedFound, "PIPELINE_EXCLUDED event must be published when drift rerun is skipped by calendar") + + rerunKey := ddbItemKey(testControlTable, types.PipelinePK("gold-revenue"), types.RerunRequestSK("stream", "2026-03-01")) + mock.mu.Lock() + _, rerunExists := mock.items[rerunKey] + mock.mu.Unlock() + assert.False(t, rerunExists, "rerun request must NOT be written when execution date is excluded") +} + +func TestPostRunDrift_NotExcluded_WritesRerun(t *testing.T) { + mock := newMockDDB() + d, _, _ := testDeps(mock) + + cfg := postRunConfig() + seedConfig(mock, cfg) + seedTriggerWithStatus(mock, "gold-revenue", "2026-03-01", types.TriggerStatusCompleted) + + seedSensor(mock, "gold-revenue", "postrun-baseline#2026-03-01", map[string]interface{}{ + "sensor_count": float64(100), + }) + + record := makeSensorRecord("gold-revenue", "audit-result", map[string]events.DynamoDBAttributeValue{ + "data": events.NewMapAttribute(map[string]events.DynamoDBAttributeValue{ + "sensor_count": events.NewNumberAttribute("150"), + "date": events.NewStringAttribute("2026-03-01"), + }), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + rerunKey := ddbItemKey(testControlTable, types.PipelinePK("gold-revenue"), types.RerunRequestSK("stream", "2026-03-01")) + mock.mu.Lock() + 
_, rerunExists := mock.items[rerunKey]
+	mock.mu.Unlock()
+	assert.True(t, rerunExists, "rerun request MUST be written for non-excluded dates with drift")
+}
+
+func TestSensorEvent_CalendarExclusion_PublishesEvent(t *testing.T) {
+	mock := newMockDDB()
+	d, sfnMock, ebMock := testDeps(mock)
+
+	cfg := testStreamConfig()
+	today := time.Now().Format("2006-01-02")
+	cfg.Schedule.Exclude = &types.ExclusionConfig{Dates: []string{today}}
+	seedConfig(mock, cfg)
+
+	record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{
+		"status": events.NewStringAttribute("ready"),
+	})
+	event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}}
+
+	err := lambda.HandleStreamEvent(context.Background(), d, event)
+	require.NoError(t, err)
+
+	sfnMock.mu.Lock()
+	assert.Empty(t, sfnMock.executions, "no SFN when calendar-excluded")
+	sfnMock.mu.Unlock()
+
+	ebMock.mu.Lock()
+	defer ebMock.mu.Unlock()
+	require.NotEmpty(t, ebMock.events, "PIPELINE_EXCLUDED event must be published on sensor exclusion")
+	assert.Equal(t, string(types.EventPipelineExcluded), *ebMock.events[0].Entries[0].DetailType)
+}
diff --git a/internal/store/control.go b/internal/store/control.go
index 7d68c7c..d4b4a8f 100644
--- a/internal/store/control.go
+++ b/internal/store/control.go
@@ -386,6 +386,39 @@ func (s *Store) HasTriggerForDate(ctx context.Context, pipelineID, schedule, dat
 	return out.Count > 0, nil
 }
 
+// ResetTriggerLock atomically re-arms an existing trigger lock in a single
+// conditional UpdateItem: status is set back to RUNNING and the TTL is
+// refreshed, replacing the racy delete+create pattern. Returns (true, nil) if
+// the row existed and was updated, (false, nil) if the row does not exist.
+func (s *Store) ResetTriggerLock(ctx context.Context, pipelineID, schedule, date string, ttl time.Duration) (bool, error) { + ttlEpoch := fmt.Sprintf("%d", time.Now().Add(ttl).Unix()) + _, err := s.Client.UpdateItem(ctx, &dynamodb.UpdateItemInput{ + TableName: &s.ControlTable, + Key: map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK(pipelineID)}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK(schedule, date)}, + }, + ConditionExpression: aws.String("attribute_exists(PK)"), + UpdateExpression: aws.String("SET #status = :running, #ttl = :ttl"), + ExpressionAttributeNames: map[string]string{ + "#status": "status", + "#ttl": "ttl", + }, + ExpressionAttributeValues: map[string]ddbtypes.AttributeValue{ + ":running": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning}, + ":ttl": &ddbtypes.AttributeValueMemberN{Value: ttlEpoch}, + }, + }) + if err != nil { + var ccfe *ddbtypes.ConditionalCheckFailedException + if errors.As(err, &ccfe) { + return false, nil + } + return false, fmt.Errorf("reset trigger lock %q/%s/%s: %w", pipelineID, schedule, date, err) + } + return true, nil +} + // SetTriggerStatus updates only the status attribute of an existing trigger row, // preserving TTL and other attributes. func (s *Store) SetTriggerStatus(ctx context.Context, pipelineID, schedule, date, status string) error { diff --git a/internal/store/control_test.go b/internal/store/control_test.go index 0c862d9..7ab7aff 100644 --- a/internal/store/control_test.go +++ b/internal/store/control_test.go @@ -915,6 +915,88 @@ func TestSetTriggerStatus_DynamoError(t *testing.T) { } } +// --- ResetTriggerLock tests --- + +func TestResetTriggerLock_Success(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + // Seed a trigger row with status=RUNNING and an existing TTL. 
+ mock.putRaw("control", map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("pipe-1")}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("daily", "2026-03-01")}, + "status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning}, + "ttl": &ddbtypes.AttributeValueMemberN{Value: "1740000000"}, + }) + + ok, err := s.ResetTriggerLock(context.Background(), "pipe-1", "daily", "2026-03-01", 24*time.Hour) + if err != nil { + t.Fatalf("ResetTriggerLock: %v", err) + } + if !ok { + t.Error("expected true (row existed), got false") + } + + // Verify status remains RUNNING and TTL was updated. + key := itemKey("control", types.PipelinePK("pipe-1"), types.TriggerSK("daily", "2026-03-01")) + mock.mu.Lock() + item, exists := mock.items[key] + mock.mu.Unlock() + + if !exists { + t.Fatal("expected trigger row to still exist after reset") + } + status := item["status"].(*ddbtypes.AttributeValueMemberS).Value + if status != types.TriggerStatusRunning { + t.Errorf("status = %q, want %q", status, types.TriggerStatusRunning) + } + ttlAttr, hasTTL := item["ttl"] + if !hasTTL { + t.Fatal("expected ttl attribute to be present after reset") + } + // New TTL must differ from the original seed value (it was updated). + newTTL := ttlAttr.(*ddbtypes.AttributeValueMemberN).Value + if newTTL == "1740000000" { + t.Errorf("ttl was not updated: still %q (original seed value)", newTTL) + } +} + +func TestResetTriggerLock_NonExistent(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + // No trigger row seeded — condition attribute_exists(PK) should fail. 
+ ok, err := s.ResetTriggerLock(context.Background(), "pipe-1", "daily", "2026-03-01", 24*time.Hour) + if err != nil { + t.Fatalf("expected nil error for non-existent row, got: %v", err) + } + if ok { + t.Error("expected false (row does not exist), got true") + } +} + +func TestResetTriggerLock_DynamoError(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + injected := errors.New("service unavailable") + mock.errFn = errOnOp("UpdateItem", injected) + + ok, err := s.ResetTriggerLock(context.Background(), "pipe-1", "daily", "2026-03-01", 24*time.Hour) + if err == nil { + t.Fatal("expected error, got nil") + } + if ok { + t.Error("expected ok=false on DynamoDB error") + } + if !errors.Is(err, injected) { + t.Errorf("expected wrapped injected error, got: %v", err) + } + if !strings.Contains(err.Error(), "reset trigger lock") { + t.Errorf("expected 'reset trigger lock' in error, got: %v", err) + } +} + // --- WriteRerunRequest error-path tests --- func TestWriteRerunRequest_DynamoError(t *testing.T) { diff --git a/internal/store/mock_test.go b/internal/store/mock_test.go index c6c4f23..dd44c4e 100644 --- a/internal/store/mock_test.go +++ b/internal/store/mock_test.go @@ -107,6 +107,15 @@ func (m *mockDDB) UpdateItem(_ context.Context, input *dynamodb.UpdateItemInput, sk := input.Key["SK"].(*ddbtypes.AttributeValueMemberS).Value key := itemKey(*input.TableName, pk, sk) + // Support attribute_exists(PK) condition: fail if the item does not exist. 
+ if input.ConditionExpression != nil && *input.ConditionExpression == "attribute_exists(PK)" { + if _, exists := m.items[key]; !exists { + return nil, &ddbtypes.ConditionalCheckFailedException{ + Message: strPtr("The conditional request failed"), + } + } + } + item, ok := m.items[key] if !ok { item = map[string]ddbtypes.AttributeValue{ diff --git a/pkg/types/events.go b/pkg/types/events.go index 94597a5..cc2f629 100644 --- a/pkg/types/events.go +++ b/pkg/types/events.go @@ -23,12 +23,15 @@ const ( EventRerunRejected EventDetailType = "RERUN_REJECTED" EventTriggerRecovered EventDetailType = "TRIGGER_RECOVERED" EventDataDrift EventDetailType = "DATA_DRIFT" + EventBaselineCaptureFailed EventDetailType = "BASELINE_CAPTURE_FAILED" + EventPipelineExcluded EventDetailType = "PIPELINE_EXCLUDED" EventPostRunBaselineCaptured EventDetailType = "POST_RUN_BASELINE_CAPTURED" EventPostRunPassed EventDetailType = "POST_RUN_PASSED" EventPostRunFailed EventDetailType = "POST_RUN_FAILED" EventPostRunDrift EventDetailType = "POST_RUN_DRIFT" EventPostRunDriftInflight EventDetailType = "POST_RUN_DRIFT_INFLIGHT" EventPostRunSensorMissing EventDetailType = "POST_RUN_SENSOR_MISSING" + EventRerunAccepted EventDetailType = "RERUN_ACCEPTED" ) // EventSource is the EventBridge source for all interlock events.