Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.8.0] - 2026-03-10

### Added

- **Inclusion calendar scheduling** (`schedule.include.dates`) — explicit YYYY-MM-DD date lists for pipelines that run on known irregular dates (monthly close, quarterly filing, specific business dates). Mutually exclusive with cron. Watchdog detects missed inclusion dates and publishes `IRREGULAR_SCHEDULE_MISSED` events.
- **Relative SLA** (`sla.maxDuration`) — duration-based SLA for ad-hoc pipelines with no predictable schedule. The clock starts at first sensor arrival and covers the entire lifecycle: evaluation → trigger → job → post-run → completion. A warning fires at 75% of `maxDuration`, or at `breachAt - expectedDuration` when `expectedDuration` is set. New events: `RELATIVE_SLA_WARNING`, `RELATIVE_SLA_BREACH`.
- **First-sensor-arrival tracking** — stream-router records `first-sensor-arrival#<date>` on lock acquisition (idempotent conditional write). Used as T=0 for relative SLA calculation.
- **Watchdog defense-in-depth for relative SLA** — `detectRelativeSLABreaches` scans pipelines with `maxDuration` config and fires `RELATIVE_SLA_BREACH` if the EventBridge scheduler failed to fire.
- **`WriteSensorIfAbsent` store method** — conditional PutItem that only writes if the key doesn't exist, used for first-sensor-arrival idempotency.
- **Config validation** for new fields: cron/include mutual exclusion, inclusion date format (YYYY-MM-DD), maxDuration format and 24h cap, maxDuration requires trigger.
- **Glue false-success detection** — `verifyGlueRCA` now checks both the RCA insight stream (Check 1) and the driver output stream for ERROR/FATAL log4j severity markers (Check 2). Catches Spark failures that Glue reports as SUCCEEDED when the application framework swallows exceptions.

### Changed

- `SLAConfig.Deadline` and `SLAConfig.ExpectedDuration` are now `omitempty` — relative SLA configs may omit the wall-clock deadline entirely.
- SFN ASL passes `maxDuration` and `sensorArrivalAt` to `CancelSLASchedules` and `CancelSLAOnCompleteTriggerFailure` states.
- sla-monitor `handleSLACalculate` routes to the relative-SLA path when both `MaxDuration` and `SensorArrivalAt` are present.

## [0.7.4] - 2026-03-10

### Fixed
Expand Down Expand Up @@ -360,6 +378,7 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline

Released under the [Elastic License 2.0](LICENSE).

[0.8.0]: https://github.com/dwsmith1983/interlock/releases/tag/v0.8.0
[0.7.4]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.4
[0.7.3]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.3
[0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2
Expand Down
8 changes: 6 additions & 2 deletions deploy/statemachine.asl.json
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,9 @@
"scheduleId.$": "$.scheduleId",
"date.$": "$.date",
"deadline.$": "$.config.sla.deadline",
"expectedDuration.$": "$.config.sla.expectedDuration"
"expectedDuration.$": "$.config.sla.expectedDuration",
"maxDuration.$": "$.config.sla.maxDuration",
"sensorArrivalAt.$": "$.sensorArrivalAt"
},
"ResultPath": "$.slaResult",
"Next": "CompleteTriggerFailed",
Expand Down Expand Up @@ -447,7 +449,9 @@
"scheduleId.$": "$.scheduleId",
"date.$": "$.date",
"deadline.$": "$.config.sla.deadline",
"expectedDuration.$": "$.config.sla.expectedDuration"
"expectedDuration.$": "$.config.sla.expectedDuration",
"maxDuration.$": "$.config.sla.maxDuration",
"sensorArrivalAt.$": "$.sensorArrivalAt"
},
"ResultPath": "$.slaResult",
"Next": "Done",
Expand Down
50 changes: 50 additions & 0 deletions internal/lambda/dynstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -188,6 +189,55 @@ func resolveTimezone(tz string) *time.Location {
return time.UTC
}

// MostRecentInclusionDate scans dates for the latest entry that does not fall
// after now's calendar day (the time of day in now is ignored; the day is
// taken from now as formatted in its own location).
//
// Entries are expected to be YYYY-MM-DD strings. Anything that does not parse
// with that layout is skipped without error. The second result reports
// whether a qualifying date was found; when it is false the returned string
// is empty.
func MostRecentInclusionDate(dates []string, now time.Time) (string, bool) {
	const layout = "2006-01-02"
	today := now.Format(layout)

	var latest string
	for _, candidate := range dates {
		// YYYY-MM-DD strings order lexically the same way they order
		// chronologically, so plain string comparison is sufficient here.
		if candidate > today || candidate <= latest {
			continue
		}
		if _, err := time.Parse(layout, candidate); err != nil {
			continue
		}
		latest = candidate
	}

	// latest is only ever assigned a successfully parsed (hence non-empty)
	// date, so a non-empty value means a match was found.
	return latest, latest != ""
}

