diff --git a/CHANGELOG.md b/CHANGELOG.md index da221d4..0853dea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,34 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.7.2] - 2026-03-08 + +### Added + +- **Configurable sensor trigger deadline** (`trigger.deadline`) — closes auto-trigger window after expiry, publishes `SENSOR_DEADLINE_EXPIRED`. +- **TOCTOU-safe `CreateTriggerIfAbsent` store method** using DynamoDB conditional writes. +- **CloudWatch alarms**: Per-function Lambda error alarms, Step Functions failure alarm, DLQ depth alarms (control, joblog, alert queues), and DynamoDB Stream iterator age alarms. All alarm actions conditionally route to an SNS topic via `sns_alarm_topic_arn`. +- **EventBridge input transformers for alarm routing**: CloudWatch alarm state changes are reshaped into `INFRA_ALARM` InterlockEvent format and routed to both event-sink and alert-dispatcher — zero Go code changes required. +- **Lambda concurrency limits**: Per-function reserved concurrent executions via `lambda_concurrency` object variable (defaults: stream-router=10, orchestrator=10, sla-monitor=5, watchdog=2, event-sink=5, alert-dispatcher=3). +- **Secrets Manager Slack token**: `slack_secret_arn` variable enables alert-dispatcher to read the Slack bot token from Secrets Manager instead of an environment variable. Falls back to `SLACK_BOT_TOKEN` env var if not configured. +- **Lambda trigger IAM scoping**: `enable_lambda_trigger` and `lambda_trigger_arns` variables grant orchestrator `lambda:InvokeFunction` permission scoped to specific function ARNs. + +### Changed + +- **Sensor-triggered pipelines now receive proactive SLA scheduling** (removed cron-only guard). +- **Trigger deadline check extracted into independent `checkTriggerDeadlines` watchdog scan**. 
+- **Env var expansion restricted to `INTERLOCK_` prefix**: `os.ExpandEnv` in trigger config (Airflow, Databricks, Lambda) now only expands variables prefixed with `INTERLOCK_`, preventing unintended system variable substitution. +- **`time.Now()` → `d.now()` across all handlers**: All Lambda handlers use dependency-injected time for consistent testability. +- **Config cache deep copy via JSON round-trip**: `GetAll()` returns a deep copy preventing callers from mutating shared cache state. +- **Single-instant rule evaluation**: All validation rules within an evaluation cycle use the same timestamp for temporal consistency. + +### Fixed + +- **Trigger lock release on SFN start failure**: Both rerun and job-failure retry paths release the trigger lock if `StartExecution` fails, preventing permanently stuck pipelines (previously caused 4.5h deadlock). +- **`scheduleSLAAlerts` skip-on-error**: SLA alert scheduling now correctly skips on error instead of falling through to the next handler. +- **9 silent audit write error discards → WARN logging**: All `publishEvent` call sites across stream-router and orchestrator now log errors at WARN level instead of silently discarding them. +- **Missing `EVENTS_TABLE`/`EVENTS_TTL_DAYS` envcheck for alert-dispatcher**: Startup validation now checks for required environment variables. + ## [0.7.1] - 2026-03-08 ### Fixed @@ -315,6 +343,8 @@ Initial release of the Interlock STAMP-based safety framework for data pipeline Released under the [Elastic License 2.0](LICENSE). 
+[0.7.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.2 +[0.7.1]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.1 [0.7.0]: https://github.com/dwsmith1983/interlock/releases/tag/v0.7.0 [0.6.2]: https://github.com/dwsmith1983/interlock/releases/tag/v0.6.2 [0.6.1]: https://github.com/dwsmith1983/interlock/releases/tag/v0.6.1 diff --git a/README.md b/README.md index 21107d2..e8f930e 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,7 @@ job: | `emr-serverless` | AWS SDK | EMR Serverless job runs | | `step-function` | AWS SDK | AWS Step Functions executions | | `databricks` | HTTP (REST 2.1) | Databricks job runs | +| `lambda` | AWS SDK | Direct Lambda invocation | ## Deployment @@ -170,7 +171,7 @@ module "interlock" { } ``` -The module creates all required infrastructure: DynamoDB tables, Lambda functions, Step Functions state machine, EventBridge rules, and IAM roles. +The module creates all required infrastructure: DynamoDB tables, Lambda functions, Step Functions state machine, EventBridge rules, CloudWatch alarms, and IAM roles. See the [deployment docs](https://dwsmith1983.github.io/interlock/docs/deployment/terraform/) for the full variable reference. ## Example @@ -208,7 +209,7 @@ interlock/ ```bash make test # Run all tests -make build-lambda # Build 4 Lambda handlers (linux/arm64) +make build-lambda # Build 6 Lambda handlers (linux/arm64) make lint # go fmt + go vet ``` diff --git a/docs/content/docs/architecture/aws.md b/docs/content/docs/architecture/aws.md index a87c9dc..5d7052c 100644 --- a/docs/content/docs/architecture/aws.md +++ b/docs/content/docs/architecture/aws.md @@ -112,7 +112,7 @@ Multi-mode dispatcher invoked by Step Functions. 
Each invocation specifies a `mo | `job-poll-exhausted` | Publish `JOB_POLL_EXHAUSTED` event, write timeout joblog entry, set trigger to `FAILED_FINAL` when the job poll window expires | | `complete-trigger` | Set trigger status to `COMPLETED` (on success) or `FAILED_FINAL` (on failure/timeout). Ensures trigger records reflect terminal state | -Supported trigger types: `http`, `command`, `airflow`, `glue`, `emr`, `emr-serverless`, `step-function`, `databricks`. +Supported trigger types: `http`, `command`, `airflow`, `glue`, `emr`, `emr-serverless`, `step-function`, `databricks`, `lambda`. ### sla-monitor @@ -145,6 +145,8 @@ Processes messages from the SQS alert queue. Formats pipeline events into Slack **Threading**: looks up existing thread records in the events table (`THREAD#{scheduleId}#{date}`). If a thread exists for the pipeline-day, replies in-thread. Otherwise, posts a new message and saves the thread timestamp for subsequent alerts. +**Secrets Manager**: when `SLACK_SECRET_ARN` is set, the Slack bot token is read from Secrets Manager at cold start instead of the `SLACK_BOT_TOKEN` environment variable. The module conditionally grants `secretsmanager:GetSecretValue` to the alert-dispatcher role. + **Error handling**: Slack API errors return batch item failures so SQS retries individual messages. Thread lookup/save errors are logged but don't fail the message delivery. 
## Step Functions State Machine @@ -217,6 +219,21 @@ The ASL template at `deploy/statemachine.asl.json` uses substitution variables r - `${sfn_timeout_seconds}` — global execution timeout (default 14400 / 4h) - `${trigger_max_attempts}` — trigger infrastructure retry count (default 3) +## CloudWatch Alarms + +The Terraform module creates CloudWatch alarms across four categories to monitor infrastructure health: + +| Category | Alarms | Metric | Threshold | +|---|---|---|---| +| Lambda errors | 6 (one per function) | `Errors` | `>= 1` per 5-minute period | +| SFN failures | 1 (pipeline state machine) | `ExecutionsFailed` | `>= 1` per 5-minute period | +| DLQ depth | 3 (control, joblog, alert queues) | `ApproximateNumberOfMessagesVisible` | `>= 1` | +| Stream iterator age | 2 (control, joblog streams) | `IteratorAge` | `>= 300,000ms` (5 min) | + +Alarm state changes are reshaped into `INFRA_ALARM` events via EventBridge input transformers and routed to event-sink and alert-dispatcher. Optionally, alarms also publish to an SNS topic via the `sns_alarm_topic_arn` variable. + +See [Alerting](../../reference/alerting/#cloudwatch-alarms) for details on the event flow and consumer patterns. + ## EventBridge A custom event bus (`{environment}-interlock-events`) receives all framework events. All four Lambda functions have `events:PutEvents` permission on this bus. 
@@ -245,7 +262,7 @@ Each Lambda function has its own IAM role with least-privilege policies: | sla-monitor | None | PutEvents | `scheduler:CreateSchedule`, `scheduler:DeleteSchedule` | | watchdog | Read control, joblog, rerun; write control only | PutEvents | -- | | event-sink | Write events table | -- | -- | -| alert-dispatcher | Read/write events table (thread storage) | -- | SQS ReceiveMessage/DeleteMessage | +| alert-dispatcher | Read/write events table (thread storage) | -- | SQS ReceiveMessage/DeleteMessage, conditional `secretsmanager:GetSecretValue` | Trigger permissions for the orchestrator are opt-in via Terraform variables (`enable_glue_trigger`, `enable_emr_trigger`, etc.). diff --git a/docs/content/docs/deployment/terraform.md b/docs/content/docs/deployment/terraform.md index c311657..4a3d290 100644 --- a/docs/content/docs/deployment/terraform.md +++ b/docs/content/docs/deployment/terraform.md @@ -113,6 +113,69 @@ terraform apply | `enable_emr_trigger` | bool | `false` | Grant orchestrator Lambda permission to submit EMR steps | | `enable_emr_serverless_trigger` | bool | `false` | Grant orchestrator Lambda permission to start EMR Serverless jobs | | `enable_sfn_trigger` | bool | `false` | Grant orchestrator Lambda permission to start Step Functions executions | +| `enable_lambda_trigger` | bool | `false` | Grant orchestrator Lambda permission to invoke Lambda functions | +| `lambda_trigger_arns` | list(string) | `["*"]` | Lambda function ARNs the orchestrator may invoke (scoped IAM). Only used when `enable_lambda_trigger` is `true` | +| `slack_bot_token` | string (sensitive) | `""` | Slack Bot API token with `chat:write` scope for alert-dispatcher | +| `slack_channel_id` | string | `""` | Slack channel ID for alert notifications | +| `slack_secret_arn` | string | `""` | AWS Secrets Manager ARN containing the Slack bot token. 
When set, alert-dispatcher reads the token from Secrets Manager instead of the `slack_bot_token` environment variable | +| `events_table_ttl_days` | number | `90` | TTL in days for events table records | +| `lambda_concurrency` | object | See below | Reserved concurrent executions per Lambda function | +| `sns_alarm_topic_arn` | string | `""` | SNS topic ARN for CloudWatch alarm notifications. When set, all alarms send to this topic | + +#### Lambda Concurrency Defaults + +The `lambda_concurrency` variable is an object with per-function keys: + +```hcl +lambda_concurrency = { + stream_router = 10 + orchestrator = 10 + sla_monitor = 5 + watchdog = 2 + event_sink = 5 + alert_dispatcher = 3 +} +``` + +Set any key to `-1` to use unreserved concurrency (Lambda default). + +## Secrets Manager + +By default, alert-dispatcher reads the Slack bot token from the `SLACK_BOT_TOKEN` environment variable. For production deployments, store the token in AWS Secrets Manager and pass the ARN: + +```hcl +module "interlock" { + source = "path/to/interlock/deploy/terraform" + + slack_secret_arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret:interlock/slack-token-AbCdEf" + slack_channel_id = "C0123456789" + # ... +} +``` + +When `slack_secret_arn` is set, the module automatically grants `secretsmanager:GetSecretValue` to the alert-dispatcher Lambda role. The token must have the `xoxb-` prefix (Bot token). + +## CloudWatch Alarms + +The module creates CloudWatch alarms across four categories. All alarms conditionally route to an SNS topic when `sns_alarm_topic_arn` is set. 
+ +| Category | Alarms | Threshold | +|---|---|---| +| Lambda errors | One per function (6 total) | `Errors >= 1` per 5-minute period | +| Step Functions failures | One for the pipeline state machine | `ExecutionsFailed >= 1` per 5-minute period | +| DLQ depth | One per dead-letter queue (3 total) | `ApproximateNumberOfMessagesVisible >= 1` | +| Stream iterator age | One per DynamoDB Stream mapping (2 total) | `IteratorAge >= 300,000ms` (5 minutes) | + +CloudWatch alarm state changes are reshaped via EventBridge input transformers into `INFRA_ALARM` events and routed to both event-sink and alert-dispatcher. This means infrastructure alerts appear in the events table and Slack alongside pipeline lifecycle events — no additional Go code required. + +```hcl +module "interlock" { + source = "path/to/interlock/deploy/terraform" + + sns_alarm_topic_arn = aws_sns_topic.ops_alerts.arn + # ... +} +``` ## What Gets Created diff --git a/docs/content/docs/reference/alerting.md b/docs/content/docs/reference/alerting.md index 3c8038d..4c2bd1d 100644 --- a/docs/content/docs/reference/alerting.md +++ b/docs/content/docs/reference/alerting.md @@ -53,6 +53,12 @@ Published by the **orchestrator** and **stream-router** Lambdas during the pipel | `INFRA_FAILURE` | Unrecoverable infrastructure error | Step Functions execution reaches Fail state | | `SFN_TIMEOUT` | Step Functions execution timed out | Global `TimeoutSeconds` exceeded (configurable via `sfn_timeout_seconds` Terraform variable) | | `DATA_DRIFT` | Post-run drift detected | Post-run evaluation detected data quality drift against baseline | +| `POST_RUN_DRIFT` | Post-run sensor changed after completion | Sensor value drifted from baseline after job completed | +| `POST_RUN_DRIFT_INFLIGHT` | Post-run sensor changed while job running | Informational drift detected during an active execution | +| `POST_RUN_SENSOR_MISSING` | No post-run sensor data received | Watchdog detected no post-run sensor within `sensorTimeout` | +| 
`BASELINE_CAPTURE_FAILED` | Baseline capture error | Error occurred while capturing the post-run baseline at trigger completion | +| `SENSOR_DEADLINE_EXPIRED` | Sensor trigger window closed without pipeline starting | Trigger deadline passed with no trigger record written; manual restart required via `RERUN_REQUEST` | +| `PIPELINE_EXCLUDED` | Pipeline excluded by calendar | Sensor, rerun, job-failure, or post-run drift skipped due to calendar exclusion | ### Rerun Events @@ -62,6 +68,7 @@ Published by the **stream-router** when processing rerun requests and late data |---|---|---| | `LATE_DATA_ARRIVAL` | Sensor updated after job completed | Sensor `updatedAt` is newer than joblog `completedAt` | | `RERUN_REJECTED` | Rerun request rejected by circuit breaker | No new sensor data since last job completion | +| `RERUN_ACCEPTED` | Rerun request accepted | Rerun passed circuit breaker validation and trigger lock was reset | ### Watchdog Events @@ -215,3 +222,38 @@ Since EventBridge supports many target types, you can build any alert delivery p | Custom processing | Lambda function | Enrich, deduplicate, or aggregate events | | Queue for batch processing | SQS queue | Downstream systems that process events in batches | | Cross-account delivery | EventBridge in another account | Centralized observability | + +## CloudWatch Alarms + +The Terraform module creates CloudWatch alarms that monitor infrastructure health independently of pipeline events. Alarm state changes are reshaped into `INFRA_ALARM` events via EventBridge input transformers and routed to both event-sink and alert-dispatcher — no additional Go code required. 
+ +### Alarm Categories + +| Category | Count | Metric | Threshold | +|---|---|---|---| +| Lambda errors | 6 (one per function) | `Errors` | `>= 1` per 5-minute period | +| Step Functions failures | 1 | `ExecutionsFailed` | `>= 1` per 5-minute period | +| DLQ depth | 3 (control, joblog, alert) | `ApproximateNumberOfMessagesVisible` | `>= 1` | +| Stream iterator age | 2 (control, joblog) | `IteratorAge` | `>= 300,000ms` (5 minutes) | + +### How It Works + +1. CloudWatch detects a metric threshold breach and transitions the alarm to `ALARM` state +2. The alarm state change publishes to the default EventBridge bus +3. An EventBridge rule with an **input transformer** reshapes the alarm into an `INFRA_ALARM` event with the standard Interlock event structure +4. The transformed event routes to both event-sink (→ events table) and the SQS alert queue (→ alert-dispatcher → Slack) + +### SNS Integration + +Optionally route alarm notifications to an SNS topic for external consumers (PagerDuty, email, etc.): + +```hcl +module "interlock" { + source = "path/to/interlock/deploy/terraform" + + sns_alarm_topic_arn = aws_sns_topic.ops_alerts.arn + # ... +} +``` + +When `sns_alarm_topic_arn` is set, all alarms add the topic as an alarm action alongside the EventBridge route. 
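The transformer-based reshaping described above can be sketched in Terraform roughly as follows. This is an illustrative sketch, not the module's actual resources: the rule name, the `aws_sqs_queue.alert` reference, and the `INFRA_ALARM` field names are all assumptions.

```hcl
# Illustrative sketch: match CloudWatch Alarm State Change events on the
# default bus and reshape them into an INFRA_ALARM-style payload for the
# alert queue. Resource names and JSON fields are assumed, not verbatim.
resource "aws_cloudwatch_event_rule" "alarm_state_change" {
  name = "interlock-alarm-state-change" # assumed name
  event_pattern = jsonencode({
    source        = ["aws.cloudwatch"]
    "detail-type" = ["CloudWatch Alarm State Change"]
    detail        = { state = { value = ["ALARM"] } }
  })
}

resource "aws_cloudwatch_event_target" "alarm_to_alert_queue" {
  rule = aws_cloudwatch_event_rule.alarm_state_change.name
  arn  = aws_sqs_queue.alert.arn # assumed queue resource

  input_transformer {
    input_paths = {
      alarm  = "$.detail.alarmName"
      state  = "$.detail.state.value"
      reason = "$.detail.state.reason"
    }
    # String placeholders must be quoted for the output to remain valid JSON.
    input_template = <<-EOT
      {"eventType": "INFRA_ALARM", "alarmName": "<alarm>", "state": "<state>", "reason": "<reason>"}
    EOT
  }
}
```

A second, analogous `aws_cloudwatch_event_target` pointed at event-sink would cover the events-table route described above.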
diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index 102498f..8a5be32 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -30,6 +30,9 @@ func HandleWatchdog(ctx context.Context, d *Deps) error { if err := scheduleSLAAlerts(ctx, d); err != nil { d.Logger.Error("proactive SLA scheduling failed", "error", err) } + if err := checkTriggerDeadlines(ctx, d); err != nil { + d.Logger.Error("trigger deadline check failed", "error", err) + } if err := detectMissingPostRunSensors(ctx, d); err != nil { d.Logger.Error("post-run sensor absence detection failed", "error", err) } @@ -375,13 +378,6 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { continue } - // Sensor-triggered pipelines (no cron) get SLA scheduling when - // their SFN execution starts — proactive scheduling here would - // fire alerts for the previous hour before the pipeline runs. - if cfg.Schedule.Cron == "" { - continue - } - scheduleID := resolveScheduleID(cfg) date := resolveWatchdogSLADate(cfg, now) @@ -412,49 +408,90 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { } breachAt, _ := time.Parse(time.RFC3339, calc.BreachAt) - if !breachAt.IsZero() && !breachAt.After(now) { - continue - } - - var scheduleErr bool - for _, alert := range []struct { - suffix string - alertType string - timestamp string - }{ - {"warning", "SLA_WARNING", calc.WarningAt}, - {"breach", "SLA_BREACH", calc.BreachAt}, - } { - name := slaScheduleName(id, scheduleID, date, alert.suffix) - payload := SLAMonitorInput{ - Mode: "fire-alert", - PipelineID: id, - ScheduleID: scheduleID, - Date: date, - AlertType: alert.alertType, - } - if alert.alertType == "SLA_WARNING" { - payload.BreachAt = calc.BreachAt - } - if err := createOneTimeSchedule(ctx, d, name, alert.timestamp, payload); err != nil { - var conflict *schedulerTypes.ConflictException - if errors.As(err, &conflict) { - continue + if breachAt.IsZero() || breachAt.After(now) { + // SLA breach is in the 
future — create schedules. + var scheduleErr bool + for _, alert := range []struct { + suffix string + alertType string + timestamp string + }{ + {"warning", "SLA_WARNING", calc.WarningAt}, + {"breach", "SLA_BREACH", calc.BreachAt}, + } { + name := slaScheduleName(id, scheduleID, date, alert.suffix) + payload := SLAMonitorInput{ + Mode: "fire-alert", + PipelineID: id, + ScheduleID: scheduleID, + Date: date, + AlertType: alert.alertType, + } + if alert.alertType == "SLA_WARNING" { + payload.BreachAt = calc.BreachAt } - d.Logger.Error("create SLA schedule failed", - "pipelineId", id, "suffix", alert.suffix, "error", err) - scheduleErr = true + if err := createOneTimeSchedule(ctx, d, name, alert.timestamp, payload); err != nil { + var conflict *schedulerTypes.ConflictException + if errors.As(err, &conflict) { + continue + } + d.Logger.Error("create SLA schedule failed", + "pipelineId", id, "suffix", alert.suffix, "error", err) + scheduleErr = true + } + } + + if !scheduleErr { + d.Logger.Info("proactive SLA schedules ensured", + "pipelineId", id, + "date", date, + "warningAt", calc.WarningAt, + "breachAt", calc.BreachAt, + ) } } + } + return nil +} - if !scheduleErr { - d.Logger.Info("proactive SLA schedules ensured", - "pipelineId", id, - "date", date, - "warningAt", calc.WarningAt, - "breachAt", calc.BreachAt, - ) +// checkTriggerDeadlines evaluates trigger deadlines independently of SLA +// configuration. Pipelines with a Trigger.Deadline but no SLA config are +// checked here. For each pipeline, if the trigger deadline has passed and +// no trigger exists, the sensor trigger window is closed. 
+func checkTriggerDeadlines(ctx context.Context, d *Deps) error { + configs, err := d.ConfigCache.GetAll(ctx) + if err != nil { + return fmt.Errorf("load configs: %w", err) + } + + now := d.now() + + for id, cfg := range configs { + if cfg.Schedule.Trigger == nil || cfg.Schedule.Trigger.Deadline == "" { + continue + } + + if isExcluded(cfg, now) { + continue + } + + scheduleID := resolveScheduleID(cfg) + triggerDate := resolveTriggerDeadlineDate(cfg, now) + + triggerRec, err := d.Store.GetTrigger(ctx, id, scheduleID, triggerDate) + if err != nil { + d.Logger.Warn("trigger lookup failed in deadline check", "pipelineId", id, "error", err) + continue + } + if triggerRec != nil { + continue + } + + if isJobTerminal(ctx, d, id, scheduleID, triggerDate) { + continue } + + closeSensorTriggerWindow(ctx, d, id, scheduleID, triggerDate, cfg, now) } return nil } @@ -472,6 +509,126 @@ func resolveWatchdogSLADate(cfg *types.PipelineConfig, now time.Time) string { return now.Format("2006-01-02") } +// resolveTriggerDeadlineDate determines the execution date for trigger +// deadline evaluation. Uses the trigger deadline format (not SLA deadline) +// to decide between hourly composite date and daily date. +func resolveTriggerDeadlineDate(cfg *types.PipelineConfig, now time.Time) string { + if strings.HasPrefix(cfg.Schedule.Trigger.Deadline, ":") { + prev := now.Add(-time.Hour) + return prev.Format("2006-01-02") + "T" + fmt.Sprintf("%02d", prev.Hour()) + } + return now.Format("2006-01-02") +} + +// resolveTriggerDeadlineTime computes the absolute time when the trigger +// window closes for the given deadline string and execution date. 
+// +// For relative (hourly) deadlines like ":45" with composite date "2026-03-09T13": +// - Data for hour 13 is processed in hour 14 +// - The deadline resolves to 2026-03-09T14:45:00 in the configured timezone +// +// For absolute (daily) deadlines like "09:00" with date "2026-03-09": +// - The deadline resolves to 2026-03-09T09:00:00 in the configured timezone +// +// Unlike handleSLACalculate, this does NOT roll forward when the time is past. +// Returns zero time on parse errors. +func resolveTriggerDeadlineTime(deadline, date, timezone string) time.Time { + loc := time.UTC + if timezone != "" { + if parsed, err := time.LoadLocation(timezone); err == nil { + loc = parsed + } + } + + if strings.HasPrefix(deadline, ":") { + // Relative (hourly): ":MM" — deadline is in the NEXT hour after the + // composite date's hour, since data for hour H is processed in hour H+1. + minute, err := strconv.Atoi(strings.TrimPrefix(deadline, ":")) + if err != nil { + return time.Time{} + } + // Parse composite date "YYYY-MM-DDThh". + if len(date) < 13 || date[10] != 'T' { + return time.Time{} + } + t, err := time.ParseInLocation("2006-01-02T15", date, loc) + if err != nil { + return time.Time{} + } + // Add 1 hour for the processing window, then set the minute. + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, minute, 0, 0, loc) + } + + // Absolute (daily): "HH:MM". + parts := strings.SplitN(deadline, ":", 2) + if len(parts) != 2 { + return time.Time{} + } + hour, err := strconv.Atoi(parts[0]) + if err != nil { + return time.Time{} + } + minute, err := strconv.Atoi(parts[1]) + if err != nil { + return time.Time{} + } + t, err := time.ParseInLocation("2006-01-02", date, loc) + if err != nil { + return time.Time{} + } + return time.Date(t.Year(), t.Month(), t.Day(), hour, minute, 0, 0, loc) +} + +// closeSensorTriggerWindow checks whether the trigger deadline has passed for +// a sensor-triggered pipeline that never started. 
If expired, it writes a +// FAILED_FINAL trigger record (blocking future auto-triggers) and publishes +// a SENSOR_DEADLINE_EXPIRED event. A human can still restart via RERUN_REQUEST. +func closeSensorTriggerWindow(ctx context.Context, d *Deps, pipelineID, scheduleID, date string, cfg *types.PipelineConfig, now time.Time) { + // Compute the absolute trigger deadline time directly — we do NOT use + // handleSLACalculate here because it rolls daily deadlines forward 24h + // when past, which defeats the purpose of checking for expiry. + tz := "" + if cfg.SLA != nil { + tz = cfg.SLA.Timezone + } + triggerDeadline := resolveTriggerDeadlineTime(cfg.Schedule.Trigger.Deadline, date, tz) + if triggerDeadline.IsZero() || triggerDeadline.After(now) { + return + } + + // Use conditional put to avoid overwriting a trigger that was acquired + // between the GetTrigger read and this write (TOCTOU protection). + created, err := d.Store.CreateTriggerIfAbsent(ctx, pipelineID, scheduleID, date, types.TriggerStatusFailedFinal) + if err != nil { + d.Logger.Error("failed to write FAILED_FINAL for expired trigger deadline", + "pipelineId", pipelineID, "schedule", scheduleID, "date", date, "error", err) + return + } + if !created { + // Trigger row appeared since the read — pipeline started, don't interfere. 
+ d.Logger.Info("trigger appeared during deadline check, skipping window close", + "pipelineId", pipelineID, "schedule", scheduleID, "date", date) + return + } + + alertDetail := map[string]interface{}{ + "source": "watchdog", + "triggerDeadline": cfg.Schedule.Trigger.Deadline, + "actionHint": "auto-trigger window closed — use RERUN_REQUEST to restart", + } + if err := publishEvent(ctx, d, string(types.EventSensorDeadlineExpired), pipelineID, scheduleID, date, + fmt.Sprintf("trigger deadline expired for %s/%s/%s", pipelineID, scheduleID, date), alertDetail); err != nil { + d.Logger.Warn("failed to publish sensor deadline expired event", "error", err, "pipeline", pipelineID) + } + + d.Logger.Info("sensor trigger window closed", + "pipelineId", pipelineID, + "schedule", scheduleID, + "date", date, + "triggerDeadline", cfg.Schedule.Trigger.Deadline, + ) +} + // defaultSensorTimeout is the default grace period for post-run sensors to // arrive after a pipeline completes. If no SensorTimeout is configured in // PostRunConfig, this value is used. diff --git a/internal/lambda/watchdog_test.go b/internal/lambda/watchdog_test.go index 70434b5..b48b0dd 100644 --- a/internal/lambda/watchdog_test.go +++ b/internal/lambda/watchdog_test.go @@ -1516,7 +1516,7 @@ func TestWatchdog_ScheduleSLAAlerts_CalendarExcluded_Skips(t *testing.T) { assert.Empty(t, schedMock.created, "calendar-excluded day should skip SLA scheduling") } -func TestWatchdog_ScheduleSLAAlerts_SensorTriggered_Skips(t *testing.T) { +func TestWatchdog_ScheduleSLAAlerts_SensorTriggered_CreatesSLASchedules(t *testing.T) { mock := newMockDDB() d, _, _ := testDeps(mock) schedMock := &mockScheduler{} @@ -1525,25 +1525,30 @@ func TestWatchdog_ScheduleSLAAlerts_SensorTriggered_Skips(t *testing.T) { d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role" d.SchedulerGroupName = "interlock-sla" - // Sensor-triggered pipeline (no cron) — SLA scheduling happens in SFN, - // not proactively from the watchdog. 
+ // Fix time at 14:10 UTC — :30 deadline (T14:30 breach) is in the future. + fixedNow := time.Date(2026, 3, 9, 14, 10, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + d.StartedAt = fixedNow.Add(-5 * time.Minute) + + // Sensor-triggered pipeline (no cron) with SLA — should get proactive + // SLA scheduling from the watchdog. cfg := types.PipelineConfig{ - Pipeline: types.PipelineIdentity{ID: "bronze-cdr"}, + Pipeline: types.PipelineIdentity{ID: "silver-seq-hour"}, Schedule: types.ScheduleConfig{ Trigger: &types.TriggerCondition{ - Key: "hourly-status", + Key: "audit-result", Check: "equals", - Field: "status", - Value: "ready", + Field: "match", + Value: true, }, - Evaluation: types.EvaluationWindow{Window: "1h", Interval: "5m"}, + Evaluation: types.EvaluationWindow{Window: "15m", Interval: "2m"}, }, SLA: &types.SLAConfig{ - Deadline: ":15", - ExpectedDuration: "3m", + Deadline: ":30", + ExpectedDuration: "10m", }, Validation: types.ValidationConfig{Trigger: "ALL"}, - Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "bronze-cdr-etl"}}, + Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "seq-agg-hour"}}, } seedConfig(mock, cfg) @@ -1552,7 +1557,254 @@ func TestWatchdog_ScheduleSLAAlerts_SensorTriggered_Skips(t *testing.T) { schedMock.mu.Lock() defer schedMock.mu.Unlock() - assert.Empty(t, schedMock.created, "sensor-triggered pipeline should not get proactive SLA scheduling") + assert.Len(t, schedMock.created, 2, "sensor-triggered pipeline should get proactive SLA scheduling (warning + breach)") + for _, s := range schedMock.created { + assert.Contains(t, *s.Name, "silver-seq-hour") + } +} + +func TestWatchdog_ScheduleSLAAlerts_SensorDeadlineExpired_WritesFAILEDFINAL(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + schedMock := &mockScheduler{} + d.Scheduler = schedMock + d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor" + d.SchedulerRoleARN = 
"arn:aws:iam::123:role/scheduler-role" + d.SchedulerGroupName = "interlock-sla" + + // Fix time at 14:50 UTC — both SLA breach (:30 → T14:30) and trigger + // deadline (:45 → T14:45) are in the past. + fixedNow := time.Date(2026, 3, 9, 14, 50, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + d.StartedAt = fixedNow.Add(-5 * time.Minute) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "silver-seq-hour"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "audit-result", + Check: "equals", + Field: "match", + Value: true, + Deadline: ":45", + }, + Evaluation: types.EvaluationWindow{Window: "15m", Interval: "2m"}, + }, + SLA: &types.SLAConfig{ + Deadline: ":30", + ExpectedDuration: "10m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "seq-agg-hour"}}, + } + seedConfig(mock, cfg) + + err := lambda.HandleWatchdog(context.Background(), d) + require.NoError(t, err) + + // Verify FAILED_FINAL trigger was written for the resolved date. + // resolveWatchdogSLADate at 14:50 with ":30" deadline → "2026-03-09T13" + triggerKey := ddbItemKey(testControlTable, + types.PipelinePK("silver-seq-hour"), + types.TriggerSK("stream", "2026-03-09T13")) + mock.mu.Lock() + item, ok := mock.items[triggerKey] + mock.mu.Unlock() + require.True(t, ok, "FAILED_FINAL trigger row should have been written") + statusVal := item["status"].(*ddbtypes.AttributeValueMemberS).Value + assert.Equal(t, types.TriggerStatusFailedFinal, statusVal) + + // Verify SENSOR_DEADLINE_EXPIRED event was published. 
+ ebMock.mu.Lock() + defer ebMock.mu.Unlock() + var found bool + for _, ev := range ebMock.events { + if *ev.Entries[0].DetailType == string(types.EventSensorDeadlineExpired) { + found = true + break + } + } + assert.True(t, found, "expected SENSOR_DEADLINE_EXPIRED event") +} + +func TestWatchdog_ScheduleSLAAlerts_SensorDeadlineExpired_SkipsIfTriggerExists(t *testing.T) { + mock := newMockDDB() + d, _, ebMock := testDeps(mock) + schedMock := &mockScheduler{} + d.Scheduler = schedMock + d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor" + d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role" + d.SchedulerGroupName = "interlock-sla" + + // Fix time at 14:50 UTC — trigger deadline (:45) is past. + fixedNow := time.Date(2026, 3, 9, 14, 50, 0, 0, time.UTC) + d.NowFunc = func() time.Time { return fixedNow } + d.StartedAt = fixedNow.Add(-5 * time.Minute) + + cfg := types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "silver-seq-hour"}, + Schedule: types.ScheduleConfig{ + Trigger: &types.TriggerCondition{ + Key: "audit-result", + Check: "equals", + Field: "match", + Value: true, + Deadline: ":45", + }, + Evaluation: types.EvaluationWindow{Window: "15m", Interval: "2m"}, + }, + SLA: &types.SLAConfig{ + Deadline: ":30", + ExpectedDuration: "10m", + }, + Validation: types.ValidationConfig{Trigger: "ALL"}, + Job: types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "seq-agg-hour"}}, + } + seedConfig(mock, cfg) + + // Seed a RUNNING trigger for the resolved date — pipeline started but is + // still running. The deadline-expired logic should NOT write FAILED_FINAL. 
+	mock.putRaw(testControlTable, map[string]ddbtypes.AttributeValue{
+		"PK":     &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK("silver-seq-hour")},
+		"SK":     &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK("stream", "2026-03-09T13")},
+		"status": &ddbtypes.AttributeValueMemberS{Value: types.TriggerStatusRunning},
+	})
+
+	err := lambda.HandleWatchdog(context.Background(), d)
+	require.NoError(t, err)
+
+	// No SENSOR_DEADLINE_EXPIRED event — pipeline already has a trigger.
+	ebMock.mu.Lock()
+	defer ebMock.mu.Unlock()
+	for _, ev := range ebMock.events {
+		assert.NotEqual(t, string(types.EventSensorDeadlineExpired), *ev.Entries[0].DetailType,
+			"should not publish SENSOR_DEADLINE_EXPIRED when trigger exists")
+	}
+}
+
+func TestWatchdog_ScheduleSLAAlerts_SensorDeadlineNotExpired_SchedulesSLAOnly(t *testing.T) {
+	mock := newMockDDB()
+	d, _, ebMock := testDeps(mock)
+	schedMock := &mockScheduler{}
+	d.Scheduler = schedMock
+	d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor"
+	d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role"
+	d.SchedulerGroupName = "interlock-sla"
+
+	// Fix time at 14:10 UTC — SLA deadline :30 (breach T14:30) is future,
+	// trigger deadline :45 (T14:45) is also future. Should create SLA
+	// schedules but NOT write FAILED_FINAL.
+	fixedNow := time.Date(2026, 3, 9, 14, 10, 0, 0, time.UTC)
+	d.NowFunc = func() time.Time { return fixedNow }
+	d.StartedAt = fixedNow.Add(-5 * time.Minute)
+
+	cfg := types.PipelineConfig{
+		Pipeline: types.PipelineIdentity{ID: "silver-seq-hour"},
+		Schedule: types.ScheduleConfig{
+			Trigger: &types.TriggerCondition{
+				Key:      "audit-result",
+				Check:    "equals",
+				Field:    "match",
+				Value:    true,
+				Deadline: ":45",
+			},
+			Evaluation: types.EvaluationWindow{Window: "15m", Interval: "2m"},
+		},
+		SLA: &types.SLAConfig{
+			Deadline:         ":30",
+			ExpectedDuration: "10m",
+		},
+		Validation: types.ValidationConfig{Trigger: "ALL"},
+		Job:        types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "seq-agg-hour"}},
+	}
+	seedConfig(mock, cfg)
+
+	err := lambda.HandleWatchdog(context.Background(), d)
+	require.NoError(t, err)
+
+	// SLA schedules should be created.
+	schedMock.mu.Lock()
+	defer schedMock.mu.Unlock()
+	assert.Len(t, schedMock.created, 2, "expected SLA schedules (warning + breach)")
+
+	// No FAILED_FINAL trigger should exist.
+	triggerKey := ddbItemKey(testControlTable,
+		types.PipelinePK("silver-seq-hour"),
+		types.TriggerSK("stream", "2026-03-09T13"))
+	mock.mu.Lock()
+	_, ok := mock.items[triggerKey]
+	mock.mu.Unlock()
+	assert.False(t, ok, "no FAILED_FINAL trigger should exist when deadline not expired")
+
+	// No SENSOR_DEADLINE_EXPIRED event.
+	ebMock.mu.Lock()
+	defer ebMock.mu.Unlock()
+	for _, ev := range ebMock.events {
+		assert.NotEqual(t, string(types.EventSensorDeadlineExpired), *ev.Entries[0].DetailType)
+	}
+}
+
+func TestWatchdog_ScheduleSLAAlerts_DailySensorDeadlineExpired_WritesFAILEDFINAL(t *testing.T) {
+	mock := newMockDDB()
+	d, _, ebMock := testDeps(mock)
+	schedMock := &mockScheduler{}
+	d.Scheduler = schedMock
+	d.SLAMonitorARN = "arn:aws:lambda:us-east-1:123:function:sla-monitor"
+	d.SchedulerRoleARN = "arn:aws:iam::123:role/scheduler-role"
+	d.SchedulerGroupName = "interlock-sla"
+
+	// Fix time at 10:00 UTC — both SLA breach (08:00) and trigger deadline
+	// (09:00) are in the past. Daily pipeline with absolute deadlines.
+	fixedNow := time.Date(2026, 3, 9, 10, 0, 0, 0, time.UTC)
+	d.NowFunc = func() time.Time { return fixedNow }
+	d.StartedAt = fixedNow.Add(-5 * time.Minute)
+
+	cfg := types.PipelineConfig{
+		Pipeline: types.PipelineIdentity{ID: "silver-seq-day"},
+		Schedule: types.ScheduleConfig{
+			Trigger: &types.TriggerCondition{
+				Key:      "daily-status",
+				Check:    "equals",
+				Field:    "all_hours_complete",
+				Value:    true,
+				Deadline: "09:00",
+			},
+			Evaluation: types.EvaluationWindow{Window: "30m", Interval: "5m"},
+		},
+		SLA: &types.SLAConfig{
+			Deadline:         "08:00",
+			ExpectedDuration: "30m",
+		},
+		Validation: types.ValidationConfig{Trigger: "ALL"},
+		Job:        types.JobConfig{Type: "glue", Config: map[string]interface{}{"jobName": "seq-agg-day"}},
+	}
+	seedConfig(mock, cfg)
+
+	err := lambda.HandleWatchdog(context.Background(), d)
+	require.NoError(t, err)
+
+	// Verify FAILED_FINAL trigger for daily date (no composite T).
+	triggerKey := ddbItemKey(testControlTable,
+		types.PipelinePK("silver-seq-day"),
+		types.TriggerSK("stream", "2026-03-09"))
+	mock.mu.Lock()
+	item, ok := mock.items[triggerKey]
+	mock.mu.Unlock()
+	require.True(t, ok, "FAILED_FINAL trigger row should have been written for daily pipeline")
+	statusVal := item["status"].(*ddbtypes.AttributeValueMemberS).Value
+	assert.Equal(t, types.TriggerStatusFailedFinal, statusVal)
+
+	// Verify SENSOR_DEADLINE_EXPIRED event.
+	ebMock.mu.Lock()
+	defer ebMock.mu.Unlock()
+	var found bool
+	for _, ev := range ebMock.events {
+		if *ev.Entries[0].DetailType == string(types.EventSensorDeadlineExpired) {
+			found = true
+			break
+		}
+	}
+	assert.True(t, found, "expected SENSOR_DEADLINE_EXPIRED event for daily pipeline")
+}
 
 // ---------------------------------------------------------------------------
diff --git a/internal/store/control.go b/internal/store/control.go
index 2c14ed5..27ae7dc 100644
--- a/internal/store/control.go
+++ b/internal/store/control.go
@@ -429,3 +429,26 @@ func (s *Store) SetTriggerStatus(ctx context.Context, pipelineID, schedule, date
 	}
 	return nil
 }
+
+// CreateTriggerIfAbsent writes a trigger row with the given status only if no
+// trigger row exists for this pipeline/schedule/date. Returns true if the row
+// was created, false if a row already existed (TOCTOU-safe via conditional put).
+func (s *Store) CreateTriggerIfAbsent(ctx context.Context, pipelineID, schedule, date, status string) (bool, error) {
+	_, err := s.Client.PutItem(ctx, &dynamodb.PutItemInput{
+		TableName: &s.ControlTable,
+		Item: map[string]ddbtypes.AttributeValue{
+			"PK":     &ddbtypes.AttributeValueMemberS{Value: types.PipelinePK(pipelineID)},
+			"SK":     &ddbtypes.AttributeValueMemberS{Value: types.TriggerSK(schedule, date)},
+			"status": &ddbtypes.AttributeValueMemberS{Value: status},
+		},
+		ConditionExpression: aws.String("attribute_not_exists(PK)"),
+	})
+	if err != nil {
+		var condErr *ddbtypes.ConditionalCheckFailedException
+		if errors.As(err, &condErr) {
+			return false, nil
+		}
+		return false, fmt.Errorf("create trigger if absent %q/%s/%s: %w", pipelineID, schedule, date, err)
+	}
+	return true, nil
+}
diff --git a/pkg/types/events.go b/pkg/types/events.go
index 02ee321..b546c22 100644
--- a/pkg/types/events.go
+++ b/pkg/types/events.go
@@ -33,6 +33,7 @@ const (
 	EventPostRunSensorMissing EventDetailType = "POST_RUN_SENSOR_MISSING"
 	EventRerunAccepted        EventDetailType = "RERUN_ACCEPTED"
 	EventInfraAlarm           EventDetailType = "INFRA_ALARM"
+	EventSensorDeadlineExpired EventDetailType = "SENSOR_DEADLINE_EXPIRED"
 )
 
 // EventSource is the EventBridge source for all interlock events.
diff --git a/pkg/types/pipeline.go b/pkg/types/pipeline.go
index 2d663e0..57dbd74 100644
--- a/pkg/types/pipeline.go
+++ b/pkg/types/pipeline.go
@@ -31,10 +31,11 @@ type ScheduleConfig struct {
 
 // TriggerCondition defines which sensor write starts evaluation.
 type TriggerCondition struct {
-	Key   string      `yaml:"key" json:"key"`
-	Check CheckOp     `yaml:"check" json:"check"`
-	Field string      `yaml:"field" json:"field"`
-	Value interface{} `yaml:"value" json:"value"`
+	Key      string      `yaml:"key" json:"key"`
+	Check    CheckOp     `yaml:"check" json:"check"`
+	Field    string      `yaml:"field" json:"field"`
+	Value    interface{} `yaml:"value" json:"value"`
+	Deadline string      `yaml:"deadline,omitempty" json:"deadline,omitempty"` // ":MM" or "HH:MM" — auto-trigger window closes after this deadline
 }
 
 // ExclusionConfig defines when a pipeline should NOT run.