Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
db1fbc2
wip: default value for failed serialization
hannahkm Mar 23, 2026
44211b7
write dummy value as a byte for meta_struct
hannahkm Mar 23, 2026
f3ac9a2
dont double encode span links on spans
hannahkm Mar 23, 2026
fef304e
fix: reduce heap allocs?
hannahkm Mar 24, 2026
5a8e764
trigger tests
hannahkm Mar 24, 2026
f81c6ec
lint: don't use %v
hannahkm Mar 24, 2026
ec8baf2
fix: array length with _dd.span_links was wrong
hannahkm Mar 24, 2026
7cf57c5
Merge branch 'main' into hannahkm/v1-followups
hannahkm Mar 25, 2026
682035b
inlining fixes
hannahkm Mar 30, 2026
6613e9f
attempt: write array header after contents to save time on reading sp…
hannahkm Mar 30, 2026
0a7fffe
get meta values proactively
hannahkm Mar 30, 2026
91cecd6
only create string table if it doesn't already exist
hannahkm Mar 31, 2026
a4393ae
trigger pipelines
hannahkm Apr 1, 2026
aa8ebfa
delete outdated static checks workflow
hannahkm Apr 1, 2026
30e36d2
Revert "delete outdated static checks workflow"
hannahkm Apr 1, 2026
6098c60
Merge branch 'main' into hannahkm/v1-followups
hannahkm Apr 2, 2026
1f272f8
use built in string type for indices map
hannahkm Apr 3, 2026
089e444
Merge branch 'main' into hannahkm/v1-followups
kakkoyun Apr 8, 2026
eb52b87
Merge branch 'main' into hannahkm/v1-followups
hannahkm Apr 8, 2026
b2465d5
Merge branch 'main' into hannahkm/v1-followups
hannahkm Apr 8, 2026
c65531f
Merge branch 'main' into hannahkm/v1-followups
kakkoyun Apr 9, 2026
64548fb
reset string table on encode
hannahkm Apr 13, 2026
de0fcfc
fix: use warnUnsupportedValue func instead of log.Warn
hannahkm Apr 13, 2026
18d426e
wip: more benchmarking for payloads
hannahkm Apr 13, 2026
e4cd6de
Merge branch 'main' into hannahkm/v1-followups
darccio Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 329 additions & 0 deletions ddtrace/tracer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,39 @@ func newDetailedSpanList(n int) spanList {
return list
}

// newLowCardinalitySpanList builds n spans whose tag keys and values repeat
// across the whole list, so a deduplicating (string-table) encoder has very
// few distinct strings to intern.
func newLowCardinalitySpanList(n int) spanList {
	digits := map[int]string{0: "0", 1: "1", 2: "2", 3: "3", 4: "4", 5: "5"}
	spans := make([]*Span, n)
	for idx := 0; idx < n; idx++ {
		sp := newBasicSpan("span.list." + digits[idx%5+1])
		sp.start = fixedTime
		// The value string is intentionally the same on every span/tag; only
		// its content mentions "high-cardinality" (it is a single repeated value).
		sp.service = "high-cardinality-string-value"
		sp.resource = "resource." + digits[idx%5+1]
		for _, key := range []string{"tag.1", "tag.2", "tag.3", "tag.4"} {
			sp.SetTag(key, "high-cardinality-string-value")
		}
		spans[idx] = sp
	}
	return spans
}

// newHighCardinalitySpanList builds n spans populated with many unique tag
// keys and values (tag.00 … tag.49 per span), to stress string-table growth
// in the encoder.
//
// Bug fix: the inner loop previously shadowed the outer index i, so tags were
// set on list[0..49] rather than on the current span, and the i%5+1 suffix
// produced only 5 distinct tags — defeating the "many unique tags" intent.
func newHighCardinalitySpanList(n int) spanList {
	// Digit lookup for 0-9 lets us build two-digit suffixes without strconv.
	itoa := map[int]string{0: "0", 1: "1", 2: "2", 3: "3", 4: "4", 5: "5", 6: "6", 7: "7", 8: "8", 9: "9"}
	list := make([]*Span, n)
	for i := range n {
		list[i] = newBasicSpan("span.list." + itoa[i%5+1])
		list[i].start = fixedTime
		list[i].service = "service." + itoa[i%5+1]
		list[i].resource = "resource." + itoa[i%5+1]
		// 50 unique tag/value pairs per span: tag.00=value.00 … tag.49=value.49.
		for j := 0; j < 50; j++ {
			suffix := itoa[j/10] + itoa[j%10]
			list[i].SetTag("tag."+suffix, "value."+suffix)
		}
	}
	return list
}

// TestPayloadIntegrity tests that whatever we push into the payload
// allows us to read the same content as would have been encoded by
// the codec.
Expand Down Expand Up @@ -293,6 +326,8 @@ func TestPayloadV1SpanLinkTraceID(t *testing.T) {
span.spanLinks = []SpanLink{
{TraceID: 123, TraceIDHigh: 456, SpanID: 789},
}
span.setMeta("_dd.span_links", "test") // should not get serialized

_, err := p.push(spanList{span})
assert.NoError(err)

Expand All @@ -315,6 +350,9 @@ func TestPayloadV1SpanLinkTraceID(t *testing.T) {
assert.Equal(uint64(123), link.TraceID)
assert.Equal(uint64(456), link.TraceIDHigh)
assert.Equal(uint64(789), link.SpanID)

span = got.chunks[0].spans[0]
assert.Empty(span.meta["_dd.span_links"])
}

// TestPayloadV1SpanEventArray tests that a span with a span event containing ArrayValue
Expand Down Expand Up @@ -532,6 +570,79 @@ func assertProcessTags(t *testing.T, payload spanLists) {
}
}

