Skip to content
Draft

init #11667

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ final class AggregateEntry extends Hashtable.Entry {
DDCaches.newFixedSizeCache(32);
private static final DDCache<String, UTF8BytesString> GRPC_STATUS_CODE_CACHE =
DDCaches.newFixedSizeCache(32);
// Origin is a small fixed vocabulary (synthetics, synthetics-browser, rum, ciapp-test, lambda).
private static final DDCache<String, UTF8BytesString> ORIGIN_CACHE =
DDCaches.newFixedSizeCache(8);

/**
* Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner
Expand Down Expand Up @@ -108,8 +111,13 @@ final class AggregateEntry extends Hashtable.Entry {
@Nullable private final UTF8BytesString grpcStatusCode;
private final short httpStatusCode;

/** Whether the root span carried the {@code synthetics} origin tag (synthetic-monitoring run). */
private final boolean synthetic;
/**
* Trace origin (e.g. {@code synthetics}, {@code rum}, {@code ciapp-test}, {@code lambda}), or
* {@code null} when the root span carried no origin. Part of the bucket key, so spans with
* distinct origins aggregate separately. The OTLP export emits this as {@code datadog.origin};
* the native msgpack path reads {@link #isSynthetics()}, derived from it.
*/
@Nullable private final UTF8BytesString origin;

/** Whether this span is the trace root ({@code parentId == 0}). */
private final boolean traceRoot;
Expand Down Expand Up @@ -139,7 +147,8 @@ final class AggregateEntry extends Hashtable.Entry {
private int errorCount;
private int hitCount;
private int topLevelCount;
private long duration;
private long okDuration;
private long errorDuration;

/** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */
AggregateEntry(SpanSnapshot s, long keyHash) {
Expand All @@ -154,7 +163,7 @@ final class AggregateEntry extends Hashtable.Entry {
this.httpEndpoint = canonicalizeOptional(HTTP_ENDPOINT_CACHE, s.httpEndpoint);
this.grpcStatusCode = canonicalizeOptional(GRPC_STATUS_CODE_CACHE, s.grpcStatusCode);
this.httpStatusCode = s.httpStatusCode;
this.synthetic = s.synthetic;
this.origin = canonicalizeOptional(ORIGIN_CACHE, s.origin);
this.traceRoot = s.traceRoot;
this.peerTagNames = s.peerTagSchema == null ? null : s.peerTagSchema.names;
this.peerTagValues = s.peerTagValues;
Expand All @@ -174,11 +183,12 @@ void recordOneDuration(long tagAndDuration) {
if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) {
tagAndDuration ^= ERROR_TAG;
errorLatenciesForWrite().accept(tagAndDuration);
errorDuration += tagAndDuration;
++errorCount;
} else {
okLatencies.accept(tagAndDuration);
okDuration += tagAndDuration;
}
duration += tagAndDuration;
}

int getErrorCount() {
Expand All @@ -194,7 +204,15 @@ int getTopLevelCount() {
}

long getDuration() {
return duration;
return okDuration + errorDuration;
}

long getOkDuration() {
return okDuration;
}

long getErrorDuration() {
return errorDuration;
}

Histogram getOkLatencies() {
Expand Down Expand Up @@ -232,7 +250,8 @@ void clear() {
this.errorCount = 0;
this.hitCount = 0;
this.topLevelCount = 0;
this.duration = 0;
this.okDuration = 0;
this.errorDuration = 0;
this.okLatencies.clear();
// errorLatencies stays null on entries that never errored. Only clear if it was allocated.
if (this.errorLatencies != null) {
Expand All @@ -243,7 +262,7 @@ void clear() {
boolean matches(SpanSnapshot s) {
String[] snapshotNames = s.peerTagSchema == null ? null : s.peerTagSchema.names;
return httpStatusCode == s.httpStatusCode
&& synthetic == s.synthetic
&& contentEquals(origin, s.origin)
&& traceRoot == s.traceRoot
&& contentEquals(resource, s.resourceName)
&& contentEquals(service, s.serviceName)
Expand Down Expand Up @@ -284,7 +303,7 @@ static long hashOf(SpanSnapshot s) {
h = LongHashingUtils.addToHash(h, s.serviceNameSource);
h = LongHashingUtils.addToHash(h, s.spanType);
h = LongHashingUtils.addToHash(h, s.httpStatusCode);
h = LongHashingUtils.addToHash(h, s.synthetic);
h = LongHashingUtils.addToHash(h, s.origin);
h = LongHashingUtils.addToHash(h, s.traceRoot);
h = LongHashingUtils.addToHash(h, s.spanKind);
// Always mix in both the schema's content hash and the values' content hash, unconditionally
Expand Down Expand Up @@ -352,8 +371,21 @@ int getHttpStatusCode() {
return httpStatusCode;
}

/**
* The full trace origin, or {@code null} when unset. Used by {@link OtlpStatsMetricWriter} to
* emit {@code datadog.origin}.
*/
@Nullable
UTF8BytesString getOrigin() {
return origin;
}

/**
* Whether the origin is {@code synthetics}. Derived from {@link #origin} for the native msgpack
* writer, which emits a synthetics boolean rather than the full origin.
*/
boolean isSynthetics() {
return synthetic;
return origin != null && "synthetics".contentEquals(origin);
}

boolean isTraceRoot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private static final Map<String, String> DEFAULT_HEADERS =
Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);

private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";

private static final SpanKindFilter METRICS_ELIGIBLE_KINDS =
SpanKindFilter.builder()
.includeServer()
Expand Down Expand Up @@ -346,7 +344,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
span.getServiceNameSource(),
spanType,
span.getHttpStatusCode(),
isSynthetic(span),
span.getOrigin(),
span.getParentId() == 0,
spanKind,
peerTagSchema,
Expand Down Expand Up @@ -466,10 +464,6 @@ private static String[] capturePeerTagValues(CoreSpan<?> span, PeerTagSchema sch
return values;
}

private static boolean isSynthetic(CoreSpan<?> span) {
return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
}

public void stop() {
if (null != cancellation) {
cancellation.cancel();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package datadog.trace.common.metrics;

import datadog.metrics.api.Histogram;
import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Projects a client-side-stats {@link Histogram} (a DDSketch over span durations recorded in
* <em>nanoseconds</em>) onto the fixed explicit-bounds histogram layout mandated by the OTLP Trace
* Metrics Export RFC.
*/
final class OtlpHistogramBuckets {
private OtlpHistogramBuckets() {}

private static final double NANOS_PER_SECOND = 1_000_000_000d;

static final double[] BOUNDS_SECONDS = {
0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1, 1.4, 2, 5, 10, 15
};

static final List<Double> EXPLICIT_BOUNDS;

static {
List<Double> bounds = new ArrayList<>(BOUNDS_SECONDS.length + 1);
for (double bound : BOUNDS_SECONDS) {
bounds.add(bound);
}
bounds.add(Double.POSITIVE_INFINITY);
EXPLICIT_BOUNDS = Collections.unmodifiableList(bounds);
}

static int bucketIndex(double seconds) {
for (int i = 0; i < BOUNDS_SECONDS.length; i++) {
if (seconds <= BOUNDS_SECONDS[i]) {
return i;
}
}
return BOUNDS_SECONDS.length; // overflow
}

/**
* Re-bins {@code histogram} (nanosecond-valued) into an {@link OtlpHistogramPoint} expressed in
* seconds with OTLP's fixed bucket layout. {@code count}, {@code min}, and {@code max} are taken
* directly from the sketch; {@code sumNanos} is the exact duration sum tracked alongside the
* sketch by {@link AggregateEntry} (the DDSketch-derived sum would only be approximate).
*/
static OtlpHistogramPoint toHistogramPoint(Histogram histogram, long sumNanos) {
long[] bucketCounts = new long[BOUNDS_SECONDS.length + 1];

List<Double> binBoundaries = histogram.getBinBoundaries();
List<Double> binCounts = histogram.getBinCounts();
for (int i = 0; i < binBoundaries.size(); i++) {
double upperSeconds = binBoundaries.get(i) / NANOS_PER_SECOND;
long count = (long) binCounts.get(i).doubleValue();
bucketCounts[bucketIndex(upperSeconds)] += count;
}

List<Double> counts = new ArrayList<>(bucketCounts.length);
for (long count : bucketCounts) {
counts.add((double) count);
}

double sumSeconds = sumNanos / NANOS_PER_SECOND;
double minSeconds = histogram.isEmpty() ? 0d : histogram.getMinValue() / NANOS_PER_SECOND;
double maxSeconds = histogram.isEmpty() ? 0d : histogram.getMaxValue() / NANOS_PER_SECOND;

return new OtlpHistogramPoint(
histogram.getCount(), EXPLICIT_BOUNDS, counts, sumSeconds, minSeconds, maxSeconds);
}
}
Loading
Loading