// maxInclusionLookback is the maximum number of past inclusion dates to check.
// Caps DynamoDB reads when the watchdog has been down for an extended period.
const maxInclusionLookback = 3

// PastInclusionDates returns the distinct dates from the list that are on or
// before now's calendar day, sorted most recent first and capped at
// maxInclusionLookback (3) entries.
//
// Dates must be YYYY-MM-DD strings; entries that do not parse with that
// layout are silently skipped. Repeated entries are collapsed so that a
// duplicated date cannot use up more than one lookback slot. Returns nil if
// no dates qualify. The input slice is not modified.
func PastInclusionDates(dates []string, now time.Time) []string {
	const layout = "2006-01-02"
	nowDate := now.Format(layout)

	var past []string
	for _, d := range dates {
		if _, err := time.Parse(layout, d); err != nil {
			continue
		}
		if d <= nowDate {
			past = append(past, d)
		}
	}

	// Sort descending (most recent first); YYYY-MM-DD strings order lexically
	// the same way they order chronologically.
	sort.Sort(sort.Reverse(sort.StringSlice(past)))

	// After sorting, equal dates are adjacent: compact them in place so the
	// cap below counts distinct dates only. Writes never overtake reads, so
	// reusing the backing array is safe.
	distinct := past[:0]
	for _, d := range past {
		if n := len(distinct); n == 0 || distinct[n-1] != d {
			distinct = append(distinct, d)
		}
	}
	past = distinct

	// Cap to maxInclusionLookback to bound downstream reads.
	if len(past) > maxInclusionLookback {
		past = past[:maxInclusionLookback]
	}
	return past
}

// isExcludedTime is the core calendar exclusion check. It evaluates
// whether the given time falls on a weekend or a specifically excluded date.
func isExcludedTime(excl *types.ExclusionConfig, t time.Time) bool {
Expand Down
141 changes: 141 additions & 0 deletions internal/lambda/inclusion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package lambda_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/dwsmith1983/interlock/internal/lambda"
)

// TestMostRecentInclusionDate is a table-driven check of
// lambda.MostRecentInclusionDate covering empty input, future-only input,
// past/future mixes, today's date, malformed entries, and a midnight "now".
func TestMostRecentInclusionDate(t *testing.T) {
	// Reference instants: 2026-03-10T14:00:00Z for most scenarios, and the
	// very start of that same day for the midnight scenario.
	ref := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC)
	midnight := time.Date(2026, 3, 10, 0, 0, 0, 0, time.UTC)

	type scenario struct {
		name     string
		dates    []string
		now      time.Time
		wantDate string
		wantOK   bool
	}

	scenarios := []scenario{
		{name: "empty dates list", now: ref},
		{
			name:  "all dates in the future",
			dates: []string{"2026-03-11", "2026-04-01", "2026-06-30"},
			now:   ref,
		},
		{
			name:     "single past date",
			dates:    []string{"2026-03-01"},
			now:      ref,
			wantDate: "2026-03-01",
			wantOK:   true,
		},
		{
			name:     "multiple past dates picks most recent",
			dates:    []string{"2026-01-15", "2026-02-28", "2026-03-05"},
			now:      ref,
			wantDate: "2026-03-05",
			wantOK:   true,
		},
		{
			name:     "today's date is included (on or before now)",
			dates:    []string{"2026-03-05", "2026-03-10", "2026-03-15"},
			now:      ref,
			wantDate: "2026-03-10",
			wantOK:   true,
		},
		{
			name:     "mix of past and future picks most recent past",
			dates:    []string{"2026-01-01", "2026-03-08", "2026-03-12", "2026-04-01"},
			now:      ref,
			wantDate: "2026-03-08",
			wantOK:   true,
		},
		{
			name:  "invalid date format is skipped",
			dates: []string{"not-a-date", "2026-13-01"},
			now:   ref,
		},
		{
			name:     "invalid dates mixed with valid date",
			dates:    []string{"bad", "2026-03-01", "also-bad"},
			now:      ref,
			wantDate: "2026-03-01",
			wantOK:   true,
		},
		{
			name:     "now at midnight still includes today",
			dates:    []string{"2026-03-10"},
			now:      midnight,
			wantDate: "2026-03-10",
			wantOK:   true,
		},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			date, ok := lambda.MostRecentInclusionDate(sc.dates, sc.now)
			assert.Equal(t, sc.wantOK, ok)
			if !sc.wantOK {
				// The returned date is only meaningful when a match is expected.
				return
			}
			assert.Equal(t, sc.wantDate, date)
		})
	}
}

