From 90129470573ad7a70533665a01afc45280cdf1ac Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Thu, 12 Mar 2026 23:10:26 +0700 Subject: [PATCH 1/4] fix: suppress SFN execution for dry-run pipelines on rerun and job-failure paths handleRerunRequest and handleJobFailure did not check cfg.DryRun before calling startSFNWithName, allowing rerun requests and job failure retries to start real Step Function executions for observation-only pipelines. Added dry-run guards in both handlers, defense-in-depth in startSFNWithName, and watchdog reconciliation skip to prevent orphaned trigger locks. --- CHANGELOG.md | 6 +++ internal/lambda/rerun.go | 14 +++++++ internal/lambda/sfn.go | 7 ++++ internal/lambda/stream_router_test.go | 59 +++++++++++++++++++++++++++ internal/lambda/watchdog.go | 5 +++ 5 files changed, 91 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63f3e99..ad87deb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.1] - 2026-03-12 + +### Fixed + +- **Dry-run pipelines could start Step Function executions via rerun and job-failure paths** — `handleRerunRequest` and `handleJobFailure` did not check `cfg.DryRun` before calling `startSFNWithName`, allowing rerun requests and job failure retries to start real SFN executions for dry-run pipelines. Added dry-run guards in both handlers and defense-in-depth in `startSFNWithName` to suppress execution unconditionally. Watchdog reconciliation loop now skips dry-run pipelines to prevent orphaned trigger locks. 
+ ## [0.9.0] - 2026-03-12 ### Added diff --git a/internal/lambda/rerun.go b/internal/lambda/rerun.go index 5ab7a65..704b39f 100644 --- a/internal/lambda/rerun.go +++ b/internal/lambda/rerun.go @@ -34,6 +34,13 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even return nil } + // Dry-run pipelines never start real executions. + if cfg.DryRun { + d.Logger.Info("dry-run: skipping rerun request", + "pipelineId", pipelineID, "schedule", schedule, "date", date) + return nil + } + // --- Calendar exclusion check (execution date) --- if isExcludedDate(cfg, date) { if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventRerunRejected, "", 0, "excluded by calendar"); err != nil { @@ -199,6 +206,13 @@ func handleJobFailure(ctx context.Context, d *Deps, pipelineID, schedule, date, return nil } + // Dry-run pipelines never start real executions. + if cfg.DryRun { + d.Logger.Info("dry-run: skipping job failure rerun", + "pipelineId", pipelineID, "schedule", schedule, "date", date) + return nil + } + maxRetries := cfg.Job.MaxRetries // Check if the latest failure has a category for budget selection. diff --git a/internal/lambda/sfn.go b/internal/lambda/sfn.go index d449783..2942e9d 100644 --- a/internal/lambda/sfn.go +++ b/internal/lambda/sfn.go @@ -79,7 +79,14 @@ func startSFN(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineI } // startSFNWithName starts a Step Function execution with a custom execution name. +// Defense-in-depth: refuses to start if the pipeline is in dry-run mode. func startSFNWithName(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date, name string) error { + if cfg.DryRun { + d.Logger.Warn("startSFNWithName called for dry-run pipeline, suppressing execution", + "pipelineId", pipelineID, "schedule", scheduleID, "date", date) + return nil + } + sc := buildSFNConfig(cfg) // Warn if the sum of evaluation + poll windows exceeds the SFN timeout. 
diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index 28bb2c5..bf460dc 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -4245,3 +4245,62 @@ func TestDryRunPostRunSensor_NoMarker(t *testing.T) { evtTypes := gatherEventDetailTypes(ebMock) assert.Empty(t, evtTypes, "no events when no DRY_RUN# marker exists") } + +// --------------------------------------------------------------------------- +// Dry-run: rerun and job-failure suppression +// --------------------------------------------------------------------------- + +func TestRerun_DryRun_SkipsExecution(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + cfg := testDryRunConfig() + seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") + + // Seed a failed job event so the rerun request would normally be accepted. + seedJobEvent(mock, "1709280000000", types.JobEventFail) + + record := makeDefaultRerunRequestRecord() + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // Dry-run pipeline must NOT start an SFN execution. + sfnMock.mu.Lock() + defer sfnMock.mu.Unlock() + assert.Empty(t, sfnMock.executions, "dry-run pipeline must not start SFN on rerun request") + + // No rerun records written (guard fires before any store side effects). 
+ count, countErr := d.Store.CountRerunsBySource(context.Background(), "gold-revenue", "stream", "2026-03-01", []string{"manual"}) + require.NoError(t, countErr) + assert.Zero(t, count, "dry-run must not write rerun records") +} + +func TestJobFailure_DryRun_SkipsRerun(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + cfg := testDryRunConfig() + cfg.Job.MaxRetries = 2 + seedConfig(mock, cfg) + seedTriggerLock(mock, "gold-revenue", "2026-03-01") + + // Send a JOB# failure event — normally triggers a rerun under retry limit. + record := makeJobRecord("gold-revenue", types.JobEventFail) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // Dry-run pipeline must NOT start an SFN execution. + sfnMock.mu.Lock() + defer sfnMock.mu.Unlock() + assert.Empty(t, sfnMock.executions, "dry-run pipeline must not start SFN on job failure") + + // No rerun records written (guard fires before any store side effects). + count, countErr := d.Store.CountRerunsBySource(context.Background(), "gold-revenue", "stream", "2026-03-01", []string{"job-fail-retry"}) + require.NoError(t, countErr) + assert.Zero(t, count, "dry-run must not write rerun records on job failure") +} diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index 44918ea..8b5f155 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -146,6 +146,11 @@ func reconcileSensorTriggers(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines are observation-only — skip reconciliation. 
+ if cfg.DryRun { + continue + } + if isExcluded(cfg, now) { continue } From 50c2a49cc20a8a0c5272f0ffb3256e27acb7e245 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Thu, 12 Mar 2026 23:25:50 +0700 Subject: [PATCH 2/4] fix: remove duplicate JOB_COMPLETED emission from orchestrator handleCheckJob published JOB_COMPLETED directly when polling detected success, but the stream-router's handleJobSuccess also published it when the JOB# record arrived via DynamoDB stream. This caused duplicate Slack alerts for polled jobs (Glue/EMR) while sync jobs only got one. The stream-router is now the single canonical source for JOB_COMPLETED across all job types (sync and polled). --- CHANGELOG.md | 1 + internal/lambda/e2e_test.go | 6 ++---- internal/lambda/orchestrator.go | 6 +++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad87deb..9420f64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - **Dry-run pipelines could start Step Function executions via rerun and job-failure paths** — `handleRerunRequest` and `handleJobFailure` did not check `cfg.DryRun` before calling `startSFNWithName`, allowing rerun requests and job failure retries to start real SFN executions for dry-run pipelines. Added dry-run guards in both handlers and defense-in-depth in `startSFNWithName` to suppress execution unconditionally. Watchdog reconciliation loop now skips dry-run pipelines to prevent orphaned trigger locks. +- **Duplicate `JOB_COMPLETED` alerts for polled jobs** — `handleCheckJob` in the orchestrator published `JOB_COMPLETED` when polling detected success, but the stream-router's `handleJobSuccess` also published the same event when the JOB# record arrived via DynamoDB stream. Removed the orchestrator emission; the stream-router is now the single canonical source for `JOB_COMPLETED` across all job types. 
## [0.9.0] - 2026-03-12 diff --git a/internal/lambda/e2e_test.go b/internal/lambda/e2e_test.go index 0149f7a..7f66514 100644 --- a/internal/lambda/e2e_test.go +++ b/internal/lambda/e2e_test.go @@ -558,7 +558,7 @@ func TestE2E_PrimarySFNPaths(t *testing.T) { assert.Equal(t, "SLA_MET", r.slaOutcome) assert.Contains(t, r.events, "VALIDATION_PASSED") assert.Contains(t, r.events, "JOB_TRIGGERED") - assert.Contains(t, r.events, "JOB_COMPLETED") + // JOB_COMPLETED is emitted by stream-router (not SFN), verified in stream_router_test.go. assert.Contains(t, r.events, "SLA_MET") assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-a1")) assert.Contains(t, collectJoblogEvents(mock, "pipe-a1"), "success") @@ -589,7 +589,6 @@ func TestE2E_PrimarySFNPaths(t *testing.T) { assert.Equal(t, 4, r.evalCount) assert.Contains(t, r.events, "VALIDATION_PASSED") assert.Contains(t, r.events, "JOB_TRIGGERED") - assert.Contains(t, r.events, "JOB_COMPLETED") assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-a2")) assertAlertFormats(t, eb) }) @@ -754,7 +753,6 @@ func TestE2E_PostRunMonitoring(t *testing.T) { }) assert.Equal(t, sfnDone, r.terminal) - assert.Contains(t, r.events, "JOB_COMPLETED") assert.NotContains(t, r.events, "POST_RUN_DRIFT") assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-b1")) assertAlertFormats(t, eb) @@ -1932,7 +1930,7 @@ func TestE2E_CrossHandlerEdgeCases(t *testing.T) { assert.Equal(t, sfnDone, r.terminal) // check-job should have skipped the non-terminal entry and polled StatusChecker. - assert.Contains(t, r.events, "JOB_COMPLETED", "check-job should fall through non-terminal joblog to StatusChecker") + // JOB_COMPLETED is emitted by stream-router when the JOB# record arrives, not by the orchestrator. 
assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-j2")) joblogs := collectJoblogEvents(mock, "pipe-j2") assert.Contains(t, joblogs, "success", "StatusChecker success should be written to joblog") diff --git a/internal/lambda/orchestrator.go b/internal/lambda/orchestrator.go index 7bb6f2c..5ba53ab 100644 --- a/internal/lambda/orchestrator.go +++ b/internal/lambda/orchestrator.go @@ -172,9 +172,9 @@ func handleCheckJob(ctx context.Context, d *Deps, input OrchestratorInput) (Orch if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventSuccess, input.RunID, 0, ""); err != nil { d.Logger.Warn("failed to write polled job success joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date) } - if err := publishEvent(ctx, d, string(types.EventJobCompleted), input.PipelineID, input.ScheduleID, input.Date, "job succeeded"); err != nil { - d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventJobCompleted, "error", err) - } + // JOB_COMPLETED is published by the stream-router when the JOB# + // record arrives via DynamoDB stream (handleJobSuccess). Publishing + // here as well would cause duplicate alerts for polled jobs. return OrchestratorOutput{Mode: "check-job", Event: "success"}, nil case "failed": var writeOpts []store.JobEventOption From 26a50b956cb35a8b3888c37016e78284b6143d15 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Fri, 13 Mar 2026 20:02:33 +0700 Subject: [PATCH 3/4] fix: close dry-run observation lifecycle and suppress watchdog alerts Add DRY_RUN_COMPLETED terminal event after WOULD_TRIGGER + SLA_PROJECTION to close the observation loop for each evaluation period. Carries SLA verdict (met/breach/n/a) so operators see each period resolve. 
Add cfg.DryRun guards to all seven watchdog functions: scheduleSLAAlerts, detectMissedSchedules, detectMissedInclusionSchedules, checkTriggerDeadlines, detectMissingPostRunSensors, detectRelativeSLABreaches, and detectStaleTriggers. Without these, dry-run pipelines received real SLA_WARNING/SLA_BREACH alerts via EventBridge Scheduler. Harden triggeredAt parse in late-data path to warn and return on bad data instead of silently producing garbage durations. --- CHANGELOG.md | 7 +- docs/content/docs/configuration/pipelines.md | 2 +- docs/content/docs/guides/dry-run.md | 5 +- docs/content/docs/reference/alerting.md | 3 + internal/lambda/dryrun.go | 64 +++++++++-- internal/lambda/stream_router_test.go | 107 +++++++++++++++++++ internal/lambda/watchdog.go | 36 +++++++ internal/lambda/watchdog_test.go | 83 ++++++++++++++ pkg/types/events.go | 1 + 9 files changed, 295 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9420f64..26008b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.9.1] - 2026-03-12 +## [0.9.1] - 2026-03-13 + +### Added + +- **`DRY_RUN_COMPLETED` event** — terminal event that closes the dry-run observation loop for each evaluation period. Published after `DRY_RUN_WOULD_TRIGGER` and `DRY_RUN_SLA_PROJECTION`, carrying the SLA verdict (`met`, `breach`, or `n/a`) so operators can see each period resolve. ### Fixed - **Dry-run pipelines could start Step Function executions via rerun and job-failure paths** — `handleRerunRequest` and `handleJobFailure` did not check `cfg.DryRun` before calling `startSFNWithName`, allowing rerun requests and job failure retries to start real SFN executions for dry-run pipelines. 
Added dry-run guards in both handlers and defense-in-depth in `startSFNWithName` to suppress execution unconditionally. Watchdog reconciliation loop now skips dry-run pipelines to prevent orphaned trigger locks.
+- **Watchdog scheduled real SLA alerts for dry-run pipelines** — `scheduleSLAAlerts`, `detectMissedSchedules`, `detectMissedInclusionSchedules`, `checkTriggerDeadlines`, `detectMissingPostRunSensors`, and `detectRelativeSLABreaches` all iterated over dry-run pipelines without checking `cfg.DryRun`. This caused EventBridge Scheduler entries for SLA_WARNING/SLA_BREACH, SCHEDULE_MISSED events, and RELATIVE_SLA_BREACH alerts to fire for observation-only pipelines. Added `cfg.DryRun` guards to all seven watchdog functions, including a defensive skip in `detectStaleTriggers` for TRIGGER# rows left over from before dry-run was enabled.
- **Duplicate `JOB_COMPLETED` alerts for polled jobs** — `handleCheckJob` in the orchestrator published `JOB_COMPLETED` when polling detected success, but the stream-router's `handleJobSuccess` also published the same event when the JOB# record arrived via DynamoDB stream. Removed the orchestrator emission; the stream-router is now the single canonical source for `JOB_COMPLETED` across all job types.

