Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.7.4] - 2026-03-10

### Fixed

- **False SLA warnings/breaches for sensor-triggered daily pipelines** — `scheduleSLAAlerts` resolved the SLA deadline against today's date, but sensor-triggered daily pipelines run T+1 (data for today completes tomorrow). Between 00:00 UTC and the deadline hour, the breach time landed on the same day instead of the next day, causing premature SLA alerts. The SLA calculation now shifts the deadline date by +1 day for sensor-triggered daily pipelines.

## [0.7.3] - 2026-03-10

### Added
Expand Down Expand Up @@ -354,6 +360,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline

Released under the [Elastic License 2.0](LICENSE).

[0.7.4]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.4
[0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3
[0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2
[0.7.1]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.1
Expand Down
15 changes: 14 additions & 1 deletion internal/lambda/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,19 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error {
scheduleID := resolveScheduleID(cfg)
date := resolveWatchdogSLADate(cfg, now)

// Sensor-triggered daily pipelines run T+1: data for today completes
// tomorrow, so the SLA deadline is relative to tomorrow's date.
// Only slaDate is shifted; the original date is kept for schedule
// naming, trigger lookup, and fire-alert payload so cancellation
// stays consistent with the SFN's view of the pipeline.
slaDate := date
if cfg.Schedule.Cron == "" && !strings.HasPrefix(cfg.SLA.Deadline, ":") {
t, err := time.Parse("2006-01-02", date)
if err == nil {
slaDate = t.AddDate(0, 0, 1).Format("2006-01-02")
}
}

// Skip if pipeline already completed or permanently failed for this date.
tr, err := d.Store.GetTrigger(ctx, id, scheduleID, date)
switch {
Expand All @@ -397,7 +410,7 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error {
Mode: "calculate",
PipelineID: id,
ScheduleID: scheduleID,
Date: date,
Date: slaDate,
Deadline: cfg.SLA.Deadline,
ExpectedDuration: cfg.SLA.ExpectedDuration,
Timezone: cfg.SLA.Timezone,
Expand Down
312 changes: 312 additions & 0 deletions internal/lambda/watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -1943,6 +1944,317 @@ func TestWatchdog_PostRunSensorPresent(t *testing.T) {
}
}

func TestWatchdog_ScheduleSLAAlerts_SensorDaily_DeadlineNextDay(t *testing.T) {
mock := newMockDDB()
d, _, _ := testDeps(mock)
schedMock := &mockScheduler{}
d.Scheduler = schedMock
d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor"
d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role"
d.SchedulerGroupName = "interlock-sla"

// Fix time at 00:30 UTC on 2026-03-10. For a sensor-triggered daily
// pipeline with SLA deadline "02:00", data for 2026-03-10 won't complete
// until ~00:05 on 2026-03-11. The breach should be 2026-03-11T02:00:00Z
// (tomorrow), NOT 2026-03-10T02:00:00Z (today).
fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC)
d.NowFunc = func() time.Time { return fixedNow }
d.StartedAt = fixedNow.Add(-5 * time.Minute)

cfg := types.PipelineConfig{
Pipeline: types.PipelineIdentity{ID: "gold-daily-sensor"},
Schedule: types.ScheduleConfig{
// No cron — sensor-triggered daily pipeline.
Trigger: &types.TriggerCondition{
Key: "upstream-complete",
Check: types.CheckEquals,
Field: "status",
Value: "ready",
},
Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"},
},
SLA: &types.SLAConfig{
Deadline: "02:00",
ExpectedDuration: "30m",
},
Validation: types.ValidationConfig{Trigger: "ALL"},
Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "gold-daily"}},
}
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

schedMock.mu.Lock()
defer schedMock.mu.Unlock()
require.Len(t, schedMock.created, 2, "expected 2 SLA schedules (warning + breach)")

// Find the breach schedule and verify it targets tomorrow (2026-03-11),
// not today (2026-03-10).
for _, s := range schedMock.created {
name := *s.Name
if !strings.Contains(name, "breach") {
continue
}
// The schedule expression is "at(2026-03-11T02:00:00)" for the
// correct (next-day) deadline.
expr := *s.ScheduleExpression
assert.Contains(t, expr, "2026-03-11T02:00:00",
"sensor-triggered daily pipeline breach should target next day; got %s", expr)
assert.NotContains(t, expr, "2026-03-10T02:00:00",
"sensor-triggered daily pipeline breach should NOT target today; got %s", expr)
}
}

// ---------------------------------------------------------------------------
// SLA date boundary tests for the T+1 fix
// ---------------------------------------------------------------------------

func TestWatchdog_SLADateBoundaries(t *testing.T) {
// Helper to set up a scheduler-enabled Deps with a fixed clock.
setupDeps := func(mock *mockDDB, fixedNow time.Time) (*lambda.Deps, *mockScheduler) {
d, _, _ := testDeps(mock)
schedMock := &mockScheduler{}
d.Scheduler = schedMock
d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor"
d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role"
d.SchedulerGroupName = "interlock-sla"
d.NowFunc = func() time.Time { return fixedNow }
d.StartedAt = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
return d, schedMock
}

// sensorDailyCfg builds a sensor-triggered daily pipeline config.
sensorDailyCfg := func(id string) types.PipelineConfig {
return types.PipelineConfig{
Pipeline: types.PipelineIdentity{ID: id},
Schedule: types.ScheduleConfig{
Trigger: &types.TriggerCondition{
Key: "daily-status",
Check: types.CheckExists,
},
Evaluation: types.EvaluationWindow{Window: "30m", Interval: "5m"},
},
SLA: &types.SLAConfig{
Deadline: "02:00",
ExpectedDuration: "30m",
},
Validation: types.ValidationConfig{Trigger: "ALL"},
Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}},
}
}