// TestPastInclusionDates_AllFuture checks that nothing is returned when every
// configured date lies after the reference day.
func TestPastInclusionDates_AllFuture(t *testing.T) {
	ref := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC)

	result := lambda.PastInclusionDates(
		[]string{"2026-03-11", "2026-04-01", "2026-06-30"},
		ref,
	)

	assert.Empty(t, result, "all future dates should return empty slice")
}

// TestPastInclusionDates_MultiplePast checks that several past dates come
// back in descending order.
func TestPastInclusionDates_MultiplePast(t *testing.T) {
	ref := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC)
	input := []string{"2026-01-15", "2026-02-28", "2026-03-05"}
	want := []string{"2026-03-05", "2026-02-28", "2026-01-15"}

	result := lambda.PastInclusionDates(input, ref)

	assert.Equal(t, want, result, "should return all past dates, most recent first")
}

// TestPastInclusionDates_MixedPastFuture checks that future entries are
// filtered out and the remaining past dates are ordered most recent first.
func TestPastInclusionDates_MixedPastFuture(t *testing.T) {
	ref := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC)
	input := []string{"2026-01-01", "2026-03-08", "2026-03-12", "2026-04-01"}
	want := []string{"2026-03-08", "2026-01-01"}

	result := lambda.PastInclusionDates(input, ref)

	assert.Equal(t, want, result, "should only return past dates, most recent first")
}

// TestPastInclusionDates_TodayIncluded checks that a date equal to the
// reference day counts as "on or before now".
func TestPastInclusionDates_TodayIncluded(t *testing.T) {
	ref := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC)
	input := []string{"2026-03-05", "2026-03-10", "2026-03-15"}
	want := []string{"2026-03-10", "2026-03-05"}

	result := lambda.PastInclusionDates(input, ref)

	assert.Equal(t, want, result, "today's date should be included in results")
}

// TestPastInclusionDates_CappedAt3 checks that five qualifying dates are
// trimmed to the three most recent ones.
func TestPastInclusionDates_CappedAt3(t *testing.T) {
	ref := time.Date(2026, 3, 10, 14, 0, 0, 0, time.UTC)
	input := []string{
		"2026-01-01",
		"2026-01-15",
		"2026-02-01",
		"2026-02-15",
		"2026-03-01",
	}
	want := []string{"2026-03-01", "2026-02-15", "2026-02-01"}

	result := lambda.PastInclusionDates(input, ref)

	assert.Len(t, result, 3, "result should be capped at 3 entries")
	assert.Equal(t, want, result, "should return the 3 most recent past dates, most recent first")
}
24 changes: 20 additions & 4 deletions internal/lambda/sfn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
// sfnInput is the top-level input for the Step Function state machine.
// It includes pipeline identity fields and a config block used by Wait states.
//
// Each field is declared exactly once; the JSON keys are the names the state
// machine definition reads via JSONPath.
type sfnInput struct {
	PipelineID string `json:"pipelineId"`
	ScheduleID string `json:"scheduleId"`
	Date       string `json:"date"`
	// SensorArrivalAt is the RFC3339 first-sensor-arrival time used for
	// relative SLA. It is deliberately NOT omitempty: the state machine
	// references "$.sensorArrivalAt" unconditionally, and a JSONPath that
	// matches nothing fails the state at runtime, so the key must always be
	// present. An empty string means "no arrival recorded".
	SensorArrivalAt string    `json:"sensorArrivalAt"`
	Config          sfnConfig `json:"config"`
}

// sfnConfig holds timing parameters for the SFN evaluation loop and SLA branch.
Expand Down Expand Up @@ -100,6 +101,21 @@ func startSFNWithName(ctx context.Context, d *Deps, cfg *types.PipelineConfig, p
Date: date,
Config: sc,
}

// Populate sensorArrivalAt for relative SLA passthrough.
if sc.SLA != nil && sc.SLA.MaxDuration != "" && d.Store != nil {
arrivalKey := "first-sensor-arrival#" + date
arrivalData, readErr := d.Store.GetSensorData(ctx, pipelineID, arrivalKey)
if readErr != nil {
d.Logger.WarnContext(ctx, "failed to read first-sensor-arrival for SFN input",
"pipelineId", pipelineID, "error", readErr)
} else if arrivalData != nil {
if at, ok := arrivalData["arrivedAt"].(string); ok {
input.SensorArrivalAt = at
}
}
}

payload, err := json.Marshal(input)
if err != nil {
return fmt.Errorf("marshal SFN input: %w", err)
Expand Down
74 changes: 63 additions & 11 deletions internal/lambda/sla_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ func HandleSLAMonitor(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAM
}
}

