diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 02d7a45f80..f20c5c60e7 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -36,3 +36,20 @@ pub struct OtlpTraceConfig { #[allow(dead_code)] pub(crate) protocol: OtlpProtocol, } + +/// Parsed OTLP trace-metrics exporter configuration. +#[derive(Clone, Debug)] +pub struct OtlpMetricsConfig { + /// Full URL to POST metrics to (e.g. `http://localhost:4318/v1/metrics`). + pub endpoint_url: String, + /// Pre-validated HTTP headers to include in each request. + pub headers: HeaderMap, + /// Request timeout. + pub timeout: Duration, + /// Protocol (for future use; currently only HttpJson is supported). + #[allow(dead_code)] + pub(crate) protocol: OtlpProtocol, + /// When `true`, emit only OTel attributes; omit `dd.*`/`_dd.*` ones + /// (`DD_TRACE_OTEL_SEMANTICS_ENABLED`). + pub otel_trace_semantics_enabled: bool, +} diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 1f4d86a235..31901f3251 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -5,30 +5,31 @@ use super::config::OtlpTraceConfig; use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; +use http::HeaderMap; use libdd_capabilities::{HttpClientCapability, SleepCapability}; use libdd_common::Endpoint; use libdd_trace_utils::send_with_retry::{ send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError, }; +use std::time::Duration; -/// Max retries for OTLP export. -const OTLP_MAX_RETRIES: u32 = 4; -/// Initial backoff between retries (milliseconds). +/// Max retries for OTLP export on transient failures. +pub(crate) const OTLP_MAX_RETRIES: u32 = 4; +/// No retries on shutdown to avoid a long backoff in the shutdown window. +pub(crate) const OTLP_SHUTDOWN_MAX_RETRIES: u32 = 0; const OTLP_RETRY_DELAY_MS: u64 = 100; -/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. -/// -/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. -/// -/// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests -/// against the Datadog test agent's OTLP endpoint. -pub async fn send_otlp_traces_http( +/// POST an OTLP HTTP/JSON payload to `endpoint_url`; `test_token` enables snapshot tests. +pub(crate) async fn send_otlp_http( capabilities: &C, - config: &OtlpTraceConfig, + endpoint_url: &str, + config_headers: &HeaderMap, + timeout: Duration, test_token: Option<&str>, json_body: Vec, + max_retries: u32, ) -> Result<(), TraceExporterError> { - let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { + let url = libdd_common::parse_uri(endpoint_url).map_err(|e| { TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( "Invalid OTLP endpoint URL: {}", e @@ -37,11 +38,11 @@ pub async fn send_otlp_traces_http( let target = Endpoint { url, - timeout_ms: config.timeout.as_millis() as u64, + timeout_ms: timeout.as_millis() as u64, ..Endpoint::default() }; - let mut headers = config.headers.clone(); + let mut headers = config_headers.clone(); headers.insert( http::header::CONTENT_TYPE, libdd_common::header::APPLICATION_JSON, @@ -56,7 +57,7 @@ pub async fn send_otlp_traces_http( } let retry_strategy = RetryStrategy::new( - OTLP_MAX_RETRIES, + max_retries, OTLP_RETRY_DELAY_MS, RetryBackoffType::Exponential, None, @@ -68,6 +69,30 @@ pub async fn send_otlp_traces_http( } } +/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. +/// +/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. +/// +/// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests +/// against the Datadog test agent's OTLP endpoint. +pub async fn send_otlp_traces_http( + capabilities: &C, + config: &OtlpTraceConfig, + test_token: Option<&str>, + json_body: Vec, +) -> Result<(), TraceExporterError> { + send_otlp_http( + capabilities, + &config.endpoint_url, + &config.headers, + config.timeout, + test_token, + json_body, + OTLP_MAX_RETRIES, + ) + .await +} + async fn map_send_error(err: SendWithRetryError) -> TraceExporterError { match err { SendWithRetryError::Http(response, _) => { diff --git a/libdd-data-pipeline/src/otlp/metrics.rs b/libdd-data-pipeline/src/otlp/metrics.rs new file mode 100644 index 0000000000..3e05d53e19 --- /dev/null +++ b/libdd-data-pipeline/src/otlp/metrics.rs @@ -0,0 +1,546 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Maps client-computed span stats into `traces.span.sdk.metrics.duration` OTLP histograms. +//! DDSketch summaries from the span concentrator are bucketed into fixed explicit bounds (seconds). + +use super::config::OtlpMetricsConfig; +use super::exporter::{send_otlp_http, OTLP_MAX_RETRIES, OTLP_SHUTDOWN_MAX_RETRIES}; +use async_trait::async_trait; +use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_ddsketch::DDSketch; +use libdd_shared_runtime::Worker; +use libdd_trace_protobuf::pb; +use libdd_trace_stats::span_concentrator::{OtlpStatsBucket, SpanConcentrator}; +use libdd_trace_utils::otlp_encoder::json_types::status_code; +use libdd_trace_utils::otlp_encoder::OtlpResourceInfo; +use serde_json::{json, Value}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; +use tracing::error; + +const METRIC_NAME: &str = "traces.span.sdk.metrics.duration"; +const NANOS_PER_SECOND: f64 = 1_000_000_000.0; +/// Fixed bucket boundaries (seconds) mirroring the OTel spanmetrics-connector defaults. +const EXPLICIT_BOUNDS_SECONDS: [f64; 16] = [ + 0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.0, 1.4, 2.0, 5.0, 10.0, 15.0, +]; + +fn kv_str(key: &str, value: &str) -> Value { + json!({ "key": key, "value": { "stringValue": value } }) +} +fn kv_int(key: &str, value: i64) -> Value { + json!({ "key": key, "value": { "intValue": value.to_string() } }) +} + +/// Build an OTLP metrics export request (`ExportMetricsServiceRequest`) as a JSON value. +/// +/// Emits one histogram data point per (aggregation key, ok/error) cell with exact `count`, +/// `sum`, `min` and `max` (the explicit-bucket distribution is still projected from the +/// per-cell DDSketch). Returns `None` when no cell yields a data point. +pub fn map_stats_to_otlp_metrics( + buckets: &[OtlpStatsBucket], + resource_info: &OtlpResourceInfo, + otel_trace_semantics_enabled: bool, +) -> Option { + let mut data_points = Vec::new(); + for b in buckets { + let end = b.bucket.start.saturating_add(b.bucket.duration); + for (group, exact) in b.bucket.stats.iter().zip(b.exact.iter()) { + for (is_error, summary, cell) in [ + (false, &group.ok_summary, &exact.ok), + (true, &group.error_summary, &exact.error), + ] { + if cell.count == 0 { + continue; + } + let Some(sketch) = DDSketch::from_encoded(summary) else { + continue; + }; + data_points.push(json!({ + "attributes": build_attributes(group, &exact.grpc_method, is_error, resource_info, otel_trace_semantics_enabled), + "startTimeUnixNano": b.bucket.start.to_string(), + "timeUnixNano": end.to_string(), + "count": cell.count.to_string(), + "sum": ns_to_s(cell.duration_ns), + "bucketCounts": sketch_bucket_counts(&sketch), + "explicitBounds": EXPLICIT_BOUNDS_SECONDS, + "min": ns_to_s(cell.min_ns), + "max": ns_to_s(cell.max_ns), + })); + } + } + } + if data_points.is_empty() { + return None; + } + Some(json!({ + "resourceMetrics": [{ + "resource": { "attributes": build_resource_attributes(resource_info, otel_trace_semantics_enabled) }, + "scopeMetrics": [{ + "metrics": [{ + "name": METRIC_NAME, + "unit": "s", + "histogram": { + // OTLP AggregationTemporality::Delta (each export covers only the interval). + "aggregationTemporality": 1, + "dataPoints": data_points, + } + }] + }] + }] + })) +} + +fn ns_to_s(ns: u64) -> f64 { + ns as f64 / NANOS_PER_SECOND +} + +/// Project the sketch's bins onto [`EXPLICIT_BOUNDS_SECONDS`] (one bucket per boundary plus a +/// trailing overflow bucket). The exact `count`/`sum`/`min`/`max` come from the concentrator's +/// per-cell accumulators; this function only shapes the distribution. +fn sketch_bucket_counts(sketch: &DDSketch) -> Vec { + let mut weights = vec![0.0_f64; EXPLICIT_BOUNDS_SECONDS.len() + 1]; + for (value, weight) in sketch.ordered_bins() { + if weight <= 0.0 { + continue; + } + let seconds = value / NANOS_PER_SECOND; + let idx = EXPLICIT_BOUNDS_SECONDS + .iter() + .position(|b| seconds <= *b) + .unwrap_or(EXPLICIT_BOUNDS_SECONDS.len()); + weights[idx] += weight; + } + weights + .iter() + .map(|w| (w.round() as u64).to_string()) + .collect() +} + +fn build_attributes( + group: &pb::ClientGroupedStats, + grpc_method: &str, + is_error: bool, + resource_info: &OtlpResourceInfo, + otel_trace_semantics_enabled: bool, +) -> Vec { + let mut attrs = Vec::new(); + let mut push = |k: &str, v: &str| { + if !v.is_empty() { + attrs.push(kv_str(k, v)); + } + }; + + // Service identity is on the resource; emit on the data point only when overridden. + let group_service = if group.service.is_empty() { + resource_info.service.as_str() + } else { + group.service.as_str() + }; + if group_service != resource_info.service { + push("service.name", group_service); + } + + push("span.name", &group.resource); + push("span.kind", &group.span_kind); + push("http.request.method", &group.http_method); + push("http.route", &group.http_endpoint); + push("rpc.method", grpc_method); + push("rpc.response.status_code", &group.grpc_status_code); + for tag in &group.peer_tags { + if let Some((k, v)) = tag.split_once(':') { + push(k, v); + } + } + if !otel_trace_semantics_enabled { + push("datadog.operation.name", &group.name); + push("datadog.span.type", &group.r#type); + } + if group.http_status_code != 0 { + attrs.push(kv_int( + "http.response.status_code", + group.http_status_code as i64, + )); + } + if !otel_trace_semantics_enabled { + // Only `synthetics` is surfaced as `datadog.origin`: the aggregation key carries just a + // boolean, not the full origin string, so other origins are lost upstream. + if group.synthetics { + attrs.push(kv_str("datadog.origin", "synthetics")); + } + if group.is_trace_root == pb::Trilean::True as i32 { + attrs.push(json!({ "key": "_datadog.is_trace_root", "value": { "boolValue": true } })); + } + let top_level = group.hits > 0 && group.top_level_hits == group.hits; + attrs.push(json!({ + "key": "datadog.span.top_level", "value": { "boolValue": top_level } + })); + } + if is_error { + attrs.push(kv_int("status.code", status_code::ERROR as i64)); + } + attrs +} + +fn build_resource_attributes( + info: &OtlpResourceInfo, + otel_trace_semantics_enabled: bool, +) -> Vec { + let mut attrs: Vec = [ + ("service.name", info.service.as_str()), + ("service.version", info.app_version.as_str()), + ("deployment.environment.name", info.env.as_str()), + ("host.name", info.hostname.as_str()), + ("telemetry.sdk.name", "datadog"), + ("telemetry.sdk.language", info.language.as_str()), + ("telemetry.sdk.version", info.tracer_version.as_str()), + ] + .into_iter() + .filter(|(_, v)| !v.is_empty()) + .map(|(k, v)| kv_str(k, v)) + .collect(); + + if !otel_trace_semantics_enabled { + if !info.runtime_id.is_empty() { + attrs.push(kv_str("datadog.runtime_id", &info.runtime_id)); + } + attrs.extend(info.process_tags.split(',').filter_map(|p| { + let (k, v) = p.split_once(':')?; + let (k, v) = (k.trim(), v.trim()); + (!k.is_empty() && !v.is_empty()).then(|| kv_str(&format!("datadog.{k}"), v)) + })); + } + attrs +} + +/// Worker that periodically flushes the span concentrator and exports OTLP trace metrics. +#[derive(Debug)] +pub struct OtlpStatsExporter { + pub flush_interval: Duration, + pub concentrator: Arc>, + pub config: OtlpMetricsConfig, + pub resource: OtlpResourceInfo, + pub test_token: Option, + pub capabilities: C, +} + +impl OtlpStatsExporter { + /// Flush the concentrator and export stats; returns `Ok(true)` if anything was sent. + async fn send(&self, force_flush: bool, max_retries: u32) -> anyhow::Result { + let buckets = { + #[allow(clippy::unwrap_used)] + let mut c = self.concentrator.lock().unwrap(); + c.flush_with_otlp_exact(SystemTime::now(), force_flush) + }; + if buckets.is_empty() { + return Ok(false); + } + let Some(request) = map_stats_to_otlp_metrics( + &buckets, + &self.resource, + self.config.otel_trace_semantics_enabled, + ) else { + return Ok(false); + }; + send_otlp_http( + &self.capabilities, + &self.config.endpoint_url, + &self.config.headers, + self.config.timeout, + self.test_token.as_deref(), + serde_json::to_vec(&request)?, + max_retries, + ) + .await?; + Ok(true) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Worker + for OtlpStatsExporter +{ + async fn trigger(&mut self) { + self.capabilities.sleep(self.flush_interval).await; + } + + async fn run(&mut self) { + if let Err(e) = self.send(false, OTLP_MAX_RETRIES).await { + error!(?e, "Error exporting OTLP trace metrics"); + } + } + + async fn shutdown(&mut self) { + // Single attempt: a long backoff could miss the bounded shutdown window. + if let Err(e) = self.send(true, OTLP_SHUTDOWN_MAX_RETRIES).await { + error!(?e, "Error exporting OTLP trace metrics on shutdown"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use libdd_trace_stats::span_concentrator::{OtlpExactCell, OtlpExactGroup}; + + const T_START: u64 = 12_340_000_000_000; + const T_SIZE: u64 = 10_000_000_000; + + fn sketch(points: &[f64]) -> Vec { + let mut s = DDSketch::default(); + for p in points { + s.add(*p).unwrap(); + } + s.encode_to_vec() + } + + fn resource() -> OtlpResourceInfo { + let mut r = OtlpResourceInfo::default(); + r.service = "svc".to_string(); + r.env = "test".to_string(); + r + } + + fn cell(durations_ns: &[u64]) -> OtlpExactCell { + OtlpExactCell { + count: durations_ns.len() as u64, + duration_ns: durations_ns.iter().sum(), + min_ns: durations_ns.iter().copied().min().unwrap_or(0), + max_ns: durations_ns.iter().copied().max().unwrap_or(0), + } + } + + /// Build one (pb group, exact group) cell pair. `customize` lets each test tweak the pb group. + fn group_with_exact( + ok_ns: &[u64], + err_ns: &[u64], + customize: impl FnOnce(&mut pb::ClientGroupedStats), + ) -> (pb::ClientGroupedStats, OtlpExactGroup) { + let hits = (ok_ns.len() + err_ns.len()) as u64; + let to_f64s = |xs: &[u64]| -> Vec { xs.iter().map(|&x| x as f64).collect() }; + let mut g = pb::ClientGroupedStats { + service: "svc".into(), + name: "test.op".into(), + resource: "GET /foo".into(), + r#type: "web".into(), + hits, + errors: err_ns.len() as u64, + top_level_hits: hits, + ok_summary: if ok_ns.is_empty() { + Vec::new() + } else { + sketch(&to_f64s(ok_ns)) + }, + error_summary: if err_ns.is_empty() { + Vec::new() + } else { + sketch(&to_f64s(err_ns)) + }, + ..Default::default() + }; + customize(&mut g); + ( + g, + OtlpExactGroup { + ok: cell(ok_ns), + error: cell(err_ns), + grpc_method: String::new(), + }, + ) + } + + fn group_pair_with_grpc( + ok_ns: &[u64], + err_ns: &[u64], + grpc_method: &str, + customize: impl FnOnce(&mut pb::ClientGroupedStats), + ) -> (pb::ClientGroupedStats, OtlpExactGroup) { + let (g, mut e) = group_with_exact(ok_ns, err_ns, customize); + e.grpc_method = grpc_method.into(); + (g, e) + } + + fn buckets(groups: Vec<(pb::ClientGroupedStats, OtlpExactGroup)>) -> Vec { + let (stats, exact): (Vec<_>, Vec<_>) = groups.into_iter().unzip(); + vec![OtlpStatsBucket { + bucket: pb::ClientStatsBucket { + start: T_START, + duration: T_SIZE, + stats, + agent_time_shift: 0, + }, + exact, + }] + } + + /// Single-cell default group with a 1s ok span. + fn one_ok_group() -> (pb::ClientGroupedStats, OtlpExactGroup) { + group_with_exact(&[1_000_000_000], &[], |_| {}) + } + + fn metric(req: &Value) -> &Value { + &req["resourceMetrics"][0]["scopeMetrics"][0]["metrics"][0] + } + + fn points(req: &Value) -> &Vec { + metric(req)["histogram"]["dataPoints"].as_array().unwrap() + } + + fn resource_attrs(req: &Value) -> &Vec { + req["resourceMetrics"][0]["resource"]["attributes"] + .as_array() + .unwrap() + } + + fn str_at<'a>(attrs: &'a [Value], key: &str) -> Option<&'a str> { + attrs + .iter() + .find(|kv| kv["key"] == key) + .and_then(|kv| kv["value"]["stringValue"].as_str()) + } + + fn err_code() -> String { + (status_code::ERROR as i64).to_string() + } + + fn is_error_point(p: &Value) -> bool { + let ec = err_code(); + p["attributes"] + .as_array() + .unwrap() + .iter() + .any(|kv| kv["key"] == "status.code" && kv["value"]["intValue"].as_str() == Some(&ec)) + } + + #[test] + fn metric_shape_and_resource_attributes() { + assert!(map_stats_to_otlp_metrics(&[], &resource(), false).is_none()); + let mut r = resource(); + r.app_version = "1.2.3".to_string(); + r.hostname = "my-host".to_string(); + r.runtime_id = "abc-123".to_string(); + r.process_tags = "entrypoint.name:server".to_string(); + for otel in [false, true] { + let req = map_stats_to_otlp_metrics(&buckets(vec![one_ok_group()]), &r, otel).unwrap(); + let m = metric(&req); + assert_eq!(m["name"], "traces.span.sdk.metrics.duration"); + assert_eq!(m["unit"], "s"); + assert_eq!(m["histogram"]["aggregationTemporality"], 1); + assert!(req["resourceMetrics"][0]["scopeMetrics"][0]["scope"].is_null()); + let a = resource_attrs(&req); + assert_eq!(str_at(a, "service.name"), Some("svc")); + assert_eq!(str_at(a, "service.version"), Some("1.2.3")); + assert_eq!(str_at(a, "deployment.environment.name"), Some("test")); + assert_eq!(str_at(a, "host.name"), Some("my-host")); + assert_eq!(str_at(a, "telemetry.sdk.name"), Some("datadog")); + let dd = !otel; + assert_eq!(str_at(a, "datadog.runtime_id").is_some(), dd); + assert_eq!(str_at(a, "datadog.entrypoint.name").is_some(), dd); + } + } + + #[test] + fn data_point_attributes_and_otel_strip() { + let g_pair = group_pair_with_grpc(&[1_000_000_000], &[], "/pkg.Svc/Method", |g| { + g.http_status_code = 404; + g.http_method = "POST".into(); + g.http_endpoint = "/users/:id".into(); + g.synthetics = true; + }); + let custom_pair = group_with_exact(&[1_000_000_000], &[], |g| { + g.service = "svc-other".into(); + }); + let bs = buckets(vec![g_pair.clone(), custom_pair]); + + let req = map_stats_to_otlp_metrics(&bs, &resource(), false).unwrap(); + let pts = points(&req); + let a = pts + .iter() + .find(|p| str_at(p["attributes"].as_array().unwrap(), "service.name").is_none()) + .unwrap()["attributes"] + .as_array() + .unwrap(); + assert_eq!(str_at(a, "span.name"), Some("GET /foo")); + assert_eq!(str_at(a, "http.request.method"), Some("POST")); + assert_eq!(str_at(a, "http.route"), Some("/users/:id")); + assert!(a.iter().any(|kv| kv["key"] == "http.response.status_code")); + assert_eq!(str_at(a, "rpc.method"), Some("/pkg.Svc/Method")); + assert_eq!(str_at(a, "datadog.operation.name"), Some("test.op")); + assert_eq!(str_at(a, "datadog.span.type"), Some("web")); + assert_eq!(str_at(a, "datadog.origin"), Some("synthetics")); + assert!(a.iter().any(|kv| kv["key"] == "datadog.span.top_level")); + assert!(pts.iter().any( + |p| str_at(p["attributes"].as_array().unwrap(), "service.name") == Some("svc-other") + )); + + // OTel mode strips datadog.*/_datadog.* attributes. + let req = map_stats_to_otlp_metrics(&buckets(vec![g_pair]), &resource(), true).unwrap(); + let a = points(&req)[0]["attributes"].as_array().unwrap(); + assert!(!a.iter().any(|kv| { + let k = kv["key"].as_str().unwrap_or(""); + k.starts_with("datadog.") || k.starts_with("_datadog.") + })); + assert_eq!(str_at(a, "http.request.method"), Some("POST")); + } + + #[test] + fn histogram_values_are_exact_and_distribution_uses_sketch() { + // Single 1s ok span: count/sum/min/max all exact, distribution shaped by the sketch. + let req = + map_stats_to_otlp_metrics(&buckets(vec![one_ok_group()]), &resource(), false).unwrap(); + let p = &points(&req)[0]; + assert_eq!(p["count"], "1"); + assert_eq!(p["sum"].as_f64().unwrap(), 1.0); + assert_eq!(p["min"].as_f64().unwrap(), 1.0); + assert_eq!(p["max"].as_f64().unwrap(), 1.0); + assert_eq!(p["startTimeUnixNano"], T_START.to_string()); + assert_eq!(p["timeUnixNano"], (T_START + T_SIZE).to_string()); + assert_eq!(p["explicitBounds"], json!(EXPLICIT_BOUNDS_SECONDS)); + assert_eq!( + p["bucketCounts"].as_array().unwrap().len(), + EXPLICIT_BOUNDS_SECONDS.len() + 1 + ); + + // 3ms, 300ms, 3s land in three distinct buckets; exact sum = 3.303s. + let g = group_with_exact(&[3_000_000, 300_000_000, 3_000_000_000], &[], |_| {}); + let req = map_stats_to_otlp_metrics(&buckets(vec![g]), &resource(), false).unwrap(); + let p = &points(&req)[0]; + assert_eq!(p["count"], "3"); + assert_eq!(p["sum"].as_f64().unwrap(), ns_to_s(3_303_000_000)); + assert_eq!(p["min"].as_f64().unwrap(), ns_to_s(3_000_000)); + assert_eq!(p["max"].as_f64().unwrap(), ns_to_s(3_000_000_000)); + let counts = p["bucketCounts"].as_array().unwrap(); + assert_eq!(counts.iter().filter(|c| c.as_str() != Some("0")).count(), 3); + } + + #[test] + fn mixed_ok_and_error_have_exact_independent_sums() { + // 2 ok spans + 1 error span. ok_sum + error_sum must equal the combined group duration. + let ok = [200_000_000_u64, 300_000_000]; + let err = [700_000_000_u64]; + let combined_ns = ok.iter().sum::() + err.iter().sum::(); + let g = group_with_exact(&ok, &err, |_| {}); + let req = map_stats_to_otlp_metrics(&buckets(vec![g]), &resource(), false).unwrap(); + let pts = points(&req); + assert_eq!(pts.len(), 2); + let ok_pt = pts.iter().find(|p| !is_error_point(p)).unwrap(); + let err_pt = pts.iter().find(|p| is_error_point(p)).unwrap(); + + // Each cell's sum is exact and independent of the other. + assert_eq!(ok_pt["count"], "2"); + assert_eq!(ok_pt["sum"].as_f64().unwrap(), ns_to_s(500_000_000)); + assert_eq!(ok_pt["min"].as_f64().unwrap(), ns_to_s(200_000_000)); + assert_eq!(ok_pt["max"].as_f64().unwrap(), ns_to_s(300_000_000)); + assert_eq!(err_pt["count"], "1"); + assert_eq!(err_pt["sum"].as_f64().unwrap(), ns_to_s(700_000_000)); + assert_eq!(err_pt["min"].as_f64().unwrap(), ns_to_s(700_000_000)); + assert_eq!(err_pt["max"].as_f64().unwrap(), ns_to_s(700_000_000)); + + // ok_sum + error_sum equals the combined group duration. + let ok_s = ok_pt["sum"].as_f64().unwrap(); + let err_s = err_pt["sum"].as_f64().unwrap(); + assert_eq!(ok_s + err_s, ns_to_s(combined_ns)); + } +} diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 658fc13b87..6fc635126f 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -1,7 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! OTLP trace export for libdatadog. +//! OTLP trace and trace-metrics export for libdatadog. //! //! When an OTLP endpoint is configured via //! [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`], the trace exporter sends @@ -9,6 +9,10 @@ //! is responsible for resolving the endpoint from its own configuration (e.g. //! `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`). //! +//! With [`set_otlp_metrics_endpoint`](crate::trace_exporter::TraceExporterBuilder::set_otlp_metrics_endpoint), +//! client-computed span stats ship as a `traces.span.sdk.metrics.duration` OTLP histogram +//! instead of going to the agent `/v0.6/stats` endpoint. +//! //! ## Sampling //! //! The exporter enforces the sampling decision already made by the tracer: unsampled chunks are @@ -24,7 +28,9 @@ pub mod config; pub mod exporter; +pub mod metrics; -pub use config::OtlpTraceConfig; +pub use config::{OtlpMetricsConfig, OtlpTraceConfig}; pub use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; +pub use metrics::OtlpStatsExporter; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 577280550a..713341071f 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -3,7 +3,7 @@ use crate::agent_info::AgentInfoFetcher; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; -use crate::otlp::OtlpTraceConfig; +use crate::otlp::{OtlpMetricsConfig, OtlpResourceInfo, OtlpTraceConfig}; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; @@ -28,6 +28,23 @@ use std::time::Duration; const DEFAULT_AGENT_URL: &str = "http://127.0.0.1:8126"; +/// Build an [`http::HeaderMap`] from key/value pairs, skipping malformed entries. +fn build_otlp_header_map(headers: Vec<(String, String)>) -> http::HeaderMap { + let mut out = http::HeaderMap::new(); + for (k, v) in headers { + match ( + http::HeaderName::from_bytes(k.as_bytes()), + http::HeaderValue::from_str(&v), + ) { + (Ok(n), Ok(vv)) => { + out.insert(n, vv); + } + _ => tracing::warn!("Skipping invalid OTLP header: {:?}={:?}", k, v), + } + } + out +} + #[allow(missing_docs)] #[derive(Debug, Default)] pub struct TraceExporterBuilder { @@ -66,6 +83,10 @@ pub struct TraceExporterBuilder { connection_timeout: Option, otlp_endpoint: Option, otlp_headers: Vec<(String, String)>, + otlp_metrics_endpoint: Option, + otlp_metrics_headers: Vec<(String, String)>, + otel_trace_semantics_enabled: bool, + runtime_id: Option, } impl TraceExporterBuilder { @@ -297,6 +318,33 @@ impl TraceExporterBuilder { self } + /// Enable OTLP HTTP/JSON trace-metrics export to `url` (e.g. `.../v1/metrics`). + pub fn set_otlp_metrics_endpoint(&mut self, url: &str) -> &mut Self { + self.otlp_metrics_endpoint = Some(url.to_owned()); + self + } + + /// Additional HTTP headers for OTLP trace-metrics requests. + pub fn set_otlp_metrics_headers(&mut self, headers: Vec<(String, String)>) -> &mut Self { + self.otlp_metrics_headers = headers; + self + } + + /// Strip Datadog-specific `dd.*`/`_dd.*` data-point attributes from the exported histogram. + pub fn enable_otel_trace_semantics(&mut self) -> &mut Self { + self.otel_trace_semantics_enabled = true; + self + } + + /// Set the runtime identifier supplied by the language tracer. + /// + /// When set, this ID is reused for both OTLP trace exports and OTLP trace-metrics so that all + /// signals can be correlated by the backend. If not set, a fresh UUID is generated. + pub fn set_runtime_id(&mut self, id: &str) -> &mut Self { + self.runtime_id = Some(id.to_owned()); + self + } + /// Build the [`TraceExporter`] synchronously. /// /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. @@ -430,32 +478,80 @@ impl TraceExporterBuilder { } }; - let otlp_config = self.otlp_endpoint.map(|url| { - let mut headers = http::HeaderMap::new(); - for (key, value) in self.otlp_headers { - match ( - http::HeaderName::from_bytes(key.as_bytes()), - http::HeaderValue::from_str(&value), - ) { - (Ok(name), Ok(val)) => { - headers.insert(name, val); - } - _ => { - tracing::warn!("Skipping invalid OTLP header: {:?}={:?}", key, value); - } - } - } - OtlpTraceConfig { - endpoint_url: url, - headers, - timeout: self - .connection_timeout - .map(Duration::from_millis) - .unwrap_or(DEFAULT_OTLP_TIMEOUT), - protocol: OtlpProtocol::HttpJson, - } + let otlp_timeout = self + .connection_timeout + .map(Duration::from_millis) + .unwrap_or(DEFAULT_OTLP_TIMEOUT); + + let otlp_config = self.otlp_endpoint.map(|url| OtlpTraceConfig { + endpoint_url: url, + headers: build_otlp_header_map(self.otlp_headers), + timeout: otlp_timeout, + protocol: OtlpProtocol::HttpJson, }); + let otlp_metrics_config = self.otlp_metrics_endpoint.map(|url| OtlpMetricsConfig { + endpoint_url: url, + headers: build_otlp_header_map(self.otlp_metrics_headers), + timeout: otlp_timeout, + protocol: OtlpProtocol::HttpJson, + otel_trace_semantics_enabled: self.otel_trace_semantics_enabled, + }); + + let runtime_id = self + .runtime_id + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + + // OTLP metrics + stats bucket size: start the concentrator unconditionally (bypass the + // agent gate) so `check_agent_info` cannot later disable stats. + #[allow(unused_mut)] + let mut otlp_stats_enabled = false; + #[cfg(not(target_arch = "wasm32"))] + if let (Some(metrics_config), Some(bucket_size)) = + (otlp_metrics_config.clone(), self.stats_bucket_size) + { + use crate::otlp::OtlpStatsExporter; + use libdd_trace_stats::span_concentrator::SpanConcentrator; + use std::sync::Mutex; + let span_kinds = crate::trace_exporter::stats::DEFAULT_STATS_ELIGIBLE_SPAN_KINDS + .iter() + .map(|s| s.to_string()) + .collect(); + let concentrator = Arc::new(Mutex::new(SpanConcentrator::new( + bucket_size, + std::time::SystemTime::now(), + span_kinds, + self.peer_tags.clone(), + #[cfg(feature = "stats-obfuscation")] + None, + ))); + let mut resource = OtlpResourceInfo::default(); + resource.service = self.service.clone(); + resource.env = self.env.clone(); + resource.app_version = self.app_version.clone(); + resource.language = self.language.clone(); + resource.tracer_version = self.tracer_version.clone(); + resource.runtime_id = runtime_id.clone(); + resource.hostname = self.hostname.clone(); + resource.process_tags = self.process_tags.clone(); + let worker = OtlpStatsExporter { + flush_interval: bucket_size, + concentrator: concentrator.clone(), + config: metrics_config, + resource, + test_token: self.test_session_token.clone(), + capabilities: capabilities.clone(), + }; + let worker_handle = shared_runtime.spawn_worker(worker, false).map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) + })?; + stats = StatsComputationStatus::Enabled { + stats_concentrator: concentrator, + worker_handle, + }; + otlp_stats_enabled = true; + } + Ok(TraceExporter { endpoint: Endpoint { url: agent_url, @@ -478,7 +574,7 @@ impl TraceExporterBuilder { hostname: self.hostname, env: self.env, app_version: self.app_version, - runtime_id: uuid::Uuid::new_v4().to_string(), + runtime_id, service: self.service, }, input_format: self.input_format, @@ -516,6 +612,7 @@ impl TraceExporterBuilder { .then(AgentResponsePayloadVersion::new), otlp_config, trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()), + otlp_stats_enabled, }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 0486a32678..3c6e40f69c 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -215,6 +215,11 @@ pub struct TraceExporter, trace_filterer: ArcSwap, + /// When true, span stats are computed and exported as OTLP metrics. The concentrator is + /// started at build time, so agent-driven stats (de)activation in `check_agent_info` is + /// skipped. + #[cfg_attr(target_arch = "wasm32", allow(dead_code))] + otlp_stats_enabled: bool, } impl TraceExporter { @@ -365,6 +370,11 @@ impl Tra self.refresh_v1_active(&agent_info); } + // OTLP trace metrics run the concentrator independently; skip stats enable/disable. + if self.otlp_stats_enabled { + return; + } + self.trace_filterer.store(Arc::new(TraceFilterer::new( &agent_info.info.filter_tags.require, &agent_info.info.filter_tags.reject, @@ -553,6 +563,7 @@ impl Tra r.language = self.metadata.language.clone(); r.tracer_version = self.metadata.tracer_version.clone(); r.runtime_id = self.metadata.runtime_id.clone(); + r.client_computed_stats = self.otlp_stats_enabled; r }; let request = map_traces_to_otlp(traces, &resource_info); @@ -560,9 +571,24 @@ impl Tra error!("OTLP JSON serialization error: {e}"); TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) })?; + // Also set the header: resource attributes survive Collector hops, headers don't. + let effective_config; + let config_to_use = if self.otlp_stats_enabled { + effective_config = { + let mut c = config.clone(); + c.headers.insert( + http::HeaderName::from_static("datadog-client-computed-stats"), + http::HeaderValue::from_static("yes"), + ); + c + }; + &effective_config + } else { + config + }; send_otlp_traces_http( &self.capabilities, - config, + config_to_use, self.endpoint.test_token.as_deref(), json_body, ) diff --git a/libdd-ddsketch/src/lib.rs b/libdd-ddsketch/src/lib.rs index 6c6ddcf588..e25d26947b 100644 --- a/libdd-ddsketch/src/lib.rs +++ b/libdd-ddsketch/src/lib.rs @@ -110,6 +110,30 @@ impl DDSketch { pub fn encode_to_vec(self) -> Vec { self.into_pb().encode_to_vec() } + + /// Reconstruct a sketch from its protobuf. `None` if mapping missing/invalid. + pub fn from_pb(sketch: pb::DdSketch) -> Option { + let mapping_pb = sketch.mapping?; + let mapping = LogMapping::new(mapping_pb.gamma, mapping_pb.index_offset)?; + let store = match sketch.positive_values { + Some(s) => LowCollapsingDenseStore { + bins: s.contiguous_bin_counts.into(), + offset: s.contiguous_bin_index_offset, + max_size: LowCollapsingDenseStore::default().max_size, + }, + None => LowCollapsingDenseStore::default(), + }; + Some(DDSketch { + store, + zero_count: sketch.zero_count, + mapping, + }) + } + + /// Decode a sketch from serialized protobuf bytes; `None` on decode/mapping failure. + pub fn from_encoded(bytes: &[u8]) -> Option { + Self::from_pb(pb::DdSketch::decode(bytes).ok()?) + } } /// A store mapping the bin indexes to their respective weights @@ -415,6 +439,19 @@ mod test { assert_within!(sketch.count(), 108.0, f64::EPSILON); } + #[test] + fn test_sketch_pb_roundtrip() { + let mut sketch = DDSketch::default(); + for &p in &[0.0_f64, 0.1, 2.0, 10.0, 25.0, 10000.0] { + assert!(sketch.add(p).is_ok()); + } + let expected_count = sketch.count(); + let expected_bins = sketch.ordered_bins().len(); + let decoded = DDSketch::from_pb(sketch.into_pb()).expect("decodes"); + assert_within!(decoded.count(), expected_count, f64::EPSILON); + assert_eq!(decoded.ordered_bins().len(), expected_bins); + } + #[test] fn test_skecth_encode() { let mut sketch = DDSketch::default(); diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index acc7629478..cc5f82bdab 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -24,6 +24,7 @@ const GRPC_STATUS_CODE_FIELD: &[&str] = &[ "rpc.grpc.status.code", "grpc.status.code", ]; +const GRPC_METHOD_FIELD: &[&str] = &["grpc.method.name", "rpc.method"]; /// Aggregation key fields shared across all concentrator implementations — everything /// **except** peer tags. @@ -152,6 +153,17 @@ fn get_grpc_status_code<'a>(span: &'a impl StatSpan<'a>) -> Option { None } +pub(super) fn get_grpc_method<'a>(span: &'a impl StatSpan<'a>) -> &'a str { + for key in GRPC_METHOD_FIELD { + if let Some(val) = span.get_meta(key) { + if !val.is_empty() { + return val; + } + } + } + "" +} + fn grpc_status_str_to_int_value(v: &str) -> Option { if let Ok(status) = v.parse() { return Some(status); @@ -324,6 +336,17 @@ pub(super) struct GroupedStats { top_level_hits: u64, ok_summary: libdd_ddsketch::DDSketch, error_summary: libdd_ddsketch::DDSketch, + // Exact per-cell (ok/error) scalars used by the OTLP trace-metrics path. These are tracked + // separately from `duration` so the /v0.6/stats agent payload is byte-for-byte unchanged. + ok_duration: u64, + ok_min: u64, + ok_max: u64, + error_duration: u64, + error_min: u64, + error_max: u64, + // gRPC method for OTLP export only; not part of the aggregation key so agent stats are + // unaffected. + pub(super) grpc_method: String, } impl GroupedStats { @@ -331,12 +354,23 @@ impl GroupedStats { fn insert(&mut self, duration: i64, is_error: bool, is_top_level: bool) { self.hits += 1; self.duration += duration as u64; - + let d = duration as u64; if is_error { self.errors += 1; let _ = self.error_summary.add(duration as f64); + self.error_duration += d; + self.error_min = if self.errors == 1 { + d + } else { + self.error_min.min(d) + }; + self.error_max = self.error_max.max(d); } else { let _ = self.ok_summary.add(duration as f64); + self.ok_duration += d; + let ok_count = self.hits - self.errors; + self.ok_min = if ok_count == 1 { d } else { self.ok_min.min(d) }; + self.ok_max = self.ok_max.max(d); } if is_top_level { self.top_level_hits += 1; @@ -344,6 +378,36 @@ impl GroupedStats { } } +/// Exact per-cell (ok or error) scalars for one aggregation group, surfaced to the OTLP +/// trace-metrics path. `count` is exact; `duration_ns`/`min_ns`/`max_ns` are exact when +/// `count > 0` and meaningless otherwise (the OTLP mapper suppresses empty cells). +#[derive(Debug, Clone, Copy, Default)] +pub struct OtlpExactCell { + pub count: u64, + pub duration_ns: u64, + pub min_ns: u64, + pub max_ns: u64, +} + +/// Exact OK/ERROR cells for one aggregation group, in the same order as the `stats` vector +/// of the accompanying [`pb::ClientStatsBucket`]. `grpc_method` is the group's gRPC method (DD +/// schema `grpc.method.name`) carried out-of-band so it does not appear in the agent stats +/// protobuf wire format. +#[derive(Debug, Clone, Default)] +pub struct OtlpExactGroup { + pub ok: OtlpExactCell, + pub error: OtlpExactCell, + pub grpc_method: String, +} + +/// A bucket flushed for the OTLP trace-metrics path. `exact[i]` is the exact-scalar sidecar +/// for `bucket.stats[i]`; the protobuf bucket itself is identical to what the agent path uses. +#[derive(Debug, Clone)] +pub struct OtlpStatsBucket { + pub bucket: pb::ClientStatsBucket, + pub exact: Vec, +} + /// A time bucket used for stats aggregation. It stores a map of GroupedStats storing the stats of /// spans aggregated on their AggregationKey. #[derive(Debug, Clone)] @@ -369,26 +433,55 @@ impl StatsBucket { duration: i64, is_error: bool, is_top_level: bool, + grpc_method: &str, ) { self.data .entry_ref(&key) - .or_default() + .or_insert_with(|| GroupedStats { + grpc_method: grpc_method.to_owned(), + ..Default::default() + }) .insert(duration, is_error, is_top_level); } /// Consume the bucket and return a ClientStatsBucket containing the bucket stats. /// `bucket_duration` is the size of buckets for the concentrator containing the bucket. pub(super) fn flush(self, bucket_duration: u64) -> pb::ClientStatsBucket { - pb::ClientStatsBucket { - start: self.start, - duration: bucket_duration, - stats: self - .data - .into_iter() - .map(|(k, b)| encode_grouped_stats(k, b)) - .collect(), - // Agent-only field - agent_time_shift: 0, + self.flush_with_otlp_exact(bucket_duration).bucket + } + + /// Like [`Self::flush`], but additionally produces exact per-cell scalars for the OTLP + /// trace-metrics path. The `bucket` field is identical to what [`Self::flush`] returns. + pub(super) fn flush_with_otlp_exact(self, bucket_duration: u64) -> OtlpStatsBucket { + let mut stats = Vec::with_capacity(self.data.len()); + let mut exact = Vec::with_capacity(self.data.len()); + for (k, mut g) in self.data { + let grpc_method = std::mem::take(&mut g.grpc_method); + exact.push(OtlpExactGroup { + ok: OtlpExactCell { + count: g.hits.saturating_sub(g.errors), + duration_ns: g.ok_duration, + min_ns: g.ok_min, + max_ns: g.ok_max, + }, + error: OtlpExactCell { + count: g.errors, + duration_ns: g.error_duration, + min_ns: g.error_min, + max_ns: g.error_max, + }, + grpc_method, + }); + stats.push(encode_grouped_stats(k, g)); + } + OtlpStatsBucket { + bucket: pb::ClientStatsBucket { + start: self.start, + duration: bucket_duration, + stats, + agent_time_shift: 0, + }, + exact, } } } @@ -741,6 +834,18 @@ mod tests { } .into_key(), ), + // grpc.method.name is carried in GroupedStats (for OTLP), not in the aggregation key. + ( + SpanBytes { + meta: vec![("grpc.method.name".into(), "/pkg.Svc/Method".into())].into(), + ..Default::default() + }, + FixedAggregationKey { + is_trace_root: true, + ..Default::default() + } + .into_key(), + ), // Span with grpc status from meta as numeric string ( SpanBytes { diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 1e83a4b342..636e87744d 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -9,8 +9,8 @@ use libdd_trace_protobuf::pb; use aggregation::StatsBucket; mod aggregation; -use aggregation::BorrowedAggregationKey; -pub use aggregation::FixedAggregationKey; +use aggregation::{get_grpc_method, BorrowedAggregationKey}; +pub use aggregation::{FixedAggregationKey, OtlpExactCell, OtlpExactGroup, OtlpStatsBucket}; pub mod stat_span; pub use stat_span::StatSpan; @@ -184,6 +184,7 @@ impl SpanConcentrator { span.duration(), span.is_error(), span.has_top_level(), + get_grpc_method(span), ); } @@ -207,6 +208,22 @@ impl SpanConcentrator { /// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush /// all buckets. pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec { + self.drain_due_buckets(now, force, StatsBucket::flush) + } + + /// Like [`Self::flush`], but also emits exact per-cell scalars alongside each bucket for the + /// OTLP trace-metrics path. The protobuf bucket inside each [`OtlpStatsBucket`] is identical + /// to what [`Self::flush`] would produce, so the /v0.6/stats agent path is unaffected. + pub fn flush_with_otlp_exact(&mut self, now: SystemTime, force: bool) -> Vec { + self.drain_due_buckets(now, force, StatsBucket::flush_with_otlp_exact) + } + + fn drain_due_buckets( + &mut self, + now: SystemTime, + force: bool, + encode: impl Fn(StatsBucket, u64) -> T, + ) -> Vec { // TODO: Wait for HashMap::extract_if to be stabilized to avoid a full drain let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64; let buckets: Vec<(u64, StatsBucket)> = self.buckets.drain().collect(); @@ -231,7 +248,7 @@ impl SpanConcentrator { self.buckets.insert(timestamp, bucket); return None; } - Some(bucket.flush(self.bucket_size)) + Some(encode(bucket, self.bucket_size)) }) .collect() } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 56039c4d5b..2dd064d93a 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1569,3 +1569,56 @@ fn test_pb_span() { assert_counts_equal(expected_stats, bucket.stats.clone()); } + +/// Verify the OTLP exact-scalar sidecar tracks per-cell (ok/error) duration/min/max in nanos +/// independently and that ok_duration + error_duration matches the combined group duration +/// (which the agent /v0.6/stats path uses). +#[test] +fn test_flush_with_otlp_exact_per_cell_scalars() { + let now = SystemTime::now(); + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + #[cfg(feature = "stats-obfuscation")] + None, + ); + // 3 ok spans (200, 300, 100 ns) and 2 error spans (700, 500 ns), all same agg key. + let mut spans = vec![ + get_test_span(now, 1, 0, 200, 0, "svc", "res", 0), + get_test_span(now, 2, 0, 300, 0, "svc", "res", 0), + get_test_span(now, 3, 0, 100, 0, "svc", "res", 0), + get_test_span(now, 4, 0, 700, 0, "svc", "res", 1), + get_test_span(now, 5, 0, 500, 0, "svc", "res", 1), + ]; + compute_top_level_span(spans.as_mut_slice()); + for s in &spans { + concentrator.add_span(s); + } + + let flushed = concentrator.flush_with_otlp_exact(now, true); + assert_eq!(flushed.len(), 1); + let b = &flushed[0]; + assert_eq!(b.exact.len(), 1); + let exact = &b.exact[0]; + + assert_eq!(exact.ok.count, 3); + assert_eq!(exact.ok.duration_ns, 600); + assert_eq!(exact.ok.min_ns, 100); + assert_eq!(exact.ok.max_ns, 300); + + assert_eq!(exact.error.count, 2); + assert_eq!(exact.error.duration_ns, 1200); + assert_eq!(exact.error.min_ns, 500); + assert_eq!(exact.error.max_ns, 700); + + // ok_duration + error_duration equals the combined group.duration (agent path field). + let group = &b.bucket.stats[0]; + assert_eq!( + group.duration, + exact.ok.duration_ns + exact.error.duration_ns + ); + assert_eq!(group.hits, 5); + assert_eq!(group.errors, 2); +} diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs index 0575c20ccf..a4f71f02a2 100644 --- a/libdd-trace-utils/src/otlp_encoder/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -109,6 +109,14 @@ fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { value: AnyValue::StringValue(resource_info.runtime_id.clone()), }); } + // Tells Datadog Agent OTLP receivers to skip their concentrator; prevents double-counted APM + // metrics. + if resource_info.client_computed_stats { + attributes.push(KeyValue { + key: "_dd.stats_computed".to_string(), + value: AnyValue::StringValue("true".to_string()), + }); + } Resource { attributes } } @@ -699,6 +707,55 @@ mod tests { assert_eq!(spans[2].trace_id, expected); } + #[test] + fn test_stats_computed_resource_attr_set_when_enabled() { + let resource_info = OtlpResourceInfo { + client_computed_stats: true, + ..Default::default() + }; + let span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("s"), + start: 0, + duration: 1, + ..Default::default() + }; + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let resource_attrs = &req.resource_spans[0].resource.as_ref().unwrap().attributes; + let kv = resource_attrs + .iter() + .find(|a| a.key == "_dd.stats_computed") + .expect("_dd.stats_computed must be present when client_computed_stats=true"); + assert!( + matches!(&kv.value, AnyValue::StringValue(s) if s == "true"), + "_dd.stats_computed must be \"true\", got {:?}", + kv.value + ); + } + + #[test] + fn test_stats_computed_resource_attr_absent_when_disabled() { + let resource_info = OtlpResourceInfo { + client_computed_stats: false, + ..Default::default() + }; + let span: Span = Span { + trace_id: 1, + span_id: 2, + name: libdd_tinybytes::BytesString::from_static("s"), + start: 0, + duration: 1, + ..Default::default() + }; + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let resource_attrs = &req.resource_spans[0].resource.as_ref().unwrap().attributes; + assert!( + !resource_attrs.iter().any(|a| a.key == "_dd.stats_computed"), + "_dd.stats_computed must not be emitted when client_computed_stats=false" + ); + } + #[test] fn test_empty_chunk_does_not_panic() { // Defensive: an empty chunk should produce no spans and not panic. diff --git a/libdd-trace-utils/src/otlp_encoder/mod.rs b/libdd-trace-utils/src/otlp_encoder/mod.rs index 782a10e10d..02623cbc66 100644 --- a/libdd-trace-utils/src/otlp_encoder/mod.rs +++ b/libdd-trace-utils/src/otlp_encoder/mod.rs @@ -22,4 +22,9 @@ pub struct OtlpResourceInfo { pub language: String, pub tracer_version: String, pub runtime_id: String, + pub hostname: String, + pub process_tags: String, + /// When true, emits `_dd.stats_computed: "true"` on the OTLP resource to prevent + /// double-counted APM metrics in Datadog Agent OTLP receivers (backwards compatible). + pub client_computed_stats: bool, }