Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
b06a886
fix: validation mode case-insensitive matching (BUG-8)
dwsmith1983 Mar 13, 2026
a4dd4d6
security: harden IAM policies and EventBridge bus access (SEC-1,2,4,5)
dwsmith1983 Mar 13, 2026
6b9b0e9
fix: suppress SLA_MET when pipeline never ran and log reconcile error…
dwsmith1983 Mar 13, 2026
59875c4
fix: trigger deadline uses schedule timezone with SLA fallback (BUG-6)
dwsmith1983 Mar 13, 2026
351eaa6
fix: extract shared drift detection with zero-value support (BUG-1, D…
dwsmith1983 Mar 13, 2026
76a01c4
fix: batch item failures, baseline namespacing, rerun ordering, and e…
dwsmith1983 Mar 13, 2026
3c0df9d
docs: CHANGELOG for v0.9.2 audit remediation
dwsmith1983 Mar 13, 2026
a7ae361
refactor: extract shared HTTP client construction (DRY-2)
dwsmith1983 Mar 13, 2026
0badbee
refactor: extract shared SLA schedule creation loop (DRY-3)
dwsmith1983 Mar 13, 2026
8812a0a
security: replace sh -c with direct exec in command trigger (SEC-3)
dwsmith1983 Mar 13, 2026
eb15c39
refactor: split watchdog.go into focused files (1079 → ~200 lines each)
dwsmith1983 Mar 13, 2026
2c59bd5
docs: CHANGELOG for v0.9.3 restructuring
dwsmith1983 Mar 13, 2026
6f8fa52
feat: add dry-run rerun/retry event type constants
dwsmith1983 Mar 13, 2026
c323cf4
feat: dry-run rerun/retry observability with full evaluation
dwsmith1983 Mar 13, 2026
2e37a8d
chore: route dry-run rerun events to alert rule
dwsmith1983 Mar 13, 2026
10c01b2
fix: detect EventBridge PutEvents partial failures (GO-C1)
dwsmith1983 Mar 14, 2026
8c5a150
security: add SSRF protection to trigger HTTP clients (SEC-C2)
dwsmith1983 Mar 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,40 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.9.3] - 2026-03-13

### Changed

- **Shared HTTP client construction (DRY-2)** — Extracted `resolveHTTPClient()` replacing identical 7-line blocks in `ExecuteHTTP` and `ExecuteAirflow`.
- **Shared SLA schedule creation loop (DRY-3)** — Extracted `createSLASchedules()` replacing duplicated warning/breach schedule loops in watchdog and sla-monitor.
- **Split watchdog.go into focused files** — 1079-line monolith split into 5 files by domain: stale triggers, missed schedules, SLA alerting, and post-run monitoring (~200 lines each).

### Security

- **Command trigger shell injection eliminated (SEC-3)** — Replaced `sh -c` with direct `exec.CommandContext` + `strings.Fields` argument splitting. No shell interpretation of pipes, redirects, or variable expansion.

## [0.9.2] - 2026-03-13

### Fixed