func TestPayloadV1SerializationFailure(t *testing.T) {
t.Run("nil span", func(t *testing.T) {
assert := assert.New(t)
p := newPayloadV1()
sl := newSpanList(1)
sl = append(sl, nil) // add a nil span

_, err := p.push(sl)
assert.NoError(err)

encoded, err := io.ReadAll(p)
assert.NoError(err)

got := newPayloadV1()
buf := bytes.NewBuffer(encoded)
_, err = buf.WriteTo(got)
assert.NoError(err)

_, err = got.decodeBuffer()
assert.NoError(err)

require.Len(t, got.chunks, 1)
require.Len(t, got.chunks[0].spans, 2)
assert.Equal(&Span{}, got.chunks[0].spans[1])
})

t.Run("invalid valueType", func(t *testing.T) {
p := newPayloadV1()
p.attributes["bad-attr"] = anyValue{valueType: 999, value: "x"}
s := newBasicSpan("test-span")
_, err := p.push(spanList{s})
require.NoError(t, err)
encoded, err := io.ReadAll(p)
require.NoError(t, err)

got := newPayloadV1()
_, err = bytes.NewBuffer(encoded).WriteTo(got)
require.NoError(t, err)

_, err = got.decodeBuffer()
require.NoError(t, err)
require.NotNil(t, got.attributes["bad-attr"])
assert.Equal(t, StringValueType, got.attributes["bad-attr"].valueType)
assert.Equal(t, serializationFailed, got.attributes["bad-attr"].value)
})

t.Run("invalid meta struct value", func(t *testing.T) {
p := newPayloadV1()
s := newBasicSpan("test-span")
s.mu.Lock()
s.setMetaStructLocked("bad-key", make(chan int)) // unsupported type
s.mu.Unlock()
_, err := p.push(spanList{s})
require.NoError(t, err)
encoded, err := io.ReadAll(p)
require.NoError(t, err)

got := newPayloadV1()
_, err = bytes.NewBuffer(encoded).WriteTo(got)
require.NoError(t, err)

_, err = got.decodeBuffer()
require.NoError(t, err)
require.Len(t, got.chunks, 1)
require.Len(t, got.chunks[0].spans, 1)
ms := got.chunks[0].spans[0].metaStruct["bad-key"]
require.NotNil(t, ms)
v, ok := ms.([]byte)
assert.True(t, ok)
assert.Equal(t, []byte(serializationFailed), v)
})
}

func BenchmarkPayloadThroughput(b *testing.B) {
b.Run("10K", benchmarkPayloadThroughput(1))
b.Run("100K", benchmarkPayloadThroughput(10))
Expand Down Expand Up @@ -838,3 +949,221 @@ func BenchmarkPayloadVersions(b *testing.B) {
})
}
}

func BenchmarkPayloads(b *testing.B) {
b.Run("v0.4", func(b *testing.B) {
b.Run("push/10spans", func(b *testing.B) {
p := newPayloadV04()
sl := newSpanList(10)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/1000spans", func(b *testing.B) {
p := newPayloadV04()
sl := newSpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/10_detailed_spans", func(b *testing.B) {
p := newPayloadV04()
sl := newDetailedSpanList(10)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/1000_detailed_spans", func(b *testing.B) {
p := newPayloadV04()
sl := newDetailedSpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/low_cardinality_spans", func(b *testing.B) {
p := newPayloadV04()
sl := newLowCardinalitySpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/high_cardinality_spans", func(b *testing.B) {
p := newPayloadV04()
sl := newHighCardinalitySpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("flush/1span", func(b *testing.B) {
p := newPayloadV04()

p.push(newSpanList(1))

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
p.reset()
io.ReadAll(p)
}
})

b.Run("flush/100spans", func(b *testing.B) {
p := newPayloadV04()

p.push(newSpanList(100))

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
p.reset()
io.ReadAll(p)
}
})

b.Run("flush/1000spans", func(b *testing.B) {
p := newPayloadV04()

p.push(newSpanList(1000))

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
p.reset()
io.ReadAll(p)
}
})
})

b.Run("v1", func(b *testing.B) {
b.Run("push/10spans", func(b *testing.B) {
p := newPayloadV1()
sl := newSpanList(10)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/1000spans", func(b *testing.B) {
p := newPayloadV1()
sl := newSpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/10_detailed_spans", func(b *testing.B) {
p := newPayloadV1()
sl := newDetailedSpanList(10)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/1000_detailed_spans", func(b *testing.B) {
p := newPayloadV1()
sl := newDetailedSpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/low_cardinality_spans", func(b *testing.B) {
p := newPayloadV1()
sl := newLowCardinalitySpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("push/high_cardinality_spans", func(b *testing.B) {
p := newPayloadV1()
sl := newHighCardinalitySpanList(1000)

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
_, _ = p.push(sl)
}
})

b.Run("flush/1span", func(b *testing.B) {
p := newPayloadV1()

p.push(newSpanList(1))

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
p.reset()
io.ReadAll(p)
}
})

b.Run("flush/100spans", func(b *testing.B) {
p := newPayloadV1()

p.push(newSpanList(100))

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
p.reset()
io.ReadAll(p)
}
})

b.Run("flush/1000spans", func(b *testing.B) {
p := newPayloadV1()

p.push(newSpanList(1000))

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
p.reset()
io.ReadAll(p)
}
})
})

// ... Add more payload versions here...
}
Loading
Loading