From 81790007fab446e8abf02d3b4ec111ae5c6681ed Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Thu, 12 Mar 2026 00:27:23 +0700 Subject: [PATCH 1/5] feat: dry-run shadow mode for pipeline evaluation Observation-only mode that evaluates trigger conditions, validation rules, and SLA projections against real sensor data without executing jobs or starting Step Functions. Publishes DRY_RUN_WOULD_TRIGGER, DRY_RUN_LATE_DATA, DRY_RUN_SLA_PROJECTION, and DRY_RUN_DRIFT events to EventBridge for comparison against the existing orchestrator. --- CHANGELOG.md | 9 + README.md | 41 ++ docs/content/docs/configuration/pipelines.md | 12 + docs/content/docs/reference/alerting.md | 13 + internal/lambda/dryrun.go | 209 +++++++++ internal/lambda/postrun.go | 5 + internal/lambda/stream_router.go | 5 + internal/lambda/stream_router_test.go | 450 +++++++++++++++++++ internal/store/control.go | 52 +++ internal/store/control_test.go | 123 +++++ internal/validation/config.go | 10 + internal/validation/config_test.go | 30 ++ pkg/types/dynamo.go | 5 + pkg/types/events.go | 4 + pkg/types/pipeline.go | 1 + 15 files changed, 969 insertions(+) create mode 100644 internal/lambda/dryrun.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 00e68ac..df8ab6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- **Dry-run / shadow mode** (`dryRun: true`) — observation-only mode for evaluating Interlock against running pipelines without executing jobs. The stream-router evaluates trigger conditions, validation rules, and SLA projections inline, publishing all observations as EventBridge events. No Step Function executions, no job triggers, no rerun requests. 
New events: `DRY_RUN_WOULD_TRIGGER`, `DRY_RUN_LATE_DATA`, `DRY_RUN_SLA_PROJECTION`, `DRY_RUN_DRIFT`. DRY_RUN# markers stored with 7-day TTL for dedup and late-data detection. Post-run drift detection captures baseline at trigger time and compares when sensors update. SLA projection reuses production `handleSLACalculate` for consistent deadline resolution. Requires `schedule.trigger` and `job.type`. Calendar exclusions honored.
+- **`DryRunSK` key helper** for DynamoDB DRY_RUN# sort keys.
+- **`WriteDryRunMarker` / `GetDryRunMarker` store methods** with conditional write (idempotent dedup) and consistent read.
+- **Config validation** for dry-run: requires both `job.type` and `schedule.trigger`.
+
 ## [0.8.0] - 2026-03-10
 
 ### Added
diff --git a/README.md b/README.md
index e8f930e..026fbe0 100644
--- a/README.md
+++ b/README.md
@@ -143,6 +143,47 @@ job:
   maxRetries: 2
 ```
 
+## Dry-Run / Shadow Mode
+
+Evaluate Interlock against running pipelines without executing any jobs. Set `dryRun: true` in your pipeline config — the stream-router observes the real sensor stream and records what it *would* do as EventBridge events, while your existing orchestrator continues running as-is.
+
+```yaml
+pipeline:
+  id: gold-revenue-dryrun
+schedule:
+  trigger:
+    key: upstream-complete
+    check: equals
+    field: status
+    value: ready
+sla:
+  deadline: "10:00"
+  expectedDuration: 30m
+validation:
+  trigger: "ALL"
+  rules:
+    - key: upstream-complete
+      check: equals
+      field: status
+      value: ready
+job:
+  type: glue
+  config:
+    jobName: gold-revenue-etl
+dryRun: true
+```
+
+Dry-run publishes four observation events:
+
+| Event | Meaning |
+|-------|---------|
+| `DRY_RUN_WOULD_TRIGGER` | All validation rules passed — Interlock would have triggered the job |
+| `DRY_RUN_LATE_DATA` | Sensor updated after the trigger point — would have triggered a re-run |
+| `DRY_RUN_SLA_PROJECTION` | Estimated completion vs. deadline — would the SLA be met or breached? |
+| `DRY_RUN_DRIFT` | Post-run sensor data changed — would have detected drift and re-run |
+
+No Step Function executions, no job triggers, no rerun requests. Remove `dryRun: true` to switch to live mode — `DRY_RUN#` markers have a 7-day TTL and don't interfere with `TRIGGER#` rows.
+
 ## Trigger Types
 
 | Type | SDK/Protocol | Use Case |
diff --git a/docs/content/docs/configuration/pipelines.md b/docs/content/docs/configuration/pipelines.md
index 812d62c..94fe6cf 100644
--- a/docs/content/docs/configuration/pipelines.md
+++ b/docs/content/docs/configuration/pipelines.md
@@ -77,6 +77,18 @@ Time-based constraints for pipeline completion. See [Schedules](../schedules/) f
 
 Declarative rules that determine pipeline readiness. See [Validation Rules](#validation-rules) below.
 
+### `dryRun`
+
+Enables observation-only mode. When `true`, Interlock evaluates trigger conditions and validation rules against real sensor data but never starts a Step Function execution or triggers any job. All observations are published as EventBridge events (`DRY_RUN_WOULD_TRIGGER`, `DRY_RUN_LATE_DATA`, `DRY_RUN_SLA_PROJECTION`, `DRY_RUN_DRIFT`).
+
+| Field | Type | Default | Description |
+|---|---|---|---|
+| `dryRun` | bool | `false` | Enable dry-run / shadow mode |
+
+Requires `schedule.trigger` (sensor-driven evaluation) and `job.type` to be configured. Calendar exclusions are still honored. Remove `dryRun: true` to switch to live mode — `DRY_RUN#` markers have a 7-day TTL and don't interfere with `TRIGGER#` rows.
+
+See [Alerting](../../reference/alerting/) for the full dry-run event reference.
+
 ### `job`
 
 Defines how to start the downstream job when validation passes. See [Triggers](../triggers/) for all 8 supported job types.
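+A sketch of a minimal `job` block, reusing the Glue type and `maxRetries` field shown in the README quickstart (the `jobName` value is illustrative):
+
+```yaml
+job:
+  type: glue
+  config:
+    jobName: gold-revenue-etl
+  maxRetries: 2
+```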
diff --git a/docs/content/docs/reference/alerting.md b/docs/content/docs/reference/alerting.md
index 4c2bd1d..a013610 100644
--- a/docs/content/docs/reference/alerting.md
+++ b/docs/content/docs/reference/alerting.md
@@ -70,6 +70,19 @@ Published by the **stream-router** when processing rerun requests and late data
 | `RERUN_REJECTED` | Rerun request rejected by circuit breaker | No new sensor data since last job completion |
 | `RERUN_ACCEPTED` | Rerun request accepted | Rerun passed circuit breaker validation and trigger lock was reset |
 
+### Dry-Run Events
+
+Published by the **stream-router** Lambda for pipelines with `dryRun: true`. These events record what Interlock *would* do without executing any jobs or starting Step Functions.
+
+| Detail Type | Meaning | When |
+|---|---|---|
+| `DRY_RUN_WOULD_TRIGGER` | All validation rules passed | Published at the moment Interlock would have triggered the pipeline job |
+| `DRY_RUN_LATE_DATA` | Sensor updated after trigger point | Sensor data arrived after the dry-run trigger was recorded — would have triggered a re-run |
+| `DRY_RUN_SLA_PROJECTION` | Estimated completion vs. deadline | Published immediately after `DRY_RUN_WOULD_TRIGGER` when `sla.expectedDuration` is configured |
+| `DRY_RUN_DRIFT` | Post-run sensor data changed | Sensor value drifted from baseline captured at trigger time — would have triggered a drift re-run |
+
+The `DRY_RUN_SLA_PROJECTION` detail always includes `status` (`"met"` or `"breach"`), `triggeredAt`, `estimatedCompletion`, and `expectedDuration`; the `deadline` and `marginSeconds` fields are included only when a `deadline` is configured and resolves successfully.
+
 ### Watchdog Events
 
 Published by the **watchdog** Lambda, invoked on an EventBridge schedule (default: every 5 minutes).
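+To route the dry-run observations to a consumer for comparison against the existing orchestrator, an EventBridge rule can match on the detail types above. A sketch of the event pattern only; the event bus, rule name, source, and target are deployment-specific and not shown:
+
+```json
+{
+  "detail-type": [
+    "DRY_RUN_WOULD_TRIGGER",
+    "DRY_RUN_LATE_DATA",
+    "DRY_RUN_SLA_PROJECTION",
+    "DRY_RUN_DRIFT"
+  ]
+}
+```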
diff --git a/internal/lambda/dryrun.go b/internal/lambda/dryrun.go new file mode 100644 index 0000000..c101b29 --- /dev/null +++ b/internal/lambda/dryrun.go @@ -0,0 +1,209 @@ +package lambda + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/dwsmith1983/interlock/internal/validation" + "github.com/dwsmith1983/interlock/pkg/types" +) + +// handleDryRunTrigger processes a sensor event for a dry-run pipeline. +// It evaluates trigger conditions and validation rules, records observations +// as EventBridge events, but never starts a Step Function execution. +// Calendar exclusions are already checked before this function is called. +func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, sensorData map[string]interface{}, now time.Time) error { + // Check for existing dry-run marker — if present, this is late data. + marker, err := d.Store.GetDryRunMarker(ctx, pipelineID, scheduleID, date) + if err != nil { + return fmt.Errorf("get dry-run marker for %q: %w", pipelineID, err) + } + + if marker != nil { + // Late data: sensor arrived after we already recorded a would-trigger. + triggeredAtStr, _ := marker.Data["triggeredAt"].(string) + triggeredAt, _ := time.Parse(time.RFC3339, triggeredAtStr) + lateBy := now.Sub(triggeredAt) + + if pubErr := publishEvent(ctx, d, string(types.EventDryRunLateData), pipelineID, scheduleID, date, + fmt.Sprintf("dry-run: late data arrived %.0fm after trigger point for %s", lateBy.Minutes(), pipelineID), + map[string]interface{}{ + "triggeredAt": triggeredAtStr, + "lateBy": lateBy.String(), + }); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunLateData, "error", pubErr) + } + return nil + } + + // No marker yet — evaluate full validation rules. 
+	sensors, err := d.Store.GetAllSensors(ctx, pipelineID)
+	if err != nil {
+		return fmt.Errorf("get sensors for dry-run %q: %w", pipelineID, err)
+	}
+	RemapPerPeriodSensors(sensors, date)
+
+	result := validation.EvaluateRules(cfg.Validation.Trigger, cfg.Validation.Rules, sensors, now)
+	if !result.Passed {
+		d.Logger.Info("dry-run: trigger condition met but validation rules not satisfied",
+			"pipelineId", pipelineID,
+			"date", date,
+		)
+		return nil
+	}
+
+	// All rules pass — write dry-run marker.
+	written, err := d.Store.WriteDryRunMarker(ctx, pipelineID, scheduleID, date, now)
+	if err != nil {
+		return fmt.Errorf("write dry-run marker for %q: %w", pipelineID, err)
+	}
+	if !written {
+		// Race: another invocation wrote the marker first; it owns the
+		// WOULD_TRIGGER. Subsequent sensor updates will be treated as late data.
+		return nil
+	}
+
+	// Capture post-run baseline if PostRun is configured.
+	if cfg.PostRun != nil && len(cfg.PostRun.Rules) > 0 {
+		if baselineErr := capturePostRunBaseline(ctx, d, pipelineID, scheduleID, date); baselineErr != nil {
+			d.Logger.WarnContext(ctx, "dry-run: failed to capture post-run baseline",
+				"pipelineId", pipelineID, "error", baselineErr)
+		}
+	}
+
+	// Publish WOULD_TRIGGER event.
+	if pubErr := publishEvent(ctx, d, string(types.EventDryRunWouldTrigger), pipelineID, scheduleID, date,
+		fmt.Sprintf("dry-run: would trigger %s at %s", pipelineID, now.Format(time.RFC3339)),
+		map[string]interface{}{
+			"triggeredAt":    now.UTC().Format(time.RFC3339),
+			"rulesEvaluated": len(cfg.Validation.Rules),
+		}); pubErr != nil {
+		d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunWouldTrigger, "error", pubErr)
+	}
+
+	// SLA projection if configured.
+ if cfg.SLA != nil && cfg.SLA.ExpectedDuration != "" { + publishDryRunSLAProjection(ctx, d, cfg, pipelineID, scheduleID, date, now) + } + + d.Logger.Info("dry-run: would trigger", + "pipelineId", pipelineID, + "schedule", scheduleID, + "date", date, + ) + return nil +} + +// publishDryRunSLAProjection computes and publishes an SLA projection event +// for a dry-run pipeline. Reuses handleSLACalculate to resolve the breach +// deadline consistently with production SLA monitoring (including hourly +// pipeline T+1 adjustment). +func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, triggeredAt time.Time) { + expectedDur, err := time.ParseDuration(cfg.SLA.ExpectedDuration) + if err != nil { + d.Logger.WarnContext(ctx, "dry-run: invalid expectedDuration", "error", err) + return + } + + estimatedCompletion := triggeredAt.Add(expectedDur) + detail := map[string]interface{}{ + "triggeredAt": triggeredAt.UTC().Format(time.RFC3339), + "estimatedCompletion": estimatedCompletion.UTC().Format(time.RFC3339), + "expectedDuration": cfg.SLA.ExpectedDuration, + } + + status := "met" + message := fmt.Sprintf("dry-run: SLA projection for %s — estimated completion %s", + pipelineID, estimatedCompletion.Format(time.RFC3339)) + + if cfg.SLA.Deadline != "" { + // Reuse the production SLA calculation to resolve the breach deadline. 
+ slaInput := SLAMonitorInput{ + Mode: "calculate", + PipelineID: pipelineID, + ScheduleID: scheduleID, + Date: date, + Deadline: cfg.SLA.Deadline, + ExpectedDuration: cfg.SLA.ExpectedDuration, + Timezone: cfg.SLA.Timezone, + } + slaOutput, calcErr := handleSLACalculate(slaInput, triggeredAt) + if calcErr != nil { + d.Logger.WarnContext(ctx, "dry-run: SLA deadline resolution failed", "error", calcErr) + } else if slaOutput.BreachAt != "" { + breachAt, parseErr := time.Parse(time.RFC3339, slaOutput.BreachAt) + if parseErr == nil { + detail["deadline"] = slaOutput.BreachAt + margin := breachAt.Sub(estimatedCompletion) + detail["marginSeconds"] = margin.Seconds() + if estimatedCompletion.After(breachAt) { + status = "breach" + message = fmt.Sprintf("dry-run: SLA projection for %s — would breach by %.0fm", + pipelineID, math.Abs(margin.Minutes())) + } else { + message = fmt.Sprintf("dry-run: SLA projection for %s — SLA met with %.0fm margin", + pipelineID, margin.Minutes()) + } + } + } + } + + detail["status"] = status + + if pubErr := publishEvent(ctx, d, string(types.EventDryRunSLAProjection), pipelineID, scheduleID, date, message, detail); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunSLAProjection, "error", pubErr) + } +} + +// handleDryRunPostRunSensor handles post-run sensor events for dry-run pipelines. +// Compares sensor data against the baseline captured at WOULD_TRIGGER time. +func handleDryRunPostRunSensor(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, sensorKey string, sensorData map[string]interface{}) error { + scheduleID := resolveScheduleID(cfg) + date := ResolveExecutionDate(sensorData, d.now()) + + // Check DRY_RUN# marker — if nil, no trigger happened yet. 
+ marker, err := d.Store.GetDryRunMarker(ctx, pipelineID, scheduleID, date) + if err != nil { + return fmt.Errorf("get dry-run marker for post-run %q: %w", pipelineID, err) + } + if marker == nil { + return nil + } + + // Read baseline captured at WOULD_TRIGGER time. + baselineKey := "postrun-baseline#" + date + baseline, err := d.Store.GetSensorData(ctx, pipelineID, baselineKey) + if err != nil { + return fmt.Errorf("get baseline for dry-run post-run: %w", err) + } + if baseline == nil { + return nil + } + + // Compare drift. + driftField := resolveDriftField(cfg.PostRun) + prevCount := ExtractFloat(baseline, driftField) + currCount := ExtractFloat(sensorData, driftField) + threshold := 0.0 + if cfg.PostRun.DriftThreshold != nil { + threshold = *cfg.PostRun.DriftThreshold + } + + if prevCount > 0 && currCount > 0 && math.Abs(currCount-prevCount) > threshold { + if pubErr := publishEvent(ctx, d, string(types.EventDryRunDrift), pipelineID, scheduleID, date, + fmt.Sprintf("dry-run: drift detected for %s: %.0f → %.0f — would re-run", pipelineID, prevCount, currCount), + map[string]interface{}{ + "previousCount": prevCount, + "currentCount": currCount, + "delta": currCount - prevCount, + "driftThreshold": threshold, + "driftField": driftField, + "sensorKey": sensorKey, + }); pubErr != nil { + d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunDrift, "error", pubErr) + } + } + + return nil +} diff --git a/internal/lambda/postrun.go b/internal/lambda/postrun.go index d7c6810..7b8beae 100644 --- a/internal/lambda/postrun.go +++ b/internal/lambda/postrun.go @@ -36,6 +36,11 @@ func matchesPostRunRule(sensorKey string, rules []types.ValidationRule) bool { // arrives via DynamoDB Stream. Compares current sensor values against the // date-scoped baseline captured at trigger completion. 
func handlePostRunSensorEvent(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, sensorKey string, sensorData map[string]interface{}) error { + // For dry-run pipelines, check DRY_RUN# marker instead of TRIGGER# row. + if cfg.DryRun { + return handleDryRunPostRunSensor(ctx, d, cfg, pipelineID, sensorKey, sensorData) + } + scheduleID := resolveScheduleID(cfg) date := ResolveExecutionDate(sensorData, d.now()) diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index 60aa64a..1ad6ddb 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -215,6 +215,11 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event scheduleID := resolveScheduleID(cfg) date := ResolveExecutionDate(sensorData, now) + // Dry-run mode: observe and record what would happen, but never start SFN. + if cfg.DryRun { + return handleDryRunTrigger(ctx, d, cfg, pipelineID, scheduleID, date, sensorData, now) + } + // Acquire trigger lock to prevent duplicate executions. acquired, err := d.Store.AcquireTriggerLock(ctx, pipelineID, scheduleID, date, ResolveTriggerLockTTL()) if err != nil { diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index 3afd45c..fab02df 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -3793,3 +3793,453 @@ func TestSensorEvent_CalendarExclusion_PublishesEvent(t *testing.T) { require.NotEmpty(t, ebMock.events, "PIPELINE_EXCLUDED event must be published on sensor exclusion") assert.Equal(t, string(types.EventPipelineExcluded), *ebMock.events[0].Entries[0].DetailType) } + +// --------------------------------------------------------------------------- +// Dry-run / shadow mode tests +// --------------------------------------------------------------------------- + +// fixedTestDate is used across dry-run tests for deterministic date handling. 
+const fixedTestDate = "2026-03-11" + +// seedDryRunMarker inserts a DRY_RUN# row directly into the mock. +// triggeredAt is stored in the "data" attribute map so ControlRecord.Data picks it up. +func seedDryRunMarker(mock *mockDDB, pipelineID, schedule, date, triggeredAt string) { + mock.putRaw(testControlTable, map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK(pipelineID)}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.DryRunSK(schedule, date)}, + "data": &ddbtypes.AttributeValueMemberM{Value: map[string]ddbtypes.AttributeValue{ + "triggeredAt": &ddbtypes.AttributeValueMemberS{Value: triggeredAt}, + }}, + }) +} + +// testDryRunConfig returns a PipelineConfig with DryRun: true and a stream trigger. +func testDryRunConfig() types.PipelineConfig { + cfg := testStreamConfig() + cfg.DryRun = true + return cfg +} + +// gatherEventDetailTypes collects all EventBridge detail-type strings from a mock. +func gatherEventDetailTypes(ebMock *mockEventBridge) []string { + ebMock.mu.Lock() + defer ebMock.mu.Unlock() + var out []string + for _, input := range ebMock.events { + for _, e := range input.Entries { + if e.DetailType != nil { + out = append(out, *e.DetailType) + } + } + } + return out +} + +func TestHandleSensorEvent_DryRun_WouldTrigger(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + seedConfig(mock, cfg) + + // Seed the trigger sensor so validation rules pass. 
+ seedSensor(mock, "gold-revenue", "upstream-complete", map[string]interface{}{ + "status": "ready", + }) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // NO SFN execution must be started. + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run must not start SFN execution") + sfnMock.mu.Unlock() + + // EventBridge must have DRY_RUN_WOULD_TRIGGER event. + evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunWouldTrigger), "expected DRY_RUN_WOULD_TRIGGER event") + + // DRY_RUN# marker must be written. + markerKey := ddbItemKey(testControlTable, + types.PipelinePK("gold-revenue"), + types.DryRunSK("stream", fixedTestDate)) + mock.mu.Lock() + _, markerExists := mock.items[markerKey] + mock.mu.Unlock() + assert.True(t, markerExists, "expected DRY_RUN# marker to be written in mock DDB") +} + +func TestHandleSensorEvent_DryRun_LateData(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 30, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + seedConfig(mock, cfg) + + // Pre-seed DRY_RUN# marker — simulates a previous WOULD_TRIGGER. + seedDryRunMarker(mock, "gold-revenue", "stream", fixedTestDate, "2026-03-11T01:15:00Z") + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // NO SFN execution. 
+ sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run with late data must not start SFN") + sfnMock.mu.Unlock() + + // Must have DRY_RUN_LATE_DATA event. + evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunLateData), "expected DRY_RUN_LATE_DATA event") +} + +func TestHandleSensorEvent_DryRun_SLAProjection_Met(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + // 01:15 UTC — well before the 08:00 deadline, 30m duration → finishes 01:45, well within SLA. + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + cfg.SLA = &types.SLAConfig{ + Deadline: "08:00", + ExpectedDuration: "30m", + } + seedConfig(mock, cfg) + + // Seed validation sensor so rules pass. + seedSensor(mock, "gold-revenue", "upstream-complete", map[string]interface{}{ + "status": "ready", + }) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run must not start SFN") + sfnMock.mu.Unlock() + + evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunWouldTrigger), "expected WOULD_TRIGGER event") + assert.Contains(t, evtTypes, string(types.EventDryRunSLAProjection), "expected SLA_PROJECTION event") + + // Parse the SLA projection event detail and check status="met". 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, input := range ebMock.events { + for _, e := range input.Entries { + if e.DetailType != nil && *e.DetailType == string(types.EventDryRunSLAProjection) { + var detail map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) + innerDetail, ok := detail["detail"].(map[string]interface{}) + require.True(t, ok, "expected detail.detail map in SLA projection event") + assert.Equal(t, "met", innerDetail["status"], "SLA projection status should be 'met'") + return + } + } + } + t.Fatal("DRY_RUN_SLA_PROJECTION event not found in published events") +} + +func TestHandleSensorEvent_DryRun_SLAProjection_Breach(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + // 01:50 UTC — 30m duration → estimated completion 02:20, which is AFTER 02:00 deadline → breach. + fixedTime := time.Date(2026, 3, 11, 1, 50, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + cfg.SLA = &types.SLAConfig{ + Deadline: "02:00", + ExpectedDuration: "30m", + } + seedConfig(mock, cfg) + + // Seed validation sensor so rules pass. + seedSensor(mock, "gold-revenue", "upstream-complete", map[string]interface{}{ + "status": "ready", + }) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run must not start SFN") + sfnMock.mu.Unlock() + + evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunSLAProjection), "expected SLA_PROJECTION event") + + // Find the SLA projection event and verify status="breach". 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + for _, input := range ebMock.events { + for _, e := range input.Entries { + if e.DetailType != nil && *e.DetailType == string(types.EventDryRunSLAProjection) { + var detail map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) + innerDetail, ok := detail["detail"].(map[string]interface{}) + require.True(t, ok, "expected detail.detail map") + assert.Equal(t, "breach", innerDetail["status"], "SLA projection status should be 'breach'") + return + } + } + } + t.Fatal("DRY_RUN_SLA_PROJECTION event not found") +} + +func TestHandleSensorEvent_DryRun_ValidationNotReady(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + // Config with a second validation rule for a sensor that does NOT exist. + cfg := testDryRunConfig() + cfg.Validation.Rules = append(cfg.Validation.Rules, types.ValidationRule{ + Key: "other-required-sensor", + Check: types.CheckExists, + }) + seedConfig(mock, cfg) + + // Seed only the trigger sensor — "other-required-sensor" is absent. + seedSensor(mock, "gold-revenue", "upstream-complete", map[string]interface{}{ + "status": "ready", + }) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // No SFN. + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run must not start SFN") + sfnMock.mu.Unlock() + + // No DRY_RUN# marker written. 
+ markerKey := ddbItemKey(testControlTable, + types.PipelinePK("gold-revenue"), + types.DryRunSK("stream", fixedTestDate)) + mock.mu.Lock() + _, markerExists := mock.items[markerKey] + mock.mu.Unlock() + assert.False(t, markerExists, "no DRY_RUN# marker should be written when validation fails") + + // No events published. + evtTypes := gatherEventDetailTypes(ebMock) + assert.Empty(t, evtTypes, "no events should be published when dry-run validation fails") +} + +func TestHandleSensorEvent_DryRun_CapturesBaseline(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + cfg.PostRun = &types.PostRunConfig{ + Rules: []types.ValidationRule{ + {Key: "audit-result", Check: types.CheckExists}, + }, + } + seedConfig(mock, cfg) + + // Seed the trigger sensor (for validation rules). + seedSensor(mock, "gold-revenue", "upstream-complete", map[string]interface{}{ + "status": "ready", + }) + // Seed the post-run sensor that the baseline will capture. + seedSensor(mock, "gold-revenue", "audit-result", map[string]interface{}{ + "sensor_count": float64(500), + }) + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run must not start SFN") + sfnMock.mu.Unlock() + + // WOULD_TRIGGER event published. + evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunWouldTrigger)) + + // postrun-baseline# sensor key must be written. 
+ baselineKey := ddbItemKey(testControlTable, + types.PipelinePK("gold-revenue"), + types.SensorSK("postrun-baseline#"+fixedTestDate)) + mock.mu.Lock() + _, baselineExists := mock.items[baselineKey] + mock.mu.Unlock() + assert.True(t, baselineExists, "expected postrun-baseline# sensor to be written in DDB") +} + +// --------------------------------------------------------------------------- +// Post-run dry-run sensor tests +// --------------------------------------------------------------------------- + +func TestDryRunPostRunSensor_DriftDetected(t *testing.T) { + mock := newMockDDB() + d, sfnMock, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + cfg.PostRun = &types.PostRunConfig{ + Rules: []types.ValidationRule{ + {Key: "audit-result", Check: types.CheckExists}, + }, + } + seedConfig(mock, cfg) + + // Pre-seed DRY_RUN# marker (would-trigger already happened). + seedDryRunMarker(mock, "gold-revenue", "stream", fixedTestDate, "2026-03-11T01:15:00Z") + + // Pre-seed baseline with sensor_count=500. + seedSensor(mock, "gold-revenue", "postrun-baseline#"+fixedTestDate, map[string]interface{}{ + "sensor_count": float64(500), + }) + + // Sensor arrives for post-run key with sensor_count=520 (drift detected). + record := makeSensorRecord("gold-revenue", "audit-result", map[string]events.DynamoDBAttributeValue{ + "data": events.NewMapAttribute(map[string]events.DynamoDBAttributeValue{ + "sensor_count": events.NewNumberAttribute("520"), + "date": events.NewStringAttribute(fixedTestDate), + }), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // DRY_RUN_DRIFT event published. 
+ evtTypes := gatherEventDetailTypes(ebMock) + assert.Contains(t, evtTypes, string(types.EventDryRunDrift), "expected DRY_RUN_DRIFT event") + + // No SFN execution. + sfnMock.mu.Lock() + assert.Empty(t, sfnMock.executions, "dry-run drift must not start SFN") + sfnMock.mu.Unlock() + + // No RERUN_REQUEST written. + rerunKey := ddbItemKey(testControlTable, + types.PipelinePK("gold-revenue"), + types.RerunRequestSK("stream", fixedTestDate)) + mock.mu.Lock() + _, rerunExists := mock.items[rerunKey] + mock.mu.Unlock() + assert.False(t, rerunExists, "dry-run must not write a rerun request") +} + +func TestDryRunPostRunSensor_NoDrift(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + cfg.PostRun = &types.PostRunConfig{ + Rules: []types.ValidationRule{ + {Key: "audit-result", Check: types.CheckExists}, + }, + } + seedConfig(mock, cfg) + + // Pre-seed DRY_RUN# marker. + seedDryRunMarker(mock, "gold-revenue", "stream", fixedTestDate, "2026-03-11T01:15:00Z") + + // Baseline with sensor_count=500. + seedSensor(mock, "gold-revenue", "postrun-baseline#"+fixedTestDate, map[string]interface{}{ + "sensor_count": float64(500), + }) + + // Sensor arrives with same sensor_count=500 — no drift. + record := makeSensorRecord("gold-revenue", "audit-result", map[string]events.DynamoDBAttributeValue{ + "data": events.NewMapAttribute(map[string]events.DynamoDBAttributeValue{ + "sensor_count": events.NewNumberAttribute("500"), + "date": events.NewStringAttribute(fixedTestDate), + }), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // No events published (no drift). 
+ evtTypes := gatherEventDetailTypes(ebMock) + assert.Empty(t, evtTypes, "no events should be published when no drift detected") +} + +func TestDryRunPostRunSensor_NoMarker(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + + fixedTime := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedTime } + + cfg := testDryRunConfig() + cfg.PostRun = &types.PostRunConfig{ + Rules: []types.ValidationRule{ + {Key: "audit-result", Check: types.CheckExists}, + }, + } + seedConfig(mock, cfg) + + // NO DRY_RUN# marker seeded — no trigger happened yet. + + // Sensor arrives for post-run key. + record := makeSensorRecord("gold-revenue", "audit-result", map[string]events.DynamoDBAttributeValue{ + "data": events.NewMapAttribute(map[string]events.DynamoDBAttributeValue{ + "sensor_count": events.NewNumberAttribute("500"), + "date": events.NewStringAttribute(fixedTestDate), + }), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err) + + // No events published (no marker means no trigger happened). + evtTypes := gatherEventDetailTypes(ebMock) + assert.Empty(t, evtTypes, "no events when no DRY_RUN# marker exists") +} diff --git a/internal/store/control.go b/internal/store/control.go index 058fb40..0c46588 100644 --- a/internal/store/control.go +++ b/internal/store/control.go @@ -273,6 +273,58 @@ func (s *Store) WriteRerunRequest(ctx context.Context, pipelineID, schedule, dat return nil } +// WriteDryRunMarker writes a DRY_RUN# marker row with a 7-day TTL. +// The write is conditional (attribute_not_exists) so repeated calls are +// idempotent — the original triggeredAt is preserved. 
+func (s *Store) WriteDryRunMarker(ctx context.Context, pipelineID, schedule, date string, triggeredAt time.Time) (bool, error) { + ttlEpoch := time.Now().Add(7 * 24 * time.Hour).Unix() + _, err := s.Client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: &s.ControlTable, + Item: map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK(pipelineID)}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.DryRunSK(schedule, date)}, + "data": &ddbtypes.AttributeValueMemberM{Value: map[string]ddbtypes.AttributeValue{ + "triggeredAt": &ddbtypes.AttributeValueMemberS{Value: triggeredAt.UTC().Format(time.RFC3339)}, + }}, + "ttl": &ddbtypes.AttributeValueMemberN{Value: fmt.Sprintf("%d", ttlEpoch)}, + }, + ConditionExpression: aws.String("attribute_not_exists(PK)"), + }) + if err != nil { + var ccfe *ddbtypes.ConditionalCheckFailedException + if errors.As(err, &ccfe) { + return false, nil + } + return false, fmt.Errorf("write dry-run marker %q/%s/%s: %w", pipelineID, schedule, date, err) + } + return true, nil +} + +// GetDryRunMarker reads the DRY_RUN# marker for a given pipeline/schedule/date. +// Returns nil, nil if no marker exists. 
+func (s *Store) GetDryRunMarker(ctx context.Context, pipelineID, schedule, date string) (*types.ControlRecord, error) { + out, err := s.Client.GetItem(ctx, &dynamodb.GetItemInput{ + TableName: &s.ControlTable, + ConsistentRead: aws.Bool(true), + Key: map[string]ddbtypes.AttributeValue{ + "PK": &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK(pipelineID)}, + "SK": &ddbtypes.AttributeValueMemberS{Value: types.DryRunSK(schedule, date)}, + }, + }) + if err != nil { + return nil, fmt.Errorf("get dry-run marker %q/%s/%s: %w", pipelineID, schedule, date, err) + } + if out.Item == nil { + return nil, nil + } + + var rec types.ControlRecord + if err := attributevalue.UnmarshalMap(out.Item, &rec); err != nil { + return nil, fmt.Errorf("unmarshal dry-run marker %q/%s/%s: %w", pipelineID, schedule, date, err) + } + return &rec, nil +} + // AcquireTriggerLock attempts to acquire a trigger lock for a given pipeline, // schedule, and date. Returns true if the lock was acquired, false if it is // already held. The lock is set with a TTL for automatic expiration. diff --git a/internal/store/control_test.go b/internal/store/control_test.go index 3e536ec..3701914 100644 --- a/internal/store/control_test.go +++ b/internal/store/control_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "strings" "testing" "time" @@ -1119,3 +1120,125 @@ func TestHasTriggerForDates_DynamoError(t *testing.T) { t.Errorf("expected wrapped injected error, got: %v", err) } } + +// --- WriteDryRunMarker / GetDryRunMarker tests --- + +func TestWriteDryRunMarker(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + triggeredAt := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + + written, err := s.WriteDryRunMarker(context.Background(), "gold-revenue", "stream", "2026-03-11", triggeredAt) + if err != nil { + t.Fatalf("WriteDryRunMarker: %v", err) + } + if !written { + t.Fatal("expected written=true on first call") + } + + // Read back via GetDryRunMarker. 
+ rec, err := s.GetDryRunMarker(context.Background(), "gold-revenue", "stream", "2026-03-11") + if err != nil { + t.Fatalf("GetDryRunMarker: %v", err) + } + if rec == nil { + t.Fatal("expected non-nil record") + } + + // Verify PK/SK. + if rec.PK != types.PipelinePK("gold-revenue") { + t.Errorf("PK = %q, want %q", rec.PK, types.PipelinePK("gold-revenue")) + } + if rec.SK != types.DryRunSK("stream", "2026-03-11") { + t.Errorf("SK = %q, want %q", rec.SK, types.DryRunSK("stream", "2026-03-11")) + } + + // Verify triggeredAt stored inside the data map in the raw item. + key := itemKey("control", types.PipelinePK("gold-revenue"), types.DryRunSK("stream", "2026-03-11")) + mock.mu.Lock() + item, ok := mock.items[key] + mock.mu.Unlock() + + if !ok { + t.Fatal("expected item in mock store") + } + + dataAttr, hasData := item["data"] + if !hasData { + t.Fatal("expected data attribute to be present") + } + dataMap := dataAttr.(*ddbtypes.AttributeValueMemberM).Value + triggeredAtAttr, hasTA := dataMap["triggeredAt"] + if !hasTA { + t.Fatal("expected triggeredAt inside data map") + } + triggeredAtStr := triggeredAtAttr.(*ddbtypes.AttributeValueMemberS).Value + if triggeredAtStr != "2026-03-11T01:15:00Z" { + t.Errorf("triggeredAt = %q, want %q", triggeredAtStr, "2026-03-11T01:15:00Z") + } + + // Verify triggeredAt is also accessible via ControlRecord.Data (unmarshaled). + if rec.Data == nil { + t.Fatal("expected non-nil Data in ControlRecord") + } + if rec.Data["triggeredAt"] != "2026-03-11T01:15:00Z" { + t.Errorf("rec.Data[triggeredAt] = %v, want %q", rec.Data["triggeredAt"], "2026-03-11T01:15:00Z") + } + + // Verify TTL is set (future epoch). + ttlAttr, hasTTL := item["ttl"] + if !hasTTL { + t.Fatal("expected ttl attribute to be present") + } + ttlStr := ttlAttr.(*ddbtypes.AttributeValueMemberN).Value + if ttlStr == "" { + t.Error("expected non-empty TTL value") + } + // TTL must be in the future (greater than now). 
+ now := time.Now().Unix() + var ttlEpoch int64 + if _, scanErr := fmt.Sscanf(ttlStr, "%d", &ttlEpoch); scanErr != nil { + t.Errorf("failed to parse ttl %q: %v", ttlStr, scanErr) + } else if ttlEpoch <= now { + t.Errorf("ttl %d is not in the future (now=%d)", ttlEpoch, now) + } +} + +func TestWriteDryRunMarker_Dedup(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + firstTriggeredAt := time.Date(2026, 3, 11, 1, 15, 0, 0, time.UTC) + secondTriggeredAt := time.Date(2026, 3, 11, 1, 30, 0, 0, time.UTC) + + // First write should succeed. + written, err := s.WriteDryRunMarker(context.Background(), "gold-revenue", "stream", "2026-03-11", firstTriggeredAt) + if err != nil { + t.Fatalf("first WriteDryRunMarker: %v", err) + } + if !written { + t.Fatal("expected written=true on first call") + } + + // Second write should return false (idempotent — key already exists). + written, err = s.WriteDryRunMarker(context.Background(), "gold-revenue", "stream", "2026-03-11", secondTriggeredAt) + if err != nil { + t.Fatalf("second WriteDryRunMarker: %v", err) + } + if written { + t.Error("expected written=false on duplicate call (conditional check failed)") + } + + // Verify original triggeredAt is preserved (not overwritten). 
+ key := itemKey("control", types.PipelinePK("gold-revenue"), types.DryRunSK("stream", "2026-03-11")) + mock.mu.Lock() + item := mock.items[key] + mock.mu.Unlock() + + dataMap := item["data"].(*ddbtypes.AttributeValueMemberM).Value + triggeredAtStr := dataMap["triggeredAt"].(*ddbtypes.AttributeValueMemberS).Value + if triggeredAtStr != "2026-03-11T01:15:00Z" { + t.Errorf("triggeredAt = %q, want original %q (must not be overwritten)", triggeredAtStr, "2026-03-11T01:15:00Z") + } +} diff --git a/internal/validation/config.go b/internal/validation/config.go index be5a010..f87b3bb 100644 --- a/internal/validation/config.go +++ b/internal/validation/config.go @@ -64,5 +64,15 @@ func ValidatePipelineConfig(cfg *types.PipelineConfig) []string { } } + // Dry-run validation. + if cfg.DryRun { + if cfg.Job.Type == "" { + errs = append(errs, "dryRun requires job.type to be configured") + } + if cfg.Schedule.Trigger == nil { + errs = append(errs, "dryRun requires schedule.trigger (sensor-driven evaluation)") + } + } + return errs } diff --git a/internal/validation/config_test.go b/internal/validation/config_test.go index 3cf06e8..50e6170 100644 --- a/internal/validation/config_test.go +++ b/internal/validation/config_test.go @@ -261,6 +261,36 @@ func TestValidatePipelineConfig(t *testing.T) { }, wantCount: 0, }, + // --- dry-run validation --- + { + name: "dryRun without job.type or trigger", + cfg: types.PipelineConfig{ + DryRun: true, + Job: types.JobConfig{Type: ""}, + }, + wantCount: 2, + wantSubstr: "dryRun requires job.type", + }, + { + name: "dryRun with job.type but no trigger", + cfg: types.PipelineConfig{ + DryRun: true, + Job: types.JobConfig{Type: "glue"}, + }, + wantCount: 1, + wantSubstr: "dryRun requires schedule.trigger", + }, + { + name: "dryRun fully configured", + cfg: types.PipelineConfig{ + DryRun: true, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{Key: "upstream", Check: types.CheckExists}, + }, + Job: types.JobConfig{Type: "glue"}, 
+ }, + wantCount: 0, + }, } for _, tt := range tests { diff --git a/pkg/types/dynamo.go b/pkg/types/dynamo.go index 8468b14..909fa4a 100644 --- a/pkg/types/dynamo.go +++ b/pkg/types/dynamo.go @@ -36,6 +36,11 @@ func RerunRequestSK(schedule, date string) string { return "RERUN_REQUEST#" + schedule + "#" + date } +// DryRunSK returns the sort key for a dry-run marker. +func DryRunSK(schedule, date string) string { + return "DRY_RUN#" + schedule + "#" + date +} + // --------------------------------------------------------------------------- // Status / event constants // --------------------------------------------------------------------------- diff --git a/pkg/types/events.go b/pkg/types/events.go index 03bd7ad..653025a 100644 --- a/pkg/types/events.go +++ b/pkg/types/events.go @@ -37,6 +37,10 @@ const ( EventIrregularScheduleMissed EventDetailType = "IRREGULAR_SCHEDULE_MISSED" EventRelativeSLAWarning EventDetailType = "RELATIVE_SLA_WARNING" EventRelativeSLABreach EventDetailType = "RELATIVE_SLA_BREACH" + EventDryRunWouldTrigger EventDetailType = "DRY_RUN_WOULD_TRIGGER" + EventDryRunLateData EventDetailType = "DRY_RUN_LATE_DATA" + EventDryRunSLAProjection EventDetailType = "DRY_RUN_SLA_PROJECTION" + EventDryRunDrift EventDetailType = "DRY_RUN_DRIFT" ) // EventSource is the EventBridge source for all interlock events. diff --git a/pkg/types/pipeline.go b/pkg/types/pipeline.go index 0d8ac01..97f01f4 100644 --- a/pkg/types/pipeline.go +++ b/pkg/types/pipeline.go @@ -9,6 +9,7 @@ type PipelineConfig struct { Validation ValidationConfig `yaml:"validation" json:"validation"` Job JobConfig `yaml:"job" json:"job"` PostRun *PostRunConfig `yaml:"postRun,omitempty" json:"postRun,omitempty"` + DryRun bool `yaml:"dryRun,omitempty" json:"dryRun,omitempty"` } // PipelineIdentity holds pipeline metadata. 
From 3fd696b1ad739597d69ae45c8f23b90bde027579 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Thu, 12 Mar 2026 07:15:14 +0700 Subject: [PATCH 2/5] fix: lint issues and version changelog for dry-run feature Remove unused sensorData parameter from handleDryRunTrigger (unparam), invert nested if-blocks to use continue (gocritic nestingReduce), and assign v0.9.0 to the dry-run changelog entry. --- CHANGELOG.md | 3 ++- internal/lambda/dryrun.go | 2 +- internal/lambda/stream_router.go | 2 +- internal/lambda/stream_router_test.go | 30 ++++++++++++++------------- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df8ab6d..63f3e99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.9.0] - 2026-03-12 ### Added @@ -387,6 +387,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline Released under the [Elastic License 2.0](LICENSE). +[0.9.0]: https://github.com/dwsmith1983/interlock/releases/tag/v0.9.0 [0.8.0]: https://github.com/dwsmith1983/interlock/releases/tag/v0.8.0 [0.7.4]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.4 [0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3 diff --git a/internal/lambda/dryrun.go b/internal/lambda/dryrun.go index c101b29..74240da 100644 --- a/internal/lambda/dryrun.go +++ b/internal/lambda/dryrun.go @@ -14,7 +14,7 @@ import ( // It evaluates trigger conditions and validation rules, records observations // as EventBridge events, but never starts a Step Function execution. // Calendar exclusions are already checked before this function is called. 
-func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, sensorData map[string]interface{}, now time.Time) error { +func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, now time.Time) error { // Check for existing dry-run marker — if present, this is late data. marker, err := d.Store.GetDryRunMarker(ctx, pipelineID, scheduleID, date) if err != nil { diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index 1ad6ddb..66bedb5 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -217,7 +217,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Dry-run mode: observe and record what would happen, but never start SFN. if cfg.DryRun { - return handleDryRunTrigger(ctx, d, cfg, pipelineID, scheduleID, date, sensorData, now) + return handleDryRunTrigger(ctx, d, cfg, pipelineID, scheduleID, date, now) } // Acquire trigger lock to prevent duplicate executions. 
diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index fab02df..28bb2c5 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -3949,14 +3949,15 @@ func TestHandleSensorEvent_DryRun_SLAProjection_Met(t *testing.T) { defer ebMock.mu.Unlock() for _, input := range ebMock.events { for _, e := range input.Entries { - if e.DetailType != nil && *e.DetailType == string(types.EventDryRunSLAProjection) { - var detail map[string]interface{} - require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) - innerDetail, ok := detail["detail"].(map[string]interface{}) - require.True(t, ok, "expected detail.detail map in SLA projection event") - assert.Equal(t, "met", innerDetail["status"], "SLA projection status should be 'met'") - return + if e.DetailType == nil || *e.DetailType != string(types.EventDryRunSLAProjection) { + continue } + var detail map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) + innerDetail, ok := detail["detail"].(map[string]interface{}) + require.True(t, ok, "expected detail.detail map in SLA projection event") + assert.Equal(t, "met", innerDetail["status"], "SLA projection status should be 'met'") + return } } t.Fatal("DRY_RUN_SLA_PROJECTION event not found in published events") @@ -4002,14 +4003,15 @@ func TestHandleSensorEvent_DryRun_SLAProjection_Breach(t *testing.T) { defer ebMock.mu.Unlock() for _, input := range ebMock.events { for _, e := range input.Entries { - if e.DetailType != nil && *e.DetailType == string(types.EventDryRunSLAProjection) { - var detail map[string]interface{} - require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) - innerDetail, ok := detail["detail"].(map[string]interface{}) - require.True(t, ok, "expected detail.detail map") - assert.Equal(t, "breach", innerDetail["status"], "SLA projection status should be 'breach'") - return + if e.DetailType == nil || *e.DetailType != 
string(types.EventDryRunSLAProjection) { + continue } + var detail map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(*e.Detail), &detail)) + innerDetail, ok := detail["detail"].(map[string]interface{}) + require.True(t, ok, "expected detail.detail map") + assert.Equal(t, "breach", innerDetail["status"], "SLA projection status should be 'breach'") + return } } t.Fatal("DRY_RUN_SLA_PROJECTION event not found") From a5577a552c6903ec5e129b95eaa01b2d32a360a8 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Thu, 12 Mar 2026 19:31:16 +0700 Subject: [PATCH 3/5] docs: add positioning section to README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses common misidentification of Interlock as an orchestrator. New "What Interlock Is (and Isn't)" section clarifies that Interlock is a safety controller that gates the trigger path — not a scheduler or orchestrator replacement. --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 026fbe0..734f9dc 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,14 @@ STAMP-based safety framework for data pipeline reliability. Interlock prevents p The framework applies [Leveson's Systems-Theoretic Accident Model](https://mitpress.mit.edu/9780262016629/engineering-a-safer-world/) to data engineering: pipelines have **declarative validation rules** (feedback), **sensor data in DynamoDB** (process models), and **conditional execution** (safe control actions). +## What Interlock Is (and Isn't) + +Interlock is **not** an orchestrator and **not** a scheduler. It's a safety controller — the layer that decides whether a pipeline *should* run, not just whether it's *scheduled* to run. It works with whatever you already have: schedulers like cron, Airflow, Databricks Workflows, or EventBridge, and orchestrators like Dagster, Prefect, or Step Functions. + +A scheduler fires because the clock says so. 
An orchestrator sequences tasks once they're kicked off. Neither asks whether the data your pipeline needs is actually present, fresh, and correct before executing. **Interlock does.** You route the trigger path through Interlock: sensor data lands in DynamoDB, Interlock evaluates readiness against declarative YAML rules, and only triggers the job when preconditions are met. Your scheduler can still provide the clock signal — an EventBridge cron writing a sensor tick, for example — but Interlock decides whether that tick becomes a job run. + +**After a run completes**, Interlock keeps watching. It detects post-completion drift (source data changed after your job succeeded), late data arrival, SLA breaches, and silently missed schedules — failure modes that schedulers and orchestrators don't address because they stop paying attention once a job finishes. + ## How It Works External processes push sensor data into a DynamoDB **control table**. When a trigger condition is met (cron schedule or sensor arrival), a Step Functions workflow evaluates all validation rules against the current sensor state. If all rules pass, the pipeline job is triggered. EventBridge events provide observability at every stage. From a9c6ab440925af681724c715067d67fda90d80a7 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Thu, 12 Mar 2026 20:11:10 +0700 Subject: [PATCH 4/5] fix: publish SLA verdict on cancel for rerun visibility Cancel previously only published SLA_MET and swallowed BREACH, leaving reruns with no SLA outcome notification. Now always publishes a binary MET or BREACH verdict and propagates publish errors so the step function can retry on transient failures. Also adds RERUN_ACCEPTED and JOB_COMPLETED to the EventBridge alert filter so rerun lifecycle events reach Slack. 
--- deploy/terraform/eventbridge.tf | 2 ++ internal/lambda/e2e_test.go | 10 +++++----- internal/lambda/sla_monitor.go | 23 ++++++++++++----------- internal/lambda/sla_monitor_test.go | 18 +++++++++++------- 4 files changed, 30 insertions(+), 23 deletions(-) diff --git a/deploy/terraform/eventbridge.tf b/deploy/terraform/eventbridge.tf index 7b8a61d..3f51690 100644 --- a/deploy/terraform/eventbridge.tf +++ b/deploy/terraform/eventbridge.tf @@ -84,7 +84,9 @@ resource "aws_cloudwatch_event_rule" "alert_events" { "POST_RUN_DRIFT_INFLIGHT", "POST_RUN_FAILED", "POST_RUN_SENSOR_MISSING", + "RERUN_ACCEPTED", "RERUN_REJECTED", + "JOB_COMPLETED", "LATE_DATA_ARRIVAL", "TRIGGER_RECOVERED", "BASELINE_CAPTURE_FAILED", diff --git a/internal/lambda/e2e_test.go b/internal/lambda/e2e_test.go index a7a8b50..0149f7a 100644 --- a/internal/lambda/e2e_test.go +++ b/internal/lambda/e2e_test.go @@ -1710,9 +1710,9 @@ func TestE2E_Watchdog(t *testing.T) { func TestE2E_SLABranchCompleteness(t *testing.T) { ctx := context.Background() - t.Run("SLAWarningOutcome", func(t *testing.T) { + t.Run("SLACancelPastWarningBeforeBreach", func(t *testing.T) { // Cancel when warning has passed but breach is still in the future. - // Expected: cancel returns SLA_WARNING (not SLA_MET or SLA_BREACH). + // Expected: cancel returns SLA_MET (completed before breach = MET). mock := newMockDDB() tr := &mockTriggerRunner{} sc := &mockStatusPoller{seq: []lambda.StatusResult{{State: "succeeded"}}} @@ -1732,9 +1732,9 @@ func TestE2E_SLABranchCompleteness(t *testing.T) { }) assert.Equal(t, sfnDone, r.terminal) - assert.Equal(t, "SLA_WARNING", r.slaOutcome) - // SLA_MET event should NOT be published (warning already fired). - assert.NotContains(t, r.events, "SLA_MET") + assert.Equal(t, "SLA_MET", r.slaOutcome) + // SLA_MET event should be published (binary verdict: completed before breach). 
+ assert.Contains(t, r.events, "SLA_MET") assertAlertFormats(t, eb) }) diff --git a/internal/lambda/sla_monitor.go b/internal/lambda/sla_monitor.go index ac5bd8a..e4164c8 100644 --- a/internal/lambda/sla_monitor.go +++ b/internal/lambda/sla_monitor.go @@ -330,29 +330,30 @@ func handleSLACancel(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMo } } - // Determine final SLA status from the timestamps passed in + // Determine final SLA status: binary MET or BREACH. + // WARNING is not a valid completion outcome — if the job finished, it either + // beat the breach deadline (MET) or missed it (BREACH). now := d.now().UTC() alertType := string(types.EventSLAMet) if input.BreachAt != "" { breachAt, _ := time.Parse(time.RFC3339, input.BreachAt) - warningAt, _ := time.Parse(time.RFC3339, input.WarningAt) if !breachAt.IsZero() && (now.After(breachAt) || now.Equal(breachAt)) { - alertType = "SLA_BREACH" - } else if !warningAt.IsZero() && (now.After(warningAt) || now.Equal(warningAt)) { - alertType = "SLA_WARNING" + alertType = string(types.EventSLABreach) } } - // Publish SLA_MET — the only outcome not already fired by the Scheduler - if alertType == string(types.EventSLAMet) { - _ = publishEvent(ctx, d, string(types.EventSLAMet), input.PipelineID, input.ScheduleID, input.Date, - fmt.Sprintf("pipeline %s: %s", input.PipelineID, types.EventSLAMet)) - } - + // Always publish the verdict. For first runs, Scheduler entries would have + // fired WARNING/BREACH but are now deleted — MET is the only new signal. + // For reruns, the Scheduler entries were already deleted by the first run's + // cancel, so this publish is the only path to a notification. 
d.Logger.InfoContext(ctx, "cancelled SLA schedules", "pipeline", input.PipelineID, "alertType", alertType, ) + if err := publishEvent(ctx, d, alertType, input.PipelineID, input.ScheduleID, input.Date, + fmt.Sprintf("pipeline %s: %s", input.PipelineID, alertType)); err != nil { + return SLAMonitorOutput{}, fmt.Errorf("publish SLA cancel verdict: %w", err) + } return SLAMonitorOutput{ AlertType: alertType, diff --git a/internal/lambda/sla_monitor_test.go b/internal/lambda/sla_monitor_test.go index 4c0bf17..c8228c1 100644 --- a/internal/lambda/sla_monitor_test.go +++ b/internal/lambda/sla_monitor_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go-v2/aws" ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" schedulerTypes "github.com/aws/aws-sdk-go-v2/service/scheduler/types" "github.com/stretchr/testify/assert" @@ -449,9 +450,11 @@ func TestSLAMonitor_Cancel_PastBreach(t *testing.T) { if out.AlertType != "SLA_BREACH" { t.Errorf("alertType = %q, want %q", out.AlertType, "SLA_BREACH") } - // Should NOT publish SLA_MET event (breach already fired by Scheduler) - if len(eb.events) != 0 { - t.Errorf("expected 0 EventBridge calls for past breach, got %d", len(eb.events)) + // Should publish SLA_BREACH so reruns get a notification + if len(eb.events) != 1 { + t.Errorf("expected 1 EventBridge call for past breach, got %d", len(eb.events)) + } else if dt := aws.ToString(eb.events[0].Entries[0].DetailType); dt != "SLA_BREACH" { + t.Errorf("expected SLA_BREACH event, got %q", dt) } } @@ -1427,7 +1430,7 @@ func TestSLAMonitor_Cancel_PastWarningFutureBreach(t *testing.T) { Logger: slog.Default(), } - // Warning in the past, breach in the future — should be SLA_WARNING. + // Warning in the past, breach in the future — completed before breach = SLA_MET. 
out, err := lambda.HandleSLAMonitor(context.Background(), d, lambda.SLAMonitorInput{ Mode: "cancel", PipelineID: "gold-orders", @@ -1437,9 +1440,10 @@ func TestSLAMonitor_Cancel_PastWarningFutureBreach(t *testing.T) { BreachAt: "2099-12-31T23:59:00Z", }) require.NoError(t, err) - assert.Equal(t, "SLA_WARNING", out.AlertType) - // SLA_WARNING/SLA_BREACH are already fired by Scheduler; only SLA_MET publishes. - assert.Empty(t, eb.events, "should not publish event for SLA_WARNING cancel") + assert.Equal(t, "SLA_MET", out.AlertType) + // Completed before breach = MET, always published for rerun visibility. + require.Len(t, eb.events, 1, "should publish SLA_MET event on cancel") + assert.Equal(t, "SLA_MET", aws.ToString(eb.events[0].Entries[0].DetailType)) } func TestSLAMonitor_Cancel_EmptyTimesNoDeadline(t *testing.T) { From 6130a16de994539382e91c7f4b14f1624e4e7677 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Thu, 12 Mar 2026 22:37:03 +0700 Subject: [PATCH 5/5] docs: add dry-run guide and guides section index Add dedicated guide page walking through the end-to-end dry-run workflow: setup, evaluation flow, monitoring events, SLA projection, drift detection, going live, and troubleshooting. Add missing _index.md for the guides section with card navigation. --- docs/content/docs/guides/_index.md | 12 +++ docs/content/docs/guides/dry-run.md | 150 ++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+) create mode 100644 docs/content/docs/guides/_index.md create mode 100644 docs/content/docs/guides/dry-run.md diff --git a/docs/content/docs/guides/_index.md b/docs/content/docs/guides/_index.md new file mode 100644 index 0000000..b11654a --- /dev/null +++ b/docs/content/docs/guides/_index.md @@ -0,0 +1,12 @@ +--- +title: Guides +weight: 5 +description: Operational guides for deploying and managing Interlock pipelines. +--- + +Step-by-step guides for common Interlock workflows. 
+ +{{< cards >}} + {{< card link="retry-loop-asl" title="Step Functions State Machine" subtitle="ASL patterns for evaluation, triggering, job polling, and SLA scheduling." >}} + {{< card link="dry-run" title="Dry-Run / Shadow Mode" subtitle="Evaluate pipelines against real data without triggering jobs." >}} +{{< /cards >}} diff --git a/docs/content/docs/guides/dry-run.md b/docs/content/docs/guides/dry-run.md new file mode 100644 index 0000000..354b446 --- /dev/null +++ b/docs/content/docs/guides/dry-run.md @@ -0,0 +1,150 @@ +--- +title: "Dry-Run / Shadow Mode" +weight: 20 +--- + +# Dry-Run / Shadow Mode + +Dry-run mode lets you evaluate a pipeline against real sensor data without triggering any jobs or starting Step Function executions. Interlock records what it *would* do as EventBridge events, giving you a complete picture of trigger timing, SLA projections, late data arrivals, and drift detection — all without side effects. + +## When to Use Dry-Run + +- **Onboarding a new pipeline.** Validate that sensor data, validation rules, and scheduling work correctly before committing compute resources. +- **Testing config changes.** Modify thresholds, rules, or SLA deadlines on a dry-run copy and compare its decisions against the live pipeline. +- **Comparing against an existing orchestrator.** Run Interlock in shadow mode alongside Airflow, Dagster, or Prefect to verify it makes the same triggering decisions. + +## Setup + +Add `dryRun: true` to your pipeline config. The pipeline must have `schedule.trigger` (sensor-driven evaluation) and `job.type` configured. 
+
+```yaml
+pipeline:
+  id: bronze-hourly-dryrun
+dryRun: true
+
+schedule:
+  trigger:
+    key: upstream-complete
+    check: equals
+    field: status
+    value: ready
+  timezone: "UTC"
+
+validation:
+  trigger: "ALL"
+  rules:
+    - key: upstream-complete
+      check: equals
+      field: status
+      value: ready
+
+job:
+  type: glue
+  config:
+    jobName: bronze-etl
+
+sla:
+  deadline: "06:00"
+  expectedDuration: 45m
+
+postRun:
+  rules:
+    - key: audit-result
+      check: exists
+      driftField: sensor_count
+```
+
+Deploy the config the same way as any live pipeline — via `jsonencode(yamldecode(...))` in Terraform or direct DynamoDB writes.
+
+## What Happens
+
+When sensor data arrives, the stream-router Lambda evaluates the dry-run pipeline through the same validation path as a live pipeline. Instead of starting a Step Function execution, it writes a `DRY_RUN#` marker to the control table and publishes EventBridge events.
+
+```
+Sensor arrives → Stream-router
+  → Check calendar exclusions (still enforced)
+  → Check for existing DRY_RUN# marker
+    → Marker exists: publish DRY_RUN_LATE_DATA event
+    → No marker: evaluate validation rules
+      → Rules fail: no action
+      → Rules pass:
+        → Write DRY_RUN# marker (7-day TTL)
+        → Capture post-run baseline (if PostRun configured)
+        → Publish DRY_RUN_WOULD_TRIGGER event
+        → Compute and publish DRY_RUN_SLA_PROJECTION (if SLA configured)
+```
+
+`DRY_RUN#` markers have a 7-day TTL and are automatically cleaned up by DynamoDB.
+
+## Monitoring Events
+
+Dry-run pipelines emit four event types to EventBridge:
+
+| Event | Meaning |
+|---|---|
+| `DRY_RUN_WOULD_TRIGGER` | All validation rules passed — Interlock would have triggered the job |
+| `DRY_RUN_LATE_DATA` | Sensor data arrived after the trigger point was already recorded |
+| `DRY_RUN_SLA_PROJECTION` | Estimated completion time vs.
deadline — `met` or `breach` status | +| `DRY_RUN_DRIFT` | Post-run sensor data changed from the baseline captured at trigger time | + +Create an EventBridge rule to capture these events: + +```json +{ + "source": ["interlock"], + "detail-type": [ + { "prefix": "DRY_RUN_" } + ] +} +``` + +Route events to CloudWatch Logs, SNS, or an SQS queue for analysis. See the [Alerting reference](../../reference/alerting/) for full event payload details. + +## SLA Projection + +When `sla.expectedDuration` and `sla.deadline` are configured, Interlock computes a projected completion time at the moment validation rules pass. The projection reuses the same SLA calculation logic as production monitoring (including hourly pipeline T+1 adjustment). + +The `DRY_RUN_SLA_PROJECTION` event includes: + +| Field | Description | +|---|---| +| `status` | `"met"` or `"breach"` | +| `estimatedCompletion` | Trigger time + `expectedDuration` | +| `deadline` | Resolved breach deadline | +| `marginSeconds` | Seconds between estimated completion and deadline (negative = breach) | + +This lets you tune `expectedDuration` before going live, avoiding false SLA alerts on day one. + +## Drift Detection + +If `postRun` is configured, Interlock captures a baseline snapshot of the drift field at the moment validation passes. When the post-run sensor updates later, it compares the new value against the baseline. + +If the delta exceeds `driftThreshold`, a `DRY_RUN_DRIFT` event is published with the previous count, current count, delta, and the sensor key that changed. This tells you whether your production pipeline would have triggered a drift re-run. + +## Going Live + +To promote a dry-run pipeline to live: + +1. Remove `dryRun: true` from the pipeline config (or set it to `false`). +2. Deploy the updated config. + +No cleanup is needed. Existing `DRY_RUN#` markers expire naturally via their 7-day TTL and do not interfere with `TRIGGER#` rows that live pipelines use. 
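+
+In config terms, promotion is a one-line change; everything else stays the same:
+
+```yaml
+dryRun: false   # or delete the line — false is the default for this flag
+```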
+
+## Side-by-Side Deployment
+
+You can run a dry-run pipeline alongside a live pipeline watching the same sensors. Use different pipeline IDs:
+
+```yaml
+# Live pipeline
+pipeline:
+  id: bronze-hourly
+
+# Dry-run shadow
+pipeline:
+  id: bronze-hourly-dryrun
+dryRun: true
+```
+
+Both pipelines receive the same sensor events. The live pipeline triggers jobs; the dry-run pipeline records observations. This is useful for validating config changes before applying them to the live pipeline.
+
+## Troubleshooting
+
+**No events appearing.** Verify that `schedule.trigger` is configured with a sensor condition. Dry-run requires sensor-driven evaluation — cron-only pipelines won't produce dry-run events.
+
+**Stale markers.** `DRY_RUN#` markers have a 7-day TTL. If you need to re-evaluate the same date, wait for the marker to expire or delete it manually from the control table.
+
+**Calendar exclusions blocking evaluation.** Calendar exclusions apply to dry-run pipelines the same way as live pipelines. Check your calendar config if events stop appearing on expected dates.
+
+**SLA projection shows `met` but you expected `breach`.** The projection uses the same deadline resolution as production, including timezone handling and T+1 adjustment for hourly pipelines. Verify that `sla.deadline` and `sla.timezone` match your expectations.
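+
+For that last case, the projection arithmetic is simple enough to verify by hand. A worked example (the production calculation additionally resolves timezones and applies the hourly T+1 adjustment, which this omits):
+
+```
+triggeredAt         = 2026-03-11 05:00 UTC
+expectedDuration    = 45m
+deadline            = 2026-03-11 06:00 UTC
+
+estimatedCompletion = 05:00 + 45m = 05:45
+marginSeconds       = 06:00 - 05:45 = +900   -> status: met
+```
+
+If `marginSeconds` comes out negative (estimated completion lands after the deadline), the projection reports `breach`.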