Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.7.3] - 2026-03-10

### Added

- **Configurable drift detection field** (`PostRunConfig.DriftField`) — specifies which sensor field to compare for post-run drift detection. Defaults to `"sensor_count"` for backward compatibility. Fixes broken drift detection when sensors use a different field name (e.g., `"count"`).

### Fixed

- **Post-run drift detection never fired when the sensor field was `count`** — the drift comparison was hardcoded to `"sensor_count"`, but bronze consumers write `"count"` in hourly-status sensors, so `ExtractFloat` returned 0 for both the baseline and the current value. The `prevCount > 0 && currCount > 0` guard then silently suppressed all drift detection.
- **Two flaky time-sensitive tests** — `TestSLAMonitor_Reconcile_PastWarningFutureBreach` and `TestWatchdog_MissedSchedule_DetailFields` used real wall-clock time instead of an injected `NowFunc`, causing failures that depended on the time of day.

## [0.7.2] - 2026-03-08

### Added
Expand Down Expand Up @@ -343,6 +354,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline

Released under the [Elastic License 2.0](LICENSE).

[0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3
[0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2
[0.7.1]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.1
[0.7.0]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.0
Expand Down
95 changes: 95 additions & 0 deletions internal/lambda/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,70 @@ func TestE2E_PostRunMonitoring(t *testing.T) {
assertAlertFormats(t, eb)
})

// DriftDetected_CustomField configures PostRunConfig.DriftField as "count"
// and changes the "count" value of the audit-result sensor from 100 to 120
// between post-run iterations. It expects a POST_RUN_DRIFT event, a completed
// trigger, and a rerun request.
t.Run("DriftDetected_CustomField", func(t *testing.T) {
// Fresh mocks per subtest; the status poller reports a single "succeeded" state.
mock := newMockDDB()
tr := &mockTriggerRunner{}
sc := &mockStatusPoller{seq: []lambda.StatusResult{{State: "succeeded"}}}
d, _, eb := buildE2EDeps(mock, tr, sc)

cfg := e2ePipeline("pipe-b2-cf")
cfg.PostRun = &types.PostRunConfig{
Evaluation: &types.EvaluationWindow{Window: "10m", Interval: "5m"},
Rules: []types.ValidationRule{{Key: "audit-result", Check: types.CheckExists}},
// Compare the "count" field instead of the default "sensor_count".
DriftField: "count",
}

r := runSFN(t, ctx, d, mock, eb, e2eScenario{
pipeline: cfg,
// Initial sensor state; audit-result carries the custom drift field.
sensors: map[string]map[string]interface{}{
"upstream-complete": {"status": "ready"},
"audit-result": {"count": float64(100)},
},
postRunSensorUpdates: []map[string]map[string]interface{}{
{}, // iteration 0: no updates; baseline saved
{"audit-result": {"count": float64(120), "date": "2026-03-07"}}, // iteration 1: count changed
},
})

// The run must finish, report drift, and request a rerun.
assert.Equal(t, sfnDone, r.terminal)
assert.Contains(t, r.events, "POST_RUN_DRIFT")
assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-b2-cf"))
assert.True(t, hasRerunRequest(mock, "pipe-b2-cf"), "drift on custom field should write RERUN_REQUEST")
assertAlertFormats(t, eb)
})

// DriftDetected_DefaultField leaves PostRunConfig.DriftField unset and changes
// the "sensor_count" value of the audit-result sensor from 100 to 120 between
// post-run iterations. It expects the same outcome as the custom-field case:
// a POST_RUN_DRIFT event, a completed trigger, and a rerun request.
t.Run("DriftDetected_DefaultField", func(t *testing.T) {
// Fresh mocks per subtest; the status poller reports a single "succeeded" state.
mock := newMockDDB()
tr := &mockTriggerRunner{}
sc := &mockStatusPoller{seq: []lambda.StatusResult{{State: "succeeded"}}}
d, _, eb := buildE2EDeps(mock, tr, sc)

cfg := e2ePipeline("pipe-b2-df")
cfg.PostRun = &types.PostRunConfig{
Evaluation: &types.EvaluationWindow{Window: "10m", Interval: "5m"},
Rules: []types.ValidationRule{{Key: "audit-result", Check: types.CheckExists}},
// DriftField intentionally left empty — should default to "sensor_count"
}

r := runSFN(t, ctx, d, mock, eb, e2eScenario{
pipeline: cfg,
// Initial sensor state; audit-result carries the default drift field.
sensors: map[string]map[string]interface{}{
"upstream-complete": {"status": "ready"},
"audit-result": {"sensor_count": float64(100)},
},
postRunSensorUpdates: []map[string]map[string]interface{}{
{}, // iteration 0: no updates; baseline saved
{"audit-result": {"sensor_count": float64(120), "date": "2026-03-07"}}, // iteration 1: count changed
},
})

// The run must finish, report drift, and request a rerun.
assert.Equal(t, sfnDone, r.terminal)
assert.Contains(t, r.events, "POST_RUN_DRIFT")
assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-b2-df"))
assert.True(t, hasRerunRequest(mock, "pipe-b2-df"), "drift on default field should write RERUN_REQUEST")
assertAlertFormats(t, eb)
})

t.Run("WindowExhausted", func(t *testing.T) {
mock := newMockDDB()
tr := &mockTriggerRunner{}
Expand Down Expand Up @@ -2074,6 +2138,37 @@ func TestE2E_PostRunInflight(t *testing.T) {
assertAlertFormats(t, eb)
})

// InflightDrift_CustomField sets DriftField to "count", seeds a running
// trigger and a baseline with count=100, then delivers a stream record for
// the audit-result sensor with count=200. It expects a
// POST_RUN_DRIFT_INFLIGHT event, no rerun request, and no SFN executions.
t.Run("InflightDrift_CustomField", func(t *testing.T) {
mock := newMockDDB()
tr := &mockTriggerRunner{}
sc := &mockStatusPoller{}
d, sfnM, eb := buildE2EDeps(mock, tr, sc)

cfg := e2ePipeline("pipe-inf-cf")
cfg.PostRun = &types.PostRunConfig{
Rules: []types.ValidationRule{{Key: "audit-result", Check: types.CheckExists}},
// Compare the "count" field instead of the default "sensor_count".
DriftField: "count",
}
// Seed the pipeline config and a trigger in the running state for 2026-03-07.
seedConfig(mock, cfg)
seedTriggerWithStatus(mock, "pipe-inf-cf", "2026-03-07", types.TriggerStatusRunning)

// Baseline uses custom field "count".
require.NoError(t, d.Store.WriteSensor(ctx, "pipe-inf-cf", "postrun-baseline#2026-03-07",
map[string]interface{}{"count": float64(100)}))

// Sensor arrives with different count while job is running.
record := makeSensorRecord("pipe-inf-cf", "audit-result", toStreamAttributes(map[string]interface{}{
"count": float64(200),
"date": "2026-03-07",
}))
require.NoError(t, lambda.HandleStreamEvent(ctx, d, lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}}))

// Drift is reported as an in-flight event only; nothing is rerun or started.
assert.Contains(t, collectEventTypes(eb), "POST_RUN_DRIFT_INFLIGHT")
assert.False(t, hasRerunRequest(mock, "pipe-inf-cf"), "should NOT write rerun request while running")
assert.Equal(t, 0, countSFNExecutions(sfnM))
assertAlertFormats(t, eb)
})

