Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.9.4] - 2026-03-29

### Refactored

- **Split `internal/lambda/` into handler-aligned sub-packages** — Monolithic package replaced with focused sub-packages: `orchestrator/`, `stream/`, `watchdog/`, `sla/`, `alert/`, `sink/`.
- **Extracted shared utilities into focused root files** — Common logic moved to dedicated files: publish, date, exclusion, sensor, schedule, config, terminal.
- **Trigger config registry** — Replaced `buildTriggerConfig` switch statement with generic registry map (`trigger_registry.go`).
- **SLA deadline calculations wired through `pkg/sla/`** — Pure functions for SLA deadline resolution, decoupled from Lambda handler context.

### Added

- **`pkg/sla/` package** — Pure SLA deadline calculation functions usable across packages.
- **`PipelineConfig.DeepCopy()` method** — Safe config cache isolation without JSON marshal/unmarshal roundtrip.
- **`EventWatchdogDegraded` event type** — Watchdog health observability event for degraded-state detection.
- **Smoke tests for all 6 `cmd/lambda/` packages** — `ValidateEnv` coverage for every Lambda entry point.

### Fixed

- **`HandleWatchdog` silent error suppression** — Now returns aggregate errors via `errors.Join` instead of silently returning nil.
- **`HandleWatchdog` degraded-state signaling** — Publishes `WATCHDOG_DEGRADED` event when checks fail.
- **Config cache isolation** — Uses typed `DeepCopy()` instead of JSON marshal/unmarshal roundtrip, eliminating silent data loss on unexported fields.

## [0.9.3] - 2026-03-14

### Changed
Expand Down
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,20 @@ interlock/
│ ├── watchdog/ # Missed schedule + stale run detection
│ ├── event-sink/ # EventBridge → events table
│ └── alert-dispatcher/ # SQS → Slack (Bot API with threading)
├── pkg/types/ # Public domain types (pipeline config, events, DynamoDB keys)
├── pkg/
│ ├── types/ # Public domain types (pipeline config, events, DynamoDB keys)
│ ├── validation/ # Declarative validation rule engine
│ └── sla/ # Pure SLA deadline calculations
├── internal/
│ ├── lambda/ # Lambda handler logic + shared types
│ ├── lambda/ # Shared types, interfaces, utilities
│ │ ├── orchestrator/ # Evaluate, trigger, check-job handlers
│ │ ├── stream/ # DynamoDB stream routing, reruns, post-run
│ │ ├── watchdog/ # Stale trigger + missed schedule detection
│ │ ├── sla/ # SLA deadline calculation + alerts
│ │ ├── alert/ # Slack notification formatting
│ │ └── sink/ # EventBridge event persistence
│ ├── store/ # DynamoDB storage layer (3-table design)
│ ├── config/ # Pipeline YAML config loading
│ ├── validation/ # Declarative validation rule engine
│ ├── trigger/ # Trigger execution (8 types)
│ └── calendar/ # Calendar exclusion registry
├── deploy/
Expand Down
3 changes: 2 additions & 1 deletion cmd/lambda/alert-dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/dwsmith1983/interlock/internal/lambda/alert"
"github.com/dwsmith1983/interlock/internal/store"
)

Expand Down Expand Up @@ -81,6 +82,6 @@ func main() {
}

lambda.Start(func(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) {
return ilambda.HandleAlertDispatcher(ctx, deps, sqsEvent)
return alert.HandleAlertDispatcher(ctx, deps, sqsEvent)
})
}
31 changes: 31 additions & 0 deletions cmd/lambda/alert-dispatcher/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"testing"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestValidateEnv_MissingVars(t *testing.T) {
err := ilambda.ValidateEnv("alert-dispatcher")
assert.Error(t, err, "should report missing env vars")
assert.Contains(t, err.Error(), "SLACK_CHANNEL_ID")
assert.Contains(t, err.Error(), "EVENTS_TABLE")
assert.Contains(t, err.Error(), "EVENTS_TTL_DAYS")
}

func TestValidateEnv_AllSet(t *testing.T) {
envVars := map[string]string{
"SLACK_CHANNEL_ID": "C12345",
"EVENTS_TABLE": "events",
"EVENTS_TTL_DAYS": "90",
}
for k, v := range envVars {
t.Setenv(k, v)
}

err := ilambda.ValidateEnv("alert-dispatcher")
require.NoError(t, err, "should pass when all env vars are set")
}
3 changes: 2 additions & 1 deletion cmd/lambda/event-sink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/dwsmith1983/interlock/internal/lambda/sink"
"github.com/dwsmith1983/interlock/internal/store"
)

Expand Down Expand Up @@ -50,6 +51,6 @@ func main() {
}

