Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions sqs-files/CLOUDTRAIL_SPLITTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# CloudTrail Record Splitting in sqs-files Adapter

## Overview
The sqs-files adapter can automatically split a CloudTrail log file that contains multiple records into individual messages, one per record.

## Configuration

Add this option to your config:

```yaml
split_cloudtrail_records: true
```

## How It Works

1. **Downloads file from S3** - As before, the adapter downloads the file referenced in the SQS message

2. **Detects CloudTrail format** - Checks if the file contains:
- `event.Records[]` structure (with routing wrapper)
- `Records[]` structure (direct CloudTrail format)

3. **Splits records** - Each record in the array is extracted and sent as an individual message

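4. **Preserves metadata** - Each split record is sent as its own message containing:
- Original `routing` information (when present in the input)
- Original `ts` timestamp (when present in the input)
- Individual `event` data (the record itself)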

5. **Backward compatible** - If splitting is disabled, or the file cannot be parsed or contains no CloudTrail `Records` array, the adapter processes the entire file as before

## Example

### Input file (1 file with 6 records):
```json
{
"event": {
"Records": [
{ "eventName": "DescribeTargetHealth", ... },
{ "eventName": "DescribeTargetHealth", ... },
{ "eventName": "DescribeTargetHealth", ... },
{ "eventName": "AssumeRole", ... },
{ "eventName": "DescribeTargetHealth", ... },
{ "eventName": "DescribeTargetHealth", ... }
]
},
"routing": { ... },
"ts": "2026-01-14 17:30:10"
}
```

### Output (6 separate messages sent to USP):
```json
// Message 1
{
"event": { "eventName": "DescribeTargetHealth", ... },
"routing": { ... },
"ts": "2026-01-14 17:30:10"
}

// Message 2
{
"event": { "eventName": "DescribeTargetHealth", ... },
"routing": { ... },
"ts": "2026-01-14 17:30:10"
}

// ... and so on for all 6 records
```

## Benefits

- **Better granularity** - Each CloudTrail event is processed individually
- **Improved querying** - Easier to search and filter individual events
- **Session tracking** - Can track individual API calls per session
- **Flexible** - Works with both wrapped and unwrapped CloudTrail formats
- **Safe fallback** - Non-CloudTrail files are processed normally

## Config Example

```yaml
client_options:
hostname: "cloudtrail-adapter"
oid: "your-org-id"
installation_key: "your-key"

access_key: "AKIA..."
secret_key: "secret..."
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/cloudtrail-queue"
region: "us-east-1"

# Optional: for cross-account access
role_arn: "arn:aws:iam::999888777666:role/CloudTrailRole"
external_id: "secure-id"

# Enable CloudTrail splitting
split_cloudtrail_records: true

bucket_path: "bucket"
file_path: "s3/objectKey"
```
169 changes: 159 additions & 10 deletions sqs-files/client.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
package usp_sqs_files

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sts"

"github.com/refractionPOINT/go-uspclient"
"github.com/refractionPOINT/go-uspclient/protocol"
Expand Down Expand Up @@ -53,16 +58,19 @@ type SQSFilesConfig struct {
ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"`

// SQS specific
AccessKey string `json:"access_key" yaml:"access_key"`
SecretKey string `json:"secret_key,omitempty" yaml:"secret_key,omitempty"`
QueueURL string `json:"queue_url" yaml:"queue_url"`
Region string `json:"region" yaml:"region"`
AccessKey string `json:"access_key" yaml:"access_key"`
SecretKey string `json:"secret_key,omitempty" yaml:"secret_key,omitempty"`
QueueURL string `json:"queue_url" yaml:"queue_url"`
Region string `json:"region" yaml:"region"`
RoleArn string `json:"role_arn,omitempty" yaml:"role_arn,omitempty"`
ExternalId string `json:"external_id,omitempty" yaml:"external_id,omitempty"`

// S3 specific
ParallelFetch int `json:"parallel_fetch" yaml:"parallel_fetch"`
BucketPath string `json:"bucket_path,omitempty" yaml:"bucket_path,omitempty"`
FilePath string `json:"file_path,omitempty" yaml:"file_path,omitempty"`
IsDecodeObjectKey bool `json:"is_decode_object_key,omitempty" yaml:"is_decode_object_key,omitempty"`
ParallelFetch int `json:"parallel_fetch" yaml:"parallel_fetch"`
BucketPath string `json:"bucket_path,omitempty" yaml:"bucket_path,omitempty"`
FilePath string `json:"file_path,omitempty" yaml:"file_path,omitempty"`
IsDecodeObjectKey bool `json:"is_decode_object_key,omitempty" yaml:"is_decode_object_key,omitempty"`
SplitCloudTrailRecords bool `json:"split_cloudtrail_records,omitempty" yaml:"split_cloudtrail_records,omitempty"`
// Optional: alternative to BucketPath
Bucket string `json:"bucket,omitempty" yaml:"bucket,omitempty"`
}
Expand Down Expand Up @@ -119,6 +127,18 @@ func NewSQSFilesAdapter(ctx context.Context, conf SQSFilesConfig) (*SQSFilesAdap
return nil, nil, err
}

// If RoleArn is provided, assume the role
if conf.RoleArn != "" {
creds, err := a.assumeRole(conf.RoleArn, conf.ExternalId)
if err != nil {
return nil, nil, fmt.Errorf("failed to assume role: %v", err)
}
a.awsConfig.Credentials = creds
if a.awsSession, err = session.NewSession(a.awsConfig); err != nil {
return nil, nil, err
}
}

a.sqsClient = sqs.New(a.awsSession)

// The S3 SDK will be initialized at run-time
Expand All @@ -132,6 +152,9 @@ func NewSQSFilesAdapter(ctx context.Context, conf SQSFilesConfig) (*SQSFilesAdap
return nil, nil, err
}

// Log the CloudTrail splitting configuration
a.conf.ClientOptions.DebugLog(fmt.Sprintf("CloudTrail splitting config: SplitCloudTrailRecords=%v", a.conf.SplitCloudTrailRecords))

// Start the processors.
for i := 0; i < a.conf.ParallelFetch; i++ {
a.wg.Add(1)
Expand Down Expand Up @@ -178,24 +201,49 @@ func (a *SQSFilesAdapter) initS3SDKs(bucket string) error {
if err != nil {
return fmt.Errorf("s3.Region: %v", err)
}

// Use the same credentials as SQS (which may already be assumed role credentials)
a.awsS3Config = &aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(a.conf.AccessKey, a.conf.SecretKey, ""),
Credentials: a.awsConfig.Credentials,
}

if a.awsS3Session, err = session.NewSession(a.awsS3Config); err != nil {
return fmt.Errorf("s3.NewSession(): %v", err)
}

a.awsS3 = s3.New(a.awsSession)
a.awsS3 = s3.New(a.awsS3Session)
a.awsDownloader = s3manager.NewDownloader(a.awsS3Session)
a.isS3Inited = true
return nil
}

// getBucketRegion looks up the AWS region of the given bucket, passing
// "us-east-1" as the region hint for the lookup.
//
// The query uses a fresh session built from an empty aws.Config rather than
// the adapter's own session. It returns the region name, or an error if the
// session cannot be created or the lookup fails.
func (a *SQSFilesAdapter) getBucketRegion(bucket string) (string, error) {
	// Report session creation failures through the error return instead of
	// panicking via session.Must; callers already handle a lookup error.
	sess, err := session.NewSession(&aws.Config{})
	if err != nil {
		return "", fmt.Errorf("session.NewSession(): %w", err)
	}
	return s3manager.GetBucketRegion(a.ctx, sess, bucket, "us-east-1")
}

// assumeRole builds credentials for the given IAM role, backed by an STS
// client created from the adapter's current AWS session.
//
// The role session name is "sqs-files-adapter-<unix seconds>", stamped when
// this method is called. externalId is handed to the provider only when it
// is non-empty (it is needed for cross-account access). The returned
// credentials are intended to refresh automatically. In the current
// implementation the returned error is always nil.
func (a *SQSFilesAdapter) assumeRole(roleArn, externalId string) (*credentials.Credentials, error) {
	// STS client built on the base (pre-assume-role) session.
	client := sts.New(a.awsSession)

	sessionName := fmt.Sprintf("sqs-files-adapter-%d", time.Now().Unix())

	// Leave the external ID nil unless one was configured.
	var extID *string
	if externalId != "" {
		extID = aws.String(externalId)
	}

	configure := func(provider *stscreds.AssumeRoleProvider) {
		provider.ExternalID = extID
		provider.RoleSessionName = sessionName
	}

	creds := stscreds.NewCredentialsWithClient(client, roleArn, configure)
	return creds, nil
}

func (a *SQSFilesAdapter) receiveEvents() error {
defer close(a.chFiles)

Expand Down Expand Up @@ -300,11 +348,112 @@ func (a *SQSFilesAdapter) processFiles() error {

a.conf.ClientOptions.DebugLog(fmt.Sprintf("file %s downloaded in %v", path, time.Since(startTime)))

// If CloudTrail record splitting is enabled, attempt to try to split the records
if a.conf.SplitCloudTrailRecords {
a.conf.ClientOptions.DebugLog(fmt.Sprintf("CloudTrail splitting enabled for %s", path))
records, err := a.splitCloudTrailRecords(writerAt.Bytes())
if err != nil {
a.conf.ClientOptions.DebugLog(fmt.Sprintf("failed to split CloudTrail records from %s: %v", path, err))
} else if len(records) > 0 {
a.conf.ClientOptions.DebugLog(fmt.Sprintf("split %d CloudTrail records from %s", len(records), path))
for i, record := range records {
if !a.processEvent(record, false) {
a.conf.ClientOptions.OnError(fmt.Errorf("failed to process record %d from %s", i+1, path))
}
}
continue
} else {
a.conf.ClientOptions.DebugLog(fmt.Sprintf("no CloudTrail records found in %s, processing as regular file", path))
}
// If splitting failed or returned no records, fall through to process as normal file
}

a.processEvent(writerAt.Bytes(), isCompressed)
}
return nil
}

// splitCloudTrailRecords attempts to interpret data as a CloudTrail log file
// and split it into one JSON message per record.
//
// data may be raw JSON or gzip-compressed JSON (detected via the gzip magic
// bytes). Two layouts are recognized:
//   - {"event": {"Records": [...]}, "routing": ..., "ts": ...}
//   - {"Records": [...]}
//
// Every returned message has the form {"event": <record>}, plus the
// top-level "routing" and "ts" values of the input when they are present and
// not null. Records are passed through as raw JSON instead of being decoded
// into generic values and re-encoded, so large integers keep their exact
// digits rather than being rounded through float64.
//
// It returns (nil, nil) when data is valid JSON but holds no records (i.e. it
// is not a CloudTrail file), and a non-nil error when decompression or JSON
// parsing fails.
func (a *SQSFilesAdapter) splitCloudTrailRecords(data []byte) ([][]byte, error) {
	// Gzip streams start with the magic bytes 0x1f 0x8b.
	if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
		gr, err := gzip.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, fmt.Errorf("failed to create gzip reader: %w", err)
		}
		defer gr.Close()

		decompressed, err := io.ReadAll(gr)
		if err != nil {
			return nil, fmt.Errorf("failed to decompress gzip data: %w", err)
		}
		data = decompressed
	}

	// Only the top level is decoded; nested values stay as raw bytes.
	var wrapper map[string]json.RawMessage
	if err := json.Unmarshal(data, &wrapper); err != nil {
		return nil, err
	}

	// A JSON object under "event" takes precedence: the top-level "Records"
	// is only consulted when "event" is missing, null, or not an object.
	recordsRaw := wrapper["Records"]
	var eventObj map[string]json.RawMessage
	if err := json.Unmarshal(wrapper["event"], &eventObj); err == nil && eventObj != nil {
		recordsRaw = eventObj["Records"]
	}

	var records []json.RawMessage
	if len(recordsRaw) > 0 {
		if err := json.Unmarshal(recordsRaw, &records); err != nil {
			// "Records" exists but is not an array: not a CloudTrail file.
			records = nil
		}
	}

	// If no records found, return empty (not an error, just not a CloudTrail event).
	if len(records) == 0 {
		return nil, nil
	}

	// A missing key and an explicit JSON null are both treated as absent.
	isAbsent := func(raw json.RawMessage) bool {
		return len(raw) == 0 || bytes.Equal(bytes.TrimSpace(raw), []byte("null"))
	}
	routing, ts := wrapper["routing"], wrapper["ts"]
	hasRouting, hasTS := !isAbsent(routing), !isAbsent(ts)

	result := make([][]byte, 0, len(records))
	for _, record := range records {
		message := map[string]json.RawMessage{"event": record}
		if hasRouting {
			message["routing"] = routing
		}
		if hasTS {
			message["ts"] = ts
		}

		recordData, err := json.Marshal(message)
		if err != nil {
			// Report and skip this record; the remaining records are still emitted.
			a.conf.ClientOptions.OnError(fmt.Errorf("failed to marshal record: %v", err))
			continue
		}
		result = append(result, recordData)
	}

	return result, nil
}

func (a *SQSFilesAdapter) processEvent(data []byte, isCompressed bool) bool {
// Since we're dealing with files, we use the
// bundle payloads to avoid having to go through
Expand Down