t.Run("NoDriftWhileRunning_NoEvent", func(t *testing.T) {
mock := newMockDDB()
tr := &mockTriggerRunner{}
Expand Down
24 changes: 20 additions & 4 deletions internal/lambda/postrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ import (
"github.com/dwsmith1983/interlock/pkg/types"
)

// defaultDriftField is the sensor field used for drift comparison when
// PostRunConfig.DriftField is not set.
const defaultDriftField = "sensor_count"

// resolveDriftField returns the name of the sensor field to compare for
// post-run drift detection.
//
// It returns cfg.DriftField when that is non-empty. When cfg is nil or its
// DriftField is empty, it falls back to defaultDriftField so configurations
// written before DriftField existed keep comparing "sensor_count".
func resolveDriftField(cfg *types.PostRunConfig) string {
	// Guard against a nil config so a pipeline without a PostRun section
	// cannot cause a nil-pointer panic here.
	if cfg == nil || cfg.DriftField == "" {
		return defaultDriftField
	}
	return cfg.DriftField
}

// matchesPostRunRule returns true if the sensor key matches any post-run rule key
// (prefix match to support per-period sensor keys).
func matchesPostRunRule(sensorKey string, rules []types.ValidationRule) bool {
Expand Down Expand Up @@ -66,8 +77,9 @@ func handlePostRunInflight(ctx context.Context, d *Deps, cfg *types.PipelineConf
return nil // No baseline yet — job hasn't completed once.
}

prevCount := ExtractFloat(baseline, "sensor_count")
currCount := ExtractFloat(sensorData, "sensor_count")
driftField := resolveDriftField(cfg.PostRun)
prevCount := ExtractFloat(baseline, driftField)
currCount := ExtractFloat(sensorData, driftField)
threshold := 0.0
if cfg.PostRun.DriftThreshold != nil {
threshold = *cfg.PostRun.DriftThreshold
Expand All @@ -78,7 +90,9 @@ func handlePostRunInflight(ctx context.Context, d *Deps, cfg *types.PipelineConf
map[string]interface{}{
"previousCount": prevCount,
"currentCount": currCount,
"delta": currCount - prevCount,
"driftThreshold": threshold,
"driftField": driftField,
"sensorKey": sensorKey,
"source": "post-run-stream",
}); err != nil {
Expand All @@ -101,8 +115,9 @@ func handlePostRunCompleted(ctx context.Context, d *Deps, cfg *types.PipelineCon

// Check for data drift if baseline exists.
if baseline != nil {
prevCount := ExtractFloat(baseline, "sensor_count")
currCount := ExtractFloat(sensorData, "sensor_count")
driftField := resolveDriftField(cfg.PostRun)
prevCount := ExtractFloat(baseline, driftField)
currCount := ExtractFloat(sensorData, driftField)
threshold := 0.0
if cfg.PostRun.DriftThreshold != nil {
threshold = *cfg.PostRun.DriftThreshold
Expand All @@ -116,6 +131,7 @@ func handlePostRunCompleted(ctx context.Context, d *Deps, cfg *types.PipelineCon
"currentCount": currCount,
"delta": delta,
"driftThreshold": threshold,
"driftField": driftField,
"source": "post-run-stream",
}); err != nil {
d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPostRunDrift, "error", err)
Expand Down
4 changes: 3 additions & 1 deletion internal/lambda/sla_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,16 +1405,18 @@ func TestSLAScheduleName_Deterministic(t *testing.T) {

func TestSLAMonitor_Reconcile_PastWarningFutureBreach(t *testing.T) {
eb := &mockEventBridge{}
fixedNow := time.Date(2026, 3, 10, 12, 0, 0, 0, time.UTC)
d := &lambda.Deps{
EventBridge: eb,
EventBusName: "test-bus",
Logger: slog.Default(),
NowFunc: func() time.Time { return fixedNow },
}

// We need warning in the past and breach in the future.
// Use tomorrow's date with deadline "23:59" and a 48h expected duration.
// Breach = tomorrow 23:59 (future); warning = yesterday 23:59 (past).
tomorrow := time.Now().Add(24 * time.Hour).Format("2006-01-02")
tomorrow := fixedNow.Add(24 * time.Hour).Format("2006-01-02")
out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{
Mode: "reconcile",
PipelineID: "silver-cdr-hour",
Expand Down
1 change: 1 addition & 0 deletions internal/lambda/watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ func TestWatchdog_StaleTrigger_DetailFields(t *testing.T) {
func TestWatchdog_MissedSchedule_DetailFields(t *testing.T) {
mock := newMockDDB()
d, _, ebMock := testDeps(mock)
d.NowFunc = func() time.Time { return time.Date(2026, 3, 10, 0, 30, 0, 0, time.UTC) }
// Push StartedAt far into the past so the pre-deployment filter never skips.
d.StartedAt = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)

Expand Down
1 change: 1 addition & 0 deletions pkg/types/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,5 @@ type PostRunConfig struct {
Rules []ValidationRule `yaml:"rules" json:"rules"`
SensorTimeout string `yaml:"sensorTimeout,omitempty" json:"sensorTimeout,omitempty"` // e.g. "2h"; default 2h
DriftThreshold *float64 `yaml:"driftThreshold,omitempty" json:"driftThreshold,omitempty"` // minimum absolute delta to trigger drift; default 0 (any change)
DriftField string `yaml:"driftField,omitempty" json:"driftField,omitempty"` // sensor field for drift comparison; default "sensor_count"
}