Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions pkg/serverless/invocationlifecycle/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func (lp *LifecycleProcessor) initFromAPIGatewayEvent(event events.APIGatewayPro
lp.GetInferredSpan().EnrichInferredSpanWithAPIGatewayRESTEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, apiGateway)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractAPIGatewayEventARN(event, region))
lp.addTags(trigger.GetTagsFromAPIGatewayEvent(event))
Expand All @@ -40,7 +39,6 @@ func (lp *LifecycleProcessor) initFromAPIGatewayV2Event(event events.APIGatewayV
lp.GetInferredSpan().EnrichInferredSpanWithAPIGatewayHTTPEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, apiGateway)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractAPIGatewayV2EventARN(event, region))
lp.addTags(trigger.GetTagsFromAPIGatewayV2HTTPRequest(event))
Expand All @@ -51,34 +49,29 @@ func (lp *LifecycleProcessor) initFromAPIGatewayWebsocketEvent(event events.APIG
lp.GetInferredSpan().EnrichInferredSpanWithAPIGatewayWebsocketEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, apiGateway)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractAPIGatewayWebSocketEventARN(event, region))
}

func (lp *LifecycleProcessor) initFromAPIGatewayLambdaAuthorizerTokenEvent(event events.APIGatewayCustomAuthorizerRequest) {
lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, apiGateway)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractAPIGatewayCustomAuthorizerEventARN(event))
lp.addTags(trigger.GetTagsFromAPIGatewayCustomAuthorizerEvent(event))
}

func (lp *LifecycleProcessor) initFromAPIGatewayLambdaAuthorizerRequestParametersEvent(event events.APIGatewayCustomAuthorizerRequestTypeRequest) {
lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, apiGateway)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractAPIGatewayCustomAuthorizerRequestTypeEventARN(event))
lp.addTags(trigger.GetTagsFromAPIGatewayCustomAuthorizerRequestTypeEvent(event))
}

func (lp *LifecycleProcessor) initFromALBEvent(event events.ALBTargetGroupRequest) {
lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, applicationLoadBalancer)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractAlbEventARN(event))
lp.addTags(trigger.GetTagsFromALBTargetGroupRequest(event))
}

func (lp *LifecycleProcessor) initFromCloudWatchEvent(event events.CloudWatchEvent) {
lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, cloudwatchEvents)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractCloudwatchEventARN(event))
}
Expand All @@ -90,7 +83,6 @@ func (lp *LifecycleProcessor) initFromCloudWatchLogsEvent(event events.Cloudwatc
return
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, cloudwatchLogs)
lp.addTag(tagFunctionTriggerEventSourceArn, arn)
}
Expand All @@ -100,7 +92,6 @@ func (lp *LifecycleProcessor) initFromDynamoDBStreamEvent(event events.DynamoDBE
lp.GetInferredSpan().EnrichInferredSpanWithDynamoDBEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, dynamoDB)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractDynamoDBStreamEventARN(event))
}
Expand All @@ -110,7 +101,6 @@ func (lp *LifecycleProcessor) initFromEventBridgeEvent(event events.EventBridgeE
lp.GetInferredSpan().EnrichInferredSpanWithEventBridgeEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, eventBridge)
lp.addTag(tagFunctionTriggerEventSourceArn, event.Source)
}
Expand All @@ -120,7 +110,6 @@ func (lp *LifecycleProcessor) initFromKinesisStreamEvent(event events.KinesisEve
lp.GetInferredSpan().EnrichInferredSpanWithKinesisEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, kinesis)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractKinesisStreamEventARN(event))
}
Expand All @@ -130,7 +119,6 @@ func (lp *LifecycleProcessor) initFromS3Event(event events.S3Event) {
lp.GetInferredSpan().EnrichInferredSpanWithS3Event(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, s3)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractS3EventArn(event))
}
Expand All @@ -140,7 +128,6 @@ func (lp *LifecycleProcessor) initFromSNSEvent(event events.SNSEvent) {
lp.GetInferredSpan().EnrichInferredSpanWithSNSEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, sns)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractSNSEventArn(event))

