Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static datadog.trace.api.DDSpanTypes.RPC;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ROUTE;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG;
Expand Down Expand Up @@ -62,6 +63,15 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
// Span-kind filter built with only includeInternal() enabled.
// NOTE(review): presumably used to recognise spans whose kind is "internal" — confirm at call sites.
private static final SpanKindFilter INTERNAL_KIND =
SpanKindFilter.builder().includeInternal().build();

// Candidate span-tag names that may hold a gRPC status code, used on the OTLP export path.
// Array order is the lookup priority: the array is handed to firstTag(span, keys), which returns
// the first key whose tag value is non-null. The first entry is the same tag the non-OTLP branch
// of publish() reads on its own.
private static final String[] GRPC_STATUS_CODE_KEYS = {
InstrumentationTags.GRPC_STATUS_CODE, // "rpc.grpc.status_code"
"grpc.code",
"rpc.grpc.status.code",
"grpc.status.code",
"rpc.response.status_code",
};

private final Set<String> ignoredResources;
private final Thread thread;
private final MessagePassingQueue<InboxItem> inbox;
Expand Down Expand Up @@ -346,12 +356,21 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
Object httpMethodObj = span.unsafeGetTag(HTTP_METHOD);
httpMethod = httpMethodObj != null ? httpMethodObj.toString() : null;
Object httpEndpointObj = span.unsafeGetTag(HTTP_ENDPOINT);
// OTLP path falls back to http.route (mirrors libdatadog). The native v0.6 path keeps its
// http.endpoint-only lookup so this doesn't change its aggregation key / wire output.
if (otlpStatsExportEnabled && httpEndpointObj == null) {
httpEndpointObj = span.unsafeGetTag(HTTP_ROUTE);
}
httpEndpoint = httpEndpointObj != null ? httpEndpointObj.toString() : null;
}

CharSequence spanType = span.getType();
String grpcStatusCode = null;
if (spanType != null && RPC.contentEquals(spanType)) {
if (otlpStatsExportEnabled) {
// OTLP path: probe every known gRPC status-code convention, no span-type gate, so a span
// typed "grpc" (or carrying an OTel-style key) still surfaces rpc.response.status_code.
grpcStatusCode = firstTag(span, GRPC_STATUS_CODE_KEYS);
} else if (spanType != null && RPC.contentEquals(spanType)) {
Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE);
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
}
Expand Down Expand Up @@ -395,6 +414,17 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
return error;
}

/**
 * Probes {@code keys} on {@code span} in array order and yields the string form of the first tag
 * value that is present.
 *
 * @param span the span whose tags are read via {@code unsafeGetTag}
 * @param keys candidate tag names, highest priority first
 * @return {@code toString()} of the first non-null tag value, or {@code null} when none is set
 */
private static String firstTag(CoreSpan<?> span, String[] keys) {
  for (int i = 0; i < keys.length; i++) {
    Object candidate = span.unsafeGetTag(keys[i]);
    if (candidate == null) {
      // this key is not set on the span; fall through to the next candidate
      continue;
    }
    return candidate.toString();
  }
  return null;
}

/**
* Picks the peer-tag schema for a span. For internal-kind spans we always use the static {@link
* PeerTagSchema#INTERNAL} singleton (one entry for {@code base.service}); for {@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public final class OtlpStatsMetricWriter implements MetricWriter {
// NOTE(review): presumably the protobuf field numbers of the data-point message's time and
// attributes fields — confirm against the OTLP metrics proto definition.
private static final int DP_TIME_FIELD = 3;
private static final int DP_ATTRIBUTES_FIELD = 9;

// Attribute keys; writeDataPointAttributes passes these to writeStringAttribute for a data point.
private static final String SERVICE_NAME = "service.name";
private static final String SPAN_NAME = "span.name";
private static final String SPAN_KIND = "span.kind";
private static final String HTTP_REQUEST_METHOD = "http.request.method";
Expand All @@ -71,6 +72,14 @@ public final class OtlpStatsMetricWriter implements MetricWriter {
// Sender injected through the constructors; nullable.
// NOTE(review): how a null sender is handled is outside this view — confirm before relying on it.
@Nullable private final OtlpSender sender;
// Chooses which resource attribute blob the constructor assigns to resourceMessage.
private final boolean otelSemanticsMode;

/**
 * The configured default service, reported once on the resource. A data point only carries its
 * own {@code service.name} attribute when its span's service is non-empty and differs from this
 * value; for the default service the point inherits the resource value (most-specific-wins at
 * the consumer).
 *
 * <p>When this is {@code null} (the test-only constructors) no service ever equals it, so every
 * data point with a non-empty service carries its own {@code service.name}.
 */
@Nullable private final String defaultService;