// handleSLACalculate computes warning and breach times from the deadline
// and expected duration. Warning time = deadline - expectedDuration.
// Breach time = deadline. Returns full ISO 8601 timestamps required by
// Step Functions TimestampPath.
// handleSLACalculate computes warning and breach times. Supports two modes:
//
// 1. Schedule-based (deadline): breachAt = deadline, warningAt = deadline - expectedDuration.
// 2. Relative (maxDuration + sensorArrivalAt): breachAt = sensorArrivalAt + maxDuration,
// warningAt = breachAt - expectedDuration (or breachAt - 25% of maxDuration if no expectedDuration).
//
// Returns full ISO 8601 timestamps required by Step Functions TimestampPath.
func handleSLACalculate(input SLAMonitorInput, now time.Time) (SLAMonitorOutput, error) {
// Relative SLA path: maxDuration + sensorArrivalAt.
if input.MaxDuration != "" && input.SensorArrivalAt != "" {
return handleRelativeSLACalculate(input)
}

dur, err := time.ParseDuration(input.ExpectedDuration)
if err != nil {
return SLAMonitorOutput{}, fmt.Errorf("parse expectedDuration %q: %w", input.ExpectedDuration, err)
Expand Down Expand Up @@ -122,6 +130,41 @@ func handleSLACalculate(input SLAMonitorInput, now time.Time) (SLAMonitorOutput,
}, nil
}

// handleRelativeSLACalculate derives the relative-SLA timestamps from
// input.SensorArrivalAt (RFC3339) and input.MaxDuration (Go duration string):
//
//	breachAt  = sensorArrivalAt + maxDuration
//	warningAt = breachAt - expectedDuration   (when expectedDuration is set)
//	warningAt = breachAt - maxDuration/4      (otherwise, i.e. at 75% of the budget)
//
// Both results are rendered as RFC3339 strings in UTC. An error is returned,
// together with a zero SLAMonitorOutput, when maxDuration, sensorArrivalAt or
// a non-empty expectedDuration fails to parse.
func handleRelativeSLACalculate(input SLAMonitorInput) (SLAMonitorOutput, error) {
	budget, err := time.ParseDuration(input.MaxDuration)
	if err != nil {
		return SLAMonitorOutput{}, fmt.Errorf("parse maxDuration %q: %w", input.MaxDuration, err)
	}

	start, err := time.Parse(time.RFC3339, input.SensorArrivalAt)
	if err != nil {
		return SLAMonitorOutput{}, fmt.Errorf("parse sensorArrivalAt %q: %w", input.SensorArrivalAt, err)
	}

	// Lead time between warning and breach: a quarter of the budget unless an
	// explicit expectedDuration overrides it.
	lead := budget / 4
	if raw := input.ExpectedDuration; raw != "" {
		if lead, err = time.ParseDuration(raw); err != nil {
			return SLAMonitorOutput{}, fmt.Errorf("parse expectedDuration %q: %w", raw, err)
		}
	}

	breach := start.Add(budget).UTC()
	result := SLAMonitorOutput{
		WarningAt: breach.Add(-lead).Format(time.RFC3339),
		BreachAt:  breach.Format(time.RFC3339),
	}
	return result, nil
}

// handleSLAFireAlert publishes an SLA alert event to EventBridge and
// returns the alert metadata.
func handleSLAFireAlert(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) {
Expand Down Expand Up @@ -251,14 +294,23 @@ func handleSLASchedule(ctx context.Context, d *Deps, input SLAMonitorInput) (SLA
// Warning/breach events were already published at the correct time by the
// Scheduler-invoked fire-alert calls.
func handleSLACancel(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) {
// If warningAt/breachAt not provided, recalculate from deadline/expectedDuration.
if input.WarningAt == "" && input.BreachAt == "" && input.Deadline != "" {
calc, err := handleSLACalculate(input, d.now())
if err != nil {
return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate: %w", err)
// If warningAt/breachAt not provided, recalculate from the available config.
if input.WarningAt == "" && input.BreachAt == "" {
if input.MaxDuration != "" && input.SensorArrivalAt != "" {
calc, err := handleRelativeSLACalculate(input)
if err != nil {
return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate (relative): %w", err)
}
input.WarningAt = calc.WarningAt
input.BreachAt = calc.BreachAt
} else if input.Deadline != "" {
calc, err := handleSLACalculate(input, d.now())
if err != nil {
return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate: %w", err)
}
input.WarningAt = calc.WarningAt
input.BreachAt = calc.BreachAt
}
input.WarningAt = calc.WarningAt
input.BreachAt = calc.BreachAt
}

if d.Scheduler != nil {
Expand Down
Loading
Loading