Expand All @@ -158,7 +145,6 @@ func (lp *LifecycleProcessor) initFromSQSEvent(event events.SQSEvent) {
lp.GetInferredSpan().EnrichInferredSpanWithSQSEvent(event)
}

lp.requestHandler.event = event
lp.addTag(tagFunctionTriggerEventSource, sqs)
lp.addTag(tagFunctionTriggerEventSourceArn, trigger.ExtractSQSEventARN(event))

Expand Down Expand Up @@ -212,15 +198,10 @@ func (lp *LifecycleProcessor) createWrappedEventBridgeSpan(eventBridgeEvent even
}

func (lp *LifecycleProcessor) initFromLambdaFunctionURLEvent(event events.LambdaFunctionURLRequest, region string, accountID string, functionName string) {
lp.requestHandler.event = event
if !lp.DetectLambdaLibrary() && lp.InferredSpansEnabled {
lp.GetInferredSpan().EnrichInferredSpanWithLambdaFunctionURLEvent(event)
}
lp.addTag(tagFunctionTriggerEventSource, functionURL)
lp.addTag(tagFunctionTriggerEventSourceArn, fmt.Sprintf("arn:aws:lambda:%v:%v:url:%v", region, accountID, functionName))
lp.addTags(trigger.GetTagsFromLambdaFunctionURLRequest(event))
}