/**
* Resource attribute blob prepended to every payload. In default mode it carries the {@code
* datadog.runtime_id} and process-tag resource attributes; in OTel-semantics mode it is the plain
Expand All @@ -89,18 +98,24 @@ public final class OtlpStatsMetricWriter implements MetricWriter {
private int metricBytes;

/**
 * Creates a writer from the tracer configuration.
 *
 * <p>The sender comes from {@code createSender(config)}, the semantics mode from {@code
 * config.isTraceOtelSemanticsEnabled()}, and the default service from {@code
 * config.getServiceName()}.
 *
 * @param config configuration supplying the sender settings, semantics mode and service name
 */
public OtlpStatsMetricWriter(Config config) {
  // A constructor may contain only one explicit this(...) call and it must come first; the stale
  // two-argument delegation is dropped so the configured service name reaches defaultService.
  this(createSender(config), config.isTraceOtelSemanticsEnabled(), config.getServiceName());
}

/**
 * Visible for testing: lets tests inject a capturing sender to decode the emitted protobuf.
 *
 * <p>Uses the default (non-OTel-semantics) mode and a {@code null} default service.
 *
 * @param sender the sender to receive payloads, may be {@code null}
 */
OtlpStatsMetricWriter(@Nullable OtlpSender sender) {
  // Single explicit constructor invocation; the leftover two-argument call was a duplicate.
  this(sender, false, null);
}

// Visible for testing: lets tests inject a capturing sender and control the semantics mode.
// Delegates to the three-argument constructor with a null defaultService.
OtlpStatsMetricWriter(@Nullable OtlpSender sender, boolean otelSemanticsMode) {
this(sender, otelSemanticsMode, null);
}

