diff --git a/CHANGELOG.md b/CHANGELOG.md index d0ac6c6..2d7b911 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.7.4] - 2026-03-10 + +### Fixed + +- **False SLA warnings/breaches for sensor-triggered daily pipelines** — `scheduleSLAAlerts` resolved the SLA deadline against today's date, but sensor-triggered daily pipelines run T+1 (data for today completes tomorrow). Between 00:00 UTC and the deadline hour, the breach time landed on the same day instead of the next day, causing premature SLA alerts. The SLA calculation now shifts the deadline date by +1 day for sensor-triggered daily pipelines. + ## [0.7.3] - 2026-03-10 ### Added @@ -354,6 +360,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline Released under the [Elastic License 2.0](LICENSE). +[0.7.4]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.4 [0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3 [0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2 [0.7.1]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.1 diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index 8a5be32..114b0cb 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -381,6 +381,19 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { scheduleID := resolveScheduleID(cfg) date := resolveWatchdogSLADate(cfg, now) + // Sensor-triggered daily pipelines run T+1: data for today completes + // tomorrow, so the SLA deadline is relative to tomorrow's date. + // Only slaDate is shifted; the original date is kept for schedule + // naming, trigger lookup, and fire-alert payload so cancellation + // stays consistent with the SFN's view of the pipeline. + slaDate := date + if cfg.Schedule.Cron == "" && !strings.HasPrefix(cfg.SLA.Deadline, ":") { + t, err := time.Parse("2006-01-02", date) + if err == nil { + slaDate = t.AddDate(0, 0, 1).Format("2006-01-02") + } + } + // Skip if pipeline already completed or permanently failed for this date. tr, err := d.Store.GetTrigger(ctx, id, scheduleID, date) switch { @@ -397,7 +410,7 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { Mode: "calculate", PipelineID: id, ScheduleID: scheduleID, - Date: date, + Date: slaDate, Deadline: cfg.SLA.Deadline, ExpectedDuration: cfg.SLA.ExpectedDuration, Timezone: cfg.SLA.Timezone, diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index baf8463..675fdd4 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "testing" "time" @@ -1943,6 +1944,317 @@ func TestWatchdog_PostRunSensorPresent(t *testing.T) { } } +func TestWatchdog_ScheduleSLAAlerts_SensorDaily_DeadlineNextDay(t *testing.T) { + mock := newMockDDB() + d, _, _ := testDeps(mock) + schedMock := &mockScheduler{} + d.Scheduler = schedMock + d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor" + d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role" + d.SchedulerGroupName = "interlock-sla" + + // Fix time at 00:30 UTC on 2026-03-10. For a sensor-triggered daily + // pipeline with SLA deadline "02:00", data for 2026-03-10 won't complete + // until ~00:05 on 2026-03-11. The breach should be 2026-03-11T02:00:00Z + // (tomorrow), NOT 2026-03-10T02:00:00Z (today). + fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + d.StartedAt = fixedNow.Add(-5 * time.Minute) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "gold-daily-sensor"}, + Schedule: types.ScheduleConfig{ + // No cron — sensor-triggered daily pipeline. + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + Deadline: "02:00", + ExpectedDuration: "30m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "gold-daily"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + schedMock.mu.Lock() + defer schedMock.mu.Unlock() + require.Len(t, schedMock.created, 2, "expected 2 SLA schedules (warning + breach)") + + // Find the breach schedule and verify it targets tomorrow (2026-03-11), + // not today (2026-03-10). + for _, s := range schedMock.created { + name := *s.Name + if !strings.Contains(name, "breach") { + continue + } + // The schedule expression is "at(2026-03-11T02:00:00)" for the + // correct (next-day) deadline. + expr := *s.ScheduleExpression + assert.Contains(t, expr, "2026-03-11T02:00:00", + "sensor-triggered daily pipeline breach should target next day; got %s", expr) + assert.NotContains(t, expr, "2026-03-10T02:00:00", + "sensor-triggered daily pipeline breach should NOT target today; got %s", expr) + } +} + +// --------------------------------------------------------------------------- +// SLA date boundary tests for the T+1 fix +// --------------------------------------------------------------------------- + +func TestWatchdog_SLADateBoundaries(t *testing.T) { + // Helper to set up a scheduler-enabled Deps with a fixed clock. + setupDeps := func(mock *mockDDB, fixedNow time.Time) (*lambda.Deps, *mockScheduler) { + d, _, _ := testDeps(mock) + schedMock := &mockScheduler{} + d.Scheduler = schedMock + d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor" + d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role" + d.SchedulerGroupName = "interlock-sla" + d.NowFunc = func() time.Time { return fixedNow } + d.StartedAt = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) + return d, schedMock + } + + // sensorDailyCfg builds a sensor-triggered daily pipeline config. + sensorDailyCfg := func(id string) types.PipelineConfig { + return types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: id}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "daily-status", + Check: types.CheckExists, + }, + Evaluation: types.EvaluationWindow{Window: "30m", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + Deadline: "02:00", + ExpectedDuration: "30m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}}, + } + } + + // findBreachExpr returns the ScheduleExpression from the breach schedule. + findBreachExpr := func(t *testing.T, schedMock *mockScheduler) string { + t.Helper() + schedMock.mu.Lock() + defer schedMock.mu.Unlock() + for _, s := range schedMock.created { + if strings.Contains(*s.Name, "breach") { + return *s.ScheduleExpression + } + } + t.Fatal("no breach schedule found") + return "" + } + + // --- Positive cases: sensor-triggered daily, SHOULD shift T+1 --- + + t.Run("MidnightWindow", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-midnight") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-11T02:00:00", + "midnight window: breach should be next day; got %s", expr) + assert.NotContains(t, expr, "2026-03-10T02:00:00", + "midnight window: breach should NOT be today; got %s", expr) + }) + + t.Run("AfterDeadlineHour", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 10, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-after-deadline") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-11T02:00:00", + "after deadline: breach should be next day; got %s", expr) + }) + + t.Run("JustBeforeMidnight", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 23, 59, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-before-midnight") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-11T02:00:00", + "just before midnight: breach should be next day; got %s", expr) + }) + + t.Run("MonthBoundary", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-month") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-04-01", + "month boundary: breach should cross into April; got %s", expr) + }) + + t.Run("YearBoundary", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 12, 31, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-year") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2027-01-01", + "year boundary: breach should cross into 2027; got %s", expr) + }) + + t.Run("LeapYear", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2028, 2, 28, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-leap") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2028-02-29", + "leap year: breach should land on Feb 29; got %s", expr) + }) + + t.Run("NonLeapYear", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2027, 2, 28, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-nonleap") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2027-03-01", + "non-leap year: breach should roll to Mar 1; got %s", expr) + }) + + // --- Negative cases: should NOT shift --- + + t.Run("CronDailyNoShift", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "sla-boundary-cron"}, + Schedule: types.ScheduleConfig{ + Cron: "0 2 * * *", + Evaluation: types.EvaluationWindow{Window: "30m", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + Deadline: "02:00", + ExpectedDuration: "30m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-10T02:00:00", + "cron daily: breach should be same day (no T+1 shift); got %s", expr) + assert.NotContains(t, expr, "2026-03-11", + "cron daily: breach should NOT be next day; got %s", expr) + }) + + t.Run("HourlySensorNoShift", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 10, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "sla-boundary-hourly"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "hourly-status", + Check: types.CheckExists, + }, + Evaluation: types.EvaluationWindow{Window: "5m", Interval: "1m"}, + }, + SLA: &types.SLAConfig{ + Deadline: ":45", + ExpectedDuration: "10m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // For hourly sensor at 10:00, resolveWatchdogSLADate returns + // "2026-03-10T09" (previous hour). The T+1 shift does NOT apply + // because the deadline starts with ":". The breach should be at + // :45 of the processing hour (T09's data processed in T10), + // i.e. 2026-03-10T10:45:00. + schedMock.mu.Lock() + defer schedMock.mu.Unlock() + require.NotEmpty(t, schedMock.created, "expected SLA schedules for hourly sensor pipeline") + + for _, s := range schedMock.created { + if !strings.Contains(*s.Name, "breach") { + continue + } + expr := *s.ScheduleExpression + // Breach should stay on 2026-03-10, NOT shift to 2026-03-11. + assert.Contains(t, expr, "2026-03-10", + "hourly sensor: breach should stay on same day; got %s", expr) + assert.NotContains(t, expr, "2026-03-11", + "hourly sensor: breach should NOT shift to next day; got %s", expr) + } + }) +} + func TestWatchdog_PostRunNoConfig(t *testing.T) { mock := newMockDDB() d, _, ebMock := testDeps(mock)