From a86710bd94c5d5f4dae32c0cf1f33cc0743a1595 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 15:24:48 -0400 Subject: [PATCH 1/9] feat(ddsketch,trace-utils): OTLP roundtrip helpers Foundation pieces consumed by the OTLP trace-metrics exporter that follows. These are pure additions with no breaking changes. - libdd-ddsketch: `DDSketch::from_pb` rebuilds a sketch from its protobuf representation (or `None` when the mapping is missing/invalid); a thin `DDSketch::from_encoded` wraps protobuf decoding + `from_pb`. Lets callers read back the ok/error sketches that the span concentrator publishes. Includes a roundtrip test that goes `encode_to_vec` -> `from_encoded` and asserts bin count + total weight survive the trip. - libdd-trace-utils: extend `OtlpResourceInfo` with two new fields: `hostname` (emitted as the `host.name` resource attribute when set) and `process_tags` (comma-separated `key:value` pairs, each becoming a `dd.` resource attribute). The struct is `#[non_exhaustive]`, so adding fields is forward-compatible. Co-Authored-By: Claude Sonnet 4.6 --- libdd-ddsketch/src/lib.rs | 37 +++++++++++++++++++++++ libdd-trace-utils/src/otlp_encoder/mod.rs | 2 ++ 2 files changed, 39 insertions(+) diff --git a/libdd-ddsketch/src/lib.rs b/libdd-ddsketch/src/lib.rs index 6c6ddcf588..e25d26947b 100644 --- a/libdd-ddsketch/src/lib.rs +++ b/libdd-ddsketch/src/lib.rs @@ -110,6 +110,30 @@ impl DDSketch { pub fn encode_to_vec(self) -> Vec { self.into_pb().encode_to_vec() } + + /// Reconstruct a sketch from its protobuf. `None` if mapping missing/invalid. + pub fn from_pb(sketch: pb::DdSketch) -> Option { + let mapping_pb = sketch.mapping?; + let mapping = LogMapping::new(mapping_pb.gamma, mapping_pb.index_offset)?; + let store = match sketch.positive_values { + Some(s) => LowCollapsingDenseStore { + bins: s.contiguous_bin_counts.into(), + offset: s.contiguous_bin_index_offset, + max_size: LowCollapsingDenseStore::default().max_size, + }, + None => LowCollapsingDenseStore::default(), + }; + Some(DDSketch { + store, + zero_count: sketch.zero_count, + mapping, + }) + } + + /// Decode a sketch from serialized protobuf bytes; `None` on decode/mapping failure. + pub fn from_encoded(bytes: &[u8]) -> Option { + Self::from_pb(pb::DdSketch::decode(bytes).ok()?) + } } /// A store mapping the bin indexes to their respective weights @@ -415,6 +439,19 @@ mod test { assert_within!(sketch.count(), 108.0, f64::EPSILON); } + #[test] + fn test_sketch_pb_roundtrip() { + let mut sketch = DDSketch::default(); + for &p in &[0.0_f64, 0.1, 2.0, 10.0, 25.0, 10000.0] { + assert!(sketch.add(p).is_ok()); + } + let expected_count = sketch.count(); + let expected_bins = sketch.ordered_bins().len(); + let decoded = DDSketch::from_pb(sketch.into_pb()).expect("decodes"); + assert_within!(decoded.count(), expected_count, f64::EPSILON); + assert_eq!(decoded.ordered_bins().len(), expected_bins); + } + #[test] fn test_skecth_encode() { let mut sketch = DDSketch::default(); diff --git a/libdd-trace-utils/src/otlp_encoder/mod.rs b/libdd-trace-utils/src/otlp_encoder/mod.rs index 782a10e10d..4771995c9d 100644 --- a/libdd-trace-utils/src/otlp_encoder/mod.rs +++ b/libdd-trace-utils/src/otlp_encoder/mod.rs @@ -22,4 +22,6 @@ pub struct OtlpResourceInfo { pub language: String, pub tracer_version: String, pub runtime_id: String, + pub hostname: String, + pub process_tags: String, } From 5a33f23524c371d768cbf02ac4cf896291334934 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 15:27:15 -0400 Subject: [PATCH 2/9] feat(trace-stats): expose exact per-cell sum/min/max via OTLP sidecar Make the span concentrator accumulate exact per-cell (ok/error) duration totals and min/max in nanoseconds alongside the existing combined `duration` that the /v0.6/stats agent payload uses, and publish them on a new sidecar that the OTLP trace-metrics path can read. - `GroupedStats` gains six pub(super) accumulators (`ok_duration`/`ok_min`/ `ok_max` + the error trio) updated inside `insert`. They are seeded on the first span in each cell (count == 1) so the natural `0` default cannot masquerade as a real minimum. - New public types `OtlpExactCell`, `OtlpExactGroup`, `OtlpStatsBucket` carry the exact scalars alongside an unmodified `pb::ClientStatsBucket`. The `grpc_method` field on `OtlpExactGroup` is intentionally introduced here but only ever populated with `String::new()`; a later commit fills it in. - `StatsBucket::flush` now delegates to a new `flush_with_otlp_exact` which produces both the protobuf bucket (identical bytes) and the parallel sidecar. `SpanConcentrator::flush` and `flush_with_otlp_exact` share a generic `drain_due_buckets` helper so the bucket-window/buffer-len logic stays in one place. - A new concentrator test drives the full path through `add_span` for 3 ok + 2 error spans and asserts each cell's count/duration/min/max plus `ok_duration + error_duration == group.duration` (the agent field). Co-Authored-By: Claude Sonnet 4.6 --- .../src/span_concentrator/aggregation.rs | 96 ++++++++++++++++--- .../src/span_concentrator/mod.rs | 20 +++- .../src/span_concentrator/tests.rs | 53 ++++++++++ 3 files changed, 156 insertions(+), 13 deletions(-) diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index acc7629478..0c8bd6e73f 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -324,6 +324,14 @@ pub(super) struct GroupedStats { top_level_hits: u64, ok_summary: libdd_ddsketch::DDSketch, error_summary: libdd_ddsketch::DDSketch, + // Exact per-cell (ok/error) scalars used by the OTLP trace-metrics path. These are tracked + // separately from `duration` so the /v0.6/stats agent payload is byte-for-byte unchanged. + ok_duration: u64, + ok_min: u64, + ok_max: u64, + error_duration: u64, + error_min: u64, + error_max: u64, } impl GroupedStats { @@ -331,12 +339,23 @@ impl GroupedStats { fn insert(&mut self, duration: i64, is_error: bool, is_top_level: bool) { self.hits += 1; self.duration += duration as u64; - + let d = duration as u64; if is_error { self.errors += 1; let _ = self.error_summary.add(duration as f64); + self.error_duration += d; + self.error_min = if self.errors == 1 { + d + } else { + self.error_min.min(d) + }; + self.error_max = self.error_max.max(d); } else { let _ = self.ok_summary.add(duration as f64); + self.ok_duration += d; + let ok_count = self.hits - self.errors; + self.ok_min = if ok_count == 1 { d } else { self.ok_min.min(d) }; + self.ok_max = self.ok_max.max(d); } if is_top_level { self.top_level_hits += 1; @@ -344,6 +363,37 @@ impl GroupedStats { } } +/// Exact per-cell (ok or error) scalars for one aggregation group, surfaced to the OTLP +/// trace-metrics path. `count` is exact; `duration_ns`/`min_ns`/`max_ns` are exact when +/// `count > 0` and meaningless otherwise (the OTLP mapper suppresses empty cells). +#[derive(Debug, Clone, Copy, Default)] +pub struct OtlpExactCell { + pub count: u64, + pub duration_ns: u64, + pub min_ns: u64, + pub max_ns: u64, +} + +/// Exact OK/ERROR cells for one aggregation group, in the same order as the `stats` vector +/// of the accompanying [`pb::ClientStatsBucket`]. `grpc_method` is the group's gRPC method (DD +/// schema `grpc.method.name`) carried out-of-band so it does not appear in the agent stats +/// protobuf wire format. It is initialized empty here; a subsequent change populates it from +/// the aggregation key. +#[derive(Debug, Clone, Default)] +pub struct OtlpExactGroup { + pub ok: OtlpExactCell, + pub error: OtlpExactCell, + pub grpc_method: String, +} + +/// A bucket flushed for the OTLP trace-metrics path. `exact[i]` is the exact-scalar sidecar +/// for `bucket.stats[i]`; the protobuf bucket itself is identical to what the agent path uses. +#[derive(Debug, Clone)] +pub struct OtlpStatsBucket { + pub bucket: pb::ClientStatsBucket, + pub exact: Vec, +} + /// A time bucket used for stats aggregation. It stores a map of GroupedStats storing the stats of /// spans aggregated on their AggregationKey. #[derive(Debug, Clone)] @@ -379,16 +429,40 @@ impl StatsBucket { /// Consume the bucket and return a ClientStatsBucket containing the bucket stats. /// `bucket_duration` is the size of buckets for the concentrator containing the bucket. pub(super) fn flush(self, bucket_duration: u64) -> pb::ClientStatsBucket { - pb::ClientStatsBucket { - start: self.start, - duration: bucket_duration, - stats: self - .data - .into_iter() - .map(|(k, b)| encode_grouped_stats(k, b)) - .collect(), - // Agent-only field - agent_time_shift: 0, + self.flush_with_otlp_exact(bucket_duration).bucket + } + + /// Like [`Self::flush`], but additionally produces exact per-cell scalars for the OTLP + /// trace-metrics path. The `bucket` field is identical to what [`Self::flush`] returns. + pub(super) fn flush_with_otlp_exact(self, bucket_duration: u64) -> OtlpStatsBucket { + let mut stats = Vec::with_capacity(self.data.len()); + let mut exact = Vec::with_capacity(self.data.len()); + for (k, g) in self.data { + exact.push(OtlpExactGroup { + ok: OtlpExactCell { + count: g.hits.saturating_sub(g.errors), + duration_ns: g.ok_duration, + min_ns: g.ok_min, + max_ns: g.ok_max, + }, + error: OtlpExactCell { + count: g.errors, + duration_ns: g.error_duration, + min_ns: g.error_min, + max_ns: g.error_max, + }, + grpc_method: String::new(), + }); + stats.push(encode_grouped_stats(k, g)); + } + OtlpStatsBucket { + bucket: pb::ClientStatsBucket { + start: self.start, + duration: bucket_duration, + stats, + agent_time_shift: 0, + }, + exact, } } } diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 1e83a4b342..fb0f4fc4d6 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -10,7 +10,7 @@ use aggregation::StatsBucket; mod aggregation; use aggregation::BorrowedAggregationKey; -pub use aggregation::FixedAggregationKey; +pub use aggregation::{FixedAggregationKey, OtlpExactCell, OtlpExactGroup, OtlpStatsBucket}; pub mod stat_span; pub use stat_span::StatSpan; @@ -207,6 +207,22 @@ impl SpanConcentrator { /// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush /// all buckets. pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec { + self.drain_due_buckets(now, force, StatsBucket::flush) + } + + /// Like [`Self::flush`], but also emits exact per-cell scalars alongside each bucket for the + /// OTLP trace-metrics path. The protobuf bucket inside each [`OtlpStatsBucket`] is identical + /// to what [`Self::flush`] would produce, so the /v0.6/stats agent path is unaffected. + pub fn flush_with_otlp_exact(&mut self, now: SystemTime, force: bool) -> Vec { + self.drain_due_buckets(now, force, StatsBucket::flush_with_otlp_exact) + } + + fn drain_due_buckets( + &mut self, + now: SystemTime, + force: bool, + encode: impl Fn(StatsBucket, u64) -> T, + ) -> Vec { // TODO: Wait for HashMap::extract_if to be stabilized to avoid a full drain let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64; let buckets: Vec<(u64, StatsBucket)> = self.buckets.drain().collect(); @@ -231,7 +247,7 @@ impl SpanConcentrator { self.buckets.insert(timestamp, bucket); return None; } - Some(bucket.flush(self.bucket_size)) + Some(encode(bucket, self.bucket_size)) }) .collect() } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 56039c4d5b..2dd064d93a 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1569,3 +1569,56 @@ fn test_pb_span() { assert_counts_equal(expected_stats, bucket.stats.clone()); } + +/// Verify the OTLP exact-scalar sidecar tracks per-cell (ok/error) duration/min/max in nanos +/// independently and that ok_duration + error_duration matches the combined group duration +/// (which the agent /v0.6/stats path uses). +#[test] +fn test_flush_with_otlp_exact_per_cell_scalars() { + let now = SystemTime::now(); + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + #[cfg(feature = "stats-obfuscation")] + None, + ); + // 3 ok spans (200, 300, 100 ns) and 2 error spans (700, 500 ns), all same agg key. + let mut spans = vec![ + get_test_span(now, 1, 0, 200, 0, "svc", "res", 0), + get_test_span(now, 2, 0, 300, 0, "svc", "res", 0), + get_test_span(now, 3, 0, 100, 0, "svc", "res", 0), + get_test_span(now, 4, 0, 700, 0, "svc", "res", 1), + get_test_span(now, 5, 0, 500, 0, "svc", "res", 1), + ]; + compute_top_level_span(spans.as_mut_slice()); + for s in &spans { + concentrator.add_span(s); + } + + let flushed = concentrator.flush_with_otlp_exact(now, true); + assert_eq!(flushed.len(), 1); + let b = &flushed[0]; + assert_eq!(b.exact.len(), 1); + let exact = &b.exact[0]; + + assert_eq!(exact.ok.count, 3); + assert_eq!(exact.ok.duration_ns, 600); + assert_eq!(exact.ok.min_ns, 100); + assert_eq!(exact.ok.max_ns, 300); + + assert_eq!(exact.error.count, 2); + assert_eq!(exact.error.duration_ns, 1200); + assert_eq!(exact.error.min_ns, 500); + assert_eq!(exact.error.max_ns, 700); + + // ok_duration + error_duration equals the combined group.duration (agent path field). + let group = &b.bucket.stats[0]; + assert_eq!( + group.duration, + exact.ok.duration_ns + exact.error.duration_ns + ); + assert_eq!(group.hits, 5); + assert_eq!(group.errors, 2); +} From 26bf9e290e2fb0b70f96c4c0ed560529f3b1d3d1 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 15:29:17 -0400 Subject: [PATCH 3/9] refactor(data-pipeline): share OTLP HTTP transport; add OtlpMetricsConfig Prepare the data-pipeline OTLP layer to host a second exporter (trace metrics) without changing the existing trace path's behavior or public API. - `otlp/exporter.rs`: factor the actual POST + retry plumbing into a new crate-private `send_otlp_http(endpoint_url, headers, timeout, ...)` helper. `send_otlp_traces_http` becomes a thin wrapper that pulls fields out of `OtlpTraceConfig` and calls it; the existing public function signature is unchanged, so external callers see no diff. Two new pub(crate) constants (`OTLP_MAX_ATTEMPTS`, `OTLP_SHUTDOWN_MAX_ATTEMPTS`) replace the previous `OTLP_MAX_RETRIES` literal so the trace-metrics worker can use a single attempt on shutdown. - `otlp/config.rs`: add `OtlpMetricsConfig` mirroring `OtlpTraceConfig` plus an `otel_trace_semantics_enabled` flag for `DD_TRACE_OTEL_SEMANTICS_ENABLED`. Annotated `#[allow(dead_code)]` until a follow-up commit consumes it. - `trace_exporter/builder.rs`: factor the inline OTLP header-map builder out of `build_async` into a small `build_otlp_header_map` helper and refactor the existing OTLP traces config building to use it. No behavior change; this dedup makes the metrics-config branch trivial when it lands. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/otlp/config.rs | 18 ++++++ libdd-data-pipeline/src/otlp/exporter.rs | 58 ++++++++++++++----- .../src/trace_exporter/builder.rs | 51 ++++++++-------- 3 files changed, 88 insertions(+), 39 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 02d7a45f80..c3d9951b93 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -36,3 +36,21 @@ pub struct OtlpTraceConfig { #[allow(dead_code)] pub(crate) protocol: OtlpProtocol, } + +/// Parsed OTLP trace-metrics exporter configuration. +#[derive(Clone, Debug)] +#[allow(dead_code)] // wired up in the OTLP trace-metrics exporter commit +pub struct OtlpMetricsConfig { + /// Full URL to POST metrics to (e.g. `http://localhost:4318/v1/metrics`). + pub endpoint_url: String, + /// Pre-validated HTTP headers to include in each request. + pub headers: HeaderMap, + /// Request timeout. + pub timeout: Duration, + /// Protocol (for future use; currently only HttpJson is supported). + #[allow(dead_code)] + pub(crate) protocol: OtlpProtocol, + /// When `true`, emit only OTel attributes; omit `dd.*`/`_dd.*` ones + /// (`DD_TRACE_OTEL_SEMANTICS_ENABLED`). + pub otel_trace_semantics_enabled: bool, +} diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 1f4d86a235..3004f10927 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -5,30 +5,32 @@ use super::config::OtlpTraceConfig; use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; +use http::HeaderMap; use libdd_capabilities::{HttpClientCapability, SleepCapability}; use libdd_common::Endpoint; use libdd_trace_utils::send_with_retry::{ send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError, }; +use std::time::Duration; -/// Max retries for OTLP export. -const OTLP_MAX_RETRIES: u32 = 4; -/// Initial backoff between retries (milliseconds). +/// Max total attempts for OTLP export (initial + retries on transient failures). +pub(crate) const OTLP_MAX_ATTEMPTS: u32 = 5; +/// Single attempt with no retries, used on shutdown to avoid a long backoff in the shutdown window. +#[allow(dead_code)] // consumed by the OTLP trace-metrics worker in a follow-up commit +pub(crate) const OTLP_SHUTDOWN_MAX_ATTEMPTS: u32 = 1; const OTLP_RETRY_DELAY_MS: u64 = 100; -/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. -/// -/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. -/// -/// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests -/// against the Datadog test agent's OTLP endpoint. -pub async fn send_otlp_traces_http( +/// POST an OTLP HTTP/JSON payload to `endpoint_url`; `test_token` enables snapshot tests. +pub(crate) async fn send_otlp_http( capabilities: &C, - config: &OtlpTraceConfig, + endpoint_url: &str, + config_headers: &HeaderMap, + timeout: Duration, test_token: Option<&str>, json_body: Vec, + max_attempts: u32, ) -> Result<(), TraceExporterError> { - let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { + let url = libdd_common::parse_uri(endpoint_url).map_err(|e| { TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( "Invalid OTLP endpoint URL: {}", e @@ -37,11 +39,11 @@ pub async fn send_otlp_traces_http( let target = Endpoint { url, - timeout_ms: config.timeout.as_millis() as u64, + timeout_ms: timeout.as_millis() as u64, ..Endpoint::default() }; - let mut headers = config.headers.clone(); + let mut headers = config_headers.clone(); headers.insert( http::header::CONTENT_TYPE, libdd_common::header::APPLICATION_JSON, @@ -55,8 +57,10 @@ pub async fn send_otlp_traces_http( } } + // `RetryStrategy` counts *retries*, and performs `max_retries + 1` total attempts. Convert the + // attempt budget accordingly so `max_attempts == 1` means a single try with no retries. let retry_strategy = RetryStrategy::new( - OTLP_MAX_RETRIES, + max_attempts.saturating_sub(1), OTLP_RETRY_DELAY_MS, RetryBackoffType::Exponential, None, @@ -68,6 +72,30 @@ pub async fn send_otlp_traces_http( } } +/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. +/// +/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. +/// +/// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests +/// against the Datadog test agent's OTLP endpoint. +pub async fn send_otlp_traces_http( + capabilities: &C, + config: &OtlpTraceConfig, + test_token: Option<&str>, + json_body: Vec, +) -> Result<(), TraceExporterError> { + send_otlp_http( + capabilities, + &config.endpoint_url, + &config.headers, + config.timeout, + test_token, + json_body, + OTLP_MAX_ATTEMPTS, + ) + .await +} + async fn map_send_error(err: SendWithRetryError) -> TraceExporterError { match err { SendWithRetryError::Http(response, _) => { diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 3c0e1f14b5..048c277447 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -27,6 +27,23 @@ use std::time::Duration; const DEFAULT_AGENT_URL: &str = "http://127.0.0.1:8126"; +/// Build an [`http::HeaderMap`] from key/value pairs, skipping malformed entries. +fn build_otlp_header_map(headers: Vec<(String, String)>) -> http::HeaderMap { + let mut out = http::HeaderMap::new(); + for (k, v) in headers { + match ( + http::HeaderName::from_bytes(k.as_bytes()), + http::HeaderValue::from_str(&v), + ) { + (Ok(n), Ok(vv)) => { + out.insert(n, vv); + } + _ => tracing::warn!("Skipping invalid OTLP header: {:?}={:?}", k, v), + } + } + out +} + #[allow(missing_docs)] #[derive(Debug, Default)] pub struct TraceExporterBuilder { @@ -429,30 +446,16 @@ impl TraceExporterBuilder { } }; - let otlp_config = self.otlp_endpoint.map(|url| { - let mut headers = http::HeaderMap::new(); - for (key, value) in self.otlp_headers { - match ( - http::HeaderName::from_bytes(key.as_bytes()), - http::HeaderValue::from_str(&value), - ) { - (Ok(name), Ok(val)) => { - headers.insert(name, val); - } - _ => { - tracing::warn!("Skipping invalid OTLP header: {:?}={:?}", key, value); - } - } - } - OtlpTraceConfig { - endpoint_url: url, - headers, - timeout: self - .connection_timeout - .map(Duration::from_millis) - .unwrap_or(DEFAULT_OTLP_TIMEOUT), - protocol: OtlpProtocol::HttpJson, - } + let otlp_timeout = self + .connection_timeout + .map(Duration::from_millis) + .unwrap_or(DEFAULT_OTLP_TIMEOUT); + + let otlp_config = self.otlp_endpoint.map(|url| OtlpTraceConfig { + endpoint_url: url, + headers: build_otlp_header_map(self.otlp_headers), + timeout: otlp_timeout, + protocol: OtlpProtocol::HttpJson, }); Ok(TraceExporter { From f364d26dbe3003be826d33b9af9539bd14aae832 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 15:30:33 -0400 Subject: [PATCH 4/9] feat(data-pipeline): export client-computed span stats as OTLP trace metrics Wire up the actual OTLP trace-metrics exporter on top of the foundation pieces from earlier commits. - New `libdd-data-pipeline/src/otlp/metrics.rs`: - `map_stats_to_otlp_metrics` builds an `ExportMetricsServiceRequest` JSON value from `&[OtlpStatsBucket]` (one histogram data point per aggregation-key (ok|error) cell). `count`/`sum`/`min`/`max` come from the sidecar's exact accumulators (ns -> s); `bucketCounts` is projected from the per-cell DDSketch onto a fixed 17-bucket spanmetrics-style layout. Empty cells are suppressed. - `OtlpStatsExporter` runs as a `libdd_shared_runtime::Worker`: `trigger` waits one flush interval, `run` flushes + sends with `OTLP_MAX_ATTEMPTS`, `shutdown` force-flushes with `OTLP_SHUTDOWN_MAX_ATTEMPTS` (single attempt) so the final bucket is delivered inside the bounded shutdown window. - The mapper consumes `exact.grpc_method` (always empty here) so the later breaking-change commit only has to fill it in. - `otlp/mod.rs`: declare the new `metrics` module, re-export `OtlpMetricsConfig` and `OtlpStatsExporter`, and extend the module-level doc to describe the trace-metrics path. - `trace_exporter/builder.rs`: add `otlp_metrics_endpoint`, `otlp_metrics_headers` and `otel_trace_semantics_enabled` fields with matching setters (`set_otlp_metrics_endpoint`, `set_otlp_metrics_headers`, `enable_otel_trace_semantics`). When both an OTLP metrics endpoint and a stats bucket size are configured, spawn an `OtlpStatsExporter` worker on the shared runtime against an unconditionally-started `SpanConcentrator`; set a new `otlp_stats_enabled` flag on `TraceExporter` so the agent-info gate cannot later disable stats. The agent /v0.6/stats payload bytes are unchanged when no OTLP metrics endpoint is set. - `trace_exporter/mod.rs`: add the `otlp_stats_enabled` field on `TraceExporter`. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/otlp/config.rs | 1 - libdd-data-pipeline/src/otlp/exporter.rs | 1 - libdd-data-pipeline/src/otlp/metrics.rs | 546 ++++++++++++++++++ libdd-data-pipeline/src/otlp/mod.rs | 10 +- .../src/trace_exporter/builder.rs | 86 ++- libdd-data-pipeline/src/trace_exporter/mod.rs | 10 + 6 files changed, 648 insertions(+), 6 deletions(-) create mode 100644 libdd-data-pipeline/src/otlp/metrics.rs diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index c3d9951b93..f20c5c60e7 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -39,7 +39,6 @@ pub struct OtlpTraceConfig { /// Parsed OTLP trace-metrics exporter configuration. #[derive(Clone, Debug)] -#[allow(dead_code)] // wired up in the OTLP trace-metrics exporter commit pub struct OtlpMetricsConfig { /// Full URL to POST metrics to (e.g. `http://localhost:4318/v1/metrics`). pub endpoint_url: String, diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 3004f10927..2b36f2d297 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -16,7 +16,6 @@ use std::time::Duration; /// Max total attempts for OTLP export (initial + retries on transient failures). pub(crate) const OTLP_MAX_ATTEMPTS: u32 = 5; /// Single attempt with no retries, used on shutdown to avoid a long backoff in the shutdown window. -#[allow(dead_code)] // consumed by the OTLP trace-metrics worker in a follow-up commit pub(crate) const OTLP_SHUTDOWN_MAX_ATTEMPTS: u32 = 1; const OTLP_RETRY_DELAY_MS: u64 = 100; diff --git a/libdd-data-pipeline/src/otlp/metrics.rs b/libdd-data-pipeline/src/otlp/metrics.rs new file mode 100644 index 0000000000..a5e16e8417 --- /dev/null +++ b/libdd-data-pipeline/src/otlp/metrics.rs @@ -0,0 +1,546 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Maps client-computed span stats into `traces.span.sdk.metrics.duration` OTLP histograms. +//! DDSketch summaries from the span concentrator are bucketed into fixed explicit bounds (seconds). + +use super::config::OtlpMetricsConfig; +use super::exporter::{send_otlp_http, OTLP_MAX_ATTEMPTS, OTLP_SHUTDOWN_MAX_ATTEMPTS}; +use async_trait::async_trait; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_ddsketch::DDSketch; +use libdd_shared_runtime::Worker; +use libdd_trace_protobuf::pb; +use libdd_trace_stats::span_concentrator::{OtlpStatsBucket, SpanConcentrator}; +use libdd_trace_utils::otlp_encoder::json_types::status_code; +use libdd_trace_utils::otlp_encoder::OtlpResourceInfo; +use serde_json::{json, Value}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; +use tracing::error; + +const METRIC_NAME: &str = "traces.span.sdk.metrics.duration"; +const NANOS_PER_SECOND: f64 = 1_000_000_000.0; +/// Fixed bucket boundaries (seconds) mirroring the OTel spanmetrics-connector defaults. +const EXPLICIT_BOUNDS_SECONDS: [f64; 16] = [ + 0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.0, 1.4, 2.0, 5.0, 10.0, 15.0, +]; + +fn kv_str(key: &str, value: &str) -> Value { + json!({ "key": key, "value": { "stringValue": value } }) +} +fn kv_int(key: &str, value: i64) -> Value { + json!({ "key": key, "value": { "intValue": value.to_string() } }) +} + +/// Build an OTLP metrics export request (`ExportMetricsServiceRequest`) as a JSON value. +/// +/// Emits one histogram data point per (aggregation key, ok/error) cell with exact `count`, +/// `sum`, `min` and `max` (the explicit-bucket distribution is still projected from the +/// per-cell DDSketch). Returns `None` when no cell yields a data point. +pub fn map_stats_to_otlp_metrics( + buckets: &[OtlpStatsBucket], + resource_info: &OtlpResourceInfo, + otel_trace_semantics_enabled: bool, +) -> Option { + let mut data_points = Vec::new(); + for b in buckets { + let end = b.bucket.start.saturating_add(b.bucket.duration); + for (group, exact) in b.bucket.stats.iter().zip(b.exact.iter()) { + for (is_error, summary, cell) in [ + (false, &group.ok_summary, &exact.ok), + (true, &group.error_summary, &exact.error), + ] { + if cell.count == 0 { + continue; + } + let Some(sketch) = DDSketch::from_encoded(summary) else { + continue; + }; + data_points.push(json!({ + "attributes": build_attributes(group, &exact.grpc_method, is_error, resource_info, otel_trace_semantics_enabled), + "startTimeUnixNano": b.bucket.start.to_string(), + "timeUnixNano": end.to_string(), + "count": cell.count.to_string(), + "sum": ns_to_s(cell.duration_ns), + "bucketCounts": sketch_bucket_counts(&sketch), + "explicitBounds": EXPLICIT_BOUNDS_SECONDS, + "min": ns_to_s(cell.min_ns), + "max": ns_to_s(cell.max_ns), + })); + } + } + } + if data_points.is_empty() { + return None; + } + Some(json!({ + "resourceMetrics": [{ + "resource": { "attributes": build_resource_attributes(resource_info, otel_trace_semantics_enabled) }, + "scopeMetrics": [{ + "metrics": [{ + "name": METRIC_NAME, + "unit": "s", + "histogram": { + // OTLP AggregationTemporality::Delta (each export covers only the interval). + "aggregationTemporality": 1, + "dataPoints": data_points, + } + }] + }] + }] + })) +} + +fn ns_to_s(ns: u64) -> f64 { + ns as f64 / NANOS_PER_SECOND +} + +/// Project the sketch's bins onto [`EXPLICIT_BOUNDS_SECONDS`] (one bucket per boundary plus a +/// trailing overflow bucket). The exact `count`/`sum`/`min`/`max` come from the concentrator's +/// per-cell accumulators; this function only shapes the distribution. +fn sketch_bucket_counts(sketch: &DDSketch) -> Vec { + let mut weights = vec![0.0_f64; EXPLICIT_BOUNDS_SECONDS.len() + 1]; + for (value, weight) in sketch.ordered_bins() { + if weight <= 0.0 { + continue; + } + let seconds = value / NANOS_PER_SECOND; + let idx = EXPLICIT_BOUNDS_SECONDS + .iter() + .position(|b| seconds <= *b) + .unwrap_or(EXPLICIT_BOUNDS_SECONDS.len()); + weights[idx] += weight; + } + weights + .iter() + .map(|w| (w.round() as u64).to_string()) + .collect() +} + +fn build_attributes( + group: &pb::ClientGroupedStats, + grpc_method: &str, + is_error: bool, + resource_info: &OtlpResourceInfo, + otel_trace_semantics_enabled: bool, +) -> Vec { + let mut attrs = Vec::new(); + let mut push = |k: &str, v: &str| { + if !v.is_empty() { + attrs.push(kv_str(k, v)); + } + }; + + // Service identity is on the resource; emit on the data point only when overridden. + let group_service = if group.service.is_empty() { + resource_info.service.as_str() + } else { + group.service.as_str() + }; + if group_service != resource_info.service { + push("service.name", group_service); + } + + push("span.name", &group.resource); + push("span.kind", &group.span_kind); + push("http.request.method", &group.http_method); + push("http.route", &group.http_endpoint); + push("rpc.method", grpc_method); + push("rpc.response.status_code", &group.grpc_status_code); + for tag in &group.peer_tags { + if let Some((k, v)) = tag.split_once(':') { + push(k, v); + } + } + if !otel_trace_semantics_enabled { + push("datadog.operation.name", &group.name); + push("datadog.span.type", &group.r#type); + } + if group.http_status_code != 0 { + attrs.push(kv_int( + "http.response.status_code", + group.http_status_code as i64, + )); + } + if !otel_trace_semantics_enabled { + // Only `synthetics` is surfaced as `datadog.origin`: the aggregation key carries just a + // boolean, not the full origin string, so other origins are lost upstream. + if group.synthetics { + attrs.push(kv_str("datadog.origin", "synthetics")); + } + if group.is_trace_root == pb::Trilean::True as i32 { + attrs.push(json!({ "key": "_datadog.is_trace_root", "value": { "boolValue": true } })); + } + let top_level = group.hits > 0 && group.top_level_hits == group.hits; + attrs.push(json!({ + "key": "datadog.span.top_level", "value": { "boolValue": top_level } + })); + } + if is_error { + attrs.push(kv_int("status.code", status_code::ERROR as i64)); + } + attrs +} + +fn build_resource_attributes( + info: &OtlpResourceInfo, + otel_trace_semantics_enabled: bool, +) -> Vec { + let mut attrs: Vec = [ + ("service.name", info.service.as_str()), + ("service.version", info.app_version.as_str()), + ("deployment.environment.name", info.env.as_str()), + ("host.name", info.hostname.as_str()), + ("telemetry.sdk.name", "datadog"), + ("telemetry.sdk.language", info.language.as_str()), + ("telemetry.sdk.version", info.tracer_version.as_str()), + ] + .into_iter() + .filter(|(_, v)| !v.is_empty()) + .map(|(k, v)| kv_str(k, v)) + .collect(); + + if !otel_trace_semantics_enabled { + if !info.runtime_id.is_empty() { + attrs.push(kv_str("datadog.runtime_id", &info.runtime_id)); + } + attrs.extend(info.process_tags.split(',').filter_map(|p| { + let (k, v) = p.split_once(':')?; + let (k, v) = (k.trim(), v.trim()); + (!k.is_empty() && !v.is_empty()).then(|| kv_str(&format!("datadog.{k}"), v)) + })); + } + attrs +} + +/// Worker that periodically flushes the span concentrator and exports OTLP trace metrics. +#[derive(Debug)] +pub struct OtlpStatsExporter { + pub flush_interval: Duration, + pub concentrator: Arc>, + pub config: OtlpMetricsConfig, + pub resource: OtlpResourceInfo, + pub test_token: Option, + pub capabilities: C, +} + +impl OtlpStatsExporter { + /// Flush the concentrator and export stats; returns `Ok(true)` if anything was sent. + async fn send(&self, force_flush: bool, max_attempts: u32) -> anyhow::Result { + let buckets = { + #[allow(clippy::unwrap_used)] + let mut c = self.concentrator.lock().unwrap(); + c.flush_with_otlp_exact(SystemTime::now(), force_flush) + }; + if buckets.is_empty() { + return Ok(false); + } + let Some(request) = map_stats_to_otlp_metrics( + &buckets, + &self.resource, + self.config.otel_trace_semantics_enabled, + ) else { + return Ok(false); + }; + send_otlp_http( + &self.capabilities, + &self.config.endpoint_url, + &self.config.headers, + self.config.timeout, + self.test_token.as_deref(), + serde_json::to_vec(&request)?, + max_attempts, + ) + .await?; + Ok(true) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Worker + for OtlpStatsExporter +{ + async fn trigger(&mut self) { + self.capabilities.sleep(self.flush_interval).await; + } + + async fn run(&mut self) { + if let Err(e) = self.send(false, OTLP_MAX_ATTEMPTS).await { + error!(?e, "Error exporting OTLP trace metrics"); + } + } + + async fn shutdown(&mut self) { + // Single attempt: a long backoff could miss the bounded shutdown window. + if let Err(e) = self.send(true, OTLP_SHUTDOWN_MAX_ATTEMPTS).await { + error!(?e, "Error exporting OTLP trace metrics on shutdown"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use libdd_trace_stats::span_concentrator::{OtlpExactCell, OtlpExactGroup}; + + const T_START: u64 = 12_340_000_000_000; + const T_SIZE: u64 = 10_000_000_000; + + fn sketch(points: &[f64]) -> Vec { + let mut s = DDSketch::default(); + for p in points { + s.add(*p).unwrap(); + } + s.encode_to_vec() + } + + fn resource() -> OtlpResourceInfo { + let mut r = OtlpResourceInfo::default(); + r.service = "svc".to_string(); + r.env = "test".to_string(); + r + } + + fn cell(durations_ns: &[u64]) -> OtlpExactCell { + OtlpExactCell { + count: durations_ns.len() as u64, + duration_ns: durations_ns.iter().sum(), + min_ns: durations_ns.iter().copied().min().unwrap_or(0), + max_ns: durations_ns.iter().copied().max().unwrap_or(0), + } + } + + /// Build one (pb group, exact group) cell pair. `customize` lets each test tweak the pb group. + fn group_with_exact( + ok_ns: &[u64], + err_ns: &[u64], + customize: impl FnOnce(&mut pb::ClientGroupedStats), + ) -> (pb::ClientGroupedStats, OtlpExactGroup) { + let hits = (ok_ns.len() + err_ns.len()) as u64; + let to_f64s = |xs: &[u64]| -> Vec { xs.iter().map(|&x| x as f64).collect() }; + let mut g = pb::ClientGroupedStats { + service: "svc".into(), + name: "test.op".into(), + resource: "GET /foo".into(), + r#type: "web".into(), + hits, + errors: err_ns.len() as u64, + top_level_hits: hits, + ok_summary: if ok_ns.is_empty() { + Vec::new() + } else { + sketch(&to_f64s(ok_ns)) + }, + error_summary: if err_ns.is_empty() { + Vec::new() + } else { + sketch(&to_f64s(err_ns)) + }, + ..Default::default() + }; + customize(&mut g); + ( + g, + OtlpExactGroup { + ok: cell(ok_ns), + error: cell(err_ns), + grpc_method: String::new(), + }, + ) + } + + fn group_pair_with_grpc( + ok_ns: &[u64], + err_ns: &[u64], + grpc_method: &str, + customize: impl FnOnce(&mut pb::ClientGroupedStats), + ) -> (pb::ClientGroupedStats, OtlpExactGroup) { + let (g, mut e) = group_with_exact(ok_ns, err_ns, customize); + e.grpc_method = grpc_method.into(); + (g, e) + } + + fn buckets(groups: Vec<(pb::ClientGroupedStats, OtlpExactGroup)>) -> Vec { + let (stats, exact): (Vec<_>, Vec<_>) = groups.into_iter().unzip(); + vec![OtlpStatsBucket { + bucket: pb::ClientStatsBucket { + start: T_START, + duration: T_SIZE, + stats, + agent_time_shift: 0, + }, + exact, + }] + } + + /// Single-cell default group with a 1s ok span. + fn one_ok_group() -> (pb::ClientGroupedStats, OtlpExactGroup) { + group_with_exact(&[1_000_000_000], &[], |_| {}) + } + + fn metric(req: &Value) -> &Value { + &req["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0] + } + + fn points(req: &Value) -> &Vec { + metric(req)["histogram"]["dataPoints"].as_array().unwrap() + } + + fn resource_attrs(req: &Value) -> &Vec { + req["resourceMetrics"][0]["resource"]["attributes"] + .as_array() + .unwrap() + } + + fn str_at<'a>(attrs: &'a [Value], key: &str) -> Option<&'a str> { + attrs + .iter() + .find(|kv| kv["key"] == key) + .and_then(|kv| kv["value"]["stringValue"].as_str()) + } + + fn err_code() -> String { + (status_code::ERROR as i64).to_string() + } + + fn is_error_point(p: &Value) -> bool { + let ec = err_code(); + p["attributes"] + .as_array() + .unwrap() + .iter() + .any(|kv| kv["key"] == "status.code" && kv["value"]["intValue"].as_str() == Some(&ec)) + } + + #[test] + fn metric_shape_and_resource_attributes() { + assert!(map_stats_to_otlp_metrics(&[], &resource(), false).is_none()); + let mut r = resource(); + r.app_version = "1.2.3".to_string(); + r.hostname = "my-host".to_string(); + r.runtime_id = "abc-123".to_string(); + r.process_tags = "entrypoint.name:server".to_string(); + for otel in [false, true] { + let req = map_stats_to_otlp_metrics(&buckets(vec![one_ok_group()]), &r, otel).unwrap(); + let m = metric(&req); + assert_eq!(m["name"], "traces.span.sdk.metrics.duration"); + assert_eq!(m["unit"], "s"); + assert_eq!(m["histogram"]["aggregationTemporality"], 1); + assert!(req["resourceMetrics"][0]["scopeMetrics"][0]["scope"].is_null()); + let a = resource_attrs(&req); + assert_eq!(str_at(a, "service.name"), Some("svc")); + assert_eq!(str_at(a, "service.version"), Some("1.2.3")); + assert_eq!(str_at(a, "deployment.environment.name"), Some("test")); + assert_eq!(str_at(a, "host.name"), Some("my-host")); + assert_eq!(str_at(a, "telemetry.sdk.name"), Some("datadog")); + let dd = !otel; + assert_eq!(str_at(a, "datadog.runtime_id").is_some(), dd); + assert_eq!(str_at(a, "datadog.entrypoint.name").is_some(), dd); + } + } + + #[test] + fn data_point_attributes_and_otel_strip() { + let g_pair = group_pair_with_grpc(&[1_000_000_000], &[], "/pkg.Svc/Method", |g| { + g.http_status_code = 404; + g.http_method = "POST".into(); + g.http_endpoint = "/users/:id".into(); + g.synthetics = true; + }); + let custom_pair = group_with_exact(&[1_000_000_000], &[], |g| { + g.service = "svc-other".into(); + }); + let bs = buckets(vec![g_pair.clone(), custom_pair]); + + let req = map_stats_to_otlp_metrics(&bs, &resource(), false).unwrap(); + let pts = points(&req); + let a = pts + .iter() + .find(|p| str_at(p["attributes"].as_array().unwrap(), "service.name").is_none()) + .unwrap()["attributes"] + .as_array() + .unwrap(); + assert_eq!(str_at(a, "span.name"), Some("GET /foo")); + assert_eq!(str_at(a, "http.request.method"), Some("POST")); + assert_eq!(str_at(a, "http.route"), Some("/users/:id")); + assert!(a.iter().any(|kv| kv["key"] == "http.response.status_code")); + assert_eq!(str_at(a, "rpc.method"), Some("/pkg.Svc/Method")); + assert_eq!(str_at(a, "datadog.operation.name"), Some("test.op")); + assert_eq!(str_at(a, "datadog.span.type"), Some("web")); + assert_eq!(str_at(a, "datadog.origin"), Some("synthetics")); + assert!(a.iter().any(|kv| kv["key"] == "datadog.span.top_level")); + assert!(pts.iter().any( + |p| str_at(p["attributes"].as_array().unwrap(), "service.name") == Some("svc-other") + )); + + // OTel mode strips datadog.*/_datadog.* attributes. + let req = map_stats_to_otlp_metrics(&buckets(vec![g_pair]), &resource(), true).unwrap(); + let a = points(&req)[0]["attributes"].as_array().unwrap(); + assert!(!a.iter().any(|kv| { + let k = kv["key"].as_str().unwrap_or(""); + k.starts_with("datadog.") || k.starts_with("_datadog.") + })); + assert_eq!(str_at(a, "http.request.method"), Some("POST")); + } + + #[test] + fn histogram_values_are_exact_and_distribution_uses_sketch() { + // Single 1s ok span: count/sum/min/max all exact, distribution shaped by the sketch. + let req = + map_stats_to_otlp_metrics(&buckets(vec![one_ok_group()]), &resource(), false).unwrap(); + let p = &points(&req)[0]; + assert_eq!(p["count"], "1"); + assert_eq!(p["sum"].as_f64().unwrap(), 1.0); + assert_eq!(p["min"].as_f64().unwrap(), 1.0); + assert_eq!(p["max"].as_f64().unwrap(), 1.0); + assert_eq!(p["startTimeUnixNano"], T_START.to_string()); + assert_eq!(p["timeUnixNano"], (T_START + T_SIZE).to_string()); + assert_eq!(p["explicitBounds"], json!(EXPLICIT_BOUNDS_SECONDS)); + assert_eq!( + p["bucketCounts"].as_array().unwrap().len(), + EXPLICIT_BOUNDS_SECONDS.len() + 1 + ); + + // 3ms, 300ms, 3s land in three distinct buckets; exact sum = 3.303s. + let g = group_with_exact(&[3_000_000, 300_000_000, 3_000_000_000], &[], |_| {}); + let req = map_stats_to_otlp_metrics(&buckets(vec![g]), &resource(), false).unwrap(); + let p = &points(&req)[0]; + assert_eq!(p["count"], "3"); + assert_eq!(p["sum"].as_f64().unwrap(), ns_to_s(3_303_000_000)); + assert_eq!(p["min"].as_f64().unwrap(), ns_to_s(3_000_000)); + assert_eq!(p["max"].as_f64().unwrap(), ns_to_s(3_000_000_000)); + let counts = p["bucketCounts"].as_array().unwrap(); + assert_eq!(counts.iter().filter(|c| c.as_str() != Some("0")).count(), 3); + } + + #[test] + fn mixed_ok_and_error_have_exact_independent_sums() { + // 2 ok spans + 1 error span. ok_sum + error_sum must equal the combined group duration. + let ok = [200_000_000_u64, 300_000_000]; + let err = [700_000_000_u64]; + let combined_ns = ok.iter().sum::() + err.iter().sum::(); + let g = group_with_exact(&ok, &err, |_| {}); + let req = map_stats_to_otlp_metrics(&buckets(vec![g]), &resource(), false).unwrap(); + let pts = points(&req); + assert_eq!(pts.len(), 2); + let ok_pt = pts.iter().find(|p| !is_error_point(p)).unwrap(); + let err_pt = pts.iter().find(|p| is_error_point(p)).unwrap(); + + // Each cell's sum is exact and independent of the other. + assert_eq!(ok_pt["count"], "2"); + assert_eq!(ok_pt["sum"].as_f64().unwrap(), ns_to_s(500_000_000)); + assert_eq!(ok_pt["min"].as_f64().unwrap(), ns_to_s(200_000_000)); + assert_eq!(ok_pt["max"].as_f64().unwrap(), ns_to_s(300_000_000)); + assert_eq!(err_pt["count"], "1"); + assert_eq!(err_pt["sum"].as_f64().unwrap(), ns_to_s(700_000_000)); + assert_eq!(err_pt["min"].as_f64().unwrap(), ns_to_s(700_000_000)); + assert_eq!(err_pt["max"].as_f64().unwrap(), ns_to_s(700_000_000)); + + // ok_sum + error_sum equals the combined group duration. + let ok_s = ok_pt["sum"].as_f64().unwrap(); + let err_s = err_pt["sum"].as_f64().unwrap(); + assert_eq!(ok_s + err_s, ns_to_s(combined_ns)); + } +} diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 658fc13b87..6fc635126f 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! OTLP trace export for libdatadog. +//! OTLP trace and trace-metrics export for libdatadog. //! //! When an OTLP endpoint is configured via //! [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`], the trace exporter sends @@ -9,6 +9,10 @@ //! is responsible for resolving the endpoint from its own configuration (e.g. //! `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`). //! +//! With [`set_otlp_metrics_endpoint`](crate::trace_exporter::TraceExporterBuilder::set_otlp_metrics_endpoint), +//! client-computed span stats ship as a `traces.span.sdk.metrics.duration` OTLP histogram +//! instead of going to the agent `/v0.6/stats` endpoint. +//! //! ## Sampling //! //! The exporter enforces the sampling decision already made by the tracer: unsampled chunks are @@ -24,7 +28,9 @@ pub mod config; pub mod exporter; +pub mod metrics; -pub use config::OtlpTraceConfig; +pub use config::{OtlpMetricsConfig, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; +pub use metrics::OtlpStatsExporter; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 048c277447..e054315daa 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -3,7 +3,7 @@ use crate::agent_info::AgentInfoFetcher; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; -use crate::otlp::OtlpTraceConfig; +use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig}; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -82,6 +82,9 @@ pub struct TraceExporterBuilder { connection_timeout: Option, otlp_endpoint: Option, otlp_headers: Vec<(String, String)>, + otlp_metrics_endpoint: Option, + otlp_metrics_headers: Vec<(String, String)>, + otel_trace_semantics_enabled: bool, } impl TraceExporterBuilder { @@ -313,6 +316,24 @@ impl TraceExporterBuilder { self } + /// Enable OTLP HTTP/JSON trace-metrics export to `url` (e.g. `.../v1/metrics`). + pub fn set_otlp_metrics_endpoint(&mut self, url: &str) -> &mut Self { + self.otlp_metrics_endpoint = Some(url.to_owned()); + self + } + + /// Additional HTTP headers for OTLP trace-metrics requests. + pub fn set_otlp_metrics_headers(&mut self, headers: Vec<(String, String)>) -> &mut Self { + self.otlp_metrics_headers = headers; + self + } + + /// Strip Datadog-specific `dd.*`/`_dd.*` data-point attributes from the exported histogram. + pub fn enable_otel_trace_semantics(&mut self) -> &mut Self { + self.otel_trace_semantics_enabled = true; + self + } + /// Build the [`TraceExporter`] synchronously. /// /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. @@ -458,6 +479,66 @@ impl TraceExporterBuilder { protocol: OtlpProtocol::HttpJson, }); + let otlp_metrics_config = self.otlp_metrics_endpoint.map(|url| OtlpMetricsConfig { + endpoint_url: url, + headers: build_otlp_header_map(self.otlp_metrics_headers), + timeout: otlp_timeout, + protocol: OtlpProtocol::HttpJson, + otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, + }); + + let runtime_id = uuid::Uuid::new_v4().to_string(); + + // OTLP metrics + stats bucket size: start the concentrator unconditionally (bypass the + // agent gate) so `check_agent_info` cannot later disable stats. + #[allow(unused_mut)] + let mut otlp_stats_enabled = false; + #[cfg(not(target_arch = "wasm32"))] + if let (Some(metrics_config), Some(bucket_size)) = + (otlp_metrics_config.clone(), self.stats_bucket_size) + { + use crate::otlp::OtlpStatsExporter; + use libdd_trace_stats::span_concentrator::SpanConcentrator; + use std::sync::Mutex; + let span_kinds = crate::trace_exporter::stats::DEFAULT_STATS_ELIGIBLE_SPAN_KINDS + .iter() + .map(|s| s.to_string()) + .collect(); + let concentrator = Arc::new(Mutex::new(SpanConcentrator::new( + bucket_size, + std::time::SystemTime::now(), + span_kinds, + self.peer_tags.clone(), + #[cfg(feature = "stats-obfuscation")] + None, + ))); + let mut resource = OtlpResourceInfo::default(); + resource.service = self.service.clone(); + resource.env = self.env.clone(); + resource.app_version = self.app_version.clone(); + resource.language = self.language.clone(); + resource.tracer_version = self.tracer_version.clone(); + resource.runtime_id = runtime_id.clone(); + resource.hostname = self.hostname.clone(); + resource.process_tags = self.process_tags.clone(); + let worker = OtlpStatsExporter { + flush_interval: bucket_size, + concentrator: concentrator.clone(), + config: metrics_config, + resource, + test_token: self.test_session_token.clone(), + capabilities: capabilities.clone(), + }; + let worker_handle = shared_runtime.spawn_worker(worker, false).map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) + })?; + stats = StatsComputationStatus::Enabled { + stats_concentrator: concentrator, + worker_handle, + }; + otlp_stats_enabled = true; + } + Ok(TraceExporter { endpoint: Endpoint { url: agent_url, @@ -480,7 +561,7 @@ impl TraceExporterBuilder { hostname: self.hostname, env: self.env, app_version: self.app_version, - runtime_id: uuid::Uuid::new_v4().to_string(), + runtime_id, service: self.service, }, input_format: self.input_format, @@ -517,6 +598,7 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), otlp_config, + otlp_stats_enabled, }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 070fc754e0..456659b712 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -213,6 +213,11 @@ pub struct TraceExporter, /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. otlp_config: Option, + /// When true, span stats are computed and exported as OTLP metrics. The concentrator is + /// started at build time, so agent-driven stats (de)activation in `check_agent_info` is + /// skipped. + #[cfg_attr(target_arch = "wasm32", allow(dead_code))] + otlp_stats_enabled: bool, } impl TraceExporter { @@ -352,6 +357,11 @@ impl Tra /// Reconcile in-process stats state with the latest agent info. /// Async so the `Enabled` arm can await a stats-worker shutdown without `block_on`. async fn check_agent_info(&self) { + // OTLP trace metrics run the concentrator independently of the agent; never let agent + // info enable or disable stats in that mode. + if self.otlp_stats_enabled { + return; + } let Some(agent_info) = agent_info::get_agent_info() else { return; }; From f330f8bee61610755986f34d381901baf957fe45 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 15:32:28 -0400 Subject: [PATCH 5/9] feat(trace-stats)!: key aggregation by gRPC method name Add the gRPC method name to the aggregation key so spans sharing the same service/resource/etc. but different `grpc.method.name` aggregate into distinct groups, and surface the value via the OTLP trace-metrics sidecar introduced earlier on this branch. - `aggregation.rs`: - New `GRPC_METHOD_FIELD` lookup list (`grpc.method.name`, fallback `rpc.method`) consumed by a new `get_grpc_method` helper. - New `FixedAggregationKey.grpc_method` field, appended at the END of the struct so the `PartialOrd` derive's field order (and therefore the ordering of any existing comparisons) is unaffected for the pre-existing fields. - `BorrowedAggregationKey::from_obfuscated_span` now picks up `grpc_method`; `OwnedAggregationKey::From` sets it to `""` (the agent stats protobuf does not carry it). - `StatsBucket::flush_with_otlp_exact` does `std::mem::take` on the key's `grpc_method` and moves it into `OtlpExactGroup.grpc_method` before encoding the agent payload, so the OTLP path reads it from the sidecar while the /v0.6/stats wire format stays byte-for-byte unchanged. - Aggregation test gains a case asserting that `grpc.method.name` (and by fallthrough, `rpc.method`) are extracted into the key. - `datadog-ipc/src/shm_stats.rs`: the SHM concentrator's `FixedAggregationKey` test fixture grows a `grpc_method: ""` field. BREAKING CHANGE: `FixedAggregationKey` (re-exported from `libdd_trace_stats::span_concentrator`) gains a public `grpc_method: T` field. External callers that construct it via a struct literal must add the field; callers using `Default::default()` are unaffected. The /v0.6/stats agent protobuf wire format and behavior are unchanged. Co-Authored-By: Claude Sonnet 4.6 --- datadog-ipc/src/shm_stats.rs | 1 + .../src/span_concentrator/aggregation.rs | 43 +++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs index 44941ec667..38c92fb856 100644 --- a/datadog-ipc/src/shm_stats.rs +++ b/datadog-ipc/src/shm_stats.rs @@ -858,6 +858,7 @@ mod tests { is_synthetics_request: false, is_trace_root: true, grpc_status_code: None, + grpc_method: "", }, peer_tags: &[], duration_ns: dur, diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 0c8bd6e73f..5e6678fc40 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -24,6 +24,7 @@ const GRPC_STATUS_CODE_FIELD: &[&str] = &[ "rpc.grpc.status.code", "grpc.status.code", ]; +const GRPC_METHOD_FIELD: &[&str] = &["grpc.method.name", "rpc.method"]; /// Aggregation key fields shared across all concentrator implementations — everything /// **except** peer tags. @@ -48,6 +49,10 @@ pub struct FixedAggregationKey { pub grpc_status_code: Option, pub is_synthetics_request: bool, pub is_trace_root: bool, + /// gRPC method name (DD schema `grpc.method.name`). Empty when absent. Stored owned/borrowed + /// like the other string fields so it participates in the SHM `StringRef` representation. + /// Appended at the end to preserve `PartialOrd` ordering of the pre-existing fields. + pub grpc_method: T, } impl FixedAggregationKey { @@ -72,6 +77,7 @@ impl FixedAggregationKey { grpc_status_code: self.grpc_status_code, is_synthetics_request: self.is_synthetics_request, is_trace_root: self.is_trace_root, + grpc_method: f(self.grpc_method.borrow()), } } } @@ -152,6 +158,17 @@ fn get_grpc_status_code<'a>(span: &'a impl StatSpan<'a>) -> Option { None } +fn get_grpc_method<'a>(span: &'a impl StatSpan<'a>) -> &'a str { + for key in GRPC_METHOD_FIELD { + if let Some(val) = span.get_meta(key) { + if !val.is_empty() { + return val; + } + } + } + "" +} + fn grpc_status_str_to_int_value(v: &str) -> Option { if let Ok(status) = v.parse() { return Some(status); @@ -251,6 +268,8 @@ impl<'a> BorrowedAggregationKey<'a> { let grpc_status_code = get_grpc_status_code(span); + let grpc_method = get_grpc_method(span); + let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default(); Self { @@ -265,6 +284,7 @@ impl<'a> BorrowedAggregationKey<'a> { service_source, http_status_code: status_code, grpc_status_code, + grpc_method, is_synthetics_request: span .get_meta(TAG_ORIGIN) .is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)), @@ -291,6 +311,7 @@ impl From for OwnedAggregationKey { grpc_status_code: value.grpc_status_code.parse().ok(), is_synthetics_request: value.synthetics, is_trace_root: value.is_trace_root == 1, + grpc_method: String::new(), }, peer_tags: value .peer_tags @@ -377,8 +398,7 @@ pub struct OtlpExactCell { /// Exact OK/ERROR cells for one aggregation group, in the same order as the `stats` vector /// of the accompanying [`pb::ClientStatsBucket`]. `grpc_method` is the group's gRPC method (DD /// schema `grpc.method.name`) carried out-of-band so it does not appear in the agent stats -/// protobuf wire format. It is initialized empty here; a subsequent change populates it from -/// the aggregation key. +/// protobuf wire format. #[derive(Debug, Clone, Default)] pub struct OtlpExactGroup { pub ok: OtlpExactCell, @@ -437,7 +457,9 @@ impl StatsBucket { pub(super) fn flush_with_otlp_exact(self, bucket_duration: u64) -> OtlpStatsBucket { let mut stats = Vec::with_capacity(self.data.len()); let mut exact = Vec::with_capacity(self.data.len()); - for (k, g) in self.data { + for (mut k, g) in self.data { + // Move grpc_method into the sidecar; the agent-stats path doesn't carry it. + let grpc_method = std::mem::take(&mut k.fixed.grpc_method); exact.push(OtlpExactGroup { ok: OtlpExactCell { count: g.hits.saturating_sub(g.errors), @@ -451,7 +473,7 @@ impl StatsBucket { min_ns: g.error_min, max_ns: g.error_max, }, - grpc_method: String::new(), + grpc_method, }); stats.push(encode_grouped_stats(k, g)); } @@ -815,6 +837,19 @@ mod tests { } .into_key(), ), + // grpc method extracted from `grpc.method.name` (DD schema) or `rpc.method` (OTel). + ( + SpanBytes { + meta: vec![("grpc.method.name".into(), "/pkg.Svc/Method".into())].into(), + ..Default::default() + }, + FixedAggregationKey { + grpc_method: "/pkg.Svc/Method".into(), + is_trace_root: true, + ..Default::default() + } + .into_key(), + ), // Span with grpc status from meta as numeric string ( SpanBytes { From f7eb56380f03e89f3a5cc791eb0d5b41bf124d23 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 19:44:54 -0400 Subject: [PATCH 6/9] feat(data-pipeline): set _dd.stats_computed on OTLP trace exports when SDK computes stats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When otlp_stats_enabled, add _dd.stats_computed="true" to the OTLP ResourceSpans resource attributes and Datadog-Client-Computed-Stats: yes to the HTTP request headers. The Agent's OTLP receiver already checks both signals (otlp.go:372, otlp.go:272) and skips its concentrator when either is set, preventing double-counted APM metrics. The resource attribute survives Collector hops (unlike HTTP headers); the header covers direct SDK→Agent connections. Both are backwards compatible: older Agents and non-Datadog OTLP receivers silently ignore unknown resource attributes and headers. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/trace_exporter/mod.rs | 18 +++++- libdd-trace-utils/src/otlp_encoder/mapper.rs | 57 +++++++++++++++++++ libdd-trace-utils/src/otlp_encoder/mod.rs | 3 + 3 files changed, 77 insertions(+), 1 deletion(-) diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 456659b712..11ea247f8e 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -553,6 +553,7 @@ impl Tra r.language = self.metadata.language.clone(); r.tracer_version = self.metadata.tracer_version.clone(); r.runtime_id = self.metadata.runtime_id.clone(); + r.client_computed_stats = self.otlp_stats_enabled; r }; let request = map_traces_to_otlp(traces, &resource_info); @@ -560,9 +561,24 @@ impl Tra error!("OTLP JSON serialization error: {e}"); TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) })?; + // Also set the header: resource attributes survive Collector hops, headers don't. + let effective_config; + let config_to_use = if self.otlp_stats_enabled { + effective_config = { + let mut c = config.clone(); + c.headers.insert( + http::HeaderName::from_static("datadog-client-computed-stats"), + http::HeaderValue::from_static("yes"), + ); + c + }; + &effective_config + } else { + config + }; send_otlp_traces_http( &self.capabilities, - config, + config_to_use, self.endpoint.test_token.as_deref(), json_body, ) diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs index 0575c20ccf..a4f71f02a2 100644 --- a/libdd-trace-utils/src/otlp_encoder/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -109,6 +109,14 @@ fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { value: AnyValue::StringValue(resource_info.runtime_id.clone()), }); } + // Tells Datadog Agent OTLP receivers to skip their concentrator; prevents double-counted APM + // metrics. + if resource_info.client_computed_stats { + attributes.push(KeyValue { + key: "_dd.stats_computed".to_string(), + value: AnyValue::StringValue("true".to_string()), + }); + } Resource { attributes } } @@ -699,6 +707,55 @@ mod tests { assert_eq!(spans[2].trace_id, expected); } + #[test] + fn test_stats_computed_resource_attr_set_when_enabled() { + let resource_info = OtlpResourceInfo { + client_computed_stats: true, + ..Default::default() + }; + let span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("s"), + start: 0, + duration: 1, + ..Default::default() + }; + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let resource_attrs = &req.resource_spans[0].resource.as_ref().unwrap().attributes; + let kv = resource_attrs + .iter() + .find(|a| a.key == "_dd.stats_computed") + .expect("_dd.stats_computed must be present when client_computed_stats=true"); + assert!( + matches!(&kv.value, AnyValue::StringValue(s) if s == "true"), + "_dd.stats_computed must be \"true\", got {:?}", + kv.value + ); + } + + #[test] + fn test_stats_computed_resource_attr_absent_when_disabled() { + let resource_info = OtlpResourceInfo { + client_computed_stats: false, + ..Default::default() + }; + let span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("s"), + start: 0, + duration: 1, + ..Default::default() + }; + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let resource_attrs = &req.resource_spans[0].resource.as_ref().unwrap().attributes; + assert!( + !resource_attrs.iter().any(|a| a.key == "_dd.stats_computed"), + "_dd.stats_computed must not be emitted when client_computed_stats=false" + ); + } + #[test] fn test_empty_chunk_does_not_panic() { // Defensive: an empty chunk should produce no spans and not panic. diff --git a/libdd-trace-utils/src/otlp_encoder/mod.rs b/libdd-trace-utils/src/otlp_encoder/mod.rs index 4771995c9d..02623cbc66 100644 --- a/libdd-trace-utils/src/otlp_encoder/mod.rs +++ b/libdd-trace-utils/src/otlp_encoder/mod.rs @@ -24,4 +24,7 @@ pub struct OtlpResourceInfo { pub runtime_id: String, pub hostname: String, pub process_tags: String, + /// When true, emits `_dd.stats_computed: "true"` on the OTLP resource to prevent + /// double-counted APM metrics in Datadog Agent OTLP receivers (backwards compatible). + pub client_computed_stats: bool, } From 9cd1ad527005b5ff6350582134d24e275fd6c83d Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 20:19:51 -0400 Subject: [PATCH 7/9] fix(trace-stats): move grpc_method out of aggregation key; restore V1 negotiation with OTLP stats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit grpc_method was part of FixedAggregationKey (Hash+PartialEq), splitting same-service gRPC spans into separate buckets that encode_grouped_stats then serialised with an empty method — producing duplicate indistinguishable ClientGroupedStats rows on the /v0.6/stats path. Move it to GroupedStats (value side), set on group creation, and surface it to OtlpExactGroup from there. This also removes the one breaking change introduced by the prior commit. check_agent_info returned before refresh_v1_active when otlp_stats_enabled, preventing V1 protocol negotiation for exporters that combine enable_v1_protocol with OTLP metrics. Move the early return to after the V1 refresh so only stats enable/disable is skipped. Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/trace_exporter/mod.rs | 10 +++---- .../src/span_concentrator/aggregation.rs | 28 ++++++++----------- .../src/span_concentrator/mod.rs | 3 +- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 11ea247f8e..163189e8f4 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -357,11 +357,6 @@ impl Tra /// Reconcile in-process stats state with the latest agent info. /// Async so the `Enabled` arm can await a stats-worker shutdown without `block_on`. async fn check_agent_info(&self) { - // OTLP trace metrics run the concentrator independently of the agent; never let agent - // info enable or disable stats in that mode. - if self.otlp_stats_enabled { - return; - } let Some(agent_info) = agent_info::get_agent_info() else { return; }; @@ -373,6 +368,11 @@ impl Tra self.refresh_v1_active(&agent_info); } + // OTLP trace metrics run the concentrator independently; skip stats enable/disable. + if self.otlp_stats_enabled { + return; + } + // load_full() avoids holding an ArcSwap Guard (!Send) across .await. let status = self.client_side_stats.status.load_full(); match &*status { diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 5e6678fc40..cc5f82bdab 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -49,10 +49,6 @@ pub struct FixedAggregationKey { pub grpc_status_code: Option, pub is_synthetics_request: bool, pub is_trace_root: bool, - /// gRPC method name (DD schema `grpc.method.name`). Empty when absent. Stored owned/borrowed - /// like the other string fields so it participates in the SHM `StringRef` representation. - /// Appended at the end to preserve `PartialOrd` ordering of the pre-existing fields. - pub grpc_method: T, } impl FixedAggregationKey { @@ -77,7 +73,6 @@ impl FixedAggregationKey { grpc_status_code: self.grpc_status_code, is_synthetics_request: self.is_synthetics_request, is_trace_root: self.is_trace_root, - grpc_method: f(self.grpc_method.borrow()), } } } @@ -158,7 +153,7 @@ fn get_grpc_status_code<'a>(span: &'a impl StatSpan<'a>) -> Option { None } -fn get_grpc_method<'a>(span: &'a impl StatSpan<'a>) -> &'a str { +pub(super) fn get_grpc_method<'a>(span: &'a impl StatSpan<'a>) -> &'a str { for key in GRPC_METHOD_FIELD { if let Some(val) = span.get_meta(key) { if !val.is_empty() { @@ -268,8 +263,6 @@ impl<'a> BorrowedAggregationKey<'a> { let grpc_status_code = get_grpc_status_code(span); - let grpc_method = get_grpc_method(span); - let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default(); Self { @@ -284,7 +277,6 @@ impl<'a> BorrowedAggregationKey<'a> { service_source, http_status_code: status_code, grpc_status_code, - grpc_method, is_synthetics_request: span .get_meta(TAG_ORIGIN) .is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)), @@ -311,7 +303,6 @@ impl From for OwnedAggregationKey { grpc_status_code: value.grpc_status_code.parse().ok(), is_synthetics_request: value.synthetics, is_trace_root: value.is_trace_root == 1, - grpc_method: String::new(), }, peer_tags: value .peer_tags @@ -353,6 +344,9 @@ pub(super) struct GroupedStats { error_duration: u64, error_min: u64, error_max: u64, + // gRPC method for OTLP export only; not part of the aggregation key so agent stats are + // unaffected. + pub(super) grpc_method: String, } impl GroupedStats { @@ -439,10 +433,14 @@ impl StatsBucket { duration: i64, is_error: bool, is_top_level: bool, + grpc_method: &str, ) { self.data .entry_ref(&key) - .or_default() + .or_insert_with(|| GroupedStats { + grpc_method: grpc_method.to_owned(), + ..Default::default() + }) .insert(duration, is_error, is_top_level); } @@ -457,9 +455,8 @@ impl StatsBucket { pub(super) fn flush_with_otlp_exact(self, bucket_duration: u64) -> OtlpStatsBucket { let mut stats = Vec::with_capacity(self.data.len()); let mut exact = Vec::with_capacity(self.data.len()); - for (mut k, g) in self.data { - // Move grpc_method into the sidecar; the agent-stats path doesn't carry it. - let grpc_method = std::mem::take(&mut k.fixed.grpc_method); + for (k, mut g) in self.data { + let grpc_method = std::mem::take(&mut g.grpc_method); exact.push(OtlpExactGroup { ok: OtlpExactCell { count: g.hits.saturating_sub(g.errors), @@ -837,14 +834,13 @@ mod tests { } .into_key(), ), - // grpc method extracted from `grpc.method.name` (DD schema) or `rpc.method` (OTel). + // grpc.method.name is carried in GroupedStats (for OTLP), not in the aggregation key. ( SpanBytes { meta: vec![("grpc.method.name".into(), "/pkg.Svc/Method".into())].into(), ..Default::default() }, FixedAggregationKey { - grpc_method: "/pkg.Svc/Method".into(), is_trace_root: true, ..Default::default() } diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index fb0f4fc4d6..636e87744d 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -9,7 +9,7 @@ use libdd_trace_protobuf::pb; use aggregation::StatsBucket; mod aggregation; -use aggregation::BorrowedAggregationKey; +use aggregation::{get_grpc_method, BorrowedAggregationKey}; pub use aggregation::{FixedAggregationKey, OtlpExactCell, OtlpExactGroup, OtlpStatsBucket}; pub mod stat_span; @@ -184,6 +184,7 @@ impl SpanConcentrator { span.duration(), span.is_error(), span.has_top_level(), + get_grpc_method(span), ); } From d9c5f560ade979cf5af34eadc29a6032154c2749 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 15 Jun 2026 20:19:51 -0400 Subject: [PATCH 8/9] fix(trace-stats): move grpc_method out of aggregation key; restore V1 negotiation with OTLP stats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit grpc_method was part of FixedAggregationKey (Hash+PartialEq), splitting same-service gRPC spans into separate buckets that encode_grouped_stats then serialised with an empty method — producing duplicate indistinguishable ClientGroupedStats rows on the /v0.6/stats path. Move it to GroupedStats (value side), set on group creation, and surface it to OtlpExactGroup from there. This also removes the one breaking change introduced by the prior commit. check_agent_info returned before refresh_v1_active when otlp_stats_enabled, preventing V1 protocol negotiation for exporters that combine enable_v1_protocol with OTLP metrics. Move the early return to after the V1 refresh so only stats enable/disable is skipped. Co-Authored-By: Claude Sonnet 4.6 --- datadog-ipc/src/shm_stats.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs index 38c92fb856..44941ec667 100644 --- a/datadog-ipc/src/shm_stats.rs +++ b/datadog-ipc/src/shm_stats.rs @@ -858,7 +858,6 @@ mod tests { is_synthetics_request: false, is_trace_root: true, grpc_status_code: None, - grpc_method: "", }, peer_tags: &[], duration_ns: dur, From 3bd37737c4ab03138e3497cd340d0b79f674e346 Mon Sep 17 00:00:00 2001 From: Munir Date: Thu, 18 Jun 2026 15:13:26 -0400 Subject: [PATCH 9/9] fix(otlp): address reviewer feedback on attempts naming and runtime_id - Rename OTLP_MAX_ATTEMPTS/OTLP_SHUTDOWN_MAX_ATTEMPTS to OTLP_MAX_RETRIES/ OTLP_SHUTDOWN_MAX_RETRIES and rename the max_attempts parameter to max_retries throughout, converging on the retries convention used elsewhere - Add TraceExporterBuilder::set_runtime_id so callers can supply the language tracer's existing runtime_id; falls back to a generated UUID when not set, ensuring OTLP trace exports and OTLP trace-metrics share the same runtime_id for backend correlation Co-Authored-By: Claude Sonnet 4.6 --- libdd-data-pipeline/src/otlp/exporter.rs | 16 +++++++--------- libdd-data-pipeline/src/otlp/metrics.rs | 10 +++++----- .../src/trace_exporter/builder.rs | 14 +++++++++++++- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 2b36f2d297..31901f3251 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -13,10 +13,10 @@ use libdd_trace_utils::send_with_retry::{ }; use std::time::Duration; -/// Max total attempts for OTLP export (initial + retries on transient failures). -pub(crate) const OTLP_MAX_ATTEMPTS: u32 = 5; -/// Single attempt with no retries, used on shutdown to avoid a long backoff in the shutdown window. -pub(crate) const OTLP_SHUTDOWN_MAX_ATTEMPTS: u32 = 1; +/// Max retries for OTLP export on transient failures. +pub(crate) const OTLP_MAX_RETRIES: u32 = 4; +/// No retries on shutdown to avoid a long backoff in the shutdown window. +pub(crate) const OTLP_SHUTDOWN_MAX_RETRIES: u32 = 0; const OTLP_RETRY_DELAY_MS: u64 = 100; /// POST an OTLP HTTP/JSON payload to `endpoint_url`; `test_token` enables snapshot tests. @@ -27,7 +27,7 @@ pub(crate) async fn send_otlp_http( timeout: Duration, test_token: Option<&str>, json_body: Vec, - max_attempts: u32, + max_retries: u32, ) -> Result<(), TraceExporterError> { let url = libdd_common::parse_uri(endpoint_url).map_err(|e| { TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( @@ -56,10 +56,8 @@ pub(crate) async fn send_otlp_http( } } - // `RetryStrategy` counts *retries*, and performs `max_retries + 1` total attempts. Convert the - // attempt budget accordingly so `max_attempts == 1` means a single try with no retries. let retry_strategy = RetryStrategy::new( - max_attempts.saturating_sub(1), + max_retries, OTLP_RETRY_DELAY_MS, RetryBackoffType::Exponential, None, @@ -90,7 +88,7 @@ pub async fn send_otlp_traces_http( config.timeout, test_token, json_body, - OTLP_MAX_ATTEMPTS, + OTLP_MAX_RETRIES, ) .await } diff --git a/libdd-data-pipeline/src/otlp/metrics.rs b/libdd-data-pipeline/src/otlp/metrics.rs index a5e16e8417..3e05d53e19 100644 --- a/libdd-data-pipeline/src/otlp/metrics.rs +++ b/libdd-data-pipeline/src/otlp/metrics.rs @@ -5,7 +5,7 @@ //! DDSketch summaries from the span concentrator are bucketed into fixed explicit bounds (seconds). use super::config::OtlpMetricsConfig; -use super::exporter::{send_otlp_http, OTLP_MAX_ATTEMPTS, OTLP_SHUTDOWN_MAX_ATTEMPTS}; +use super::exporter::{send_otlp_http, OTLP_MAX_RETRIES, OTLP_SHUTDOWN_MAX_RETRIES}; use async_trait::async_trait; use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; use libdd_ddsketch::DDSketch; @@ -227,7 +227,7 @@ pub struct OtlpStatsExporter { impl OtlpStatsExporter { /// Flush the concentrator and export stats; returns `Ok(true)` if anything was sent. - async fn send(&self, force_flush: bool, max_attempts: u32) -> anyhow::Result { + async fn send(&self, force_flush: bool, max_retries: u32) -> anyhow::Result { let buckets = { #[allow(clippy::unwrap_used)] let mut c = self.concentrator.lock().unwrap(); @@ -250,7 +250,7 @@ impl OtlpStatsExporter { self.config.timeout, self.test_token.as_deref(), serde_json::to_vec(&request)?, - max_attempts, + max_retries, ) .await?; Ok(true) @@ -267,14 +267,14 @@ impl Wor } async fn run(&mut self) { - if let Err(e) = self.send(false, OTLP_MAX_ATTEMPTS).await { + if let Err(e) = self.send(false, OTLP_MAX_RETRIES).await { error!(?e, "Error exporting OTLP trace metrics"); } } async fn shutdown(&mut self) { // Single attempt: a long backoff could miss the bounded shutdown window. - if let Err(e) = self.send(true, OTLP_SHUTDOWN_MAX_ATTEMPTS).await { + if let Err(e) = self.send(true, OTLP_SHUTDOWN_MAX_RETRIES).await { error!(?e, "Error exporting OTLP trace metrics on shutdown"); } } diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 39a7391acc..713341071f 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -86,6 +86,7 @@ pub struct TraceExporterBuilder { otlp_metrics_endpoint: Option, otlp_metrics_headers: Vec<(String, String)>, otel_trace_semantics_enabled: bool, + runtime_id: Option, } impl TraceExporterBuilder { @@ -335,6 +336,15 @@ impl TraceExporterBuilder { self } + /// Set the runtime identifier supplied by the language tracer. + /// + /// When set, this ID is reused for both OTLP trace exports and OTLP trace-metrics so that all + /// signals can be correlated by the backend. If not set, a fresh UUID is generated. + pub fn set_runtime_id(&mut self, id: &str) -> &mut Self { + self.runtime_id = Some(id.to_owned()); + self + } + /// Build the [`TraceExporter`] synchronously. /// /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. @@ -488,7 +498,9 @@ impl TraceExporterBuilder { otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, }); - let runtime_id = uuid::Uuid::new_v4().to_string(); + let runtime_id = self + .runtime_id + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); // OTLP metrics + stats bucket size: start the concentrator unconditionally (bypass the // agent gate) so `check_agent_info` cannot later disable stats.