func (lp *LifecycleProcessor) initFromStepFunctionPayload(event events.StepFunctionPayload) {
lp.requestHandler.event = event
Copy link
Copy Markdown
Contributor Author

@avedmala avedmala Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value is never being used, the Event() interface{} in lifecycle.go was probably used by something before but not anymore so I removed both

}
41 changes: 29 additions & 12 deletions pkg/serverless/invocationlifecycle/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type LifecycleProcessor struct {
// inferred spans may contain a secondary inferred span in certain cases like SNS from SQS
type RequestHandler struct {
executionInfo *ExecutionStartInfo
event interface{}
inferredSpans [2]*inferredspan.InferredSpan
triggerTags map[string]string
triggerMetrics map[string]float64
Expand All @@ -68,13 +67,6 @@ func (r *RequestHandler) SetMetricsTag(tag string, value float64) {
r.triggerMetrics[tag] = value
}

// Event returns the invocation event parsed by the LifecycleProcessor. It is nil if the event type is not supported
// yet. The actual event type can be figured out thanks to a Go type switch on the event types of the package
// github.com/aws/aws-lambda-go/events
func (r *RequestHandler) Event() interface{} {
return r.event
}

// SetSamplingPriority sets the trace priority
func (r *RequestHandler) SetSamplingPriority(priority sampler.SamplingPriority) {
r.executionInfo.SamplingPriority = priority
Expand Down Expand Up @@ -230,21 +222,47 @@ func (lp *LifecycleProcessor) OnInvokeStart(startDetails *InvocationStartDetails
ev = event
lp.initFromLambdaFunctionURLEvent(event, region, account, resource)
case trigger.LegacyStepFunctionEvent:
var event events.StepFunctionEvent
var event events.StepFunctionEvent[events.StepFunctionPayload]
if err := json.Unmarshal(payloadBytes, &event); err != nil {
log.Debugf("Failed to unmarshal %s event: %s", stepFunction, err)
break
}
ev = event.Payload
lp.initFromStepFunctionPayload(event.Payload)
case trigger.StepFunctionEvent:
var eventPayload events.StepFunctionPayload
if err := json.Unmarshal(payloadBytes, &eventPayload); err != nil {
log.Debugf("Failed to unmarshal %s event: %s", stepFunction, err)
break
}
ev = eventPayload
lp.initFromStepFunctionPayload(eventPayload)
case trigger.LegacyNestedStepFunctionEvent:
var event events.StepFunctionEvent[events.NestedStepFunctionPayload]
if err := json.Unmarshal(payloadBytes, &event); err != nil {
log.Debugf("Failed to unmarshal %s event: %s", stepFunction, err)
break
}
ev = event.Payload
case trigger.NestedStepFunctionEvent:
var eventPayload events.NestedStepFunctionPayload
if err := json.Unmarshal(payloadBytes, &eventPayload); err != nil {
log.Debugf("Failed to unmarshal %s event: %s", stepFunction, err)
break
}
ev = eventPayload
case trigger.LegacyLambdaRootStepFunctionEvent:
var event events.StepFunctionEvent[events.LambdaRootStepFunctionPayload]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It felt a little repetitive to repeat the whole case for every "legacy lambda" case when in reality the only difference is the whole payload is wrapped in a "Payload": {...}

But the alternative was to parse out this Payload somewhere upstream so we treat legacy vs non-legacy the same here. We're basically doing the same thing because the extractors won't know the difference. I also didn't want to modify the value as it's being passed down

if err := json.Unmarshal(payloadBytes, &event); err != nil {
log.Debugf("Failed to unmarshal %s event: %s", stepFunction, err)
break
}
ev = event.Payload
case trigger.LambdaRootStepFunctionEvent:
var eventPayload events.LambdaRootStepFunctionPayload
if err := json.Unmarshal(payloadBytes, &eventPayload); err != nil {
log.Debugf("Failed to unmarshal %s event: %s", stepFunction, err)
break
}
ev = eventPayload
default:
log.Debug("Skipping adding trigger types and inferred spans as a non-supported payload was received.")
}
Expand Down Expand Up @@ -347,7 +365,6 @@ func (lp *LifecycleProcessor) newRequest(lambdaPayloadString []byte, startTime t
if lp.requestHandler == nil {
lp.requestHandler = &RequestHandler{}
}
lp.requestHandler.event = nil
lp.requestHandler.executionInfo = &ExecutionStartInfo{
requestPayload: lambdaPayloadString,
startTime: startTime,
Expand Down
140 changes: 140 additions & 0 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,146 @@ func TestLegacyLambdaStartExecutionSpanStepFunctionEvent(t *testing.T) {
assert.Equal(t, startInvocationTime, testProcessor.GetExecutionInfo().startTime)
}

func TestStartExecutionSpanNestedStepFunctionEvent(t *testing.T) {
extraTags := &logs.Tags{
Tags: []string{"functionname:test-function"},
}
demux := createDemultiplexer(t)
mockProcessTrace := func(*api.Payload) {}
mockDetectLambdaLibrary := func() bool { return false }

eventPayload := `{"_datadog":{"Execution":{"Id":"arn:aws:states:sa-east-1:425362996713:execution:invokeJavaLambda:c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","Input":{},"StartTime":"2024-08-29T21:48:55.187Z","Name":"c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","RoleArn":"arn:aws:iam::425362996713:role/new-extension-test-java-dev-InvokeJavaLambdaRole-LtJmnJReIOTS","RedriveCount":0},"StateMachine":{"Id":"arn:aws:states:sa-east-1:425362996713:stateMachine:invokeJavaLambda","Name":"invokeJavaLambda"},"State":{"Name":"invoker","EnteredTime":"2024-08-29T21:48:55.275Z","RetryCount":0}, "RootExecutionId":"arn:aws:states:sa-east-1:425362996713:execution:invokeJavaLambda:4875aba4-ae31-4a4c-bf8a-63e9eee31dad","serverless-version":"v1"}}`
startInvocationTime := time.Now()
startDetails := InvocationStartDetails{
StartTime: startInvocationTime,
InvokeEventRawPayload: []byte(eventPayload),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
}

testProcessor := LifecycleProcessor{
ExtraTags: extraTags,
ProcessTrace: mockProcessTrace,
DetectLambdaLibrary: mockDetectLambdaLibrary,
Demux: demux,
}

testProcessor.OnInvokeStart(&startDetails)

assert.NotNil(t, testProcessor.GetExecutionInfo())

assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID)
assert.Equal(t, uint64(1322229001489018110), testProcessor.GetExecutionInfo().TraceID)
assert.Equal(t, uint64(5727675921946924302), testProcessor.GetExecutionInfo().parentID)
assert.Equal(t, sampler.SamplingPriority(1), testProcessor.GetExecutionInfo().SamplingPriority)
assert.Equal(t, "579d19b3ee216ee9", testProcessor.GetExecutionInfo().TraceIDUpper64Hex)
assert.Equal(t, startInvocationTime, testProcessor.GetExecutionInfo().startTime)
}

func TestLegacyLambdaStartExecutionSpanNestedStepFunctionEvent(t *testing.T) {
extraTags := &logs.Tags{
Tags: []string{"functionname:test-function"},
}
demux := createDemultiplexer(t)
mockProcessTrace := func(*api.Payload) {}
mockDetectLambdaLibrary := func() bool { return false }

eventPayload := `{"Payload":{"_datadog":{"Execution":{"Id":"arn:aws:states:sa-east-1:425362996713:execution:invokeJavaLambda:c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","Input":{},"StartTime":"2024-08-29T21:48:55.187Z","Name":"c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","RoleArn":"arn:aws:iam::425362996713:role/new-extension-test-java-dev-InvokeJavaLambdaRole-LtJmnJReIOTS","RedriveCount":0},"StateMachine":{"Id":"arn:aws:states:sa-east-1:425362996713:stateMachine:invokeJavaLambda","Name":"invokeJavaLambda"},"State":{"Name":"invoker","EnteredTime":"2024-08-29T21:48:55.275Z","RetryCount":0}, "RootExecutionId":"arn:aws:states:sa-east-1:425362996713:execution:invokeJavaLambda:4875aba4-ae31-4a4c-bf8a-63e9eee31dad","serverless-version":"v1"}}}`
startInvocationTime := time.Now()
startDetails := InvocationStartDetails{
StartTime: startInvocationTime,
InvokeEventRawPayload: []byte(eventPayload),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
}

testProcessor := LifecycleProcessor{
ExtraTags: extraTags,
ProcessTrace: mockProcessTrace,
DetectLambdaLibrary: mockDetectLambdaLibrary,
Demux: demux,
}

testProcessor.OnInvokeStart(&startDetails)

assert.NotNil(t, testProcessor.GetExecutionInfo())

assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID)
assert.Equal(t, uint64(1322229001489018110), testProcessor.GetExecutionInfo().TraceID)
assert.Equal(t, uint64(5727675921946924302), testProcessor.GetExecutionInfo().parentID)
assert.Equal(t, sampler.SamplingPriority(1), testProcessor.GetExecutionInfo().SamplingPriority)
assert.Equal(t, "579d19b3ee216ee9", testProcessor.GetExecutionInfo().TraceIDUpper64Hex)
assert.Equal(t, startInvocationTime, testProcessor.GetExecutionInfo().startTime)
}

func TestStartExecutionSpanLambdaRootStepFunctionEvent(t *testing.T) {
extraTags := &logs.Tags{
Tags: []string{"functionname:test-function"},
}
demux := createDemultiplexer(t)
mockProcessTrace := func(*api.Payload) {}
mockDetectLambdaLibrary := func() bool { return false }

eventPayload := `{"_datadog":{"Execution":{"Id":"arn:aws:states:sa-east-1:425362996713:execution:invokeJavaLambda:c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","Input":{},"StartTime":"2024-08-29T21:48:55.187Z","Name":"c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","RoleArn":"arn:aws:iam::425362996713:role/new-extension-test-java-dev-InvokeJavaLambdaRole-LtJmnJReIOTS","RedriveCount":0},"StateMachine":{"Id":"arn:aws:states:sa-east-1:425362996713:stateMachine:invokeJavaLambda","Name":"invokeJavaLambda"},"State":{"Name":"invoker","EnteredTime":"2024-08-29T21:48:55.275Z","RetryCount":0},"x-datadog-trace-id":"5821803790426892636","x-datadog-tags":"_dd.p.dm=-0,_dd.p.tid=672a7cb100000000","serverless-version":"v1"}}`
startInvocationTime := time.Now()
startDetails := InvocationStartDetails{
StartTime: startInvocationTime,
InvokeEventRawPayload: []byte(eventPayload),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
}

testProcessor := LifecycleProcessor{
ExtraTags: extraTags,
ProcessTrace: mockProcessTrace,
DetectLambdaLibrary: mockDetectLambdaLibrary,
Demux: demux,
}

testProcessor.OnInvokeStart(&startDetails)

assert.NotNil(t, testProcessor.GetExecutionInfo())

assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID)
assert.Equal(t, uint64(5821803790426892636), testProcessor.GetExecutionInfo().TraceID)
assert.Equal(t, uint64(5727675921946924302), testProcessor.GetExecutionInfo().parentID)
assert.Equal(t, sampler.SamplingPriority(1), testProcessor.GetExecutionInfo().SamplingPriority)
assert.Equal(t, "672a7cb100000000", testProcessor.GetExecutionInfo().TraceIDUpper64Hex)
assert.Equal(t, startInvocationTime, testProcessor.GetExecutionInfo().startTime)
}

func TestLegacyLambdaStartExecutionSpanLambdaRootStepFunctionEvent(t *testing.T) {
extraTags := &logs.Tags{
Tags: []string{"functionname:test-function"},
}
demux := createDemultiplexer(t)
mockProcessTrace := func(*api.Payload) {}
mockDetectLambdaLibrary := func() bool { return false }

eventPayload := `{"Payload":{"_datadog":{"Execution":{"Id":"arn:aws:states:sa-east-1:425362996713:execution:invokeJavaLambda:c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","Input":{},"StartTime":"2024-08-29T21:48:55.187Z","Name":"c0ca8d0f-a3af-4c42-bfd4-b3b100e77f01","RoleArn":"arn:aws:iam::425362996713:role/new-extension-test-java-dev-InvokeJavaLambdaRole-LtJmnJReIOTS","RedriveCount":0},"StateMachine":{"Id":"arn:aws:states:sa-east-1:425362996713:stateMachine:invokeJavaLambda","Name":"invokeJavaLambda"},"State":{"Name":"invoker","EnteredTime":"2024-08-29T21:48:55.275Z","RetryCount":0},"x-datadog-trace-id":"5821803790426892636","x-datadog-tags":"_dd.p.dm=-0,_dd.p.tid=672a7cb100000000","serverless-version":"v1"}}}`
startInvocationTime := time.Now()
startDetails := InvocationStartDetails{
StartTime: startInvocationTime,
InvokeEventRawPayload: []byte(eventPayload),
InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function",
}

testProcessor := LifecycleProcessor{
ExtraTags: extraTags,
ProcessTrace: mockProcessTrace,
DetectLambdaLibrary: mockDetectLambdaLibrary,
Demux: demux,
}

testProcessor.OnInvokeStart(&startDetails)

assert.NotNil(t, testProcessor.GetExecutionInfo())

assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID)
assert.Equal(t, uint64(5821803790426892636), testProcessor.GetExecutionInfo().TraceID)
assert.Equal(t, uint64(5727675921946924302), testProcessor.GetExecutionInfo().parentID)
assert.Equal(t, sampler.SamplingPriority(1), testProcessor.GetExecutionInfo().SamplingPriority)
assert.Equal(t, "672a7cb100000000", testProcessor.GetExecutionInfo().TraceIDUpper64Hex)
assert.Equal(t, startInvocationTime, testProcessor.GetExecutionInfo().startTime)
}

func TestEndExecutionSpanNoLambdaLibrary(t *testing.T) {
t.Setenv(functionNameEnvVar, "TestFunction")

Expand Down
Loading