From e124eaf757be11362b337cc66e5b00255fba0595 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Tue, 10 Mar 2026 23:37:49 +0700 Subject: [PATCH 1/4] feat: inclusion calendar and relative SLA for irregular pipelines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two capabilities for pipelines that don't fit regular cron schedules: Inclusion calendar (schedule.include.dates) for known irregular dates — monthly close, quarterly filing, specific business dates. Watchdog detects missed inclusion dates and publishes IRREGULAR_SCHEDULE_MISSED. Relative SLA (sla.maxDuration) for ad-hoc pipelines with no predictable schedule. Clock starts at first sensor arrival, covers the entire lifecycle through completion. Warning at 75% or breach minus expectedDuration. New types, 3 event types, config validation, store methods, stream-router first-sensor-arrival tracking, sla-monitor relative path, watchdog defense-in-depth, and ASL parameter passthrough. --- CHANGELOG.md | 18 ++ deploy/statemachine.asl.json | 8 +- internal/lambda/dynstream.go | 20 ++ internal/lambda/inclusion_test.go | 94 ++++++ internal/lambda/sfn.go | 24 +- internal/lambda/sla_monitor.go | 74 ++++- internal/lambda/sla_monitor_test.go | 141 +++++++++ internal/lambda/stream_router.go | 10 + internal/lambda/stream_router_test.go | 73 +++++ internal/lambda/types.go | 2 + internal/lambda/watchdog.go | 197 +++++++++++++ internal/lambda/watchdog_test.go | 398 ++++++++++++++++++++++++++ internal/store/control.go | 44 +++ internal/store/control_test.go | 102 +++++++ internal/validation/config.go | 31 ++ internal/validation/config_test.go | 106 ++++++- pkg/types/events.go | 3 + pkg/types/pipeline.go | 12 +- 18 files changed, 1336 insertions(+), 21 deletions(-) create mode 100644 internal/lambda/inclusion_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d7b911..492b630 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,23 @@ All notable changes to this project will 
be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.8.0] - 2026-03-10 + +### Added + +- **Inclusion calendar scheduling** (`schedule.include.dates`) — explicit YYYY-MM-DD date lists for pipelines that run on known irregular dates (monthly close, quarterly filing, specific business dates). Mutually exclusive with cron. Watchdog detects missed inclusion dates and publishes `IRREGULAR_SCHEDULE_MISSED` events. +- **Relative SLA** (`sla.maxDuration`) — duration-based SLA for ad-hoc pipelines with no predictable schedule. Clock starts at first sensor arrival and covers the entire lifecycle: evaluation → trigger → job → post-run → completion. Warning at 75% of maxDuration (or `breachAt - expectedDuration` when set). New events: `RELATIVE_SLA_WARNING`, `RELATIVE_SLA_BREACH`. +- **First-sensor-arrival tracking** — stream-router records `first-sensor-arrival#` on lock acquisition (idempotent conditional write). Used as T=0 for relative SLA calculation. +- **Watchdog defense-in-depth for relative SLA** — `detectRelativeSLABreaches` scans pipelines with `maxDuration` config and fires `RELATIVE_SLA_BREACH` if the EventBridge scheduler failed to fire. +- **`WriteSensorIfAbsent` store method** — conditional PutItem that only writes if the key doesn't exist, used for first-sensor-arrival idempotency. +- **Config validation** for new fields: cron/include mutual exclusion, inclusion date format (YYYY-MM-DD), maxDuration format and 24h cap, maxDuration requires trigger. + +### Changed + +- `SLAConfig.Deadline` and `SLAConfig.ExpectedDuration` are now `omitempty` — relative SLA configs may omit the wall-clock deadline entirely. +- SFN ASL passes `maxDuration` and `sensorArrivalAt` to `CancelSLASchedules` and `CancelSLAOnCompleteTriggerFailure` states. 
+- sla-monitor `handleSLACalculate` routes to relative path when `MaxDuration` + `SensorArrivalAt` are present. + ## [0.7.4] - 2026-03-10 ### Fixed @@ -360,6 +377,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline Released under the [Elastic License 2.0](LICENSE). +[0.8.0]: https://github.com/dwsmith1983/interlock/releases/tag/v0.8.0 [0.7.4]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.4 [0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3 [0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2 diff --git a/deploy/statemachine.asl.json b/deploy/statemachine.asl.json index 16fec85..caf0867 100644 --- a/deploy/statemachine.asl.json +++ b/deploy/statemachine.asl.json @@ -395,7 +395,9 @@ "scheduleId.$": "$.scheduleId", "date.$": "$.date", "deadline.$": "$.config.sla.deadline", - "expectedDuration.$": "$.config.sla.expectedDuration" + "expectedDuration.$": "$.config.sla.expectedDuration", + "maxDuration.$": "$.config.sla.maxDuration", + "sensorArrivalAt.$": "$.sensorArrivalAt" }, "ResultPath": "$.slaResult", "Next": "CompleteTriggerFailed", @@ -447,7 +449,9 @@ "scheduleId.$": "$.scheduleId", "date.$": "$.date", "deadline.$": "$.config.sla.deadline", - "expectedDuration.$": "$.config.sla.expectedDuration" + "expectedDuration.$": "$.config.sla.expectedDuration", + "maxDuration.$": "$.config.sla.maxDuration", + "sensorArrivalAt.$": "$.sensorArrivalAt" }, "ResultPath": "$.slaResult", "Next": "Done", diff --git a/internal/lambda/dynstream.go b/internal/lambda/dynstream.go index 56e6953..8bda4b9 100644 --- a/internal/lambda/dynstream.go +++ b/internal/lambda/dynstream.go @@ -188,6 +188,26 @@ func resolveTimezone(tz string) *time.Location { return time.UTC } +// MostRecentInclusionDate returns the most recent date from dates that is on +// or before now (comparing date only, ignoring time of day). Dates must be +// YYYY-MM-DD strings; unparseable entries are silently skipped. 
Returns +// ("", false) if no dates qualify. +func MostRecentInclusionDate(dates []string, now time.Time) (string, bool) { + nowDate := now.Format("2006-01-02") + best := "" + found := false + for _, d := range dates { + if _, err := time.Parse("2006-01-02", d); err != nil { + continue + } + if d <= nowDate && d > best { + best = d + found = true + } + } + return best, found +} + // isExcludedTime is the core calendar exclusion check. It evaluates // whether the given time falls on a weekend or a specifically excluded date. func isExcludedTime(excl *types.ExclusionConfig, t time.Time) bool { diff --git a/internal/lambda/inclusion_test.go b/internal/lambda/inclusion_test.go new file mode 100644 index 0000000..af4d999 --- /dev/null +++ b/internal/lambda/inclusion_test.go @@ -0,0 +1,94 @@ +package lambda_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/dwsmith1983/interlock/internal/lambda" +) + +func TestMostRecentInclusionDate(t *testing.T) { + // Fixed reference time: 2026-03-10T14:00:00Z + now := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + + tests := []struct { + name string + dates []string + now time.Time + wantDate string + wantOK bool + }{ + { + name: "empty dates list", + dates: nil, + now: now, + wantOK: false, + }, + { + name: "all dates in the future", + dates: []string{"2026-03-11", "2026-04-01", "2026-06-30"}, + now: now, + wantOK: false, + }, + { + name: "single past date", + dates: []string{"2026-03-01"}, + now: now, + wantDate: "2026-03-01", + wantOK: true, + }, + { + name: "multiple past dates picks most recent", + dates: []string{"2026-01-15", "2026-02-28", "2026-03-05"}, + now: now, + wantDate: "2026-03-05", + wantOK: true, + }, + { + name: "today's date is included (on or before now)", + dates: []string{"2026-03-05", "2026-03-10", "2026-03-15"}, + now: now, + wantDate: "2026-03-10", + wantOK: true, + }, + { + name: "mix of past and future picks most recent past", + dates: []string{"2026-01-01", 
"2026-03-08", "2026-03-12", "2026-04-01"}, + now: now, + wantDate: "2026-03-08", + wantOK: true, + }, + { + name: "invalid date format is skipped", + dates: []string{"not-a-date", "2026-13-01"}, + now: now, + wantOK: false, + }, + { + name: "invalid dates mixed with valid date", + dates: []string{"bad", "2026-03-01", "also-bad"}, + now: now, + wantDate: "2026-03-01", + wantOK: true, + }, + { + name: "now at midnight still includes today", + dates: []string{"2026-03-10"}, + now: time.Date(2026, 3, 10, 0, 0, 0, 0, time.UTC), + wantDate: "2026-03-10", + wantOK: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotDate, gotOK := lambda.MostRecentInclusionDate(tt.dates, tt.now) + assert.Equal(t, tt.wantOK, gotOK) + if tt.wantOK { + assert.Equal(t, tt.wantDate, gotDate) + } + }) + } +} diff --git a/internal/lambda/sfn.go b/internal/lambda/sfn.go index bbb84df..d449783 100644 --- a/internal/lambda/sfn.go +++ b/internal/lambda/sfn.go @@ -13,10 +13,11 @@ import ( // sfnInput is the top-level input for the Step Function state machine. // It includes pipeline identity fields and a config block used by Wait states. type sfnInput struct { - PipelineID string `json:"pipelineId"` - ScheduleID string `json:"scheduleId"` - Date string `json:"date"` - Config sfnConfig `json:"config"` + PipelineID string `json:"pipelineId"` + ScheduleID string `json:"scheduleId"` + Date string `json:"date"` + SensorArrivalAt string `json:"sensorArrivalAt,omitempty"` // RFC3339; first sensor arrival for relative SLA + Config sfnConfig `json:"config"` } // sfnConfig holds timing parameters for the SFN evaluation loop and SLA branch. @@ -100,6 +101,21 @@ func startSFNWithName(ctx context.Context, d *Deps, cfg *types.PipelineConfig, p Date: date, Config: sc, } + + // Populate sensorArrivalAt for relative SLA passthrough. 
+ if sc.SLA != nil && sc.SLA.MaxDuration != "" && d.Store != nil { + arrivalKey := "first-sensor-arrival#" + date + arrivalData, readErr := d.Store.GetSensorData(ctx, pipelineID, arrivalKey) + if readErr != nil { + d.Logger.WarnContext(ctx, "failed to read first-sensor-arrival for SFN input", + "pipelineId", pipelineID, "error", readErr) + } else if arrivalData != nil { + if at, ok := arrivalData["arrivedAt"].(string); ok { + input.SensorArrivalAt = at + } + } + } + payload, err := json.Marshal(input) if err != nil { return fmt.Errorf("marshal SFN input: %w", err) diff --git a/internal/lambda/sla_monitor.go b/internal/lambda/sla_monitor.go index c48140e..ac5bd8a 100644 --- a/internal/lambda/sla_monitor.go +++ b/internal/lambda/sla_monitor.go @@ -39,11 +39,19 @@ func HandleSLAMonitor(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAM } } -// handleSLACalculate computes warning and breach times from the deadline -// and expected duration. Warning time = deadline - expectedDuration. -// Breach time = deadline. Returns full ISO 8601 timestamps required by -// Step Functions TimestampPath. +// handleSLACalculate computes warning and breach times. Supports two modes: +// +// 1. Schedule-based (deadline): breachAt = deadline, warningAt = deadline - expectedDuration. +// 2. Relative (maxDuration + sensorArrivalAt): breachAt = sensorArrivalAt + maxDuration, +// warningAt = breachAt - expectedDuration (or breachAt - 25% of maxDuration if no expectedDuration). +// +// Returns full ISO 8601 timestamps required by Step Functions TimestampPath. func handleSLACalculate(input SLAMonitorInput, now time.Time) (SLAMonitorOutput, error) { + // Relative SLA path: maxDuration + sensorArrivalAt. 
+ if input.MaxDuration != "" && input.SensorArrivalAt != "" { + return handleRelativeSLACalculate(input) + } + dur, err := time.ParseDuration(input.ExpectedDuration) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("parse expectedDuration %q: %w", input.ExpectedDuration, err) @@ -122,6 +130,41 @@ func handleSLACalculate(input SLAMonitorInput, now time.Time) (SLAMonitorOutput, }, nil } +// handleRelativeSLACalculate computes warning and breach times from +// sensorArrivalAt + maxDuration. Warning offset uses expectedDuration +// if provided, otherwise defaults to 25% of maxDuration (i.e. warning +// fires at 75% of the total allowed time). +func handleRelativeSLACalculate(input SLAMonitorInput) (SLAMonitorOutput, error) { + maxDur, err := time.ParseDuration(input.MaxDuration) + if err != nil { + return SLAMonitorOutput{}, fmt.Errorf("parse maxDuration %q: %w", input.MaxDuration, err) + } + + arrivalAt, err := time.Parse(time.RFC3339, input.SensorArrivalAt) + if err != nil { + return SLAMonitorOutput{}, fmt.Errorf("parse sensorArrivalAt %q: %w", input.SensorArrivalAt, err) + } + + breachAt := arrivalAt.Add(maxDur) + + // Warning offset: use expectedDuration if provided, otherwise 25% of maxDuration. + var warningOffset time.Duration + if input.ExpectedDuration != "" { + warningOffset, err = time.ParseDuration(input.ExpectedDuration) + if err != nil { + return SLAMonitorOutput{}, fmt.Errorf("parse expectedDuration %q: %w", input.ExpectedDuration, err) + } + } else { + warningOffset = maxDur / 4 + } + warningAt := breachAt.Add(-warningOffset) + + return SLAMonitorOutput{ + WarningAt: warningAt.UTC().Format(time.RFC3339), + BreachAt: breachAt.UTC().Format(time.RFC3339), + }, nil +} + // handleSLAFireAlert publishes an SLA alert event to EventBridge and // returns the alert metadata. 
func handleSLAFireAlert(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { @@ -251,14 +294,23 @@ func handleSLASchedule(ctx context.Context, d *Deps, input SLAMonitorInput) (SLA // Warning/breach events were already published at the correct time by the // Scheduler-invoked fire-alert calls. func handleSLACancel(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { - // If warningAt/breachAt not provided, recalculate from deadline/expectedDuration. - if input.WarningAt == "" && input.BreachAt == "" && input.Deadline != "" { - calc, err := handleSLACalculate(input, d.now()) - if err != nil { - return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate: %w", err) + // If warningAt/breachAt not provided, recalculate from the available config. + if input.WarningAt == "" && input.BreachAt == "" { + if input.MaxDuration != "" && input.SensorArrivalAt != "" { + calc, err := handleRelativeSLACalculate(input) + if err != nil { + return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate (relative): %w", err) + } + input.WarningAt = calc.WarningAt + input.BreachAt = calc.BreachAt + } else if input.Deadline != "" { + calc, err := handleSLACalculate(input, d.now()) + if err != nil { + return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate: %w", err) + } + input.WarningAt = calc.WarningAt + input.BreachAt = calc.BreachAt } - input.WarningAt = calc.WarningAt - input.BreachAt = calc.BreachAt } if d.Scheduler != nil { diff --git a/internal/lambda/sla_monitor_test.go b/internal/lambda/sla_monitor_test.go index 59aef85..96e103d 100644 --- a/internal/lambda/sla_monitor_test.go +++ b/internal/lambda/sla_monitor_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/dwsmith1983/interlock/internal/lambda" + "github.com/dwsmith1983/interlock/internal/store" "github.com/dwsmith1983/interlock/pkg/types" ) @@ -165,6 +166,146 @@ func TestSLAMonitor_Calculate_InvalidDuration(t *testing.T) { } } +// 
--------------------------------------------------------------------------- +// Relative SLA calculation tests +// --------------------------------------------------------------------------- + +func TestSLAMonitor_Calculate_Relative_Basic(t *testing.T) { + // Relative SLA: maxDuration=2h, sensorArrivalAt=14:00 UTC, no expectedDuration. + // breachAt = 14:00 + 2h = 16:00 UTC + // warningAt = breachAt - 25% of 2h = 16:00 - 0h30m = 15:30 UTC + d := &lambda.Deps{Logger: slog.Default()} + out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ + Mode: "calculate", + PipelineID: "adhoc-pipeline", + MaxDuration: "2h", + SensorArrivalAt: "2026-03-10T14:00:00Z", + }) + require.NoError(t, err) + assert.Equal(t, "2026-03-10T16:00:00Z", out.BreachAt) + assert.Equal(t, "2026-03-10T15:30:00Z", out.WarningAt) +} + +func TestSLAMonitor_Calculate_Relative_WithExpectedDuration(t *testing.T) { + // Relative SLA: maxDuration=2h, sensorArrivalAt=14:00 UTC, expectedDuration=30m. + // breachAt = 14:00 + 2h = 16:00 UTC + // warningAt = 16:00 - 30m = 15:30 UTC + d := &lambda.Deps{Logger: slog.Default()} + out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ + Mode: "calculate", + PipelineID: "adhoc-pipeline", + MaxDuration: "2h", + SensorArrivalAt: "2026-03-10T14:00:00Z", + ExpectedDuration: "30m", + }) + require.NoError(t, err) + assert.Equal(t, "2026-03-10T16:00:00Z", out.BreachAt) + assert.Equal(t, "2026-03-10T15:30:00Z", out.WarningAt) +} + +func TestSLAMonitor_Calculate_Relative_Default75Percent(t *testing.T) { + // Without expectedDuration, warning should be at 75% of maxDuration. + // maxDuration=4h → breachAt = arrival + 4h, warningAt = breachAt - 1h (75% of 4h = 3h, so warning at T+3h). 
+ d := &lambda.Deps{Logger: slog.Default()} + out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ + Mode: "calculate", + PipelineID: "adhoc-pipeline", + MaxDuration: "4h", + SensorArrivalAt: "2026-03-10T10:00:00Z", + }) + require.NoError(t, err) + assert.Equal(t, "2026-03-10T14:00:00Z", out.BreachAt) + // 75% of 4h = 3h, so warning at 10:00 + 3h = 13:00 (i.e. breachAt - 1h) + assert.Equal(t, "2026-03-10T13:00:00Z", out.WarningAt) +} + +func TestSLAMonitor_Calculate_Relative_InvalidMaxDuration(t *testing.T) { + d := &lambda.Deps{Logger: slog.Default()} + _, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ + Mode: "calculate", + PipelineID: "adhoc-pipeline", + MaxDuration: "not-a-duration", + SensorArrivalAt: "2026-03-10T14:00:00Z", + }) + require.Error(t, err) +} + +func TestSLAMonitor_Calculate_Relative_InvalidSensorArrivalAt(t *testing.T) { + d := &lambda.Deps{Logger: slog.Default()} + _, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ + Mode: "calculate", + PipelineID: "adhoc-pipeline", + MaxDuration: "2h", + SensorArrivalAt: "not-a-time", + }) + require.Error(t, err) +} + +func TestSLAMonitor_FireAlert_RelativeSLAWarning(t *testing.T) { + mock := newMockDDB() + s := &store.Store{ + Client: mock, + ControlTable: testControlTable, + JobLogTable: "joblog", + RerunTable: "rerun", + } + ebMock := &mockEventBridge{} + d := &lambda.Deps{ + Store: s, + EventBridge: ebMock, + EventBusName: "interlock-bus", + Logger: slog.Default(), + } + + out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ + Mode: "fire-alert", + PipelineID: "adhoc-pipeline", + ScheduleID: "stream", + Date: "2026-03-10", + AlertType: string(types.EventRelativeSLAWarning), + BreachAt: "2099-01-01T00:00:00Z", // far future — don't suppress + }) + require.NoError(t, err) + assert.Equal(t, string(types.EventRelativeSLAWarning), out.AlertType) + + ebMock.mu.Lock() + defer 
ebMock.mu.Unlock() + require.Len(t, ebMock.events, 1) + assert.Equal(t, string(types.EventRelativeSLAWarning), *ebMock.events[0].Entries[0].DetailType) +} + +func TestSLAMonitor_FireAlert_RelativeSLABreach(t *testing.T) { + mock := newMockDDB() + s := &store.Store{ + Client: mock, + ControlTable: testControlTable, + JobLogTable: "joblog", + RerunTable: "rerun", + } + ebMock := &mockEventBridge{} + d := &lambda.Deps{ + Store: s, + EventBridge: ebMock, + EventBusName: "interlock-bus", + Logger: slog.Default(), + } + + out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ + Mode: "fire-alert", + PipelineID: "adhoc-pipeline", + ScheduleID: "stream", + Date: "2026-03-10", + AlertType: string(types.EventRelativeSLABreach), + }) + require.NoError(t, err) + assert.Equal(t, string(types.EventRelativeSLABreach), out.AlertType) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + require.Len(t, ebMock.events, 1) + assert.Equal(t, string(types.EventRelativeSLABreach), *ebMock.events[0].Entries[0].DetailType) +} + // --------------------------------------------------------------------------- // Schedule tests // --------------------------------------------------------------------------- diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index 949b5ac..60aa64a 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -233,6 +233,16 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event return nil } + // Record first sensor arrival time (idempotent — only writes if absent). + // This timestamp serves as T=0 for relative SLA calculation. 
+ arrivalKey := "first-sensor-arrival#" + date + if _, writeErr := d.Store.WriteSensorIfAbsent(ctx, pipelineID, arrivalKey, map[string]interface{}{ + "arrivedAt": now.UTC().Format(time.RFC3339), + }); writeErr != nil { + d.Logger.WarnContext(ctx, "failed to write first-sensor-arrival", + "pipelineId", pipelineID, "date", date, "error", writeErr) + } + // Start Step Function execution. if err := startSFN(ctx, d, cfg, pipelineID, scheduleID, date); err != nil { if relErr := d.Store.ReleaseTriggerLock(ctx, pipelineID, scheduleID, date); relErr != nil { diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index 3bb1d14..3afd45c 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -567,6 +567,79 @@ func TestStreamRouter_TriggerValueMismatch_NoSFN(t *testing.T) { assert.Empty(t, sfnMock.executions, "expected no SFN when trigger value does not match") } +// --------------------------------------------------------------------------- +// First-sensor-arrival recording tests +// --------------------------------------------------------------------------- + +func TestStreamRouter_SensorMatch_RecordsFirstSensorArrival(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + fixedNow := time.Date(2026, 3, 10, 14, 30, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := testStreamConfig() + seedConfig(mock, cfg) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // Verify SFN was started (lock acquired). + sfnMock.mu.Lock() + require.Len(t, sfnMock.executions, 1, "expected SFN execution") + sfnMock.mu.Unlock() + + // Verify the first-sensor-arrival record was written. 
+ date := fixedNow.Format("2006-01-02") + sensorKey := "first-sensor-arrival#" + date + data, err := d.Store.GetSensorData(context.Background(), "gold-revenue", sensorKey) + require.NoError(t, err) + require.NotNil(t, data, "first-sensor-arrival sensor should be written on lock acquisition") + assert.Equal(t, fixedNow.Format(time.RFC3339), data["arrivedAt"]) +} + +func TestStreamRouter_SensorMatch_FirstArrivalIdempotent(t *testing.T) { + mock := newMockDDB() + d, _, _ := testDeps(mock) + + firstArrival := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + secondArrival := time.Date(2026, 3, 10, 15, 0, 0, 0, time.UTC) + + cfg := testStreamConfig() + seedConfig(mock, cfg) + + // Pre-seed a first-sensor-arrival record (as if lock was acquired earlier). + date := firstArrival.Format("2006-01-02") + seedSensor(mock, "gold-revenue", "first-sensor-arrival#"+date, map[string]interface{}{ + "arrivedAt": firstArrival.Format(time.RFC3339), + }) + + // Set time to second arrival. + d.NowFunc = func() time.Time { return secondArrival } + + // Send another sensor event — lock already held, so no SFN or overwrite. + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // Verify original arrival time is preserved (not overwritten). 
+ sensorKey := "first-sensor-arrival#" + date + data, err := d.Store.GetSensorData(context.Background(), "gold-revenue", sensorKey) + require.NoError(t, err) + require.NotNil(t, data, "first-sensor-arrival sensor should still exist") + assert.Equal(t, firstArrival.Format(time.RFC3339), data["arrivedAt"], + "first arrival time should not be overwritten by subsequent sensor events") +} + // --------------------------------------------------------------------------- // Late data arrival tests // --------------------------------------------------------------------------- diff --git a/internal/lambda/types.go b/internal/lambda/types.go index 958db55..503661e 100644 --- a/internal/lambda/types.go +++ b/internal/lambda/types.go @@ -86,6 +86,8 @@ type SLAMonitorInput struct { AlertType string `json:"alertType,omitempty"` // SLA_WARNING, SLA_BREACH (fire-alert) Deadline string `json:"deadline,omitempty"` // "HH:MM" or ":MM" ExpectedDuration string `json:"expectedDuration,omitempty"` // e.g. "30m" + MaxDuration string `json:"maxDuration,omitempty"` // e.g. 
"2h"; relative SLA + SensorArrivalAt string `json:"sensorArrivalAt,omitempty"` // RFC3339; T=0 for relative SLA Timezone string `json:"timezone,omitempty"` Critical bool `json:"critical,omitempty"` WarningAt string `json:"warningAt,omitempty"` // RFC3339, passed to cancel mode diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index 114b0cb..5504e0c 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -24,6 +24,9 @@ func HandleWatchdog(ctx context.Context, d *Deps) error { if err := detectMissedSchedules(ctx, d); err != nil { d.Logger.Error("missed schedule detection failed", "error", err) } + if err := detectMissedInclusionSchedules(ctx, d); err != nil { + d.Logger.Error("missed inclusion schedule detection failed", "error", err) + } if err := reconcileSensorTriggers(ctx, d); err != nil { d.Logger.Error("sensor trigger reconciliation failed", "error", err) } @@ -36,6 +39,9 @@ func HandleWatchdog(ctx context.Context, d *Deps) error { if err := detectMissingPostRunSensors(ctx, d); err != nil { d.Logger.Error("post-run sensor absence detection failed", "error", err) } + if err := detectRelativeSLABreaches(ctx, d); err != nil { + d.Logger.Error("relative SLA breach detection failed", "error", err) + } return nil } @@ -353,6 +359,84 @@ func detectMissedSchedules(ctx context.Context, d *Deps) error { return nil } +// detectMissedInclusionSchedules checks pipelines with inclusion calendar config +// for missed schedules on irregular dates. For each pipeline with an Include +// config, it finds the most recent applicable date and verifies that a trigger +// exists. If no trigger is found and no dedup marker exists, an +// IRREGULAR_SCHEDULE_MISSED event is published. 
+func detectMissedInclusionSchedules(ctx context.Context, d *Deps) error { + configs, err := d.ConfigCache.GetAll(ctx) + if err != nil { + return fmt.Errorf("load configs: %w", err) + } + + now := d.now() + + for id, cfg := range configs { + if cfg.Schedule.Include == nil || len(cfg.Schedule.Include.Dates) == 0 { + continue + } + + // Skip calendar-excluded days. + if isExcluded(cfg, now) { + continue + } + + date, ok := MostRecentInclusionDate(cfg.Schedule.Include.Dates, now) + if !ok { + continue + } + + scheduleID := resolveScheduleID(cfg) + + // Check if a trigger exists for the most recent inclusion date. + found, err := d.Store.HasTriggerForDate(ctx, id, scheduleID, date) + if err != nil { + d.Logger.Error("failed to check trigger for inclusion schedule", + "pipelineId", id, "date", date, "error", err) + continue + } + if found { + continue + } + + // Check dedup marker to avoid re-alerting on subsequent watchdog runs. + dedupKey := "irregular-missed-check#" + date + dedupData, err := d.Store.GetSensorData(ctx, id, dedupKey) + if err != nil { + d.Logger.Error("dedup marker lookup failed for inclusion schedule", + "pipelineId", id, "date", date, "error", err) + continue + } + if dedupData != nil { + continue + } + + alertDetail := map[string]interface{}{ + "source": "watchdog", + "actionHint": fmt.Sprintf("inclusion date %s expected to have a trigger — none found", date), + } + if err := publishEvent(ctx, d, string(types.EventIrregularScheduleMissed), id, scheduleID, date, + fmt.Sprintf("missed inclusion schedule for %s on %s", id, date), alertDetail); err != nil { + d.Logger.Warn("failed to publish irregular schedule missed event", "error", err, "pipeline", id, "date", date) + } + + // Write dedup marker. 
+ if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ + "alerted": "true", + }); err != nil { + d.Logger.Warn("failed to write inclusion dedup marker", "error", err, "pipeline", id, "date", date) + } + + d.Logger.Info("detected missed inclusion schedule", + "pipelineId", id, + "schedule", scheduleID, + "date", date, + ) + } + return nil +} + // scheduleSLAAlerts proactively creates EventBridge Scheduler entries for all // pipelines with SLA configs. This ensures warnings/breaches fire even when // pipelines never trigger (data never arrives, sensor fails, etc.). @@ -843,3 +927,116 @@ func hasPostRunSensorUpdate(rules []types.ValidationRule, sensors map[string]map } return false } + +// detectRelativeSLABreaches checks pipelines with MaxDuration SLA config for +// breaches. This is a defense-in-depth fallback: if the EventBridge Scheduler +// fails to fire the relative SLA breach alert, the watchdog catches it. +func detectRelativeSLABreaches(ctx context.Context, d *Deps) error { + configs, err := d.ConfigCache.GetAll(ctx) + if err != nil { + return fmt.Errorf("load configs: %w", err) + } + + now := d.now() + today := now.Format("2006-01-02") + + for id, cfg := range configs { + if cfg.SLA == nil || cfg.SLA.MaxDuration == "" { + continue + } + + maxDur, err := time.ParseDuration(cfg.SLA.MaxDuration) + if err != nil { + d.Logger.Warn("invalid maxDuration in SLA config", + "pipelineId", id, "maxDuration", cfg.SLA.MaxDuration, "error", err) + continue + } + + scheduleID := resolveScheduleID(cfg) + + // Look up the first-sensor-arrival marker for today. 
+ arrivalKey := "first-sensor-arrival#" + today + arrivalData, err := d.Store.GetSensorData(ctx, id, arrivalKey) + if err != nil { + d.Logger.Error("first-sensor-arrival lookup failed", + "pipelineId", id, "error", err) + continue + } + if arrivalData == nil { + continue + } + + arrivedAtStr, ok := arrivalData["arrivedAt"].(string) + if !ok || arrivedAtStr == "" { + continue + } + arrivedAt, err := time.Parse(time.RFC3339, arrivedAtStr) + if err != nil { + d.Logger.Warn("invalid arrivedAt in first-sensor-arrival", + "pipelineId", id, "arrivedAt", arrivedAtStr, "error", err) + continue + } + + // Check if the relative SLA has been breached. + breachAt := arrivedAt.Add(maxDur) + if now.Before(breachAt) { + continue + } + + // Skip if pipeline already completed or permanently failed. + tr, err := d.Store.GetTrigger(ctx, id, scheduleID, today) + if err != nil { + d.Logger.Warn("trigger lookup failed in relative SLA check", + "pipelineId", id, "error", err) + continue + } + if tr != nil && (tr.Status == types.TriggerStatusCompleted || tr.Status == types.TriggerStatusFailedFinal) { + continue + } + if isJobTerminal(ctx, d, id, scheduleID, today) { + continue + } + + // Check dedup marker to avoid re-alerting on subsequent watchdog runs. 
+ dedupKey := "relative-sla-breach-check#" + today + dedupData, err := d.Store.GetSensorData(ctx, id, dedupKey) + if err != nil { + d.Logger.Error("dedup marker lookup failed for relative SLA breach", + "pipelineId", id, "error", err) + continue + } + if dedupData != nil { + continue + } + + alertDetail := map[string]interface{}{ + "source": "watchdog", + "maxDuration": cfg.SLA.MaxDuration, + "sensorArrivalAt": arrivedAtStr, + "breachAt": breachAt.UTC().Format(time.RFC3339), + "actionHint": "relative SLA breached — pipeline has exceeded maxDuration since first sensor arrival", + } + if err := publishEvent(ctx, d, string(types.EventRelativeSLABreach), id, scheduleID, today, + fmt.Sprintf("relative SLA breach for %s on %s", id, today), alertDetail); err != nil { + d.Logger.Warn("failed to publish relative SLA breach event", + "error", err, "pipeline", id, "date", today) + } + + // Write dedup marker. + if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ + "alerted": "true", + }); err != nil { + d.Logger.Warn("failed to write relative SLA breach dedup marker", + "error", err, "pipeline", id, "date", today) + } + + d.Logger.Info("detected relative SLA breach", + "pipelineId", id, + "schedule", scheduleID, + "date", today, + "sensorArrivalAt", arrivedAtStr, + "breachAt", breachAt.UTC().Format(time.RFC3339), + ) + } + return nil +} diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index 675fdd4..f0bca79 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -2297,3 +2297,401 @@ func TestWatchdog_PostRunNoConfig(t *testing.T) { "should not publish POST_RUN_SENSOR_MISSING for pipeline without PostRun config") } } + +// --------------------------------------------------------------------------- +// Inclusion schedule missed detection tests +// --------------------------------------------------------------------------- + +func TestWatchdog_InclusionSchedule_NilInclude_Skipped(t *testing.T) { 
+ mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Pipeline with no Include config — should be ignored by inclusion detection. + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // No IRREGULAR_SCHEDULE_MISSED events. + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventIrregularScheduleMissed), *ev.Entries[0].DetailType, + "should not publish IRREGULAR_SCHEDULE_MISSED when Include is nil") + } +} + +func TestWatchdog_InclusionSchedule_TriggerExists_NoAlert(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Use a fixed time for deterministic behavior. + fixedNow := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-05", "2026-03-15"}, + }, + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + // Most recent inclusion date on or before now is 2026-03-05. 
+ // Seed a trigger for that date so no alert is expected. + mock.putRaw(testControlTable, map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("monthly-close")}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-05")}, + "status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusCompleted}, + }) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // No IRREGULAR_SCHEDULE_MISSED events. + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventIrregularScheduleMissed), *ev.Entries[0].DetailType, + "should not publish IRREGULAR_SCHEDULE_MISSED when trigger exists for date") + } +} + +func TestWatchdog_InclusionSchedule_TriggerMissing_PublishesAlert(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Use a fixed time for deterministic behavior. + fixedNow := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-05", "2026-03-15"}, + }, + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + // No trigger seeded for 2026-03-05 — should detect as missed. + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // Expect IRREGULAR_SCHEDULE_MISSED event. 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + + var irregularMissedCount int + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { + irregularMissedCount++ + + // Verify event detail contains the expected pipeline and date. + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + assert.Equal(t, "2026-03-05", detail.Date) + } + } + assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") +} + +func TestWatchdog_InclusionSchedule_DedupPreventsRepeat(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedNow := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-05"}, + }, + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + // Seed dedup marker to simulate a previous watchdog run already detected this. + seedSensor(mock, "monthly-close", "irregular-missed-check#2026-03-05", map[string]interface{}{ + "alerted": "true", + }) + + // No trigger for 2026-03-05, but dedup marker exists — should NOT alert again. 
+ err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventIrregularScheduleMissed), *ev.Entries[0].DetailType, + "should not publish IRREGULAR_SCHEDULE_MISSED when dedup marker exists") + } +} + +func TestWatchdog_InclusionSchedule_AllFutureDates_NoAlert(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedNow := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "quarterly-filing"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-15", "2026-06-15", "2026-09-15"}, + }, + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // No inclusion date on or before now — no alert. 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventIrregularScheduleMissed), *ev.Entries[0].DetailType, + "should not publish IRREGULAR_SCHEDULE_MISSED when all dates are in the future") + } +} + +// --------------------------------------------------------------------------- +// Relative SLA breach detection (defense-in-depth) +// --------------------------------------------------------------------------- + +func TestWatchdog_RelativeSLA_NoMaxDuration_Skips(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedNow := time.Date(2026, 3, 10, 16, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + // Pipeline with no MaxDuration — relative SLA check should be skipped. + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "standard-pipeline"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + Deadline: "18:00", + ExpectedDuration: "30m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo hello"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventRelativeSLABreach), *ev.Entries[0].DetailType, + "should not publish RELATIVE_SLA_BREACH when no maxDuration configured") + } +} + +func TestWatchdog_RelativeSLA_SensorArrival_NotYetBreached(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedNow := time.Date(2026, 3, 10, 15, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := 
types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "adhoc-pipeline"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + MaxDuration: "2h", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo hello"}}, + } + seedConfig(mock, cfg) + + // Seed first-sensor-arrival at 14:00. With maxDuration=2h, breach is at 16:00. + // Current time is 15:00, so not yet breached. + date := fixedNow.Format("2006-01-02") + seedSensor(mock, "adhoc-pipeline", "first-sensor-arrival#"+date, map[string]interface{}{ + "arrivedAt": "2026-03-10T14:00:00Z", + }) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventRelativeSLABreach), *ev.Entries[0].DetailType, + "should not publish RELATIVE_SLA_BREACH before maxDuration elapses") + } +} + +func TestWatchdog_RelativeSLA_SensorArrival_Breached(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedNow := time.Date(2026, 3, 10, 17, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "adhoc-pipeline"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + MaxDuration: "2h", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo hello"}}, + } + seedConfig(mock, cfg) + 
+ // Seed first-sensor-arrival at 14:00. With maxDuration=2h, breach is at 16:00. + // Current time is 17:00, so breached. + date := fixedNow.Format("2006-01-02") + seedSensor(mock, "adhoc-pipeline", "first-sensor-arrival#"+date, map[string]interface{}{ + "arrivedAt": "2026-03-10T14:00:00Z", + }) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + var breachCount int + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventRelativeSLABreach) { + breachCount++ + } + } + assert.Equal(t, 1, breachCount, "expected one RELATIVE_SLA_BREACH event") +} + +func TestWatchdog_RelativeSLA_Breached_AlreadyCompleted_NoAlert(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedNow := time.Date(2026, 3, 10, 17, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "adhoc-pipeline"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + MaxDuration: "2h", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo hello"}}, + } + seedConfig(mock, cfg) + + date := fixedNow.Format("2006-01-02") + seedSensor(mock, "adhoc-pipeline", "first-sensor-arrival#"+date, map[string]interface{}{ + "arrivedAt": "2026-03-10T14:00:00Z", + }) + + // Seed a COMPLETED trigger — pipeline already finished. 
+ mock.putRaw(testControlTable, map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("adhoc-pipeline")}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", date)}, + "status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusCompleted}, + }) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventRelativeSLABreach), *ev.Entries[0].DetailType, + "should not publish RELATIVE_SLA_BREACH when pipeline already completed") + } +} diff --git a/internal/store/control.go b/internal/store/control.go index 27ae7dc..058fb40 100644 --- a/internal/store/control.go +++ b/internal/store/control.go @@ -208,6 +208,36 @@ func (s *Store) WriteSensor(ctx context.Context, pipelineID, sensorKey string, d return nil } +// WriteSensorIfAbsent writes a sensor row only if it does not already exist. +// Returns true if the write succeeded (key was absent), false if the key +// already existed. This is used for idempotent first-occurrence recording. 
+func (s *Store) WriteSensorIfAbsent(ctx context.Context, pipelineID, sensorKey string, data map[string]interface{}) (bool, error) { + rec := types.ControlRecord{ + PK: types.PipelinePK(pipelineID), + SK: types.SensorSK(sensorKey), + Data: data, + } + + item, err := attributevalue.MarshalMap(rec) + if err != nil { + return false, fmt.Errorf("marshal sensor %q for %q: %w", sensorKey, pipelineID, err) + } + + _, err = s.Client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: &s.ControlTable, + Item: item, + ConditionExpression: aws.String("attribute_not_exists(PK)"), + }) + if err != nil { + var ccfe *ddbtypes.ConditionalCheckFailedException + if errors.As(err, &ccfe) { + return false, nil + } + return false, fmt.Errorf("write sensor if absent %q for %q: %w", sensorKey, pipelineID, err) + } + return true, nil +} + // DeleteSensor removes a sensor row from the control table. func (s *Store) DeleteSensor(ctx context.Context, pipelineID, sensorKey string) error { _, err := s.Client.DeleteItem(ctx, &dynamodb.DeleteItemInput{ @@ -430,6 +460,20 @@ func (s *Store) SetTriggerStatus(ctx context.Context, pipelineID, schedule, date return nil } +// HasTriggerForDates checks multiple dates for existing triggers and returns a +// map of date to existence. Internally calls HasTriggerForDate for each date. +func (s *Store) HasTriggerForDates(ctx context.Context, pipelineID, schedule string, dates []string) (map[string]bool, error) { + result := make(map[string]bool, len(dates)) + for _, date := range dates { + found, err := s.HasTriggerForDate(ctx, pipelineID, schedule, date) + if err != nil { + return nil, fmt.Errorf("check trigger for date %s: %w", date, err) + } + result[date] = found + } + return result, nil +} + // CreateTriggerIfAbsent writes a trigger row with the given status only if no // trigger row exists for this pipeline/schedule/date. Returns true if the row // was created, false if a row already existed (TOCTOU-safe via conditional put). 
diff --git a/internal/store/control_test.go b/internal/store/control_test.go index 7ab7aff..3e536ec 100644 --- a/internal/store/control_test.go +++ b/internal/store/control_test.go @@ -1017,3 +1017,105 @@ func TestWriteRerunRequest_DynamoError(t *testing.T) { t.Errorf("expected 'write rerun request' in error, got: %v", err) } } + +// --- HasTriggerForDates tests --- + +func TestHasTriggerForDates_AllFound(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + pk := types.PipelinePK("pipe-1") + + // Seed triggers for two dates. + mock.putRaw("control", map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: pk}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-01")}, + "status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning}, + }) + mock.putRaw("control", map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: pk}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-05")}, + "status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusCompleted}, + }) + + result, err := s.HasTriggerForDates(context.Background(), "pipe-1", "stream", []string{"2026-03-01", "2026-03-05"}) + if err != nil { + t.Fatalf("HasTriggerForDates: %v", err) + } + if !result["2026-03-01"] { + t.Error("expected 2026-03-01 to be found") + } + if !result["2026-03-05"] { + t.Error("expected 2026-03-05 to be found") + } +} + +func TestHasTriggerForDates_SomeFound(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + pk := types.PipelinePK("pipe-1") + + // Seed a trigger for only one date. 
+ mock.putRaw("control", map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: pk}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-01")}, + "status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning}, + }) + + result, err := s.HasTriggerForDates(context.Background(), "pipe-1", "stream", []string{"2026-03-01", "2026-03-05"}) + if err != nil { + t.Fatalf("HasTriggerForDates: %v", err) + } + if !result["2026-03-01"] { + t.Error("expected 2026-03-01 to be found") + } + if result["2026-03-05"] { + t.Error("expected 2026-03-05 to NOT be found") + } +} + +func TestHasTriggerForDates_NoneFound(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + result, err := s.HasTriggerForDates(context.Background(), "pipe-1", "stream", []string{"2026-03-01", "2026-03-05"}) + if err != nil { + t.Fatalf("HasTriggerForDates: %v", err) + } + if result["2026-03-01"] { + t.Error("expected 2026-03-01 to NOT be found") + } + if result["2026-03-05"] { + t.Error("expected 2026-03-05 to NOT be found") + } +} + +func TestHasTriggerForDates_EmptyDates(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + result, err := s.HasTriggerForDates(context.Background(), "pipe-1", "stream", nil) + if err != nil { + t.Fatalf("HasTriggerForDates: %v", err) + } + if len(result) != 0 { + t.Errorf("expected empty map, got %v", result) + } +} + +func TestHasTriggerForDates_DynamoError(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + injected := errors.New("throttled") + mock.errFn = errOnOp("Query", injected) + + _, err := s.HasTriggerForDates(context.Background(), "pipe-1", "stream", []string{"2026-03-01"}) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, injected) { + t.Errorf("expected wrapped injected error, got: %v", err) + } +} diff --git a/internal/validation/config.go b/internal/validation/config.go index d01d783..be5a010 100644 --- 
a/internal/validation/config.go +++ b/internal/validation/config.go @@ -2,6 +2,7 @@ package validation import ( "fmt" + "time" "github.com/dwsmith1983/interlock/pkg/types" ) @@ -33,5 +34,35 @@ func ValidatePipelineConfig(cfg *types.PipelineConfig) []string { errs = append(errs, fmt.Sprintf("job.jobPollWindowSeconds %d out of range [60,86400]", *p)) } + // Cron and Include are mutually exclusive. + if cfg.Schedule.Cron != "" && cfg.Schedule.Include != nil { + errs = append(errs, "schedule.cron and schedule.include are mutually exclusive") + } + + // Inclusion calendar validation. + if inc := cfg.Schedule.Include; inc != nil { + if len(inc.Dates) == 0 { + errs = append(errs, "schedule.include.dates must not be empty") + } + for i, d := range inc.Dates { + if _, err := time.Parse("2006-01-02", d); err != nil { + errs = append(errs, fmt.Sprintf("schedule.include.dates[%d] invalid format %q (expected YYYY-MM-DD)", i, d)) + } + } + } + + // Relative SLA (maxDuration) validation. + if cfg.SLA != nil && cfg.SLA.MaxDuration != "" { + if cfg.Schedule.Trigger == nil { + errs = append(errs, "sla.maxDuration requires schedule.trigger (relative SLA needs a sensor signal)") + } + d, err := time.ParseDuration(cfg.SLA.MaxDuration) + if err != nil { + errs = append(errs, fmt.Sprintf("sla.maxDuration invalid Go duration %q", cfg.SLA.MaxDuration)) + } else if d > 24*time.Hour { + errs = append(errs, fmt.Sprintf("sla.maxDuration exceeds 24h (%s)", cfg.SLA.MaxDuration)) + } + } + return errs } diff --git a/internal/validation/config_test.go b/internal/validation/config_test.go index 9fd058d..3cf06e8 100644 --- a/internal/validation/config_test.go +++ b/internal/validation/config_test.go @@ -1,6 +1,7 @@ package validation import ( + "strings" "testing" "github.com/stretchr/testify/assert" @@ -8,6 +9,15 @@ import ( "github.com/dwsmith1983/interlock/pkg/types" ) +func containsSubstr(ss []string, sub string) bool { + for _, s := range ss { + if strings.Contains(s, sub) { + return true + } + 
} + return false +} + func intPtr(v int) *int { return &v } func TestValidatePipelineConfig(t *testing.T) { @@ -160,14 +170,106 @@ func TestValidatePipelineConfig(t *testing.T) { }, wantCount: 4, }, + // --- v0.8.0: inclusion calendar + relative SLA validation --- + { + name: "cron and include mutually exclusive", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Cron: "0 2 * * *", + Include: &types.InclusionConfig{Dates: []string{"2026-03-31"}}, + }, + }, + wantCount: 1, + wantSubstr: "schedule.cron and schedule.include are mutually exclusive", + }, + { + name: "include with valid dates", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{Dates: []string{"2026-03-31", "2026-06-30"}}, + }, + }, + wantCount: 0, + }, + { + name: "include with empty dates", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{Dates: []string{}}, + }, + }, + wantCount: 1, + wantSubstr: "schedule.include.dates must not be empty", + }, + { + name: "include with invalid date format", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{Dates: []string{"2026-03-31", "03/31/2026"}}, + }, + }, + wantCount: 1, + wantSubstr: "schedule.include.dates[1] invalid format", + }, + { + name: "maxDuration valid", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{Key: "some-sensor", Check: types.CheckExists}, + }, + SLA: &types.SLAConfig{MaxDuration: "2h"}, + }, + wantCount: 0, + }, + { + name: "maxDuration exceeds 24h", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{Key: "some-sensor", Check: types.CheckExists}, + }, + SLA: &types.SLAConfig{MaxDuration: "25h"}, + }, + wantCount: 1, + wantSubstr: "sla.maxDuration exceeds 24h", + }, + { + name: "maxDuration invalid format", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Trigger: 
&types.TriggerCondition{Key: "some-sensor", Check: types.CheckExists}, + }, + SLA: &types.SLAConfig{MaxDuration: "not-a-duration"}, + }, + wantCount: 1, + wantSubstr: "sla.maxDuration invalid Go duration", + }, + { + name: "maxDuration requires trigger", + cfg: types.PipelineConfig{ + SLA: &types.SLAConfig{MaxDuration: "2h"}, + }, + wantCount: 1, + wantSubstr: "sla.maxDuration requires schedule.trigger", + }, + { + name: "maxDuration and deadline coexist", + cfg: types.PipelineConfig{ + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{Key: "some-sensor", Check: types.CheckExists}, + }, + SLA: &types.SLAConfig{Deadline: "08:00", MaxDuration: "2h"}, + }, + wantCount: 0, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { errs := ValidatePipelineConfig(&tt.cfg) assert.Len(t, errs, tt.wantCount) - if tt.wantSubstr != "" && len(errs) > 0 { - assert.Contains(t, errs[0], tt.wantSubstr) + if tt.wantSubstr != "" { + assert.True(t, containsSubstr(errs, tt.wantSubstr), + "expected one of %v to contain %q", errs, tt.wantSubstr) } }) } diff --git a/pkg/types/events.go b/pkg/types/events.go index b546c22..03bd7ad 100644 --- a/pkg/types/events.go +++ b/pkg/types/events.go @@ -34,6 +34,9 @@ const ( EventRerunAccepted EventDetailType = "RERUN_ACCEPTED" EventInfraAlarm EventDetailType = "INFRA_ALARM" EventSensorDeadlineExpired EventDetailType = "SENSOR_DEADLINE_EXPIRED" + EventIrregularScheduleMissed EventDetailType = "IRREGULAR_SCHEDULE_MISSED" + EventRelativeSLAWarning EventDetailType = "RELATIVE_SLA_WARNING" + EventRelativeSLABreach EventDetailType = "RELATIVE_SLA_BREACH" ) // EventSource is the EventBridge source for all interlock events. 
diff --git a/pkg/types/pipeline.go b/pkg/types/pipeline.go index 9bf09ea..0d8ac01 100644 --- a/pkg/types/pipeline.go +++ b/pkg/types/pipeline.go @@ -24,11 +24,18 @@ type ScheduleConfig struct { Timezone string `yaml:"timezone,omitempty" json:"timezone,omitempty"` Trigger *TriggerCondition `yaml:"trigger,omitempty" json:"trigger,omitempty"` Exclude *ExclusionConfig `yaml:"exclude,omitempty" json:"exclude,omitempty"` + Include *InclusionConfig `yaml:"include,omitempty" json:"include,omitempty"` Calendar string `yaml:"calendar,omitempty" json:"calendar,omitempty"` Time string `yaml:"time,omitempty" json:"time,omitempty"` Evaluation EvaluationWindow `yaml:"evaluation" json:"evaluation"` } +// InclusionConfig defines explicit dates when a pipeline SHOULD run. +// Mutually exclusive with Cron. +type InclusionConfig struct { + Dates []string `yaml:"dates" json:"dates"` // YYYY-MM-DD +} + // TriggerCondition defines which sensor write starts evaluation. type TriggerCondition struct { Key string `yaml:"key" json:"key"` @@ -53,8 +60,9 @@ type EvaluationWindow struct { // SLAConfig defines pipeline deadlines (observational, never stops execution). type SLAConfig struct { - Deadline string `yaml:"deadline" json:"deadline"` // "HH:MM" - ExpectedDuration string `yaml:"expectedDuration" json:"expectedDuration"` // e.g. "30m" + Deadline string `yaml:"deadline,omitempty" json:"deadline,omitempty"` // "HH:MM" wall-clock deadline + ExpectedDuration string `yaml:"expectedDuration,omitempty" json:"expectedDuration,omitempty"` // e.g. "30m" + MaxDuration string `yaml:"maxDuration,omitempty" json:"maxDuration,omitempty"` // e.g. 
"2h"; relative SLA from first sensor arrival Timezone string `yaml:"timezone,omitempty" json:"timezone,omitempty"` Critical bool `yaml:"critical,omitempty" json:"critical,omitempty"` } From 57338e646c3a278d84685bc3cce0d3cfce3c80cd Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Tue, 10 Mar 2026 23:57:34 +0700 Subject: [PATCH 2/4] fix: edge case hardening for inclusion calendar and relative SLA Inclusion calendar grace period: detectMissedInclusionSchedules now respects Schedule.Time before alerting, matching the existing cron path. Resolves today in the pipeline's timezone to handle UTC date != local date boundaries. Cross-day relative SLA: detectRelativeSLABreaches checks both today and yesterday for first-sensor-arrival keys, covering T+1 sensor pipelines. Extracted checkRelativeSLAForDate helper. Multi-date inclusion lookback: PastInclusionDates returns all past dates (capped at 3 internally). Watchdog now checks each past date instead of only the most recent. Cleanup: replaced 4 inline timezone resolution patterns with resolveTimezone() calls. Moved maxInclusionLookback cap into PastInclusionDates. --- internal/lambda/dynstream.go | 30 +++ internal/lambda/inclusion_test.go | 47 ++++ internal/lambda/watchdog.go | 303 ++++++++++++----------- internal/lambda/watchdog_test.go | 393 ++++++++++++++++++++++++++++++ 4 files changed, 633 insertions(+), 140 deletions(-) diff --git a/internal/lambda/dynstream.go b/internal/lambda/dynstream.go index 8bda4b9..54ce702 100644 --- a/internal/lambda/dynstream.go +++ b/internal/lambda/dynstream.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sort" "strconv" "strings" "time" @@ -208,6 +209,35 @@ func MostRecentInclusionDate(dates []string, now time.Time) (string, bool) { return best, found } +// maxInclusionLookback is the maximum number of past inclusion dates to check. +// Caps DynamoDB reads when the watchdog has been down for an extended period. 
+const maxInclusionLookback = 3
+
+// PastInclusionDates returns dates from the list that are on or before now,
+// sorted most recent first and capped at maxInclusionLookback entries.
+// The cap bounds DynamoDB reads when the watchdog has been down for an
+// extended period. Dates must be YYYY-MM-DD strings; unparseable entries
+// are silently skipped. Returns nil if no dates qualify.
+func PastInclusionDates(dates []string, now time.Time) []string {
+	nowDate := now.Format("2006-01-02")
+	var past []string
+	for _, d := range dates {
+		if _, err := time.Parse("2006-01-02", d); err != nil {
+			continue
+		}
+		if d <= nowDate {
+			past = append(past, d)
+		}
+	}
+	// Sort descending (most recent first) using string comparison on YYYY-MM-DD.
+	sort.Sort(sort.Reverse(sort.StringSlice(past)))
+	// Cap to maxInclusionLookback to bound downstream DynamoDB reads.
+	if len(past) > maxInclusionLookback {
+		past = past[:maxInclusionLookback]
+	}
+	return past
+}
+
 // isExcludedTime is the core calendar exclusion check. It evaluates
 // whether the given time falls on a weekend or a specifically excluded date.
