From 02155e1f9906a54d34bf9b52cd24efc41a2c0a77 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Thu, 4 Jun 2026 15:59:03 -0400 Subject: [PATCH 1/3] Pin `v1` protocol to test system-tests. --- .../communication/ddagent/DDAgentFeaturesDiscovery.java | 4 ++-- .../src/main/java/datadog/trace/api/ConfigDefaults.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index 55929c73f51..8278d54f34e 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -4,7 +4,7 @@ import static datadog.communication.http.OkHttpUtils.msgpackRequestBodyOf; import static datadog.communication.http.OkHttpUtils.prepareRequest; import static datadog.communication.serialization.msgpack.MsgPackWriter.FIXARRAY; -import static datadog.trace.api.ProtocolVersion.V0_4; +import static datadog.trace.api.ProtocolVersion.V1_0; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; @@ -118,7 +118,7 @@ public DDAgentFeaturesDiscovery( this.agentBaseUrl = agentUrl; this.metricsEnabled = metricsEnabled; this.ignoreAgentVersionForStats = ignoreAgentVersionForStats; - this.protocolVersion = protocolVersion != null ? protocolVersion : V0_4; + this.protocolVersion = protocolVersion != null ? protocolVersion : V1_0; this.discoveryTimer = monitoring.newTimer("trace.agent.discovery.time"); this.discoveryState = new State(); } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index a88c00db14e..6185f6795cd 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -88,7 +88,7 @@ public final class ConfigDefaults { Arrays.asList("user.id", "session.id", "account.id"); static final boolean DEFAULT_JMX_FETCH_ENABLED = true; - static final String DEFAULT_TRACE_AGENT_PROTOCOL_VERSION = ProtocolVersion.V0_4.asConfigValue(); + static final String DEFAULT_TRACE_AGENT_PROTOCOL_VERSION = ProtocolVersion.V1_0.asConfigValue(); static final boolean DEFAULT_CLIENT_IP_ENABLED = false; From 48654c782411dcd449893aa21e4db1dc90e8b624 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Thu, 4 Jun 2026 19:18:18 -0400 Subject: [PATCH 2/3] Fixed test for `v1` endpoint. --- .../ddagent/DDAgentFeaturesDiscoveryTest.groovy | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy index ff97ef6d3a2..115417e5ee5 100644 --- a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy +++ b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy @@ -88,7 +88,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { V1_0 | V1_ENDPOINT } - def "null protocol version falls back to v0.4 trace endpoints"() { + def "null protocol version falls back to v1.0 trace endpoints"() { setup: OkHttpClient client = Mock(OkHttpClient) DDAgentFeaturesDiscovery features = @@ -99,9 +99,10 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification { then: 1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/info" }) >> { Request request -> infoResponse(request, "{}") } + 1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v1.0/traces" }) >> { Request request -> success(request) } 0 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.5/traces" }) >> { Request request -> success(request) } - 1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.4/traces" }) >> { Request request -> success(request) } - features.getTraceEndpoint() == V04_ENDPOINT + 0 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.4/traces" }) >> { Request request -> success(request) } + features.getTraceEndpoint() == V1_ENDPOINT 0 * _ } From c774104b7604f1ebce7491ade211f11898adfdd7 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Thu, 11 Jun 2026 14:06:48 -0400 Subject: [PATCH 3/3] Refactored span events to avoid not needed JSON double processing for v1 protocol. --- .../shim/trace/OtelConventions.java | 9 +- .../opentelemetry/shim/trace/OtelSpan.java | 4 +- .../shim/trace/OtelSpanEvent.java | 65 +- .../common/writer/ddagent/TraceMapperV1.java | 64 +- .../java/datadog/trace/core/CoreSpan.java | 14 +- .../main/java/datadog/trace/core/DDSpan.java | 40 +- .../datadog/trace/core/DDSpanContext.java | 41 +- .../java/datadog/trace/core/Metadata.java | 10 +- .../trace/common/metrics/SimpleSpan.groovy | 3 - .../ddagent/TraceMapperV1PayloadTest.groovy | 1746 --------------- .../FileBasedPayloadDispatcherTest.java | 1 + .../trace/common/writer/TraceGenerator.java | 50 +- .../ddagent/TraceMapperV1PayloadTest.java | 1866 +++++++++++++++++ .../groovy/TraceGenerator.groovy | 7 +- .../instrumentation/api/AgentSpan.java | 8 + .../instrumentation/api/AgentSpanEvent.java | 51 + 16 files changed, 2128 insertions(+), 1851 deletions(-) delete mode 100644 dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.java create mode 100644 internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpanEvent.java diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelConventions.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelConventions.java index a328a63140f..460ca0881f9 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelConventions.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelConventions.java @@ -7,7 +7,6 @@ import static datadog.trace.api.DDTags.ERROR_MSG; import static datadog.trace.api.DDTags.ERROR_STACK; import static datadog.trace.api.DDTags.ERROR_TYPE; -import static datadog.trace.api.DDTags.SPAN_EVENTS; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; @@ -136,11 +135,15 @@ public static void applyNamingConvention(AgentSpan span) { } } - public static void setEventsAsTag(AgentSpan span, List events) { + public static void recordSpanEvents(AgentSpan span, List events) { if (events == null || events.isEmpty()) { return; } - span.setTag(SPAN_EVENTS, OtelSpanEvent.toTag(events)); + // Hand the structured events to the span. The V1 payload encodes them natively, while v0.x + // payloads flatten them into the JSON `events` tag at serialization time (see DDSpanContext). + for (OtelSpanEvent event : events) { + span.addSpanEvent(event); + } } public static void applySpanEventExceptionAttributesAsTags( diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpan.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpan.java index ffe6cce4ea6..e3c9ba3589a 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpan.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpan.java @@ -3,7 +3,7 @@ import static datadog.opentelemetry.shim.trace.OtelConventions.applyNamingConvention; import static datadog.opentelemetry.shim.trace.OtelConventions.applyReservedAttribute; import static datadog.opentelemetry.shim.trace.OtelConventions.applySpanEventExceptionAttributesAsTags; -import static datadog.opentelemetry.shim.trace.OtelConventions.setEventsAsTag; +import static datadog.opentelemetry.shim.trace.OtelConventions.recordSpanEvents; import static datadog.opentelemetry.shim.trace.OtelSpanEvent.EXCEPTION_SPAN_EVENT_NAME; import static datadog.opentelemetry.shim.trace.OtelSpanEvent.initializeExceptionAttributes; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; @@ -179,7 +179,7 @@ public AgentSpan asAgentSpan() { @Override public void onSpanFinished() { applyNamingConvention(this.delegate); - setEventsAsTag(this.delegate, this.events); + recordSpanEvents(this.delegate, this.events); } private static class NoopSpan implements Span { diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpanEvent.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpanEvent.java index 6bd1802aebb..d7765c8f984 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpanEvent.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/trace/OtelSpanEvent.java @@ -2,18 +2,20 @@ import datadog.trace.api.time.SystemTimeSource; import datadog.trace.api.time.TimeSource; -import edu.umd.cs.findbugs.annotations.NonNull; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -public class OtelSpanEvent { +public class OtelSpanEvent implements AgentSpanEvent { public static final String EXCEPTION_SPAN_EVENT_NAME = "exception"; public static final AttributeKey EXCEPTION_MESSAGE_ATTRIBUTE_KEY = AttributeKey.stringKey("exception.message"); @@ -26,33 +28,53 @@ public class OtelSpanEvent { private static TimeSource timeSource = SystemTimeSource.INSTANCE; private final String name; - private final String attributes; + private final Attributes attributes; /** Event timestamp in nanoseconds. */ private final long timestamp; public OtelSpanEvent(String name, Attributes attributes) { - this.name = name; - this.attributes = AttributesJsonParser.toJson(attributes); - this.timestamp = OtelSpanEvent.timeSource.getCurrentTimeNanos(); + this(name, attributes, OtelSpanEvent.timeSource.getCurrentTimeNanos()); } public OtelSpanEvent(String name, Attributes attributes, long timestamp, TimeUnit unit) { + this(name, attributes, unit.toNanos(timestamp)); + } + + private OtelSpanEvent(String name, Attributes attributes, long timestampNanos) { this.name = name; - this.attributes = AttributesJsonParser.toJson(attributes); - this.timestamp = unit.toNanos(timestamp); + this.attributes = attributes; + this.timestamp = timestampNanos; } - @NonNull - public static String toTag(List events) { - StringBuilder builder = new StringBuilder("["); - for (OtelSpanEvent event : events) { - if (builder.length() > 1) { - builder.append(','); - } - builder.append(event.toJson()); + @Override + public long timeNanos() { + return this.timestamp; + } + + @Override + public String name() { + return this.name; + } + + /** + * Exposes the event attributes as typed values for native (V1) encoding. OpenTelemetry attribute + * values are already {@link String}, {@link Boolean}, {@link Long}, {@link Double} or a {@link + * List} of those, so they are passed through unchanged. + */ + @Override + public Map attributes() { + if (this.attributes == null || this.attributes.isEmpty()) { + return Collections.emptyMap(); } - return builder.append(']').toString(); + Map map = new LinkedHashMap<>(this.attributes.size()); + this.attributes.forEach((key, value) -> map.put(key.getKey(), value)); + return map; + } + + @Override + public CharSequence toJsonTag() { + return toJson(); } /** @@ -174,8 +196,9 @@ public String toJson() { StringBuilder builder = new StringBuilder( "{\"time_unix_nano\":" + this.timestamp + ",\"name\":\"" + this.name + "\""); - if (!this.attributes.isEmpty()) { - builder.append(",\"attributes\":").append(this.attributes); + String attributesJson = AttributesJsonParser.toJson(this.attributes); + if (!attributesJson.isEmpty()) { + builder.append(",\"attributes\":").append(attributesJson); } return builder.append('}').toString(); } @@ -186,8 +209,8 @@ public String toString() { + this.timestamp + ", name='" + this.name - + "', attributes='" + + "', attributes=" + this.attributes - + "'}"; + + '}'; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java index 4cb41c597f4..4b3bb3bb5bf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java @@ -17,6 +17,7 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.TagMap; import datadog.trace.api.sampling.SamplingMechanism; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent; import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import datadog.trace.bootstrap.instrumentation.api.Tags; @@ -89,7 +90,7 @@ public void map(List> trace, Writable writable) { } CoreSpan firstSpan = trace.get(0); - firstSpan.processTagsAndBaggage(spanMetadata, false, false); + firstSpan.processTagsAndBaggageWithStructuredLinks(spanMetadata); Metadata firstSpanMeta = spanMetadata.metadata; // encoded fields: 1..7, but skipping #5, as not required by tracers and set by the agent. @@ -128,7 +129,7 @@ private void encodeSpans(Writable writable, int fieldId, List span : spans) { if (meta == null) { - span.processTagsAndBaggage(spanMetadata, false, false); + span.processTagsAndBaggageWithStructuredLinks(spanMetadata); meta = spanMetadata.metadata; } TagMap tags = meta.getTags(); @@ -162,7 +163,7 @@ private void encodeSpans(Writable writable, int fieldId, List events) { writable.writeInt(fieldId); - if (!(eventsObject instanceof List) || ((List) eventsObject).isEmpty()) { + if (events == null || events.isEmpty()) { writable.startArray(0); return; } - List events = (List) eventsObject; - int encodableCount = 0; - for (Object event : events) { - if (isEncodableSpanEvent(event)) { - encodableCount++; - } - } - writable.startArray(encodableCount); - for (Object event : events) { - if (!(event instanceof Map)) { - continue; - } - Map eventMap = (Map) event; - Long timeUnixNano = asLong(eventMap.get("time_unix_nano")); - Object nameObject = eventMap.get("name"); - if (timeUnixNano == null || nameObject == null) { - continue; - } - - Map attributes = - eventMap.get("attributes") instanceof Map ? (Map) eventMap.get("attributes") : null; - + writable.startArray(events.size()); + for (AgentSpanEvent event : events) { writable.startMap(3); - encodeLong(writable, 1, timeUnixNano); - encodeString(writable, 2, String.valueOf(nameObject)); - encodeEventAttributes(writable, 3, attributes); + encodeLong(writable, 1, event.timeNanos()); + encodeString(writable, 2, event.name()); + encodeEventAttributes(writable, 3, event.attributes()); } } - private boolean isEncodableSpanEvent(Object event) { - if (!(event instanceof Map)) { - return false; - } - Map eventMap = (Map) event; - return eventMap.get("name") != null && asLong(eventMap.get("time_unix_nano")) != null; - } - private void encodeEventAttributes(Writable writable, int fieldId, Map attrs) { writable.writeInt(fieldId); if (attrs == null || attrs.isEmpty()) { @@ -340,20 +314,6 @@ private boolean isIntegralNumber(Number number) { return !(number instanceof Float || number instanceof Double); } - private Long asLong(Object value) { - if (value instanceof Number) { - return ((Number) value).longValue(); - } - if (value instanceof CharSequence) { - try { - return Long.parseLong(value.toString()); - } catch (NumberFormatException ignored) { - return null; - } - } - return null; - } - private void encodeSpanAttributes( Writable writable, int fieldId, Metadata meta, Map metaStruct) { TagMap tags = meta.getTags(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java index a6ced35967c..2aa90c4ee21 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java @@ -97,8 +97,18 @@ default boolean isKind(SpanKindFilter filter) { void processTagsAndBaggage(MetadataConsumer consumer); - void processTagsAndBaggage( - MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags); + /** + * Variant of {@link #processTagsAndBaggage(MetadataConsumer)} for protocols that serialize span + * links as first-class structured data rather than tags (currently the V1 trace payload). Span + * links are therefore NOT flattened into the {@code _dd.span_links} tag, and baggage is + * materialized as {@code baggage.*} span tags, regardless of the tracer's configured defaults. + * + *

The default implementation delegates to {@link #processTagsAndBaggage(MetadataConsumer)}; + * implementations backed by real span configuration (e.g. {@code DDSpan}) override it. + */ + default void processTagsAndBaggageWithStructuredLinks(MetadataConsumer consumer) { + processTagsAndBaggage(consumer); + } T setSamplingPriority(int samplingPriority, int samplingMechanism); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 8ffcc77b49c..ed558e2a348 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -26,6 +26,7 @@ import datadog.trace.api.sampling.SamplingMechanism; import datadog.trace.bootstrap.debugger.DebuggerContext; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent; import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink; import datadog.trace.bootstrap.instrumentation.api.AttachableWrapper; import datadog.trace.bootstrap.instrumentation.api.ErrorPriorities; @@ -119,6 +120,9 @@ static DDSpan create( private static final List EMPTY = Collections.emptyList(); protected volatile List links; + private static final List NO_EVENTS = Collections.emptyList(); + protected volatile List spanEvents = NO_EVENTS; + /** * Spans should be constructed using the builder, not by calling the constructor directly. * @@ -780,10 +784,9 @@ public void processTagsAndBaggage(final MetadataConsumer consumer) { } @Override - public void processTagsAndBaggage( - final MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - context.processTagsAndBaggage( - consumer, longRunningVersion, this, injectLinksAsTags, injectBaggageAsTags); + public void processTagsAndBaggageWithStructuredLinks(final MetadataConsumer consumer) { + // injectBaggageAsTags=true; links and events stay structured rather than flattened into tags + context.processTagsAndBaggage(consumer, longRunningVersion, this, true, false, false); } @Override @@ -901,6 +904,35 @@ public List getLinks() { return this.links; } + public List getSpanEvents() { + return this.spanEvents; + } + + @Override + public void addSpanEvent(AgentSpanEvent event) { + if (event == null) { + return; + } + + // Mirrors addLink: spanEvents starts as the shared immutable NO_EVENTS placeholder (safe to + // read, not to write). On first write, double-checked locking promotes it to a per-span + // CopyOnWriteArrayList. + List events = this.spanEvents; + if (events != NO_EVENTS) { + events.add(event); + return; + } + + synchronized (this) { + events = this.spanEvents; + if (events != NO_EVENTS) { + events.add(event); + } else { + this.spanEvents = new CopyOnWriteArrayList<>(Collections.singletonList(event)); + } + } + } + @Override public void addLink(AgentSpanLink link) { if (link == null) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index e7038db5dbe..8599af42988 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -1,6 +1,7 @@ package datadog.trace.core; import static datadog.trace.api.DDTags.PARENT_ID; +import static datadog.trace.api.DDTags.SPAN_EVENTS; import static datadog.trace.api.DDTags.SPAN_LINKS; import static datadog.trace.api.cache.RadixTreeCache.HTTP_STATUSES; import static datadog.trace.bootstrap.instrumentation.api.ErrorPriorities.UNSET; @@ -24,6 +25,7 @@ import datadog.trace.api.sampling.PrioritySampling; import datadog.trace.api.sampling.SamplingMechanism; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent; import datadog.trace.bootstrap.instrumentation.api.AppendableSpanLinks; import datadog.trace.bootstrap.instrumentation.api.Baggage; import datadog.trace.bootstrap.instrumentation.api.ClientIpAddressData; @@ -1195,18 +1197,20 @@ void earlyProcessTags(AppendableSpanLinks links) { void processTagsAndBaggage( final MetadataConsumer consumer, int longRunningVersion, DDSpan restrictedSpan) { + // injectEventsAsTags=true: protocols other than V1 carry span events as the JSON `events` tag processTagsAndBaggage( - consumer, longRunningVersion, restrictedSpan, injectLinksAsTags, injectBaggageAsTags); + consumer, longRunningVersion, restrictedSpan, injectBaggageAsTags, injectLinksAsTags, true); } void processTagsAndBaggage( final MetadataConsumer consumer, int longRunningVersion, DDSpan restrictedSpan, + boolean injectBaggageAsTags, boolean injectLinksAsTags, - boolean injectBaggageAsTags) { + boolean injectEventsAsTags) { // NOTE: The span is passed for the sole purpose of allowing updating & reading of the span - // links + // links and events // This is a compromise to avoid... // - creating an extra wrapper object that would create significant allocation // - implementing an interface to read the spans that require making the read method public @@ -1222,6 +1226,15 @@ void processTagsAndBaggage( } } + // Events: flatten the structured span events into the legacy JSON `events` tag for protocols + // that don't encode them natively (V1 reads Metadata.getSpanEvents() instead). + if (injectEventsAsTags) { + String eventsTag = spanEventsToTag(restrictedSpan.getSpanEvents()); + if (eventsTag != null) { + unsafeTags.set(SPAN_EVENTS, eventsTag); + } + } + // Baggage Map baggageItemsWithPropagationTags; if (injectBaggageAsTags) { @@ -1248,8 +1261,28 @@ void processTagsAndBaggage( getOrigin(), longRunningVersion, ProcessTags.getTagsForSerialization(), - restrictedSpan.getLinks())); + restrictedSpan.getLinks(), + restrictedSpan.getSpanEvents())); + } + } + + /** + * Assembles the legacy v0.x {@code events} tag: a JSON array of the per-event objects produced by + * {@link AgentSpanEvent#toJsonTag()}. Returns {@code null} when there are no events so the tag is + * not emitted. + */ + private static String spanEventsToTag(List events) { + if (events == null || events.isEmpty()) { + return null; + } + StringBuilder builder = new StringBuilder("["); + for (AgentSpanEvent event : events) { + if (builder.length() > 1) { + builder.append(','); + } + builder.append(event.toJsonTag()); } + return builder.append(']').toString(); } void injectW3CBaggageTags(Map baggageItemsWithPropagationTags) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/Metadata.java b/dd-trace-core/src/main/java/datadog/trace/core/Metadata.java index b7f9a6b2cc2..aab99c0a0f6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/Metadata.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/Metadata.java @@ -4,6 +4,7 @@ import static java.util.Collections.emptyList; import datadog.trace.api.TagMap; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent; import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import java.util.List; @@ -23,6 +24,7 @@ public final class Metadata { private final int longRunningVersion; private final UTF8BytesString processTags; private final List spanLinks; + private final List spanEvents; public Metadata( long threadId, @@ -36,7 +38,8 @@ public Metadata( CharSequence origin, int longRunningVersion, UTF8BytesString processTags, - List spanLinks) { + List spanLinks, + List spanEvents) { this.threadId = threadId; this.threadName = threadName; this.httpStatusCode = httpStatusCode; @@ -49,6 +52,7 @@ public Metadata( this.longRunningVersion = longRunningVersion; this.processTags = processTags; this.spanLinks = spanLinks == null ? emptyList() : spanLinks; + this.spanEvents = spanEvents == null ? emptyList() : spanEvents; } public UTF8BytesString getHttpStatusCode() { @@ -102,4 +106,8 @@ public UTF8BytesString processTags() { public List getSpanLinks() { return spanLinks; } + + public List getSpanEvents() { + return spanEvents; + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy index 2fd8554d499..5e315c363dc 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy @@ -235,9 +235,6 @@ class SimpleSpan implements CoreSpan { @Override void processTagsAndBaggage(MetadataConsumer consumer) {} - @Override - void processTagsAndBaggage(MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) {} - @Override SimpleSpan setSamplingPriority(int samplingPriority, int samplingMechanism) { return this diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy deleted file mode 100644 index a1350a7538c..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy +++ /dev/null @@ -1,1746 +0,0 @@ -package datadog.trace.common.writer.ddagent - -import static datadog.trace.common.writer.TraceGenerator.generateRandomTraces -import static org.junit.jupiter.api.Assertions.assertArrayEquals -import static org.junit.jupiter.api.Assertions.assertEquals -import static org.junit.jupiter.api.Assertions.assertNotNull -import static org.junit.jupiter.api.Assertions.assertTrue -import static org.msgpack.core.MessageFormat.FIXSTR -import static org.msgpack.core.MessageFormat.STR16 -import static org.msgpack.core.MessageFormat.STR32 -import static org.msgpack.core.MessageFormat.STR8 - -import datadog.communication.serialization.ByteBufferConsumer -import datadog.communication.serialization.FlushingBuffer -import datadog.communication.serialization.msgpack.MsgPackWriter -import datadog.trace.api.DDTags -import datadog.trace.api.DDTraceId -import datadog.trace.api.DDSpanId -import datadog.trace.api.ProcessTags -import datadog.trace.api.sampling.PrioritySampling -import datadog.trace.api.sampling.SamplingMechanism -import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags -import datadog.trace.bootstrap.instrumentation.api.SpanAttributes -import datadog.trace.bootstrap.instrumentation.api.SpanLink -import datadog.trace.bootstrap.instrumentation.api.Tags -import datadog.trace.common.writer.Payload -import datadog.trace.common.writer.TraceGenerator -import datadog.trace.core.MetadataConsumer -import datadog.trace.test.util.DDSpecification -import java.nio.ByteBuffer -import java.nio.channels.WritableByteChannel -import org.junit.jupiter.api.Assertions -import org.msgpack.core.MessageFormat -import org.msgpack.core.MessagePack -import org.msgpack.core.MessageUnpacker - -class TraceMapperV1PayloadTest extends DDSpecification { - - def "test traces written correctly"() { - setup: - List> traces = generateRandomTraces(traceCount, lowCardinality) - TraceMapperV1 traceMapper = new TraceMapperV1() - PayloadVerifier verifier = new PayloadVerifier(traces, traceMapper) - MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(bufferSize, verifier)) - - when: - boolean tracesFitInBuffer = true - for (List trace : traces) { - if (!packer.format(trace, traceMapper)) { - verifier.skipLargeTrace() - tracesFitInBuffer = false - traceMapper.reset() - } - } - packer.flush() - - then: - if (tracesFitInBuffer) { - verifier.verifyTracesConsumed() - } - - where: - bufferSize | traceCount | lowCardinality - 20 << 10 | 0 | true - 20 << 10 | 1 | true - 30 << 10 | 2 | true - 20 << 10 | 0 | false - 20 << 10 | 1 | false - 30 << 10 | 2 | false - 100 << 10 | 10 | true - 100 << 10 | 100 | false - } - - def "test endpoint returns v1.0"() { - expect: - new TraceMapperV1().endpoint() == "v1.0" - } - - def "test span kind value conversion"() { - expect: - TraceMapperV1.getSpanKindValue(null) == TraceMapperV1.SPAN_KIND_UNSPECIFIED - TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_INTERNAL) == TraceMapperV1.SPAN_KIND_INTERNAL - TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_SERVER) == TraceMapperV1.SPAN_KIND_SERVER - TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_CLIENT) == TraceMapperV1.SPAN_KIND_CLIENT - TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_PRODUCER) == TraceMapperV1.SPAN_KIND_PRODUCER - TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_CONSUMER) == TraceMapperV1.SPAN_KIND_CONSUMER - TraceMapperV1.getSpanKindValue("unknown") == TraceMapperV1.SPAN_KIND_INTERNAL - } - - def "test payload contains expected header and chunk fields"() { - setup: - Map tags = [ - (Tags.ENV): "prod", - (Tags.VERSION): "1.2.3", - (Tags.COMPONENT): "http-client", - (Tags.SPAN_KIND): Tags.SPAN_KIND_CLIENT, - "attr.string": "value", - "attr.bool" : true, - "attr.number": 12.5d, - "_dd.p.dm" : "-3" - ] - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 1, - [:], - tags, - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - "rum") - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - int payloadFieldCount = unpacker.unpackMapHeader() - Set payloadFieldsSeen = new HashSet<>() - int chunkCount = -1 - Map payloadAttributes = null - - for (int i = 0; i < payloadFieldCount; i++) { - int fieldId = unpacker.unpackInt() - payloadFieldsSeen.add(fieldId) - switch (fieldId) { - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - case 8: - case 9: - readStreamingString(unpacker, stringTable) - break - case 10: - payloadAttributes = readAttributes(unpacker, stringTable) - break - case 11: - chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - verifyChunk(unpacker, [span], stringTable) - break - default: - Assertions.fail("Unexpected payload field id: " + fieldId) - } - } - - then: - assertEquals(10, payloadFieldCount) - assertEquals((2..11).toSet(), payloadFieldsSeen) - assertEquals(1, chunkCount) - assertNotNull(payloadAttributes) - if (ProcessTags.tagsForSerialization == null) { - assertEquals(0, payloadAttributes.size()) - } else { - assertEquals(1, payloadAttributes.size()) - assertEquals(ProcessTags.tagsForSerialization.toString(), payloadAttributes.get(DDTags.PROCESS_TAGS)) - } - } - - def "test sampling mechanism normalization from _dd.p.dm"() { - setup: - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 321L, - 0L, - 1000L, - 2000L, - 0, - [:], - decisionMakerTag == null ? [:] : ["_dd.p.dm": decisionMakerTag], - "custom", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - unpacker.unpackMapHeader() - int samplingMechanism = -1 - - for (int i = 0; i < 10; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId == 11) { - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - int chunkFieldCount = unpacker.unpackMapHeader() - for (int j = 0; j < chunkFieldCount; j++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId == 7) { - samplingMechanism = unpacker.unpackInt() - } else { - skipChunkField(unpacker, chunkFieldId, stringTable) - } - } - } else { - skipPayloadField(unpacker, payloadFieldId, stringTable) - } - } - - then: - assertEquals(expectedSamplingMechanism, samplingMechanism) - - where: - decisionMakerTag | expectedSamplingMechanism - null | SamplingMechanism.DEFAULT - "-3" | 3 - "934086a686-7" | 7 - "invalid" | SamplingMechanism.DEFAULT - } - - def "test span ids are encoded as unsigned values in v1 payloads"() { - setup: - long spanId = Long.MIN_VALUE + 123L - long parentId = Long.MIN_VALUE + 456L - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - spanId, - parentId, - 1000L, - 2000L, - 0, - [:], - [:], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - unpacker.unpackMapHeader() - Long actualSpanId = null - Long actualParentId = null - - for (int i = 0; i < 10; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId == 11) { - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - int chunkFieldCount = unpacker.unpackMapHeader() - for (int j = 0; j < chunkFieldCount; j++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId == 4) { - int spanCount = unpacker.unpackArrayHeader() - assertEquals(1, spanCount) - int spanFieldCount = unpacker.unpackMapHeader() - for (int k = 0; k < spanFieldCount; k++) { - int spanFieldId = unpacker.unpackInt() - switch (spanFieldId) { - case 4: - assertEquals(MessageFormat.UINT64, unpacker.nextFormat) - actualSpanId = DDSpanId.from("${unpacker.unpackBigInteger()}") - break - case 5: - assertEquals(MessageFormat.UINT64, unpacker.nextFormat) - actualParentId = DDSpanId.from("${unpacker.unpackBigInteger()}") - break - default: - skipSpanField(unpacker, spanFieldId, stringTable) - } - } - } else { - skipChunkField(unpacker, chunkFieldId, stringTable) - } - } - } else { - skipPayloadField(unpacker, payloadFieldId, stringTable) - } - } - - then: - assertEquals(spanId, actualSpanId) - assertEquals(parentId, actualParentId) - } - - def "test span links are encoded from structured span links"() { - setup: - List spanLinks = [ - new SpanLink( - DDTraceId.fromHex("11223344556677889900aabbccddeeff"), - DDSpanId.fromHex("000000000000002a"), - (byte) 1, - "dd=s:1", - SpanAttributes.fromMap(["link.kind": "follows_from", "context_headers": "tracecontext"])), - new SpanLink( - DDTraceId.fromHex("00000000000000000000000000000001"), - DDSpanId.fromHex("0000000000000002"), - (byte) 0, - "", - SpanAttributes.EMPTY) - ] - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - [:], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null, - spanLinks) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - List> links = readFirstSpanLinks(unpacker, stringTable) - - then: - assertEquals(2, links.size()) - assertArrayEquals(traceIdBytes(DDTraceId.fromHex("11223344556677889900aabbccddeeff")), links[0].traceId as byte[]) - assertEquals(DDSpanId.fromHex("000000000000002a"), links[0].spanId) - assertEquals("dd=s:1", links[0].tracestate) - assertEquals(1L, links[0].flags) - assertEquals(["link.kind": "follows_from", "context_headers": "tracecontext"], links[0].attributes) - - assertArrayEquals(traceIdBytes(DDTraceId.fromHex("00000000000000000000000000000001")), links[1].traceId as byte[]) - assertEquals(DDSpanId.fromHex("0000000000000002"), links[1].spanId) - assertEquals("", links[1].tracestate) - assertEquals(0L, links[1].flags) - assertEquals([:], links[1].attributes) - } - - def "test first span tags are processed once"() { - setup: - def firstSpan = new CountingPojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - [(Tags.HTTP_URL): "http://localhost:7777/"], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - - def secondSpan = new CountingPojoSpan( - "service-a", - "operation-b", - "resource-b", - DDTraceId.ONE, - 456L, - 123L, - 1000L, - 2000L, - 0, - [:], - [(Tags.HTTP_URL): "http://localhost:7777/"], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - - when: - serializeMappedPayload(mapper, [[firstSpan, secondSpan]]) - - then: - assertEquals(1, firstSpan.processTagsAndBaggageCount) - assertEquals(1, secondSpan.processTagsAndBaggageCount) - } - - def "test missing span links encode empty links"() { - setup: - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - [:], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - List> links = readFirstSpanLinks(unpacker, stringTable) - - then: - assertTrue(links.isEmpty()) - } - - def "test span events are encoded from events tag"() { - setup: - List> eventPayload = [ - [ - time_unix_nano: 1234567890L, - name : "event.one", - attributes : [ - str : "v", - int : 42L, - double: 12.5d, - bool : true, - arr : ["x", 7L, 2.5d, false] - ] - ], - [ - time_unix_nano: 1234567891L, - name : "event.two" - ] - ] - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - ["events": eventPayload], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - List> events = readFirstSpanEvents(unpacker, stringTable) - - then: - assertEquals(2, events.size()) - assertEquals(1234567890L, events[0].timeUnixNano) - assertEquals("event.one", events[0].name) - assertEquals("v", events[0].attributes["str"]) - assertEquals(42L, events[0].attributes["int"]) - assertEquals(12.5d, (events[0].attributes["double"] as Number).doubleValue(), 0.000001d) - assertEquals(true, events[0].attributes["bool"]) - assertEquals(["x", 7L, 2.5d, false], events[0].attributes["arr"]) - - assertEquals(1234567891L, events[1].timeUnixNano) - assertEquals("event.two", events[1].name) - assertEquals([:], events[1].attributes) - } - - def "test malformed span events fall back to empty events"() { - setup: - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - ["events": [foo: "bar"]], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - List> events = readFirstSpanEvents(unpacker, stringTable) - - then: - assertTrue(events.isEmpty()) - } - - def "test meta struct is encoded as bytes attribute"() { - setup: - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - [:], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 200, - null) - span.setMetaStruct("meta_key", [foo: "bar", answer: 42L]) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) - byte[] metaStructBytes = attributes["meta_key"] as byte[] - MessageUnpacker metaStructUnpacker = MessagePack.newDefaultUnpacker(metaStructBytes) - int metaStructFieldCount = metaStructUnpacker.unpackMapHeader() - Map decodedMetaStruct = [:] - for (int i = 0; i < metaStructFieldCount; i++) { - String key = metaStructUnpacker.unpackString() - switch (metaStructUnpacker.getNextFormat().getValueType()) { - case org.msgpack.value.ValueType.INTEGER: - decodedMetaStruct[key] = metaStructUnpacker.unpackLong() - break - case org.msgpack.value.ValueType.STRING: - decodedMetaStruct[key] = metaStructUnpacker.unpackString() - break - default: - Assertions.fail("Unexpected meta_struct value type for key " + key) - } - } - - then: - assertNotNull(metaStructBytes) - assertEquals("bar", decodedMetaStruct["foo"]) - assertEquals(42L, decodedMetaStruct["answer"]) - } - - def "test map-valued span tags are flattened in v1 attributes"() { - setup: - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - [ - "usr": [ - "id" : "123", - "name" : "alice", - "authenticated": true, - "profile" : [ - "age": 30L - ] - ], - "appsec.events.users.login.success": [ - "metadata0": [ - "event" : "login", - "attempts": 1L - ], - "metadata1": [ - "blocked": false - ] - ] - ], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 0, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) - - then: - assertTrue(attributes.containsKey("usr.id")) - assertTrue(attributes.containsKey("usr.name")) - assertTrue(attributes.containsKey("usr.authenticated")) - assertTrue(attributes.containsKey("usr.profile.age")) - assertTrue(attributes.containsKey("appsec.events.users.login.success.metadata0.event")) - assertTrue(attributes.containsKey("appsec.events.users.login.success.metadata0.attempts")) - assertTrue(attributes.containsKey("appsec.events.users.login.success.metadata1.blocked")) - - assertEquals("123", attributes.get("usr.id")) - assertEquals("alice", attributes.get("usr.name")) - assertEquals(true, attributes.get("usr.authenticated")) - assertEquals(30d, (attributes.get("usr.profile.age") as Number).doubleValue(), 0.000001d) - assertEquals("login", attributes.get("appsec.events.users.login.success.metadata0.event")) - assertEquals(1d, (attributes.get("appsec.events.users.login.success.metadata0.attempts") as Number).doubleValue(), 0.000001d) - assertEquals(false, attributes.get("appsec.events.users.login.success.metadata1.blocked")) - - assertTrue(!attributes.containsKey("usr")) - assertTrue(!attributes.containsKey("appsec.events.users.login.success")) - } - - def "test primitive span tags are encoded in v1 attributes"() { - setup: - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - [ - "tag.bool" : true, - "tag.int" : 7, - "tag.long" : 9L, - "tag.float" : 3.5f, - "tag.double": 4.25d - ], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 0, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) - - then: - assertEquals(true, attributes.get("tag.bool")) - assertEquals(7d, (attributes.get("tag.int") as Number).doubleValue(), 0.000001d) - assertEquals(9d, (attributes.get("tag.long") as Number).doubleValue(), 0.000001d) - assertEquals(3.5d, (attributes.get("tag.float") as Number).doubleValue(), 0.000001d) - assertEquals(4.25d, (attributes.get("tag.double") as Number).doubleValue(), 0.000001d) - } - - def "test thread metadata is encoded in v1 attributes"() { - setup: - def span = new TraceGenerator.PojoSpan( - "service-a", - "operation-a", - "resource-a", - DDTraceId.ONE, - 123L, - 0L, - 1000L, - 2000L, - 0, - [:], - [:], - "web", - false, - PrioritySampling.SAMPLER_KEEP, - 0, - null) - - TraceMapperV1 mapper = new TraceMapperV1() - byte[] encoded = serializeMappedPayload(mapper, [[span]]) - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded) - List stringTable = new ArrayList<>() - stringTable.add("") - - when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) - - then: - assertAttributeValueEquals(span.getTag(DDTags.THREAD_ID), attributes.get(DDTags.THREAD_ID), DDTags.THREAD_ID) - assertEquals(span.getTag(DDTags.THREAD_NAME).toString(), attributes.get(DDTags.THREAD_NAME)) - } - - private static final class PayloadVerifier implements ByteBufferConsumer, WritableByteChannel { - - private final List> expectedTraces - private final TraceMapperV1 mapper - private ByteBuffer captured = ByteBuffer.allocate(200 << 10) - private int position = 0 - - private PayloadVerifier(List> expectedTraces, TraceMapperV1 mapper) { - this.expectedTraces = expectedTraces - this.mapper = mapper - } - - void skipLargeTrace() { - ++position - } - - @Override - void accept(int messageCount, ByteBuffer buffer) { - if (expectedTraces.isEmpty() && messageCount == 0) { - return - } - try { - Payload payload = mapper.newPayload().withBody(messageCount, buffer) - payload.writeTo(this) - captured.flip() - - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(captured) - if (messageCount == 0) { - assertEquals(0, unpacker.unpackMapHeader()) - return - } - - List stringTable = new ArrayList<>() - stringTable.add("") - - int payloadFieldCount = unpacker.unpackMapHeader() - assertEquals(10, payloadFieldCount) - - boolean seenChunks = false - for (int i = 0; i < payloadFieldCount; i++) { - int fieldId = unpacker.unpackInt() - if (fieldId == 11) { - int traceCount = unpacker.unpackArrayHeader() - assertEquals(messageCount, traceCount) - seenChunks = true - for (int traceIndex = 0; traceIndex < traceCount; traceIndex++) { - List expectedTrace = expectedTraces.get(position++) - verifyChunk(unpacker, expectedTrace, stringTable) - } - } else { - skipPayloadField(unpacker, fieldId, stringTable) - } - } - - assertTrue(seenChunks) - } catch (IOException e) { - Assertions.fail(e.getMessage()) - } finally { - mapper.reset() - captured.position(0) - captured.limit(captured.capacity()) - } - } - - @Override - int write(ByteBuffer src) { - if (captured.remaining() < src.remaining()) { - ByteBuffer newBuffer = ByteBuffer.allocate(captured.capacity() + src.remaining()) - captured.flip() - newBuffer.put(captured) - captured = newBuffer - return write(src) - } - captured.put(src) - return src.position() - } - - void verifyTracesConsumed() { - assertEquals(expectedTraces.size(), position) - } - - @Override - boolean isOpen() { - return true - } - - @Override - void close() { - } - } - - private static void verifyChunk( - MessageUnpacker unpacker, - List expectedTrace, - List stringTable) { - int chunkFieldCount = unpacker.unpackMapHeader() - assertEquals(6, chunkFieldCount) - - Integer priority = null - String origin = null - Map chunkAttributes = null - byte[] traceId = null - Integer samplingMechanism = null - List decodedSpans = null - - for (int i = 0; i < chunkFieldCount; i++) { - int fieldId = unpacker.unpackInt() - switch (fieldId) { - case 1: - priority = unpacker.unpackInt() - break - case 2: - origin = readStreamingString(unpacker, stringTable) - break - case 3: - chunkAttributes = readAttributes(unpacker, stringTable) - break - case 4: - decodedSpans = verifySpans(unpacker, expectedTrace, stringTable) - break - case 6: - int traceIdLen = unpacker.unpackBinaryHeader() - traceId = new byte[traceIdLen] - unpacker.readPayload(traceId) - break - case 7: - samplingMechanism = unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected chunk field id: " + fieldId) - } - } - - assertNotNull(priority) - assertNotNull(origin) - assertNotNull(chunkAttributes) - assertNotNull(decodedSpans) - assertNotNull(traceId) - assertNotNull(samplingMechanism) - - TraceGenerator.PojoSpan firstSpan = expectedTrace.get(0) - assertEquals(firstSpan.samplingPriority(), priority) - assertEqualsWithNullAsEmpty(firstSpan.getOrigin(), origin) - assertEquals(1, chunkAttributes.size()) - assertEqualsWithNullAsEmpty(firstSpan.getLocalRootSpan().getServiceName(), chunkAttributes.get("service")) - assertArrayEquals(traceIdBytes(firstSpan.getTraceId()), traceId) - assertEquals(expectedSamplingMechanism(firstSpan.getTags()), samplingMechanism) - } - - private static byte[] traceIdBytes(DDTraceId traceId) { - ByteBuffer.allocate(16) - .putLong(traceId.toHighOrderLong()) - .putLong(traceId.toLong()) - .array() - } - - private static List verifySpans( - MessageUnpacker unpacker, - List expectedTrace, - List stringTable) { - int spanCount = unpacker.unpackArrayHeader() - assertEquals(expectedTrace.size(), spanCount) - - for (int i = 0; i < spanCount; i++) { - verifySpan(unpacker, expectedTrace.get(i), stringTable) - } - return expectedTrace - } - - private static void verifySpan( - MessageUnpacker unpacker, - TraceGenerator.PojoSpan expectedSpan, - List stringTable) { - int spanFieldCount = unpacker.unpackMapHeader() - assertEquals(16, spanFieldCount) - - String service = null - String name = null - String resource = null - Long spanId = null - Long parentId = null - Long start = null - Long duration = null - Boolean error = null - Map attributes = null - String type = null - int linksCount = -1 - int eventsCount = -1 - String env = null - String version = null - String component = null - Integer spanKind = null - - for (int i = 0; i < spanFieldCount; i++) { - int fieldId = unpacker.unpackInt() - switch (fieldId) { - case 1: - service = readStreamingString(unpacker, stringTable) - break - case 2: - name = readStreamingString(unpacker, stringTable) - break - case 3: - resource = readStreamingString(unpacker, stringTable) - break - case 4: - spanId = unpackUnsignedLong(unpacker) - break - case 5: - parentId = unpackUnsignedLong(unpacker) - break - case 6: - start = unpacker.unpackLong() - break - case 7: - duration = unpacker.unpackLong() - break - case 8: - error = unpacker.unpackBoolean() - break - case 9: - attributes = readAttributes(unpacker, stringTable) - break - case 10: - type = readStreamingString(unpacker, stringTable) - break - case 11: - linksCount = unpacker.unpackArrayHeader() - break - case 12: - eventsCount = unpacker.unpackArrayHeader() - break - case 13: - env = readStreamingString(unpacker, stringTable) - break - case 14: - version = readStreamingString(unpacker, stringTable) - break - case 15: - component = readStreamingString(unpacker, stringTable) - break - case 16: - spanKind = unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected span field id: " + fieldId) - } - } - - assertEqualsWithNullAsEmpty(expectedSpan.getServiceName(), service) - assertEqualsWithNullAsEmpty(expectedSpan.getOperationName(), name) - assertEqualsWithNullAsEmpty(expectedSpan.getResourceName(), resource) - assertEquals(expectedSpan.getSpanId(), spanId) - assertEquals(expectedSpan.getParentId(), parentId) - assertEquals(expectedSpan.getStartTime(), start) - assertEquals(expectedSpan.getDurationNano(), duration) - assertEquals(expectedSpan.getError() != 0, error) - assertEqualsWithNullAsEmpty(expectedSpan.getType(), type) - assertEquals(0, linksCount) - assertEquals(0, eventsCount) - assertEqualsWithNullAsEmpty(expectedSpan.getTag(Tags.ENV), env) - assertEqualsWithNullAsEmpty(expectedSpan.getTag(Tags.VERSION), version) - assertEqualsWithNullAsEmpty(expectedSpan.getTag(Tags.COMPONENT), component) - assertEquals(TraceMapperV1.getSpanKindValue(expectedSpan.getTag(Tags.SPAN_KIND)), spanKind) - - assertNotNull(attributes) - int expectedHttpStatusCode = expectedSpan.getHttpStatusCode() - boolean shouldContainHttpStatus = expectedHttpStatusCode != 0 && !expectedSpan.getTags().containsKey("http.status_code") - Map expectedAttributes = [:] - for (Map.Entry entry : expectedSpan.getBaggage().entrySet()) { - expectedAttributes.put(entry.getKey(), entry.getValue()) - } - expectedAttributes.put(DDTags.THREAD_ID, expectedSpan.getTag(DDTags.THREAD_ID)) - expectedAttributes.put(DDTags.THREAD_NAME, expectedSpan.getTag(DDTags.THREAD_NAME)) - for (Map.Entry entry : expectedSpan.getTags().entrySet()) { - if (DDTags.SPAN_EVENTS == entry.getKey()) { - continue - } - addFlattenedExpectedAttribute(expectedAttributes, entry.getKey(), entry.getValue()) - } - if (shouldContainHttpStatus) { - expectedAttributes.put("http.status_code", Integer.toString(expectedHttpStatusCode)) - } - if (expectedSpan.isTopLevel()) { - expectedAttributes.put(InstrumentationTags.DD_TOP_LEVEL.toString(), 1d) - } - - assertEquals(expectedAttributes.size(), attributes.size()) - for (Map.Entry entry : expectedAttributes.entrySet()) { - String key = entry.getKey() - Object expectedValue = entry.getValue() - assertTrue(attributes.containsKey(key), "Missing attribute key: $key") - assertAttributeValueEquals(expectedValue, attributes.get(key), key) - } - } - - private static Map readAttributes(MessageUnpacker unpacker, List stringTable) { - int attrArraySize = unpacker.unpackArrayHeader() - assertEquals(0, attrArraySize % 3) - int attrCount = attrArraySize / 3 - - Map attributes = new HashMap<>() - for (int i = 0; i < attrCount; i++) { - String key = readStreamingString(unpacker, stringTable) - int attrType = unpacker.unpackInt() - Object value - switch (attrType) { - case TraceMapperV1.VALUE_TYPE_STRING: - value = readStreamingString(unpacker, stringTable) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - value = unpacker.unpackBoolean() - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - value = unpacker.unpackDouble() - break - case TraceMapperV1.VALUE_TYPE_BYTES: - int len = unpacker.unpackBinaryHeader() - byte[] data = new byte[len] - unpacker.readPayload(data) - value = data - break - default: - Assertions.fail("Unknown attribute value type: " + attrType) - } - attributes.put(key, value) - } - return attributes - } - - private static void assertAttributeValueEquals(Object expected, Object actual, String key) { - if (expected instanceof Number) { - assertTrue(actual instanceof Number, "Attribute $key should be numeric") - double expectedValue = ((Number) expected).doubleValue() - double actualValue = ((Number) actual).doubleValue() - double delta = Math.max(0.000001d, Math.abs(expectedValue) * 0.000000000001d) - assertEquals(expectedValue, actualValue, delta, "Numeric mismatch for $key") - } else if (expected instanceof Boolean) { - assertEquals(expected, actual, "Boolean mismatch for $key") - } else { - assertEquals(String.valueOf(expected), String.valueOf(actual), "String mismatch for $key") - } - } - - private static long unpackUnsignedLong(MessageUnpacker unpacker) { - MessageFormat format = unpacker.nextFormat - if (format == MessageFormat.UINT64) { - return DDSpanId.from("${unpacker.unpackBigInteger()}") - } - return unpacker.unpackLong() - } - - private static void addFlattenedExpectedAttribute( - Map expectedAttributes, - String key, - Object value) { - if (!(value instanceof Map)) { - expectedAttributes.put(key, value) - return - } - for (Map.Entry entry : ((Map) value).entrySet()) { - addFlattenedExpectedAttribute( - expectedAttributes, - key + "." + String.valueOf(entry.getKey()), - entry.getValue()) - } - } - - private static int expectedSamplingMechanism(Map tags) { - Object decisionMakerRaw = tags.get("_dd.p.dm") - if (decisionMakerRaw == null) { - return SamplingMechanism.DEFAULT - } - - String decisionMaker = String.valueOf(decisionMakerRaw) - try { - int value = Integer.parseInt(decisionMaker) - return value < 0 ? -value : value - } catch (NumberFormatException ignored) { - int separator = decisionMaker.lastIndexOf('-') - if (separator >= 0 && separator + 1 < decisionMaker.length()) { - try { - int value = Integer.parseInt(decisionMaker.substring(separator + 1)) - return value < 0 ? -value : value - } catch (NumberFormatException ignoredAgain) { - } - } - return SamplingMechanism.DEFAULT - } - } - - private static String readStreamingString(MessageUnpacker unpacker, List stringTable) { - MessageFormat format = unpacker.getNextFormat() - if (format == FIXSTR || format == STR8 || format == STR16 || format == STR32) { - String value = unpacker.unpackString() - if (!stringTable.contains(value)) { - stringTable.add(value) - } - return value - } - - int index = unpacker.unpackInt() - assertTrue(index >= 0 && index < stringTable.size(), "Invalid string-table index: " + index) - return stringTable.get(index) - } - - private static void skipPayloadField(MessageUnpacker unpacker, int fieldId, List stringTable) { - switch (fieldId) { - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - case 8: - case 9: - readStreamingString(unpacker, stringTable) - break - case 10: - readAttributes(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected payload field id while skipping: " + fieldId) - } - } - - private static void skipChunkField(MessageUnpacker unpacker, int fieldId, List stringTable) { - switch (fieldId) { - case 1: - unpacker.unpackInt() - break - case 2: - readStreamingString(unpacker, stringTable) - break - case 3: - readAttributes(unpacker, stringTable) - break - case 4: - int spanCount = unpacker.unpackArrayHeader() - for (int i = 0; i < spanCount; i++) { - skipSpan(unpacker, stringTable) - } - break - case 5: - unpacker.unpackBoolean() - break - case 6: - int len = unpacker.unpackBinaryHeader() - byte[] ignored = new byte[len] - unpacker.readPayload(ignored) - break - case 7: - unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected chunk field id while skipping: " + fieldId) - } - } - - private static void skipSpan(MessageUnpacker unpacker, List stringTable) { - int fieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < fieldCount; i++) { - int fieldId = unpacker.unpackInt() - switch (fieldId) { - case 1: - case 2: - case 3: - case 10: - case 13: - case 14: - case 15: - readStreamingString(unpacker, stringTable) - break - case 4: - case 5: - unpacker.unpackValue().asNumberValue().toLong() - break - case 6: - case 7: - unpacker.unpackLong() - break - case 8: - unpacker.unpackBoolean() - break - case 9: - int attrArraySize = unpacker.unpackArrayHeader() - int attrCount = attrArraySize / 3 - for (int j = 0; j < attrCount; j++) { - readStreamingString(unpacker, stringTable) - int type = unpacker.unpackInt() - switch (type) { - case TraceMapperV1.VALUE_TYPE_STRING: - readStreamingString(unpacker, stringTable) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - unpacker.unpackBoolean() - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - unpacker.unpackDouble() - break - case TraceMapperV1.VALUE_TYPE_BYTES: - int len = unpacker.unpackBinaryHeader() - byte[] ignored = new byte[len] - unpacker.readPayload(ignored) - break - default: - Assertions.fail("Unexpected attribute type while skipping: " + type) - } - } - break - case 11: - case 12: - unpacker.unpackArrayHeader() - break - case 16: - unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected span field id while skipping: " + fieldId) - } - } - } - - private static Map readFirstSpanAttributes( - MessageUnpacker unpacker, - List stringTable) { - int payloadFieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < payloadFieldCount; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId != 11) { - skipPayloadField(unpacker, payloadFieldId, stringTable) - continue - } - - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - - int chunkFieldCount = unpacker.unpackMapHeader() - for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId != 4) { - skipChunkField(unpacker, chunkFieldId, stringTable) - continue - } - - int spanCount = unpacker.unpackArrayHeader() - assertEquals(1, spanCount) - - int spanFieldCount = unpacker.unpackMapHeader() - for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { - int spanFieldId = unpacker.unpackInt() - if (spanFieldId == 9) { - return readAttributes(unpacker, stringTable) - } - skipSpanField(unpacker, spanFieldId, stringTable) - } - } - } - Assertions.fail("Could not find span attributes field in first span") - return [:] - } - - private static List> readFirstSpanLinks( - MessageUnpacker unpacker, - List stringTable) { - int payloadFieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < payloadFieldCount; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId != 11) { - skipPayloadField(unpacker, payloadFieldId, stringTable) - continue - } - - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - - int chunkFieldCount = unpacker.unpackMapHeader() - for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId != 4) { - skipChunkField(unpacker, chunkFieldId, stringTable) - continue - } - - int spanCount = unpacker.unpackArrayHeader() - assertEquals(1, spanCount) - - int spanFieldCount = unpacker.unpackMapHeader() - for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { - int spanFieldId = unpacker.unpackInt() - if (spanFieldId == 11) { - return readSpanLinks(unpacker, stringTable) - } - skipSpanField(unpacker, spanFieldId, stringTable) - } - } - } - Assertions.fail("Could not find span links field in first span") - return [] - } - - private static void skipSpanField(MessageUnpacker unpacker, int fieldId, List stringTable) { - switch (fieldId) { - case 1: - case 2: - case 3: - case 10: - case 13: - case 14: - case 15: - readStreamingString(unpacker, stringTable) - break - case 4: - case 5: - unpacker.unpackValue().asNumberValue().toLong() - break - case 6: - case 7: - unpacker.unpackLong() - break - case 8: - unpacker.unpackBoolean() - break - case 9: - readAttributes(unpacker, stringTable) - break - case 12: - int eventsCount = unpacker.unpackArrayHeader() - for (int j = 0; j < eventsCount; j++) { - skipSpanEvent(unpacker, stringTable) - } - break - case 11: - int linksCount = unpacker.unpackArrayHeader() - for (int j = 0; j < linksCount; j++) { - int linkFieldCount = unpacker.unpackMapHeader() - for (int k = 0; k < linkFieldCount; k++) { - int linkFieldId = unpacker.unpackInt() - switch (linkFieldId) { - case 1: - int traceIdLen = unpacker.unpackBinaryHeader() - byte[] ignored = new byte[traceIdLen] - unpacker.readPayload(ignored) - break - case 2: - case 5: - unpacker.unpackValue().asNumberValue().toLong() - break - case 3: - readAttributes(unpacker, stringTable) - break - case 4: - readStreamingString(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected span link field id while skipping: " + linkFieldId) - } - } - } - break - case 16: - unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected span field id while skipping: " + fieldId) - } - } - - private static List> readSpanLinks( - MessageUnpacker unpacker, - List stringTable) { - int linksCount = unpacker.unpackArrayHeader() - List> links = [] - - for (int i = 0; i < linksCount; i++) { - int linkFieldCount = unpacker.unpackMapHeader() - assertEquals(5, linkFieldCount) - - byte[] traceId = null - Long spanId = null - Map attributes = null - String tracestate = null - Long flags = null - - for (int j = 0; j < linkFieldCount; j++) { - int linkFieldId = unpacker.unpackInt() - switch (linkFieldId) { - case 1: - int traceIdLen = unpacker.unpackBinaryHeader() - traceId = new byte[traceIdLen] - unpacker.readPayload(traceId) - break - case 2: - spanId = unpacker.unpackValue().asNumberValue().toLong() - break - case 3: - attributes = readAttributes(unpacker, stringTable) - break - case 4: - tracestate = readStreamingString(unpacker, stringTable) - break - case 5: - flags = unpacker.unpackValue().asNumberValue().toLong() - break - default: - Assertions.fail("Unexpected span link field id: " + linkFieldId) - } - } - - links.add([ - traceId : traceId, - spanId : spanId, - attributes: attributes, - tracestate: tracestate, - flags : flags - ]) - } - - return links - } - - private static List> readFirstSpanEvents( - MessageUnpacker unpacker, - List stringTable) { - int payloadFieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < payloadFieldCount; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId != 11) { - skipPayloadField(unpacker, payloadFieldId, stringTable) - continue - } - - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - - int chunkFieldCount = unpacker.unpackMapHeader() - for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId != 4) { - skipChunkField(unpacker, chunkFieldId, stringTable) - continue - } - - int spanCount = unpacker.unpackArrayHeader() - assertEquals(1, spanCount) - - int spanFieldCount = unpacker.unpackMapHeader() - for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { - int spanFieldId = unpacker.unpackInt() - if (spanFieldId == 12) { - return readSpanEvents(unpacker, stringTable) - } - skipSpanField(unpacker, spanFieldId, stringTable) - } - } - } - Assertions.fail("Could not find span events field in first span") - return [] - } - - private static List> readSpanEvents( - MessageUnpacker unpacker, - List stringTable) { - int eventsCount = unpacker.unpackArrayHeader() - List> events = [] - - for (int i = 0; i < eventsCount; i++) { - int eventFieldCount = unpacker.unpackMapHeader() - assertEquals(3, eventFieldCount) - - Long timeUnixNano = null - String name = null - Map attributes = null - - for (int j = 0; j < eventFieldCount; j++) { - int eventFieldId = unpacker.unpackInt() - switch (eventFieldId) { - case 1: - timeUnixNano = unpacker.unpackLong() - break - case 2: - name = readStreamingString(unpacker, stringTable) - break - case 3: - attributes = readEventAttributes(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected span event field id: " + eventFieldId) - } - } - - events.add([ - timeUnixNano: timeUnixNano, - name : name, - attributes : attributes - ]) - } - return events - } - - private static Map readEventAttributes( - MessageUnpacker unpacker, - List stringTable) { - int attrArraySize = unpacker.unpackArrayHeader() - assertEquals(0, attrArraySize % 3) - int attrCount = attrArraySize / 3 - Map attributes = new HashMap<>() - - for (int i = 0; i < attrCount; i++) { - String key = readStreamingString(unpacker, stringTable) - int attrType = unpacker.unpackInt() - Object value - switch (attrType) { - case TraceMapperV1.VALUE_TYPE_STRING: - value = readStreamingString(unpacker, stringTable) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - value = unpacker.unpackBoolean() - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - value = unpacker.unpackDouble() - break - case TraceMapperV1.VALUE_TYPE_INT: - value = unpacker.unpackLong() - break - case TraceMapperV1.VALUE_TYPE_ARRAY: - value = readEventArrayValue(unpacker, stringTable) - break - default: - Assertions.fail("Unknown event attribute value type: " + attrType) - } - attributes.put(key, value) - } - return attributes - } - - private static List readEventArrayValue(MessageUnpacker unpacker, List stringTable) { - int itemArraySize = unpacker.unpackArrayHeader() - assertEquals(0, itemArraySize % 2) - int itemCount = itemArraySize / 2 - List values = [] - for (int i = 0; i < itemCount; i++) { - int itemType = unpacker.unpackInt() - switch (itemType) { - case TraceMapperV1.VALUE_TYPE_STRING: - values.add(readStreamingString(unpacker, stringTable)) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - values.add(unpacker.unpackBoolean()) - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - values.add(unpacker.unpackDouble()) - break - case TraceMapperV1.VALUE_TYPE_INT: - values.add(unpacker.unpackLong()) - break - default: - Assertions.fail("Unknown event array item type: " + itemType) - } - } - return values - } - - private static void skipSpanEvent(MessageUnpacker unpacker, List stringTable) { - int fieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < fieldCount; i++) { - int fieldId = unpacker.unpackInt() - switch (fieldId) { - case 1: - unpacker.unpackLong() - break - case 2: - readStreamingString(unpacker, stringTable) - break - case 3: - readEventAttributes(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected event field id while skipping: " + fieldId) - } - } - } - - private static byte[] serializeMappedPayload( - TraceMapperV1 mapper, - List> traces) { - CapturedBody capturedBody = new CapturedBody(mapper) - MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(2 << 20, capturedBody)) - - for (List trace : traces) { - assertTrue(packer.format(trace, mapper)) - } - packer.flush() - - assertNotNull(capturedBody.payloadBytes) - return capturedBody.payloadBytes - } - - private static byte[] serializePayload(Payload payload) { - ByteArrayChannel channel = new ByteArrayChannel() - payload.writeTo(channel) - return channel.bytes() - } - - private static class CapturedBody implements ByteBufferConsumer { - private final TraceMapperV1 mapper - private byte[] payloadBytes - - private CapturedBody(TraceMapperV1 mapper) { - this.mapper = mapper - } - - @Override - void accept(int messageCount, ByteBuffer buffer) { - Payload payload = mapper.newPayload().withBody(messageCount, buffer) - payloadBytes = serializePayload(payload) - mapper.reset() - } - } - - private static class CountingPojoSpan extends TraceGenerator.PojoSpan { - int processTagsAndBaggageCount = 0 - - CountingPojoSpan( - String serviceName, - String operationName, - CharSequence resourceName, - DDTraceId traceId, - long spanId, - long parentId, - long start, - long duration, - int error, - Map baggage, - Map tags, - CharSequence type, - boolean measured, - int samplingPriority, - int statusCode, - CharSequence origin) { - super( - serviceName, - operationName, - resourceName, - traceId, - spanId, - parentId, - start, - duration, - error, - baggage, - tags, - type, - measured, - samplingPriority, - statusCode, - origin) - } - - @Override - void processTagsAndBaggage(MetadataConsumer consumer) { - processTagsAndBaggageCount++ - super.processTagsAndBaggage(consumer) - } - - @Override - void processTagsAndBaggage(MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - processTagsAndBaggageCount++ - super.processTagsAndBaggage(consumer, injectLinksAsTags, injectBaggageAsTags) - } - } - - private static class ByteArrayChannel implements WritableByteChannel { - private byte[] data = new byte[0] - - @Override - int write(ByteBuffer src) { - int len = src.remaining() - byte[] incoming = new byte[len] - src.get(incoming) - byte[] combined = new byte[data.length + incoming.length] - System.arraycopy(data, 0, combined, 0, data.length) - System.arraycopy(incoming, 0, combined, data.length, incoming.length) - data = combined - return len - } - - byte[] bytes() { - return data - } - - @Override - boolean isOpen() { - return true - } - - @Override - void close() { - } - } - - private static void assertEqualsWithNullAsEmpty(CharSequence expected, CharSequence actual) { - if (expected == null) { - assertEquals("", actual) - } else { - assertEquals(expected.toString(), actual.toString()) - } - } -} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/writer/FileBasedPayloadDispatcherTest.java b/dd-trace-core/src/test/java/datadog/trace/common/writer/FileBasedPayloadDispatcherTest.java index 2040f685fdc..761ae52bd1d 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/writer/FileBasedPayloadDispatcherTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/writer/FileBasedPayloadDispatcherTest.java @@ -313,6 +313,7 @@ private static CoreSpan mockSpan(CharSequence type, Map tags) null, 0, null, + null, null); doAnswer( inv -> { diff --git a/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java b/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java index 618b2ef77ae..91c979bc844 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java @@ -10,6 +10,7 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.TagMap; import datadog.trace.api.sampling.PrioritySampling; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent; import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.core.CoreSpan; @@ -214,6 +215,46 @@ public PojoSpan( int statusCode, CharSequence origin, List spanLinks) { + this( + serviceName, + operationName, + resourceName, + traceId, + spanId, + parentId, + start, + duration, + error, + baggage, + tags, + type, + measured, + samplingPriority, + statusCode, + origin, + spanLinks, + emptyList()); + } + + public PojoSpan( + String serviceName, + String operationName, + CharSequence resourceName, + DDTraceId traceId, + long spanId, + long parentId, + long start, + long duration, + int error, + Map baggage, + Map tags, + CharSequence type, + boolean measured, + int samplingPriority, + int statusCode, + CharSequence origin, + List spanLinks, + List spanEvents) { this.serviceName = UTF8BytesString.create(serviceName); this.operationName = UTF8BytesString.create(operationName); this.resourceName = UTF8BytesString.create(resourceName); @@ -240,7 +281,8 @@ public PojoSpan( origin, 0, ProcessTags.getTagsForSerialization(), - spanLinks); + spanLinks, + spanEvents); } @Override @@ -400,12 +442,6 @@ public void processTagsAndBaggage(MetadataConsumer consumer) { consumer.accept(metadata); } - @Override - public void processTagsAndBaggage( - MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - consumer.accept(metadata); - } - @Override public PojoSpan setSamplingPriority(int samplingPriority, int samplingMechanism) { return this; diff --git a/dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.java b/dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.java new file mode 100644 index 00000000000..8d0ea861dbb --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.java @@ -0,0 +1,1866 @@ +package datadog.trace.common.writer.ddagent; + +import static datadog.trace.common.writer.TraceGenerator.generateRandomTraces; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.msgpack.core.MessageFormat.FIXSTR; +import static org.msgpack.core.MessageFormat.STR16; +import static org.msgpack.core.MessageFormat.STR32; +import static org.msgpack.core.MessageFormat.STR8; + +import datadog.communication.serialization.ByteBufferConsumer; +import datadog.communication.serialization.FlushingBuffer; +import datadog.communication.serialization.msgpack.MsgPackWriter; +import datadog.trace.api.DDSpanId; +import datadog.trace.api.DDTags; +import datadog.trace.api.DDTraceId; +import datadog.trace.api.ProcessTags; +import datadog.trace.api.sampling.PrioritySampling; +import datadog.trace.api.sampling.SamplingMechanism; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink; +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.bootstrap.instrumentation.api.SpanAttributes; +import datadog.trace.bootstrap.instrumentation.api.SpanLink; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.common.writer.Payload; +import datadog.trace.common.writer.TraceGenerator; +import datadog.trace.core.MetadataConsumer; +import datadog.trace.test.util.DDJavaSpecification; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import org.tabletest.junit.TableTest; + +public class TraceMapperV1PayloadTest extends DDJavaSpecification { + + @TableTest({ + "scenario | bufferSize | traceCount | lowCardinality", + "20k buffer, 0 traces, low cardinality | 20480 | 0 | true ", + "20k buffer, 1 trace, low cardinality | 20480 | 1 | true ", + "30k buffer, 2 traces, low cardinality | 30720 | 2 | true ", + "20k buffer, 0 traces, high cardinality | 20480 | 0 | false ", + "20k buffer, 1 trace, high cardinality | 20480 | 1 | false ", + "30k buffer, 2 traces, high cardinality | 30720 | 2 | false ", + "100k buffer, 10 traces, low cardinality | 102400 | 10 | true ", + "100k buffer, 100 traces, high card. | 102400 | 100 | false " + }) + void testTracesWrittenCorrectly(int bufferSize, int traceCount, boolean lowCardinality) { + // setup + List> traces = generateRandomTraces(traceCount, lowCardinality); + TraceMapperV1 traceMapper = new TraceMapperV1(); + PayloadVerifier verifier = new PayloadVerifier(traces, traceMapper); + MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(bufferSize, verifier)); + + // when + boolean tracesFitInBuffer = true; + for (List trace : traces) { + if (!packer.format(trace, traceMapper)) { + verifier.skipLargeTrace(); + tracesFitInBuffer = false; + traceMapper.reset(); + } + } + packer.flush(); + + // then + if (tracesFitInBuffer) { + verifier.verifyTracesConsumed(); + } + } + + @Test + void testEndpointReturnsV1() { + assertEquals("v1.0", new TraceMapperV1().endpoint()); + } + + @Test + void testSpanKindValueConversion() { + assertEquals(TraceMapperV1.SPAN_KIND_UNSPECIFIED, TraceMapperV1.getSpanKindValue(null)); + assertEquals( + TraceMapperV1.SPAN_KIND_INTERNAL, TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_INTERNAL)); + assertEquals( + TraceMapperV1.SPAN_KIND_SERVER, TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_SERVER)); + assertEquals( + TraceMapperV1.SPAN_KIND_CLIENT, TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_CLIENT)); + assertEquals( + TraceMapperV1.SPAN_KIND_PRODUCER, TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_PRODUCER)); + assertEquals( + TraceMapperV1.SPAN_KIND_CONSUMER, TraceMapperV1.getSpanKindValue(Tags.SPAN_KIND_CONSUMER)); + assertEquals(TraceMapperV1.SPAN_KIND_INTERNAL, TraceMapperV1.getSpanKindValue("unknown")); + } + + @Test + void testPayloadContainsExpectedHeaderAndChunkFields() throws IOException { + // setup + Map tags = new LinkedHashMap<>(); + tags.put(Tags.ENV, "prod"); + tags.put(Tags.VERSION, "1.2.3"); + tags.put(Tags.COMPONENT, "http-client"); + tags.put(Tags.SPAN_KIND, Tags.SPAN_KIND_CLIENT); + tags.put("attr.string", "value"); + tags.put("attr.bool", true); + tags.put("attr.number", 12.5d); + tags.put("_dd.p.dm", "-3"); + + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 1, + new HashMap<>(), + tags, + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + "rum"); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + int payloadFieldCount = unpacker.unpackMapHeader(); + Set payloadFieldsSeen = new HashSet<>(); + int chunkCount = -1; + Map payloadAttributes = null; + + for (int i = 0; i < payloadFieldCount; i++) { + int fieldId = unpacker.unpackInt(); + payloadFieldsSeen.add(fieldId); + switch (fieldId) { + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + case 8: + case 9: + readStreamingString(unpacker, stringTable); + break; + case 10: + payloadAttributes = readAttributes(unpacker, stringTable); + break; + case 11: + chunkCount = unpacker.unpackArrayHeader(); + assertEquals(1, chunkCount); + verifyChunk(unpacker, Collections.singletonList(span), stringTable); + break; + default: + fail("Unexpected payload field id: " + fieldId); + } + } + + // then + assertEquals(10, payloadFieldCount); + Set expectedFields = new HashSet<>(); + for (int i = 2; i <= 11; i++) { + expectedFields.add(i); + } + assertEquals(expectedFields, payloadFieldsSeen); + assertEquals(1, chunkCount); + assertNotNull(payloadAttributes); + if (ProcessTags.getTagsForSerialization() == null) { + assertEquals(0, payloadAttributes.size()); + } else { + assertEquals(1, payloadAttributes.size()); + assertEquals( + ProcessTags.getTagsForSerialization().toString(), + payloadAttributes.get(DDTags.PROCESS_TAGS)); + } + } + + @TableTest({ + "scenario | decisionMakerTag | expectedSamplingMechanism", + "absent tag yields default | | 0 ", + "simple negative mechanism | -3 | 3 ", + "hash-prefixed mechanism | 934086a686-7 | 7 ", + "malformed tag yields default | invalid | 0 " + }) + void testSamplingMechanismNormalization(String decisionMakerTag, int expectedSamplingMechanism) + throws IOException { + // setup + Map tags = new HashMap<>(); + if (decisionMakerTag != null) { + tags.put("_dd.p.dm", decisionMakerTag); + } + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 321L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + tags, + "custom", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + unpacker.unpackMapHeader(); + int samplingMechanism = -1; + + for (int i = 0; i < 10; i++) { + int payloadFieldId = unpacker.unpackInt(); + if (payloadFieldId == 11) { + int chunkCount = unpacker.unpackArrayHeader(); + assertEquals(1, chunkCount); + int chunkFieldCount = unpacker.unpackMapHeader(); + for (int j = 0; j < chunkFieldCount; j++) { + int chunkFieldId = unpacker.unpackInt(); + if (chunkFieldId == 7) { + samplingMechanism = unpacker.unpackInt(); + } else { + skipChunkField(unpacker, chunkFieldId, stringTable); + } + } + } else { + skipPayloadField(unpacker, payloadFieldId, stringTable); + } + } + + // then + assertEquals(expectedSamplingMechanism, samplingMechanism); + } + + @Test + void testSpanIdsAreEncodedAsUnsignedValues() throws IOException { + // setup + long spanId = Long.MIN_VALUE + 123L; + long parentId = Long.MIN_VALUE + 456L; + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + spanId, + parentId, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + unpacker.unpackMapHeader(); + Long actualSpanId = null; + Long actualParentId = null; + + for (int i = 0; i < 10; i++) { + int payloadFieldId = unpacker.unpackInt(); + if (payloadFieldId == 11) { + int chunkCount = unpacker.unpackArrayHeader(); + assertEquals(1, chunkCount); + int chunkFieldCount = unpacker.unpackMapHeader(); + for (int j = 0; j < chunkFieldCount; j++) { + int chunkFieldId = unpacker.unpackInt(); + if (chunkFieldId == 4) { + int spanCount = unpacker.unpackArrayHeader(); + assertEquals(1, spanCount); + int spanFieldCount = unpacker.unpackMapHeader(); + for (int k = 0; k < spanFieldCount; k++) { + int spanFieldId = unpacker.unpackInt(); + switch (spanFieldId) { + case 4: + assertEquals(MessageFormat.UINT64, unpacker.getNextFormat()); + actualSpanId = DDSpanId.from(unpacker.unpackBigInteger().toString()); + break; + case 5: + assertEquals(MessageFormat.UINT64, unpacker.getNextFormat()); + actualParentId = DDSpanId.from(unpacker.unpackBigInteger().toString()); + break; + default: + skipSpanField(unpacker, spanFieldId, stringTable); + } + } + } else { + skipChunkField(unpacker, chunkFieldId, stringTable); + } + } + } else { + skipPayloadField(unpacker, payloadFieldId, stringTable); + } + } + + // then + assertEquals(spanId, actualSpanId); + assertEquals(parentId, actualParentId); + } + + @Test + void testSpanLinksAreEncodedFromStructuredSpanLinks() throws IOException { + // setup + Map firstLinkAttributes = new LinkedHashMap<>(); + firstLinkAttributes.put("link.kind", "follows_from"); + firstLinkAttributes.put("context_headers", "tracecontext"); + List spanLinks = + Arrays.asList( + new TestSpanLink( + DDTraceId.fromHex("11223344556677889900aabbccddeeff"), + DDSpanId.fromHex("000000000000002a"), + (byte) 1, + "dd=s:1", + SpanAttributes.fromMap(firstLinkAttributes)), + new TestSpanLink( + DDTraceId.fromHex("00000000000000000000000000000001"), + DDSpanId.fromHex("0000000000000002"), + (byte) 0, + "", + SpanAttributes.EMPTY)); + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null, + spanLinks); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + List> links = readFirstSpanLinks(unpacker, stringTable); + + // then + assertEquals(2, links.size()); + assertArrayEquals( + traceIdBytes(DDTraceId.fromHex("11223344556677889900aabbccddeeff")), + (byte[]) links.get(0).get("traceId")); + assertEquals(DDSpanId.fromHex("000000000000002a"), links.get(0).get("spanId")); + assertEquals("dd=s:1", links.get(0).get("tracestate")); + assertEquals(1L, links.get(0).get("flags")); + assertEquals(firstLinkAttributes, links.get(0).get("attributes")); + + assertArrayEquals( + traceIdBytes(DDTraceId.fromHex("00000000000000000000000000000001")), + (byte[]) links.get(1).get("traceId")); + assertEquals(DDSpanId.fromHex("0000000000000002"), links.get(1).get("spanId")); + assertEquals("", links.get(1).get("tracestate")); + assertEquals(0L, links.get(1).get("flags")); + assertEquals(Collections.emptyMap(), links.get(1).get("attributes")); + } + + @Test + void testFirstSpanTagsAreProcessedOnce() throws IOException { + // setup + Map tags = Collections.singletonMap(Tags.HTTP_URL, "http://localhost:7777/"); + CountingPojoSpan firstSpan = + new CountingPojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(tags), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null); + + CountingPojoSpan secondSpan = + new CountingPojoSpan( + "service-a", + "operation-b", + "resource-b", + DDTraceId.ONE, + 456L, + 123L, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(tags), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null); + + TraceMapperV1 mapper = new TraceMapperV1(); + + // when + serializeMappedPayload(mapper, Collections.singletonList(Arrays.asList(firstSpan, secondSpan))); + + // then + assertEquals(1, firstSpan.processTagsAndBaggageCount); + assertEquals(1, secondSpan.processTagsAndBaggageCount); + } + + @Test + void testMissingSpanLinksEncodeEmptyLinks() throws IOException { + // setup + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + List> links = readFirstSpanLinks(unpacker, stringTable); + + // then + assertTrue(links.isEmpty()); + } + + @Test + void testSpanEventsAreEncodedNatively() throws IOException { + // setup: span events flow as structured AgentSpanEvent objects (Metadata.getSpanEvents()) and + // are encoded natively into V1 field 12 — no JSON round-trip. + TraceGenerator.PojoSpan span = + spanWithEvents(Arrays.asList(eventWithAllAttributeTypes(), simpleEvent())); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + List> events = readFirstSpanEvents(unpacker, stringTable); + + // then + assertEncodedEvents(events); + } + + @Test + void testNoSpanEventsEncodesEmptyArray() throws IOException { + // setup + TraceGenerator.PojoSpan span = spanWithEvents(Collections.emptyList()); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + List> events = readFirstSpanEvents(unpacker, stringTable); + + // then + assertTrue(events.isEmpty()); + } + + @Test + void testMetaStructIsEncodedAsBytesAttribute() throws IOException { + // setup + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null); + Map metaStructValue = new LinkedHashMap<>(); + metaStructValue.put("foo", "bar"); + metaStructValue.put("answer", 42L); + span.setMetaStruct("meta_key", metaStructValue); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + Map attributes = readFirstSpanAttributes(unpacker, stringTable); + byte[] metaStructBytes = (byte[]) attributes.get("meta_key"); + MessageUnpacker metaStructUnpacker = MessagePack.newDefaultUnpacker(metaStructBytes); + int metaStructFieldCount = metaStructUnpacker.unpackMapHeader(); + Map decodedMetaStruct = new HashMap<>(); + for (int i = 0; i < metaStructFieldCount; i++) { + String key = metaStructUnpacker.unpackString(); + switch (metaStructUnpacker.getNextFormat().getValueType()) { + case INTEGER: + decodedMetaStruct.put(key, metaStructUnpacker.unpackLong()); + break; + case STRING: + decodedMetaStruct.put(key, metaStructUnpacker.unpackString()); + break; + default: + fail("Unexpected meta_struct value type for key " + key); + } + } + + // then + assertNotNull(metaStructBytes); + assertEquals("bar", decodedMetaStruct.get("foo")); + assertEquals(42L, decodedMetaStruct.get("answer")); + } + + @Test + void testMapValuedSpanTagsAreFlattened() throws IOException { + // setup + Map usrProfile = Collections.singletonMap("age", 30L); + Map usr = new LinkedHashMap<>(); + usr.put("id", "123"); + usr.put("name", "alice"); + usr.put("authenticated", true); + usr.put("profile", usrProfile); + + Map loginMetadata0 = new LinkedHashMap<>(); + loginMetadata0.put("event", "login"); + loginMetadata0.put("attempts", 1L); + Map loginMetadata1 = Collections.singletonMap("blocked", false); + Map loginSuccess = new LinkedHashMap<>(); + loginSuccess.put("metadata0", loginMetadata0); + loginSuccess.put("metadata1", loginMetadata1); + + Map tags = new LinkedHashMap<>(); + tags.put("usr", usr); + tags.put("appsec.events.users.login.success", loginSuccess); + + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + tags, + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 0, + null); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + Map attributes = readFirstSpanAttributes(unpacker, stringTable); + + // then + assertTrue(attributes.containsKey("usr.id")); + assertTrue(attributes.containsKey("usr.name")); + assertTrue(attributes.containsKey("usr.authenticated")); + assertTrue(attributes.containsKey("usr.profile.age")); + assertTrue(attributes.containsKey("appsec.events.users.login.success.metadata0.event")); + assertTrue(attributes.containsKey("appsec.events.users.login.success.metadata0.attempts")); + assertTrue(attributes.containsKey("appsec.events.users.login.success.metadata1.blocked")); + + assertEquals("123", attributes.get("usr.id")); + assertEquals("alice", attributes.get("usr.name")); + assertEquals(true, attributes.get("usr.authenticated")); + assertEquals(30d, ((Number) attributes.get("usr.profile.age")).doubleValue(), 0.000001d); + assertEquals("login", attributes.get("appsec.events.users.login.success.metadata0.event")); + assertEquals( + 1d, + ((Number) attributes.get("appsec.events.users.login.success.metadata0.attempts")) + .doubleValue(), + 0.000001d); + assertEquals(false, attributes.get("appsec.events.users.login.success.metadata1.blocked")); + + assertTrue(!attributes.containsKey("usr")); + assertTrue(!attributes.containsKey("appsec.events.users.login.success")); + } + + @Test + void testPrimitiveSpanTagsAreEncoded() throws IOException { + // setup + Map tags = new LinkedHashMap<>(); + tags.put("tag.bool", true); + tags.put("tag.int", 7); + tags.put("tag.long", 9L); + tags.put("tag.float", 3.5f); + tags.put("tag.double", 4.25d); + + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + tags, + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 0, + null); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + Map attributes = readFirstSpanAttributes(unpacker, stringTable); + + // then + assertEquals(true, attributes.get("tag.bool")); + assertEquals(7d, ((Number) attributes.get("tag.int")).doubleValue(), 0.000001d); + assertEquals(9d, ((Number) attributes.get("tag.long")).doubleValue(), 0.000001d); + assertEquals(3.5d, ((Number) attributes.get("tag.float")).doubleValue(), 0.000001d); + assertEquals(4.25d, ((Number) attributes.get("tag.double")).doubleValue(), 0.000001d); + } + + @Test + void testThreadMetadataIsEncoded() throws IOException { + // setup + TraceGenerator.PojoSpan span = + new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 0, + null); + + TraceMapperV1 mapper = new TraceMapperV1(); + byte[] encoded = serializeMappedPayload(mapper, singleTrace(span)); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encoded); + List stringTable = newStringTable(); + + // when + Map attributes = readFirstSpanAttributes(unpacker, stringTable); + + // then + assertAttributeValueEquals( + span.getTag(DDTags.THREAD_ID), attributes.get(DDTags.THREAD_ID), DDTags.THREAD_ID); + assertEquals(span.getTag(DDTags.THREAD_NAME).toString(), attributes.get(DDTags.THREAD_NAME)); + } + + // --- Shared event fixtures and assertions --- + + private static AgentSpanEvent eventWithAllAttributeTypes() { + Map attributes = new LinkedHashMap<>(); + attributes.put("str", "v"); + attributes.put("int", 42L); + attributes.put("double", 12.5d); + attributes.put("bool", true); + attributes.put("arr", Arrays.asList("x", 7L, 2.5d, false)); + return new TestSpanEvent(1234567890L, "event.one", attributes); + } + + private static AgentSpanEvent simpleEvent() { + return new TestSpanEvent(1234567891L, "event.two", Collections.emptyMap()); + } + + /** Minimal {@link AgentSpanEvent} for exercising native V1 encoding. */ + private static final class TestSpanEvent implements AgentSpanEvent { + private final long timeNanos; + private final String name; + private final Map attributes; + + private TestSpanEvent(long timeNanos, String name, Map attributes) { + this.timeNanos = timeNanos; + this.name = name; + this.attributes = attributes; + } + + @Override + public long timeNanos() { + return this.timeNanos; + } + + @Override + public String name() { + return this.name; + } + + @Override + public Map attributes() { + return this.attributes; + } + + @Override + public CharSequence toJsonTag() { + // Not exercised by the V1 mapper (native encoding reads the structured accessors above). + throw new UnsupportedOperationException(); + } + } + + private static void assertEncodedEvents(List> events) { + assertEquals(2, events.size()); + + Map first = events.get(0); + assertEquals(1234567890L, first.get("timeUnixNano")); + assertEquals("event.one", first.get("name")); + @SuppressWarnings("unchecked") + Map firstAttributes = (Map) first.get("attributes"); + assertEquals("v", firstAttributes.get("str")); + assertEquals(42L, firstAttributes.get("int")); + assertEquals(12.5d, ((Number) firstAttributes.get("double")).doubleValue(), 0.000001d); + assertEquals(true, firstAttributes.get("bool")); + assertEquals(Arrays.asList("x", 7L, 2.5d, false), firstAttributes.get("arr")); + + Map second = events.get(1); + assertEquals(1234567891L, second.get("timeUnixNano")); + assertEquals("event.two", second.get("name")); + assertEquals(Collections.emptyMap(), second.get("attributes")); + } + + private static TraceGenerator.PojoSpan spanWithEvents(List events) { + return new TraceGenerator.PojoSpan( + "service-a", + "operation-a", + "resource-a", + DDTraceId.ONE, + 123L, + 0L, + 1000L, + 2000L, + 0, + new HashMap<>(), + new HashMap<>(), + "web", + false, + PrioritySampling.SAMPLER_KEEP, + 200, + null, + Collections.emptyList(), + events); + } + + private static List> singleTrace(TraceGenerator.PojoSpan span) { + return Collections.singletonList(Collections.singletonList(span)); + } + + private static List newStringTable() { + List stringTable = new ArrayList<>(); + stringTable.add(""); + return stringTable; + } + + // --- Verification helpers (ported from the original Spock spec) --- + + private static final class PayloadVerifier implements ByteBufferConsumer, WritableByteChannel { + + private final List> expectedTraces; + private final TraceMapperV1 mapper; + private ByteBuffer captured = ByteBuffer.allocate(200 << 10); + private int position = 0; + + private PayloadVerifier( + List> expectedTraces, TraceMapperV1 mapper) { + this.expectedTraces = expectedTraces; + this.mapper = mapper; + } + + void skipLargeTrace() { + ++position; + } + + @Override + public void accept(int messageCount, ByteBuffer buffer) { + if (expectedTraces.isEmpty() && messageCount == 0) { + return; + } + try { + Payload payload = mapper.newPayload().withBody(messageCount, buffer); + payload.writeTo(this); + captured.flip(); + + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(captured); + if (messageCount == 0) { + assertEquals(0, unpacker.unpackMapHeader()); + return; + } + + List stringTable = newStringTable(); + + int payloadFieldCount = unpacker.unpackMapHeader(); + assertEquals(10, payloadFieldCount); + + boolean seenChunks = false; + for (int i = 0; i < payloadFieldCount; i++) { + int fieldId = unpacker.unpackInt(); + if (fieldId == 11) { + int traceCount = unpacker.unpackArrayHeader(); + assertEquals(messageCount, traceCount); + seenChunks = true; + for (int traceIndex = 0; traceIndex < traceCount; traceIndex++) { + List expectedTrace = expectedTraces.get(position++); + verifyChunk(unpacker, expectedTrace, stringTable); + } + } else { + skipPayloadField(unpacker, fieldId, stringTable); + } + } + + assertTrue(seenChunks); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + mapper.reset(); + captured.position(0); + captured.limit(captured.capacity()); + } + } + + @Override + public int write(ByteBuffer src) { + if (captured.remaining() < src.remaining()) { + ByteBuffer newBuffer = ByteBuffer.allocate(captured.capacity() + src.remaining()); + captured.flip(); + newBuffer.put(captured); + captured = newBuffer; + return write(src); + } + captured.put(src); + return src.position(); + } + + void verifyTracesConsumed() { + assertEquals(expectedTraces.size(), position); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + } + + private static void verifyChunk( + MessageUnpacker unpacker, + List expectedTrace, + List stringTable) + throws IOException { + int chunkFieldCount = unpacker.unpackMapHeader(); + assertEquals(6, chunkFieldCount); + + Integer priority = null; + String origin = null; + Map chunkAttributes = null; + byte[] traceId = null; + Integer samplingMechanism = null; + List decodedSpans = null; + + for (int i = 0; i < chunkFieldCount; i++) { + int fieldId = unpacker.unpackInt(); + switch (fieldId) { + case 1: + priority = unpacker.unpackInt(); + break; + case 2: + origin = readStreamingString(unpacker, stringTable); + break; + case 3: + chunkAttributes = readAttributes(unpacker, stringTable); + break; + case 4: + decodedSpans = verifySpans(unpacker, expectedTrace, stringTable); + break; + case 6: + int traceIdLen = unpacker.unpackBinaryHeader(); + traceId = new byte[traceIdLen]; + unpacker.readPayload(traceId); + break; + case 7: + samplingMechanism = unpacker.unpackInt(); + break; + default: + fail("Unexpected chunk field id: " + fieldId); + } + } + + assertNotNull(priority); + assertNotNull(origin); + assertNotNull(chunkAttributes); + assertNotNull(decodedSpans); + assertNotNull(traceId); + assertNotNull(samplingMechanism); + + TraceGenerator.PojoSpan firstSpan = expectedTrace.get(0); + assertEquals(firstSpan.samplingPriority(), priority); + assertEqualsWithNullAsEmpty(firstSpan.getOrigin(), origin); + assertEquals(1, chunkAttributes.size()); + assertEqualsWithNullAsEmpty( + firstSpan.getLocalRootSpan().getServiceName(), + (CharSequence) chunkAttributes.get("service")); + assertArrayEquals(traceIdBytes(firstSpan.getTraceId()), traceId); + assertEquals(expectedSamplingMechanism(firstSpan.getTags()), samplingMechanism); + } + + private static byte[] traceIdBytes(DDTraceId traceId) { + return ByteBuffer.allocate(16) + .putLong(traceId.toHighOrderLong()) + .putLong(traceId.toLong()) + .array(); + } + + private static List verifySpans( + MessageUnpacker unpacker, + List expectedTrace, + List stringTable) + throws IOException { + int spanCount = unpacker.unpackArrayHeader(); + assertEquals(expectedTrace.size(), spanCount); + + for (int i = 0; i < spanCount; i++) { + verifySpan(unpacker, expectedTrace.get(i), stringTable); + } + return expectedTrace; + } + + private static void verifySpan( + MessageUnpacker unpacker, TraceGenerator.PojoSpan expectedSpan, List stringTable) + throws IOException { + int spanFieldCount = unpacker.unpackMapHeader(); + assertEquals(16, spanFieldCount); + + String service = null; + String name = null; + String resource = null; + Long spanId = null; + Long parentId = null; + Long start = null; + Long duration = null; + Boolean error = null; + Map attributes = null; + String type = null; + int linksCount = -1; + int eventsCount = -1; + String env = null; + String version = null; + String component = null; + Integer spanKind = null; + + for (int i = 0; i < spanFieldCount; i++) { + int fieldId = unpacker.unpackInt(); + switch (fieldId) { + case 1: + service = readStreamingString(unpacker, stringTable); + break; + case 2: + name = readStreamingString(unpacker, stringTable); + break; + case 3: + resource = readStreamingString(unpacker, stringTable); + break; + case 4: + spanId = unpackUnsignedLong(unpacker); + break; + case 5: + parentId = unpackUnsignedLong(unpacker); + break; + case 6: + start = unpacker.unpackLong(); + break; + case 7: + duration = unpacker.unpackLong(); + break; + case 8: + error = unpacker.unpackBoolean(); + break; + case 9: + attributes = readAttributes(unpacker, stringTable); + break; + case 10: + type = readStreamingString(unpacker, stringTable); + break; + case 11: + linksCount = unpacker.unpackArrayHeader(); + break; + case 12: + eventsCount = unpacker.unpackArrayHeader(); + break; + case 13: + env = readStreamingString(unpacker, stringTable); + break; + case 14: + version = readStreamingString(unpacker, stringTable); + break; + case 15: + component = readStreamingString(unpacker, stringTable); + break; + case 16: + spanKind = unpacker.unpackInt(); + break; + default: + fail("Unexpected span field id: " + fieldId); + } + } + + assertEqualsWithNullAsEmpty(expectedSpan.getServiceName(), service); + assertEqualsWithNullAsEmpty(expectedSpan.getOperationName(), name); + assertEqualsWithNullAsEmpty(expectedSpan.getResourceName(), resource); + assertEquals(expectedSpan.getSpanId(), spanId); + assertEquals(expectedSpan.getParentId(), parentId); + assertEquals(expectedSpan.getStartTime(), start); + assertEquals(expectedSpan.getDurationNano(), duration); + assertEquals(expectedSpan.getError() != 0, error); + assertEqualsWithNullAsEmpty(expectedSpan.getType(), type); + assertEquals(0, linksCount); + assertEquals(0, eventsCount); + assertEqualsWithNullAsEmpty((CharSequence) expectedSpan.getTag(Tags.ENV), env); + assertEqualsWithNullAsEmpty((CharSequence) expectedSpan.getTag(Tags.VERSION), version); + assertEqualsWithNullAsEmpty((CharSequence) expectedSpan.getTag(Tags.COMPONENT), component); + assertEquals( + TraceMapperV1.getSpanKindValue((CharSequence) expectedSpan.getTag(Tags.SPAN_KIND)), + spanKind); + + assertNotNull(attributes); + int expectedHttpStatusCode = expectedSpan.getHttpStatusCode(); + boolean shouldContainHttpStatus = + expectedHttpStatusCode != 0 && !expectedSpan.getTags().containsKey("http.status_code"); + Map expectedAttributes = new HashMap<>(); + for (Map.Entry entry : expectedSpan.getBaggage().entrySet()) { + expectedAttributes.put(entry.getKey(), entry.getValue()); + } + expectedAttributes.put(DDTags.THREAD_ID, expectedSpan.getTag(DDTags.THREAD_ID)); + expectedAttributes.put(DDTags.THREAD_NAME, expectedSpan.getTag(DDTags.THREAD_NAME)); + for (Map.Entry entry : expectedSpan.getTags().entrySet()) { + if (DDTags.SPAN_EVENTS.equals(entry.getKey())) { + continue; + } + addFlattenedExpectedAttribute(expectedAttributes, entry.getKey(), entry.getValue()); + } + if (shouldContainHttpStatus) { + expectedAttributes.put("http.status_code", Integer.toString(expectedHttpStatusCode)); + } + if (expectedSpan.isTopLevel()) { + expectedAttributes.put(InstrumentationTags.DD_TOP_LEVEL.toString(), 1d); + } + + assertEquals(expectedAttributes.size(), attributes.size()); + for (Map.Entry entry : expectedAttributes.entrySet()) { + String key = entry.getKey(); + Object expectedValue = entry.getValue(); + assertTrue(attributes.containsKey(key), "Missing attribute key: " + key); + assertAttributeValueEquals(expectedValue, attributes.get(key), key); + } + } + + private static Map readAttributes( + MessageUnpacker unpacker, List stringTable) throws IOException { + int attrArraySize = unpacker.unpackArrayHeader(); + assertEquals(0, attrArraySize % 3); + int attrCount = attrArraySize / 3; + + Map attributes = new HashMap<>(); + for (int i = 0; i < attrCount; i++) { + String key = readStreamingString(unpacker, stringTable); + int attrType = unpacker.unpackInt(); + Object value; + switch (attrType) { + case TraceMapperV1.VALUE_TYPE_STRING: + value = readStreamingString(unpacker, stringTable); + break; + case TraceMapperV1.VALUE_TYPE_BOOLEAN: + value = unpacker.unpackBoolean(); + break; + case TraceMapperV1.VALUE_TYPE_FLOAT: + value = unpacker.unpackDouble(); + break; + case TraceMapperV1.VALUE_TYPE_BYTES: + int len = unpacker.unpackBinaryHeader(); + byte[] data = new byte[len]; + unpacker.readPayload(data); + value = data; + break; + default: + fail("Unknown attribute value type: " + attrType); + value = null; + } + attributes.put(key, value); + } + return attributes; + } + + private static void assertAttributeValueEquals(Object expected, Object actual, String key) { + if (expected instanceof Number) { + assertTrue(actual instanceof Number, "Attribute " + key + " should be numeric"); + double expectedValue = ((Number) expected).doubleValue(); + double actualValue = ((Number) actual).doubleValue(); + double delta = Math.max(0.000001d, Math.abs(expectedValue) * 0.000000000001d); + assertEquals(expectedValue, actualValue, delta, "Numeric mismatch for " + key); + } else if (expected instanceof Boolean) { + assertEquals(expected, actual, "Boolean mismatch for " + key); + } else { + assertEquals(String.valueOf(expected), String.valueOf(actual), "String mismatch for " + key); + } + } + + private static long unpackUnsignedLong(MessageUnpacker unpacker) throws IOException { + MessageFormat format = unpacker.getNextFormat(); + if (format == MessageFormat.UINT64) { + return DDSpanId.from(unpacker.unpackBigInteger().toString()); + } + return unpacker.unpackLong(); + } + + private static void addFlattenedExpectedAttribute( + Map expectedAttributes, String key, Object value) { + if (!(value instanceof Map)) { + expectedAttributes.put(key, value); + return; + } + for (Map.Entry entry : ((Map) value).entrySet()) { + addFlattenedExpectedAttribute( + expectedAttributes, key + "." + String.valueOf(entry.getKey()), entry.getValue()); + } + } + + private static int expectedSamplingMechanism(Map tags) { + Object decisionMakerRaw = tags.get("_dd.p.dm"); + if (decisionMakerRaw == null) { + return SamplingMechanism.DEFAULT; + } + + String decisionMaker = String.valueOf(decisionMakerRaw); + try { + int value = Integer.parseInt(decisionMaker); + return value < 0 ? -value : value; + } catch (NumberFormatException ignored) { + int separator = decisionMaker.lastIndexOf('-'); + if (separator >= 0 && separator + 1 < decisionMaker.length()) { + try { + int value = Integer.parseInt(decisionMaker.substring(separator + 1)); + return value < 0 ? -value : value; + } catch (NumberFormatException ignoredAgain) { + // Fallback to default. + } + } + return SamplingMechanism.DEFAULT; + } + } + + private static String readStreamingString(MessageUnpacker unpacker, List stringTable) + throws IOException { + MessageFormat format = unpacker.getNextFormat(); + if (format == FIXSTR || format == STR8 || format == STR16 || format == STR32) { + String value = unpacker.unpackString(); + if (!stringTable.contains(value)) { + stringTable.add(value); + } + return value; + } + + int index = unpacker.unpackInt(); + assertTrue(index >= 0 && index < stringTable.size(), "Invalid string-table index: " + index); + return stringTable.get(index); + } + + private static void skipPayloadField( + MessageUnpacker unpacker, int fieldId, List stringTable) throws IOException { + switch (fieldId) { + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + case 8: + case 9: + readStreamingString(unpacker, stringTable); + break; + case 10: + readAttributes(unpacker, stringTable); + break; + default: + fail("Unexpected payload field id while skipping: " + fieldId); + } + } + + private static void skipChunkField( + MessageUnpacker unpacker, int fieldId, List stringTable) throws IOException { + switch (fieldId) { + case 1: + unpacker.unpackInt(); + break; + case 2: + readStreamingString(unpacker, stringTable); + break; + case 3: + readAttributes(unpacker, stringTable); + break; + case 4: + int spanCount = unpacker.unpackArrayHeader(); + for (int i = 0; i < spanCount; i++) { + skipSpan(unpacker, stringTable); + } + break; + case 5: + unpacker.unpackBoolean(); + break; + case 6: + int len = unpacker.unpackBinaryHeader(); + byte[] ignored = new byte[len]; + unpacker.readPayload(ignored); + break; + case 7: + unpacker.unpackInt(); + break; + default: + fail("Unexpected chunk field id while skipping: " + fieldId); + } + } + + private static void skipSpan(MessageUnpacker unpacker, List stringTable) + throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < fieldCount; i++) { + int fieldId = unpacker.unpackInt(); + switch (fieldId) { + case 1: + case 2: + case 3: + case 10: + case 13: + case 14: + case 15: + readStreamingString(unpacker, stringTable); + break; + case 4: + case 5: + unpacker.unpackValue().asNumberValue().toLong(); + break; + case 6: + case 7: + unpacker.unpackLong(); + break; + case 8: + unpacker.unpackBoolean(); + break; + case 9: + int attrArraySize = unpacker.unpackArrayHeader(); + int attrCount = attrArraySize / 3; + for (int j = 0; j < attrCount; j++) { + readStreamingString(unpacker, stringTable); + int type = unpacker.unpackInt(); + switch (type) { + case TraceMapperV1.VALUE_TYPE_STRING: + readStreamingString(unpacker, stringTable); + break; + case TraceMapperV1.VALUE_TYPE_BOOLEAN: + unpacker.unpackBoolean(); + break; + case TraceMapperV1.VALUE_TYPE_FLOAT: + unpacker.unpackDouble(); + break; + case TraceMapperV1.VALUE_TYPE_BYTES: + int len = unpacker.unpackBinaryHeader(); + byte[] ignored = new byte[len]; + unpacker.readPayload(ignored); + break; + default: + fail("Unexpected attribute type while skipping: " + type); + } + } + break; + case 11: + case 12: + unpacker.unpackArrayHeader(); + break; + case 16: + unpacker.unpackInt(); + break; + default: + fail("Unexpected span field id while skipping: " + fieldId); + } + } + } + + private static Map readFirstSpanAttributes( + MessageUnpacker unpacker, List stringTable) throws IOException { + int payloadFieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < payloadFieldCount; i++) { + int payloadFieldId = unpacker.unpackInt(); + if (payloadFieldId != 11) { + skipPayloadField(unpacker, payloadFieldId, stringTable); + continue; + } + + int chunkCount = unpacker.unpackArrayHeader(); + assertEquals(1, chunkCount); + + int chunkFieldCount = unpacker.unpackMapHeader(); + for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { + int chunkFieldId = unpacker.unpackInt(); + if (chunkFieldId != 4) { + skipChunkField(unpacker, chunkFieldId, stringTable); + continue; + } + + int spanCount = unpacker.unpackArrayHeader(); + assertEquals(1, spanCount); + + int spanFieldCount = unpacker.unpackMapHeader(); + for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { + int spanFieldId = unpacker.unpackInt(); + if (spanFieldId == 9) { + return readAttributes(unpacker, stringTable); + } + skipSpanField(unpacker, spanFieldId, stringTable); + } + } + } + fail("Could not find span attributes field in first span"); + return new HashMap<>(); + } + + private static List> readFirstSpanLinks( + MessageUnpacker unpacker, List stringTable) throws IOException { + int payloadFieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < payloadFieldCount; i++) { + int payloadFieldId = unpacker.unpackInt(); + if (payloadFieldId != 11) { + skipPayloadField(unpacker, payloadFieldId, stringTable); + continue; + } + + int chunkCount = unpacker.unpackArrayHeader(); + assertEquals(1, chunkCount); + + int chunkFieldCount = unpacker.unpackMapHeader(); + for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { + int chunkFieldId = unpacker.unpackInt(); + if (chunkFieldId != 4) { + skipChunkField(unpacker, chunkFieldId, stringTable); + continue; + } + + int spanCount = unpacker.unpackArrayHeader(); + assertEquals(1, spanCount); + + int spanFieldCount = unpacker.unpackMapHeader(); + for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { + int spanFieldId = unpacker.unpackInt(); + if (spanFieldId == 11) { + return readSpanLinks(unpacker, stringTable); + } + skipSpanField(unpacker, spanFieldId, stringTable); + } + } + } + fail("Could not find span links field in first span"); + return new ArrayList<>(); + } + + private static void skipSpanField(MessageUnpacker unpacker, int fieldId, List stringTable) + throws IOException { + switch (fieldId) { + case 1: + case 2: + case 3: + case 10: + case 13: + case 14: + case 15: + readStreamingString(unpacker, stringTable); + break; + case 4: + case 5: + unpacker.unpackValue().asNumberValue().toLong(); + break; + case 6: + case 7: + unpacker.unpackLong(); + break; + case 8: + unpacker.unpackBoolean(); + break; + case 9: + readAttributes(unpacker, stringTable); + break; + case 12: + int eventsCount = unpacker.unpackArrayHeader(); + for (int j = 0; j < eventsCount; j++) { + skipSpanEvent(unpacker, stringTable); + } + break; + case 11: + int linksCount = unpacker.unpackArrayHeader(); + for (int j = 0; j < linksCount; j++) { + int linkFieldCount = unpacker.unpackMapHeader(); + for (int k = 0; k < linkFieldCount; k++) { + int linkFieldId = unpacker.unpackInt(); + switch (linkFieldId) { + case 1: + int traceIdLen = unpacker.unpackBinaryHeader(); + byte[] ignored = new byte[traceIdLen]; + unpacker.readPayload(ignored); + break; + case 2: + case 5: + unpacker.unpackValue().asNumberValue().toLong(); + break; + case 3: + readAttributes(unpacker, stringTable); + break; + case 4: + readStreamingString(unpacker, stringTable); + break; + default: + fail("Unexpected span link field id while skipping: " + linkFieldId); + } + } + } + break; + case 16: + unpacker.unpackInt(); + break; + default: + fail("Unexpected span field id while skipping: " + fieldId); + } + } + + private static List> readSpanLinks( + MessageUnpacker unpacker, List stringTable) throws IOException { + int linksCount = unpacker.unpackArrayHeader(); + List> links = new ArrayList<>(); + + for (int i = 0; i < linksCount; i++) { + int linkFieldCount = unpacker.unpackMapHeader(); + assertEquals(5, linkFieldCount); + + byte[] traceId = null; + Long spanId = null; + Map attributes = null; + String tracestate = null; + Long flags = null; + + for (int j = 0; j < linkFieldCount; j++) { + int linkFieldId = unpacker.unpackInt(); + switch (linkFieldId) { + case 1: + int traceIdLen = unpacker.unpackBinaryHeader(); + traceId = new byte[traceIdLen]; + unpacker.readPayload(traceId); + break; + case 2: + spanId = unpacker.unpackValue().asNumberValue().toLong(); + break; + case 3: + attributes = readAttributes(unpacker, stringTable); + break; + case 4: + tracestate = readStreamingString(unpacker, stringTable); + break; + case 5: + flags = unpacker.unpackValue().asNumberValue().toLong(); + break; + default: + fail("Unexpected span link field id: " + linkFieldId); + } + } + + Map link = new HashMap<>(); + link.put("traceId", traceId); + link.put("spanId", spanId); + link.put("attributes", attributes); + link.put("tracestate", tracestate); + link.put("flags", flags); + links.add(link); + } + + return links; + } + + private static List> readFirstSpanEvents( + MessageUnpacker unpacker, List stringTable) throws IOException { + int payloadFieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < payloadFieldCount; i++) { + int payloadFieldId = unpacker.unpackInt(); + if (payloadFieldId != 11) { + skipPayloadField(unpacker, payloadFieldId, stringTable); + continue; + } + + int chunkCount = unpacker.unpackArrayHeader(); + assertEquals(1, chunkCount); + + int chunkFieldCount = unpacker.unpackMapHeader(); + for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { + int chunkFieldId = unpacker.unpackInt(); + if (chunkFieldId != 4) { + skipChunkField(unpacker, chunkFieldId, stringTable); + continue; + } + + int spanCount = unpacker.unpackArrayHeader(); + assertEquals(1, spanCount); + + int spanFieldCount = unpacker.unpackMapHeader(); + for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { + int spanFieldId = unpacker.unpackInt(); + if (spanFieldId == 12) { + return readSpanEvents(unpacker, stringTable); + } + skipSpanField(unpacker, spanFieldId, stringTable); + } + } + } + fail("Could not find span events field in first span"); + return new ArrayList<>(); + } + + private static List> readSpanEvents( + MessageUnpacker unpacker, List stringTable) throws IOException { + int eventsCount = unpacker.unpackArrayHeader(); + List> events = new ArrayList<>(); + + for (int i = 0; i < eventsCount; i++) { + int eventFieldCount = unpacker.unpackMapHeader(); + assertEquals(3, eventFieldCount); + + Long timeUnixNano = null; + String name = null; + Map attributes = null; + + for (int j = 0; j < eventFieldCount; j++) { + int eventFieldId = unpacker.unpackInt(); + switch (eventFieldId) { + case 1: + timeUnixNano = unpacker.unpackLong(); + break; + case 2: + name = readStreamingString(unpacker, stringTable); + break; + case 3: + attributes = readEventAttributes(unpacker, stringTable); + break; + default: + fail("Unexpected span event field id: " + eventFieldId); + } + } + + Map event = new HashMap<>(); + event.put("timeUnixNano", timeUnixNano); + event.put("name", name); + event.put("attributes", attributes); + events.add(event); + } + return events; + } + + private static Map readEventAttributes( + MessageUnpacker unpacker, List stringTable) throws IOException { + int attrArraySize = unpacker.unpackArrayHeader(); + assertEquals(0, attrArraySize % 3); + int attrCount = attrArraySize / 3; + Map attributes = new HashMap<>(); + + for (int i = 0; i < attrCount; i++) { + String key = readStreamingString(unpacker, stringTable); + int attrType = unpacker.unpackInt(); + Object value; + switch (attrType) { + case TraceMapperV1.VALUE_TYPE_STRING: + value = readStreamingString(unpacker, stringTable); + break; + case TraceMapperV1.VALUE_TYPE_BOOLEAN: + value = unpacker.unpackBoolean(); + break; + case TraceMapperV1.VALUE_TYPE_FLOAT: + value = unpacker.unpackDouble(); + break; + case TraceMapperV1.VALUE_TYPE_INT: + value = unpacker.unpackLong(); + break; + case TraceMapperV1.VALUE_TYPE_ARRAY: + value = readEventArrayValue(unpacker, stringTable); + break; + default: + fail("Unknown event attribute value type: " + attrType); + value = null; + } + attributes.put(key, value); + } + return attributes; + } + + private static List readEventArrayValue( + MessageUnpacker unpacker, List stringTable) throws IOException { + int itemArraySize = unpacker.unpackArrayHeader(); + assertEquals(0, itemArraySize % 2); + int itemCount = itemArraySize / 2; + List values = new ArrayList<>(); + for (int i = 0; i < itemCount; i++) { + int itemType = unpacker.unpackInt(); + switch (itemType) { + case TraceMapperV1.VALUE_TYPE_STRING: + values.add(readStreamingString(unpacker, stringTable)); + break; + case TraceMapperV1.VALUE_TYPE_BOOLEAN: + values.add(unpacker.unpackBoolean()); + break; + case TraceMapperV1.VALUE_TYPE_FLOAT: + values.add(unpacker.unpackDouble()); + break; + case TraceMapperV1.VALUE_TYPE_INT: + values.add(unpacker.unpackLong()); + break; + default: + fail("Unknown event array item type: " + itemType); + } + } + return values; + } + + private static void skipSpanEvent(MessageUnpacker unpacker, List stringTable) + throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < fieldCount; i++) { + int fieldId = unpacker.unpackInt(); + switch (fieldId) { + case 1: + unpacker.unpackLong(); + break; + case 2: + readStreamingString(unpacker, stringTable); + break; + case 3: + readEventAttributes(unpacker, stringTable); + break; + default: + fail("Unexpected event field id while skipping: " + fieldId); + } + } + } + + private static byte[] serializeMappedPayload( + TraceMapperV1 mapper, List> traces) { + CapturedBody capturedBody = new CapturedBody(mapper); + MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(2 << 20, capturedBody)); + + for (List trace : traces) { + assertTrue(packer.format(trace, mapper)); + } + packer.flush(); + + assertNotNull(capturedBody.payloadBytes); + return capturedBody.payloadBytes; + } + + private static byte[] serializePayload(Payload payload) { + ByteArrayChannel channel = new ByteArrayChannel(); + try { + payload.writeTo(channel); + } catch (IOException e) { + fail(e.getMessage()); + } + return channel.bytes(); + } + + private static class CapturedBody implements ByteBufferConsumer { + private final TraceMapperV1 mapper; + private byte[] payloadBytes; + + private CapturedBody(TraceMapperV1 mapper) { + this.mapper = mapper; + } + + @Override + public void accept(int messageCount, ByteBuffer buffer) { + Payload payload = mapper.newPayload().withBody(messageCount, buffer); + payloadBytes = serializePayload(payload); + mapper.reset(); + } + } + + private static class CountingPojoSpan extends TraceGenerator.PojoSpan { + int processTagsAndBaggageCount = 0; + + CountingPojoSpan( + String serviceName, + String operationName, + CharSequence resourceName, + DDTraceId traceId, + long spanId, + long parentId, + long start, + long duration, + int error, + Map baggage, + Map tags, + CharSequence type, + boolean measured, + int samplingPriority, + int statusCode, + CharSequence origin) { + super( + serviceName, + operationName, + resourceName, + traceId, + spanId, + parentId, + start, + duration, + error, + baggage, + tags, + type, + measured, + samplingPriority, + statusCode, + origin); + } + + // TraceMapperV1 calls processTagsAndBaggageWithStructuredLinks, whose default delegates here. + @Override + public void processTagsAndBaggage(MetadataConsumer consumer) { + processTagsAndBaggageCount++; + super.processTagsAndBaggage(consumer); + } + } + + /** Exposes {@link SpanLink}'s protected constructor for tests building links by raw id/flags. */ + private static final class TestSpanLink extends SpanLink { + TestSpanLink( + DDTraceId traceId, + long spanId, + byte traceFlags, + String traceState, + SpanAttributes attributes) { + super(traceId, spanId, traceFlags, traceState, attributes); + } + } + + private static class ByteArrayChannel implements WritableByteChannel { + private byte[] data = new byte[0]; + + @Override + public int write(ByteBuffer src) { + int len = src.remaining(); + byte[] incoming = new byte[len]; + src.get(incoming); + byte[] combined = new byte[data.length + incoming.length]; + System.arraycopy(data, 0, combined, 0, data.length); + System.arraycopy(incoming, 0, combined, data.length, incoming.length); + data = combined; + return len; + } + + byte[] bytes() { + return data; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + } + + private static void assertEqualsWithNullAsEmpty(CharSequence expected, CharSequence actual) { + if (expected == null) { + assertEquals("", actual); + } else { + assertEquals(expected.toString(), actual.toString()); + } + } +} diff --git a/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy b/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy index e668d0112a6..1aed4e10a4b 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy @@ -160,7 +160,7 @@ class TraceGenerator { this.measured = measured this.metadata = new Metadata(currentThread().getId(), UTF8BytesString.create(currentThread().getName()), fromMap(tags), baggage, UNSET, measured, topLevel, null, null, 0, - tagsForSerialization, emptyList()) + tagsForSerialization, emptyList(), emptyList()) } @Override @@ -319,11 +319,6 @@ class TraceGenerator { consumer.accept(metadata) } - @Override - void processTagsAndBaggage(MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - consumer.accept(metadata) - } - @Override PojoSpan setSamplingPriority(int samplingPriority, int samplingMechanism) { return this diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpan.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpan.java index 99c90b53b30..0b256907343 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpan.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpan.java @@ -205,6 +205,14 @@ default boolean isValid() { void addLink(AgentSpanLink link); + /** + * Records a structured span event. The default implementation is a no-op; spans backed by real + * span data (e.g. {@code DDSpan}) override it to retain the event for serialization. + * + * @param event The span event to record. + */ + default void addSpanEvent(AgentSpanEvent event) {} + AgentSpan setMetaStruct(final String field, final Object value); boolean isOutbound(); diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpanEvent.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpanEvent.java new file mode 100644 index 00000000000..a9640aed08a --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpanEvent.java @@ -0,0 +1,51 @@ +package datadog.trace.bootstrap.instrumentation.api; + +import java.util.Map; + +/** + * Describes an event that occurred during a span's lifetime (the OpenTelemetry span-event concept). + * + *

Span events are kept as structured data until serialization so the different trace payloads + * can encode them in their own representation without a JSON round-trip: + * + *

    + *
  • the V1 payload encodes them natively (span field {@code 12}) from {@link #timeNanos()}, + * {@link #name()} and {@link #attributes()}; + *
  • the legacy v0.x payloads carry them as the JSON string {@code events} tag, assembled from + * each event's {@link #toJsonTag()}. + *
+ * + * @see AgentSpanLink for the analogous structured representation of span links. + */ +public interface AgentSpanEvent { + /** + * Gets the event timestamp. + * + * @return The event timestamp, in nanoseconds since the Unix epoch. + */ + long timeNanos(); + + /** + * Gets the event name. + * + * @return The event name. + */ + String name(); + + /** + * Gets the event attributes as typed values. Values are {@link String}, {@link Boolean}, {@link + * Long}, {@link Double}, or a {@link java.util.List} of those, mirroring the OpenTelemetry + * attribute types. + * + * @return The event attributes. + */ + Map attributes(); + + /** + * Serializes the event as a single JSON object, matching the legacy v0.x {@code events} tag + * representation (for example {@code {"time_unix_nano":..,"name":"..","attributes":{..}}}). + * + * @return The event as a JSON object. + */ + CharSequence toJsonTag(); +}