- **Drift detection silently skipped zero values (BUG-1)** — `ExtractFloat` returned 0 for both missing keys and actual zero values, causing the `prevCount > 0` guard to silently skip legitimate transitions like 5000→0 or 0→5000. New `ExtractFloatOk` distinguishes absent from zero. Shared `DetectDrift` function consolidates 3 duplicated drift comparison sites.
- **RemapPerPeriodSensors map mutation during range (BUG-2)** — Adding keys during `range` iteration over a Go map is nondeterministic per the spec. Staging map now collects additions, merged after iteration.
- **Orphaned rerun burns retry budget (BUG-3)** — `handleRerunRequest` wrote the rerun record before acquiring the trigger lock. If lock acquisition failed, the rerun record was left orphaned and permanently consumed retry budget. Reordered to lock-first, then write.
- **Stream router discarded partial batch failures (BUG-4)** — `HandleStreamEvent` returned a single error, causing Lambda to retry the entire batch. Now returns `DynamoDBEventResponse` with per-record `BatchItemFailures` for partial retry via `ReportBatchItemFailures`.
- **SLA_MET published when pipeline never ran (BUG-5)** — `handleSLACancel` published SLA_MET regardless of whether a trigger existed. Now checks for trigger existence first.
- **Trigger deadline used SLA timezone instead of schedule timezone (BUG-6)** — `closeSensorTriggerWindow` read timezone from `cfg.SLA.Timezone` instead of `cfg.Schedule.Timezone`. Falls back to SLA timezone if schedule timezone is not set.
- **Validation mode case-sensitive (BUG-8)** — `EvaluateRules` matched mode with `switch mode` so "any" fell through to the default ALL branch. Now uses `strings.ToUpper(mode)`.
- **Epoch timestamp unit mismatch in rerun freshness (BUG-9)** — `checkSensorFreshness` compared raw epoch values without normalizing units. Timestamps below 1e12 (seconds) are now converted to milliseconds.
- **Post-run baseline field collision (BUG-10)** — Baseline was stored as a flat map, so two rules with the same field name overwrote each other. Now namespaced by rule key. Clean break: existing flat baselines self-heal on next pipeline completion.
- **publishEvent errors silently discarded in SLA reconcile (CQ-5)** — Replaced `_ = publishEvent(...)` with error-logged calls.

### Security

- **lambda_trigger_arns default changed to [] with precondition (SEC-1)** — Wildcard default removed; explicit ARN list required when triggers are enabled.
- **Slack plaintext token deprecation warning (SEC-2)** — Terraform `check` block warns at plan time when plaintext token is used without Secrets Manager.
- **Trigger IAM policy scoping (SEC-4)** — New variables `glue_job_arns`, `emr_cluster_arns`, `emr_serverless_app_arns`, `sfn_trigger_arns` (all default `[]`) with preconditions requiring non-empty values when the corresponding trigger is enabled.
- **EventBridge bus resource policy (SEC-5)** — Restricts PutEvents to Lambda execution roles only.

## [0.9.1] - 2026-03-13

### Added
Expand Down
3 changes: 2 additions & 1 deletion cmd/lambda/stream-router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
Expand Down Expand Up @@ -53,7 +54,7 @@ func main() {
Logger: logger,
}

lambda.Start(func(ctx context.Context, event ilambda.StreamEvent) error {
lambda.Start(func(ctx context.Context, event ilambda.StreamEvent) (events.DynamoDBEventResponse, error) {
return ilambda.HandleStreamEvent(ctx, deps, event)
})
}
23 changes: 23 additions & 0 deletions deploy/terraform/eventbridge.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,25 @@ resource "aws_cloudwatch_event_bus" "interlock" {
tags = var.tags
}

resource "aws_cloudwatch_event_bus_policy" "interlock_bus" {
event_bus_name = aws_cloudwatch_event_bus.interlock.name

policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowInterlockLambdas"
Effect = "Allow"
Principal = {
AWS = [for name in local.lambda_names : aws_iam_role.lambda[name].arn]
}
Action = "events:PutEvents"
Resource = aws_cloudwatch_event_bus.interlock.arn
}
]
})
}

# Watchdog schedule
resource "aws_cloudwatch_event_rule" "watchdog" {
name = "${var.environment}-interlock-watchdog"
Expand Down Expand Up @@ -100,6 +119,10 @@ resource "aws_cloudwatch_event_rule" "alert_events" {
"DRY_RUN_SLA_PROJECTION",
"DRY_RUN_DRIFT",
"DRY_RUN_COMPLETED",
"DRY_RUN_WOULD_RERUN",
"DRY_RUN_RERUN_REJECTED",
"DRY_RUN_WOULD_RETRY",
"DRY_RUN_RETRY_EXHAUSTED",
]
})
}
Expand Down
54 changes: 50 additions & 4 deletions deploy/terraform/lambda.tf
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,17 @@ resource "aws_lambda_event_source_mapping" "joblog_stream" {
}
}

