diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 7cd588e9284..39c1116acf4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -4,6 +4,7 @@ import static datadog.trace.api.DDSpanTypes.RPC; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; +import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ROUTE; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG; import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG; @@ -62,6 +63,15 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final SpanKindFilter INTERNAL_KIND = SpanKindFilter.builder().includeInternal().build(); + // gRPC status-code tag keys probed, in array order, on the OTLP export path; first non-null wins + private static final String[] GRPC_STATUS_CODE_KEYS = { + InstrumentationTags.GRPC_STATUS_CODE, // "rpc.grpc.status_code" + "grpc.code", + "rpc.grpc.status.code", + "grpc.status.code", + "rpc.response.status_code", + }; + private final Set ignoredResources; private final Thread thread; private final MessagePassingQueue inbox; @@ -346,12 +356,21 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { Object httpMethodObj = span.unsafeGetTag(HTTP_METHOD); httpMethod = httpMethodObj != null ? httpMethodObj.toString() : null; Object httpEndpointObj = span.unsafeGetTag(HTTP_ENDPOINT); + // OTLP path falls back to http.route (mirrors libdatadog). 
The native v0.6 path keeps its + // http.endpoint-only lookup so this doesn't change its aggregation key / wire output. + if (otlpStatsExportEnabled && httpEndpointObj == null) { + httpEndpointObj = span.unsafeGetTag(HTTP_ROUTE); + } httpEndpoint = httpEndpointObj != null ? httpEndpointObj.toString() : null; } CharSequence spanType = span.getType(); String grpcStatusCode = null; - if (spanType != null && RPC.contentEquals(spanType)) { + if (otlpStatsExportEnabled) { + // OTLP path: probe every known gRPC status-code convention, no span-type gate, so a span + // typed "grpc" (or carrying an OTel-style key) still surfaces rpc.response.status_code. + grpcStatusCode = firstTag(span, GRPC_STATUS_CODE_KEYS); + } else if (spanType != null && RPC.contentEquals(spanType)) { Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE); grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null; } @@ -395,6 +414,17 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return error; } + /** Returns the first non-null tag among {@code keys} (in order) as a string, or {@code null}. */ + private static String firstTag(CoreSpan span, String[] keys) { + for (String key : keys) { + Object value = span.unsafeGetTag(key); + if (value != null) { + return value.toString(); + } + } + return null; + } + /** * Picks the peer-tag schema for a span. 
For internal-kind spans we always use the static {@link * PeerTagSchema#INTERNAL} singleton (one entry for {@code base.service}); for {@code diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java index 97b31116a9e..de7144e37c1 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/OtlpStatsMetricWriter.java @@ -55,6 +55,7 @@ public final class OtlpStatsMetricWriter implements MetricWriter { private static final int DP_TIME_FIELD = 3; private static final int DP_ATTRIBUTES_FIELD = 9; + private static final String SERVICE_NAME = "service.name"; private static final String SPAN_NAME = "span.name"; private static final String SPAN_KIND = "span.kind"; private static final String HTTP_REQUEST_METHOD = "http.request.method"; @@ -71,6 +72,14 @@ public final class OtlpStatsMetricWriter implements MetricWriter { @Nullable private final OtlpSender sender; private final boolean otelSemanticsMode; + /** + * The configured default service, reported once on the resource. A data point only carries its + * own {@code service.name} attribute when its span's service differs from this; for the default + * service the point inherits the resource value (most-specific-wins at the consumer). When + * {@code null} (test constructors), every point with a non-empty service carries the attribute. + */ + @Nullable private final String defaultService; + /** * Resource attribute blob prepended to every payload. 
In default mode it carries the {@code * datadog.runtime_id} and process-tag resource attributes; in OTel-semantics mode it is the plain @@ -89,18 +98,24 @@ public final class OtlpStatsMetricWriter implements MetricWriter { private int metricBytes; public OtlpStatsMetricWriter(Config config) { - this(createSender(config), config.isTraceOtelSemanticsEnabled()); + this(createSender(config), config.isTraceOtelSemanticsEnabled(), config.getServiceName()); } // visible for testing: lets tests inject a capturing sender to decode the emitted protobuf OtlpStatsMetricWriter(@Nullable OtlpSender sender) { - this(sender, false); + this(sender, false, null); } // visible for testing: lets tests inject a capturing sender and control the semantics mode OtlpStatsMetricWriter(@Nullable OtlpSender sender, boolean otelSemanticsMode) { + this(sender, otelSemanticsMode, null); + } + + OtlpStatsMetricWriter( + @Nullable OtlpSender sender, boolean otelSemanticsMode, @Nullable String defaultService) { this.sender = sender; this.otelSemanticsMode = otelSemanticsMode; + this.defaultService = defaultService; this.resourceMessage = otelSemanticsMode ? 
OtlpResourceProto.RESOURCE_MESSAGE @@ -176,6 +191,11 @@ private void writeDataPointAttributes(AggregateEntry entry, boolean error) { // OTel semconv attrs are emitted in both modes writeStringAttribute(SPAN_NAME, entry.getResource()); writeStringAttribute(SPAN_KIND, entry.getSpanKind()); + // service.name goes on the point only when the span's service differs from the default one. + UTF8BytesString service = entry.getService(); + if (service != null && service.length() > 0 && (!service.toString().equals(defaultService))) { + writeStringAttribute(SERVICE_NAME, service); + } if (entry.getHttpMethod() != null) { writeStringAttribute(HTTP_REQUEST_METHOD, entry.getHttpMethod()); } diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java index 377f457f69a..29a3b89c15f 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/OtlpStatsMetricWriterTest.java @@ -7,18 +7,29 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.protobuf.CodedInputStream; import com.google.protobuf.WireFormat; +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.metrics.api.Histograms; import datadog.metrics.impl.DDSketchHistograms; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.SpanKindFilter; +import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.otlp.common.OtlpPayload; import datadog.trace.core.otlp.common.OtlpSender; 
import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -437,6 +448,126 @@ void httpAndGrpcAttributesAppearOnlyWhenSet() throws IOException { assertFalse(bareAttrs.containsKey("rpc.response.status_code")); } + @Test + void serviceNameEmittedOnlyForNonDefaultService() throws IOException { + CapturingSender sender = new CapturingSender(); + // The configured default service ("web") is reported on the resource; only a span whose service + // differs from it repeats service.name on its own data point. + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender, false, "web"); + + long start = SECONDS.toNanos(1_700_000_000L); + writer.startBucket(2, start, SECONDS.toNanos(10)); + writer.add(serviceEntry("web.request", "web")); // default service + writer.add(serviceEntry("db.query", "postgres")); // custom service + writer.finishBucket(); + + DecodedMetric metric = decode(sender.lastPayload); + assertEquals(2, metric.dataPoints.size()); + + Map defaultAttrs = null; + Map customAttrs = null; + for (DataPoint dp : metric.dataPoints) { + if ("db.query".equals(dp.attributes.get("datadog.operation.name"))) { + customAttrs = dp.attributes; + } else { + defaultAttrs = dp.attributes; + } + } + assertNotNull(customAttrs, "custom-service data point present"); + assertNotNull(defaultAttrs, "default-service data point present"); + assertEquals( + "postgres", + customAttrs.get("service.name"), + "non-default service is carried on its own data point"); + assertFalse( + defaultAttrs.containsKey("service.name"), + "default service must not be repeated on its data point"); + } + + /** An ok-only entry on the given service and operation name, recording a single 1s hit. 
*/ + private static AggregateEntry serviceEntry(String operationName, String service) { + AggregateEntry e = + AggregateEntryTestUtils.of( + "GET /users", + service, + operationName, + null, + "web", + 0, + false, + true, + "server", + null, + null, + null, + null); + e.recordOneDuration(SECONDS.toNanos(1)); + return e; + } + + // ── gRPC status extraction (OTLP aggregator path) ───────────────────────── + + @Test + void grpcStatusExtractedFromGrpcTypedSpanOnOtlpPath() throws Exception { + // Drives the real ConflatingMetricsAggregator on the OTLP path (otlpStatsExportEnabled is true + // because the writer is an OtlpStatsMetricWriter): a span typed "grpc" carrying the + // grpc.status.code tag must surface rpc.response.status_code, even though that key + span type + // are outside the native v0.6 path's single-key + "rpc"-type lookup. + CapturingSender sender = new CapturingSender(); + OtlpStatsMetricWriter writer = new OtlpStatsMetricWriter(sender); + + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.peerTags()).thenReturn(Collections.emptySet()); + Sink sink = mock(Sink.class); + + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + HealthMetrics.NO_OP, + sink, + writer, + /* maxAggregates */ 16, + /* queueSize */ 16, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + try { + aggregator.start(); + aggregator.publish(Collections.<CoreSpan<?>>singletonList(grpcSpan("grpc.status.code", "0"))); + aggregator.forceReport().get(5, TimeUnit.SECONDS); + + DecodedMetric metric = decode(sender.lastPayload); + assertEquals(1, metric.dataPoints.size()); + assertEquals("0", metric.dataPoints.get(0).attributes.get("rpc.response.status_code")); + } finally { + aggregator.close(); + } + } + + /** A metrics-eligible, top-level span typed {@code grpc} carrying a single tag. 
*/ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static CoreSpan grpcSpan(String tagKey, String tagValue) { + CoreSpan span = mock(CoreSpan.class); + when(span.isMeasured()).thenReturn(false); + when(span.isTopLevel()).thenReturn(true); + when(span.isKind(any(SpanKindFilter.class))).thenReturn(false); + when(span.getLongRunningVersion()).thenReturn(0); + when(span.getDurationNano()).thenReturn(SECONDS.toNanos(1)); + when(span.getError()).thenReturn(0); + when(span.getResourceName()).thenReturn("grpc.request"); + when(span.getServiceName()).thenReturn("svc"); + when(span.getOperationName()).thenReturn("grpc.request"); + when(span.getServiceNameSource()).thenReturn(null); + when(span.getType()).thenReturn("grpc"); + when(span.getHttpStatusCode()).thenReturn((short) 0); + when(span.getParentId()).thenReturn(0L); + when(span.getOrigin()).thenReturn(null); + when(span.unsafeGetTag(eq(Tags.SPAN_KIND), any(CharSequence.class))).thenReturn(""); + when(span.unsafeGetTag(tagKey)).thenReturn(tagValue); + return span; + } + @Test void emptyBucketSendsNothing() { CapturingSender sender = new CapturingSender();