Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.7.1] - 2026-03-08

### Fixed

- **Glue RCA false-positive failure classification (Check 2 removed)**: The `verifyGlueRCA` Check 2 filter pattern (`?Exception ?Error ?FATAL ...`) matched benign JVM startup output in Glue's stderr (`/aws-glue/jobs/error`), causing every SUCCEEDED Glue job to be reclassified as FAILED. Classpath entries like `-XX:OnOutOfMemoryError` and Glue's internal `AnalyzerLogHelper` messages contain "Error" as substrings, producing a 100% false positive rate. Removed Check 2 entirely — Check 1 (GlueExceptionAnalysisJobFailed in the RCA log stream) is Glue's purpose-built mechanism for detecting false successes and is sufficient. Post-run validation provides the application-level safety net for data quality issues.

## [0.7.0] - 2026-03-08

### Added
Expand Down
34 changes: 6 additions & 28 deletions internal/trigger/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ type CloudWatchLogsAPI interface {
// defaultGlueLogGroup is the standard CloudWatch log group for Glue v2 jobs.
const defaultGlueLogGroup = "/aws-glue/jobs/logs-v2"

// defaultGlueErrorLogGroup is the CloudWatch log group for Glue job errors.
const defaultGlueErrorLogGroup = "/aws-glue/jobs/error"

// ExecuteGlue starts an AWS Glue job run.
func ExecuteGlue(ctx context.Context, cfg *types.GlueTriggerConfig, client GlueAPI) (map[string]interface{}, error) {
if cfg.JobName == "" {
Expand Down Expand Up @@ -105,22 +102,19 @@ func (r *Runner) checkGlueStatus(ctx context.Context, metadata map[string]interf
}
}

// verifyGlueRCA checks CloudWatch logs for a Glue job run to detect false
// successes. It performs two checks:
// 1. RCA log stream for GlueExceptionAnalysisJobFailed events (Spark exceptions
// where the driver exits 0).
// 2. Error log group (/aws-glue/jobs/error) for entries matching error indicators
// (catches failures like disk-full errors that Glue's RCA may not detect).
// verifyGlueRCA checks the RCA (root cause analysis) log stream for a Glue
// job run to detect false successes. Glue can report SUCCEEDED when the Spark
// job actually failed (driver exits 0 despite SparkException). The RCA stream
// contains GlueExceptionAnalysisJobFailed events for these cases.
//
// Returns (true, reason) if either check finds failure evidence. Returns
// (false, "") on any error or if no failure is found.
// Returns (true, reason) if failure evidence is found. Returns (false, "") on
// any error or if no failure is found.
func (r *Runner) verifyGlueRCA(ctx context.Context, runID string, logGroupName *string) (failed bool, reason string) {
client, err := r.getCWLogsClient(ctx, "")
if err != nil {
return false, ""
}

// Check 1: RCA log stream for GlueExceptionAnalysisJobFailed.
logGroup := defaultGlueLogGroup
if logGroupName != nil && *logGroupName != "" {
logGroup = *logGroupName
Expand All @@ -141,22 +135,6 @@ func (r *Runner) verifyGlueRCA(ctx context.Context, runID string, logGroupName *
return true, "RCA: JobFailed"
}

// Check 2: Error log group for entries matching error indicators for this run.
errorLogGroup := defaultGlueErrorLogGroup
errOut, err := client.FilterLogEvents(ctx, &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: &errorLogGroup,
LogStreamNames: []string{runID},
FilterPattern: aws.String(`?Exception ?Error ?FATAL ?Traceback ?OutOfMemoryError ?StackOverflowError`),
Limit: aws.Int32(1),
})
if err == nil && len(errOut.Events) > 0 {
msg := aws.ToString(errOut.Events[0].Message)
if len(msg) > 200 {
msg = msg[:200]
}
return true, "error-log: " + msg
}

return false, ""
}

Expand Down
97 changes: 13 additions & 84 deletions internal/trigger/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func TestCheckGlueStatus_RCAUsesJobRunLogGroup(t *testing.T) {
cwClient := &mockCWLogsClient{
filterOut: &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}},
}
// Wrap to capture the log group parameter for each call
captureClient := &capturingCWLogsClient{
delegate: cwClient,
onFilter: func(input *cloudwatchlogs.FilterLogEventsInput) {
Expand All @@ -252,9 +251,8 @@ func TestCheckGlueStatus_RCAUsesJobRunLogGroup(t *testing.T) {
"glue_job_run_id": "jr_abc123",
})
require.NoError(t, err)
require.Len(t, capturedLogGroups, 2)
require.Len(t, capturedLogGroups, 1)
assert.Equal(t, customLogGroup, capturedLogGroups[0], "RCA check should use custom log group")
assert.Equal(t, "/aws-glue/jobs/error", capturedLogGroups[1], "error log check should use standard error group")
}

type capturingCWLogsClient struct {
Expand All @@ -269,76 +267,11 @@ func (c *capturingCWLogsClient) FilterLogEvents(ctx context.Context, params *clo
return c.delegate.FilterLogEvents(ctx, params, optFns...)
}

// funcCWLogsClient allows routing FilterLogEvents responses based on input.
type funcCWLogsClient struct {
filterFn func(context.Context, *cloudwatchlogs.FilterLogEventsInput) (*cloudwatchlogs.FilterLogEventsOutput, error)
}

func (f *funcCWLogsClient) FilterLogEvents(ctx context.Context, params *cloudwatchlogs.FilterLogEventsInput, _ ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.FilterLogEventsOutput, error) {
return f.filterFn(ctx, params)
}

func TestCheckGlueStatus_ErrorLogsDetectFalseSuccess(t *testing.T) {
glueClient := &mockGlueClient{
getOut: &glue.GetJobRunOutput{
JobRun: &gluetypes.JobRun{
JobRunState: gluetypes.JobRunStateSucceeded,
},
},
}
cwClient := &funcCWLogsClient{
filterFn: func(_ context.Context, params *cloudwatchlogs.FilterLogEventsInput) (*cloudwatchlogs.FilterLogEventsOutput, error) {
if aws.ToString(params.LogGroupName) == "/aws-glue/jobs/error" {
return &cloudwatchlogs.FilterLogEventsOutput{
Events: []cwltypes.FilteredLogEvent{
{Message: aws.String("java.io.IOException: No space left on device")},
},
}, nil
}
// RCA check — return empty (no GlueExceptionAnalysisJobFailed)
return &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}}, nil
},
}

r := NewRunner(WithGlueClient(glueClient), WithCloudWatchLogsClient(cwClient))
result, err := r.checkGlueStatus(context.Background(), map[string]interface{}{
"glue_job_name": "my-job",
"glue_job_run_id": "jr_abc123",
})
require.NoError(t, err)
assert.Equal(t, RunCheckFailed, result.State)
assert.Contains(t, result.Message, "error-log")
assert.Contains(t, result.Message, "No space left on device")
assert.Equal(t, types.FailureTransient, result.FailureCategory)
}

