Skip to content

Commit d334174

Browse files
intel352claude
andcommitted
feat: add distributed tracing propagation across async boundaries
Implements trace context propagation for Kafka, EventBridge, webhook, and HTTP boundaries with five new pipeline steps and the tracing.propagation module type. New files: - module/tracing_propagation.go: PipelineTracePropagator interface, HTTPTracePropagator, KafkaTracePropagator, EventBridgeTracePropagator, WebhookTracePropagator, MapCarrier, EventBridgeCarrier, PipelineTracingMiddleware, and TracePropagationModule - module/pipeline_step_tracing.go: step.trace_start, step.trace_inject, step.trace_extract, step.trace_annotate, step.trace_link factories - module/tracing_propagation_test.go: 26 tests covering all propagators and pipeline steps (mock OTEL provider, no real collector required) Updated: - plugins/observability/plugin.go: add tracing.propagation module type, five step types, and StepFactories() method - plugins/observability/modules.go: tracePropagationFactory - plugins/observability/schemas.go: schema for tracing.propagation - plugins/observability/plugin_test.go: updated counts and step factory tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent e2cf2e0 commit d334174

7 files changed

Lines changed: 1241 additions & 16 deletions

File tree

module/pipeline_step_tracing.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
package module
2+
3+
import (
4+
"context"
5+
6+
"github.com/CrisisTextLine/modular"
7+
"go.opentelemetry.io/otel"
8+
"go.opentelemetry.io/otel/attribute"
9+
"go.opentelemetry.io/otel/trace"
10+
)
11+
12+
// ─── trace_start ──────────────────────────────────────────────────────────────
13+
14+
// TraceStartStep starts a new trace span and records its IDs in the pipeline context.
15+
type TraceStartStep struct {
16+
name string
17+
spanName string
18+
attributes map[string]string
19+
}
20+
21+
// NewTraceStartStepFactory returns a StepFactory for step.trace_start.
22+
func NewTraceStartStepFactory() StepFactory {
23+
return func(name string, cfg map[string]any, _ modular.Application) (PipelineStep, error) {
24+
spanName, _ := cfg["span_name"].(string)
25+
if spanName == "" {
26+
spanName = name
27+
}
28+
attrs := make(map[string]string)
29+
if raw, ok := cfg["attributes"].(map[string]any); ok {
30+
for k, v := range raw {
31+
if s, ok := v.(string); ok {
32+
attrs[k] = s
33+
}
34+
}
35+
}
36+
return &TraceStartStep{name: name, spanName: spanName, attributes: attrs}, nil
37+
}
38+
}
39+
40+
func (s *TraceStartStep) Name() string { return s.name }
41+
42+
func (s *TraceStartStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) {
43+
tracer := otel.GetTracerProvider().Tracer("workflow.pipeline")
44+
45+
otelAttrs := make([]attribute.KeyValue, 0, len(s.attributes))
46+
for k, v := range s.attributes {
47+
otelAttrs = append(otelAttrs, attribute.String(k, v))
48+
}
49+
50+
_, span := tracer.Start(ctx, s.spanName,
51+
trace.WithSpanKind(trace.SpanKindInternal),
52+
trace.WithAttributes(otelAttrs...),
53+
)
54+
defer span.End()
55+
56+
sc := span.SpanContext()
57+
return &StepResult{Output: map[string]any{
58+
"trace_id": sc.TraceID().String(),
59+
"span_id": sc.SpanID().String(),
60+
}}, nil
61+
}
62+
63+
// ─── trace_inject ─────────────────────────────────────────────────────────────
64+
65+
// TraceInjectStep injects the current trace context into an outbound carrier stored
66+
// in the pipeline context under carrier_field.
67+
type TraceInjectStep struct {
68+
name string
69+
carrierField string
70+
carrierType string // "http", "kafka", "eventbridge"
71+
}
72+
73+
// NewTraceInjectStepFactory returns a StepFactory for step.trace_inject.
74+
func NewTraceInjectStepFactory() StepFactory {
75+
return func(name string, cfg map[string]any, _ modular.Application) (PipelineStep, error) {
76+
carrierField, _ := cfg["carrier_field"].(string)
77+
if carrierField == "" {
78+
carrierField = "trace_headers"
79+
}
80+
carrierType, _ := cfg["carrier_type"].(string)
81+
if carrierType == "" {
82+
carrierType = "http"
83+
}
84+
return &TraceInjectStep{
85+
name: name,
86+
carrierField: carrierField,
87+
carrierType: carrierType,
88+
}, nil
89+
}
90+
}
91+
92+
func (s *TraceInjectStep) Name() string { return s.name }
93+
94+
func (s *TraceInjectStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) {
95+
headers := make(map[string]string)
96+
carrier := NewMapCarrier(headers)
97+
otel.GetTextMapPropagator().Inject(ctx, carrier)
98+
return &StepResult{Output: map[string]any{s.carrierField: carrier.GetMap()}}, nil
99+
}
100+
101+
// ─── trace_extract ────────────────────────────────────────────────────────────
102+
103+
// TraceExtractStep extracts trace context from an inbound carrier stored
104+
// in the pipeline context under carrier_field, and records the extracted IDs.
105+
type TraceExtractStep struct {
106+
name string
107+
carrierField string
108+
carrierType string
109+
}
110+
111+
// NewTraceExtractStepFactory returns a StepFactory for step.trace_extract.
112+
func NewTraceExtractStepFactory() StepFactory {
113+
return func(name string, cfg map[string]any, _ modular.Application) (PipelineStep, error) {
114+
carrierField, _ := cfg["carrier_field"].(string)
115+
if carrierField == "" {
116+
carrierField = "trace_headers"
117+
}
118+
carrierType, _ := cfg["carrier_type"].(string)
119+
if carrierType == "" {
120+
carrierType = "http"
121+
}
122+
return &TraceExtractStep{
123+
name: name,
124+
carrierField: carrierField,
125+
carrierType: carrierType,
126+
}, nil
127+
}
128+
}
129+
130+
func (s *TraceExtractStep) Name() string { return s.name }
131+
132+
func (s *TraceExtractStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) {
133+
headers := make(map[string]string)
134+
if raw, ok := pc.Current[s.carrierField]; ok {
135+
switch v := raw.(type) {
136+
case map[string]string:
137+
headers = v
138+
case map[string]any:
139+
for k, val := range v {
140+
if str, ok := val.(string); ok {
141+
headers[k] = str
142+
}
143+
}
144+
}
145+
}
146+
147+
carrier := NewMapCarrier(headers)
148+
extracted := otel.GetTextMapPropagator().Extract(ctx, carrier)
149+
sc := trace.SpanFromContext(extracted).SpanContext()
150+
151+
return &StepResult{Output: map[string]any{
152+
"extracted_trace_id": sc.TraceID().String(),
153+
"extracted_span_id": sc.SpanID().String(),
154+
"trace_valid": sc.IsValid(),
155+
}}, nil
156+
}
157+
158+
// ─── trace_annotate ───────────────────────────────────────────────────────────
159+
160+
// TraceAnnotateStep adds events and attributes to the current span from context.
161+
type TraceAnnotateStep struct {
162+
name string
163+
eventName string
164+
attributes map[string]string
165+
}
166+
167+
// NewTraceAnnotateStepFactory returns a StepFactory for step.trace_annotate.
168+
func NewTraceAnnotateStepFactory() StepFactory {
169+
return func(name string, cfg map[string]any, _ modular.Application) (PipelineStep, error) {
170+
eventName, _ := cfg["event_name"].(string)
171+
attrs := make(map[string]string)
172+
if raw, ok := cfg["attributes"].(map[string]any); ok {
173+
for k, v := range raw {
174+
if str, ok := v.(string); ok {
175+
attrs[k] = str
176+
}
177+
}
178+
}
179+
return &TraceAnnotateStep{name: name, eventName: eventName, attributes: attrs}, nil
180+
}
181+
}
182+
183+
func (s *TraceAnnotateStep) Name() string { return s.name }
184+
185+
func (s *TraceAnnotateStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) {
186+
span := trace.SpanFromContext(ctx)
187+
188+
otelAttrs := make([]attribute.KeyValue, 0, len(s.attributes))
189+
for k, v := range s.attributes {
190+
otelAttrs = append(otelAttrs, attribute.String(k, v))
191+
}
192+
span.SetAttributes(otelAttrs...)
193+
194+
if s.eventName != "" {
195+
span.AddEvent(s.eventName, trace.WithAttributes(otelAttrs...))
196+
}
197+
198+
return &StepResult{Output: map[string]any{"annotated": true}}, nil
199+
}
200+
201+
// ─── trace_link ───────────────────────────────────────────────────────────────
202+
203+
// TraceLinkStep links the current trace to a parent trace across service boundaries.
204+
// The parent trace context is read from pipeline context under parent_field as a
205+
// map[string]string of W3C traceparent/tracestate headers.
206+
type TraceLinkStep struct {
207+
name string
208+
parentField string
209+
}
210+
211+
// NewTraceLinkStepFactory returns a StepFactory for step.trace_link.
212+
func NewTraceLinkStepFactory() StepFactory {
213+
return func(name string, cfg map[string]any, _ modular.Application) (PipelineStep, error) {
214+
parentField, _ := cfg["parent_field"].(string)
215+
if parentField == "" {
216+
parentField = "parent_trace_headers"
217+
}
218+
return &TraceLinkStep{name: name, parentField: parentField}, nil
219+
}
220+
}
221+
222+
func (s *TraceLinkStep) Name() string { return s.name }
223+
224+
func (s *TraceLinkStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) {
225+
headers := make(map[string]string)
226+
if raw, ok := pc.Current[s.parentField]; ok {
227+
switch v := raw.(type) {
228+
case map[string]string:
229+
headers = v
230+
case map[string]any:
231+
for k, val := range v {
232+
if str, ok := val.(string); ok {
233+
headers[k] = str
234+
}
235+
}
236+
}
237+
}
238+
239+
if len(headers) == 0 {
240+
return &StepResult{Output: map[string]any{
241+
"linked": false,
242+
"reason": "no parent headers",
243+
}}, nil
244+
}
245+
246+
carrier := NewMapCarrier(headers)
247+
parentCtx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)
248+
parentSpanCtx := trace.SpanFromContext(parentCtx).SpanContext()
249+
250+
if !parentSpanCtx.IsValid() {
251+
return &StepResult{Output: map[string]any{
252+
"linked": false,
253+
"reason": "invalid parent span context",
254+
}}, nil
255+
}
256+
257+
tracer := otel.GetTracerProvider().Tracer("workflow.pipeline")
258+
_, span := tracer.Start(ctx, "trace.link",
259+
trace.WithSpanKind(trace.SpanKindInternal),
260+
trace.WithLinks(trace.Link{SpanContext: parentSpanCtx}),
261+
)
262+
defer span.End()
263+
264+
return &StepResult{Output: map[string]any{
265+
"linked": true,
266+
"parent_trace_id": parentSpanCtx.TraceID().String(),
267+
"parent_span_id": parentSpanCtx.SpanID().String(),
268+
}}, nil
269+
}

0 commit comments

Comments
 (0)