diff --git a/CHANGELOG.md b/CHANGELOG.md index 0853dea..d0ac6c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.7.3] - 2026-03-10 + +### Added + +- **Configurable drift detection field** (`PostRunConfig.DriftField`) — specifies which sensor field to compare for post-run drift detection. Defaults to `"sensor_count"` for backward compatibility. Fixes broken drift detection when sensors use a different field name (e.g., `"count"`). + +### Fixed + +- **Post-run drift detection never fired when sensor field was `count`** — drift comparison was hardcoded to `"sensor_count"` but bronze consumers write `"count"` in hourly-status sensors, causing `ExtractFloat` to return 0 for both baseline and current values. The `prevCount > 0 && currCount > 0` guard silently suppressed all drift detection. +- **Two flaky time-sensitive tests** — `TestSLAMonitor_Reconcile_PastWarningFutureBreach` and `TestWatchdog_MissedSchedule_DetailFields` used real wall-clock time instead of injected `NowFunc`, causing failures depending on time of day. + ## [0.7.2] - 2026-03-08 ### Added @@ -343,6 +354,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline Released under the [Elastic License 2.0](LICENSE). +[0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3 [0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2 [0.7.1]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.1 [0.7.0]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.0 diff --git a/internal/lambda/e2e_test.go b/internal/lambda/e2e_test.go index 54360b8..a7a8b50 100644 --- a/internal/lambda/e2e_test.go +++ b/internal/lambda/e2e_test.go @@ -791,6 +791,70 @@ func TestE2E_PostRunMonitoring(t *testing.T) { assertAlertFormats(t, eb) }) + t.Run("DriftDetected_CustomField", func(t *testing.T) { + mock := newMockDDB() + tr := &mockTriggerRunner{} + sc := &mockStatusPoller{seq: []lambda.StatusResult{{State: "succeeded"}}} + d, _, eb := buildE2EDeps(mock, tr, sc) + + cfg := e2ePipeline("pipe-b2-cf") + cfg.PostRun = &types.PostRunConfig{ + Evaluation: &types.EvaluationWindow{Window: "10m", Interval: "5m"}, + Rules: []types.ValidationRule{{Key: "audit-result", Check: types.CheckExists}}, + DriftField: "count", + } + + r := runSFN(t, ctx, d, mock, eb, e2eScenario{ + pipeline: cfg, + sensors: map[string]map[string]interface{}{ + "upstream-complete": {"status": "ready"}, + "audit-result": {"count": float64(100)}, + }, + postRunSensorUpdates: []map[string]map[string]interface{}{ + {}, // iteration 0: no updates; baseline saved + {"audit-result": {"count": float64(120), "date": "2026-03-07"}}, // iteration 1: count changed + }, + }) + + assert.Equal(t, sfnDone, r.terminal) + assert.Contains(t, r.events, "POST_RUN_DRIFT") + assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-b2-cf")) + assert.True(t, hasRerunRequest(mock, "pipe-b2-cf"), "drift on custom field should write RERUN_REQUEST") + assertAlertFormats(t, eb) + }) + + t.Run("DriftDetected_DefaultField", func(t *testing.T) { + mock := newMockDDB() + tr := &mockTriggerRunner{} + sc := &mockStatusPoller{seq: []lambda.StatusResult{{State: "succeeded"}}} + d, _, eb := buildE2EDeps(mock, tr, sc) + + cfg := e2ePipeline("pipe-b2-df") + cfg.PostRun = &types.PostRunConfig{ + Evaluation: &types.EvaluationWindow{Window: "10m", Interval: "5m"}, + Rules: []types.ValidationRule{{Key: "audit-result", Check: types.CheckExists}}, + // DriftField intentionally left empty — should default to "sensor_count" + } + + r := runSFN(t, ctx, d, mock, eb, e2eScenario{ + pipeline: cfg, + sensors: map[string]map[string]interface{}{ + "upstream-complete": {"status": "ready"}, + "audit-result": {"sensor_count": float64(100)}, + }, + postRunSensorUpdates: []map[string]map[string]interface{}{ + {}, // iteration 0: no updates; baseline saved + {"audit-result": {"sensor_count": float64(120), "date": "2026-03-07"}}, // iteration 1: count changed + }, + }) + + assert.Equal(t, sfnDone, r.terminal) + assert.Contains(t, r.events, "POST_RUN_DRIFT") + assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-b2-df")) + assert.True(t, hasRerunRequest(mock, "pipe-b2-df"), "drift on default field should write RERUN_REQUEST") + assertAlertFormats(t, eb) + }) + t.Run("WindowExhausted", func(t *testing.T) { mock := newMockDDB() tr := &mockTriggerRunner{} @@ -2074,6 +2138,37 @@ func TestE2E_PostRunInflight(t *testing.T) { assertAlertFormats(t, eb) }) + t.Run("InflightDrift_CustomField", func(t *testing.T) { + mock := newMockDDB() + tr := &mockTriggerRunner{} + sc := &mockStatusPoller{} + d, sfnM, eb := buildE2EDeps(mock, tr, sc) + + cfg := e2ePipeline("pipe-inf-cf") + cfg.PostRun = &types.PostRunConfig{ + Rules: []types.ValidationRule{{Key: "audit-result", Check: types.CheckExists}}, + DriftField: "count", + } + seedConfig(mock, cfg) + seedTriggerWithStatus(mock, "pipe-inf-cf", "2026-03-07", types.TriggerStatusRunning) + + // Baseline uses custom field "count". + require.NoError(t, d.Store.WriteSensor(ctx, "pipe-inf-cf", "postrun-baseline#2026-03-07", + map[string]interface{}{"count": float64(100)})) + + // Sensor arrives with different count while job is running. + record := makeSensorRecord("pipe-inf-cf", "audit-result", toStreamAttributes(map[string]interface{}{ + "count": float64(200), + "date": "2026-03-07", + })) + require.NoError(t, lambda.HandleStreamEvent(ctx, d, lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}})) + + assert.Contains(t, collectEventTypes(eb), "POST_RUN_DRIFT_INFLIGHT") + assert.False(t, hasRerunRequest(mock, "pipe-inf-cf"), "should NOT write rerun request while running") + assert.Equal(t, 0, countSFNExecutions(sfnM)) + assertAlertFormats(t, eb) + }) + t.Run("NoDriftWhileRunning_NoEvent", func(t *testing.T) { mock := newMockDDB() tr := &mockTriggerRunner{} diff --git a/internal/lambda/postrun.go b/internal/lambda/postrun.go index 3b0c317..d7c6810 100644 --- a/internal/lambda/postrun.go +++ b/internal/lambda/postrun.go @@ -10,6 +10,17 @@ import ( "github.com/dwsmith1983/interlock/pkg/types" ) +// defaultDriftField is the sensor field used for drift comparison when +// PostRunConfig.DriftField is not set. +const defaultDriftField = "sensor_count" + +func resolveDriftField(cfg *types.PostRunConfig) string { + if cfg.DriftField != "" { + return cfg.DriftField + } + return defaultDriftField +} + // matchesPostRunRule returns true if the sensor key matches any post-run rule key // (prefix match to support per-period sensor keys). func matchesPostRunRule(sensorKey string, rules []types.ValidationRule) bool { @@ -66,8 +77,9 @@ func handlePostRunInflight(ctx context.Context, d *Deps, cfg *types.PipelineConf return nil // No baseline yet — job hasn't completed once. } - prevCount := ExtractFloat(baseline, "sensor_count") - currCount := ExtractFloat(sensorData, "sensor_count") + driftField := resolveDriftField(cfg.PostRun) + prevCount := ExtractFloat(baseline, driftField) + currCount := ExtractFloat(sensorData, driftField) threshold := 0.0 if cfg.PostRun.DriftThreshold != nil { threshold = *cfg.PostRun.DriftThreshold @@ -78,7 +90,9 @@ func handlePostRunInflight(ctx context.Context, d *Deps, cfg *types.PipelineConf map[string]interface{}{ "previousCount": prevCount, "currentCount": currCount, + "delta": currCount - prevCount, "driftThreshold": threshold, + "driftField": driftField, "sensorKey": sensorKey, "source": "post-run-stream", }); err != nil { @@ -101,8 +115,9 @@ func handlePostRunCompleted(ctx context.Context, d *Deps, cfg *types.PipelineCon // Check for data drift if baseline exists. if baseline != nil { - prevCount := ExtractFloat(baseline, "sensor_count") - currCount := ExtractFloat(sensorData, "sensor_count") + driftField := resolveDriftField(cfg.PostRun) + prevCount := ExtractFloat(baseline, driftField) + currCount := ExtractFloat(sensorData, driftField) threshold := 0.0 if cfg.PostRun.DriftThreshold != nil { threshold = *cfg.PostRun.DriftThreshold @@ -116,6 +131,7 @@ func handlePostRunCompleted(ctx context.Context, d *Deps, cfg *types.PipelineCon "currentCount": currCount, "delta": delta, "driftThreshold": threshold, + "driftField": driftField, "source": "post-run-stream", }); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPostRunDrift, "error", err) diff --git a/internal/lambda/sla_monitor_test.go b/internal/lambda/sla_monitor_test.go index b3d4a75..59aef85 100644 --- a/internal/lambda/sla_monitor_test.go +++ b/internal/lambda/sla_monitor_test.go @@ -1405,16 +1405,18 @@ func TestSLAScheduleName_Deterministic(t *testing.T) { func TestSLAMonitor_Reconcile_PastWarningFutureBreach(t *testing.T) { eb := &mockEventBridge{} + fixedNow := time.Date(2026, 3, 10, 12, 0, 0, 0, time.UTC) d := &lambda.Deps{ EventBridge: eb, EventBusName: "test-bus", Logger: slog.Default(), + NowFunc: func() time.Time { return fixedNow }, } // We need warning in the past and breach in the future. // Use tomorrow's date with deadline "23:59" and a 48h expected duration. // Breach = tomorrow 23:59 (future); warning = yesterday 23:59 (past). - tomorrow := time.Now().Add(24 * time.Hour).Format("2006-01-02") + tomorrow := fixedNow.Add(24 * time.Hour).Format("2006-01-02") out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ Mode: "reconcile", PipelineID: "silver-cdr-hour", diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index b48b0dd..baf8463 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -1037,6 +1037,7 @@ func TestWatchdog_StaleTrigger_DetailFields(t *testing.T) { func TestWatchdog_MissedSchedule_DetailFields(t *testing.T) { mock := newMockDDB() d, _, ebMock := testDeps(mock) + d.NowFunc = func() time.Time { return time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) } // Push StartedAt far into the past so the pre-deployment filter never skips. d.StartedAt = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) diff --git a/pkg/types/pipeline.go b/pkg/types/pipeline.go index 57dbd74..9bf09ea 100644 --- a/pkg/types/pipeline.go +++ b/pkg/types/pipeline.go @@ -111,4 +111,5 @@ type PostRunConfig struct { Rules []ValidationRule `yaml:"rules" json:"rules"` SensorTimeout string `yaml:"sensorTimeout,omitempty" json:"sensorTimeout,omitempty"` // e.g. "2h"; default 2h DriftThreshold *float64 `yaml:"driftThreshold,omitempty" json:"driftThreshold,omitempty"` // minimum absolute delta to trigger drift; default 0 (any change) + DriftField string `yaml:"driftField,omitempty" json:"driftField,omitempty"` // sensor field for drift comparison; default "sensor_count" }