diff --git a/cmd/lambda/alert-dispatcher/main.go b/cmd/lambda/alert-dispatcher/main.go index c358b84..b2fde15 100644 --- a/cmd/lambda/alert-dispatcher/main.go +++ b/cmd/lambda/alert-dispatcher/main.go @@ -8,12 +8,14 @@ import ( "net/http" "os" "strconv" + "strings" "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/secretsmanager" ilambda "github.com/dwsmith1983/interlock/internal/lambda" "github.com/dwsmith1983/interlock/internal/store" @@ -52,6 +54,32 @@ func main() { Logger: logger, } + // Override Slack token from Secrets Manager when configured. + if secretARN := os.Getenv("SLACK_SECRET_ARN"); secretARN != "" { + smClient := secretsmanager.NewFromConfig(cfg) + out, err := smClient.GetSecretValue(context.Background(), &secretsmanager.GetSecretValueInput{ + SecretId: &secretARN, + }) + if err != nil { + logger.Error("failed to read Slack secret from Secrets Manager", "arn", secretARN, "error", err) + os.Exit(1) + } + if out.SecretString == nil { + logger.Error("Secrets Manager returned nil SecretString", "arn", secretARN) + os.Exit(1) + } + token := strings.TrimSpace(*out.SecretString) + if !strings.HasPrefix(token, "xoxb-") && !strings.HasPrefix(token, "xoxe-") { + logger.Warn("SLACK_SECRET_ARN value does not look like a Slack bot token (expected xoxb-/xoxe- prefix)") + } + deps.SlackBotToken = token + } + + if deps.SlackBotToken == "" { + logger.Error("no Slack token configured: set SLACK_BOT_TOKEN or SLACK_SECRET_ARN") + os.Exit(1) + } + lambda.Start(func(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) { return ilambda.HandleAlertDispatcher(ctx, deps, sqsEvent) }) diff --git a/deploy/terraform/alerting.tf b/deploy/terraform/alerting.tf index 485d320..696eb3a 100644 --- a/deploy/terraform/alerting.tf +++ b/deploy/terraform/alerting.tf @@ -33,7 +33,10 @@ resource 
"aws_sqs_queue_policy" "alert" { Resource = aws_sqs_queue.alert.arn Condition = { ArnEquals = { - "aws:SourceArn" = aws_cloudwatch_event_rule.alert_events.arn + "aws:SourceArn" = [ + aws_cloudwatch_event_rule.alert_events.arn, + aws_cloudwatch_event_rule.cw_alarm_alert.arn, + ] } } }] diff --git a/deploy/terraform/cloudwatch.tf b/deploy/terraform/cloudwatch.tf new file mode 100644 index 0000000..5a615c5 --- /dev/null +++ b/deploy/terraform/cloudwatch.tf @@ -0,0 +1,137 @@ +# ----------------------------------------------------------------------------- +# CloudWatch alarms — Lambda errors, SFN failures, DLQ depth, stream lag +# ----------------------------------------------------------------------------- + +locals { + # Map of Terraform resource key → Lambda function resource for alarm iteration. + lambda_functions = { + stream_router = aws_lambda_function.stream_router + orchestrator = aws_lambda_function.orchestrator + sla_monitor = aws_lambda_function.sla_monitor + watchdog = aws_lambda_function.watchdog + event_sink = aws_lambda_function.event_sink + alert_dispatcher = aws_lambda_function.alert_dispatcher + } + + # DLQ resources keyed by a short label. + dlq_queues = { + sr_control = aws_sqs_queue.stream_router_control_dlq + sr_joblog = aws_sqs_queue.stream_router_joblog_dlq + alert = aws_sqs_queue.alert_dlq + } + + # DynamoDB stream event source mappings keyed by table name. + stream_mappings = { + control = aws_lambda_event_source_mapping.control_stream + joblog = aws_lambda_event_source_mapping.joblog_stream + } + + alarm_actions = var.sns_alarm_topic_arn != "" ? [var.sns_alarm_topic_arn] : [] +} + +# ============================================================================= +# 1. 
Lambda Error Alarms — one per function +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "lambda_errors" { + for_each = local.lambda_functions + + alarm_name = "${var.environment}-interlock-${each.key}-errors" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "Errors" + namespace = "AWS/Lambda" + period = 300 + statistic = "Sum" + threshold = 1 + alarm_description = "${each.key} Lambda errors detected" + treat_missing_data = "notBreaching" + + dimensions = { + FunctionName = each.value.function_name + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} + +# ============================================================================= +# 2. Step Functions Execution Failure Alarm +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "sfn_failures" { + alarm_name = "${var.environment}-interlock-sfn-failures" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "ExecutionsFailed" + namespace = "AWS/States" + period = 300 + statistic = "Sum" + threshold = 1 + alarm_description = "Step Functions pipeline execution failures detected" + treat_missing_data = "notBreaching" + + dimensions = { + StateMachineArn = aws_sfn_state_machine.pipeline.arn + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} + +# ============================================================================= +# 3. 
DLQ Message Count Alarms — fires when any message lands in a DLQ +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "dlq_messages" { + for_each = local.dlq_queues + + alarm_name = "${var.environment}-interlock-dlq-${each.key}-depth" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "ApproximateNumberOfMessagesVisible" + namespace = "AWS/SQS" + period = 300 + statistic = "Sum" + threshold = 1 + alarm_description = "Messages visible in ${each.key} dead-letter queue" + treat_missing_data = "notBreaching" + + dimensions = { + QueueName = each.value.name + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} + +# ============================================================================= +# 4. DynamoDB Stream Iterator Age Alarms — detects stream processing lag +# ============================================================================= + +resource "aws_cloudwatch_metric_alarm" "stream_iterator_age" { + for_each = local.stream_mappings + + alarm_name = "${var.environment}-interlock-stream-${each.key}-iterator-age" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = 1 + metric_name = "IteratorAge" + namespace = "AWS/Lambda" + period = 300 + statistic = "Maximum" + threshold = 300000 # 5 minutes in milliseconds + alarm_description = "DynamoDB ${each.key} stream iterator age exceeds 5 minutes" + treat_missing_data = "notBreaching" + + dimensions = { + FunctionName = aws_lambda_function.stream_router.function_name + EventSourceMapping = each.value.uuid + } + + alarm_actions = local.alarm_actions + ok_actions = local.alarm_actions + tags = var.tags +} diff --git a/deploy/terraform/eventbridge.tf b/deploy/terraform/eventbridge.tf index 9889554..7b8a61d 100644 --- a/deploy/terraform/eventbridge.tf +++ b/deploy/terraform/eventbridge.tf @@ -67,7 +67,7 @@ resource 
"aws_cloudwatch_event_rule" "alert_events" { tags = var.tags event_pattern = jsonencode({ - source = ["interlock"] + source = ["interlock"] detail-type = [ "SLA_WARNING", "SLA_BREACH", @@ -80,6 +80,15 @@ resource "aws_cloudwatch_event_rule" "alert_events" { "SCHEDULE_MISSED", "DATA_DRIFT", "JOB_POLL_EXHAUSTED", + "POST_RUN_DRIFT", + "POST_RUN_DRIFT_INFLIGHT", + "POST_RUN_FAILED", + "POST_RUN_SENSOR_MISSING", + "RERUN_REJECTED", + "LATE_DATA_ARRIVAL", + "TRIGGER_RECOVERED", + "BASELINE_CAPTURE_FAILED", + "PIPELINE_EXCLUDED", ] }) } @@ -90,3 +99,103 @@ resource "aws_cloudwatch_event_target" "alert_sqs" { target_id = "alert-sqs" arn = aws_sqs_queue.alert.arn } + +# ----------------------------------------------------------------------------- +# CloudWatch Alarm state changes → event pipeline (default bus) +# +# CW alarms automatically publish state-change events to the default bus. +# Input transformers reshape them into InterlockEvent format so event-sink +# and alert-dispatcher handle them natively — no Go code changes needed. 
+# ----------------------------------------------------------------------------- + +# Rule: ALL alarm state changes → event-sink (logging) +resource "aws_cloudwatch_event_rule" "cw_alarm_log" { + name = "${var.environment}-interlock-cw-alarm-log" + description = "Route interlock CloudWatch alarm state changes to event-sink" + event_bus_name = "default" + tags = var.tags + + event_pattern = jsonencode({ + source = ["aws.cloudwatch"] + detail-type = ["CloudWatch Alarm State Change"] + detail = { + alarmName = [{ prefix = "${var.environment}-interlock-" }] + } + }) +} + +resource "aws_cloudwatch_event_target" "cw_alarm_event_sink" { + rule = aws_cloudwatch_event_rule.cw_alarm_log.name + target_id = "cw-alarm-event-sink" + arn = aws_lambda_function.event_sink.arn + + input_transformer { + input_paths = { + alarmName = "$.detail.alarmName" + state = "$.detail.state.value" + reason = "$.detail.state.reason" + } + + input_template = <<-EOT + { + "source": "interlock", + "detail-type": "INFRA_ALARM", + "detail": { + "pipelineId": "INFRASTRUCTURE", + "message": "<alarmName>: <state>" + } + } + EOT + } +} + +resource "aws_lambda_permission" "event_sink_cw_alarm" { + statement_id = "AllowCWAlarmEventBridgeInvoke" + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.event_sink.function_name + principal = "events.amazonaws.com" + source_arn = aws_cloudwatch_event_rule.cw_alarm_log.arn +} + +# Rule: ALARM state only → SQS alert queue (Slack) +resource "aws_cloudwatch_event_rule" "cw_alarm_alert" { + name = "${var.environment}-interlock-cw-alarm-alert" + description = "Route interlock CloudWatch ALARM transitions to Slack" + event_bus_name = "default" + tags = var.tags + + event_pattern = jsonencode({ + source = ["aws.cloudwatch"] + detail-type = ["CloudWatch Alarm State Change"] + detail = { + alarmName = [{ prefix = "${var.environment}-interlock-" }] + state = { + value = ["ALARM"] + } + } + }) +} + +resource "aws_cloudwatch_event_target" "cw_alarm_sqs" { + rule = 
aws_cloudwatch_event_rule.cw_alarm_alert.name + target_id = "cw-alarm-alert-sqs" + arn = aws_sqs_queue.alert.arn + + input_transformer { + input_paths = { + alarmName = "$.detail.alarmName" + reason = "$.detail.state.reason" + } + + input_template = <<-EOT + { + "source": "interlock", + "detail-type": "INFRA_ALARM", + "detail": { + "pipelineId": "INFRASTRUCTURE", + "message": "<alarmName>: ALARM — <reason>" + } + } + EOT + } +} diff --git a/deploy/terraform/lambda.tf b/deploy/terraform/lambda.tf index c8d1e64..0d636c1 100644 --- a/deploy/terraform/lambda.tf +++ b/deploy/terraform/lambda.tf @@ -72,15 +72,16 @@ resource "aws_cloudwatch_log_group" "lambda" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "event_sink" { - function_name = "${var.environment}-interlock-event-sink" - role = aws_iam_role.lambda["event-sink"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = 128 - timeout = 30 - filename = "${var.dist_path}/event-sink.zip" - source_code_hash = filebase64sha256("${var.dist_path}/event-sink.zip") + function_name = "${var.environment}-interlock-event-sink" + role = aws_iam_role.lambda["event-sink"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = 128 + timeout = 30 + filename = "${var.dist_path}/event-sink.zip" + source_code_hash = filebase64sha256("${var.dist_path}/event-sink.zip") + reserved_concurrent_executions = var.lambda_concurrency.event_sink environment { variables = { @@ -97,23 +98,27 @@ resource "aws_lambda_function" "event_sink" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "alert_dispatcher" { - function_name = "${var.environment}-interlock-alert-dispatcher" - role = aws_iam_role.lambda["alert-dispatcher"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = 128 - timeout = 30 - 
filename = "${var.dist_path}/alert-dispatcher.zip" - source_code_hash = filebase64sha256("${var.dist_path}/alert-dispatcher.zip") + function_name = "${var.environment}-interlock-alert-dispatcher" + role = aws_iam_role.lambda["alert-dispatcher"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = 128 + timeout = 30 + filename = "${var.dist_path}/alert-dispatcher.zip" + source_code_hash = filebase64sha256("${var.dist_path}/alert-dispatcher.zip") + reserved_concurrent_executions = var.lambda_concurrency.alert_dispatcher environment { - variables = { - SLACK_BOT_TOKEN = var.slack_bot_token - SLACK_CHANNEL_ID = var.slack_channel_id - EVENTS_TABLE = aws_dynamodb_table.events.name - EVENTS_TTL_DAYS = var.events_table_ttl_days - } + variables = merge( + var.slack_secret_arn == "" ? { SLACK_BOT_TOKEN = var.slack_bot_token } : {}, + { + SLACK_CHANNEL_ID = var.slack_channel_id + SLACK_SECRET_ARN = var.slack_secret_arn + EVENTS_TABLE = aws_dynamodb_table.events.name + EVENTS_TTL_DAYS = var.events_table_ttl_days + } + ) } depends_on = [aws_cloudwatch_log_group.lambda["alert-dispatcher"]] @@ -124,15 +129,16 @@ resource "aws_lambda_function" "alert_dispatcher" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "stream_router" { - function_name = "${var.environment}-interlock-stream-router" - role = aws_iam_role.lambda["stream-router"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = var.lambda_memory_size - timeout = 60 - filename = "${var.dist_path}/stream-router.zip" - source_code_hash = filebase64sha256("${var.dist_path}/stream-router.zip") + function_name = "${var.environment}-interlock-stream-router" + role = aws_iam_role.lambda["stream-router"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = var.lambda_memory_size + timeout = 60 + filename = 
"${var.dist_path}/stream-router.zip" + source_code_hash = filebase64sha256("${var.dist_path}/stream-router.zip") + reserved_concurrent_executions = var.lambda_concurrency.stream_router environment { variables = { @@ -153,15 +159,16 @@ resource "aws_lambda_function" "stream_router" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "orchestrator" { - function_name = "${var.environment}-interlock-orchestrator" - role = aws_iam_role.lambda["orchestrator"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = var.lambda_memory_size - timeout = 120 - filename = "${var.dist_path}/orchestrator.zip" - source_code_hash = filebase64sha256("${var.dist_path}/orchestrator.zip") + function_name = "${var.environment}-interlock-orchestrator" + role = aws_iam_role.lambda["orchestrator"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = var.lambda_memory_size + timeout = 120 + filename = "${var.dist_path}/orchestrator.zip" + source_code_hash = filebase64sha256("${var.dist_path}/orchestrator.zip") + reserved_concurrent_executions = var.lambda_concurrency.orchestrator environment { variables = { @@ -180,15 +187,16 @@ resource "aws_lambda_function" "orchestrator" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "sla_monitor" { - function_name = "${var.environment}-interlock-sla-monitor" - role = aws_iam_role.lambda["sla-monitor"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = var.lambda_memory_size - timeout = 30 - filename = "${var.dist_path}/sla-monitor.zip" - source_code_hash = filebase64sha256("${var.dist_path}/sla-monitor.zip") + function_name = "${var.environment}-interlock-sla-monitor" + role = aws_iam_role.lambda["sla-monitor"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = 
["arm64"] + memory_size = var.lambda_memory_size + timeout = 30 + filename = "${var.dist_path}/sla-monitor.zip" + source_code_hash = filebase64sha256("${var.dist_path}/sla-monitor.zip") + reserved_concurrent_executions = var.lambda_concurrency.sla_monitor environment { variables = { @@ -210,15 +218,16 @@ resource "aws_lambda_function" "sla_monitor" { # ----------------------------------------------------------------------------- resource "aws_lambda_function" "watchdog" { - function_name = "${var.environment}-interlock-watchdog" - role = aws_iam_role.lambda["watchdog"].arn - handler = "bootstrap" - runtime = "provided.al2023" - architectures = ["arm64"] - memory_size = var.lambda_memory_size - timeout = 60 - filename = "${var.dist_path}/watchdog.zip" - source_code_hash = filebase64sha256("${var.dist_path}/watchdog.zip") + function_name = "${var.environment}-interlock-watchdog" + role = aws_iam_role.lambda["watchdog"].arn + handler = "bootstrap" + runtime = "provided.al2023" + architectures = ["arm64"] + memory_size = var.lambda_memory_size + timeout = 60 + filename = "${var.dist_path}/watchdog.zip" + source_code_hash = filebase64sha256("${var.dist_path}/watchdog.zip") + reserved_concurrent_executions = var.lambda_concurrency.watchdog environment { variables = { @@ -396,6 +405,26 @@ resource "aws_iam_role_policy" "alert_dispatcher_dynamodb" { }) } +# ----------------------------------------------------------------------------- +# Secrets Manager read — alert-dispatcher (Slack bot token, opt-in) +# ----------------------------------------------------------------------------- + +resource "aws_iam_role_policy" "secrets_alert_dispatcher" { + count = var.slack_secret_arn != "" ? 
1 : 0 + name = "secrets-read" + role = aws_iam_role.lambda["alert-dispatcher"].id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Sid = "ReadSlackSecret" + Effect = "Allow" + Action = ["secretsmanager:GetSecretValue"] + Resource = var.slack_secret_arn + }] + }) +} + # ----------------------------------------------------------------------------- # DynamoDB Streams — stream-router (control + joblog streams) # ----------------------------------------------------------------------------- @@ -532,9 +561,9 @@ resource "aws_iam_role_policy" "glue_trigger" { Resource = "*" }, { - Sid = "GlueLogVerification" - Effect = "Allow" - Action = ["logs:FilterLogEvents"] + Sid = "GlueLogVerification" + Effect = "Allow" + Action = ["logs:FilterLogEvents"] Resource = [ "arn:aws:logs:*:*:log-group:/aws-glue/jobs/logs-v2:*", "arn:aws:logs:*:*:log-group:/aws-glue/jobs/error:*" @@ -594,3 +623,20 @@ resource "aws_iam_role_policy" "sfn_trigger" { }] }) } + +# --- Lambda --- + +resource "aws_iam_role_policy" "lambda_trigger" { + count = var.enable_lambda_trigger ? 
1 : 0 + name = "lambda-trigger" + role = aws_iam_role.lambda["orchestrator"].id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Effect = "Allow" + Action = ["lambda:InvokeFunction"] + Resource = var.lambda_trigger_arns + }] + }) +} diff --git a/deploy/terraform/variables.tf b/deploy/terraform/variables.tf index 206fea6..8853cac 100644 --- a/deploy/terraform/variables.tf +++ b/deploy/terraform/variables.tf @@ -68,12 +68,44 @@ variable "slack_bot_token" { sensitive = true } +variable "slack_secret_arn" { + description = "ARN of Secrets Manager secret containing Slack bot token (overrides slack_bot_token env var)" + type = string + default = "" +} + variable "slack_channel_id" { description = "Slack channel ID for alert notifications" type = string default = "" } +variable "lambda_concurrency" { + description = "Reserved concurrent executions per Lambda function" + type = object({ + stream_router = number + orchestrator = number + sla_monitor = number + watchdog = number + event_sink = number + alert_dispatcher = number + }) + default = { + stream_router = 10 + orchestrator = 10 + sla_monitor = 5 + watchdog = 2 + event_sink = 5 + alert_dispatcher = 3 + } +} + +variable "sns_alarm_topic_arn" { + description = "SNS topic ARN for CloudWatch alarm notifications (empty = alarms fire but no notifications)" + type = string + default = "" +} + variable "enable_glue_trigger" { description = "Enable IAM permissions for Glue job triggering" type = bool @@ -97,3 +129,15 @@ variable "enable_sfn_trigger" { type = bool default = false } + +variable "enable_lambda_trigger" { + description = "Enable IAM permissions for Lambda-invoked job triggering" + type = bool + default = false +} + +variable "lambda_trigger_arns" { + description = "ARNs of Lambda functions the orchestrator may invoke as pipeline triggers" + type = list(string) + default = ["*"] +} diff --git a/go.mod b/go.mod index 7d08cbd..e53c384 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,15 @@ require ( 
github.com/aws/aws-sdk-go-v2 v1.41.3 github.com/aws/aws-sdk-go-v2/config v1.32.9 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.32 + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.64.0 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.55.0 github.com/aws/aws-sdk-go-v2/service/emr v1.57.5 github.com/aws/aws-sdk-go-v2/service/emrserverless v1.39.2 github.com/aws/aws-sdk-go-v2/service/eventbridge v1.45.19 github.com/aws/aws-sdk-go-v2/service/glue v1.137.0 + github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2 github.com/aws/aws-sdk-go-v2/service/scheduler v1.17.19 + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3 github.com/aws/aws-sdk-go-v2/service/sfn v1.40.6 github.com/stretchr/testify v1.11.1 gopkg.in/yaml.v3 v3.0.1 @@ -26,12 +29,10 @@ require ( github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.18 // indirect - github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.64.0 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect - github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.10 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 // indirect diff --git a/go.sum b/go.sum index 75a9010..ed4634c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/aws/aws-lambda-go v1.52.0 h1:5NfiRaVl9FafUIt2Ld/Bv22kT371mfAI+l1Hd+tV7ZE= github.com/aws/aws-lambda-go v1.52.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= -github.com/aws/aws-sdk-go-v2 v1.41.2 
h1:LuT2rzqNQsauaGkPK/7813XxcZ3o3yePY0Iy891T2ls= -github.com/aws/aws-sdk-go-v2 v1.41.2/go.mod h1:IvvlAZQXvTXznUPfRVfryiG1fbzE2NGK6m9u39YQ+S4= github.com/aws/aws-sdk-go-v2 v1.41.3 h1:4kQ/fa22KjDt13QCy1+bYADvdgcxpfH18f0zP542kZA= github.com/aws/aws-sdk-go-v2 v1.41.3/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.6 h1:N4lRUXZpZ1KVEUn6hxtco/1d2lgYhNn1fHkkl8WhlyQ= @@ -14,12 +12,8 @@ github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.32 h1:ojCVN51 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.32/go.mod h1:jBYuQT8jjNv4GdWrt5MSAYMQPkULummysVx1zntRqqI= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 h1:I0GyV8wiYrP8XpA70g1HBcQO1JlQxCMTW9npl5UbDHY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17/go.mod h1:tyw7BOl5bBe/oqvoIeECFJjMdzXoa/dfVz3QQ5lgHGA= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.18 h1:F43zk1vemYIqPAwhjTjYIz0irU2EY7sOb/F5eJ3HuyM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.18/go.mod h1:w1jdlZXrGKaJcNoL+Nnrj+k5wlpGXqnNrKoP22HvAug= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.19 h1:/sECfyq2JTifMI2JPyZ4bdRN77zJmr6SrS1eL3augIA= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.19/go.mod h1:dMf8A5oAqr9/oxOfLkC/c2LU/uMcALP0Rgn2BD5LWn0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.18 h1:xCeWVjj0ki0l3nruoyP2slHsGArMxeiiaoPN5QZH6YQ= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.18/go.mod h1:r/eLGuGCBw6l36ZRWiw6PaZwPXb6YOj+i/7MizNl5/k= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19 h1:AWeJMk33GTBf6J20XJe6qZoRSJo0WfUhsMdUKhoODXE= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.19/go.mod h1:+GWrYoaAsV7/4pNHpwh1kiNLXkKaSoppxQq9lbH8Ejw= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= @@ -50,6 +44,8 @@ github.com/aws/aws-sdk-go-v2/service/lambda v1.88.2 h1:j+IFEtr7aykD6jJRE86kv/+Tg github.com/aws/aws-sdk-go-v2/service/lambda 
v1.88.2/go.mod h1:IDvS3hFp41ZJTByY7BO8PNgQkPNeQDjJfU/0cHJ2V4o= github.com/aws/aws-sdk-go-v2/service/scheduler v1.17.19 h1:Jsz0LdqfucS8YM1E6pbcuURBo+Z1mFWJIfxzCEYDMJA= github.com/aws/aws-sdk-go-v2/service/scheduler v1.17.19/go.mod h1:LbJJ7RHzglcg4ogjIOMsyuw+GSAYXipEaikJGDkKAxY= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3 h1:9bb0dEq1WzA0ZxIGG2EmwEgxfMAJpHyusxwbVN7f6iM= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.3/go.mod h1:2z9eg35jfuRtdPE4Ci0ousrOU9PBhDBilXA1cwq9Ptk= github.com/aws/aws-sdk-go-v2/service/sfn v1.40.6 h1:DFvanPtonXUABFxMg392QtaZgJPJaU6mt+MHIjeS3hg= github.com/aws/aws-sdk-go-v2/service/sfn v1.40.6/go.mod h1:wpqc1NsRtOpORLpKEfJowauuE3x5JxXG3maTFbZpUJU= github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 h1:VrhDvQib/i0lxvr3zqlUwLwJP4fpmpyD9wYG1vfSu+Y= @@ -60,8 +56,6 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 h1:0jbJeuEHlwKJ9PfXtpSFc4M github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14/go.mod h1:sTGThjphYE4Ohw8vJiRStAcu3rbjtXRsdNB0TvZ5wwo= github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 h1:5fFjR/ToSOzB2OQ/XqWpZBmNvmP/pJ1jOWYlFDJTjRQ= github.com/aws/aws-sdk-go-v2/service/sts v1.41.6/go.mod h1:qgFDZQSD/Kys7nJnVqYlWKnh0SSdMjAi0uSwON4wgYQ= -github.com/aws/smithy-go v1.24.1 h1:VbyeNfmYkWoxMVpGUAbQumkODcYmfMRfZ8yQiH30SK0= -github.com/aws/smithy-go v1.24.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng= github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/internal/lambda/alert_dispatcher.go b/internal/lambda/alert_dispatcher.go index 0bd94e1..ce257de 100644 --- a/internal/lambda/alert_dispatcher.go +++ b/internal/lambda/alert_dispatcher.go @@ -149,7 +149,7 @@ func getThreadTs(ctx context.Context, d *Deps, pipelineID, scheduleID, date stri // saveThreadTs persists a Slack thread timestamp for future 
message threading. // Errors are logged but don't fail the message. func saveThreadTs(ctx context.Context, d *Deps, pipelineID, scheduleID, date, threadTs, channelID string) { - ttl := time.Now().Add(time.Duration(d.EventsTTLDays) * 24 * time.Hour).Unix() + ttl := d.now().Add(time.Duration(d.EventsTTLDays) * 24 * time.Hour).Unix() _, err := d.Store.Client.PutItem(ctx, &dynamodb.PutItemInput{ TableName: &d.Store.EventsTable, Item: map[string]ddbtypes.AttributeValue{ @@ -157,7 +157,7 @@ func saveThreadTs(ctx context.Context, d *Deps, pipelineID, scheduleID, date, th "SK": &ddbtypes.AttributeValueMemberS{Value: fmt.Sprintf("THREAD#%s#%s", scheduleID, date)}, "threadTs": &ddbtypes.AttributeValueMemberS{Value: threadTs}, "channelId": &ddbtypes.AttributeValueMemberS{Value: channelID}, - "createdAt": &ddbtypes.AttributeValueMemberS{Value: time.Now().UTC().Format(time.RFC3339)}, + "createdAt": &ddbtypes.AttributeValueMemberS{Value: d.now().UTC().Format(time.RFC3339)}, "ttl": &ddbtypes.AttributeValueMemberN{Value: fmt.Sprintf("%d", ttl)}, }, }) diff --git a/internal/lambda/dynstream.go b/internal/lambda/dynstream.go index 32d9a49..56e6953 100644 --- a/internal/lambda/dynstream.go +++ b/internal/lambda/dynstream.go @@ -92,18 +92,18 @@ func convertAttributeValue(av events.DynamoDBAttributeValue) interface{} { // ResolveExecutionDate builds the execution date from sensor data fields. // If both "date" and "hour" are present, returns "YYYY-MM-DDThh". // If only "date", returns "YYYY-MM-DD". Falls back to today's date. -func ResolveExecutionDate(sensorData map[string]interface{}) string { +func ResolveExecutionDate(sensorData map[string]interface{}, now time.Time) string { dateStr, _ := sensorData["date"].(string) hourStr, _ := sensorData["hour"].(string) if dateStr == "" { - return time.Now().Format("2006-01-02") + return now.Format("2006-01-02") } normalized := normalizeDate(dateStr) // Validate YYYY-MM-DD format. 
if _, err := time.Parse("2006-01-02", normalized); err != nil { - return time.Now().Format("2006-01-02") + return now.Format("2006-01-02") } if hourStr != "" { @@ -147,7 +147,7 @@ func publishEvent(ctx context.Context, d *Deps, eventType, pipelineID, schedule, ScheduleID: schedule, Date: date, Message: message, - Timestamp: time.Now(), + Timestamp: d.now(), } if len(detail) > 0 && detail[0] != nil { evt.Detail = detail[0] diff --git a/internal/lambda/envcheck.go b/internal/lambda/envcheck.go index 0b715b2..ce6211f 100644 --- a/internal/lambda/envcheck.go +++ b/internal/lambda/envcheck.go @@ -14,7 +14,7 @@ var requiredEnvVars = map[string][]string{ "watchdog": {"CONTROL_TABLE", "JOBLOG_TABLE", "RERUN_TABLE", "EVENT_BUS_NAME"}, "sla-monitor": {"CONTROL_TABLE", "JOBLOG_TABLE", "RERUN_TABLE", "EVENT_BUS_NAME", "SLA_MONITOR_ARN", "SCHEDULER_ROLE_ARN", "SCHEDULER_GROUP_NAME"}, "event-sink": {"EVENTS_TABLE"}, - "alert-dispatcher": {"SLACK_BOT_TOKEN", "SLACK_CHANNEL_ID"}, + "alert-dispatcher": {"SLACK_CHANNEL_ID", "EVENTS_TABLE", "EVENTS_TTL_DAYS"}, } // ValidateEnv checks that all required environment variables for the named diff --git a/internal/lambda/envcheck_test.go b/internal/lambda/envcheck_test.go index 51bd36a..c437690 100644 --- a/internal/lambda/envcheck_test.go +++ b/internal/lambda/envcheck_test.go @@ -33,6 +33,25 @@ func TestValidateEnv_PartialMissing(t *testing.T) { assert.Contains(t, err.Error(), "EVENTS_TABLE") } +func TestValidateEnv_AlertDispatcherAllPresent(t *testing.T) { + for _, v := range requiredEnvVars["alert-dispatcher"] { + t.Setenv(v, "test-value") + } + err := ValidateEnv("alert-dispatcher") + require.NoError(t, err) +} + +func TestValidateEnv_AlertDispatcherMissingEventsVars(t *testing.T) { + t.Setenv("SLACK_BOT_TOKEN", "xoxb-test") + t.Setenv("SLACK_CHANNEL_ID", "C12345") + t.Setenv("EVENTS_TABLE", "") + t.Setenv("EVENTS_TTL_DAYS", "") + err := ValidateEnv("alert-dispatcher") + assert.Error(t, err) + assert.Contains(t, err.Error(), 
"EVENTS_TABLE") + assert.Contains(t, err.Error(), "EVENTS_TTL_DAYS") +} + func TestValidateEnv_UnknownHandler(t *testing.T) { err := ValidateEnv("unknown-handler") require.NoError(t, err) diff --git a/internal/lambda/event_sink.go b/internal/lambda/event_sink.go index d561ea9..90ae99e 100644 --- a/internal/lambda/event_sink.go +++ b/internal/lambda/event_sink.go @@ -23,10 +23,10 @@ func HandleEventSink(ctx context.Context, d *Deps, input EventBridgeInput) error if !detail.Timestamp.IsZero() { tsMillis = detail.Timestamp.UnixMilli() } else { - tsMillis = time.Now().UnixMilli() + tsMillis = d.now().UnixMilli() } - now := time.Now() + now := d.now() ttlDays := d.EventsTTLDays if ttlDays <= 0 { ttlDays = 90 diff --git a/internal/lambda/orchestrator.go b/internal/lambda/orchestrator.go index e28909a..7bb6f2c 100644 --- a/internal/lambda/orchestrator.go +++ b/internal/lambda/orchestrator.go @@ -6,7 +6,6 @@ import ( "fmt" "strconv" "strings" - "time" "github.com/dwsmith1983/interlock/internal/store" "github.com/dwsmith1983/interlock/internal/validation" @@ -54,7 +53,7 @@ func handleEvaluate(ctx context.Context, d *Deps, input OrchestratorInput) (Orch RemapPerPeriodSensors(sensors, input.Date) - result := validation.EvaluateRules(cfg.Validation.Trigger, cfg.Validation.Rules, sensors, time.Now()) + result := validation.EvaluateRules(cfg.Validation.Trigger, cfg.Validation.Rules, sensors, d.now()) if result.Passed { if err := publishEvent(ctx, d, string(types.EventValidationPassed), input.PipelineID, input.ScheduleID, input.Date, "all validation rules passed"); err != nil { @@ -112,8 +111,10 @@ func handleTrigger(ctx context.Context, d *Deps, input OrchestratorInput) (Orche // during Execute. Write success to joblog immediately and set a sentinel // runId so the Step Functions CheckJob JSONPath resolves. 
if metadata == nil { - _ = d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, - types.JobEventSuccess, "sync", 0, fmt.Sprintf("%s trigger completed synchronously", cfg.Job.Type)) + if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, + types.JobEventSuccess, "sync", 0, fmt.Sprintf("%s trigger completed synchronously", cfg.Job.Type)); err != nil { + d.Logger.Warn("failed to write sync job success joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date) + } runID = "sync" metadata = map[string]interface{}{"completedSync": true} } @@ -168,7 +169,9 @@ func handleCheckJob(ctx context.Context, d *Deps, input OrchestratorInput) (Orch switch result.State { case "succeeded": - _ = d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventSuccess, input.RunID, 0, "") + if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventSuccess, input.RunID, 0, ""); err != nil { + d.Logger.Warn("failed to write polled job success joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date) + } if err := publishEvent(ctx, d, string(types.EventJobCompleted), input.PipelineID, input.ScheduleID, input.Date, "job succeeded"); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventJobCompleted, "error", err) } @@ -178,7 +181,9 @@ func handleCheckJob(ctx context.Context, d *Deps, input OrchestratorInput) (Orch if result.FailureCategory != "" { writeOpts = append(writeOpts, store.WithFailureCategory(result.FailureCategory)) } - _ = d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventFail, input.RunID, 0, result.Message, writeOpts...) 
+ if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventFail, input.RunID, 0, result.Message, writeOpts...); err != nil { + d.Logger.Warn("failed to write polled job failure joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date) + } if err := publishEvent(ctx, d, string(types.EventJobFailed), input.PipelineID, input.ScheduleID, input.Date, "job failed: "+result.Message); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventJobFailed, "error", err) } diff --git a/internal/lambda/postrun.go b/internal/lambda/postrun.go index e22e91c..3b0c317 100644 --- a/internal/lambda/postrun.go +++ b/internal/lambda/postrun.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "strings" - "time" "github.com/dwsmith1983/interlock/internal/validation" "github.com/dwsmith1983/interlock/pkg/types" @@ -27,7 +26,7 @@ func matchesPostRunRule(sensorKey string, rules []types.ValidationRule) bool { // date-scoped baseline captured at trigger completion. func handlePostRunSensorEvent(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, sensorKey string, sensorData map[string]interface{}) error { scheduleID := resolveScheduleID(cfg) - date := ResolveExecutionDate(sensorData) + date := ResolveExecutionDate(sensorData, d.now()) // Consistent read to handle race where sensor stream event arrives // before SFN sets trigger to COMPLETED. 
@@ -148,7 +147,7 @@ func handlePostRunCompleted(ctx context.Context, d *Deps, cfg *types.PipelineCon } RemapPerPeriodSensors(sensors, date) - result := validation.EvaluateRules("ALL", cfg.PostRun.Rules, sensors, time.Now()) + result := validation.EvaluateRules("ALL", cfg.PostRun.Rules, sensors, d.now()) if result.Passed { if err := publishEvent(ctx, d, string(types.EventPostRunPassed), pipelineID, scheduleID, date, diff --git a/internal/lambda/rerun.go b/internal/lambda/rerun.go index 5cb4451..5ab7a65 100644 --- a/internal/lambda/rerun.go +++ b/internal/lambda/rerun.go @@ -5,7 +5,6 @@ import ( "fmt" "strconv" "strings" - "time" "github.com/aws/aws-lambda-go/events" "github.com/dwsmith1983/interlock/pkg/types" @@ -37,7 +36,9 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even // --- Calendar exclusion check (execution date) --- if isExcludedDate(cfg, date) { - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventRerunRejected, "", 0, "excluded by calendar") + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventRerunRejected, "", 0, "excluded by calendar"); err != nil { + d.Logger.Warn("failed to write rerun-rejected joblog for calendar exclusion", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, schedule, date, fmt.Sprintf("rerun blocked for %s: execution date %s excluded by calendar", pipelineID, date)); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) @@ -76,8 +77,10 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even } if count >= budget { - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, - types.JobEventRerunRejected, "", 0, limitLabel) + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + types.JobEventRerunRejected, "", 0, limitLabel); err != 
nil { + d.Logger.Warn("failed to write rerun-rejected joblog for limit exceeded", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if err := publishEvent(ctx, d, string(types.EventRerunRejected), pipelineID, schedule, date, fmt.Sprintf("rerun rejected for %s: %s", pipelineID, limitLabel)); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventRerunRejected, "error", err) @@ -108,8 +111,10 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even } if !allowed { - _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, - types.JobEventRerunRejected, "", 0, rejectReason) + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + types.JobEventRerunRejected, "", 0, rejectReason); err != nil { + d.Logger.Warn("failed to write rerun-rejected joblog for circuit breaker", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if err := publishEvent(ctx, d, string(types.EventRerunRejected), pipelineID, schedule, date, fmt.Sprintf("rerun rejected for %s: %s", pipelineID, rejectReason)); err != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventRerunRejected, "error", err) @@ -127,7 +132,9 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even // Delete date-scoped postrun-baseline so re-run captures fresh baseline. if cfg.PostRun != nil { - _ = d.Store.DeleteSensor(ctx, pipelineID, "postrun-baseline#"+date) + if err := d.Store.DeleteSensor(ctx, pipelineID, "postrun-baseline#"+date); err != nil { + d.Logger.Warn("failed to delete postrun-baseline sensor", "error", err, "pipeline", pipelineID, "date", date) + } } // Atomically reset the trigger lock for the new execution. @@ -146,15 +153,17 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even } // Publish acceptance event only after lock atomicity is confirmed. 
- _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, - types.JobEventRerunAccepted, "", 0, "") + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + types.JobEventRerunAccepted, "", 0, ""); err != nil { + d.Logger.Warn("failed to write rerun-accepted joblog", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if pubErr := publishEvent(ctx, d, string(types.EventRerunAccepted), pipelineID, schedule, date, fmt.Sprintf("rerun accepted for %s (reason: %s)", pipelineID, reason)); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventRerunAccepted, "error", pubErr) } - execName := truncateExecName(fmt.Sprintf("%s-%s-%s-%s-rerun-%d", pipelineID, schedule, date, reason, time.Now().Unix())) + execName := truncateExecName(fmt.Sprintf("%s-%s-%s-%s-rerun-%d", pipelineID, schedule, date, reason, d.now().Unix())) if err := startSFNWithName(ctx, d, cfg, pipelineID, schedule, date, execName); err != nil { if relErr := d.Store.ReleaseTriggerLock(ctx, pipelineID, schedule, date); relErr != nil { d.Logger.Warn("failed to release lock after SFN start failure", "error", relErr) @@ -366,9 +375,11 @@ func checkLateDataArrival(ctx context.Context, d *Deps, pipelineID, schedule, da } // Dual-write: joblog entry (audit) + EventBridge event (alerting). 
- _ = d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, + if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventLateDataArrival, "", 0, - "sensor updated after pipeline completed successfully") + "sensor updated after pipeline completed successfully"); err != nil { + d.Logger.Warn("failed to write late-data-arrival joblog", "error", err, "pipeline", pipelineID, "schedule", schedule, "date", date) + } if err := publishEvent(ctx, d, string(types.EventLateDataArrival), pipelineID, schedule, date, fmt.Sprintf("late data arrival for %s: sensor updated after job completion", pipelineID)); err != nil { diff --git a/internal/lambda/sfn.go b/internal/lambda/sfn.go index c33243b..bbb84df 100644 --- a/internal/lambda/sfn.go +++ b/internal/lambda/sfn.go @@ -73,7 +73,7 @@ func truncateExecName(name string) string { // The name includes a Unix timestamp suffix to avoid ExecutionAlreadyExists // errors when a previous execution for the same pipeline/schedule/date failed. func startSFN(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string) error { - name := truncateExecName(fmt.Sprintf("%s-%s-%s-%d", pipelineID, scheduleID, date, time.Now().Unix())) + name := truncateExecName(fmt.Sprintf("%s-%s-%s-%d", pipelineID, scheduleID, date, d.now().Unix())) return startSFNWithName(ctx, d, cfg, pipelineID, scheduleID, date, name) } diff --git a/internal/lambda/sla_monitor.go b/internal/lambda/sla_monitor.go index f11b170..c48140e 100644 --- a/internal/lambda/sla_monitor.go +++ b/internal/lambda/sla_monitor.go @@ -25,7 +25,7 @@ import ( func HandleSLAMonitor(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { switch input.Mode { case "calculate": - return handleSLACalculate(input) + return handleSLACalculate(input, d.now()) case "fire-alert": return handleSLAFireAlert(ctx, d, input) case "schedule": @@ -43,7 +43,7 @@ func HandleSLAMonitor(ctx context.Context, d *Deps, input SLAMonitorInput) 
(SLAM // and expected duration. Warning time = deadline - expectedDuration. // Breach time = deadline. Returns full ISO 8601 timestamps required by // Step Functions TimestampPath. -func handleSLACalculate(input SLAMonitorInput) (SLAMonitorOutput, error) { +func handleSLACalculate(input SLAMonitorInput, now time.Time) (SLAMonitorOutput, error) { dur, err := time.ParseDuration(input.ExpectedDuration) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("parse expectedDuration %q: %w", input.ExpectedDuration, err) @@ -57,7 +57,7 @@ func handleSLACalculate(input SLAMonitorInput) (SLAMonitorOutput, error) { } } - now := time.Now().In(loc) + now = now.In(loc) // Parse the execution date. Supports: // "2006-01-02" — daily @@ -147,16 +147,16 @@ func handleSLAFireAlert(ctx context.Context, d *Deps, input SLAMonitorInput) (SL suppressed = true } if suppressed { - return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: time.Now().UTC().Format(time.RFC3339)}, nil + return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: d.now().UTC().Format(time.RFC3339)}, nil } } if input.AlertType == "SLA_WARNING" && input.BreachAt != "" { breachAt, err := time.Parse(time.RFC3339, input.BreachAt) - if err == nil && !time.Now().UTC().Before(breachAt) { + if err == nil && !d.now().UTC().Before(breachAt) { d.Logger.InfoContext(ctx, "suppressing SLA_WARNING (past breach time)", "pipeline", input.PipelineID, "breachAt", input.BreachAt) - return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: time.Now().UTC().Format(time.RFC3339)}, nil + return SLAMonitorOutput{AlertType: input.AlertType, FiredAt: d.now().UTC().Format(time.RFC3339)}, nil } } @@ -194,7 +194,7 @@ func handleSLAFireAlert(ctx context.Context, d *Deps, input SLAMonitorInput) (SL return SLAMonitorOutput{ AlertType: input.AlertType, - FiredAt: time.Now().UTC().Format(time.RFC3339), + FiredAt: d.now().UTC().Format(time.RFC3339), }, nil } @@ -202,7 +202,7 @@ func handleSLAFireAlert(ctx context.Context, d *Deps, input 
SLAMonitorInput) (SL // SLA warning and breach times. Each schedule invokes this Lambda with // mode "fire-alert" at the exact timestamp, then auto-deletes. func handleSLASchedule(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { - calc, err := handleSLACalculate(input) + calc, err := handleSLACalculate(input, d.now()) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("schedule: %w", err) } @@ -253,7 +253,7 @@ func handleSLASchedule(ctx context.Context, d *Deps, input SLAMonitorInput) (SLA func handleSLACancel(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { // If warningAt/breachAt not provided, recalculate from deadline/expectedDuration. if input.WarningAt == "" && input.BreachAt == "" && input.Deadline != "" { - calc, err := handleSLACalculate(input) + calc, err := handleSLACalculate(input, d.now()) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("cancel recalculate: %w", err) } @@ -279,7 +279,7 @@ func handleSLACancel(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMo } // Determine final SLA status from the timestamps passed in - now := time.Now().UTC() + now := d.now().UTC() alertType := string(types.EventSLAMet) if input.BreachAt != "" { breachAt, _ := time.Parse(time.RFC3339, input.BreachAt) @@ -351,12 +351,12 @@ func createOneTimeSchedule(ctx context.Context, d *Deps, name, timestamp string, // that have already passed. Fallback for environments without EventBridge // Scheduler configured. 
func handleSLAReconcile(ctx context.Context, d *Deps, input SLAMonitorInput) (SLAMonitorOutput, error) { - calc, err := handleSLACalculate(input) + calc, err := handleSLACalculate(input, d.now()) if err != nil { return SLAMonitorOutput{}, fmt.Errorf("reconcile: %w", err) } - now := time.Now().UTC() + now := d.now().UTC() warningAt, _ := time.Parse(time.RFC3339, calc.WarningAt) breachAt, _ := time.Parse(time.RFC3339, calc.BreachAt) @@ -404,7 +404,8 @@ func isJobTerminal(ctx context.Context, d *Deps, pipelineID, scheduleID, date st } switch rec.Event { case types.JobEventSuccess, types.JobEventFail, types.JobEventTimeout, - types.JobEventInfraTriggerExhausted, types.JobEventValidationExhausted: + types.JobEventInfraTriggerExhausted, types.JobEventValidationExhausted, + types.JobEventJobPollExhausted: return true default: return false diff --git a/internal/lambda/stream_router.go b/internal/lambda/stream_router.go index e7a6a9c..949b5ac 100644 --- a/internal/lambda/stream_router.go +++ b/internal/lambda/stream_router.go @@ -175,6 +175,10 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Extract sensor data from the stream record's NewImage. sensorData := extractSensorData(record.Change.NewImage) + // Capture current time once for consistent use across rule evaluation, + // calendar checks, and execution date resolution. + now := d.now() + // Build a validation rule from the trigger condition and evaluate it. 
rule := types.ValidationRule{ Key: trigger.Key, @@ -182,7 +186,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event Field: trigger.Field, Value: trigger.Value, } - result := validation.EvaluateRule(rule, sensorData, time.Now()) + result := validation.EvaluateRule(rule, sensorData, now) if !result.Passed { d.Logger.Info("trigger condition not met", "pipelineId", pipelineID, @@ -193,14 +197,13 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event } // Check calendar exclusions (wall-clock date). - now := time.Now() if isExcluded(cfg, now) { d.Logger.Info("pipeline excluded by calendar", "pipelineId", pipelineID, "date", now.Format("2006-01-02"), ) scheduleIDForEvent := resolveScheduleID(cfg) - dateForEvent := ResolveExecutionDate(sensorData) + dateForEvent := ResolveExecutionDate(sensorData, now) if pubErr := publishEvent(ctx, d, string(types.EventPipelineExcluded), pipelineID, scheduleIDForEvent, dateForEvent, fmt.Sprintf("sensor trigger suppressed for %s: wall-clock date excluded by calendar", pipelineID)); pubErr != nil { d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventPipelineExcluded, "error", pubErr) @@ -210,7 +213,7 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Resolve schedule ID and date. scheduleID := resolveScheduleID(cfg) - date := ResolveExecutionDate(sensorData) + date := ResolveExecutionDate(sensorData, now) // Acquire trigger lock to prevent duplicate executions. acquired, err := d.Store.AcquireTriggerLock(ctx, pipelineID, scheduleID, date, ResolveTriggerLockTTL()) @@ -232,6 +235,9 @@ func handleSensorEvent(ctx context.Context, d *Deps, pk, sk string, record event // Start Step Function execution. 
if err := startSFN(ctx, d, cfg, pipelineID, scheduleID, date); err != nil { + if relErr := d.Store.ReleaseTriggerLock(ctx, pipelineID, scheduleID, date); relErr != nil { + d.Logger.Warn("failed to release lock after SFN start failure", "error", relErr) + } return fmt.Errorf("start SFN for %q: %w", pipelineID, err) } diff --git a/internal/lambda/stream_router_test.go b/internal/lambda/stream_router_test.go index 230ecec..3bb1d14 100644 --- a/internal/lambda/stream_router_test.go +++ b/internal/lambda/stream_router_test.go @@ -739,7 +739,7 @@ func TestStreamRouter_LateDataArrival_CompletedFailed_Silent(t *testing.T) { func TestResolveExecutionDate_WithDateAndHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "10", "complete": true} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03T10" { t.Errorf("got %q, want %q", got, "2026-03-03T10") } @@ -747,7 +747,7 @@ func TestResolveExecutionDate_WithDateAndHour(t *testing.T) { func TestResolveExecutionDate_DashedDate(t *testing.T) { data := map[string]interface{}{"date": "2026-03-03", "hour": "10"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03T10" { t.Errorf("got %q, want %q", got, "2026-03-03T10") } @@ -755,7 +755,7 @@ func TestResolveExecutionDate_DashedDate(t *testing.T) { func TestResolveExecutionDate_DateOnly(t *testing.T) { data := map[string]interface{}{"date": "20260303"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03" { t.Errorf("got %q, want %q", got, "2026-03-03") } @@ -763,16 +763,17 @@ func TestResolveExecutionDate_DateOnly(t *testing.T) { func TestResolveExecutionDate_NoFields(t *testing.T) { data := map[string]interface{}{"complete": true} - got := lambda.ResolveExecutionDate(data) - today := time.Now().Format("2006-01-02") - if got != today { - t.Errorf("got %q, want 
%q", got, today) + now := time.Now() + got := lambda.ResolveExecutionDate(data, now) + want := now.Format("2006-01-02") + if got != want { + t.Errorf("got %q, want %q", got, want) } } func TestResolveExecutionDate_HourWithLeadingZero(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "03"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03T03" { t.Errorf("got %q, want %q", got, "2026-03-03T03") } @@ -780,17 +781,18 @@ func TestResolveExecutionDate_HourWithLeadingZero(t *testing.T) { func TestResolveExecutionDate_InvalidDate(t *testing.T) { data := map[string]interface{}{"date": "not-a-date"} - got := lambda.ResolveExecutionDate(data) + now := time.Now() + got := lambda.ResolveExecutionDate(data, now) // Should fall back to today's date. - today := time.Now().Format("2006-01-02") - if got != today { - t.Errorf("got %q, want today %q", got, today) + want := now.Format("2006-01-02") + if got != want { + t.Errorf("got %q, want today %q", got, want) } } func TestResolveExecutionDate_InvalidHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "25"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) // Invalid hour should be ignored; return date only. 
if got != "2026-03-03" { t.Errorf("got %q, want %q", got, "2026-03-03") @@ -799,7 +801,7 @@ func TestResolveExecutionDate_InvalidHour(t *testing.T) { func TestResolveExecutionDate_NonNumericHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "ab"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) if got != "2026-03-03" { t.Errorf("got %q, want %q", got, "2026-03-03") } @@ -1118,6 +1120,36 @@ func TestSensor_StartSFNError(t *testing.T) { assert.Empty(t, sfnMock.executions, "SFN error means no execution recorded") } +func TestSensor_StartSFNError_ReleasesLock(t *testing.T) { + mock := newMockDDB() + d, sfnMock, _ := testDeps(mock) + + cfg := testStreamConfig() + seedConfig(mock, cfg) + + // Make SFN client return an error. + sfnMock.err = fmt.Errorf("SFN throttled") + + record := makeSensorRecord("gold-revenue", "upstream-complete", map[string]events.DynamoDBAttributeValue{ + "status": events.NewStringAttribute("ready"), + "date": events.NewStringAttribute("2026-03-08"), + }) + event := lambda.StreamEvent{Records: []events.DynamoDBEventRecord{record}} + + err := lambda.HandleStreamEvent(context.Background(), d, event) + require.NoError(t, err, "HandleStreamEvent swallows errors") + + // The trigger lock must have been released after SFN failure. + // Schedule ID for stream-triggered pipelines is "stream". 
+ lockKey := ddbItemKey(testControlTable, + types.PipelinePK("gold-revenue"), + types.TriggerSK("stream", "2026-03-08")) + mock.mu.Lock() + _, lockExists := mock.items[lockKey] + mock.mu.Unlock() + assert.False(t, lockExists, "trigger lock must be released after SFN start failure") +} + func TestSensor_PerHour_DateOnly(t *testing.T) { mock := newMockDDB() d, sfnMock, _ := testDeps(mock) @@ -2179,19 +2211,19 @@ func TestConvertAV_Null(t *testing.T) { func TestNormalizeDate_AlreadyNormalized(t *testing.T) { data := map[string]interface{}{"date": "2026-03-03"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) assert.Equal(t, "2026-03-03", got) } func TestNormalizeDate_Compact(t *testing.T) { data := map[string]interface{}{"date": "20260303"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) assert.Equal(t, "2026-03-03", got) } func TestNormalizeDate_CompactWithHour(t *testing.T) { data := map[string]interface{}{"date": "20260303", "hour": "07"} - got := lambda.ResolveExecutionDate(data) + got := lambda.ResolveExecutionDate(data, time.Now()) assert.Equal(t, "2026-03-03T07", got) } diff --git a/internal/lambda/watchdog.go b/internal/lambda/watchdog.go index c09d281..102498f 100644 --- a/internal/lambda/watchdog.go +++ b/internal/lambda/watchdog.go @@ -64,8 +64,10 @@ func detectStaleTriggers(ctx context.Context, d *Deps) error { if tr.TTL > 0 { alertDetail["ttlExpired"] = time.Unix(tr.TTL, 0).UTC().Format(time.RFC3339) } - _ = publishEvent(ctx, d, string(types.EventSFNTimeout), pipelineID, schedule, date, - fmt.Sprintf("step function timed out for %s/%s/%s", pipelineID, schedule, date), alertDetail) + if err := publishEvent(ctx, d, string(types.EventSFNTimeout), pipelineID, schedule, date, + fmt.Sprintf("step function timed out for %s/%s/%s", pipelineID, schedule, date), alertDetail); err != nil { + d.Logger.Warn("failed to publish SFN timeout event", "error", err, "pipeline", 
pipelineID, "schedule", schedule, "date", date) + } if err := d.Store.SetTriggerStatus(ctx, pipelineID, schedule, date, types.TriggerStatusFailedFinal); err != nil { d.Logger.Error("failed to set trigger status to FAILED_FINAL", @@ -164,7 +166,7 @@ func reconcileSensorTriggers(ctx context.Context, d *Deps) error { continue } - date := ResolveExecutionDate(sensorData) + date := ResolveExecutionDate(sensorData, now) found, err := d.Store.HasTriggerForDate(ctx, id, scheduleID, date) if err != nil { @@ -194,6 +196,9 @@ func reconcileSensorTriggers(ctx context.Context, d *Deps) error { } if err := startSFN(ctx, d, cfg, id, scheduleID, date); err != nil { + if relErr := d.Store.ReleaseTriggerLock(ctx, id, scheduleID, date); relErr != nil { + d.Logger.Warn("failed to release lock after SFN start failure during reconciliation", "error", relErr) + } d.Logger.Error("SFN start failed during reconciliation", "pipelineId", id, "date", date, "error", err) continue @@ -203,8 +208,10 @@ func reconcileSensorTriggers(ctx context.Context, d *Deps) error { "source": "reconciliation", "actionHint": "watchdog recovered missed sensor trigger", } - _ = publishEvent(ctx, d, string(types.EventTriggerRecovered), id, scheduleID, date, - fmt.Sprintf("trigger recovered for %s/%s/%s", id, scheduleID, date), alertDetail) + if err := publishEvent(ctx, d, string(types.EventTriggerRecovered), id, scheduleID, date, + fmt.Sprintf("trigger recovered for %s/%s/%s", id, scheduleID, date), alertDetail); err != nil { + d.Logger.Warn("failed to publish trigger recovered event", "error", err, "pipeline", id, "schedule", scheduleID, "date", date) + } d.Logger.Info("recovered missed trigger", "pipelineId", id, @@ -329,8 +336,10 @@ func detectMissedSchedules(ctx context.Context, d *Deps) error { if cfg.Schedule.Time != "" { alertDetail["expectedTime"] = cfg.Schedule.Time } - _ = publishEvent(ctx, d, string(types.EventScheduleMissed), id, scheduleID, today, - fmt.Sprintf("missed schedule for %s on %s", id, 
today), alertDetail) + if err := publishEvent(ctx, d, string(types.EventScheduleMissed), id, scheduleID, today, + fmt.Sprintf("missed schedule for %s on %s", id, today), alertDetail); err != nil { + d.Logger.Warn("failed to publish missed schedule event", "error", err, "pipeline", id, "schedule", scheduleID, "date", today) + } d.Logger.Info("detected missed schedule", "pipelineId", id, @@ -381,6 +390,7 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { switch { case err != nil: d.Logger.Warn("trigger lookup failed in SLA scheduling", "pipelineId", id, "error", err) + continue case tr != nil && (tr.Status == types.TriggerStatusCompleted || tr.Status == types.TriggerStatusFailedFinal): continue case isJobTerminal(ctx, d, id, scheduleID, date): @@ -395,7 +405,7 @@ func scheduleSLAAlerts(ctx context.Context, d *Deps) error { Deadline: cfg.SLA.Deadline, ExpectedDuration: cfg.SLA.ExpectedDuration, Timezone: cfg.SLA.Timezone, - }) + }, now) if err != nil { d.Logger.Error("SLA calculate failed", "pipelineId", id, "error", err) continue @@ -567,13 +577,17 @@ func detectMissingPostRunSensors(ctx context.Context, d *Deps) error { "ruleKeys": strings.Join(ruleKeys, ", "), "actionHint": "post-run sensor data has not arrived within the expected timeout", } - _ = publishEvent(ctx, d, string(types.EventPostRunSensorMissing), id, scheduleID, today, - fmt.Sprintf("post-run sensor missing for %s on %s", id, today), alertDetail) + if err := publishEvent(ctx, d, string(types.EventPostRunSensorMissing), id, scheduleID, today, + fmt.Sprintf("post-run sensor missing for %s on %s", id, today), alertDetail); err != nil { + d.Logger.Warn("failed to publish post-run sensor missing event", "error", err, "pipeline", id, "schedule", scheduleID, "date", today) + } // Write dedup marker to avoid re-alerting on subsequent watchdog runs. 
- _ = d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ + if err := d.Store.WriteSensor(ctx, id, dedupKey, map[string]interface{}{ "alerted": "true", - }) + }); err != nil { + d.Logger.Warn("failed to write post-run dedup marker", "error", err, "pipeline", id, "date", today) + } d.Logger.Info("detected missing post-run sensor", "pipelineId", id, diff --git a/internal/store/configcache.go b/internal/store/configcache.go index 0cdf186..573bc39 100644 --- a/internal/store/configcache.go +++ b/internal/store/configcache.go @@ -2,6 +2,7 @@ package store import ( "context" + "encoding/json" "sync" "time" @@ -79,7 +80,19 @@ func (c *ConfigCache) refresh(ctx context.Context) (map[string]*types.PipelineCo func copyConfigs(src map[string]*types.PipelineConfig) map[string]*types.PipelineConfig { dst := make(map[string]*types.PipelineConfig, len(src)) for k, v := range src { - cp := *v + data, err := json.Marshal(v) + if err != nil { + // Marshal of a known struct should never fail; shallow-copy as fallback. 
+ cp := *v + dst[k] = &cp + continue + } + var cp types.PipelineConfig + if err := json.Unmarshal(data, &cp); err != nil { + shallow := *v + dst[k] = &shallow + continue + } dst[k] = &cp } return dst diff --git a/internal/store/configcache_test.go b/internal/store/configcache_test.go index 80debbb..26bc29f 100644 --- a/internal/store/configcache_test.go +++ b/internal/store/configcache_test.go @@ -421,3 +421,71 @@ func TestConfigCache_GetAll_ReturnsDeepCopy(t *testing.T) { t.Errorf("owner = %q, want %q (cache was mutated)", configs2["pipe-1"].Pipeline.Owner, "original-owner") } } + +func TestConfigCache_GetAll_ReturnsDeepCopyNested(t *testing.T) { + mock := newMockDDB() + s := newTestStore(mock) + + seedConfig(mock, types.PipelineConfig{ + Pipeline: types.PipelineIdentity{ID: "nested-pipe", Owner: "team-x"}, + Job: types.JobConfig{ + Type: "glue", + Config: map[string]interface{}{"key": "value"}, + }, + SLA: &types.SLAConfig{ + Deadline: "14:00", + ExpectedDuration: "30m", + Critical: false, + }, + }) + + cache := NewConfigCache(s, 5*time.Minute) + + // First call: retrieve configs. + configs1, err := cache.GetAll(context.Background()) + if err != nil { + t.Fatalf("first GetAll: %v", err) + } + + cfg1 := configs1["nested-pipe"] + if cfg1 == nil { + t.Fatal("expected nested-pipe config, got nil") + } + + // Mutate nested map (simulates what InjectDateArgs does). + cfg1.Job.Config["injected"] = "corrupted" + + // Mutate nested pointer field. + cfg1.SLA.Critical = true + cfg1.SLA.Deadline = "23:59" + + // Second call: cache must be unaffected. + configs2, err := cache.GetAll(context.Background()) + if err != nil { + t.Fatalf("second GetAll: %v", err) + } + + cfg2 := configs2["nested-pipe"] + if cfg2 == nil { + t.Fatal("expected nested-pipe config on second call, got nil") + } + + // Verify Job.Config map was not corrupted. 
+ if _, ok := cfg2.Job.Config["injected"]; ok { + t.Error("Job.Config mutation leaked into cache: 'injected' key should not exist") + } + if cfg2.Job.Config["key"] != "value" { + t.Errorf("Job.Config[\"key\"] = %v, want %q", cfg2.Job.Config["key"], "value") + } + + // Verify SLA pointer field was not corrupted. + if cfg2.SLA == nil { + t.Fatal("SLA should not be nil") + } + if cfg2.SLA.Critical != false { + t.Error("SLA.Critical mutation leaked into cache: want false") + } + if cfg2.SLA.Deadline != "14:00" { + t.Errorf("SLA.Deadline = %q, want %q (cache was mutated)", cfg2.SLA.Deadline, "14:00") + } +} diff --git a/internal/trigger/airflow.go b/internal/trigger/airflow.go index 9818d9f..f78708a 100644 --- a/internal/trigger/airflow.go +++ b/internal/trigger/airflow.go @@ -27,10 +27,8 @@ func ExecuteAirflow(ctx context.Context, cfg *types.AirflowTriggerConfig) (map[s payload := map[string]interface{}{} if cfg.Body != "" { - // os.ExpandEnv is intentional: operators store ${VAR} references in - // pipeline configs, resolved at runtime from the execution environment. var conf interface{} - if err := json.Unmarshal([]byte(os.ExpandEnv(cfg.Body)), &conf); err != nil { + if err := json.Unmarshal([]byte(os.Expand(cfg.Body, safeEnvLookup)), &conf); err != nil { return nil, fmt.Errorf("airflow trigger: invalid body JSON: %w", err) } payload["conf"] = conf @@ -48,8 +46,7 @@ func ExecuteAirflow(ctx context.Context, cfg *types.AirflowTriggerConfig) (map[s req.Header.Set("Content-Type", "application/json") for k, v := range cfg.Headers { - // os.ExpandEnv is intentional — see body comment above. - req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } client := defaultHTTPClient @@ -105,8 +102,7 @@ func CheckAirflowStatus(ctx context.Context, airflowURL, dagID, dagRunID string, req.Header.Set("Content-Type", "application/json") for k, v := range headers { - // os.ExpandEnv is intentional — see ExecuteAirflow comment. 
- req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } resp, err := defaultHTTPClient.Do(req) diff --git a/internal/trigger/airflow_test.go b/internal/trigger/airflow_test.go index ddd7197..35b9cce 100644 --- a/internal/trigger/airflow_test.go +++ b/internal/trigger/airflow_test.go @@ -237,6 +237,64 @@ func TestCheckAirflowStatus_MissingStateField(t *testing.T) { assert.Contains(t, err.Error(), "response missing state field") } +func TestExecuteAirflow_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + var receivedBody map[string]interface{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + var payload map[string]interface{} + _ = json.NewDecoder(r.Body).Decode(&payload) + receivedBody = payload + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "dag_run_id": "run-env-test", + }) + })) + defer srv.Close() + + cfg := &types.AirflowTriggerConfig{ + URL: srv.URL, + DagID: "test_dag", + Headers: map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"}, + Body: `{"safe":"${INTERLOCK_TEST_VAR}","secret":"${SECRET_VAR}"}`, + } + + _, err := ExecuteAirflow(context.Background(), cfg) + require.NoError(t, err) + + // INTERLOCK_ prefixed vars should resolve; others should not. 
+ assert.Equal(t, "Bearer safe/", receivedAuth) + conf, _ := receivedBody["conf"].(map[string]interface{}) + assert.Equal(t, "safe", conf["safe"]) + assert.Equal(t, "", conf["secret"]) + assert.NotContains(t, receivedAuth, "leaked") +} + +func TestCheckAirflowStatus_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "state": "running", + }) + })) + defer srv.Close() + + headers := map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"} + state, err := CheckAirflowStatus(context.Background(), srv.URL, "my_dag", "run-1", headers) + require.NoError(t, err) + assert.Equal(t, "running", state) + assert.Equal(t, "Bearer safe/", receivedAuth) + assert.NotContains(t, receivedAuth, "leaked") +} + func TestCheckAirflowStatus_WithHeaders(t *testing.T) { var receivedAuth string srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/trigger/databricks.go b/internal/trigger/databricks.go index 50c445a..b6e5fe0 100644 --- a/internal/trigger/databricks.go +++ b/internal/trigger/databricks.go @@ -43,9 +43,7 @@ func ExecuteDatabricks(ctx context.Context, cfg *types.DatabricksTriggerConfig, req.Header.Set("Content-Type", "application/json") for k, v := range cfg.Headers { - // os.ExpandEnv is intentional: operators store ${VAR} references in - // pipeline configs, resolved at runtime from the execution environment. 
- req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } resp, err := httpClient.Do(req) @@ -97,8 +95,7 @@ func (r *Runner) checkDatabricksStatus(ctx context.Context, metadata map[string] req.Header.Set("Content-Type", "application/json") for k, v := range headers { - // os.ExpandEnv is intentional — see ExecuteDatabricks comment. - req.Header.Set(k, os.ExpandEnv(v)) + req.Header.Set(k, os.Expand(v, safeEnvLookup)) } resp, err := r.httpClient.Do(req) diff --git a/internal/trigger/databricks_test.go b/internal/trigger/databricks_test.go index c255545..1e30fcf 100644 --- a/internal/trigger/databricks_test.go +++ b/internal/trigger/databricks_test.go @@ -182,6 +182,58 @@ func TestCheckDatabricksStatus_MissingRunID(t *testing.T) { assert.Equal(t, "missing databricks metadata", result.Message) } +func TestExecuteDatabricks_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "run_id": 42, + }) + })) + defer srv.Close() + + cfg := &types.DatabricksTriggerConfig{ + WorkspaceURL: srv.URL, + JobID: "my-job", + Headers: map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"}, + } + + _, err := ExecuteDatabricks(context.Background(), cfg, srv.Client()) + require.NoError(t, err) + assert.Equal(t, "Bearer safe/", receivedAuth) + assert.NotContains(t, receivedAuth, "leaked") +} + +func TestCheckDatabricksStatus_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var receivedAuth string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + 
w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "state": map[string]interface{}{ + "life_cycle_state": "RUNNING", + }, + }) + })) + defer srv.Close() + + r := NewRunner(WithHTTPClient(srv.Client())) + _, err := r.checkDatabricksStatus(context.Background(), map[string]interface{}{ + "databricks_workspace_url": srv.URL, + "databricks_run_id": "123", + }, map[string]string{"Authorization": "Bearer ${INTERLOCK_TEST_VAR}/${SECRET_VAR}"}) + require.NoError(t, err) + assert.Equal(t, "Bearer safe/", receivedAuth) + assert.NotContains(t, receivedAuth, "leaked") +} + func TestCheckDatabricksStatus_WithHeaders(t *testing.T) { var receivedAuth string srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/trigger/lambda.go b/internal/trigger/lambda.go index 70f791c..1fda822 100644 --- a/internal/trigger/lambda.go +++ b/internal/trigger/lambda.go @@ -28,7 +28,7 @@ func ExecuteLambda(ctx context.Context, cfg *types.LambdaTriggerConfig, client L } if cfg.Payload != "" { - input.Payload = []byte(os.ExpandEnv(cfg.Payload)) + input.Payload = []byte(os.Expand(cfg.Payload, safeEnvLookup)) } out, err := client.Invoke(ctx, input) diff --git a/internal/trigger/lambda_test.go b/internal/trigger/lambda_test.go index 90a5763..b5a4eee 100644 --- a/internal/trigger/lambda_test.go +++ b/internal/trigger/lambda_test.go @@ -92,6 +92,33 @@ func (c *capturingLambdaClient) Invoke(ctx context.Context, params *lambda.Invok return c.delegate.Invoke(ctx, params, optFns...) 
} +func TestExecuteLambda_EnvExpansionRestricted(t *testing.T) { + t.Setenv("INTERLOCK_TEST_VAR", "safe") + t.Setenv("SECRET_VAR", "leaked") + + var capturedPayload []byte + client := &mockLambdaClient{ + invokeOut: &lambda.InvokeOutput{StatusCode: 200}, + } + capClient := &capturingLambdaClient{ + delegate: client, + onInvoke: func(input *lambda.InvokeInput) { + capturedPayload = input.Payload + }, + } + cfg := &types.LambdaTriggerConfig{ + FunctionName: "my-audit", + Payload: `{"safe":"${INTERLOCK_TEST_VAR}","secret":"${SECRET_VAR}"}`, + } + err := ExecuteLambda(context.Background(), cfg, capClient) + require.NoError(t, err) + + payload := string(capturedPayload) + assert.Contains(t, payload, `"safe":"safe"`) + assert.Contains(t, payload, `"secret":""`) + assert.NotContains(t, payload, "leaked") +} + func TestExecuteLambda_NonPolling(t *testing.T) { client := &mockLambdaClient{ invokeOut: &lambda.InvokeOutput{StatusCode: 200}, diff --git a/pkg/types/events.go b/pkg/types/events.go index cc2f629..02ee321 100644 --- a/pkg/types/events.go +++ b/pkg/types/events.go @@ -32,6 +32,7 @@ const ( EventPostRunDriftInflight EventDetailType = "POST_RUN_DRIFT_INFLIGHT" EventPostRunSensorMissing EventDetailType = "POST_RUN_SENSOR_MISSING" EventRerunAccepted EventDetailType = "RERUN_ACCEPTED" + EventInfraAlarm EventDetailType = "INFRA_ALARM" ) // EventSource is the EventBridge source for all interlock events.