From fd8db6c134db9e93805d4e7e84ff1d89e1fc4ea6 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Tue, 10 Mar 2026 21:57:36 +0700 Subject: [PATCH 1/3] fix: resolve SLA deadline to T+1 for sensor-triggered daily pipelines Sensor-triggered daily pipelines process today's data tomorrow, but scheduleSLAAlerts resolved the SLA deadline against today's date. Between 00:00 and the deadline hour, the breach time landed on the same day instead of the next day, causing false SLA warnings and breaches. Shift the date passed to handleSLACalculate by +1 day when the pipeline is sensor-triggered (no cron) with a daily deadline (HH:MM, not :MM). --- internal/lambda/watchdog.go | 15 +++++++- internal/lambda/watchdog_test.go | 63 ++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index 8a5be32..114b0cb 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -381,6 +381,19 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { scheduleID := resolveScheduleID(cfg) date := resolveWatchdogSLADate(cfg, now) + // Sensor-triggered daily pipelines run T+1: data for today completes + // tomorrow, so the SLA deadline is relative to tomorrow's date. + // Only slaDate is shifted; the original date is kept for schedule + // naming, trigger lookup, and fire-alert payload so cancellation + // stays consistent with the SFN's view of the pipeline. + slaDate := date + if cfg.Schedule.Cron == "" && !strings.HasPrefix(cfg.SLA.Deadline, ":") { + t, err := time.Parse("2006-01-02", date) + if err == nil { + slaDate = t.AddDate(0, 0, 1).Format("2006-01-02") + } + } + // Skip if pipeline already completed or permanently failed for this date. tr, err := d.Store.GetTrigger(ctx, id, scheduleID, date) switch { @@ -397,7 +410,7 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { Mode: "calculate", PipelineID: id, ScheduleID: scheduleID, - Date: date, + Date: slaDate, Deadline: cfg.SLA.Deadline, ExpectedDuration: cfg.SLA.ExpectedDuration, Timezone: cfg.SLA.Timezone, diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index baf8463..a99e00d 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "testing" "time" @@ -1943,6 +1944,68 @@ func TestWatchdog_PostRunSensorPresent(t *testing.T) { } } +func TestWatchdog_ScheduleSLAAlerts_SensorDaily_DeadlineNextDay(t *testing.T) { + mock := newMockDDB() + d, _, _ := testDeps(mock) + schedMock := &mockScheduler{} + d.Scheduler = schedMock + d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor" + d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role" + d.SchedulerGroupName = "interlock-sla" + + // Fix time at 00:30 UTC on 2026-03-10. For a sensor-triggered daily + // pipeline with SLA deadline "02:00", data for 2026-03-10 won't complete + // until ~00:05 on 2026-03-11. The breach should be 2026-03-11T02:00:00Z + // (tomorrow), NOT 2026-03-10T02:00:00Z (today). + fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + d.StartedAt = fixedNow.Add(-5 * time.Minute) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "gold-daily-sensor"}, + Schedule: types.ScheduleConfig{ + // No cron — sensor-triggered daily pipeline. + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + Deadline: "02:00", + ExpectedDuration: "30m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "gold-daily"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + schedMock.mu.Lock() + defer schedMock.mu.Unlock() + require.Len(t, schedMock.created, 2, "expected 2 SLA schedules (warning + breach)") + + // Find the breach schedule and verify it targets tomorrow (2026-03-11), + // not today (2026-03-10). + for _, s := range schedMock.created { + name := *s.Name + if !strings.Contains(name, "breach") { + continue + } + // The schedule expression is "at(2026-03-11T02:00:00)" for the + // correct (next-day) deadline. + expr := *s.ScheduleExpression + assert.Contains(t, expr, "2026-03-11T02:00:00", + "sensor-triggered daily pipeline breach should target next day; got %s", expr) + assert.NotContains(t, expr, "2026-03-10T02:00:00", + "sensor-triggered daily pipeline breach should NOT target today; got %s", expr) + } +} + func TestWatchdog_PostRunNoConfig(t *testing.T) { mock := newMockDDB() d, _, ebMock := testDeps(mock) From c390fe5171d28bc57ec2ec91ddc662795ebc89d2 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Tue, 10 Mar 2026 21:58:40 +0700 Subject: [PATCH 2/3] docs: add v0.7.4 changelog for SLA T+1 daily sensor fix --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0ac6c6..2d7b911 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.7.4] - 2026-03-10 + +### Fixed + +- **False SLA warnings/breaches for sensor-triggered daily pipelines** — `scheduleSLAAlerts` resolved the SLA deadline against today's date, but sensor-triggered daily pipelines run T+1 (data for today completes tomorrow). Between 00:00 UTC and the deadline hour, the breach time landed on the same day instead of the next day, causing premature SLA alerts. The SLA calculation now shifts the deadline date by +1 day for sensor-triggered daily pipelines. + ## [0.7.3] - 2026-03-10 ### Added @@ -354,6 +360,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline Released under the [Elastic License 2.0](LICENSE). +[0.7.4]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.4 [0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3 [0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2 [0.7.1]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.1 From 876ede735737d32d6b8f369818edd56c9f7c1cbd Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Tue, 10 Mar 2026 22:04:16 +0700 Subject: [PATCH 3/3] test: comprehensive SLA T+1 date boundary tests 9 subtests covering midnight window, after-deadline, pre-midnight, month boundary, year boundary, leap year, non-leap year, and two negative cases (cron daily and hourly sensor should not shift). --- internal/lambda/watchdog_test.go | 249 +++++++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index a99e00d..675fdd4 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -2006,6 +2006,255 @@ func TestWatchdog_ScheduleSLAAlerts_SensorDaily_DeadlineNextDay(t *testing.T) { } } +// --------------------------------------------------------------------------- +// SLA date boundary tests for the T+1 fix +// --------------------------------------------------------------------------- + +func TestWatchdog_SLADateBoundaries(t *testing.T) { + // Helper to set up a scheduler-enabled Deps with a fixed clock. + setupDeps := func(mock *mockDDB, fixedNow time.Time) (*lambda.Deps, *mockScheduler) { + d, _, _ := testDeps(mock) + schedMock := &mockScheduler{} + d.Scheduler = schedMock + d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor" + d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role" + d.SchedulerGroupName = "interlock-sla" + d.NowFunc = func() time.Time { return fixedNow } + d.StartedAt = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) + return d, schedMock + } + + // sensorDailyCfg builds a sensor-triggered daily pipeline config. + sensorDailyCfg := func(id string) types.PipelineConfig { + return types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: id}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "daily-status", + Check: types.CheckExists, + }, + Evaluation: types.EvaluationWindow{Window: "30m", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + Deadline: "02:00", + ExpectedDuration: "30m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}}, + } + } + + // findBreachExpr returns the ScheduleExpression from the breach schedule. + findBreachExpr := func(t *testing.T, schedMock *mockScheduler) string { + t.Helper() + schedMock.mu.Lock() + defer schedMock.mu.Unlock() + for _, s := range schedMock.created { + if strings.Contains(*s.Name, "breach") { + return *s.ScheduleExpression + } + } + t.Fatal("no breach schedule found") + return "" + } + + // --- Positive cases: sensor-triggered daily, SHOULD shift T+1 --- + + t.Run("MidnightWindow", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-midnight") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-11T02:00:00", + "midnight window: breach should be next day; got %s", expr) + assert.NotContains(t, expr, "2026-03-10T02:00:00", + "midnight window: breach should NOT be today; got %s", expr) + }) + + t.Run("AfterDeadlineHour", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 10, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-after-deadline") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-11T02:00:00", + "after deadline: breach should be next day; got %s", expr) + }) + + t.Run("JustBeforeMidnight", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 23, 59, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-before-midnight") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-11T02:00:00", + "just before midnight: breach should be next day; got %s", expr) + }) + + t.Run("MonthBoundary", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-month") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-04-01", + "month boundary: breach should cross into April; got %s", expr) + }) + + t.Run("YearBoundary", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 12, 31, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-year") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2027-01-01", + "year boundary: breach should cross into 2027; got %s", expr) + }) + + t.Run("LeapYear", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2028, 2, 28, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-leap") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2028-02-29", + "leap year: breach should land on Feb 29; got %s", expr) + }) + + t.Run("NonLeapYear", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2027, 2, 28, 12, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := sensorDailyCfg("sla-boundary-nonleap") + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2027-03-01", + "non-leap year: breach should roll to Mar 1; got %s", expr) + }) + + // --- Negative cases: should NOT shift --- + + t.Run("CronDailyNoShift", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "sla-boundary-cron"}, + Schedule: types.ScheduleConfig{ + Cron: "0 2 * * *", + Evaluation: types.EvaluationWindow{Window: "30m", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + Deadline: "02:00", + ExpectedDuration: "30m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + expr := findBreachExpr(t, schedMock) + assert.Contains(t, expr, "2026-03-10T02:00:00", + "cron daily: breach should be same day (no T+1 shift); got %s", expr) + assert.NotContains(t, expr, "2026-03-11", + "cron daily: breach should NOT be next day; got %s", expr) + }) + + t.Run("HourlySensorNoShift", func(t *testing.T) { + mock := newMockDDB() + fixedNow := time.Date(2026, 3, 10, 10, 0, 0, 0, time.UTC) + d, schedMock := setupDeps(mock, fixedNow) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "sla-boundary-hourly"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "hourly-status", + Check: types.CheckExists, + }, + Evaluation: types.EvaluationWindow{Window: "5m", Interval: "1m"}, + }, + SLA: &types.SLAConfig{ + Deadline: ":45", + ExpectedDuration: "10m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // For hourly sensor at 10:00, resolveWatchdogSLADate returns + // "2026-03-10T09" (previous hour). The T+1 shift does NOT apply + // because the deadline starts with ":". The breach should be at + // :45 of the processing hour (T09's data processed in T10), + // i.e. 2026-03-10T10:45:00. + schedMock.mu.Lock() + defer schedMock.mu.Unlock() + require.NotEmpty(t, schedMock.created, "expected SLA schedules for hourly sensor pipeline") + + for _, s := range schedMock.created { + if !strings.Contains(*s.Name, "breach") { + continue + } + expr := *s.ScheduleExpression + // Breach should stay on 2026-03-10, NOT shift to 2026-03-11. + assert.Contains(t, expr, "2026-03-10", + "hourly sensor: breach should stay on same day; got %s", expr) + assert.NotContains(t, expr, "2026-03-11", + "hourly sensor: breach should NOT shift to next day; got %s", expr) + } + }) +} + func TestWatchdog_PostRunNoConfig(t *testing.T) { mock := newMockDDB() d, _, ebMock := testDeps(mock)