lambda.Start(func(ctx context.Context, input ilambda.EventBridgeInput) error {
return ilambda.HandleEventSink(ctx, deps, input)
return sink.HandleEventSink(ctx, deps, input)
})
}
22 changes: 22 additions & 0 deletions cmd/lambda/event-sink/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"testing"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestValidateEnv_MissingVars(t *testing.T) {
err := ilambda.ValidateEnv("event-sink")
assert.Error(t, err, "should report missing env vars")
assert.Contains(t, err.Error(), "EVENTS_TABLE")
}

func TestValidateEnv_AllSet(t *testing.T) {
t.Setenv("EVENTS_TABLE", "events")

err := ilambda.ValidateEnv("event-sink")
require.NoError(t, err, "should pass when all env vars are set")
}
3 changes: 2 additions & 1 deletion cmd/lambda/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sfn"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/dwsmith1983/interlock/internal/lambda/orchestrator"
"github.com/dwsmith1983/interlock/internal/store"
"github.com/dwsmith1983/interlock/internal/trigger"
"github.com/dwsmith1983/interlock/pkg/types"
Expand Down Expand Up @@ -75,6 +76,6 @@ func main() {
}

lambda.Start(func(ctx context.Context, input ilambda.OrchestratorInput) (ilambda.OrchestratorOutput, error) {
return ilambda.HandleOrchestrator(ctx, deps, input)
return orchestrator.HandleOrchestrator(ctx, deps, input)
})
}
33 changes: 33 additions & 0 deletions cmd/lambda/orchestrator/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"testing"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestValidateEnv_MissingVars(t *testing.T) {
err := ilambda.ValidateEnv("orchestrator")
assert.Error(t, err, "should report missing env vars")
assert.Contains(t, err.Error(), "CONTROL_TABLE")
assert.Contains(t, err.Error(), "JOBLOG_TABLE")
assert.Contains(t, err.Error(), "RERUN_TABLE")
assert.Contains(t, err.Error(), "EVENT_BUS_NAME")
}

func TestValidateEnv_AllSet(t *testing.T) {
envVars := map[string]string{
"CONTROL_TABLE": "ctl",
"JOBLOG_TABLE": "jl",
"RERUN_TABLE": "rr",
"EVENT_BUS_NAME": "bus",
}
for k, v := range envVars {
t.Setenv(k, v)
}

err := ilambda.ValidateEnv("orchestrator")
require.NoError(t, err, "should pass when all env vars are set")
}
3 changes: 2 additions & 1 deletion cmd/lambda/sla-monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/scheduler"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/dwsmith1983/interlock/internal/lambda/sla"
"github.com/dwsmith1983/interlock/internal/store"
)

Expand Down Expand Up @@ -52,6 +53,6 @@ func main() {
}

lambda.Start(func(ctx context.Context, input ilambda.SLAMonitorInput) (ilambda.SLAMonitorOutput, error) {
return ilambda.HandleSLAMonitor(ctx, deps, input)
return sla.HandleSLAMonitor(ctx, deps, input)
})
}
39 changes: 39 additions & 0 deletions cmd/lambda/sla-monitor/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"testing"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestValidateEnv_MissingVars(t *testing.T) {
err := ilambda.ValidateEnv("sla-monitor")
assert.Error(t, err, "should report missing env vars")
assert.Contains(t, err.Error(), "CONTROL_TABLE")
assert.Contains(t, err.Error(), "JOBLOG_TABLE")
assert.Contains(t, err.Error(), "RERUN_TABLE")
assert.Contains(t, err.Error(), "EVENT_BUS_NAME")
assert.Contains(t, err.Error(), "SLA_MONITOR_ARN")
assert.Contains(t, err.Error(), "SCHEDULER_ROLE_ARN")
assert.Contains(t, err.Error(), "SCHEDULER_GROUP_NAME")
}

func TestValidateEnv_AllSet(t *testing.T) {
envVars := map[string]string{
"CONTROL_TABLE": "ctl",
"JOBLOG_TABLE": "jl",
"RERUN_TABLE": "rr",
"EVENT_BUS_NAME": "bus",
"SLA_MONITOR_ARN": "arn:aws:lambda:us-east-1:123:function:sla",
"SCHEDULER_ROLE_ARN": "arn:aws:iam::123:role/sched",
"SCHEDULER_GROUP_NAME": "interlock",
}
for k, v := range envVars {
t.Setenv(k, v)
}

err := ilambda.ValidateEnv("sla-monitor")
require.NoError(t, err, "should pass when all env vars are set")
}
3 changes: 2 additions & 1 deletion cmd/lambda/stream-router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sfn"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/dwsmith1983/interlock/internal/lambda/stream"
"github.com/dwsmith1983/interlock/internal/store"
)

Expand Down Expand Up @@ -55,6 +56,6 @@ func main() {
}

