From f445fc1509476c796ae6fa1b19f24d25975ad226 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:31:27 +0700 Subject: [PATCH 01/17] fix(trigger): restrict env var expansion to INTERLOCK_ prefix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace os.ExpandEnv() with os.Expand(str, safeEnvLookup) in Airflow, Databricks, and Lambda triggers. The HTTP trigger already used the safe pattern — these three bypassed it, allowing config authors to exfiltrate secrets like AWS_SECRET_ACCESS_KEY via header/body expansion. --- internal/trigger/airflow.go | 10 ++--- internal/trigger/airflow_test.go | 58 +++++++++++++++++++++++++++++ internal/trigger/databricks.go | 7 +--- internal/trigger/databricks_test.go | 52 ++++++++++++++++++++++++++ internal/trigger/lambda.go | 2 +- internal/trigger/lambda_test.go | 27 ++++++++++++++ 6 files changed, 143 insertions(+), 13 deletions(-) diff --git a/internal/trigger/airflow.go b/internal/trigger/airflow.go index 9818d9f..f78708a 100644 --- a/internal/trigger/airflow.go +++ b/internal/trigger/airflow.go @@ -27,10 +27,8 @@ func ExecuteAirflow(ctx context.Context, cfg *types.AirflowTriggerConfig) (map[s payload := map[string]interface{}{} if cfg.Body != "" { - // os.ExpandEnv is intentional: operators store ${VAR} references in - // pipeline configs, resolved at runtime from the execution environment. var conf interface{} - if err := json.Unmarshal([]byte(os.ExpandEnv(cfg.Body)), &conf); err != nil { + if err := json.Unmarshal([]byte(os.Expand(cfg.Body, safeEnvLookup)), &conf); err != nil { return nil, fmt.Errorf("airflow trigger: invalid body JSON: %w", err) } payload["conf"] = conf @@ -48,8 +46,7 @@ func ExecuteAirflow(ctx context.Context, cfg *types.AirflowTriggerConfig) (map[s req.Header.Set("Content-Type", "application/json") for k, v := range cfg.Headers { - // os.ExpandEnv is intentional — see body comment above. 
- req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } client := defaultHTTPClient @@ -105,8 +102,7 @@ func CheckAirflowStatus(ctx context.Context, airflowURL, dagID, dagRunID string, req.Header.Set("Content-Type", "application/json") for k, v := range headers { - // os.ExpandEnv is intentional — see ExecuteAirflow comment. - req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } resp, err := defaultHTTPClient.Do(req) diff --git a/internal/trigger/airflow_test.go b/internal/trigger/airflow_test.go index ddd7197..35b9cce 100644 --- a/internal/trigger/airflow_test.go +++ b/internal/trigger/airflow_test.go @@ -237,6 +237,64 @@ func TestCheckAirflowStatus_MissingStateField(t *testing.T) { assert.Contains(t, err.Error(), "response missing state field") } +func TestExecuteAirflow_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + var receivedBody map[string]interface{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + var payload map[string]interface{} + _ = json.NewDecoder(r.Body).Decode(&payload) + receivedBody = payload + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "dag_run_id": "run-env-test", + }) + })) + defer srv.Close() + + cfg := &types.AirflowTriggerConfig{ + URL: srv.URL, + DagID: "test_dag", + Headers: map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"}, + Body: `{"safe":"${INTERLOCK_TEST_VAR}","secret":"${SECRET_VAR}"}`, + } + + _, err := ExecuteAirflow(context.Background(), cfg) + require.NoError(t, err) + + // INTERLOCK_ prefixed vars should resolve; others should not. 
+ assert.Equal(t, "Bearer safe/", receivedAuth) + conf, _ := receivedBody["conf"].(map[string]interface{}) + assert.Equal(t, "safe", conf["safe"]) + assert.Equal(t, "", conf["secret"]) + assert.NotContains(t, receivedAuth, "leaked") +} + +func TestCheckAirflowStatus_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "state": "running", + }) + })) + defer srv.Close() + + headers := map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"} + state, err := CheckAirflowStatus(context.Background(), srv.URL, "my_dag", "run-1", headers) + require.NoError(t, err) + assert.Equal(t, "running", state) + assert.Equal(t, "Bearer safe/", receivedAuth) + assert.NotContains(t, receivedAuth, "leaked") +} + func TestCheckAirflowStatus_WithHeaders(t *testing.T) { var receivedAuth string srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/trigger/databricks.go b/internal/trigger/databricks.go index 50c445a..b6e5fe0 100644 --- a/internal/trigger/databricks.go +++ b/internal/trigger/databricks.go @@ -43,9 +43,7 @@ func ExecuteDatabricks(ctx context.Context, cfg *types.DatabricksTriggerConfig, req.Header.Set("Content-Type", "application/json") for k, v := range cfg.Headers { - // os.ExpandEnv is intentional: operators store ${VAR} references in - // pipeline configs, resolved at runtime from the execution environment. 
- req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } resp, err := httpClient.Do(req) @@ -97,8 +95,7 @@ func (r *Runner) checkDatabricksStatus(ctx context.Context, metadata map[string] req.Header.Set("Content-Type", "application/json") for k, v := range headers { - // os.ExpandEnv is intentional — see ExecuteDatabricks comment. - req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } resp, err := r.httpClient.Do(req) diff --git a/internal/trigger/databricks_test.go b/internal/trigger/databricks_test.go index c255545..1e30fcf 100644 --- a/internal/trigger/databricks_test.go +++ b/internal/trigger/databricks_test.go @@ -182,6 +182,58 @@ func TestCheckDatabricksStatus_MissingRunID(t *testing.T) { assert.Equal(t, "missing databricks metadata", result.Message) } +func TestExecuteDatabricks_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "run_id": 42, + }) + })) + defer srv.Close() + + cfg := &types.DatabricksTriggerConfig{ + WorkspaceURL: srv.URL, + JobID: "my-job", + Headers: map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"}, + } + + _, err := ExecuteDatabricks(context.Background(), cfg, srv.Client()) + require.NoError(t, err) + assert.Equal(t, "Bearer safe/", receivedAuth) + assert.NotContains(t, receivedAuth, "leaked") +} + +func TestCheckDatabricksStatus_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + 
w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "state": map[string]interface{}{ + "life_cycle_state": "RUNNING", + }, + }) + })) + defer srv.Close() + + r := NewRunner(WithHTTPClient(srv.Client())) + _, err := r.checkDatabricksStatus(context.Background(), map[string]interface{}{ + "databricks_workspace_url": srv.URL, + "databricks_run_id": "123", + }, map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"}) + require.NoError(t, err) + assert.Equal(t, "Bearer safe/", receivedAuth) + assert.NotContains(t, receivedAuth, "leaked") +} + func TestCheckDatabricksStatus_WithHeaders(t *testing.T) { var receivedAuth string srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/trigger/lambda.go b/internal/trigger/lambda.go index 70f791c..1fda822 100644 --- a/internal/trigger/lambda.go +++ b/internal/trigger/lambda.go @@ -28,7 +28,7 @@ func ExecuteLambda(ctx context.Context, cfg *types.LambdaTriggerConfig, client L } if cfg.Payload != "" { - input.Payload = []byte(os.ExpandEnv(cfg.Payload)) + input.Payload = []byte(os.Expand(cfg.Payload, safeEnvLookup)) } out, err := client.Invoke(ctx, input) diff --git a/internal/trigger/lambda_test.go b/internal/trigger/lambda_test.go index 90a5763..b5a4eee 100644 --- a/internal/trigger/lambda_test.go +++ b/internal/trigger/lambda_test.go @@ -92,6 +92,33 @@ func (c *capturingLambdaClient) Invoke(ctx context.Context, params *lambda.Invok return c.delegate.Invoke(ctx, params, optFns...) 
} +func TestExecuteLambda_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var capturedPayload []byte + client := &mockLambdaClient{ + invokeOut: &lambda.InvokeOutput{StatusCode: 200}, + } + capClient := &capturingLambdaClient{ + delegate: client, + onInvoke: func(input *lambda.InvokeInput) { + capturedPayload = input.Payload + }, + } + cfg := &types.LambdaTriggerConfig{ + FunctionName: "my-audit", + Payload: `{"safe":"${INTERLOCK_TEST_VAR}","secret":"${SECRET_VAR}"}`, + } + err := ExecuteLambda(context.Background(), cfg, capClient) + require.NoError(t, err) + + payload := string(capturedPayload) + assert.Contains(t, payload, `"safe":"safe"`) + assert.Contains(t, payload, `"secret":""`) + assert.NotContains(t, payload, "leaked") +} + func TestExecuteLambda_NonPolling(t *testing.T) { client := &mockLambdaClient{ invokeOut: &lambda.InvokeOutput{StatusCode: 200}, From 357f0ac469a530819b41ba352b8d22b8e4d16d3f Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:31:37 +0700 Subject: [PATCH 02/17] fix(lambda): release trigger lock on SFN start failure Add ReleaseTriggerLock call when startSFN fails after lock acquisition in handleSensorEvent and reconcileSensorTriggers. Without this, the pipeline is deadlocked until lock TTL expiry (4.5h default). The rerun path already implemented this pattern correctly. Also fix scheduleSLAAlerts to skip pipelines on GetTrigger error instead of falling through to schedule SLA alerts for unknown state. 
--- internal/lambda/stream_router.go | 11 +++--- internal/lambda/stream_router_test.go | 52 +++++++++++++++++++++------ internal/lambda/watchdog.go | 38 +++++++++++++------- 3 files changed, 74 insertions(+), 27 deletions(-) diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index e7a6a9c..4ff0425 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -182,7 +182,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event Field: trigger.Field, Value: trigger.Value, } - result := validation.EvaluateRule(rule, sensorData, time.Now()) + result := validation.EvaluateRule(rule, sensorData, d.now()) if !result.Passed { d.Logger.Info("trigger condition not met", "pipelineId", pipelineID, @@ -193,14 +193,14 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event } // Check calendar exclusions (wall-clock date). - now := time.Now() + now := d.now() if isExcluded(cfg, now) { d.Logger.Info("pipeline excluded by calendar", "pipelineId", pipelineID, "date", now.Format("2006-01-02"), ) scheduleIDForEvent := resolveScheduleID(cfg) - dateForEvent := ResolveExecutionDate(sensorData) + dateForEvent := ResolveExecutionDate(sensorData, d.now()) if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, scheduleIDForEvent, dateForEvent, fmt.Sprintf("sensor trigger suppressed for %s: wall-clock date excluded by calendar", pipelineID)); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) @@ -210,7 +210,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Resolve schedule ID and date. scheduleID := resolveScheduleID(cfg) - date := ResolveExecutionDate(sensorData) + date := ResolveExecutionDate(sensorData, d.now()) // Acquire trigger lock to prevent duplicate executions. 
acquired, err := d.Store.AcquireTriggerLock(ctx, pipelineID, scheduleID, date, ResolveTriggerLockTTL()) @@ -232,6 +232,9 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Start Step Function execution. if err := startSFN(ctx, d, cfg, pipelineID, scheduleID, date); err != nil { + if relErr := d.Store.ReleaseTriggerLock(ctx, pipelineID, scheduleID, date); relErr != nil { + d.Logger.Warn("failed to release lock after SFN start failure", "error", relErr) + } return fmt.Errorf("start SFN for %q: %w", pipelineID, err) } diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index 230ecec..5ce0caa 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -739,7 +739,7 @@ func TestStreamRouter_LateDataArrival_CompletedFailed_Silent(t *testing.T) { func TestResolveExecutionDate_WithDateAndHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "10", "complete": true} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03T10" { t.Errorf("got %q, want %q", got, "2026-03-03T10") } @@ -747,7 +747,7 @@ func TestResolveExecutionDate_WithDateAndHour(t *testing.T) { func TestResolveExecutionDate_DashedDate(t *testing.T) { data := map[string]interface{}{"date": "2026-03-03", "hour": "10"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03T10" { t.Errorf("got %q, want %q", got, "2026-03-03T10") } @@ -755,7 +755,7 @@ func TestResolveExecutionDate_DashedDate(t *testing.T) { func TestResolveExecutionDate_DateOnly(t *testing.T) { data := map[string]interface{}{"date": "20260303"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03" { t.Errorf("got %q, want %q", got, "2026-03-03") } @@ -763,7 +763,7 @@ func TestResolveExecutionDate_DateOnly(t *testing.T) { 
func TestResolveExecutionDate_NoFields(t *testing.T) { data := map[string]interface{}{"complete": true} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) today := time.Now().Format("2006-01-02") if got != today { t.Errorf("got %q, want %q", got, today) @@ -772,7 +772,7 @@ func TestResolveExecutionDate_NoFields(t *testing.T) { func TestResolveExecutionDate_HourWithLeadingZero(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "03"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03T03" { t.Errorf("got %q, want %q", got, "2026-03-03T03") } @@ -780,7 +780,7 @@ func TestResolveExecutionDate_HourWithLeadingZero(t *testing.T) { func TestResolveExecutionDate_InvalidDate(t *testing.T) { data := map[string]interface{}{"date": "not-a-date"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) // Should fall back to today's date. today := time.Now().Format("2006-01-02") if got != today { @@ -790,7 +790,7 @@ func TestResolveExecutionDate_InvalidDate(t *testing.T) { func TestResolveExecutionDate_InvalidHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "25"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) // Invalid hour should be ignored; return date only. 
if got != "2026-03-03" { t.Errorf("got %q, want %q", got, "2026-03-03") @@ -799,7 +799,7 @@ func TestResolveExecutionDate_InvalidHour(t *testing.T) { func TestResolveExecutionDate_NonNumericHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "ab"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03" { t.Errorf("got %q, want %q", got, "2026-03-03") } @@ -1118,6 +1118,36 @@ func TestSensor_StartSFNError(t *testing.T) { assert.Empty(t, sfnMock.executions, "SFN error means no execution recorded") } +func TestSensor_StartSFNError_ReleasesLock(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + cfg := testStreamConfig() + seedConfig(mock, cfg) + + // Make SFN client return an error. + sfnMock.err = fmt.Errorf("SFN throttled") + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + "date": events.NewStringAttribute("2026-03-08"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err, "HandleStreamEvent swallows errors") + + // The trigger lock must have been released after SFN failure. + // Schedule ID for stream-triggered pipelines is "stream". 
+ lockKey := ddbItemKey(testControlTable, + types.PipelinePK("gold-revenue"), + types.TriggerSK("stream", "2026-03-08")) + mock.mu.Lock() + _, lockExists := mock.items[lockKey] + mock.mu.Unlock() + assert.False(t, lockExists, "trigger lock must be released after SFN start failure") +} + func TestSensor_PerHour_DateOnly(t *testing.T) { mock := newMockDDB() d, sfnMock, _ := testDeps(mock) @@ -2179,19 +2209,19 @@ func TestConvertAV_Null(t *testing.T) { func TestNormalizeDate_AlreadyNormalized(t *testing.T) { data := map[string]interface{}{"date": "2026-03-03"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) assert.Equal(t, "2026-03-03", got) } func TestNormalizeDate_Compact(t *testing.T) { data := map[string]interface{}{"date": "20260303"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) assert.Equal(t, "2026-03-03", got) } func TestNormalizeDate_CompactWithHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "07"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) assert.Equal(t, "2026-03-03T07", got) } diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index c09d281..102498f 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -64,8 +64,10 @@ func detectStaleTriggers(ctx context.Context, d *Deps) error { if tr.TTL > 0 { alertDetail["ttlExpired"] = time.Unix(tr.TTL, 0).UTC().Format(time.RFC3339) } - _ = publishEvent(ctx, d, string(types.EventSFNTimeout), pipelineID, schedule, date, - fmt.Sprintf("step function timed out for %s/%s/%s", pipelineID, schedule, date), alertDetail) + if err := publishEvent(ctx, d, string(types.EventSFNTimeout), pipelineID, schedule, date, + fmt.Sprintf("step function timed out for %s/%s/%s", pipelineID, schedule, date), alertDetail); err != nil { + d.Logger.Warn("failed to publish SFN timeout event", "error", err, "pipeline", 
pipelineID, "schedule", schedule, "date", date) + } if err := d.Store.SetTriggerStatus(ctx, pipelineID, schedule, date, types.TriggerStatusFailedFinal); err != nil { d.Logger.Error("failed to set trigger status to FAILED_FINAL", @@ -164,7 +166,7 @@ func reconcileSensorTriggers(ctx context.Context, d *Deps) error { continue } - date := ResolveExecutionDate(sensorData) + date := ResolveExecutionDate(sensorData, now) found, err := d.Store.HasTriggerForDate(ctx, id, scheduleID, date) if err != nil { @@ -194,6 +196,9 @@ func reconcileSensorTriggers(ctx context.Context, d *Deps) error { } if err := startSFN(ctx, d, cfg, id, scheduleID, date); err != nil { + if relErr := d.Store.ReleaseTriggerLock(ctx, id, scheduleID, date); relErr != nil { + d.Logger.Warn("failed to release lock after SFN start failure during reconciliation", "error", relErr) + } d.Logger.Error("SFN start failed during reconciliation", "pipelineId", id, "date", date, "error", err) continue @@ -203,8 +208,10 @@ func reconcileSensorTriggers(ctx context.Context, d *Deps) error { "source": "reconciliation", "actionHint": "watchdog recovered missed sensor trigger", } - _ = publishEvent(ctx, d, string(types.EventTriggerRecovered), id, scheduleID, date, - fmt.Sprintf("trigger recovered for %s/%s/%s", id, scheduleID, date), alertDetail) + if err := publishEvent(ctx, d, string(types.EventTriggerRecovered), id, scheduleID, date, + fmt.Sprintf("trigger recovered for %s/%s/%s", id, scheduleID, date), alertDetail); err != nil { + d.Logger.Warn("failed to publish trigger recovered event", "error", err, "pipeline", id, "schedule", scheduleID, "date", date) + } d.Logger.Info("recovered missed trigger", "pipelineId", id, @@ -329,8 +336,10 @@ func detectMissedSchedules(ctx context.Context, d *Deps) error { if cfg.Schedule.Time != "" { alertDetail["expectedTime"] = cfg.Schedule.Time } - _ = publishEvent(ctx, d, string(types.EventScheduleMissed), id, scheduleID, today, - fmt.Sprintf("missed schedule for %s on %s", id, 
today), alertDetail) + if err := publishEvent(ctx, d, string(types.EventScheduleMissed), id, scheduleID, today, + fmt.Sprintf("missed schedule for %s on %s", id, today), alertDetail); err != nil { + d.Logger.Warn("failed to publish missed schedule event", "error", err, "pipeline", id, "schedule", scheduleID, "date", today) + } d.Logger.Info("detected missed schedule", "pipelineId", id, @@ -381,6 +390,7 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { switch { case err != nil: d.Logger.Warn("trigger lookup failed in SLA scheduling", "pipelineId", id, "error", err) + continue case tr != nil && (tr.Status == types.TriggerStatusCompleted || tr.Status == types.TriggerStatusFailedFinal): continue case isJobTerminal(ctx, d, id, scheduleID, date): @@ -395,7 +405,7 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { Deadline: cfg.SLA.Deadline, ExpectedDuration: cfg.SLA.ExpectedDuration, Timezone: cfg.SLA.Timezone, - }) + }, now) if err != nil { d.Logger.Error("SLA calculate failed", "pipelineId", id, "error", err) continue @@ -567,13 +577,17 @@ func detectMissingPostRunSensors(ctx context.Context, d *Deps) error { "ruleKeys": strings.Join(ruleKeys, ", "), "actionHint": "post-run sensor data has not arrived within the expected timeout", } - _ = publishEvent(ctx, d, string(types.EventPostRunSensorMissing), id, scheduleID, today, - fmt.Sprintf("post-run sensor missing for %s on %s", id, today), alertDetail) + if err := publishEvent(ctx, d, string(types.EventPostRunSensorMissing), id, scheduleID, today, + fmt.Sprintf("post-run sensor missing for %s on %s", id, today), alertDetail); err != nil { + d.Logger.Warn("failed to publish post-run sensor missing event", "error", err, "pipeline", id, "schedule", scheduleID, "date", today) + } // Write dedup marker to avoid re-alerting on subsequent watchdog runs. 
- _ = d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ + if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ "alerted": "true", - }) + }); err != nil { + d.Logger.Warn("failed to write post-run dedup marker", "error", err, "pipeline", id, "date", today) + } d.Logger.Info("detected missing post-run sensor", "pipelineId", id, From 392ffb6320ae0bd0ee81ffc0a36224edc4f81c0b Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:31:43 +0700 Subject: [PATCH 03/17] fix(envcheck): add EVENTS_TABLE and EVENTS_TTL_DAYS to alert-dispatcher The alert-dispatcher uses both env vars for DynamoDB thread timestamp storage but envcheck didn't validate them at startup, causing silent runtime failures. --- internal/lambda/envcheck.go | 2 +- internal/lambda/envcheck_test.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/internal/lambda/envcheck.go b/internal/lambda/envcheck.go index 0b715b2..ce6211f 100644 --- a/internal/lambda/envcheck.go +++ b/internal/lambda/envcheck.go @@ -14,7 +14,7 @@ var requiredEnvVars = map[string][]string{ "watchdog": {"CONTROL_TABLE", "JOBLOG_TABLE", "RERUN_TABLE", "EVENT_BUS_NAME"}, "sla-monitor": {"CONTROL_TABLE", "JOBLOG_TABLE", "RERUN_TABLE", "EVENT_BUS_NAME", "SLA_MONITOR_ARN", "SCHEDULER_ROLE_ARN", "SCHEDULER_GROUP_NAME"}, "event-sink": {"EVENTS_TABLE"}, - "alert-dispatcher": {"SLACK_BOT_TOKEN", "SLACK_CHANNEL_ID"}, + "alert-dispatcher": {"SLACK_CHANNEL_ID", "EVENTS_TABLE", "EVENTS_TTL_DAYS"}, } // ValidateEnv checks that all required environment variables for the named diff --git a/internal/lambda/envcheck_test.go b/internal/lambda/envcheck_test.go index 51bd36a..c437690 100644 --- a/internal/lambda/envcheck_test.go +++ b/internal/lambda/envcheck_test.go @@ -33,6 +33,25 @@ func TestValidateEnv_PartialMissing(t *testing.T) { assert.Contains(t, err.Error(), "EVENTS_TABLE") } +func TestValidateEnv_AlertDispatcherAllPresent(t *testing.T) { + for _, v := range 
requiredEnvVars["alert-dispatcher"] { + t.Setenv(v, "test-value") + } + err := ValidateEnv("alert-dispatcher") + require.NoError(t, err) +} + +func TestValidateEnv_AlertDispatcherMissingEventsVars(t *testing.T) { + t.Setenv("SLACK_BOT_TOKEN", "xoxb-test") + t.Setenv("SLACK_CHANNEL_ID", "C12345") + t.Setenv("EVENTS_TABLE", "") + t.Setenv("EVENTS_TTL_DAYS", "") + err := ValidateEnv("alert-dispatcher") + assert.Error(t, err) + assert.Contains(t, err.Error(), "EVENTS_TABLE") + assert.Contains(t, err.Error(), "EVENTS_TTL_DAYS") +} + func TestValidateEnv_UnknownHandler(t *testing.T) { err := ValidateEnv("unknown-handler") require.NoError(t, err) From bb15c5b3d7bbb5d60d70dfe17c0a1fb437558d30 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:31:52 +0700 Subject: [PATCH 04/17] fix(lambda): log WriteJobEvent and audit write errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace all _ = d.Store.WriteJobEvent(...) silent discards with warn-level logging in rerun.go (6 sites) and orchestrator.go (3 sites). No control flow changes — audit write failures are now visible in CloudWatch Logs without blocking the happy path. 
--- internal/lambda/orchestrator.go | 17 ++++++++++------ internal/lambda/rerun.go | 35 ++++++++++++++++++++++----------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/internal/lambda/orchestrator.go b/internal/lambda/orchestrator.go index e28909a..7bb6f2c 100644 --- a/internal/lambda/orchestrator.go +++ b/internal/lambda/orchestrator.go @@ -6,7 +6,6 @@ import ( "fmt" "strconv" "strings" - "time" "github.com/dwsmith1983/interlock/internal/store" "github.com/dwsmith1983/interlock/internal/validation" @@ -54,7 +53,7 @@ func handleEvaluate(ctx context.Context, d *Deps, input OrchestratorInput) (Orch RemapPerPeriodSensors(sensors, input.Date) - result := validation.EvaluateRules(cfg.Validation.Trigger, cfg.Validation.Rules, sensors, time.Now()) + result := validation.EvaluateRules(cfg.Validation.Trigger, cfg.Validation.Rules, sensors, d.now()) if result.Passed { if err := publishEvent(ctx, d, string(types.EventValidationPassed), input.PipelineID, input.ScheduleID, input.Date, "all validation rules passed"); err != nil { @@ -112,8 +111,10 @@ func handleTrigger(ctx context.Context, d *Deps, input OrchestratorInput) (Orche // during Execute. Write success to joblog immediately and set a sentinel // runId so the Step Functions CheckJob JSONPath resolves. 
if metadata == nil { - _ = d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, - types.JobEventSuccess, "sync", 0, fmt.Sprintf("%s trigger completed synchronously", cfg.Job.Type)) + if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, + types.JobEventSuccess, "sync", 0, fmt.Sprintf("%s trigger completed synchronously", cfg.Job.Type)); err != nil { + d.Logger.Warn("failed to write sync job success joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date) + } runID = "sync" metadata = map[string]interface{}{"completedSync": true} } @@ -168,7 +169,9 @@ func handleCheckJob(ctx context.Context, d *Deps, input OrchestratorInput) (Orch switch result.State { case "succeeded": - _ = d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventSuccess, input.RunID, 0, "") + if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventSuccess, input.RunID, 0, ""); err != nil { + d.Logger.Warn("failed to write polled job success joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date) + } if err := publishEvent(ctx, d, string(types.EventJobCompleted), input.PipelineID, input.ScheduleID, input.Date, "job succeeded"); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventJobCompleted, "error", err) } @@ -178,7 +181,9 @@ func handleCheckJob(ctx context.Context, d *Deps, input OrchestratorInput) (Orch if result.FailureCategory != "" { writeOpts = append(writeOpts, store.WithFailureCategory(result.FailureCategory)) } - _ = d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventFail, input.RunID, 0, result.Message, writeOpts...) 
+ if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventFail, input.RunID, 0, result.Message, writeOpts...); err != nil { + d.Logger.Warn("failed to write polled job failure joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date) + } if err := publishEvent(ctx, d, string(types.EventJobFailed), input.PipelineID, input.ScheduleID, input.Date, "job failed: "+result.Message); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventJobFailed, "error", err) } diff --git a/internal/lambda/rerun.go b/internal/lambda/rerun.go index 5cb4451..5ab7a65 100644 --- a/internal/lambda/rerun.go +++ b/internal/lambda/rerun.go @@ -5,7 +5,6 @@ import ( "fmt" "strconv" "strings" - "time" "github.com/aws/aws-lambda-go/events" "github.com/dwsmith1983/interlock/pkg/types" @@ -37,7 +36,9 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even // --- Calendar exclusion check (execution date) --- if isExcludedDate(cfg, date) { - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventRerunRejected, "", 0, "excluded by calendar") + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventRerunRejected, "", 0, "excluded by calendar"); err != nil { + d.Logger.Warn("failed to write rerun-rejected joblog for calendar exclusion", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, schedule, date, fmt.Sprintf("rerun blocked for %s: execution date %s excluded by calendar", pipelineID, date)); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) @@ -76,8 +77,10 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even } if count >= budget { - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, - 
types.JobEventRerunRejected, "", 0, limitLabel) + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + types.JobEventRerunRejected, "", 0, limitLabel); err != nil { + d.Logger.Warn("failed to write rerun-rejected joblog for limit exceeded", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if err := publishEvent(ctx, d, string(types.EventRerunRejected), pipelineID, schedule, date, fmt.Sprintf("rerun rejected for %s: %s", pipelineID, limitLabel)); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventRerunRejected, "error", err) @@ -108,8 +111,10 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even } if !allowed { - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, - types.JobEventRerunRejected, "", 0, rejectReason) + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + types.JobEventRerunRejected, "", 0, rejectReason); err != nil { + d.Logger.Warn("failed to write rerun-rejected joblog for circuit breaker", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if err := publishEvent(ctx, d, string(types.EventRerunRejected), pipelineID, schedule, date, fmt.Sprintf("rerun rejected for %s: %s", pipelineID, rejectReason)); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventRerunRejected, "error", err) @@ -127,7 +132,9 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even // Delete date-scoped postrun-baseline so re-run captures fresh baseline. if cfg.PostRun != nil { - _ = d.Store.DeleteSensor(ctx, pipelineID, "postrun-baseline#"+date) + if err := d.Store.DeleteSensor(ctx, pipelineID, "postrun-baseline#"+date); err != nil { + d.Logger.Warn("failed to delete postrun-baseline sensor", "error", err, "pipeline", pipelineID, "date", date) + } } // Atomically reset the trigger lock for the new execution. 
@@ -146,15 +153,17 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even } // Publish acceptance event only after lock atomicity is confirmed. - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, - types.JobEventRerunAccepted, "", 0, "") + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + types.JobEventRerunAccepted, "", 0, ""); err != nil { + d.Logger.Warn("failed to write rerun-accepted joblog", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if pubErr := publishEvent(ctx, d, string(types.EventRerunAccepted), pipelineID, schedule, date, fmt.Sprintf("rerun accepted for %s (reason: %s)", pipelineID, reason)); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventRerunAccepted, "error", pubErr) } - execName := truncateExecName(fmt.Sprintf("%s-%s-%s-%s-rerun-%d", pipelineID, schedule, date, reason, time.Now().Unix())) + execName := truncateExecName(fmt.Sprintf("%s-%s-%s-%s-rerun-%d", pipelineID, schedule, date, reason, d.now().Unix())) if err := startSFNWithName(ctx, d, cfg, pipelineID, schedule, date, execName); err != nil { if relErr := d.Store.ReleaseTriggerLock(ctx, pipelineID, schedule, date); relErr != nil { d.Logger.Warn("failed to release lock after SFN start failure", "error", relErr) @@ -366,9 +375,11 @@ func checkLateDataArrival(ctx context.Context, d *Deps, pipelineID, schedule, da } // Dual-write: joblog entry (audit) + EventBridge event (alerting). 
- _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventLateDataArrival, "", 0, - "sensor updated after pipeline completed successfully") + "sensor updated after pipeline completed successfully"); err != nil { + d.Logger.Warn("failed to write late-data-arrival joblog", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if err := publishEvent(ctx, d, string(types.EventLateDataArrival), pipelineID, schedule, date, fmt.Sprintf("late data arrival for %s: sensor updated after job completion", pipelineID)); err != nil { From 67d4e67e4e3201f924a4a95ba0e256602f3d5365 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:32:11 +0700 Subject: [PATCH 05/17] refactor(lambda): replace time.Now() with d.now() for test injection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate all direct time.Now() calls in internal/lambda/ production files to d.now(), which delegates to NowFunc when set. This enables deterministic time testing across all handlers — previously only watchdog used the injectable pattern. Adds now time.Time parameter to ResolveExecutionDate and handleSLACalculate to propagate injectable time to standalone functions. Also adds JobEventJobPollExhausted to isJobTerminal switch. 
--- internal/lambda/alert_dispatcher.go | 4 ++-- internal/lambda/dynstream.go | 8 ++++---- internal/lambda/event_sink.go | 4 ++-- internal/lambda/postrun.go | 5 ++--- internal/lambda/sfn.go | 2 +- internal/lambda/sla_monitor.go | 27 ++++++++++++++------------- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/internal/lambda/alert_dispatcher.go b/internal/lambda/alert_dispatcher.go index 0bd94e1..ce257de 100644 --- a/internal/lambda/alert_dispatcher.go +++ b/internal/lambda/alert_dispatcher.go @@ -149,7 +149,7 @@ func getThreadTs(ctx context.Context, d *Deps, pipelineID, scheduleID, date stri // saveThreadTs persists a Slack thread timestamp for future message threading. // Errors are logged but don't fail the message. func saveThreadTs(ctx context.Context, d *Deps, pipelineID, scheduleID, date, threadTs, channelID string) { - ttl := time.Now().Add(time.Duration(d.EventsTTLDays) * 24 * time.Hour).Unix() + ttl := d.now().Add(time.Duration(d.EventsTTLDays) * 24 * time.Hour).Unix() _, err := d.Store.Client.PutItem(ctx, &dynamodb.PutItemInput{ TableName: &d.Store.EventsTable, Item: map[string]ddbtypes.AttributeValue{ @@ -157,7 +157,7 @@ func saveThreadTs(ctx context.Context, d *Deps, pipelineID, scheduleID, date, th "SK": &ddbtypes.AttributeValueMemberS{Value: fmt.Sprintf("THREAD#%s#%s", scheduleID, date)}, "threadTs": &ddbtypes.AttributeValueMemberS{Value: threadTs}, "channelId": &ddbtypes.AttributeValueMemberS{Value: channelID}, - "createdAt": &ddbtypes.AttributeValueMemberS{Value: time.Now().UTC().Format(time.RFC3339)}, + "createdAt": &ddbtypes.AttributeValueMemberS{Value: d.now().UTC().Format(time.RFC3339)}, "ttl": &ddbtypes.AttributeValueMemberN{Value: fmt.Sprintf("%d", ttl)}, }, }) diff --git a/internal/lambda/dynstream.go b/internal/lambda/dynstream.go index 32d9a49..56e6953 100644 --- a/internal/lambda/dynstream.go +++ b/internal/lambda/dynstream.go @@ -92,18 +92,18 @@ func convertAttributeValue(av events.DynamoDBAttributeValue) interface{} { // 
ResolveExecutionDate builds the execution date from sensor data fields. // If both "date" and "hour" are present, returns "YYYY-MM-DDThh". // If only "date", returns "YYYY-MM-DD". Falls back to today's date. -func ResolveExecutionDate(sensorData map[string]interface{}) string { +func ResolveExecutionDate(sensorData map[string]interface{}, now time.Time) string { dateStr, _ := sensorData["date"].(string) hourStr, _ := sensorData["hour"].(string) if dateStr == "" { - return time.Now().Format("2006-01-02") + return now.Format("2006-01-02") } normalized := normalizeDate(dateStr) // Validate YYYY-MM-DD format. if _, err := time.Parse("2006-01-02", normalized); err != nil { - return time.Now().Format("2006-01-02") + return now.Format("2006-01-02") } if hourStr != "" { @@ -147,7 +147,7 @@ func publishEvent(ctx context.Context, d *Deps, eventType, pipelineID, schedule, ScheduleID: schedule, Date: date, Message: message, - Timestamp: time.Now(), + Timestamp: d.now(), } if len(detail) > 0 && detail[0] != nil { evt.Detail = detail[0] diff --git a/internal/lambda/event_sink.go b/internal/lambda/event_sink.go index d561ea9..90ae99e 100644 --- a/internal/lambda/event_sink.go +++ b/internal/lambda/event_sink.go @@ -23,10 +23,10 @@ func HandleEventSink(ctx context.Context, d *Deps, input EventBridgeInput) error if !detail.Timestamp.IsZero() { tsMillis = detail.Timestamp.UnixMilli() } else { - tsMillis = time.Now().UnixMilli() + tsMillis = d.now().UnixMilli() } - now := time.Now() + now := d.now() ttlDays := d.EventsTTLDays if ttlDays <= 0 { ttlDays = 90 diff --git a/internal/lambda/postrun.go b/internal/lambda/postrun.go index e22e91c..3b0c317 100644 --- a/internal/lambda/postrun.go +++ b/internal/lambda/postrun.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "strings" - "time" "github.com/dwsmith1983/interlock/internal/validation" "github.com/dwsmith1983/interlock/pkg/types" @@ -27,7 +26,7 @@ func matchesPostRunRule(sensorKey string, rules []types.ValidationRule) bool { // date-scoped 
baseline captured at trigger completion. func handlePostRunSensorEvent(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, sensorKey string, sensorData map[string]interface{}) error { scheduleID := resolveScheduleID(cfg) - date := ResolveExecutionDate(sensorData) + date := ResolveExecutionDate(sensorData, d.now()) // Consistent read to handle race where sensor stream event arrives // before SFN sets trigger to COMPLETED. @@ -148,7 +147,7 @@ func handlePostRunCompleted(ctx context.Context, d *Deps, cfg *types.PipelineCon } RemapPerPeriodSensors(sensors, date) - result := validation.EvaluateRules("ALL", cfg.PostRun.Rules, sensors, time.Now()) + result := validation.EvaluateRules("ALL", cfg.PostRun.Rules, sensors, d.now()) if result.Passed { if err := publishEvent(ctx, d, string(types.EventPostRunPassed), pipelineID, scheduleID, date, diff --git a/internal/lambda/sfn.go b/internal/lambda/sfn.go index c33243b..bbb84df 100644 --- a/internal/lambda/sfn.go +++ b/internal/lambda/sfn.go @@ -73,7 +73,7 @@ func truncateExecName(name string) string { // The name includes a Unix timestamp suffix to avoid ExecutionAlreadyExists // errors when a previous execution for the same pipeline/schedule/date failed. 
func startSFN(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string) error { - name := truncateExecName(fmt.Sprintf("%s-%s-%s-%d", pipelineID, scheduleID, date, time.Now().Unix())) + name := truncateExecName(fmt.Sprintf("%s-%s-%s-%d", pipelineID, scheduleID, date, d.now().Unix())) return startSFNWithName(ctx, d, cfg, pipelineID, scheduleID, date, name) } diff --git a/internal/lambda/sla_monitor.go b/internal/lambda/sla_monitor.go index f11b170..c48140e 100644 --- a/internal/lambda/sla_monitor.go +++ b/internal/lambda/sla_monitor.go @@ -25,7 +25,7 @@ import ( func HandleSLAMonitor(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { switch input.Mode { case "calculate": - return handleSLACalculate(input) + return handleSLACalculate(input, d.now()) case "fire-alert": return handleSLAFireAlert(ctx, d, input) case "schedule": @@ -43,7 +43,7 @@ func HandleSLAMonitor(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAM // and expected duration. Warning time = deadline - expectedDuration. // Breach time = deadline. Returns full ISO 8601 timestamps required by // Step Functions TimestampPath. -func handleSLACalculate(input SLAMonitorInput) (SLAMonitorOutput, error) { +func handleSLACalculate(input SLAMonitorInput, now time.Time) (SLAMonitorOutput, error) { dur, err := time.ParseDuration(input.ExpectedDuration) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("parse expectedDuration %q: %w", input.ExpectedDuration, err) @@ -57,7 +57,7 @@ func handleSLACalculate(input SLAMonitorInput) (SLAMonitorOutput, error) { } } - now := time.Now().In(loc) + now = now.In(loc) // Parse the execution date. 
Supports: // "2006-01-02" — daily @@ -147,16 +147,16 @@ func handleSLAFireAlert(ctx context.Context, d *Deps, input SLAMonitorInput) (SL suppressed = true } if suppressed { - return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: time.Now().UTC().Format(time.RFC3339)}, nil + return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: d.now().UTC().Format(time.RFC3339)}, nil } } if input.AlertType == "SLA_WARNING" && input.BreachAt != "" { breachAt, err := time.Parse(time.RFC3339, input.BreachAt) - if err == nil && !time.Now().UTC().Before(breachAt) { + if err == nil && !d.now().UTC().Before(breachAt) { d.Logger.InfoContext(ctx, "suppressing SLA_WARNING (past breach time)", "pipeline", input.PipelineID, "breachAt", input.BreachAt) - return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: time.Now().UTC().Format(time.RFC3339)}, nil + return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: d.now().UTC().Format(time.RFC3339)}, nil } } @@ -194,7 +194,7 @@ func handleSLAFireAlert(ctx context.Context, d *Deps, input SLAMonitorInput) (SL return SLAMonitorOutput{ AlertType: input.AlertType, - FiredAt: time.Now().UTC().Format(time.RFC3339), + FiredAt: d.now().UTC().Format(time.RFC3339), }, nil } @@ -202,7 +202,7 @@ func handleSLAFireAlert(ctx context.Context, d *Deps, input SLAMonitorInput) (SL // SLA warning and breach times. Each schedule invokes this Lambda with // mode "fire-alert" at the exact timestamp, then auto-deletes. 
func handleSLASchedule(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { - calc, err := handleSLACalculate(input) + calc, err := handleSLACalculate(input, d.now()) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("schedule: %w", err) } @@ -253,7 +253,7 @@ func handleSLASchedule(ctx context.Context, d *Deps, input SLAMonitorInput) (SLA func handleSLACancel(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { // If warningAt/breachAt not provided, recalculate from deadline/expectedDuration. if input.WarningAt == "" && input.BreachAt == "" && input.Deadline != "" { - calc, err := handleSLACalculate(input) + calc, err := handleSLACalculate(input, d.now()) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate: %w", err) } @@ -279,7 +279,7 @@ func handleSLACancel(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMo } // Determine final SLA status from the timestamps passed in - now := time.Now().UTC() + now := d.now().UTC() alertType := string(types.EventSLAMet) if input.BreachAt != "" { breachAt, _ := time.Parse(time.RFC3339, input.BreachAt) @@ -351,12 +351,12 @@ func createOneTimeSchedule(ctx context.Context, d *Deps, name, timestamp string, // that have already passed. Fallback for environments without EventBridge // Scheduler configured. 
func handleSLAReconcile(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { - calc, err := handleSLACalculate(input) + calc, err := handleSLACalculate(input, d.now()) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("reconcile: %w", err) } - now := time.Now().UTC() + now := d.now().UTC() warningAt, _ := time.Parse(time.RFC3339, calc.WarningAt) breachAt, _ := time.Parse(time.RFC3339, calc.BreachAt) @@ -404,7 +404,8 @@ func isJobTerminal(ctx context.Context, d *Deps, pipelineID, scheduleID, date st } switch rec.Event { case types.JobEventSuccess, types.JobEventFail, types.JobEventTimeout, - types.JobEventInfraTriggerExhausted, types.JobEventValidationExhausted: + types.JobEventInfraTriggerExhausted, types.JobEventValidationExhausted, + types.JobEventJobPollExhausted: return true default: return false From f3ad1defe2ae5a55d5879550e14119adfc76c473 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:32:19 +0700 Subject: [PATCH 06/17] fix(store): deep copy pipeline configs in cache via JSON round-trip Replace shallow struct copy (cp := *v) with JSON marshal/unmarshal to prevent nested pointer fields (SLA, PostRun, Job.Config maps) from being shared between cache and callers. InjectDateArgs mutates Glue.Arguments in-place, which previously corrupted the cache for subsequent reads. 
--- internal/store/configcache.go | 15 ++++++- internal/store/configcache_test.go | 68 ++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/internal/store/configcache.go b/internal/store/configcache.go index 0cdf186..573bc39 100644 --- a/internal/store/configcache.go +++ b/internal/store/configcache.go @@ -2,6 +2,7 @@ package store import ( "context" + "encoding/json" "sync" "time" @@ -79,7 +80,19 @@ func (c *ConfigCache) refresh(ctx context.Context) (map[string]*types.PipelineCo func copyConfigs(src map[string]*types.PipelineConfig) map[string]*types.PipelineConfig { dst := make(map[string]*types.PipelineConfig, len(src)) for k, v := range src { - cp := *v + data, err := json.Marshal(v) + if err != nil { + // Marshal of a known struct should never fail; shallow-copy as fallback. + cp := *v + dst[k] = &cp + continue + } + var cp types.PipelineConfig + if err := json.Unmarshal(data, &cp); err != nil { + shallow := *v + dst[k] = &shallow + continue + } dst[k] = &cp } return dst diff --git a/internal/store/configcache_test.go b/internal/store/configcache_test.go index 80debbb..26bc29f 100644 --- a/internal/store/configcache_test.go +++ b/internal/store/configcache_test.go @@ -421,3 +421,71 @@ func TestConfigCache_GetAll_ReturnsDeepCopy(t *testing.T) { t.Errorf("owner = %q, want %q (cache was mutated)", configs2["pipe-1"].Pipeline.Owner, "original-owner") } } + +func TestConfigCache_GetAll_ReturnsDeepCopyNested(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + seedConfig(mock, types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "nested-pipe", Owner: "team-x"}, + Job: types.JobConfig{ + Type: "glue", + Config: map[string]interface{}{"key": "value"}, + }, + SLA: &types.SLAConfig{ + Deadline: "14:00", + ExpectedDuration: "30m", + Critical: false, + }, + }) + + cache := NewConfigCache(s, 5*time.Minute) + + // First call: retrieve configs. 
+ configs1, err := cache.GetAll(context.Background()) + if err != nil { + t.Fatalf("first GetAll: %v", err) + } + + cfg1 := configs1["nested-pipe"] + if cfg1 == nil { + t.Fatal("expected nested-pipe config, got nil") + } + + // Mutate nested map (simulates what InjectDateArgs does). + cfg1.Job.Config["injected"] = "corrupted" + + // Mutate nested pointer field. + cfg1.SLA.Critical = true + cfg1.SLA.Deadline = "23:59" + + // Second call: cache must be unaffected. + configs2, err := cache.GetAll(context.Background()) + if err != nil { + t.Fatalf("second GetAll: %v", err) + } + + cfg2 := configs2["nested-pipe"] + if cfg2 == nil { + t.Fatal("expected nested-pipe config on second call, got nil") + } + + // Verify Job.Config map was not corrupted. + if _, ok := cfg2.Job.Config["injected"]; ok { + t.Error("Job.Config mutation leaked into cache: 'injected' key should not exist") + } + if cfg2.Job.Config["key"] != "value" { + t.Errorf("Job.Config[\"key\"] = %v, want %q", cfg2.Job.Config["key"], "value") + } + + // Verify SLA pointer field was not corrupted. + if cfg2.SLA == nil { + t.Fatal("SLA should not be nil") + } + if cfg2.SLA.Critical != false { + t.Error("SLA.Critical mutation leaked into cache: want false") + } + if cfg2.SLA.Deadline != "14:00" { + t.Errorf("SLA.Deadline = %q, want %q (cache was mutated)", cfg2.SLA.Deadline, "14:00") + } +} From 4237965149c5ef319992f806b845523df46db4f0 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:32:26 +0700 Subject: [PATCH 07/17] feat(terraform): add CloudWatch alarms for Lambda, SFN, DLQ, streams Add metric alarms for: Lambda errors (6 functions), SFN execution failures, DLQ message depth (3 queues), DynamoDB stream iterator age (2 streams). All use for_each to avoid repetition. Optional sns_alarm_topic_arn variable for SNS notifications. 
--- deploy/terraform/cloudwatch.tf | 137 +++++++++++++++++++++++++++++++++ deploy/terraform/variables.tf | 38 +++++++++ 2 files changed, 175 insertions(+) create mode 100644 deploy/terraform/cloudwatch.tf diff --git a/deploy/terraform/cloudwatch.tf b/deploy/terraform/cloudwatch.tf new file mode 100644 index 0000000..5a615c5 --- /dev/null +++ b/deploy/terraform/cloudwatch.tf @@ -0,0 +1,137 @@ +# ----------------------------------------------------------------------------- +# CloudWatch alarms — Lambda errors, SFN failures, DLQ depth, stream lag +# ----------------------------------------------------------------------------- + +locals { + # Map of Terraform resource key → Lambda function resource for alarm iteration. + lambda_functions = { + stream_router = aws_lambda_function.stream_router + orchestrator = aws_lambda_function.orchestrator + sla_monitor = aws_lambda_function.sla_monitor + watchdog = aws_lambda_function.watchdog + event_sink = aws_lambda_function.event_sink + alert_dispatcher = aws_lambda_function.alert_dispatcher + } + + # DLQ resources keyed by a short label. + dlq_queues = { + sr_control = aws_sqs_queue.stream_router_control_dlq + sr_joblog = aws_sqs_queue.stream_router_joblog_dlq + alert = aws_sqs_queue.alert_dlq + } + + # DynamoDB stream event source mappings keyed by table name. + stream_mappings = { + control = aws_lambda_event_source_mapping.control_stream + joblog = aws_lambda_event_source_mapping.joblog_stream + } + + alarm_actions = var.sns_alarm_topic_arn != "" ? [var.sns_alarm_topic_arn] : [] +} + +# ============================================================================= +# 1. 
Lambda Error Alarms — one per function +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "lambda_errors" { + for_each = local.lambda_functions + + alarm_name = "${var.environment}-interlock-${each.key}-errors" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "Errors" + namespace = "AWS/Lambda" + period = 300 + statistic = "Sum" + threshold = 1 + alarm_description = "${each.key} Lambda errors detected" + treat_missing_data = "notBreaching" + + dimensions = { + FunctionName = each.value.function_name + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} + +# ============================================================================= +# 2. Step Functions Execution Failure Alarm +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "sfn_failures" { + alarm_name = "${var.environment}-interlock-sfn-failures" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "ExecutionsFailed" + namespace = "AWS/States" + period = 300 + statistic = "Sum" + threshold = 1 + alarm_description = "Step Functions pipeline execution failures detected" + treat_missing_data = "notBreaching" + + dimensions = { + StateMachineArn = aws_sfn_state_machine.pipeline.arn + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} + +# ============================================================================= +# 3. 
DLQ Message Count Alarms — fires when any message lands in a DLQ +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "dlq_messages" { + for_each = local.dlq_queues + + alarm_name = "${var.environment}-interlock-dlq-${each.key}-depth" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "ApproximateNumberOfMessagesVisible" + namespace = "AWS/SQS" + period = 300 + statistic = "Sum" + threshold = 1 + alarm_description = "Messages visible in ${each.key} dead-letter queue" + treat_missing_data = "notBreaching" + + dimensions = { + QueueName = each.value.name + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} + +# ============================================================================= +# 4. DynamoDB Stream Iterator Age Alarms — detects stream processing lag +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "stream_iterator_age" { + for_each = local.stream_mappings + + alarm_name = "${var.environment}-interlock-stream-${each.key}-iterator-age" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "IteratorAge" + namespace = "AWS/Lambda" + period = 300 + statistic = "Maximum" + threshold = 300000 # 5 minutes in milliseconds + alarm_description = "DynamoDB ${each.key} stream iterator age exceeds 5 minutes" + treat_missing_data = "notBreaching" + + dimensions = { + FunctionName = aws_lambda_function.stream_router.function_name + EventSourceMapping = each.value.uuid + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} diff --git a/deploy/terraform/variables.tf b/deploy/terraform/variables.tf index 206fea6..ba36a17 100644 --- a/deploy/terraform/variables.tf +++ b/deploy/terraform/variables.tf @@ -68,12 +68,44 @@ variable "slack_bot_token" { sensitive 
= true } +variable "slack_secret_arn" { + description = "ARN of Secrets Manager secret containing Slack bot token (overrides slack_bot_token env var)" + type = string + default = "" +} + variable "slack_channel_id" { description = "Slack channel ID for alert notifications" type = string default = "" } +variable "lambda_concurrency" { + description = "Reserved concurrent executions per Lambda function" + type = object({ + stream_router = number + orchestrator = number + sla_monitor = number + watchdog = number + event_sink = number + alert_dispatcher = number + }) + default = { + stream_router = 10 + orchestrator = 10 + sla_monitor = 5 + watchdog = 2 + event_sink = 5 + alert_dispatcher = 3 + } +} + +variable "sns_alarm_topic_arn" { + description = "SNS topic ARN for CloudWatch alarm notifications (empty = alarms fire but no notifications)" + type = string + default = "" +} + variable "enable_glue_trigger" { description = "Enable IAM permissions for Glue job triggering" type = bool @@ -97,3 +129,9 @@ variable "enable_sfn_trigger" { type = bool default = false } + +variable "enable_lambda_trigger" { + description = "Enable IAM permissions for Lambda-invoked job triggering" + type = bool + default = false +} From 5f8fe1204d38139e8ead5d3cfea043a3cd032aab Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:32:35 +0700 Subject: [PATCH 08/17] feat(terraform): add Lambda concurrency limits and trigger IAM policy Set reserved_concurrent_executions on all 6 Lambda functions via configurable lambda_concurrency variable. Add conditional lambda:InvokeFunction IAM policy for the Lambda trigger type (enable_lambda_trigger variable, default false). 
--- deploy/terraform/lambda.tf | 157 +++++++++++++++++++++++-------------- 1 file changed, 100 insertions(+), 57 deletions(-) diff --git a/deploy/terraform/lambda.tf b/deploy/terraform/lambda.tf index c8d1e64..a05d192 100644 --- a/deploy/terraform/lambda.tf +++ b/deploy/terraform/lambda.tf @@ -72,15 +72,16 @@ resource "aws_cloudwatch_log_group" "lambda" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "event_sink" { - function_name = "${var.environment}-interlock-event-sink" - role = aws_iam_role.lambda["event-sink"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = 128 - timeout = 30 - filename = "${var.dist_path}/event-sink.zip" - source_code_hash = filebase64sha256("${var.dist_path}/event-sink.zip") + function_name = "${var.environment}-interlock-event-sink" + role = aws_iam_role.lambda["event-sink"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = 128 + timeout = 30 + filename = "${var.dist_path}/event-sink.zip" + source_code_hash = filebase64sha256("${var.dist_path}/event-sink.zip") + reserved_concurrent_executions = var.lambda_concurrency.event_sink environment { variables = { @@ -97,20 +98,22 @@ resource "aws_lambda_function" "event_sink" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "alert_dispatcher" { - function_name = "${var.environment}-interlock-alert-dispatcher" - role = aws_iam_role.lambda["alert-dispatcher"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = 128 - timeout = 30 - filename = "${var.dist_path}/alert-dispatcher.zip" - source_code_hash = filebase64sha256("${var.dist_path}/alert-dispatcher.zip") + function_name = "${var.environment}-interlock-alert-dispatcher" + role = aws_iam_role.lambda["alert-dispatcher"].arn + handler = "bootstrap" + runtime = 
"provided.al2023" + architectures = ["arm64"] + memory_size = 128 + timeout = 30 + filename = "${var.dist_path}/alert-dispatcher.zip" + source_code_hash = filebase64sha256("${var.dist_path}/alert-dispatcher.zip") + reserved_concurrent_executions = var.lambda_concurrency.alert_dispatcher environment { variables = { SLACK_BOT_TOKEN = var.slack_bot_token SLACK_CHANNEL_ID = var.slack_channel_id + SLACK_SECRET_ARN = var.slack_secret_arn EVENTS_TABLE = aws_dynamodb_table.events.name EVENTS_TTL_DAYS = var.events_table_ttl_days } @@ -124,15 +127,16 @@ resource "aws_lambda_function" "alert_dispatcher" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "stream_router" { - function_name = "${var.environment}-interlock-stream-router" - role = aws_iam_role.lambda["stream-router"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = var.lambda_memory_size - timeout = 60 - filename = "${var.dist_path}/stream-router.zip" - source_code_hash = filebase64sha256("${var.dist_path}/stream-router.zip") + function_name = "${var.environment}-interlock-stream-router" + role = aws_iam_role.lambda["stream-router"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = var.lambda_memory_size + timeout = 60 + filename = "${var.dist_path}/stream-router.zip" + source_code_hash = filebase64sha256("${var.dist_path}/stream-router.zip") + reserved_concurrent_executions = var.lambda_concurrency.stream_router environment { variables = { @@ -153,15 +157,16 @@ resource "aws_lambda_function" "stream_router" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "orchestrator" { - function_name = "${var.environment}-interlock-orchestrator" - role = aws_iam_role.lambda["orchestrator"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = 
var.lambda_memory_size - timeout = 120 - filename = "${var.dist_path}/orchestrator.zip" - source_code_hash = filebase64sha256("${var.dist_path}/orchestrator.zip") + function_name = "${var.environment}-interlock-orchestrator" + role = aws_iam_role.lambda["orchestrator"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = var.lambda_memory_size + timeout = 120 + filename = "${var.dist_path}/orchestrator.zip" + source_code_hash = filebase64sha256("${var.dist_path}/orchestrator.zip") + reserved_concurrent_executions = var.lambda_concurrency.orchestrator environment { variables = { @@ -180,15 +185,16 @@ resource "aws_lambda_function" "orchestrator" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "sla_monitor" { - function_name = "${var.environment}-interlock-sla-monitor" - role = aws_iam_role.lambda["sla-monitor"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = var.lambda_memory_size - timeout = 30 - filename = "${var.dist_path}/sla-monitor.zip" - source_code_hash = filebase64sha256("${var.dist_path}/sla-monitor.zip") + function_name = "${var.environment}-interlock-sla-monitor" + role = aws_iam_role.lambda["sla-monitor"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = var.lambda_memory_size + timeout = 30 + filename = "${var.dist_path}/sla-monitor.zip" + source_code_hash = filebase64sha256("${var.dist_path}/sla-monitor.zip") + reserved_concurrent_executions = var.lambda_concurrency.sla_monitor environment { variables = { @@ -210,15 +216,16 @@ resource "aws_lambda_function" "sla_monitor" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "watchdog" { - function_name = "${var.environment}-interlock-watchdog" - role = aws_iam_role.lambda["watchdog"].arn - handler = "bootstrap" - runtime = 
"provided.al2023" - architectures = ["arm64"] - memory_size = var.lambda_memory_size - timeout = 60 - filename = "${var.dist_path}/watchdog.zip" - source_code_hash = filebase64sha256("${var.dist_path}/watchdog.zip") + function_name = "${var.environment}-interlock-watchdog" + role = aws_iam_role.lambda["watchdog"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = var.lambda_memory_size + timeout = 60 + filename = "${var.dist_path}/watchdog.zip" + source_code_hash = filebase64sha256("${var.dist_path}/watchdog.zip") + reserved_concurrent_executions = var.lambda_concurrency.watchdog environment { variables = { @@ -396,6 +403,25 @@ resource "aws_iam_role_policy" "alert_dispatcher_dynamodb" { }) } +# ----------------------------------------------------------------------------- +# Secrets Manager read — alert-dispatcher (Slack bot token, opt-in) +# ----------------------------------------------------------------------------- + +data "aws_iam_policy_document" "secrets_alert_dispatcher" { + statement { + sid = "ReadSlackSecret" + actions = ["secretsmanager:GetSecretValue"] + resources = [var.slack_secret_arn] + } +} + +resource "aws_iam_role_policy" "secrets_alert_dispatcher" { + count = var.slack_secret_arn != "" ? 
1 : 0 + name = "secrets-read" + role = aws_iam_role.lambda["alert-dispatcher"].id + policy = data.aws_iam_policy_document.secrets_alert_dispatcher.json +} + # ----------------------------------------------------------------------------- # DynamoDB Streams — stream-router (control + joblog streams) # ----------------------------------------------------------------------------- @@ -532,9 +558,9 @@ resource "aws_iam_role_policy" "glue_trigger" { Resource = "*" }, { - Sid = "GlueLogVerification" - Effect = "Allow" - Action = ["logs:FilterLogEvents"] + Sid = "GlueLogVerification" + Effect = "Allow" + Action = ["logs:FilterLogEvents"] Resource = [ "arn:aws:logs:*:*:log-group:/aws-glue/jobs/logs-v2:*", "arn:aws:logs:*:*:log-group:/aws-glue/jobs/error:*" @@ -594,3 +620,20 @@ resource "aws_iam_role_policy" "sfn_trigger" { }] }) } + +# --- Lambda --- + +resource "aws_iam_role_policy" "lambda_trigger" { + count = var.enable_lambda_trigger ? 1 : 0 + name = "lambda-trigger" + role = aws_iam_role.lambda["orchestrator"].id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Effect = "Allow" + Action = ["lambda:InvokeFunction"] + Resource = "*" + }] + }) +} From 123284248fa550307948359272d96c18a7a68576 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:32:45 +0700 Subject: [PATCH 09/17] feat(terraform): route CW alarm state changes through event pipeline Add EventBridge rules on the default bus to capture CloudWatch alarm state changes for interlock alarms (prefix-filtered). Input transformers reshape CW alarm events into InterlockEvent format so event-sink and alert-dispatcher handle them natively with zero Go logic changes. All state changes logged to events table; only ALARM transitions route to Slack via the existing alert SQS queue. Updates SQS queue policy to allow both the framework alert rule and the new CW alarm alert rule. 
Also adds missing event types to the framework alert rule: POST_RUN_DRIFT, RERUN_REJECTED, LATE_DATA_ARRIVAL, TRIGGER_RECOVERED, and 5 others. --- deploy/terraform/alerting.tf | 5 +- deploy/terraform/eventbridge.tf | 111 +++++++++++++++++++++++++++++++- pkg/types/events.go | 1 + 3 files changed, 115 insertions(+), 2 deletions(-) diff --git a/deploy/terraform/alerting.tf b/deploy/terraform/alerting.tf index 485d320..696eb3a 100644 --- a/deploy/terraform/alerting.tf +++ b/deploy/terraform/alerting.tf @@ -33,7 +33,10 @@ resource "aws_sqs_queue_policy" "alert" { Resource = aws_sqs_queue.alert.arn Condition = { ArnEquals = { - "aws:SourceArn" = aws_cloudwatch_event_rule.alert_events.arn + "aws:SourceArn" = [ + aws_cloudwatch_event_rule.alert_events.arn, + aws_cloudwatch_event_rule.cw_alarm_alert.arn, + ] } } }] diff --git a/deploy/terraform/eventbridge.tf b/deploy/terraform/eventbridge.tf index 9889554..7b8a61d 100644 --- a/deploy/terraform/eventbridge.tf +++ b/deploy/terraform/eventbridge.tf @@ -67,7 +67,7 @@ resource "aws_cloudwatch_event_rule" "alert_events" { tags = var.tags event_pattern = jsonencode({ - source = ["interlock"] + source = ["interlock"] detail-type = [ "SLA_WARNING", "SLA_BREACH", @@ -80,6 +80,15 @@ resource "aws_cloudwatch_event_rule" "alert_events" { "SCHEDULE_MISSED", "DATA_DRIFT", "JOB_POLL_EXHAUSTED", + "POST_RUN_DRIFT", + "POST_RUN_DRIFT_INFLIGHT", + "POST_RUN_FAILED", + "POST_RUN_SENSOR_MISSING", + "RERUN_REJECTED", + "LATE_DATA_ARRIVAL", + "TRIGGER_RECOVERED", + "BASELINE_CAPTURE_FAILED", + "PIPELINE_EXCLUDED", ] }) } @@ -90,3 +99,103 @@ resource "aws_cloudwatch_event_target" "alert_sqs" { target_id = "alert-sqs" arn = aws_sqs_queue.alert.arn } + +# ----------------------------------------------------------------------------- +# CloudWatch Alarm state changes → event pipeline (default bus) +# +# CW alarms automatically publish state-change events to the default bus. 
+# Input transformers reshape them into InterlockEvent format so event-sink
+# and alert-dispatcher handle them natively — no Go code changes needed.
+# -----------------------------------------------------------------------------
+
+# Rule: ALL alarm state changes → event-sink (logging)
+resource "aws_cloudwatch_event_rule" "cw_alarm_log" {
+  name           = "${var.environment}-interlock-cw-alarm-log"
+  description    = "Route interlock CloudWatch alarm state changes to event-sink"
+  event_bus_name = "default"
+  tags           = var.tags
+
+  event_pattern = jsonencode({
+    source      = ["aws.cloudwatch"]
+    detail-type = ["CloudWatch Alarm State Change"]
+    detail = {
+      alarmName = [{ prefix = "${var.environment}-interlock-" }]
+    }
+  })
+}
+
+resource "aws_cloudwatch_event_target" "cw_alarm_event_sink" {
+  rule      = aws_cloudwatch_event_rule.cw_alarm_log.name
+  target_id = "cw-alarm-event-sink"
+  arn       = aws_lambda_function.event_sink.arn
+
+  input_transformer {
+    input_paths = {
+      alarmName = "$.detail.alarmName"
+      state     = "$.detail.state.value"
+      reason    = "$.detail.state.reason"
+    }
+
+    input_template = <<-EOT
+      {
+        "source": "interlock",
+        "detail-type": "INFRA_ALARM",
+        "detail": {
+          "pipelineId": "INFRASTRUCTURE",
+          "message": "<alarmName>: <state>"
+        }
+      }
+    EOT
+  }
+}
+
+resource "aws_lambda_permission" "event_sink_cw_alarm" {
+  statement_id  = "AllowCWAlarmEventBridgeInvoke"
+  action        = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.event_sink.function_name
+  principal     = "events.amazonaws.com"
+  source_arn    = aws_cloudwatch_event_rule.cw_alarm_log.arn
+}
+
+# Rule: ALARM state only → SQS alert queue (Slack)
+resource "aws_cloudwatch_event_rule" "cw_alarm_alert" {
+  name           = "${var.environment}-interlock-cw-alarm-alert"
+  description    = "Route interlock CloudWatch ALARM transitions to Slack"
+  event_bus_name = "default"
+  tags           = var.tags
+
+  event_pattern = jsonencode({
+    source      = ["aws.cloudwatch"]
+    detail-type = ["CloudWatch Alarm State Change"]
+    detail = {
+      alarmName = [{ prefix = 
"${var.environment}-interlock-" }] + state = { + value = ["ALARM"] + } + } + }) +} + +resource "aws_cloudwatch_event_target" "cw_alarm_sqs" { + rule = aws_cloudwatch_event_rule.cw_alarm_alert.name + target_id = "cw-alarm-alert-sqs" + arn = aws_sqs_queue.alert.arn + + input_transformer { + input_paths = { + alarmName = "$.detail.alarmName" + reason = "$.detail.state.reason" + } + + input_template = <<-EOT + { + "source": "interlock", + "detail-type": "INFRA_ALARM", + "detail": { + "pipelineId": "INFRASTRUCTURE", + "message": ": ALARM — " + } + } + EOT + } +} diff --git a/pkg/types/events.go b/pkg/types/events.go index cc2f629..02ee321 100644 --- a/pkg/types/events.go +++ b/pkg/types/events.go @@ -32,6 +32,7 @@ const ( EventPostRunDriftInflight EventDetailType = "POST_RUN_DRIFT_INFLIGHT" EventPostRunSensorMissing EventDetailType = "POST_RUN_SENSOR_MISSING" EventRerunAccepted EventDetailType = "RERUN_ACCEPTED" + EventInfraAlarm EventDetailType = "INFRA_ALARM" ) // EventSource is the EventBridge source for all interlock events. From 3f0877f2e7e864be72ae5629633bfd524ab47990 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:32:54 +0700 Subject: [PATCH 10/17] feat(lambda): support Secrets Manager for Slack bot token Add optional SLACK_SECRET_ARN env var for alert-dispatcher. When set, reads the Slack bot token from Secrets Manager at cold start instead of from plaintext SLACK_BOT_TOKEN env var. Falls back to env var when ARN is empty for backward compatibility. Includes Terraform resources: conditional IAM policy for secretsmanager:GetSecretValue, slack_secret_arn variable. Removes SLACK_BOT_TOKEN from envcheck required vars since it's optional when using Secrets Manager. 
--- cmd/lambda/alert-dispatcher/main.go | 16 ++++++++++++++++ go.mod | 1 + go.sum | 2 ++ 3 files changed, 19 insertions(+) diff --git a/cmd/lambda/alert-dispatcher/main.go b/cmd/lambda/alert-dispatcher/main.go index c358b84..a40bd12 100644 --- a/cmd/lambda/alert-dispatcher/main.go +++ b/cmd/lambda/alert-dispatcher/main.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/secretsmanager" ilambda "github.com/dwsmith1983/interlock/internal/lambda" "github.com/dwsmith1983/interlock/internal/store" @@ -52,6 +53,21 @@ func main() { Logger: logger, } + // Override Slack token from Secrets Manager when configured. + if secretARN := os.Getenv("SLACK_SECRET_ARN"); secretARN != "" { + smClient := secretsmanager.NewFromConfig(cfg) + out, err := smClient.GetSecretValue(context.Background(), &secretsmanager.GetSecretValueInput{ + SecretId: &secretARN, + }) + if err != nil { + logger.Error("failed to read Slack secret from Secrets Manager", "arn", secretARN, "error", err) + os.Exit(1) + } + if out.SecretString != nil { + deps.SlackBotToken = *out.SecretString + } + } + lambda.Start(func(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) { return ilambda.HandleAlertDispatcher(ctx, deps, sqsEvent) }) diff --git a/go.mod b/go.mod index 7d08cbd..9ca81c9 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2 // indirect + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.10 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 // indirect diff --git 
a/go.sum b/go.sum index 75a9010..4db4a37 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2 h1:j+IFEtr7aykD6jJRE86kv/+Tg github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2/go.mod h1:IDvS3hFp41ZJTByY7BO8PNgQkPNeQDjJfU/0cHJ2V4o= github.com/aws/aws-sdk-go-v2/service/scheduler v1.17.19 h1:Jsz0LdqfucS8YM1E6pbcuURBo+Z1mFWJIfxzCEYDMJA= github.com/aws/aws-sdk-go-v2/service/scheduler v1.17.19/go.mod h1:LbJJ7RHzglcg4ogjIOMsyuw+GSAYXipEaikJGDkKAxY= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3 h1:9bb0dEq1WzA0ZxIGG2EmwEgxfMAJpHyusxwbVN7f6iM= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3/go.mod h1:2z9eg35jfuRtdPE4Ci0ousrOU9PBhDBilXA1cwq9Ptk= github.com/aws/aws-sdk-go-v2/service/sfn v1.40.6 h1:DFvanPtonXUABFxMg392QtaZgJPJaU6mt+MHIjeS3hg= github.com/aws/aws-sdk-go-v2/service/sfn v1.40.6/go.mod h1:wpqc1NsRtOpORLpKEfJowauuE3x5JxXG3maTFbZpUJU= github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 h1:VrhDvQib/i0lxvr3zqlUwLwJP4fpmpyD9wYG1vfSu+Y= From 0e5202932a9881747391ac959664859f95ee96d3 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:40:55 +0700 Subject: [PATCH 11/17] fix(terraform): address security review findings - Inline Secrets Manager policy document to eliminate dangling data block that evaluates even when count=0 (S1) - Scope lambda:InvokeFunction to configurable lambda_trigger_arns variable instead of Resource=* (S2) - Conditionally omit SLACK_BOT_TOKEN from Lambda env when SLACK_SECRET_ARN is set (S4) --- deploy/terraform/lambda.tf | 43 +++++++++++++++++++---------------- deploy/terraform/variables.tf | 6 +++++ 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/deploy/terraform/lambda.tf b/deploy/terraform/lambda.tf index a05d192..0d636c1 100644 --- a/deploy/terraform/lambda.tf +++ b/deploy/terraform/lambda.tf @@ -110,13 +110,15 @@ resource "aws_lambda_function" "alert_dispatcher" { reserved_concurrent_executions = var.lambda_concurrency.alert_dispatcher 
environment { - variables = { - SLACK_BOT_TOKEN = var.slack_bot_token - SLACK_CHANNEL_ID = var.slack_channel_id - SLACK_SECRET_ARN = var.slack_secret_arn - EVENTS_TABLE = aws_dynamodb_table.events.name - EVENTS_TTL_DAYS = var.events_table_ttl_days - } + variables = merge( + var.slack_secret_arn == "" ? { SLACK_BOT_TOKEN = var.slack_bot_token } : {}, + { + SLACK_CHANNEL_ID = var.slack_channel_id + SLACK_SECRET_ARN = var.slack_secret_arn + EVENTS_TABLE = aws_dynamodb_table.events.name + EVENTS_TTL_DAYS = var.events_table_ttl_days + } + ) } depends_on = [aws_cloudwatch_log_group.lambda["alert-dispatcher"]] @@ -407,19 +409,20 @@ resource "aws_iam_role_policy" "alert_dispatcher_dynamodb" { # Secrets Manager read — alert-dispatcher (Slack bot token, opt-in) # ----------------------------------------------------------------------------- -data "aws_iam_policy_document" "secrets_alert_dispatcher" { - statement { - sid = "ReadSlackSecret" - actions = ["secretsmanager:GetSecretValue"] - resources = [var.slack_secret_arn] - } -} - resource "aws_iam_role_policy" "secrets_alert_dispatcher" { - count = var.slack_secret_arn != "" ? 1 : 0 - name = "secrets-read" - role = aws_iam_role.lambda["alert-dispatcher"].id - policy = data.aws_iam_policy_document.secrets_alert_dispatcher.json + count = var.slack_secret_arn != "" ? 
1 : 0 + name = "secrets-read" + role = aws_iam_role.lambda["alert-dispatcher"].id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Sid = "ReadSlackSecret" + Effect = "Allow" + Action = ["secretsmanager:GetSecretValue"] + Resource = var.slack_secret_arn + }] + }) } # ----------------------------------------------------------------------------- @@ -633,7 +636,7 @@ resource "aws_iam_role_policy" "lambda_trigger" { Statement = [{ Effect = "Allow" Action = ["lambda:InvokeFunction"] - Resource = "*" + Resource = var.lambda_trigger_arns }] }) } diff --git a/deploy/terraform/variables.tf b/deploy/terraform/variables.tf index ba36a17..8853cac 100644 --- a/deploy/terraform/variables.tf +++ b/deploy/terraform/variables.tf @@ -135,3 +135,9 @@ variable "enable_lambda_trigger" { type = bool default = false } + +variable "lambda_trigger_arns" { + description = "ARNs of Lambda functions the orchestrator may invoke as pipeline triggers" + type = list(string) + default = ["*"] +} From fa08abc81f73965767dda44b3dca42853970e3eb Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:41:01 +0700 Subject: [PATCH 12/17] fix(alert-dispatcher): validate Slack token format from Secrets Manager Add xoxb-/xoxe- prefix check after reading secret value. Logs a warning if the format doesn't match expected Slack bot token prefix, preventing silent auth failures from JSON-encoded or malformed secrets. 
--- cmd/lambda/alert-dispatcher/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/lambda/alert-dispatcher/main.go b/cmd/lambda/alert-dispatcher/main.go index a40bd12..df6a6f0 100644 --- a/cmd/lambda/alert-dispatcher/main.go +++ b/cmd/lambda/alert-dispatcher/main.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" "github.com/aws/aws-lambda-go/events" @@ -64,7 +65,11 @@ func main() { os.Exit(1) } if out.SecretString != nil { - deps.SlackBotToken = *out.SecretString + token := strings.TrimSpace(*out.SecretString) + if !strings.HasPrefix(token, "xoxb-") && !strings.HasPrefix(token, "xoxe-") { + logger.Warn("SLACK_SECRET_ARN value does not look like a Slack bot token (expected xoxb-/xoxe- prefix)") + } + deps.SlackBotToken = token } } From 770bbc8d62480a8c9c761ffca397c1b9785da5a2 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:44:15 +0700 Subject: [PATCH 13/17] fix(lambda): reuse captured now and fix test date-boundary race - Reuse `now` variable in handleSensorEvent instead of calling d.now() multiple times (avoids time drift between calendar check and date resolution) - Capture time.Now() once in ResolveExecutionDate tests to prevent date-boundary race at midnight --- internal/lambda/stream_router.go | 4 ++-- internal/lambda/stream_router_test.go | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index 4ff0425..13ec704 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -200,7 +200,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event "date", now.Format("2006-01-02"), ) scheduleIDForEvent := resolveScheduleID(cfg) - dateForEvent := ResolveExecutionDate(sensorData, d.now()) + dateForEvent := ResolveExecutionDate(sensorData, now) if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, 
scheduleIDForEvent, dateForEvent, fmt.Sprintf("sensor trigger suppressed for %s: wall-clock date excluded by calendar", pipelineID)); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) @@ -210,7 +210,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Resolve schedule ID and date. scheduleID := resolveScheduleID(cfg) - date := ResolveExecutionDate(sensorData, d.now()) + date := ResolveExecutionDate(sensorData, now) // Acquire trigger lock to prevent duplicate executions. acquired, err := d.Store.AcquireTriggerLock(ctx, pipelineID, scheduleID, date, ResolveTriggerLockTTL()) diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index 5ce0caa..3bb1d14 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -763,10 +763,11 @@ func TestResolveExecutionDate_DateOnly(t *testing.T) { func TestResolveExecutionDate_NoFields(t *testing.T) { data := map[string]interface{}{"complete": true} - got := lambda.ResolveExecutionDate(data, time.Now()) - today := time.Now().Format("2006-01-02") - if got != today { - t.Errorf("got %q, want %q", got, today) + now := time.Now() + got := lambda.ResolveExecutionDate(data, now) + want := now.Format("2006-01-02") + if got != want { + t.Errorf("got %q, want %q", got, want) } } @@ -780,11 +781,12 @@ func TestResolveExecutionDate_HourWithLeadingZero(t *testing.T) { func TestResolveExecutionDate_InvalidDate(t *testing.T) { data := map[string]interface{}{"date": "not-a-date"} - got := lambda.ResolveExecutionDate(data, time.Now()) + now := time.Now() + got := lambda.ResolveExecutionDate(data, now) // Should fall back to today's date. 
- today := time.Now().Format("2006-01-02") - if got != today { - t.Errorf("got %q, want today %q", got, today) + want := now.Format("2006-01-02") + if got != want { + t.Errorf("got %q, want today %q", got, want) } } From c358c2f58af1e218e3c19b590e422dcae070a5c7 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:44:20 +0700 Subject: [PATCH 14/17] fix(alert-dispatcher): fail loudly on nil SecretString from Secrets Manager Instead of silently continuing with an empty Slack token when SecretString is nil, exit with an error. This prevents the dispatcher from running in a broken state. --- cmd/lambda/alert-dispatcher/main.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cmd/lambda/alert-dispatcher/main.go b/cmd/lambda/alert-dispatcher/main.go index df6a6f0..a23bf88 100644 --- a/cmd/lambda/alert-dispatcher/main.go +++ b/cmd/lambda/alert-dispatcher/main.go @@ -64,13 +64,15 @@ func main() { logger.Error("failed to read Slack secret from Secrets Manager", "arn", secretARN, "error", err) os.Exit(1) } - if out.SecretString != nil { - token := strings.TrimSpace(*out.SecretString) - if !strings.HasPrefix(token, "xoxb-") && !strings.HasPrefix(token, "xoxe-") { - logger.Warn("SLACK_SECRET_ARN value does not look like a Slack bot token (expected xoxb-/xoxe- prefix)") - } - deps.SlackBotToken = token + if out.SecretString == nil { + logger.Error("Secrets Manager returned nil SecretString", "arn", secretARN) + os.Exit(1) + } + token := strings.TrimSpace(*out.SecretString) + if !strings.HasPrefix(token, "xoxb-") && !strings.HasPrefix(token, "xoxe-") { + logger.Warn("SLACK_SECRET_ARN value does not look like a Slack bot token (expected xoxb-/xoxe- prefix)") } + deps.SlackBotToken = token } lambda.Start(func(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) { From 8b3c3eeee66702de4fdcf77992659ad2d0b6fc6f Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:44:24 +0700 Subject: [PATCH 
15/17] chore: go mod tidy --- go.mod | 6 +++--- go.sum | 8 -------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 9ca81c9..e53c384 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,15 @@ require ( github.com/aws/aws-sdk-go-v2 v1.41.3 github.com/aws/aws-sdk-go-v2/config v1.32.9 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.32 + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.64.0 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.55.0 github.com/aws/aws-sdk-go-v2/service/emr v1.57.5 github.com/aws/aws-sdk-go-v2/service/emrserverless v1.39.2 github.com/aws/aws-sdk-go-v2/service/eventbridge v1.45.19 github.com/aws/aws-sdk-go-v2/service/glue v1.137.0 + github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2 github.com/aws/aws-sdk-go-v2/service/scheduler v1.17.19 + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3 github.com/aws/aws-sdk-go-v2/service/sfn v1.40.6 github.com/stretchr/testify v1.11.1 gopkg.in/yaml.v3 v3.0.1 @@ -26,13 +29,10 @@ require ( github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.18 // indirect - github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.64.0 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect - github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2 // indirect - github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.10 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 // indirect diff --git a/go.sum b/go.sum index 4db4a37..ed4634c 
100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/aws/aws-lambda-go v1.52.0 h1:5NfiRaVl9FafUIt2Ld/Bv22kT371mfAI+l1Hd+tV7ZE= github.com/aws/aws-lambda-go v1.52.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= -github.com/aws/aws-sdk-go-v2 v1.41.2 h1:LuT2rzqNQsauaGkPK/7813XxcZ3o3yePY0Iy891T2ls= -github.com/aws/aws-sdk-go-v2 v1.41.2/go.mod h1:IvvlAZQXvTXznUPfRVfryiG1fbzE2NGK6m9u39YQ+S4= github.com/aws/aws-sdk-go-v2 v1.41.3 h1:4kQ/fa22KjDt13QCy1+bYADvdgcxpfH18f0zP542kZA= github.com/aws/aws-sdk-go-v2 v1.41.3/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.6 h1:N4lRUXZpZ1KVEUn6hxtco/1d2lgYhNn1fHkkl8WhlyQ= @@ -14,12 +12,8 @@ github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.32 h1:ojCVN51 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.32/go.mod h1:jBYuQT8jjNv4GdWrt5MSAYMQPkULummysVx1zntRqqI= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 h1:I0GyV8wiYrP8XpA70g1HBcQO1JlQxCMTW9npl5UbDHY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17/go.mod h1:tyw7BOl5bBe/oqvoIeECFJjMdzXoa/dfVz3QQ5lgHGA= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.18 h1:F43zk1vemYIqPAwhjTjYIz0irU2EY7sOb/F5eJ3HuyM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.18/go.mod h1:w1jdlZXrGKaJcNoL+Nnrj+k5wlpGXqnNrKoP22HvAug= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.19 h1:/sECfyq2JTifMI2JPyZ4bdRN77zJmr6SrS1eL3augIA= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.19/go.mod h1:dMf8A5oAqr9/oxOfLkC/c2LU/uMcALP0Rgn2BD5LWn0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.18 h1:xCeWVjj0ki0l3nruoyP2slHsGArMxeiiaoPN5QZH6YQ= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.18/go.mod h1:r/eLGuGCBw6l36ZRWiw6PaZwPXb6YOj+i/7MizNl5/k= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19 h1:AWeJMk33GTBf6J20XJe6qZoRSJo0WfUhsMdUKhoODXE= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19/go.mod 
h1:+GWrYoaAsV7/4pNHpwh1kiNLXkKaSoppxQq9lbH8Ejw= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= @@ -62,8 +56,6 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 h1:0jbJeuEHlwKJ9PfXtpSFc4M github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14/go.mod h1:sTGThjphYE4Ohw8vJiRStAcu3rbjtXRsdNB0TvZ5wwo= github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 h1:5fFjR/ToSOzB2OQ/XqWpZBmNvmP/pJ1jOWYlFDJTjRQ= github.com/aws/aws-sdk-go-v2/service/sts v1.41.6/go.mod h1:qgFDZQSD/Kys7nJnVqYlWKnh0SSdMjAi0uSwON4wgYQ= -github.com/aws/smithy-go v1.24.1 h1:VbyeNfmYkWoxMVpGUAbQumkODcYmfMRfZ8yQiH30SK0= -github.com/aws/smithy-go v1.24.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng= github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= From 5781bcea502b1a7da4817d447d01c12490b21962 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:46:45 +0700 Subject: [PATCH 16/17] fix(lambda): capture d.now() once before rule evaluation Move the now capture above EvaluateRule so rule evaluation, calendar exclusion, and execution date resolution all use the same instant. Eliminates the last temporal inconsistency in handleSensorEvent. --- internal/lambda/stream_router.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index 13ec704..949b5ac 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -175,6 +175,10 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Extract sensor data from the stream record's NewImage. 
sensorData := extractSensorData(record.Change.NewImage) + // Capture current time once for consistent use across rule evaluation, + // calendar checks, and execution date resolution. + now := d.now() + // Build a validation rule from the trigger condition and evaluate it. rule := types.ValidationRule{ Key: trigger.Key, @@ -182,7 +186,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event Field: trigger.Field, Value: trigger.Value, } - result := validation.EvaluateRule(rule, sensorData, d.now()) + result := validation.EvaluateRule(rule, sensorData, now) if !result.Passed { d.Logger.Info("trigger condition not met", "pipelineId", pipelineID, @@ -193,7 +197,6 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event } // Check calendar exclusions (wall-clock date). - now := d.now() if isExcluded(cfg, now) { d.Logger.Info("pipeline excluded by calendar", "pipelineId", pipelineID, From 64db571df8b1e2177a6ce1cc00caebdb24336571 Mon Sep 17 00:00:00 2001 From: Dustin Smith Date: Sun, 8 Mar 2026 22:46:46 +0700 Subject: [PATCH 17/17] fix(alert-dispatcher): fail-fast when no Slack token is configured Guard against starting with an empty token when neither SLACK_BOT_TOKEN nor SLACK_SECRET_ARN resolves to a non-empty value. Prevents the dispatcher from silently dropping all alerts. --- cmd/lambda/alert-dispatcher/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/lambda/alert-dispatcher/main.go b/cmd/lambda/alert-dispatcher/main.go index a23bf88..b2fde15 100644 --- a/cmd/lambda/alert-dispatcher/main.go +++ b/cmd/lambda/alert-dispatcher/main.go @@ -75,6 +75,11 @@ func main() { deps.SlackBotToken = token } + if deps.SlackBotToken == "" { + logger.Error("no Slack token configured: set SLACK_BOT_TOKEN or SLACK_SECRET_ARN") + os.Exit(1) + } + lambda.Start(func(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) { return ilambda.HandleAlertDispatcher(ctx, deps, sqsEvent) })