func TestCheckGlueStatus_BenignStderrDoesNotCauseFailure(t *testing.T) {
glueClient := &mockGlueClient{
getOut: &glue.GetJobRunOutput{
JobRun: &gluetypes.JobRun{
JobRunState: gluetypes.JobRunStateSucceeded,
},
},
}
// Simulate benign stderr: FilterPattern excludes "Preparing ..." so
// the error log group returns empty events despite the log existing.
cwClient := &funcCWLogsClient{
filterFn: func(_ context.Context, params *cloudwatchlogs.FilterLogEventsInput) (*cloudwatchlogs.FilterLogEventsOutput, error) {
return &cloudwatchlogs.FilterLogEventsOutput{Events: []cwltypes.FilteredLogEvent{}}, nil
},
}

r := NewRunner(WithGlueClient(glueClient), WithCloudWatchLogsClient(cwClient))
result, err := r.checkGlueStatus(context.Background(), map[string]interface{}{
"glue_job_name": "my-job",
"glue_job_run_id": "jr_abc123",
})
require.NoError(t, err)
assert.Equal(t, RunCheckSucceeded, result.State)
assert.Equal(t, "SUCCEEDED", result.Message)
}

func TestCheckGlueStatus_ErrorLogFilterPatternPassed(t *testing.T) {
func TestCheckGlueStatus_RCAOnlyCheck(t *testing.T) {
// Verify that only the RCA check is performed (no error log group check).
// The error log group check was removed because Glue's stderr always
// contains benign JVM startup output with "Error" in class names,
// causing 100% false positive rate.
glueClient := &mockGlueClient{
getOut: &glue.GetJobRunOutput{
JobRun: &gluetypes.JobRun{
Expand All @@ -358,21 +291,17 @@ func TestCheckGlueStatus_ErrorLogFilterPatternPassed(t *testing.T) {
}

r := NewRunner(WithGlueClient(glueClient), WithCloudWatchLogsClient(cwClient))
_, err := r.checkGlueStatus(context.Background(), map[string]interface{}{
result, err := r.checkGlueStatus(context.Background(), map[string]interface{}{
"glue_job_name": "my-job",
"glue_job_run_id": "jr_abc123",
})
require.NoError(t, err)
require.Len(t, capturedInputs, 2)

// Check 2 (error log group) must have a FilterPattern
errorLogInput := capturedInputs[1]
assert.Equal(t, "/aws-glue/jobs/error", aws.ToString(errorLogInput.LogGroupName))
require.NotNil(t, errorLogInput.FilterPattern, "Check 2 must pass FilterPattern")
fp := aws.ToString(errorLogInput.FilterPattern)
for _, term := range []string{"Exception", "Error", "FATAL", "Traceback"} {
assert.Contains(t, fp, term, "FilterPattern should include %s", term)
}
assert.Equal(t, RunCheckSucceeded, result.State)

// Only one FilterLogEvents call (RCA check), no error log group check.
require.Len(t, capturedInputs, 1)
assert.Equal(t, "/aws-glue/jobs/logs-v2", aws.ToString(capturedInputs[0].LogGroupName))
assert.Contains(t, aws.ToString(capturedInputs[0].FilterPattern), "GlueExceptionAnalysisJobFailed")
}

func TestExtractGlueFailureReason(t *testing.T) {
Expand Down