OtlpStatsMetricWriter(
@Nullable OtlpSender sender, boolean otelSemanticsMode, @Nullable String defaultService) {
this.sender = sender;
this.otelSemanticsMode = otelSemanticsMode;
this.defaultService = defaultService;
this.resourceMessage =
otelSemanticsMode
? OtlpResourceProto.RESOURCE_MESSAGE
Expand Down Expand Up @@ -176,6 +191,11 @@ private void writeDataPointAttributes(AggregateEntry entry, boolean error) {
// OTel semconv attrs are emitted in both modes
writeStringAttribute(SPAN_NAME, entry.getResource());
writeStringAttribute(SPAN_KIND, entry.getSpanKind());
// service.name on the point only when the span's service differs from the resource's default;
UTF8BytesString service = entry.getService();
if (service != null && service.length() > 0 && (!service.toString().equals(defaultService))) {
writeStringAttribute(SERVICE_NAME, service);
}
if (entry.getHttpMethod() != null) {
writeStringAttribute(HTTP_REQUEST_METHOD, entry.getHttpMethod());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,29 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.WireFormat;
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.metrics.api.Histograms;
import datadog.metrics.impl.DDSketchHistograms;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.SpanKindFilter;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.otlp.common.OtlpPayload;
import datadog.trace.core.otlp.common.OtlpSender;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -437,6 +448,126 @@ void httpAndGrpcAttributesAppearOnlyWhenSet() throws IOException {
assertFalse(bareAttrs.containsKey("rpc.response.status_code"));
}

@Test
void serviceNameEmittedOnlyForNonDefaultService() throws IOException {
  // The writer is built with "web" as its default service. Two entries go into one bucket: one on
  // "web" and one on "postgres"; only the latter should carry service.name on its data point.
  CapturingSender capture = new CapturingSender();
  OtlpStatsMetricWriter statsWriter = new OtlpStatsMetricWriter(capture, false, "web");

  long bucketStart = SECONDS.toNanos(1_700_000_000L);
  long bucketDuration = SECONDS.toNanos(10);
  statsWriter.startBucket(2, bucketStart, bucketDuration);
  statsWriter.add(serviceEntry("web.request", "web")); // default service
  statsWriter.add(serviceEntry("db.query", "postgres")); // custom service
  statsWriter.finishBucket();

  DecodedMetric decoded = decode(capture.lastPayload);
  assertEquals(2, decoded.dataPoints.size());

  // Split the data points by operation name: "db.query" is the custom-service one, anything else
  // is treated as the default-service one.
  Map<String, Object> customAttrs = null;
  Map<String, Object> defaultAttrs = null;
  for (DataPoint point : decoded.dataPoints) {
    boolean isCustom = "db.query".equals(point.attributes.get("datadog.operation.name"));
    if (isCustom) {
      customAttrs = point.attributes;
    } else {
      defaultAttrs = point.attributes;
    }
  }

  assertNotNull(customAttrs, "custom-service data point present");
  assertNotNull(defaultAttrs, "default-service data point present");

  Object customService = customAttrs.get("service.name");
  assertEquals(
      "postgres", customService, "non-default service is carried on its own data point");

  boolean defaultRepeatsService = defaultAttrs.containsKey("service.name");
  assertFalse(defaultRepeatsService, "default service must not be repeated on its data point");
}

/**
 * Builds an {@link AggregateEntry} through {@code AggregateEntryTestUtils.of} for the given
 * service and operation name (the first argument is fixed to {@code "GET /users"}), then records
 * a single one-second duration on it.
 *
 * @param operationName operation name passed to the factory
 * @param service service name passed to the factory
 * @return the populated entry
 */
private static AggregateEntry serviceEntry(String operationName, String service) {
  final long oneSecondNanos = SECONDS.toNanos(1);
  AggregateEntry entry =
      AggregateEntryTestUtils.of(
          "GET /users", service, operationName, null, "web", 0, false, true, "server",
          null, null, null, null);
  entry.recordOneDuration(oneSecondNanos);
  return entry;
}

// ── gRPC status extraction (OTLP aggregator path) ─────────────────────────

@Test
void grpcStatusExtractedFromGrpcTypedSpanOnOtlpPath() throws Exception {
  // Runs a real ConflatingMetricsAggregator wired to an OtlpStatsMetricWriter, which is what puts
  // the aggregator on the OTLP path (otlpStatsExportEnabled is true). The published span is typed
  // "grpc" and carries only the grpc.status.code tag; the native v0.6 path would not pick that up
  // (it looks at a single key and only for "rpc"-typed spans), but on the OTLP path it must
  // surface as rpc.response.status_code.
  CapturingSender capture = new CapturingSender();
  OtlpStatsMetricWriter otlpWriter = new OtlpStatsMetricWriter(capture);

  Sink unusedSink = mock(Sink.class);
  DDAgentFeaturesDiscovery discovery = mock(DDAgentFeaturesDiscovery.class);
  when(discovery.peerTags()).thenReturn(Collections.<String>emptySet());

  ConflatingMetricsAggregator underTest =
      new ConflatingMetricsAggregator(
          Collections.<String>emptySet(),
          discovery,
          HealthMetrics.NO_OP,
          unusedSink,
          otlpWriter,
          /* maxAggregates */ 16,
          /* queueSize */ 16,
          /* reportingInterval */ 10,
          SECONDS,
          /* includeEndpointInMetrics */ false);
  try {
    underTest.start();
    underTest.publish(
        Collections.<CoreSpan<?>>singletonList(grpcSpan("grpc.status.code", "0")));
    // Block until the forced report has completed so the sender has captured a payload.
    underTest.forceReport().get(5, TimeUnit.SECONDS);

    DecodedMetric decoded = decode(capture.lastPayload);
    assertEquals(1, decoded.dataPoints.size());
    Object reportedStatus = decoded.dataPoints.get(0).attributes.get("rpc.response.status_code");
    assertEquals("0", reportedStatus);
  } finally {
    underTest.close();
  }
}

/**
 * Creates a Mockito-backed {@link CoreSpan} stub: top-level, not measured, error-free, typed
 * {@code grpc}, lasting one second, and answering {@code unsafeGetTag(tagKey)} with {@code
 * tagValue}.
 *
 * @param tagKey the only single-argument tag lookup that returns a value
 * @param tagValue the value returned for {@code tagKey}
 * @return the stubbed span
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private static CoreSpan<?> grpcSpan(String tagKey, String tagValue) {
  CoreSpan stub = mock(CoreSpan.class);

  // identity
  when(stub.getServiceName()).thenReturn("svc");
  when(stub.getServiceNameSource()).thenReturn(null);
  when(stub.getOperationName()).thenReturn("grpc.request");
  when(stub.getResourceName()).thenReturn("grpc.request");
  when(stub.getType()).thenReturn("grpc");
  when(stub.getOrigin()).thenReturn(null);
  when(stub.getParentId()).thenReturn(0L);

  // flags and measurements
  when(stub.isTopLevel()).thenReturn(true);
  when(stub.isMeasured()).thenReturn(false);
  when(stub.isKind(any(SpanKindFilter.class))).thenReturn(false);
  when(stub.getLongRunningVersion()).thenReturn(0);
  when(stub.getDurationNano()).thenReturn(SECONDS.toNanos(1));
  when(stub.getError()).thenReturn(0);
  when(stub.getHttpStatusCode()).thenReturn((short) 0);

  // tags: an empty span.kind for the two-argument lookup, plus the single tag under test
  when(stub.unsafeGetTag(eq(Tags.SPAN_KIND), any(CharSequence.class))).thenReturn("");
  when(stub.unsafeGetTag(tagKey)).thenReturn(tagValue);
  return stub;
}

@Test
void emptyBucketSendsNothing() {
CapturingSender sender = new CapturingSender();
Expand Down