diff --git a/sqs-files/CLOUDTRAIL_SPLITTING.md b/sqs-files/CLOUDTRAIL_SPLITTING.md new file mode 100644 index 0000000..3f06de0 --- /dev/null +++ b/sqs-files/CLOUDTRAIL_SPLITTING.md @@ -0,0 +1,100 @@ +# CloudTrail Record Splitting in sqs-files Adapter + +## Overview +The sqs-files adapter now supports automatic splitting of CloudTrail events that contain multiple records. + +## Configuration + +Add this option to your config: + +```yaml +split_cloudtrail_records: true +``` + +## How It Works + +1. **Downloads file from S3** - As before, the adapter downloads the file referenced in the SQS message + +2. **Detects CloudTrail format** - Checks if the file contains: + - `event.Records[]` structure (with routing wrapper) + - `Records[]` structure (direct CloudTrail format) + +3. **Splits records** - Each record in the array is extracted and sent as an individual message + +4. **Preserves metadata** - Each split record maintains: + - Original `routing` information + - Original `ts` timestamp + - Individual `event` data + +5. **Backward compatible** - If the file is not a CloudTrail event or splitting is disabled, it processes the entire file as before + +## Example + +### Input file (1 file with 6 records): +```json +{ + "event": { + "Records": [ + { "eventName": "DescribeTargetHealth", ... }, + { "eventName": "DescribeTargetHealth", ... }, + { "eventName": "DescribeTargetHealth", ... }, + { "eventName": "AssumeRole", ... }, + { "eventName": "DescribeTargetHealth", ... }, + { "eventName": "DescribeTargetHealth", ... } + ] + }, + "routing": { ... }, + "ts": "2026-01-14 17:30:10" +} +``` + +### Output (6 separate messages sent to USP): +```json +// Message 1 +{ + "event": { "eventName": "DescribeTargetHealth", ... }, + "routing": { ... }, + "ts": "2026-01-14 17:30:10" +} + +// Message 2 +{ + "event": { "eventName": "DescribeTargetHealth", ... }, + "routing": { ... }, + "ts": "2026-01-14 17:30:10" +} + +// ... and so on for all 6 records +``` + +## Benefits + +- **Better granularity** - Each CloudTrail event is processed individually +- **Improved querying** - Easier to search and filter individual events +- **Session tracking** - Can track individual API calls per session +- **Flexible** - Works with both wrapped and unwrapped CloudTrail formats +- **Safe fallback** - Non-CloudTrail files are processed normally + +## Config Example + +```yaml +client_options: + hostname: "cloudtrail-adapter" + oid: "your-org-id" + installation_key: "your-key" + +access_key: "AKIA..." +secret_key: "secret..." +queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/cloudtrail-queue" +region: "us-east-1" + +# Optional: for cross-account access +role_arn: "arn:aws:iam::999888777666:role/CloudTrailRole" +external_id: "secure-id" + +# Enable CloudTrail splitting +split_cloudtrail_records: true + +bucket_path: "bucket" +file_path: "s3/objectKey" +``` diff --git a/sqs-files/client.go b/sqs-files/client.go index 32b16a5..6a566c4 100644 --- a/sqs-files/client.go +++ b/sqs-files/client.go @@ -1,10 +1,13 @@ package usp_sqs_files import ( + "bytes" + "compress/gzip" "context" "encoding/json" "errors" "fmt" + "io" "net/url" "strings" "sync" @@ -12,10 +15,12 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sts" "github.com/refractionPOINT/go-uspclient" "github.com/refractionPOINT/go-uspclient/protocol" @@ -53,16 +58,19 @@ type SQSFilesConfig struct { ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` // SQS specific - AccessKey string `json:"access_key" yaml:"access_key"` - SecretKey string `json:"secret_key,omitempty" yaml:"secret_key,omitempty"` - QueueURL string `json:"queue_url" yaml:"queue_url"` - Region string `json:"region" yaml:"region"` + AccessKey string `json:"access_key" yaml:"access_key"` + SecretKey string `json:"secret_key,omitempty" yaml:"secret_key,omitempty"` + QueueURL string `json:"queue_url" yaml:"queue_url"` + Region string `json:"region" yaml:"region"` + RoleArn string `json:"role_arn,omitempty" yaml:"role_arn,omitempty"` + ExternalId string `json:"external_id,omitempty" yaml:"external_id,omitempty"` // S3 specific - ParallelFetch int `json:"parallel_fetch" yaml:"parallel_fetch"` - BucketPath string `json:"bucket_path,omitempty" yaml:"bucket_path,omitempty"` - FilePath string `json:"file_path,omitempty" yaml:"file_path,omitempty"` - IsDecodeObjectKey bool `json:"is_decode_object_key,omitempty" yaml:"is_decode_object_key,omitempty"` + ParallelFetch int `json:"parallel_fetch" yaml:"parallel_fetch"` + BucketPath string `json:"bucket_path,omitempty" yaml:"bucket_path,omitempty"` + FilePath string `json:"file_path,omitempty" yaml:"file_path,omitempty"` + IsDecodeObjectKey bool `json:"is_decode_object_key,omitempty" yaml:"is_decode_object_key,omitempty"` + SplitCloudTrailRecords bool `json:"split_cloudtrail_records,omitempty" yaml:"split_cloudtrail_records,omitempty"` // Optional: alternative to BucketPath Bucket string `json:"bucket,omitempty" yaml:"bucket,omitempty"` } @@ -119,6 +127,18 @@ func NewSQSFilesAdapter(ctx context.Context, conf SQSFilesConfig) (*SQSFilesAdap return nil, nil, err } + // If RoleArn is provided, assume the role + if conf.RoleArn != "" { + creds, err := a.assumeRole(conf.RoleArn, conf.ExternalId) + if err != nil { + return nil, nil, fmt.Errorf("failed to assume role: %v", err) + } + a.awsConfig.Credentials = creds + if a.awsSession, err = session.NewSession(a.awsConfig); err != nil { + return nil, nil, err + } + } + a.sqsClient = sqs.New(a.awsSession) // The S3 SDK will be initialized at run-time @@ -132,6 +152,9 @@ func NewSQSFilesAdapter(ctx context.Context, conf SQSFilesConfig) (*SQSFilesAdap return nil, nil, err } + // Log the CloudTrail splitting configuration + a.conf.ClientOptions.DebugLog(fmt.Sprintf("CloudTrail splitting config: SplitCloudTrailRecords=%v", a.conf.SplitCloudTrailRecords)) + // Start the processors. for i := 0; i < a.conf.ParallelFetch; i++ { a.wg.Add(1) @@ -178,17 +201,20 @@ func (a *SQSFilesAdapter) initS3SDKs(bucket string) error { if err != nil { return fmt.Errorf("s3.Region: %v", err) } + + // Use the same credentials as SQS (which may already be assumed role credentials) a.awsS3Config = &aws.Config{ Region: aws.String(region), - Credentials: credentials.NewStaticCredentials(a.conf.AccessKey, a.conf.SecretKey, ""), + Credentials: a.awsConfig.Credentials, } if a.awsS3Session, err = session.NewSession(a.awsS3Config); err != nil { return fmt.Errorf("s3.NewSession(): %v", err) } - a.awsS3 = s3.New(a.awsSession) + a.awsS3 = s3.New(a.awsS3Session) a.awsDownloader = s3manager.NewDownloader(a.awsS3Session) + a.isS3Inited = true return nil } @@ -196,6 +222,28 @@ func (a *SQSFilesAdapter) getBucketRegion(bucket string) (string, error) { return s3manager.GetBucketRegion(a.ctx, session.Must(session.NewSession(&aws.Config{})), bucket, "us-east-1") } +func (a *SQSFilesAdapter) assumeRole(roleArn, externalId string) (*credentials.Credentials, error) { + // Create STS client with base credentials + stsClient := sts.New(a.awsSession) + + // Build assume role input + assumeRoleInput := &sts.AssumeRoleInput{ + RoleArn: aws.String(roleArn), + RoleSessionName: aws.String(fmt.Sprintf("sqs-files-adapter-%d", time.Now().Unix())), + } + + // Add external ID if provided (required for cross-account access) + if externalId != "" { + assumeRoleInput.ExternalId = aws.String(externalId) + } + + // Return credentials that will automatically refresh + return stscreds.NewCredentialsWithClient(stsClient, roleArn, func(p *stscreds.AssumeRoleProvider) { + p.ExternalID = assumeRoleInput.ExternalId + p.RoleSessionName = *assumeRoleInput.RoleSessionName + }), nil +} + func (a *SQSFilesAdapter) receiveEvents() error { defer close(a.chFiles) @@ -300,11 +348,112 @@ func (a *SQSFilesAdapter) processFiles() error { a.conf.ClientOptions.DebugLog(fmt.Sprintf("file %s downloaded in %v", path, time.Since(startTime))) + // If CloudTrail record splitting is enabled, attempt to try to split the records + if a.conf.SplitCloudTrailRecords { + a.conf.ClientOptions.DebugLog(fmt.Sprintf("CloudTrail splitting enabled for %s", path)) + records, err := a.splitCloudTrailRecords(writerAt.Bytes()) + if err != nil { + a.conf.ClientOptions.DebugLog(fmt.Sprintf("failed to split CloudTrail records from %s: %v", path, err)) + } else if len(records) > 0 { + a.conf.ClientOptions.DebugLog(fmt.Sprintf("split %d CloudTrail records from %s", len(records), path)) + for i, record := range records { + if !a.processEvent(record, false) { + a.conf.ClientOptions.OnError(fmt.Errorf("failed to process record %d from %s", i+1, path)) + } + } + continue + } else { + a.conf.ClientOptions.DebugLog(fmt.Sprintf("no CloudTrail records found in %s, processing as regular file", path)) + } + // If splitting failed or returned no records, fall through to process as normal file + } + a.processEvent(writerAt.Bytes(), isCompressed) } return nil } +// splitCloudTrailRecords attempts to parse the data as a CloudTrail event +// and split it into individual records. Returns nil error and empty slice if not a CloudTrail event. +// Handles both raw JSON and gzip-compressed JSON files. +func (a *SQSFilesAdapter) splitCloudTrailRecords(data []byte) ([][]byte, error) { + // Check if data is gzipped (starts with gzip magic number 0x1f 0x8b) + if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b { + // Decompress the gzipped data + gr, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, fmt.Errorf("failed to create gzip reader: %v", err) + } + defer gr.Close() + + decompressed, err := io.ReadAll(gr) + if err != nil { + return nil, fmt.Errorf("failed to decompress gzip data: %v", err) + } + data = decompressed + } + + var result [][]byte + + // Try to parse as CloudTrail event with wrapper + var wrapper map[string]interface{} + if err := json.Unmarshal(data, &wrapper); err != nil { + return nil, err + } + + // Check for event.Records structure (with routing wrapper) + var records []interface{} + if eventMap, ok := wrapper["event"].(map[string]interface{}); ok { + if recordsArray, ok := eventMap["Records"].([]interface{}); ok { + records = recordsArray + } + } else if recordsArray, ok := wrapper["Records"].([]interface{}); ok { + // Direct Records array (no wrapper) + records = recordsArray + } + + // If no records found, return empty (not an error, just not a CloudTrail event) + if len(records) == 0 { + return nil, nil + } + + // Extract routing and timestamp from wrapper if present + routing, _ := wrapper["routing"] + ts, _ := wrapper["ts"] + + // Split each record into individual messages + for _, record := range records { + var message map[string]interface{} + if routing != nil || ts != nil { + // Preserve routing and timestamp + message = map[string]interface{}{ + "event": record, + } + if routing != nil { + message["routing"] = routing + } + if ts != nil { + message["ts"] = ts + } + } else { + // Just the record itself + message = map[string]interface{}{ + "event": record, + } + } + + // Marshal back to JSON + recordData, err := json.Marshal(message) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("failed to marshal record: %v", err)) + continue + } + result = append(result, recordData) + } + + return result, nil +} + func (a *SQSFilesAdapter) processEvent(data []byte, isCompressed bool) bool { // Since we're dealing with files, we use the // bundle payloads to avoid having to go through