lambda.Start(func(ctx context.Context, event ilambda.StreamEvent) (events.DynamoDBEventResponse, error) {
return ilambda.HandleStreamEvent(ctx, deps, event)
return stream.HandleStreamEvent(ctx, deps, event)
})
}
35 changes: 35 additions & 0 deletions cmd/lambda/stream-router/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"testing"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestValidateEnv_MissingVars(t *testing.T) {
err := ilambda.ValidateEnv("stream-router")
assert.Error(t, err, "should report missing env vars")
assert.Contains(t, err.Error(), "CONTROL_TABLE")
assert.Contains(t, err.Error(), "JOBLOG_TABLE")
assert.Contains(t, err.Error(), "RERUN_TABLE")
assert.Contains(t, err.Error(), "STATE_MACHINE_ARN")
assert.Contains(t, err.Error(), "EVENT_BUS_NAME")
}

func TestValidateEnv_AllSet(t *testing.T) {
envVars := map[string]string{
"CONTROL_TABLE": "ctl",
"JOBLOG_TABLE": "jl",
"RERUN_TABLE": "rr",
"STATE_MACHINE_ARN": "arn:aws:states:us-east-1:123:stateMachine:test",
"EVENT_BUS_NAME": "bus",
}
for k, v := range envVars {
t.Setenv(k, v)
}

err := ilambda.ValidateEnv("stream-router")
require.NoError(t, err, "should pass when all env vars are set")
}
3 changes: 2 additions & 1 deletion cmd/lambda/watchdog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sfn"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/dwsmith1983/interlock/internal/lambda/watchdog"
"github.com/dwsmith1983/interlock/internal/store"
)

Expand Down Expand Up @@ -60,6 +61,6 @@ func main() {
}

lambda.Start(func(ctx context.Context) error {
return ilambda.HandleWatchdog(ctx, deps)
return watchdog.HandleWatchdog(ctx, deps)
})
}
33 changes: 33 additions & 0 deletions cmd/lambda/watchdog/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"testing"

ilambda "github.com/dwsmith1983/interlock/internal/lambda"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestValidateEnv_MissingVars(t *testing.T) {
err := ilambda.ValidateEnv("watchdog")
assert.Error(t, err, "should report missing env vars")
assert.Contains(t, err.Error(), "CONTROL_TABLE")
assert.Contains(t, err.Error(), "JOBLOG_TABLE")
assert.Contains(t, err.Error(), "RERUN_TABLE")
assert.Contains(t, err.Error(), "EVENT_BUS_NAME")
}

func TestValidateEnv_AllSet(t *testing.T) {
envVars := map[string]string{
"CONTROL_TABLE": "ctl",
"JOBLOG_TABLE": "jl",
"RERUN_TABLE": "rr",
"EVENT_BUS_NAME": "bus",
}
for k, v := range envVars {
t.Setenv(k, v)
}

err := ilambda.ValidateEnv("watchdog")
require.NoError(t, err, "should pass when all env vars are set")
}
11 changes: 9 additions & 2 deletions docs/content/docs/architecture/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,18 @@ When a Scheduler entry fires, it invokes this Lambda to publish the correspondin

### watchdog

Invoked by an EventBridge scheduled rule (default: every 5 minutes). Runs three independent scans:
Invoked by an EventBridge scheduled rule (default: every 5 minutes). Runs eight independent checks in a table-driven loop:

1. **Stale triggers** -- scans for `TRIGGER#` records with `RUNNING` status whose TTL has expired. Publishes `SFN_TIMEOUT` events and sets status to `FAILED_FINAL`.
2. **Missed schedules** -- loads all cron-scheduled pipeline configs, checks for missing `TRIGGER#` records for today's date. Publishes `SCHEDULE_MISSED` events for pipelines past their expected start time.
3. **Missing post-run sensors** -- for pipelines with `postRun` config and a completed trigger, checks whether post-run sensors have arrived within the `sensorTimeout` grace period. Publishes `POST_RUN_SENSOR_MISSING` events.
3. **Missed inclusion schedules** -- checks pipelines with inclusion calendar config for missing triggers on scheduled dates. Publishes `IRREGULAR_SCHEDULE_MISSED` events.
4. **Sensor-trigger reconciliation** -- re-evaluates trigger conditions for sensor-triggered pipelines. Self-heals missed triggers caused by silent completion-write failures.
5. **SLA scheduling** -- proactively creates EventBridge Scheduler entries for pipelines with SLA configs, ensuring warnings and breaches fire even when data never arrives.
6. **Trigger deadlines** -- evaluates trigger deadlines for sensor-triggered pipelines. Closes the auto-trigger window and publishes `SENSOR_DEADLINE_EXPIRED` when the deadline passes with no trigger.
7. **Missing post-run sensors** -- for pipelines with `postRun` config and a completed trigger, checks whether post-run sensors have arrived within the `sensorTimeout` grace period. Publishes `POST_RUN_SENSOR_MISSING` events.
8. **Relative SLA breaches** -- checks pipelines with `maxDuration` SLA config for breaches relative to the first sensor arrival time.

If any check fails, the watchdog publishes a `WATCHDOG_DEGRADED` event listing the failed checks and returns an aggregate error. Individual check failures do not prevent the remaining checks from running.

See [Watchdog](../watchdog) for the full algorithm.

Expand Down
Loading
Loading