## [0.9.0] - 2026-03-12

diff --git a/docs/content/docs/configuration/pipelines.md b/docs/content/docs/configuration/pipelines.md
index 94fe6cf..48c3200 100644
--- a/docs/content/docs/configuration/pipelines.md
+++ b/docs/content/docs/configuration/pipelines.md
@@ -79,7 +79,7 @@ Declarative rules that determine pipeline readiness. See [Validation Rules](#val
 
 ### `dryRun`
 
-Enables observation-only mode. When `true`, Interlock evaluates trigger conditions and validation rules against real sensor data but never starts a Step Function execution or triggers any job. All observations are published as EventBridge events (`DRY_RUN_WOULD_TRIGGER`, `DRY_RUN_LATE_DATA`, `DRY_RUN_SLA_PROJECTION`, `DRY_RUN_DRIFT`).
+Enables observation-only mode. 
When `true`, Interlock evaluates trigger conditions and validation rules against real sensor data but never starts a Step Function execution or triggers any job. All observations are published as EventBridge events (`DRY_RUN_WOULD_TRIGGER`, `DRY_RUN_LATE_DATA`, `DRY_RUN_SLA_PROJECTION`, `DRY_RUN_DRIFT`, `DRY_RUN_COMPLETED`). | Field | Type | Default | Description | |---|---|---|---| diff --git a/docs/content/docs/guides/dry-run.md b/docs/content/docs/guides/dry-run.md index 354b446..adfc9a7 100644 --- a/docs/content/docs/guides/dry-run.md +++ b/docs/content/docs/guides/dry-run.md @@ -72,13 +72,14 @@ Sensor arrives → Stream-router ## Monitoring Events -Dry-run pipelines emit four event types to EventBridge: +Dry-run pipelines emit five event types to EventBridge: | Event | Meaning | |---|---| | `DRY_RUN_WOULD_TRIGGER` | All validation rules passed — Interlock would have triggered the job | -| `DRY_RUN_LATE_DATA` | Sensor data arrived after the trigger point was already recorded | | `DRY_RUN_SLA_PROJECTION` | Estimated completion time vs. deadline — `met` or `breach` status | +| `DRY_RUN_COMPLETED` | Observation loop closed — carries the SLA verdict (`met`, `breach`, or `n/a`) | +| `DRY_RUN_LATE_DATA` | Sensor data arrived after the trigger point was already recorded | | `DRY_RUN_DRIFT` | Post-run sensor data changed from the baseline captured at trigger time | Create an EventBridge rule to capture these events: diff --git a/docs/content/docs/reference/alerting.md b/docs/content/docs/reference/alerting.md index a013610..0b87b26 100644 --- a/docs/content/docs/reference/alerting.md +++ b/docs/content/docs/reference/alerting.md @@ -80,9 +80,12 @@ Published by the **stream-router** Lambda for pipelines with `dryRun: true`. The | `DRY_RUN_LATE_DATA` | Sensor updated after trigger point | Sensor data arrived after the dry-run trigger was recorded — would have triggered a re-run | | `DRY_RUN_SLA_PROJECTION` | Estimated completion vs. 
deadline | Projects whether the SLA would be met or breached based on `expectedDuration` and `deadline` | | `DRY_RUN_DRIFT` | Post-run sensor data changed | Sensor value drifted from baseline captured at trigger time — would have triggered a drift re-run | +| `DRY_RUN_COMPLETED` | Observation loop closed | Terminal event for the evaluation period — carries the SLA verdict (`met`, `breach`, or `n/a`) | The `DRY_RUN_SLA_PROJECTION` detail includes `status` (`"met"` or `"breach"`), `estimatedCompletion`, `deadline`, and `marginSeconds` fields. +The `DRY_RUN_COMPLETED` detail includes `triggeredAt`, `slaStatus` (`"met"`, `"breach"`, or `"n/a"`), and optionally `estimatedCompletion` and `deadline` when SLA is configured. + ### Watchdog Events Published by the **watchdog** Lambda, invoked on an EventBridge schedule (default: every 5 minutes). diff --git a/internal/lambda/dryrun.go b/internal/lambda/dryrun.go index 74240da..d9a716b 100644 --- a/internal/lambda/dryrun.go +++ b/internal/lambda/dryrun.go @@ -23,8 +23,17 @@ func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig if marker != nil { // Late data: sensor arrived after we already recorded a would-trigger. 
- triggeredAtStr, _ := marker.Data["triggeredAt"].(string) - triggeredAt, _ := time.Parse(time.RFC3339, triggeredAtStr) + triggeredAtStr, ok := marker.Data["triggeredAt"].(string) + if !ok || triggeredAtStr == "" { + d.Logger.WarnContext(ctx, "dry-run marker missing triggeredAt", "pipelineId", pipelineID) + return nil + } + triggeredAt, parseErr := time.Parse(time.RFC3339, triggeredAtStr) + if parseErr != nil { + d.Logger.WarnContext(ctx, "dry-run marker has invalid triggeredAt", + "pipelineId", pipelineID, "value", triggeredAtStr, "error", parseErr) + return nil + } lateBy := now.Sub(triggeredAt) if pubErr := publishEvent(ctx, d, string(types.EventDryRunLateData), pipelineID, scheduleID, date, @@ -83,8 +92,29 @@ func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig } // SLA projection if configured. + var slaVerdict *dryRunSLAVerdict if cfg.SLA != nil && cfg.SLA.ExpectedDuration != "" { - publishDryRunSLAProjection(ctx, d, cfg, pipelineID, scheduleID, date, now) + slaVerdict = publishDryRunSLAProjection(ctx, d, cfg, pipelineID, scheduleID, date, now) + } + + // Publish DRY_RUN_COMPLETED to close the observation loop. 
+ completedDetail := map[string]interface{}{ + "triggeredAt": now.UTC().Format(time.RFC3339), + } + if slaVerdict != nil { + completedDetail["slaStatus"] = slaVerdict.Status + completedDetail["estimatedCompletion"] = slaVerdict.EstimatedCompletion + if slaVerdict.Deadline != "" { + completedDetail["deadline"] = slaVerdict.Deadline + } + } else { + completedDetail["slaStatus"] = "n/a" + } + + if pubErr := publishEvent(ctx, d, string(types.EventDryRunCompleted), pipelineID, scheduleID, date, + fmt.Sprintf("dry-run: observation complete for %s/%s", pipelineID, date), + completedDetail); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunCompleted, "error", pubErr) } d.Logger.Info("dry-run: would trigger", @@ -95,15 +125,24 @@ func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig return nil } +// dryRunSLAVerdict holds the SLA projection result for inclusion in the +// DRY_RUN_COMPLETED event detail. +type dryRunSLAVerdict struct { + Status string // "met" or "breach" + EstimatedCompletion string // RFC3339 + Deadline string // RFC3339, empty if no deadline configured +} + // publishDryRunSLAProjection computes and publishes an SLA projection event // for a dry-run pipeline. Reuses handleSLACalculate to resolve the breach // deadline consistently with production SLA monitoring (including hourly -// pipeline T+1 adjustment). -func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, triggeredAt time.Time) { +// pipeline T+1 adjustment). Returns the verdict for inclusion in the +// DRY_RUN_COMPLETED event. 
+func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, triggeredAt time.Time) *dryRunSLAVerdict { expectedDur, err := time.ParseDuration(cfg.SLA.ExpectedDuration) if err != nil { d.Logger.WarnContext(ctx, "dry-run: invalid expectedDuration", "error", err) - return + return nil } estimatedCompletion := triggeredAt.Add(expectedDur) @@ -113,7 +152,11 @@ func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.Pipelin "expectedDuration": cfg.SLA.ExpectedDuration, } - status := "met" + verdict := &dryRunSLAVerdict{ + Status: "met", + EstimatedCompletion: estimatedCompletion.UTC().Format(time.RFC3339), + } + message := fmt.Sprintf("dry-run: SLA projection for %s — estimated completion %s", pipelineID, estimatedCompletion.Format(time.RFC3339)) @@ -135,10 +178,11 @@ func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.Pipelin breachAt, parseErr := time.Parse(time.RFC3339, slaOutput.BreachAt) if parseErr == nil { detail["deadline"] = slaOutput.BreachAt + verdict.Deadline = slaOutput.BreachAt margin := breachAt.Sub(estimatedCompletion) detail["marginSeconds"] = margin.Seconds() if estimatedCompletion.After(breachAt) { - status = "breach" + verdict.Status = "breach" message = fmt.Sprintf("dry-run: SLA projection for %s — would breach by %.0fm", pipelineID, math.Abs(margin.Minutes())) } else { @@ -149,11 +193,13 @@ func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.Pipelin } } - detail["status"] = status + detail["status"] = verdict.Status if pubErr := publishEvent(ctx, d, string(types.EventDryRunSLAProjection), pipelineID, scheduleID, date, message, detail); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunSLAProjection, "error", pubErr) } + + return verdict } // handleDryRunPostRunSensor handles post-run sensor events for dry-run pipelines. 
diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index bf460dc..09ab196 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -4114,6 +4114,113 @@ func TestHandleSensorEvent_DryRun_CapturesBaseline(t *testing.T) { assert.True(t, baselineExists, "expected postrun-baseline# sensor to be written in DDB") } +func TestHandleSensorEvent_DryRun_Completed_NoSLA(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + // No SLA configured — just trigger + completed. + cfg.SLA = nil + seedConfig(mock, cfg) + + seedSensor(mock, "gold-revenue", "upstream-complete", map[string]interface{}{ + "status": "ready", + }) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run must not start SFN") + sfnMock.mu.Unlock() + + evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunWouldTrigger)) + assert.Contains(t, evtTypes, string(types.EventDryRunCompleted), "expected DRY_RUN_COMPLETED event") + + // Parse the completed event detail — slaStatus should be "n/a". 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, input := range ebMock.events { + for _, e := range input.Entries { + if e.DetailType == nil || *e.DetailType != string(types.EventDryRunCompleted) { + continue + } + var detail map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) + innerDetail, ok := detail["detail"].(map[string]interface{}) + require.True(t, ok, "expected detail.detail map in completed event") + assert.Equal(t, "n/a", innerDetail["slaStatus"], "slaStatus should be n/a when no SLA configured") + return + } + } + t.Fatal("DRY_RUN_COMPLETED event not found in published events") +} + +func TestHandleSensorEvent_DryRun_Completed_WithSLA(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + cfg.SLA = &types.SLAConfig{ + Deadline: "08:00", + ExpectedDuration: "30m", + } + seedConfig(mock, cfg) + + seedSensor(mock, "gold-revenue", "upstream-complete", map[string]interface{}{ + "status": "ready", + }) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run must not start SFN") + sfnMock.mu.Unlock() + + evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunWouldTrigger)) + assert.Contains(t, evtTypes, string(types.EventDryRunSLAProjection)) + assert.Contains(t, evtTypes, string(types.EventDryRunCompleted), "expected DRY_RUN_COMPLETED event") + + // Completed event must carry the SLA verdict. 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, input := range ebMock.events { + for _, e := range input.Entries { + if e.DetailType == nil || *e.DetailType != string(types.EventDryRunCompleted) { + continue + } + var detail map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) + innerDetail, ok := detail["detail"].(map[string]interface{}) + require.True(t, ok, "expected detail.detail map in completed event") + assert.Equal(t, "met", innerDetail["slaStatus"], "SLA should be met (trigger at 01:15, 30m duration, 08:00 deadline)") + assert.NotEmpty(t, innerDetail["estimatedCompletion"], "expected estimatedCompletion in completed event") + assert.NotEmpty(t, innerDetail["deadline"], "expected deadline in completed event") + return + } + } + t.Fatal("DRY_RUN_COMPLETED event not found in published events") +} + // --------------------------------------------------------------------------- // Post-run dry-run sensor tests // --------------------------------------------------------------------------- diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index 8b5f155..f6b65cf 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -66,6 +66,12 @@ func detectStaleTriggers(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines should never have TRIGGER# rows, but guard + // against stale rows from pre-dry-run migrations or bugs. + if cfg, cfgErr := d.ConfigCache.Get(ctx, pipelineID); cfgErr == nil && cfg != nil && cfg.DryRun { + continue + } + alertDetail := map[string]interface{}{ "source": "watchdog", "actionHint": "step function exceeded TTL — check SFN execution history", @@ -292,6 +298,11 @@ func detectMissedSchedules(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines are observation-only — skip missed schedule detection. + if cfg.DryRun { + continue + } + // Skip calendar-excluded days. 
if isExcluded(cfg, now) { continue @@ -372,6 +383,11 @@ func detectMissedInclusionSchedules(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines are observation-only — skip inclusion schedule detection. + if cfg.DryRun { + continue + } + // Skip calendar-excluded days. if isExcluded(cfg, now) { continue @@ -474,6 +490,11 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines are observation-only — skip SLA scheduling. + if cfg.DryRun { + continue + } + if isExcluded(cfg, now) { continue } @@ -584,6 +605,11 @@ func checkTriggerDeadlines(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines are observation-only — skip trigger deadline checks. + if cfg.DryRun { + continue + } + if isExcluded(cfg, now) { continue } @@ -761,6 +787,11 @@ func detectMissingPostRunSensors(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines are observation-only — skip post-run sensor checks. + if cfg.DryRun { + continue + } + scheduleID := resolveScheduleID(cfg) // Only check pipelines with a COMPLETED trigger for today. @@ -964,6 +995,11 @@ func detectRelativeSLABreaches(ctx context.Context, d *Deps) error { continue } + // Dry-run pipelines are observation-only — skip relative SLA checks. 
+ if cfg.DryRun { + continue + } + maxDur, err := time.ParseDuration(cfg.SLA.MaxDuration) if err != nil { d.Logger.Warn("invalid maxDuration in SLA config", diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index c93c8ad..baad19c 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -3106,3 +3106,86 @@ func TestWatchdog_RelativeSLA_CrossDayNotBreached(t *testing.T) { "should not publish RELATIVE_SLA_BREACH before maxDuration is exceeded") } } + +// TestWatchdog_DryRun_SkipsAllSchedulingAndAlerts verifies that dry-run pipelines +// are completely invisible to the watchdog: no SLA schedules created, no +// SCHEDULE_MISSED events, no trigger deadline closures, and no relative SLA breaches. +func TestWatchdog_DryRun_SkipsAllSchedulingAndAlerts(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + schedMock := &mockScheduler{} + d.Scheduler = schedMock + d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor" + d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role" + d.SchedulerGroupName = "interlock-sla" + + fixedTime := time.Date(2026, 3, 13, 20, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + d.StartedAt = fixedTime.Add(-1 * time.Hour) + + // Configure a dry-run pipeline with cron schedule, SLA, trigger deadline, + // and inclusion dates — all features that the watchdog checks. 
+ cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "dryrun-weather"}, + DryRun: true, + Schedule: types.ScheduleConfig{ + Cron: "0 * * * *", + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + Trigger: &types.TriggerCondition{ + Key: "weather-data", + Check: "field_equals", + Field: "complete", + Value: "true", + Deadline: ":45", + }, + }, + SLA: &types.SLAConfig{ + Deadline: ":30", + ExpectedDuration: "15m", + MaxDuration: "2h", + }, + Validation: types.ValidationConfig{ + Trigger: "ALL", + Rules: []types.ValidationRule{ + {Key: "weather-data", Check: "field_equals", Field: "complete", Value: "true"}, + }, + }, + Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "test"}}, + } + seedConfig(mock, cfg) + + // Seed a first-sensor-arrival key to trigger the relative SLA path. + seedSensor(mock, "dryrun-weather", "first-sensor-arrival#2026-03-13", map[string]interface{}{ + "arrivedAt": "2026-03-13T17:00:00Z", + }) + + // Seed the trigger sensor so reconciliation would fire if not guarded. + seedSensor(mock, "dryrun-weather", "weather-data", map[string]interface{}{ + "complete": "true", + }) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // No EventBridge Scheduler entries for SLA. + schedMock.mu.Lock() + assert.Empty(t, schedMock.created, "dry-run pipeline must not create SLA schedules") + schedMock.mu.Unlock() + + // No watchdog event types should fire for a dry-run pipeline. 
+ evtTypes := gatherEventDetailTypes(ebMock) + prohibitedEvents := []string{ + string(types.EventScheduleMissed), + string(types.EventIrregularScheduleMissed), + string(types.EventSensorDeadlineExpired), + string(types.EventRelativeSLABreach), + string(types.EventRelativeSLAWarning), + string(types.EventTriggerRecovered), + string(types.EventPostRunSensorMissing), + string(types.EventSFNTimeout), + } + for _, prohibited := range prohibitedEvents { + assert.NotContains(t, evtTypes, prohibited, + "dry-run pipeline must not produce %s events", prohibited) + } +} diff --git a/pkg/types/events.go b/pkg/types/events.go index 653025a..dd0c2cb 100644 --- a/pkg/types/events.go +++ b/pkg/types/events.go @@ -41,6 +41,7 @@ const ( EventDryRunLateData EventDetailType = "DRY_RUN_LATE_DATA" EventDryRunSLAProjection EventDetailType = "DRY_RUN_SLA_PROJECTION" EventDryRunDrift EventDetailType = "DRY_RUN_DRIFT" + EventDryRunCompleted EventDetailType = "DRY_RUN_COMPLETED" ) // EventSource is the EventBridge source for all interlock events. From be9b62d1b5dee20dd8d7d0b788932927d557eba6 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Fri, 13 Mar 2026 20:14:38 +0700 Subject: [PATCH 4/4] fix: add missing event types to alert EventBridge rule Add v0.8.0 events (SENSOR_DEADLINE_EXPIRED, IRREGULAR_SCHEDULE_MISSED, RELATIVE_SLA_WARNING, RELATIVE_SLA_BREACH) and all DRY_RUN_* events to the alert rule so they route to SQS and reach Slack via alert-dispatcher. 
--- deploy/terraform/eventbridge.tf | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/deploy/terraform/eventbridge.tf b/deploy/terraform/eventbridge.tf index 3f51690..c55a45a 100644 --- a/deploy/terraform/eventbridge.tf +++ b/deploy/terraform/eventbridge.tf @@ -91,6 +91,15 @@ resource "aws_cloudwatch_event_rule" "alert_events" { "TRIGGER_RECOVERED", "BASELINE_CAPTURE_FAILED", "PIPELINE_EXCLUDED", + "SENSOR_DEADLINE_EXPIRED", + "IRREGULAR_SCHEDULE_MISSED", + "RELATIVE_SLA_WARNING", + "RELATIVE_SLA_BREACH", + "DRY_RUN_WOULD_TRIGGER", + "DRY_RUN_LATE_DATA", + "DRY_RUN_SLA_PROJECTION", + "DRY_RUN_DRIFT", + "DRY_RUN_COMPLETED", ] }) }