diff --git a/sqs/client.go b/sqs/client.go index 77532c6..f338104 100644 --- a/sqs/client.go +++ b/sqs/client.go @@ -8,8 +8,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sts" "github.com/refractionPOINT/go-uspclient" "github.com/refractionPOINT/go-uspclient/protocol" @@ -37,6 +39,8 @@ type SQSConfig struct { SecretKey string `json:"secret_key,omitempty" yaml:"secret_key,omitempty"` QueueURL string `json:"queue_url" yaml:"queue_url"` Region string `json:"region" yaml:"region"` + RoleArn string `json:"role_arn,omitempty" yaml:"role_arn,omitempty"` + ExternalId string `json:"external_id,omitempty" yaml:"external_id,omitempty"` } func (c *SQSConfig) Validate() error { @@ -74,6 +78,18 @@ func NewSQSAdapter(ctx context.Context, conf SQSConfig) (*SQSAdapter, chan struc return nil, nil, err } + // If RoleArn is provided, assume the role + if conf.RoleArn != "" { + creds, err := a.assumeRole(conf.RoleArn, conf.ExternalId) + if err != nil { + return nil, nil, fmt.Errorf("failed to assume role: %v", err) + } + a.awsConfig.Credentials = creds + if a.awsSession, err = session.NewSession(a.awsConfig); err != nil { + return nil, nil, err + } + } + a.sqsClient = sqs.New(a.awsSession) a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) @@ -107,6 +123,28 @@ func (a *SQSAdapter) Close() error { return nil } +func (a *SQSAdapter) assumeRole(roleArn, externalId string) (*credentials.Credentials, error) { + // Create STS client with base credentials + stsClient := sts.New(a.awsSession) + + // Build assume role input + assumeRoleInput := &sts.AssumeRoleInput{ + RoleArn: aws.String(roleArn), + RoleSessionName: aws.String(fmt.Sprintf("sqs-adapter-%d", time.Now().Unix())), + } + + // Add external ID if provided (required for cross-account access) + if externalId != "" { + assumeRoleInput.ExternalId = aws.String(externalId) + } + + // Return credentials that will automatically refresh + return stscreds.NewCredentialsWithClient(stsClient, roleArn, func(p *stscreds.AssumeRoleProvider) { + p.ExternalID = assumeRoleInput.ExternalId + p.RoleSessionName = *assumeRoleInput.RoleSessionName + }), nil +} + func (a *SQSAdapter) receiveEvents() error { for !a.isStop { result, err := a.sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{