// findBreachExpr returns the ScheduleExpression from the breach schedule.
findBreachExpr := func(t *testing.T, schedMock *mockScheduler) string {
t.Helper()
schedMock.mu.Lock()
defer schedMock.mu.Unlock()
for _, s := range schedMock.created {
if strings.Contains(*s.Name, "breach") {
return *s.ScheduleExpression
}
}
t.Fatal("no breach schedule found")
return ""
}

// --- Positive cases: sensor-triggered daily, SHOULD shift T+1 ---

t.Run("MidnightWindow", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := sensorDailyCfg("sla-boundary-midnight")
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2026-03-11T02:00:00",
"midnight window: breach should be next day; got %s", expr)
assert.NotContains(t, expr, "2026-03-10T02:00:00",
"midnight window: breach should NOT be today; got %s", expr)
})

t.Run("AfterDeadlineHour", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2026, 3, 10, 10, 0, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := sensorDailyCfg("sla-boundary-after-deadline")
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2026-03-11T02:00:00",
"after deadline: breach should be next day; got %s", expr)
})

t.Run("JustBeforeMidnight", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2026, 3, 10, 23, 59, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := sensorDailyCfg("sla-boundary-before-midnight")
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2026-03-11T02:00:00",
"just before midnight: breach should be next day; got %s", expr)
})

t.Run("MonthBoundary", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2026, 3, 31, 12, 0, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := sensorDailyCfg("sla-boundary-month")
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2026-04-01",
"month boundary: breach should cross into April; got %s", expr)
})

t.Run("YearBoundary", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2026, 12, 31, 12, 0, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := sensorDailyCfg("sla-boundary-year")
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2027-01-01",
"year boundary: breach should cross into 2027; got %s", expr)
})

t.Run("LeapYear", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2028, 2, 28, 12, 0, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := sensorDailyCfg("sla-boundary-leap")
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2028-02-29",
"leap year: breach should land on Feb 29; got %s", expr)
})

t.Run("NonLeapYear", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2027, 2, 28, 12, 0, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := sensorDailyCfg("sla-boundary-nonleap")
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2027-03-01",
"non-leap year: breach should roll to Mar 1; got %s", expr)
})

// --- Negative cases: should NOT shift ---

t.Run("CronDailyNoShift", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := types.PipelineConfig{
Pipeline: types.PipelineIdentity{ID: "sla-boundary-cron"},
Schedule: types.ScheduleConfig{
Cron: "0 2 * * *",
Evaluation: types.EvaluationWindow{Window: "30m", Interval: "5m"},
},
SLA: &types.SLAConfig{
Deadline: "02:00",
ExpectedDuration: "30m",
},
Validation: types.ValidationConfig{Trigger: "ALL"},
Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}},
}
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

expr := findBreachExpr(t, schedMock)
assert.Contains(t, expr, "2026-03-10T02:00:00",
"cron daily: breach should be same day (no T+1 shift); got %s", expr)
assert.NotContains(t, expr, "2026-03-11",
"cron daily: breach should NOT be next day; got %s", expr)
})

t.Run("HourlySensorNoShift", func(t *testing.T) {
mock := newMockDDB()
fixedNow := time.Date(2026, 3, 10, 10, 0, 0, 0, time.UTC)
d, schedMock := setupDeps(mock, fixedNow)

cfg := types.PipelineConfig{
Pipeline: types.PipelineIdentity{ID: "sla-boundary-hourly"},
Schedule: types.ScheduleConfig{
Trigger: &types.TriggerCondition{
Key: "hourly-status",
Check: types.CheckExists,
},
Evaluation: types.EvaluationWindow{Window: "5m", Interval: "1m"},
},
SLA: &types.SLAConfig{
Deadline: ":45",
ExpectedDuration: "10m",
},
Validation: types.ValidationConfig{Trigger: "ALL"},
Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo test"}},
}
seedConfig(mock, cfg)

err := lambda.HandleWatchdog(context.Background(), d)
require.NoError(t, err)

// For hourly sensor at 10:00, resolveWatchdogSLADate returns
// "2026-03-10T09" (previous hour). The T+1 shift does NOT apply
// because the deadline starts with ":". The breach should be at
// :45 of the processing hour (T09's data processed in T10),
// i.e. 2026-03-10T10:45:00.
schedMock.mu.Lock()
defer schedMock.mu.Unlock()
require.NotEmpty(t, schedMock.created, "expected SLA schedules for hourly sensor pipeline")

for _, s := range schedMock.created {
if !strings.Contains(*s.Name, "breach") {
continue
}
expr := *s.ScheduleExpression
// Breach should stay on 2026-03-10, NOT shift to 2026-03-11.
assert.Contains(t, expr, "2026-03-10",
"hourly sensor: breach should stay on same day; got %s", expr)
assert.NotContains(t, expr, "2026-03-11",
"hourly sensor: breach should NOT shift to next day; got %s", expr)
}
})
}

func TestWatchdog_PostRunNoConfig(t *testing.T) {
mock := newMockDDB()
d, _, ebMock := testDeps(mock)
Expand Down