# =============================================================================
# Security checks
# =============================================================================

check "slack_token_deprecation" {
assert {
condition = var.slack_bot_token == "" || var.slack_secret_arn != ""
error_message = "DEPRECATED: Passing a plaintext Slack bot token is deprecated. Use var.slack_secret_arn with an AWS Secrets Manager ARN instead. Plaintext path still works but will be removed in a future version."
}
}

# =============================================================================
# Conditional trigger permissions for orchestrator (opt-in per trigger type)
# =============================================================================
Expand All @@ -552,13 +563,20 @@ resource "aws_iam_role_policy" "glue_trigger" {
name = "glue-trigger"
role = aws_iam_role.lambda["orchestrator"].id

lifecycle {
precondition {
condition = length(var.glue_job_arns) > 0
error_message = "glue_job_arns must be non-empty when enable_glue_trigger is true."
}
}

policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["glue:StartJobRun", "glue:GetJobRun"]
Resource = "*"
Resource = var.glue_job_arns
},
{
Sid = "GlueLogVerification"
Expand All @@ -580,12 +598,19 @@ resource "aws_iam_role_policy" "emr_trigger" {
name = "emr-trigger"
role = aws_iam_role.lambda["orchestrator"].id

lifecycle {
precondition {
condition = length(var.emr_cluster_arns) > 0
error_message = "emr_cluster_arns must be non-empty when enable_emr_trigger is true."
}
}

policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Action = ["elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep"]
Resource = "*"
Resource = var.emr_cluster_arns
}]
})
}
Expand All @@ -597,12 +622,19 @@ resource "aws_iam_role_policy" "emr_serverless_trigger" {
name = "emr-serverless-trigger"
role = aws_iam_role.lambda["orchestrator"].id

lifecycle {
precondition {
condition = length(var.emr_serverless_app_arns) > 0
error_message = "emr_serverless_app_arns must be non-empty when enable_emr_serverless_trigger is true."
}
}

policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Action = ["emr-serverless:StartJobRun", "emr-serverless:GetJobRun"]
Resource = "*"
Resource = var.emr_serverless_app_arns
}]
})
}
Expand All @@ -614,12 +646,19 @@ resource "aws_iam_role_policy" "sfn_trigger" {
name = "sfn-trigger"
role = aws_iam_role.lambda["orchestrator"].id

lifecycle {
precondition {
condition = length(var.sfn_trigger_arns) > 0
error_message = "sfn_trigger_arns must be non-empty when enable_sfn_trigger is true."
}
}

policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Action = ["states:StartExecution", "states:DescribeExecution"]
Resource = "*"
Resource = var.sfn_trigger_arns
}]
})
}
Expand All @@ -631,6 +670,13 @@ resource "aws_iam_role_policy" "lambda_trigger" {
name = "lambda-trigger"
role = aws_iam_role.lambda["orchestrator"].id

lifecycle {
precondition {
condition = length(var.lambda_trigger_arns) > 0
error_message = "lambda_trigger_arns must be non-empty when enable_lambda_trigger is true."
}
}

policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Expand Down
26 changes: 25 additions & 1 deletion deploy/terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,29 @@ variable "enable_lambda_trigger" {
variable "lambda_trigger_arns" {
description = "ARNs of Lambda functions the orchestrator may invoke as pipeline triggers"
type = list(string)
default = ["*"]
default = []
}

variable "glue_job_arns" {
description = "ARNs of Glue jobs that the orchestrator Lambda can start. Required when enable_glue_trigger is true."
type = list(string)
default = []
}

variable "emr_cluster_arns" {
description = "ARNs of EMR clusters the orchestrator can submit steps to. Required when enable_emr_trigger is true."
type = list(string)
default = []
}

variable "emr_serverless_app_arns" {
description = "ARNs of EMR Serverless applications. Required when enable_emr_serverless_trigger is true."
type = list(string)
default = []
}

variable "sfn_trigger_arns" {
description = "ARNs of Step Functions the orchestrator can start. Required when enable_sfn_trigger is true."
type = list(string)
default = []
}
63 changes: 63 additions & 0 deletions internal/lambda/drift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package lambda

import (
"math"
"strconv"
)

// ExtractFloatOk retrieves a numeric value from a sensor data map.
// Returns (value, true) if the key exists and is numeric, (0, false) otherwise.
// Unlike ExtractFloat, this distinguishes zero values from missing keys.
func ExtractFloatOk(data map[string]interface{}, key string) (float64, bool) {
if data == nil {
return 0, false
}
v, ok := data[key]
if !ok {
return 0, false
}
switch n := v.(type) {
case float64:
return n, true
case string:
f, err := strconv.ParseFloat(n, 64)
if err != nil {
return 0, false
}
return f, true
default:
return 0, false
}
}

// DriftResult holds the outcome of a drift comparison.
type DriftResult struct {
Drifted bool
Previous float64
Current float64
Delta float64
PrevFound bool
CurrFound bool
}

// DetectDrift compares baseline and current sensor data for a drift field.
// Both values must be present for drift to be detected. Returns whether
// the absolute delta exceeds the threshold.
func DetectDrift(baseline, current map[string]interface{}, driftField string, threshold float64) DriftResult {
prev, prevOk := ExtractFloatOk(baseline, driftField)
curr, currOk := ExtractFloatOk(current, driftField)

result := DriftResult{
Previous: prev,
Current: curr,
PrevFound: prevOk,
CurrFound: currOk,
}

if prevOk && currOk {
result.Delta = curr - prev
result.Drifted = math.Abs(result.Delta) > threshold
}

return result
}
62 changes: 62 additions & 0 deletions internal/lambda/drift_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package lambda

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestExtractFloatOk(t *testing.T) {
tests := []struct {
name string
data map[string]interface{}
key string
wantVal float64
wantOk bool
}{
{"present float", map[string]interface{}{"count": float64(42)}, "count", 42, true},
{"present zero", map[string]interface{}{"count": float64(0)}, "count", 0, true},
{"present string", map[string]interface{}{"count": "123.5"}, "count", 123.5, true},
{"missing key", map[string]interface{}{}, "count", 0, false},
{"nil map", nil, "count", 0, false},
{"wrong type", map[string]interface{}{"count": true}, "count", 0, false},
{"invalid string", map[string]interface{}{"count": "abc"}, "count", 0, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
val, ok := ExtractFloatOk(tt.data, tt.key)
assert.Equal(t, tt.wantOk, ok)
assert.InDelta(t, tt.wantVal, val, 0.001)
})
}
}

func TestDetectDrift(t *testing.T) {
m := func(k string, v float64) map[string]interface{} {
return map[string]interface{}{k: v}
}
tests := []struct {
name string
baseline map[string]interface{}
current map[string]interface{}
field string
threshold float64
wantDrift bool
}{
{"5000→0 drifts", m("count", 5000.0), m("count", 0.0), "count", 0, true},
{"0→5000 drifts", m("count", 0.0), m("count", 5000.0), "count", 0, true},
{"same value no drift", m("count", 100.0), m("count", 100.0), "count", 0, false},
{"within threshold", m("count", 100.0), m("count", 150.0), "count", 100, false},
{"exceeds threshold", m("count", 100.0), m("count", 250.0), "count", 100, true},
{"prev missing no drift", map[string]interface{}{}, m("count", 100.0), "count", 0, false},
{"curr missing no drift", m("count", 100.0), map[string]interface{}{}, "count", 0, false},
{"both missing no drift", map[string]interface{}{}, map[string]interface{}{}, "count", 0, false},
{"negative drift", m("count", 100.0), m("count", 50.0), "count", 0, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := DetectDrift(tt.baseline, tt.current, tt.field, tt.threshold)
assert.Equal(t, tt.wantDrift, result.Drifted)
})
}
}
Loading
Loading