func isExcludedTime(excl *types.ExclusionConfig, t time.Time) bool { diff --git a/internal/lambda/inclusion_test.go b/internal/lambda/inclusion_test.go index af4d999..a4143c6 100644 --- a/internal/lambda/inclusion_test.go +++ b/internal/lambda/inclusion_test.go @@ -92,3 +92,50 @@ func TestMostRecentInclusionDate(t *testing.T) { }) } } + +func TestPastInclusionDates_AllFuture(t *testing.T) { + now := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + dates := []string{"2026-03-11", "2026-04-01", "2026-06-30"} + + got := lambda.PastInclusionDates(dates, now) + assert.Empty(t, got, "all future dates should return empty slice") +} + +func TestPastInclusionDates_MultiplePast(t *testing.T) { + now := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + dates := []string{"2026-01-15", "2026-02-28", "2026-03-05"} + + got := lambda.PastInclusionDates(dates, now) + assert.Equal(t, []string{"2026-03-05", "2026-02-28", "2026-01-15"}, got, + "should return all past dates, most recent first") +} + +func TestPastInclusionDates_MixedPastFuture(t *testing.T) { + now := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + dates := []string{"2026-01-01", "2026-03-08", "2026-03-12", "2026-04-01"} + + got := lambda.PastInclusionDates(dates, now) + assert.Equal(t, []string{"2026-03-08", "2026-01-01"}, got, + "should only return past dates, most recent first") +} + +func TestPastInclusionDates_TodayIncluded(t *testing.T) { + now := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + dates := []string{"2026-03-05", "2026-03-10", "2026-03-15"} + + got := lambda.PastInclusionDates(dates, now) + assert.Equal(t, []string{"2026-03-10", "2026-03-05"}, got, + "today's date should be included in results") +} + +func TestPastInclusionDates_CappedAt3(t *testing.T) { + now := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + dates := []string{ + "2026-01-01", "2026-01-15", "2026-02-01", "2026-02-15", "2026-03-01", + } + + got := lambda.PastInclusionDates(dates, now) + assert.Len(t, got, 3, "result should be capped at 
3 entries") + assert.Equal(t, []string{"2026-03-01", "2026-02-15", "2026-02-01"}, got, + "should return the 3 most recent past dates, most recent first") +} diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index 5504e0c..44918ea 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -295,12 +295,7 @@ func detectMissedSchedules(ctx context.Context, d *Deps) error { // Only alert for schedules that should have fired after this Lambda // started. Prevents retroactive alerts after fresh deploys. if !d.StartedAt.IsZero() { - loc := time.UTC - if cfg.Schedule.Timezone != "" { - if parsed, err := time.LoadLocation(cfg.Schedule.Timezone); err == nil { - loc = parsed - } - } + loc := resolveTimezone(cfg.Schedule.Timezone) if lastFire := lastCronFire(cfg.Schedule.Cron, now, loc); !lastFire.IsZero() && lastFire.Before(d.StartedAt) { continue } @@ -324,12 +319,7 @@ func detectMissedSchedules(ctx context.Context, d *Deps) error { // Check if we are past the expected start time. If the pipeline // has a schedule time configured, only alert after that time. if cfg.Schedule.Time != "" { - loc := time.UTC - if cfg.Schedule.Timezone != "" { - if parsed, err := time.LoadLocation(cfg.Schedule.Timezone); err == nil { - loc = parsed - } - } + loc := resolveTimezone(cfg.Schedule.Timezone) localNow := now.In(loc) expectedStart, err := time.ParseInLocation("2006-01-02 15:04", today+" "+cfg.Schedule.Time, loc) if err == nil && localNow.Before(expectedStart) { @@ -361,9 +351,9 @@ func detectMissedSchedules(ctx context.Context, d *Deps) error { // detectMissedInclusionSchedules checks pipelines with inclusion calendar config // for missed schedules on irregular dates. For each pipeline with an Include -// config, it finds the most recent applicable date and verifies that a trigger -// exists. If no trigger is found and no dedup marker exists, an -// IRREGULAR_SCHEDULE_MISSED event is published. 
+// config, it finds all past inclusion dates (capped at maxInclusionLookback) +// and verifies that a trigger exists for each. If no trigger is found and no +// dedup marker exists, an IRREGULAR_SCHEDULE_MISSED event is published. func detectMissedInclusionSchedules(ctx context.Context, d *Deps) error { configs, err := d.ConfigCache.GetAll(ctx) if err != nil { @@ -382,57 +372,78 @@ func detectMissedInclusionSchedules(ctx context.Context, d *Deps) error { continue } - date, ok := MostRecentInclusionDate(cfg.Schedule.Include.Dates, now) - if !ok { + pastDates := PastInclusionDates(cfg.Schedule.Include.Dates, now) + if len(pastDates) == 0 { continue } scheduleID := resolveScheduleID(cfg) - // Check if a trigger exists for the most recent inclusion date. - found, err := d.Store.HasTriggerForDate(ctx, id, scheduleID, date) - if err != nil { - d.Logger.Error("failed to check trigger for inclusion schedule", - "pipelineId", id, "date", date, "error", err) - continue - } - if found { - continue - } + // Resolve today in the pipeline's timezone so the grace-period + // guard fires correctly when UTC date != pipeline-local date. + tzLoc := resolveTimezone(cfg.Schedule.Timezone) + today := now.In(tzLoc).Format("2006-01-02") + + for _, date := range pastDates { + // If the inclusion date is today and the pipeline has a + // Schedule.Time, only alert after that time has passed. + // This mirrors the same check in detectMissedSchedules for + // cron pipelines to avoid false-positive alerts before the + // expected start time. Past dates are not gated because + // their Schedule.Time has necessarily already elapsed. + if cfg.Schedule.Time != "" && date == today { + localNow := now.In(tzLoc) + expectedStart, err := time.ParseInLocation("2006-01-02 15:04", date+" "+cfg.Schedule.Time, tzLoc) + if err == nil && localNow.Before(expectedStart) { + continue // not yet past expected start time + } + } - // Check dedup marker to avoid re-alerting on subsequent watchdog runs. 
- dedupKey := "irregular-missed-check#" + date - dedupData, err := d.Store.GetSensorData(ctx, id, dedupKey) - if err != nil { - d.Logger.Error("dedup marker lookup failed for inclusion schedule", - "pipelineId", id, "date", date, "error", err) - continue - } - if dedupData != nil { - continue - } + // Check if a trigger exists for this inclusion date. + found, err := d.Store.HasTriggerForDate(ctx, id, scheduleID, date) + if err != nil { + d.Logger.Error("failed to check trigger for inclusion schedule", + "pipelineId", id, "date", date, "error", err) + continue + } + if found { + continue + } - alertDetail := map[string]interface{}{ - "source": "watchdog", - "actionHint": fmt.Sprintf("inclusion date %s expected to have a trigger — none found", date), - } - if err := publishEvent(ctx, d, string(types.EventIrregularScheduleMissed), id, scheduleID, date, - fmt.Sprintf("missed inclusion schedule for %s on %s", id, date), alertDetail); err != nil { - d.Logger.Warn("failed to publish irregular schedule missed event", "error", err, "pipeline", id, "date", date) - } + // Check dedup marker to avoid re-alerting on subsequent watchdog runs. + dedupKey := "irregular-missed-check#" + date + dedupData, err := d.Store.GetSensorData(ctx, id, dedupKey) + if err != nil { + d.Logger.Error("dedup marker lookup failed for inclusion schedule", + "pipelineId", id, "date", date, "error", err) + continue + } + if dedupData != nil { + continue + } - // Write dedup marker. 
- if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ - "alerted": "true", - }); err != nil { - d.Logger.Warn("failed to write inclusion dedup marker", "error", err, "pipeline", id, "date", date) - } + alertDetail := map[string]interface{}{ + "source": "watchdog", + "actionHint": fmt.Sprintf("inclusion date %s expected to have a trigger — none found", date), + } + if err := publishEvent(ctx, d, string(types.EventIrregularScheduleMissed), id, scheduleID, date, + fmt.Sprintf("missed inclusion schedule for %s on %s", id, date), alertDetail); err != nil { + d.Logger.Warn("failed to publish irregular schedule missed event", "error", err, "pipeline", id, "date", date) + } - d.Logger.Info("detected missed inclusion schedule", - "pipelineId", id, - "schedule", scheduleID, - "date", date, - ) + // Write dedup marker. + if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ + "alerted": "true", + }); err != nil { + d.Logger.Warn("failed to write inclusion dedup marker", "error", err, "pipeline", id, "date", date) + } + + d.Logger.Info("detected missed inclusion schedule", + "pipelineId", id, + "schedule", scheduleID, + "date", date, + ) + } } return nil } @@ -630,12 +641,7 @@ func resolveTriggerDeadlineDate(cfg *types.PipelineConfig, now time.Time) string // Unlike handleSLACalculate, this does NOT roll forward when the time is past. // Returns zero time on parse errors. func resolveTriggerDeadlineTime(deadline, date, timezone string) time.Time { - loc := time.UTC - if timezone != "" { - if parsed, err := time.LoadLocation(timezone); err == nil { - loc = parsed - } - } + loc := resolveTimezone(timezone) if strings.HasPrefix(deadline, ":") { // Relative (hourly): ":MM" — deadline is in the NEXT hour after the @@ -931,6 +937,11 @@ func hasPostRunSensorUpdate(rules []types.ValidationRule, sensors map[string]map // detectRelativeSLABreaches checks pipelines with MaxDuration SLA config for // breaches. 
This is a defense-in-depth fallback: if the EventBridge Scheduler // fails to fire the relative SLA breach alert, the watchdog catches it. +// +// Both today and yesterday are checked because stream_router writes the +// first-sensor-arrival key using ResolveExecutionDate(), which for T+1 +// sensor-triggered pipelines produces yesterday's date. Checking both dates +// covers the cross-day boundary. func detectRelativeSLABreaches(ctx context.Context, d *Deps) error { configs, err := d.ConfigCache.GetAll(ctx) if err != nil { @@ -938,7 +949,10 @@ func detectRelativeSLABreaches(ctx context.Context, d *Deps) error { } now := d.now() - today := now.Format("2006-01-02") + datesToCheck := []string{ + now.Format("2006-01-02"), + now.AddDate(0, 0, -1).Format("2006-01-02"), + } for id, cfg := range configs { if cfg.SLA == nil || cfg.SLA.MaxDuration == "" { @@ -954,89 +968,98 @@ func detectRelativeSLABreaches(ctx context.Context, d *Deps) error { scheduleID := resolveScheduleID(cfg) - // Look up the first-sensor-arrival marker for today. - arrivalKey := "first-sensor-arrival#" + today - arrivalData, err := d.Store.GetSensorData(ctx, id, arrivalKey) - if err != nil { - d.Logger.Error("first-sensor-arrival lookup failed", - "pipelineId", id, "error", err) - continue - } - if arrivalData == nil { - continue + for _, checkDate := range datesToCheck { + checkRelativeSLAForDate(ctx, d, id, cfg, scheduleID, checkDate, maxDur, now) } + } + return nil +} - arrivedAtStr, ok := arrivalData["arrivedAt"].(string) - if !ok || arrivedAtStr == "" { - continue - } - arrivedAt, err := time.Parse(time.RFC3339, arrivedAtStr) - if err != nil { - d.Logger.Warn("invalid arrivedAt in first-sensor-arrival", - "pipelineId", id, "arrivedAt", arrivedAtStr, "error", err) - continue - } +// checkRelativeSLAForDate checks a single date for a relative SLA breach on +// the given pipeline. 
It looks up the first-sensor-arrival marker, verifies +// the breach window has elapsed, and publishes an alert if needed. +func checkRelativeSLAForDate(ctx context.Context, d *Deps, id string, cfg *types.PipelineConfig, scheduleID, checkDate string, maxDur time.Duration, now time.Time) { + arrivalKey := "first-sensor-arrival#" + checkDate + arrivalData, err := d.Store.GetSensorData(ctx, id, arrivalKey) + if err != nil { + d.Logger.Error("first-sensor-arrival lookup failed", + "pipelineId", id, "date", checkDate, "error", err) + return + } + if arrivalData == nil { + return + } - // Check if the relative SLA has been breached. - breachAt := arrivedAt.Add(maxDur) - if now.Before(breachAt) { - continue - } + arrivedAtStr, ok := arrivalData["arrivedAt"].(string) + if !ok || arrivedAtStr == "" { + return + } + arrivedAt, err := time.Parse(time.RFC3339, arrivedAtStr) + if err != nil { + d.Logger.Warn("invalid arrivedAt in first-sensor-arrival", + "pipelineId", id, "arrivedAt", arrivedAtStr, "error", err) + return + } - // Skip if pipeline already completed or permanently failed. - tr, err := d.Store.GetTrigger(ctx, id, scheduleID, today) - if err != nil { - d.Logger.Warn("trigger lookup failed in relative SLA check", - "pipelineId", id, "error", err) - continue - } - if tr != nil && (tr.Status == types.TriggerStatusCompleted || tr.Status == types.TriggerStatusFailedFinal) { - continue - } - if isJobTerminal(ctx, d, id, scheduleID, today) { - continue - } + // Check if the relative SLA has been breached. + breachAt := arrivedAt.Add(maxDur) + if now.Before(breachAt) { + return + } - // Check dedup marker to avoid re-alerting on subsequent watchdog runs. 
- dedupKey := "relative-sla-breach-check#" + today - dedupData, err := d.Store.GetSensorData(ctx, id, dedupKey) - if err != nil { - d.Logger.Error("dedup marker lookup failed for relative SLA breach", - "pipelineId", id, "error", err) - continue - } - if dedupData != nil { - continue - } + // Skip if pipeline already completed or permanently failed. + tr, err := d.Store.GetTrigger(ctx, id, scheduleID, checkDate) + if err != nil { + d.Logger.Warn("trigger lookup failed in relative SLA check", + "pipelineId", id, "date", checkDate, "error", err) + return + } + if tr != nil && (tr.Status == types.TriggerStatusCompleted || tr.Status == types.TriggerStatusFailedFinal) { + return + } + if isJobTerminal(ctx, d, id, scheduleID, checkDate) { + return + } - alertDetail := map[string]interface{}{ - "source": "watchdog", - "maxDuration": cfg.SLA.MaxDuration, - "sensorArrivalAt": arrivedAtStr, - "breachAt": breachAt.UTC().Format(time.RFC3339), - "actionHint": "relative SLA breached — pipeline has exceeded maxDuration since first sensor arrival", - } - if err := publishEvent(ctx, d, string(types.EventRelativeSLABreach), id, scheduleID, today, - fmt.Sprintf("relative SLA breach for %s on %s", id, today), alertDetail); err != nil { - d.Logger.Warn("failed to publish relative SLA breach event", - "error", err, "pipeline", id, "date", today) - } + // Check dedup marker to avoid re-alerting on subsequent watchdog runs. + // The dedup key includes checkDate to avoid cross-date collisions. + dedupKey := "relative-sla-breach-check#" + checkDate + dedupData, err := d.Store.GetSensorData(ctx, id, dedupKey) + if err != nil { + d.Logger.Error("dedup marker lookup failed for relative SLA breach", + "pipelineId", id, "date", checkDate, "error", err) + return + } + if dedupData != nil { + return + } - // Write dedup marker. 
- if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ - "alerted": "true", - }); err != nil { - d.Logger.Warn("failed to write relative SLA breach dedup marker", - "error", err, "pipeline", id, "date", today) - } + alertDetail := map[string]interface{}{ + "source": "watchdog", + "maxDuration": cfg.SLA.MaxDuration, + "sensorArrivalAt": arrivedAtStr, + "breachAt": breachAt.UTC().Format(time.RFC3339), + "actionHint": "relative SLA breached — pipeline has exceeded maxDuration since first sensor arrival", + } + if err := publishEvent(ctx, d, string(types.EventRelativeSLABreach), id, scheduleID, checkDate, + fmt.Sprintf("relative SLA breach for %s on %s", id, checkDate), alertDetail); err != nil { + d.Logger.Warn("failed to publish relative SLA breach event", + "error", err, "pipeline", id, "date", checkDate) + } - d.Logger.Info("detected relative SLA breach", - "pipelineId", id, - "schedule", scheduleID, - "date", today, - "sensorArrivalAt", arrivedAtStr, - "breachAt", breachAt.UTC().Format(time.RFC3339), - ) + // Write dedup marker. 
+ if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ + "alerted": "true", + }); err != nil { + d.Logger.Warn("failed to write relative SLA breach dedup marker", + "error", err, "pipeline", id, "date", checkDate) } - return nil + + d.Logger.Info("detected relative SLA breach", + "pipelineId", id, + "schedule", scheduleID, + "date", checkDate, + "sensorArrivalAt", arrivedAtStr, + "breachAt", breachAt.UTC().Format(time.RFC3339), + ) } diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index f0bca79..982b6d1 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -2513,6 +2513,303 @@ func TestWatchdog_InclusionSchedule_AllFutureDates_NoAlert(t *testing.T) { } } +func TestWatchdog_InclusionSchedule_MultiDateGap(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Fix time so both 2026-03-05 and 2026-03-08 are in the past. + fixedNow := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-05", "2026-03-08", "2026-03-15"}, + }, + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + // No triggers seeded for either date — should detect BOTH as missed. + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // Expect IRREGULAR_SCHEDULE_MISSED events for both dates. 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + + var missedDates []string + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + missedDates = append(missedDates, detail.Date) + } + } + assert.Len(t, missedDates, 2, "expected IRREGULAR_SCHEDULE_MISSED events for both past dates") + assert.Contains(t, missedDates, "2026-03-05") + assert.Contains(t, missedDates, "2026-03-08") +} + +// --------------------------------------------------------------------------- +// Inclusion schedule: Schedule.Time grace period +// --------------------------------------------------------------------------- + +func TestWatchdog_InclusionSchedule_RespectsScheduleTime(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Inclusion date is today. Schedule.Time is "10:00" but current time is 08:00 UTC. + // Should NOT fire an alert because we haven't reached the expected start time yet. + fixedNow := time.Date(2026, 3, 31, 8, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-31"}, + }, + Time: "10:00", + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // No IRREGULAR_SCHEDULE_MISSED events — too early. 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + if detail.PipelineID == "monthly-close" { + t.Error("should not publish IRREGULAR_SCHEDULE_MISSED before Schedule.Time") + } + } + } +} + +func TestWatchdog_InclusionSchedule_PastScheduleTime(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Inclusion date is today. Schedule.Time is "10:00" and current time is 11:00 UTC. + // No trigger exists — should fire an alert. + fixedNow := time.Date(2026, 3, 31, 11, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-31"}, + }, + Time: "10:00", + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + + var irregularMissedCount int + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + assert.Equal(t, "2026-03-31", detail.Date) + } + } + assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") +} + +func 
TestWatchdog_InclusionSchedule_WithTimezone(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Inclusion date is today (2026-03-31). Schedule.Time is "10:00", + // Timezone is "Asia/Tokyo" (UTC+9). Current time is 01:30 UTC = 10:30 JST. + // Since 10:30 JST is past 10:00 JST and no trigger exists, should fire alert. + fixedNow := time.Date(2026, 3, 31, 1, 30, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-31"}, + }, + Timezone: "Asia/Tokyo", + Time: "10:00", + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + + var irregularMissedCount int + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + assert.Equal(t, "2026-03-31", detail.Date) + } + } + assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") +} + +func TestWatchdog_InclusionSchedule_NoTimeConfig(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // Inclusion date is today. No Schedule.Time set. + // Should fire alert immediately (backward compatible). 
+ fixedNow := time.Date(2026, 3, 31, 0, 5, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "monthly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-31"}, + }, + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + + var irregularMissedCount int + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + assert.Equal(t, "2026-03-31", detail.Date) + } + } + assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") +} + +// TestWatchdog_InclusionSchedule_TimezoneUTCDateMismatch verifies that the +// grace-period check uses the pipeline's timezone for "today", not UTC. +// Scenario: UTC is 2026-04-01 00:30, but US/Eastern is 2026-03-31 20:30. +// Inclusion date is 2026-03-31 with Time "10:00" Eastern. Since 20:30 > 10:00 +// in Eastern and no trigger exists, the alert should fire. +func TestWatchdog_InclusionSchedule_TimezoneUTCDateMismatch(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // UTC is April 1, but US/Eastern is still March 31. 
+ fixedNow := time.Date(2026, 4, 1, 0, 30, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "quarterly-close"}, + Schedule: types.ScheduleConfig{ + Include: &types.InclusionConfig{ + Dates: []string{"2026-03-31"}, + }, + Timezone: "America/New_York", + Time: "10:00", + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo run"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + + var irregularMissedCount int + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "quarterly-close", detail.PipelineID) + assert.Equal(t, "2026-03-31", detail.Date) + } + } + assert.Equal(t, 1, irregularMissedCount, + "expected IRREGULAR_SCHEDULE_MISSED: UTC is April 1 but Eastern is still March 31 past 10:00") +} + // --------------------------------------------------------------------------- // Relative SLA breach detection (defense-in-depth) // --------------------------------------------------------------------------- @@ -2695,3 +2992,99 @@ func TestWatchdog_RelativeSLA_Breached_AlreadyCompleted_NoAlert(t *testing.T) { "should not publish RELATIVE_SLA_BREACH when pipeline already completed") } } + +func TestWatchdog_RelativeSLA_CrossDayArrival(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // It's now March 11 at 03:00 UTC. 
The sensor arrived yesterday (March 10) + // at 23:00 UTC. With maxDuration=2h the breach was at 01:00 UTC on March 11. + // stream_router wrote the arrival key under yesterday's date (T+1 pattern). + fixedNow := time.Date(2026, 3, 11, 3, 0, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "t1-pipeline"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + MaxDuration: "2h", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo hello"}}, + } + seedConfig(mock, cfg) + + // Seed the arrival key under YESTERDAY's date (T-1), as stream_router does + // for T+1 sensor-triggered pipelines. + yesterday := fixedNow.AddDate(0, 0, -1).Format("2006-01-02") + seedSensor(mock, "t1-pipeline", "first-sensor-arrival#"+yesterday, map[string]interface{}{ + "arrivedAt": "2026-03-10T23:00:00Z", + }) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + var breachCount int + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventRelativeSLABreach) { + breachCount++ + } + } + assert.Equal(t, 1, breachCount, "expected one RELATIVE_SLA_BREACH event for cross-day sensor arrival") +} + +func TestWatchdog_RelativeSLA_CrossDayNotBreached(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + // It's now March 11 at 00:30 UTC. The sensor arrived yesterday (March 10) + // at 23:00 UTC. With maxDuration=2h the breach would be at 01:00 UTC on + // March 11, so we're NOT yet breached. 
+ fixedNow := time.Date(2026, 3, 11, 0, 30, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "t1-pipeline"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "upstream-complete", + Check: types.CheckEquals, + Field: "status", + Value: "ready", + }, + Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + }, + SLA: &types.SLAConfig{ + MaxDuration: "2h", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "command", Config: map[string]interface{}{"command": "echo hello"}}, + } + seedConfig(mock, cfg) + + // Seed the arrival key under YESTERDAY's date (T-1). + yesterday := fixedNow.AddDate(0, 0, -1).Format("2006-01-02") + seedSensor(mock, "t1-pipeline", "first-sensor-arrival#"+yesterday, map[string]interface{}{ + "arrivedAt": "2026-03-10T23:00:00Z", + }) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, ev := range ebMock.events { + assert.NotEqual(t, string(types.EventRelativeSLABreach), *ev.Entries[0].DetailType, + "should not publish RELATIVE_SLA_BREACH before maxDuration is exceeded") + } +} From c4f0dce3964c78e5529a72bde6b3a0373041fbf8 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Wed, 11 Mar 2026 00:07:32 +0700 Subject: [PATCH 3/4] fix: gofmt and gocritic lint issues Fix gofmt formatting in sla_monitor_test.go. Invert if-condition pattern to reduce nesting in watchdog_test.go event assertion loops (gocritic nestingReduce). 
--- internal/lambda/sla_monitor_test.go | 12 +-- internal/lambda/watchdog_test.go | 160 ++++++++++++++++------------ 2 files changed, 95 insertions(+), 77 deletions(-) diff --git a/internal/lambda/sla_monitor_test.go b/internal/lambda/sla_monitor_test.go index 96e103d..4c0bf17 100644 --- a/internal/lambda/sla_monitor_test.go +++ b/internal/lambda/sla_monitor_test.go @@ -251,10 +251,10 @@ func TestSLAMonitor_FireAlert_RelativeSLAWarning(t *testing.T) { } ebMock := &mockEventBridge{} d := &lambda.Deps{ - Store: s, - EventBridge: ebMock, + Store: s, + EventBridge: ebMock, EventBusName: "interlock-bus", - Logger: slog.Default(), + Logger: slog.Default(), } out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ @@ -284,10 +284,10 @@ func TestSLAMonitor_FireAlert_RelativeSLABreach(t *testing.T) { } ebMock := &mockEventBridge{} d := &lambda.Deps{ - Store: s, - EventBridge: ebMock, + Store: s, + EventBridge: ebMock, EventBusName: "interlock-bus", - Logger: slog.Default(), + Logger: slog.Default(), } out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index 982b6d1..c93c8ad 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -270,9 +270,10 @@ func TestWatchdog_Reconcile_TriggersRecovery(t *testing.T) { defer ebMock.mu.Unlock() var recoveredCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventTriggerRecovered) { - recoveredCount++ + if *ev.Entries[0].DetailType != string(types.EventTriggerRecovered) { + continue } + recoveredCount++ } assert.Equal(t, 1, recoveredCount, "expected one TRIGGER_RECOVERED event") } @@ -465,9 +466,10 @@ func TestWatchdog_Reconcile_PerHour_MultipleRecoveries(t *testing.T) { defer ebMock.mu.Unlock() var recoveredCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventTriggerRecovered) { - 
recoveredCount++ + if *ev.Entries[0].DetailType != string(types.EventTriggerRecovered) { + continue } + recoveredCount++ } assert.Equal(t, 2, recoveredCount, "expected two TRIGGER_RECOVERED events") } @@ -521,9 +523,10 @@ func TestWatchdog_Reconcile_PerHour_PartiallyTriggered(t *testing.T) { defer ebMock.mu.Unlock() var recoveredCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventTriggerRecovered) { - recoveredCount++ + if *ev.Entries[0].DetailType != string(types.EventTriggerRecovered) { + continue } + recoveredCount++ } assert.Equal(t, 1, recoveredCount, "expected one TRIGGER_RECOVERED event for T11 only") } @@ -600,9 +603,10 @@ func TestWatchdog_MissedSchedule_AlertsPostDeployment(t *testing.T) { defer ebMock.mu.Unlock() var missedCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventScheduleMissed) { - missedCount++ + if *ev.Entries[0].DetailType != string(types.EventScheduleMissed) { + continue } + missedCount++ } assert.Equal(t, 1, missedCount, "expected one SCHEDULE_MISSED event for post-deployment schedule") } @@ -1117,13 +1121,14 @@ func TestWatchdog_MissedSchedule_BeforeScheduleTime_NoAlert(t *testing.T) { ebMock.mu.Lock() defer ebMock.mu.Unlock() for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventScheduleMissed) { - detailJSON := *ev.Entries[0].Detail - var evt types.InterlockEvent - _ = json.Unmarshal([]byte(detailJSON), &evt) - if evt.PipelineID == "bronze-late" { - t.Error("should not publish SCHEDULE_MISSED when current time is before Schedule.Time") - } + if *ev.Entries[0].DetailType != string(types.EventScheduleMissed) { + continue + } + detailJSON := *ev.Entries[0].Detail + var evt types.InterlockEvent + _ = json.Unmarshal([]byte(detailJSON), &evt) + if evt.PipelineID == "bronze-late" { + t.Error("should not publish SCHEDULE_MISSED when current time is before Schedule.Time") } } } @@ -1360,9 +1365,10 @@ func 
TestWatchdog_Reconcile_ProceedsOnNonTerminalJoblog(t *testing.T) { defer ebMock.mu.Unlock() var recovered int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventTriggerRecovered) { - recovered++ + if *ev.Entries[0].DetailType != string(types.EventTriggerRecovered) { + continue } + recovered++ } assert.Equal(t, 1, recovered, "should publish TRIGGER_RECOVERED for non-terminal joblog") } @@ -1621,10 +1627,11 @@ func TestWatchdog_ScheduleSLAAlerts_SensorDeadlineExpired_WritesFAILEDFINAL(t *t defer ebMock.mu.Unlock() var found bool for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventSensorDeadlineExpired) { - found = true - break + if *ev.Entries[0].DetailType != string(types.EventSensorDeadlineExpired) { + continue } + found = true + break } assert.True(t, found, "expected SENSOR_DEADLINE_EXPIRED event") } @@ -1801,10 +1808,11 @@ func TestWatchdog_ScheduleSLAAlerts_DailySensorDeadlineExpired_WritesFAILEDFINAL defer ebMock.mu.Unlock() var found bool for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventSensorDeadlineExpired) { - found = true - break + if *ev.Entries[0].DetailType != string(types.EventSensorDeadlineExpired) { + continue } + found = true + break } assert.True(t, found, "expected SENSOR_DEADLINE_EXPIRED event for daily pipeline") } @@ -1885,9 +1893,10 @@ func TestWatchdog_PostRunSensorMissing(t *testing.T) { defer ebMock.mu.Unlock() var missingCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventPostRunSensorMissing) { - missingCount++ + if *ev.Entries[0].DetailType != string(types.EventPostRunSensorMissing) { + continue } + missingCount++ } assert.Equal(t, 1, missingCount, "expected one POST_RUN_SENSOR_MISSING event") @@ -2419,15 +2428,16 @@ func TestWatchdog_InclusionSchedule_TriggerMissing_PublishesAlert(t *testing.T) var irregularMissedCount int for _, ev := range ebMock.events { - if 
*ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { - irregularMissedCount++ - - // Verify event detail contains the expected pipeline and date. - var detail types.InterlockEvent - _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) - assert.Equal(t, "monthly-close", detail.PipelineID) - assert.Equal(t, "2026-03-05", detail.Date) + if *ev.Entries[0].DetailType != string(types.EventIrregularScheduleMissed) { + continue } + irregularMissedCount++ + + // Verify event detail contains the expected pipeline and date. + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + assert.Equal(t, "2026-03-05", detail.Date) } assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") } @@ -2550,11 +2560,12 @@ func TestWatchdog_InclusionSchedule_MultiDateGap(t *testing.T) { var missedDates []string for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { - var detail types.InterlockEvent - _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) - missedDates = append(missedDates, detail.Date) + if *ev.Entries[0].DetailType != string(types.EventIrregularScheduleMissed) { + continue } + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + missedDates = append(missedDates, detail.Date) } assert.Len(t, missedDates, 2, "expected IRREGULAR_SCHEDULE_MISSED events for both past dates") assert.Contains(t, missedDates, "2026-03-05") @@ -2601,12 +2612,13 @@ func TestWatchdog_InclusionSchedule_RespectsScheduleTime(t *testing.T) { ebMock.mu.Lock() defer ebMock.mu.Unlock() for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { - var detail types.InterlockEvent - _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) - if detail.PipelineID == "monthly-close" { - 
t.Error("should not publish IRREGULAR_SCHEDULE_MISSED before Schedule.Time") - } + if *ev.Entries[0].DetailType != string(types.EventIrregularScheduleMissed) { + continue + } + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + if detail.PipelineID == "monthly-close" { + t.Error("should not publish IRREGULAR_SCHEDULE_MISSED before Schedule.Time") } } } @@ -2648,13 +2660,14 @@ func TestWatchdog_InclusionSchedule_PastScheduleTime(t *testing.T) { var irregularMissedCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { - irregularMissedCount++ - var detail types.InterlockEvent - _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) - assert.Equal(t, "monthly-close", detail.PipelineID) - assert.Equal(t, "2026-03-31", detail.Date) + if *ev.Entries[0].DetailType != string(types.EventIrregularScheduleMissed) { + continue } + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + assert.Equal(t, "2026-03-31", detail.Date) } assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") } @@ -2698,13 +2711,14 @@ func TestWatchdog_InclusionSchedule_WithTimezone(t *testing.T) { var irregularMissedCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { - irregularMissedCount++ - var detail types.InterlockEvent - _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) - assert.Equal(t, "monthly-close", detail.PipelineID) - assert.Equal(t, "2026-03-31", detail.Date) + if *ev.Entries[0].DetailType != string(types.EventIrregularScheduleMissed) { + continue } + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + 
assert.Equal(t, "2026-03-31", detail.Date) } assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") } @@ -2745,13 +2759,14 @@ func TestWatchdog_InclusionSchedule_NoTimeConfig(t *testing.T) { var irregularMissedCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { - irregularMissedCount++ - var detail types.InterlockEvent - _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) - assert.Equal(t, "monthly-close", detail.PipelineID) - assert.Equal(t, "2026-03-31", detail.Date) + if *ev.Entries[0].DetailType != string(types.EventIrregularScheduleMissed) { + continue } + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "monthly-close", detail.PipelineID) + assert.Equal(t, "2026-03-31", detail.Date) } assert.Equal(t, 1, irregularMissedCount, "expected exactly one IRREGULAR_SCHEDULE_MISSED event") } @@ -2798,13 +2813,14 @@ func TestWatchdog_InclusionSchedule_TimezoneUTCDateMismatch(t *testing.T) { var irregularMissedCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventIrregularScheduleMissed) { - irregularMissedCount++ - var detail types.InterlockEvent - _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) - assert.Equal(t, "quarterly-close", detail.PipelineID) - assert.Equal(t, "2026-03-31", detail.Date) + if *ev.Entries[0].DetailType != string(types.EventIrregularScheduleMissed) { + continue } + irregularMissedCount++ + var detail types.InterlockEvent + _ = json.Unmarshal([]byte(*ev.Entries[0].Detail), &detail) + assert.Equal(t, "quarterly-close", detail.PipelineID) + assert.Equal(t, "2026-03-31", detail.Date) } assert.Equal(t, 1, irregularMissedCount, "expected IRREGULAR_SCHEDULE_MISSED: UTC is April 1 but Eastern is still March 31 past 10:00") @@ -2937,9 +2953,10 @@ func TestWatchdog_RelativeSLA_SensorArrival_Breached(t 
*testing.T) { defer ebMock.mu.Unlock() var breachCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventRelativeSLABreach) { - breachCount++ + if *ev.Entries[0].DetailType != string(types.EventRelativeSLABreach) { + continue } + breachCount++ } assert.Equal(t, 1, breachCount, "expected one RELATIVE_SLA_BREACH event") } @@ -3036,9 +3053,10 @@ func TestWatchdog_RelativeSLA_CrossDayArrival(t *testing.T) { defer ebMock.mu.Unlock() var breachCount int for _, ev := range ebMock.events { - if *ev.Entries[0].DetailType == string(types.EventRelativeSLABreach) { - breachCount++ + if *ev.Entries[0].DetailType != string(types.EventRelativeSLABreach) { + continue } + breachCount++ } assert.Equal(t, 1, breachCount, "expected one RELATIVE_SLA_BREACH event for cross-day sensor arrival") } From 0df62008a1cc88f90d697b391deac1d67fa0bb3e Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Wed, 11 Mar 2026 18:53:32 +0700 Subject: [PATCH 4/4] feat: detect Glue false-success via driver log ERROR/FATAL markers Glue can report SUCCEEDED when Spark actually failed (driver exits 0 despite exception). Add Check 2 to verifyGlueRCA: scan driver output stream for log4j ERROR/FATAL severity markers. Automatically catches any error the application logs without maintaining a list of specific failure patterns. Driver stdout (not stderr) avoids JVM noise false positives. --- CHANGELOG.md | 1 + internal/trigger/glue.go | 37 +++++++- internal/trigger/glue_test.go | 173 ++++++++++++++++++++++++++++++++-- 3 files changed, 198 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 492b630..00e68ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Watchdog defense-in-depth for relative SLA** — `detectRelativeSLABreaches` scans pipelines with `maxDuration` config and fires `RELATIVE_SLA_BREACH` if the EventBridge scheduler failed to fire. 
- **`WriteSensorIfAbsent` store method** — conditional PutItem that only writes if the key doesn't exist, used for first-sensor-arrival idempotency. - **Config validation** for new fields: cron/include mutual exclusion, inclusion date format (YYYY-MM-DD), maxDuration format and 24h cap, maxDuration requires trigger. +- **Glue false-success detection** — `verifyGlueRCA` now checks both the RCA insight stream (Check 1) and the driver output stream for ERROR/FATAL log4j severity markers (Check 2). Catches Spark failures that Glue reports as SUCCEEDED when the application framework swallows exceptions. ### Changed diff --git a/internal/trigger/glue.go b/internal/trigger/glue.go index 9b9be78..b78999d 100644 --- a/internal/trigger/glue.go +++ b/internal/trigger/glue.go @@ -102,10 +102,21 @@ func (r *Runner) checkGlueStatus(ctx context.Context, metadata map[string]interf } } -// verifyGlueRCA checks the RCA (root cause analysis) log stream for a Glue -// job run to detect false successes. Glue can report SUCCEEDED when the Spark -// job actually failed (driver exits 0 despite SparkException). The RCA stream -// contains GlueExceptionAnalysisJobFailed events for these cases. +// driverOutputFilterPattern matches ERROR or FATAL log4j severity markers in +// the Glue driver output (stdout) stream. The surrounding spaces ensure we +// match log4j format ("timestamp ERROR class - message") and avoid false +// positives from JVM flags like -XX:OnOutOfMemoryError. +// +// This is intentionally scoped to the driver output stream (runID), NOT the +// stderr stream (runID-driver-stderr) which contains benign JVM noise. +const driverOutputFilterPattern = `?" ERROR " ?" FATAL "` + +// verifyGlueRCA checks CloudWatch log streams for a Glue job run to detect +// false successes. Glue can report SUCCEEDED when the Spark job actually +// failed (driver exits 0 despite SparkException). +// +// Check 1: RCA stream — looks for GlueExceptionAnalysisJobFailed events. 
+// Check 2: Driver output stream — looks for ERROR/FATAL log4j severity markers. // // Returns (true, reason) if failure evidence is found. Returns (false, "") on // any error or if no failure is found. @@ -119,8 +130,9 @@ func (r *Runner) verifyGlueRCA(ctx context.Context, runID string, logGroupName * if logGroupName != nil && *logGroupName != "" { logGroup = *logGroupName } - rcaStream := runID + "-job-insights-rca-driver" + // Check 1: RCA stream for GlueExceptionAnalysisJobFailed events. + rcaStream := runID + "-job-insights-rca-driver" out, err := client.FilterLogEvents(ctx, &cloudwatchlogs.FilterLogEventsInput{ LogGroupName: &logGroup, LogStreamNames: []string{rcaStream}, @@ -135,6 +147,21 @@ func (r *Runner) verifyGlueRCA(ctx context.Context, runID string, logGroupName * return true, "RCA: JobFailed" } + // Check 2: Driver output stream for ERROR/FATAL log4j severity markers. + dout, derr := client.FilterLogEvents(ctx, &cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: &logGroup, + LogStreamNames: []string{runID}, + FilterPattern: aws.String(driverOutputFilterPattern), + Limit: aws.Int32(1), + }) + if derr == nil && len(dout.Events) > 0 { + msg := aws.ToString(dout.Events[0].Message) + if len(msg) > 200 { + msg = msg[:200] + } + return true, "driver log: " + msg + } + return false, "" } diff --git a/internal/trigger/glue_test.go b/internal/trigger/glue_test.go index 21704ba..9fb7c26 100644 --- a/internal/trigger/glue_test.go +++ b/internal/trigger/glue_test.go @@ -125,9 +125,24 @@ func TestCheckGlueStatus_MissingMetadata(t *testing.T) { type mockCWLogsClient struct { filterOut *cloudwatchlogs.FilterLogEventsOutput filterErr error + // streamResponses maps a log stream name to a specific response. + // When set, FilterLogEvents returns the response matching the first + // stream name in the request; it falls back to filterOut/filterErr + // for unmatched streams. 
+ streamResponses map[string]mockCWLogsResponse +} + +type mockCWLogsResponse struct { + out *cloudwatchlogs.FilterLogEventsOutput + err error } func (m *mockCWLogsClient) FilterLogEvents(ctx context.Context, params *cloudwatchlogs.FilterLogEventsInput, optFns ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.FilterLogEventsOutput, error) { + if m.streamResponses != nil && len(params.LogStreamNames) > 0 { + if resp, ok := m.streamResponses[params.LogStreamNames[0]]; ok { + return resp.out, resp.err + } + } return m.filterOut, m.filterErr } @@ -251,8 +266,9 @@ func TestCheckGlueStatus_RCAUsesJobRunLogGroup(t *testing.T) { "glue_job_run_id": "jr_abc123", }) require.NoError(t, err) - require.Len(t, capturedLogGroups, 1) + require.Len(t, capturedLogGroups, 2, "both RCA and driver output checks should use custom log group") assert.Equal(t, customLogGroup, capturedLogGroups[0], "RCA check should use custom log group") + assert.Equal(t, customLogGroup, capturedLogGroups[1], "driver output check should use custom log group") } type capturingCWLogsClient struct { @@ -267,11 +283,10 @@ func (c *capturingCWLogsClient) FilterLogEvents(ctx context.Context, params *clo return c.delegate.FilterLogEvents(ctx, params, optFns...) } -func TestCheckGlueStatus_RCAOnlyCheck(t *testing.T) { - // Verify that only the RCA check is performed (no error log group check). - // The error log group check was removed because Glue's stderr always - // contains benign JVM startup output with "Error" in class names, - // causing 100% false positive rate. +func TestCheckGlueStatus_RCAAndDriverOutputChecks(t *testing.T) { + // Verify that both the RCA check and driver output check are performed, + // but NOT the error log group check (removed in PR #60 due to 100% + // false positive rate from benign JVM startup stderr). 
glueClient := &mockGlueClient{ getOut: &glue.GetJobRunOutput{ JobRun: &gluetypes.JobRun{ @@ -298,10 +313,152 @@ func TestCheckGlueStatus_RCAOnlyCheck(t *testing.T) { require.NoError(t, err) assert.Equal(t, RunCheckSucceeded, result.State) - // Only one FilterLogEvents call (RCA check), no error log group check. - require.Len(t, capturedInputs, 1) + // Two FilterLogEvents calls: RCA check + driver output check. + require.Len(t, capturedInputs, 2) + + // First call: RCA stream check. assert.Equal(t, "/aws-glue/jobs/logs-v2", aws.ToString(capturedInputs[0].LogGroupName)) + assert.Equal(t, []string{"jr_abc123-job-insights-rca-driver"}, capturedInputs[0].LogStreamNames) assert.Contains(t, aws.ToString(capturedInputs[0].FilterPattern), "GlueExceptionAnalysisJobFailed") + + // Second call: driver output stream check. + assert.Equal(t, "/aws-glue/jobs/logs-v2", aws.ToString(capturedInputs[1].LogGroupName)) + assert.Equal(t, []string{"jr_abc123"}, capturedInputs[1].LogStreamNames) + assert.Contains(t, aws.ToString(capturedInputs[1].FilterPattern), `" ERROR "`) + assert.Contains(t, aws.ToString(capturedInputs[1].FilterPattern), `" FATAL "`) +} + +func TestCheckGlueStatus_DriverLogDetectsError(t *testing.T) { + // Glue reports SUCCEEDED, RCA stream is empty, but driver output stream + // contains an ERROR log4j line → should detect as FAILED. + glueClient := &mockGlueClient{ + getOut: &glue.GetJobRunOutput{ + JobRun: &gluetypes.JobRun{ + JobRunState: gluetypes.JobRunStateSucceeded, + }, + }, + } + cwClient := &mockCWLogsClient{ + streamResponses: map[string]mockCWLogsResponse{ + "jr_abc123-job-insights-rca-driver": { + out: &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}}, + }, + "jr_abc123": { + out: &cloudwatchlogs.FilterLogEventsOutput{ + Events: []cwltypes.FilteredLogEvent{ + {Message: aws.String("2026-01-15 10:00:00,000 ERROR org.apache.spark.sql.FileFormatWriter: Aborting job abc123. 
java.io.IOException: No space left on device")}, + }, + }, + }, + }, + } + + r := NewRunner(WithGlueClient(glueClient), WithCloudWatchLogsClient(cwClient)) + result, err := r.checkGlueStatus(context.Background(), map[string]interface{}{ + "glue_job_name": "my-job", + "glue_job_run_id": "jr_abc123", + }) + require.NoError(t, err) + assert.Equal(t, RunCheckFailed, result.State) + assert.Contains(t, result.Message, "driver log") + assert.Equal(t, types.FailureTransient, result.FailureCategory) +} + +func TestCheckGlueStatus_DriverLogDetectsFatal(t *testing.T) { + // Glue reports SUCCEEDED, RCA empty, driver output has a FATAL log4j + // line → should detect as FAILED. + glueClient := &mockGlueClient{ + getOut: &glue.GetJobRunOutput{ + JobRun: &gluetypes.JobRun{ + JobRunState: gluetypes.JobRunStateSucceeded, + }, + }, + } + cwClient := &mockCWLogsClient{ + streamResponses: map[string]mockCWLogsResponse{ + "jr_abc123-job-insights-rca-driver": { + out: &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}}, + }, + "jr_abc123": { + out: &cloudwatchlogs.FilterLogEventsOutput{ + Events: []cwltypes.FilteredLogEvent{ + {Message: aws.String("2026-01-15 10:00:00,000 FATAL org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception")}, + }, + }, + }, + }, + } + + r := NewRunner(WithGlueClient(glueClient), WithCloudWatchLogsClient(cwClient)) + result, err := r.checkGlueStatus(context.Background(), map[string]interface{}{ + "glue_job_name": "my-job", + "glue_job_run_id": "jr_abc123", + }) + require.NoError(t, err) + assert.Equal(t, RunCheckFailed, result.State) + assert.Contains(t, result.Message, "driver log") + assert.Contains(t, result.Message, "FATAL") + assert.Equal(t, types.FailureTransient, result.FailureCategory) +} + +func TestCheckGlueStatus_DriverLogClean(t *testing.T) { + // Glue SUCCEEDED, RCA empty, driver log empty → SUCCEEDED (no false positive). 
+ glueClient := &mockGlueClient{ + getOut: &glue.GetJobRunOutput{ + JobRun: &gluetypes.JobRun{ + JobRunState: gluetypes.JobRunStateSucceeded, + }, + }, + } + cwClient := &mockCWLogsClient{ + streamResponses: map[string]mockCWLogsResponse{ + "jr_abc123-job-insights-rca-driver": { + out: &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}}, + }, + "jr_abc123": { + out: &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}}, + }, + }, + } + + r := NewRunner(WithGlueClient(glueClient), WithCloudWatchLogsClient(cwClient)) + result, err := r.checkGlueStatus(context.Background(), map[string]interface{}{ + "glue_job_name": "my-job", + "glue_job_run_id": "jr_abc123", + }) + require.NoError(t, err) + assert.Equal(t, RunCheckSucceeded, result.State) + assert.Equal(t, "SUCCEEDED", result.Message) +} + +func TestCheckGlueStatus_DriverLogErrorFallsThrough(t *testing.T) { + // Glue SUCCEEDED, RCA empty, driver log query returns error → + // should fall through to SUCCEEDED (graceful degradation). + glueClient := &mockGlueClient{ + getOut: &glue.GetJobRunOutput{ + JobRun: &gluetypes.JobRun{ + JobRunState: gluetypes.JobRunStateSucceeded, + }, + }, + } + cwClient := &mockCWLogsClient{ + streamResponses: map[string]mockCWLogsResponse{ + "jr_abc123-job-insights-rca-driver": { + out: &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}}, + }, + "jr_abc123": { + err: fmt.Errorf("ResourceNotFoundException: log stream not found"), + }, + }, + } + + r := NewRunner(WithGlueClient(glueClient), WithCloudWatchLogsClient(cwClient)) + result, err := r.checkGlueStatus(context.Background(), map[string]interface{}{ + "glue_job_name": "my-job", + "glue_job_run_id": "jr_abc123", + }) + require.NoError(t, err) + assert.Equal(t, RunCheckSucceeded, result.State, "should fall through to SUCCEEDED when driver log check fails") } func TestExtractGlueFailureReason(t *testing.T) {