From bf4921017920830f54efbb16cec7afe2cc3aaff1 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Thu, 4 Jun 2026 16:28:09 +0200 Subject: [PATCH] feat(data-pipeline): add agentless export # Motivation Send traces to the agentless endpoint to replace the JS and python exporters # What changes * Additional JSON agentless encoder * Additional configuration (agentless enablement, endpoint, API key) * Agentless traces need top level computation, but not dropping spans --- .../examples/send-traces-agentless.rs | 118 +++++ libdd-data-pipeline/src/agentless/config.rs | 30 ++ libdd-data-pipeline/src/agentless/exporter.rs | 102 +++++ libdd-data-pipeline/src/agentless/mod.rs | 29 ++ libdd-data-pipeline/src/lib.rs | 1 + .../src/trace_exporter/builder.rs | 61 ++- libdd-data-pipeline/src/trace_exporter/mod.rs | 231 ++++++++++ .../src/agentless_encoder/mod.rs | 408 ++++++++++++++++++ .../src/agentless_encoder/tests.rs | 271 ++++++++++++ libdd-trace-utils/src/lib.rs | 1 + 10 files changed, 1251 insertions(+), 1 deletion(-) create mode 100644 libdd-data-pipeline/examples/send-traces-agentless.rs create mode 100644 libdd-data-pipeline/src/agentless/config.rs create mode 100644 libdd-data-pipeline/src/agentless/exporter.rs create mode 100644 libdd-data-pipeline/src/agentless/mod.rs create mode 100644 libdd-trace-utils/src/agentless_encoder/mod.rs create mode 100644 libdd-trace-utils/src/agentless_encoder/tests.rs diff --git a/libdd-data-pipeline/examples/send-traces-agentless.rs b/libdd-data-pipeline/examples/send-traces-agentless.rs new file mode 100644 index 0000000000..f993473b78 --- /dev/null +++ b/libdd-data-pipeline/examples/send-traces-agentless.rs @@ -0,0 +1,118 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Example: send a trace directly to the Datadog agentless intake. +//! +//! Reads the API key from the `DD_API_KEY` environment variable and POSTs a +//! small trace to `https://public-trace-http-intake.logs.{DD_SITE}/v1/input` +//! (defaulting to `datadoghq.com`). +//! +//! Usage: +//! DD_API_KEY= [DD_SITE=datadoghq.eu] \ +//! cargo run --example send-traces-agentless -p libdd-data-pipeline + +use clap::Parser; +use libdd_capabilities_impl::NativeCapabilities; +use libdd_data_pipeline::trace_exporter::{ + TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat, +}; +use libdd_log::logger::{ + logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget, +}; +use libdd_shared_runtime::SharedRuntime; +use libdd_trace_utils::span::v04::{SpanBytes, SpanEvent, SpanLink}; +use rand::random; +use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH}; + +fn get_span(now: i64, trace_id: u128, span_id: u64) -> SpanBytes { + let duration = 1_000_000 * span_id as i64; + SpanBytes { + trace_id, + span_id, + parent_id: span_id.saturating_sub(1), + duration: 1_000_000 * span_id as i64, + start: now + duration, + service: "data-pipeline-agentless-example".into(), + name: "agentless.example".into(), + resource: "resource".into(), + error: 0, + metrics: HashMap::from([("_sampling_priority_v1".into(), 1.0)]), + span_events: vec![SpanEvent { + time_unix_nano: now as u64, + name: "event".into(), + attributes: HashMap::new(), + }], + span_links: vec![SpanLink { + trace_id: 10101010101, + trace_id_high: 1010101, + span_id, + ..Default::default() + }], + ..Default::default() + } +} + +#[derive(Parser)] +#[command(name = "send-traces-agentless")] +#[command(about = "Send a trace to the Datadog agentless intake")] +struct Args { + /// Override the intake URL. Defaults to + /// `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`. + #[arg(long = "url")] + url: Option, +} + +fn main() { + logger_configure_std(StdConfig { + target: StdTarget::Out, + }) + .expect("Failed to configure logger"); + logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level"); + + let args = Args::parse(); + + let api_key = std::env::var("DD_API_KEY") + .expect("DD_API_KEY environment variable must be set for agentless export"); + let site = std::env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string()); + let intake_url = args + .url + .unwrap_or_else(|| format!("https://public-trace-http-intake.logs.{site}/v1/input")); + + let shared_runtime = Arc::new(SharedRuntime::new().expect("Failed to create runtime")); + + let mut builder = TraceExporter::::builder(); + builder + .set_hostname("COMP-N661JFW6JN") + .set_env("prod") + .set_app_version(env!("CARGO_PKG_VERSION")) + .set_service("data-pipeline-agentless-example") + .set_tracer_version(env!("CARGO_PKG_VERSION")) + .set_language("nodejs") + .set_language_version(env!("CARGO_PKG_RUST_VERSION")) + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04) + .set_shared_runtime(shared_runtime.clone()) + .set_agentless_endpoint(&intake_url, &api_key); + + let exporter = builder + .build::() + .expect("Failed to build TraceExporter"); + + let now = UNIX_EPOCH + .elapsed() + .expect("Failed to read time") + .as_nanos() as i64; + + let trace_id = random(); + let trace: Vec<_> = (1..=3).map(|i| get_span(now, trace_id, i)).collect(); + let traces = vec![trace]; + + exporter + .send_trace_chunks(traces) + .expect("Failed to send traces"); + println!("Trace sent to agentless intake at {intake_url}"); + + shared_runtime + .shutdown(None) + .expect("Failed to shutdown runtime"); +} diff --git a/libdd-data-pipeline/src/agentless/config.rs b/libdd-data-pipeline/src/agentless/config.rs new file mode 100644 index 0000000000..33bc4294b6 --- /dev/null +++ b/libdd-data-pipeline/src/agentless/config.rs @@ -0,0 +1,30 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Agentless APM trace export configuration. + +use std::{fmt::Debug, time::Duration}; + +pub const DEFAULT_AGENTLESS_TIMEOUT: Duration = Duration::from_secs(15); + +///Agentless trace exporter configuration. +#[derive(Clone)] +pub struct AgentlessTraceConfig { + /// Full URL to POST traces to (e.g. + /// `https://public-trace-http-intake.logs.datadoghq.com/v1/input`). + pub endpoint_url: String, + /// Datadog API key used for the `dd-api-key` header. + pub api_key: String, + /// Request timeout. + pub timeout: Duration, +} + +impl Debug for AgentlessTraceConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AgentlessTraceConfig") + .field("endpoint_url", &self.endpoint_url) + .field("api_key", &"") + .field("timeout", &self.timeout) + .finish() + } +} diff --git a/libdd-data-pipeline/src/agentless/exporter.rs b/libdd-data-pipeline/src/agentless/exporter.rs new file mode 100644 index 0000000000..a4ece50203 --- /dev/null +++ b/libdd-data-pipeline/src/agentless/exporter.rs @@ -0,0 +1,102 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Agentless HTTP/JSON trace exporter. + +use super::config::AgentlessTraceConfig; +use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; +use http::HeaderMap; +use libdd_capabilities::{HttpClientCapability, SleepCapability}; +use libdd_common::Endpoint; +use libdd_trace_utils::send_with_retry::{ + send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError, +}; +use tracing::error; + +const AGENTLESS_MAX_ATTEMPTS: u32 = 3; +const AGENTLESS_RETRY_DELAY_MS: u64 = 1000; + +/// Send an agentless trace payload (JSON bytes) to the configured intake with retries. +/// +/// `headers` should already contain all required headers (api key, content-type, meta-*, +/// entity, trace-count, etc.). `test_token` is forwarded as `X-Datadog-Test-Session-Token` +/// when set, enabling snapshot tests against a local mock. +pub async fn send_agentless_traces_http( + capabilities: &C, + config: &AgentlessTraceConfig, + headers: HeaderMap, + json_body: Vec, +) -> Result<(), TraceExporterError> { + let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( + "Invalid agentless endpoint URL: {e}" + ))) + })?; + + let target = Endpoint { + url, + timeout_ms: config.timeout.as_millis() as u64, + ..Endpoint::default() + }; + + let retry_strategy = RetryStrategy::new( + AGENTLESS_MAX_ATTEMPTS, + AGENTLESS_RETRY_DELAY_MS, + RetryBackoffType::Exponential, + None, + ); + + match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await { + Ok(_) => Ok(()), + Err(e) => Err(map_send_error(e)), + } +} + +fn map_send_error(err: SendWithRetryError) -> TraceExporterError { + match err { + SendWithRetryError::Http(response, _) => { + let status = response.status(); + let body_str = String::from_utf8_lossy(response.body()); + match status.as_u16() { + 401 | 403 => error!( + status = status.as_u16(), + body = %body_str, + "Agentless authentication failed. Verify DD_API_KEY is valid." + ), + 404 => error!( + status = status.as_u16(), + body = %body_str, + "Agentless endpoint not found. Verify DD_SITE is correctly configured." + ), + 429 => error!( + status = status.as_u16(), + body = %body_str, + "Agentless intake rate-limited the request. Traces were dropped." + ), + 500..=599 => error!( + status = status.as_u16(), + body = %body_str, + "Agentless intake returned a server error. Traces were dropped." + ), + _ => error!( + status = status.as_u16(), + body = %body_str, + "Agentless intake returned an unexpected status." + ), + } + TraceExporterError::Request(RequestError::new(status, &body_str)) + } + SendWithRetryError::Timeout(_) => { + TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)) + } + SendWithRetryError::Network(error, _) => TraceExporterError::from(error), + SendWithRetryError::ResponseBody(_) => { + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState( + "Failed to read agentless response body".to_string(), + )) + } + SendWithRetryError::Build(_) => TraceExporterError::Internal( + InternalErrorKind::InvalidWorkerState("Failed to build agentless request".to_string()), + ), + } +} diff --git a/libdd-data-pipeline/src/agentless/mod.rs b/libdd-data-pipeline/src/agentless/mod.rs new file mode 100644 index 0000000000..3ca95014ce --- /dev/null +++ b/libdd-data-pipeline/src/agentless/mod.rs @@ -0,0 +1,29 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Agentless APM trace export for libdatadog. +//! +//! When an agentless endpoint is configured via +//! [`crate::trace_exporter::TraceExporterBuilder::set_agentless_endpoint`], the +//! trace exporter sends APM trace spans directly to the Datadog HTTP intake +//! instead of to the local Datadog Agent. +//! +//! ## Differences from the regular agent export +//! +//! - **Transport**: `POST` to the public HTTP trace intake (default `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`, +//! or a custom URL) using `dd-api-key` auth, instead of msgpack to the local agent's +//! `/v0.4/traces`. The host language resolves the URL from `DD_SITE` and supplies the API key; +//! the exporter reads no environment variables. +//! - **Encoding**: JSON (see [`libdd_trace_utils::agentless_encoder`]) instead of msgpack v04. See +//! that module for the payload-shape differences. +//! - **Retries**: up to 3 attempts with exponential backoff starting at 1 s and no cap (the agent +//! path uses its own strategy). +//! - **Mutual exclusion with OTLP**: if both an OTLP and an agentless endpoint are configured on +//! the builder, OTLP wins and the agentless config is silently dropped with a warning at build +//! time. + +pub(crate) mod config; +pub(crate) mod exporter; + +pub use config::AgentlessTraceConfig; +pub use exporter::send_agentless_traces_http; diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 2b9955ce3d..b651a2be54 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -12,6 +12,7 @@ //! in different languages. pub mod agent_info; +pub(crate) mod agentless; mod health_metrics; pub(crate) mod otlp; #[cfg(feature = "telemetry")] diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index c9430eadf2..b66d205bf4 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::agent_info::AgentInfoFetcher; +use crate::agentless::config::{AgentlessTraceConfig, DEFAULT_AGENTLESS_TIMEOUT}; use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; use crate::otlp::OtlpTraceConfig; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] @@ -65,6 +66,9 @@ pub struct TraceExporterBuilder { connection_timeout: Option, otlp_endpoint: Option, otlp_headers: Vec<(String, String)>, + agentless_endpoint: Option, + agentless_api_key: Option, + agentless_timeout: Option, } impl TraceExporterBuilder { @@ -286,6 +290,32 @@ impl TraceExporterBuilder { self } + /// Enables agentless APM trace export and sets the intake URL and API key. + /// + /// When set, APM trace spans are sent directly to the Datadog HTTP intake in JSON format + /// (`POST /v1/input`) instead of through the Datadog Agent. The host language is responsible + /// for resolving the endpoint URL (default + /// `https://public-trace-http-intake.logs.{DD_SITE}` or a custom override) and the API key + /// from its configuration. This crate does not read environment variables. + /// + /// If OTLP is also configured via [`Self::set_otlp_endpoint`], OTLP takes precedence and a + /// warning is logged at build time; the agentless configuration is dropped. + /// + /// Example: `set_agentless_endpoint("https://public-trace-http-intake.logs.datadoghq.com/v1/input", "")` + pub fn set_agentless_endpoint(&mut self, url: &str, api_key: &str) -> &mut Self { + self.agentless_endpoint = Some(url.to_owned()); + self.agentless_api_key = Some(api_key.to_owned()); + self + } + + /// Sets the request timeout used by the agentless intake transport. + /// + /// Defaults to 15 seconds when not set. + pub fn set_agentless_timeout(&mut self, timeout: Duration) -> &mut Self { + self.agentless_timeout = Some(timeout); + self + } + #[allow(missing_docs)] pub fn build( self, @@ -412,7 +442,35 @@ impl TraceExporterBuilder { } }; - let otlp_config = self.otlp_endpoint.map(|url| { + // Resolve transport selection: OTLP takes precedence over agentless when both are set. + let (otlp_endpoint, agentless_endpoint, agentless_api_key) = match ( + self.otlp_endpoint.as_ref(), + self.agentless_endpoint.as_ref(), + ) { + (Some(_), Some(_)) => { + tracing::warn!( + "Both OTLP and agentless trace export are configured; OTLP takes \ + precedence and the agentless configuration will be ignored." + ); + (self.otlp_endpoint, None, None) + } + _ => ( + self.otlp_endpoint, + self.agentless_endpoint, + self.agentless_api_key, + ), + }; + + let agentless_config = match (agentless_endpoint, agentless_api_key) { + (Some(url), Some(api_key)) => Some(AgentlessTraceConfig { + endpoint_url: url, + api_key, + timeout: self.agentless_timeout.unwrap_or(DEFAULT_AGENTLESS_TIMEOUT), + }), + _ => None, + }; + + let otlp_config = otlp_endpoint.map(|url| { let mut headers = http::HeaderMap::new(); for (key, value) in self.otlp_headers { match ( @@ -495,6 +553,7 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), otlp_config, + agentless_config, }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 7e3dd1c951..853ae5867d 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -15,6 +15,7 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::ResponseObserver; +use crate::agentless::{send_agentless_traces_http, AgentlessTraceConfig}; use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; #[cfg(feature = "telemetry")] use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; @@ -55,6 +56,49 @@ use tracing::{debug, error, warn}; const INFO_ENDPOINT: &str = "/info"; +/// Build the HTTP headers required by the agentless intake. +/// +/// Includes the API key, content-type, trace count, `Datadog-Meta-*` tracer headers, +/// and entity headers (container-id / entity-id / external-env) when available. +fn build_agentless_headers( + metadata: &TracerMetadata, + api_key: &str, + trace_count: usize, +) -> Result { + let mut headers: HeaderMap = { + let tags: TracerHeaderTags = metadata.into(); + tags.into() + }; + + let api_key_val = http::HeaderValue::from_str(api_key).map_err(|_| { + TraceExporterError::Internal(error::InternalErrorKind::InvalidWorkerState( + "Invalid Datadog API key value for dd-api-key header".to_string(), + )) + })?; + headers.insert(http::HeaderName::from_static("dd-api-key"), api_key_val); + + headers.insert( + http::header::CONTENT_TYPE, + libdd_common::header::APPLICATION_JSON, + ); + + headers.insert( + http::HeaderName::from_static("x-datadog-trace-count"), + http::HeaderValue::from(trace_count), + ); + + for (name, value) in libdd_common::entity_id::get_entity_headers() { + if let (Ok(name), Ok(value)) = ( + http::HeaderName::from_bytes(name.as_bytes()), + http::HeaderValue::from_str(value), + ) { + headers.insert(name, value); + } + } + + Ok(headers) +} + /// Values for optional telemetry HTTP session headers (`dd-session-id`, root/parent). #[derive(Debug, Default, Clone)] pub struct TelemetryInstrumentationSessions { @@ -201,6 +245,9 @@ pub struct TraceExporter, /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. otlp_config: Option, + /// When set, APM trace spans are exported directly to the Datadog HTTP intake (agentless) + /// instead of via the Datadog Agent + agentless_config: Option, } impl TraceExporter { @@ -469,6 +516,28 @@ impl Tra self.send_trace_chunks_inner(trace_chunks).await } + /// Sends trace chunks to the Datadog agentless intake (`/v1/input`) as JSON. + async fn send_agentless_traces_inner( + &self, + traces: Vec>>, + config: &AgentlessTraceConfig, + ) -> Result { + let trace_count = traces.len(); + let json_body = libdd_trace_utils::agentless_encoder::encode_payload( + &traces, + &self.metadata, + ) + .map_err(|e| { + error!("Agentless JSON serialization error: {e}"); + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) + })?; + + let headers = build_agentless_headers(&self.metadata, &config.api_key, trace_count)?; + + send_agentless_traces_http(&self.capabilities, config, headers, json_body).await?; + Ok(AgentResponse::Unchanged) + } + /// Sends trace chunks via OTLP HTTP/JSON when OTLP config is enabled. async fn send_otlp_traces_inner( &self, @@ -575,6 +644,18 @@ impl Tra ) -> Result { let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); + if let Some(ref config) = self.agentless_config { + // For agentless we want to tag top level spans, but not perform + // stats aggregation or span drops + if !self.client_computed_top_level { + for chunk in traces.iter_mut() { + libdd_trace_utils::span::trace_utils::compute_top_level_span(chunk); + } + } + + return self.send_agentless_traces_inner(traces, config).await; + } + // Process stats computation and drop non-sampled (p0) chunks. // This must run before the OTLP path so that unsampled spans are not exported. let dropped_p0_stats = stats::process_traces_for_stats( @@ -1850,6 +1931,156 @@ mod tests { ); mock_otlp.assert(); } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_agentless_export_via_builder() { + let server = MockServer::start(); + let mock_intake = server.mock(|when, then| { + when.method(POST) + .path("/v1/input") + .header("Content-Type", "application/json") + .header("dd-api-key", "test-api-key") + .header("X-Datadog-Trace-Count", "1") + .header("datadog-meta-lang", "nodejs") + .header("datadog-meta-tracer-version", "1.0"); + then.status(200).body(""); + }); + + let intake_url = format!("{}/v1/input", server.url("/").trim_end_matches('/')); + let mut builder = TraceExporterBuilder::default(); + builder + .set_url("http://127.0.0.1:8126") + .set_service("svc") + .set_env("env") + .set_tracer_version("1.0") + .set_language("nodejs") + .set_language_version("v20.11.0") + .set_language_interpreter("v8") + .set_agentless_endpoint(&intake_url, "test-api-key") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04); + let exporter = builder.build::().unwrap(); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"op").unwrap(), + service: BytesString::from_static("svc"), + resource: BytesString::from_static("res"), + trace_id: 0xdeadbeef, + span_id: 2, + parent_id: 0, + start: 2_500_000_000, + duration: 1_000_000, + error: 0, + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + let result = exporter.send(data.as_ref()); + + assert!( + result.is_ok(), + "Agentless send should succeed: {:?}", + result.err() + ); + mock_intake.assert(); + + assert_eq!(mock_intake.calls(), 1); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_agentless_export_body_shape() { + let server = MockServer::start(); + let mock_intake = server.mock(|when, then| { + when.method(POST) + .path("/v1/input") + .body_includes("\"traces\":") + .body_includes("\"spans\":") + .body_includes("\"hostname\":\"h-1\"") + .body_includes("\"languageName\":\"nodejs\"") + .body_includes("\"_dd.compute_stats\":\"1\"") + .body_includes("\"_top_level\":1") + .body_includes("\"_trace_root\":1") + .body_includes("\"parent_id\":\"0000000000000000\""); + then.status(200).body(""); + }); + + let intake_url = format!("{}/v1/input", server.url("/").trim_end_matches('/')); + let mut builder = TraceExporterBuilder::default(); + builder + .set_url("http://127.0.0.1:8126") + .set_hostname("h-1") + .set_service("svc") + .set_env("env") + .set_tracer_version("1.0") + .set_language("nodejs") + .set_language_version("v20.11.0") + .set_language_interpreter("v8") + .set_agentless_endpoint(&intake_url, "k") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04); + let exporter = builder.build::().unwrap(); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"op").unwrap(), + service: BytesString::from_static("svc"), + resource: BytesString::from_static("res"), + trace_id: 1, + span_id: 2, + parent_id: 0, + start: 0, + duration: 1, + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + exporter.send(data.as_ref()).unwrap(); + mock_intake.assert(); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_agentless_and_otlp_both_set_otlp_wins() { + // OTLP intake. + let server = MockServer::start(); + let mock_otlp = server.mock(|when, then| { + when.method(POST).path("/v1/traces"); + then.status(200).body(""); + }); + let mock_agentless = server.mock(|when, then| { + when.method(POST).path("/v1/input"); + then.status(200).body(""); + }); + + let otlp_endpoint = format!("{}/v1/traces", server.url("/").trim_end_matches('/')); + let agentless_endpoint = format!("{}/v1/input", server.url("/").trim_end_matches('/')); + let mut builder = TraceExporterBuilder::default(); + builder + .set_url("http://127.0.0.1:8126") + .set_service("svc") + .set_tracer_version("1.0") + .set_language("nodejs") + .set_language_version("v20") + .set_language_interpreter("v8") + .set_otlp_endpoint(&otlp_endpoint) + .set_agentless_endpoint(&agentless_endpoint, "k") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V04); + let exporter = builder.build::().unwrap(); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"op").unwrap(), + service: BytesString::from_static("svc"), + trace_id: 1, + span_id: 1, + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + exporter.send(data.as_ref()).unwrap(); + + mock_otlp.assert(); + // Agentless mock must NOT have been hit. + assert_eq!(mock_agentless.calls(), 0); + } } #[cfg(test)] diff --git a/libdd-trace-utils/src/agentless_encoder/mod.rs b/libdd-trace-utils/src/agentless_encoder/mod.rs new file mode 100644 index 0000000000..e91a3b9653 --- /dev/null +++ b/libdd-trace-utils/src/agentless_encoder/mod.rs @@ -0,0 +1,408 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Agentless APM JSON encoder. +//! +//! Encodes Datadog v04 trace chunks to the JSON body +//! accepted by the Datadog HTTP trace intake (`POST /v1/input`). +//! +//! ## Differences from the regular agent (msgpack v04) encoding +//! +//! - **Wire format**: JSON, wrapped as `{"traces": [ {hostname, env, ..., spans: [...] }, ... ]}`. +//! Per-trace metadata (hostname, env, language*, tracerVersion, runtimeID, containerID) is +//! inlined on each trace instead of being carried in request headers. Hostname is always emitted +//! - **IDs**: `trace_id`, `span_id`, `parent_id` are lowercase hex strings (16 chars; 32 for +//! span-link trace IDs) +//! - **128-bit trace IDs**: only the low 64 bits go into `trace_id`; the `_dd.p.tid` meta tag carie +//! upper 64 bits +//! - **Span links / events**: not top-level fields. They are JSON-stringified into +//! `meta["_dd.span_links"]` and `meta["events"]`, each truncated to 25_000 chars. No top-level +//! `links` field is emitted. +//! - **Stats / top-level flags**: the intake has no trace-agent to compute them, so the encoder +//! injects `meta["_dd.compute_stats"]="1"` on the first span of each chunk and +//! `metrics["_trace_root"]=1` where applicable. +//! - **Non-finite metrics** (NaN/Inf) are dropped (JSON can't represent them). +//! +//! Left todo is span normalization (service/name/resource/type truncation + defaults) + +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; +use crate::span::TraceData; +use crate::tracer_metadata::TracerMetadata; +use serde::{ + ser::{SerializeMap, SerializeSeq}, + Serializer, +}; +use std::borrow::Borrow; + +/// Maximum allowed size of a `meta` value before truncation. +const MAX_META_VALUE_LEN: usize = 25_000; +/// Suffix appended when a `meta` value is truncated. +const TRUNCATION_SUFFIX: &str = "..."; + +macro_rules! ser_fn { + ($(<$generic:ident $(: $bound:ident )?>)? |$serializer:ident , $($captured:ident : $ty:ty),+ $(,)?| { $($body:tt)* }) => { + { + struct SerializeClosure<'a, $($generic $(: $bound + 'a)? ,)? F: Fn() -> ($(&'a $ty ,)*)>(F); + + impl <'a, $($generic $(: $bound + 'a)?,)? F: Fn() -> ($(&'a $ty ,)*)> serde::Serialize for SerializeClosure<'a, $($generic,)? F> { + #[inline] + fn serialize(&self, serializer: S) -> Result { + let captured = (self.0)(); + (|$serializer: S , ($(& $captured, )*) : ($(&'a $ty ,)*)| { + $($body)* + })(serializer, captured) + } + } + + SerializeClosure(|| ($(& $captured ,)*)) + } + } +} + +/// Encode the given `traces` to the agentless JSON payload (`/v1/input` body). +/// +/// Returns the serialized JSON bytes on success. +pub fn encode_payload( + traces: &[Vec>], + metadata: &TracerMetadata, +) -> Result, serde_json::Error> { + let mut bytes = Vec::new(); + let mut serializer = serde_json::Serializer::new(&mut bytes); + + let mut map_ser = serializer.serialize_map(Some(1))?; + map_ser.serialize_entry( + "traces", + &ser_fn!( |ser, traces: &'a [Vec>], metadata: &'a TracerMetadata| { + let mut traces_serializer = ser.serialize_seq(Some(traces.len()))?; + for chunk in traces { + traces_serializer.serialize_element(&ser_fn!( |ser, chunk: &'a Vec>, metadata: &'a TracerMetadata| { + encode_trace(ser, chunk, metadata) + }))?; + } + traces_serializer.end() + }), + )?; + SerializeMap::end(map_ser)?; + Ok(bytes) +} + +fn encode_trace( + ser: S, + chunk: &[Span], + metadata: &TracerMetadata, +) -> Result { + let mut map = ser.serialize_map(None)?; + + // Per-trace metadata. Always include hostname; other fields when set. + map.serialize_entry("hostname", &metadata.hostname)?; + if !metadata.env.is_empty() { + map.serialize_entry("env", &metadata.env)?; + } + if !metadata.language.is_empty() { + map.serialize_entry("languageName", &metadata.language)?; + } + if !metadata.language_version.is_empty() { + map.serialize_entry("languageVersion", &metadata.language_version)?; + } + if !metadata.tracer_version.is_empty() { + map.serialize_entry("tracerVersion", &metadata.tracer_version)?; + } + if !metadata.runtime_id.is_empty() { + map.serialize_entry("runtimeID", &metadata.runtime_id)?; + } + if let Some(container_id) = libdd_common::entity_id::get_container_id() { + map.serialize_entry("containerID", container_id)?; + } + + map.serialize_entry( + "spans", + &ser_fn!( |ser, chunk: &'a [Span]| { + let mut seq = ser.serialize_seq(Some(chunk.len()))?; + for (i, span) in chunk.iter().enumerate() { + let is_first = i == 0; + seq.serialize_element(&ser_fn!( |ser, span: &'a Span, is_first: bool| { + encode_span(ser, span, is_first) + }))?; + } + seq.end() + }), + )?; + + map.end() +} + +fn encode_span( + ser: S, + span: &Span, + is_first_in_trace: bool, +) -> Result { + let mut map = ser.serialize_map(None)?; + + let trace_id = span.trace_id; + map.serialize_entry( + "trace_id", + &ser_fn!(|ser, trace_id: u128| { + ser.collect_str(&format_args!("{:016x}", trace_id as u64)) + }), + )?; + let span_id = span.span_id; + map.serialize_entry( + "span_id", + &ser_fn!(|ser, span_id: u64| { ser.collect_str(&format_args!("{:016x}", span_id as u64)) }), + )?; + let parent_id = span.parent_id; + map.serialize_entry( + "parent_id", + &ser_fn!(|ser, parent_id: u64| { + ser.collect_str(&format_args!("{:016x}", parent_id as u64)) + }), + )?; + + // Resource defaults to name when empty. + let name_str: &str = span.name.borrow(); + let resource_str: &str = span.resource.borrow(); + let service_str: &str = span.service.borrow(); + map.serialize_entry("name", name_str)?; + map.serialize_entry( + "resource", + if resource_str.is_empty() { + name_str + } else { + resource_str + }, + )?; + map.serialize_entry("service", service_str)?; + map.serialize_entry("error", &span.error)?; + map.serialize_entry("start", &span.start)?; + map.serialize_entry("duration", &span.duration)?; + + let type_str: &str = span.r#type.borrow(); + if !type_str.is_empty() { + map.serialize_entry("type", type_str)?; + } + + map.serialize_entry( + "meta", + &ser_fn!( |ser, span: &'a Span, is_first_in_trace: bool| { + let upper_bits = (span.trace_id >> 64) as u64; + let mut p_tid_seen = false; + let mut meta = ser.serialize_map(None)?; + for (k, v) in span.meta.iter() { + let key: &str = k.borrow(); + if key == "_dd.p.tid" { + p_tid_seen = true; + } + let val: &str = v.borrow(); + meta.serialize_entry(key, val)?; + } + if !p_tid_seen && upper_bits != 0 { + meta.serialize_entry( + "_dd.p.tid", + &ser_fn!(|ser, upper_bits: u64| { + ser.collect_str(&format_args!("{:016x}", upper_bits as u64)) + }), + )?; + } + if !span.span_links.is_empty() { + if let Some(s) = serialize_span_links(&span.span_links) { + meta.serialize_entry("_dd.span_links", &s)?; + } + } + if !span.span_events.is_empty() { + if let Some(s) = serialize_span_events(&span.span_events) { + meta.serialize_entry("events", &s)?; + } + } + if is_first_in_trace { + meta.serialize_entry("_dd.compute_stats", "1")?; + } + meta.end() + }), + )?; + + map.serialize_entry( + "metrics", + &ser_fn!( |ser, span: &'a Span| { + let mut metrics = ser.serialize_map(None)?; + for (k, v) in span.metrics.iter() { + let key: &str = k.borrow(); + if v.is_finite() { + if key == "_top_level" { + metrics.serialize_entry(key, &(*v as u32))?; + } else { + // serde_json refuses to serialize NaN/Inf; drop them silently. + metrics.serialize_entry(key, v)?; + } + } + } + if span.parent_id == 0 { + metrics.serialize_entry("_trace_root", &1u32)?; + } + metrics.end() + }), + )?; + + if !span.meta_struct.is_empty() { + map.serialize_entry( + "meta_struct", + &ser_fn!( |ser, span: &'a Span| { + let mut ms = ser.serialize_map(Some(span.meta_struct.len()))?; + for (k, v) in span.meta_struct.iter() { + let key: &str = k.borrow(); + let bytes: &[u8] = v.borrow(); + // Encode as a JSON array of u8 (default serde behavior for &[u8]). + ms.serialize_entry(key, bytes)?; + } + ms.end() + }), + )?; + } + map.end() +} + +/// Serialize span links to a JSON string suitable for `meta['_dd.span_links']`. +/// +/// Returns `None` if serialization fails. The result is truncated to +/// [`MAX_META_VALUE_LEN`] characters with a trailing `"..."` if it would +/// otherwise exceed that limit. +fn serialize_span_links(links: &[SpanLink]) -> Option { + let s = serde_json::to_string(&ser_fn!( |ser, links: &'a [SpanLink]| { + let mut seq = ser.serialize_seq(Some(links.len()))?; + for link in links { + seq.serialize_element(&ser_fn!( |ser, link: &'a SpanLink| { + encode_span_link(ser, link) + }))?; + } + seq.end() + })) + .ok()?; + Some(truncate_with_ellipsis(s, MAX_META_VALUE_LEN)) +} + +fn encode_span_link( + ser: S, + link: &SpanLink, +) -> Result { + let mut map = ser.serialize_map(None)?; + let trace_id_128: u128 = ((link.trace_id_high as u128) << 64) | (link.trace_id as u128); + map.serialize_entry("trace_id", &format!("{:032x}", trace_id_128))?; + map.serialize_entry("span_id", &format!("{:016x}", link.span_id))?; + if !link.attributes.is_empty() { + map.serialize_entry( + "attributes", + &ser_fn!( |ser, link: &'a SpanLink| { + let mut attrs = ser.serialize_map(Some(link.attributes.len()))?; + for (k, v) in link.attributes.iter() { + let key: &str = k.borrow(); + let val: &str = v.borrow(); + attrs.serialize_entry(key, val)?; + } + attrs.end() + }), + )?; + } + // `flags == 0` means no sampling decision is available; omit the field. + if link.flags != 0 { + map.serialize_entry("flags", &(link.flags as u64))?; + } + let tracestate: &str = link.tracestate.borrow(); + if !tracestate.is_empty() { + map.serialize_entry("tracestate", tracestate)?; + } + map.end() +} + +/// Serialize span events to a JSON string suitable for `meta['events']`. +fn serialize_span_events(events: &[SpanEvent]) -> Option { + let s = serde_json::to_string(&ser_fn!( |ser, events: &'a [SpanEvent]| { + let mut seq = ser.serialize_seq(Some(events.len()))?; + for event in events { + seq.serialize_element(&ser_fn!( |ser, event: &'a SpanEvent| { + encode_span_event(ser, event) + }))?; + } + seq.end() + })) + .ok()?; + Some(truncate_with_ellipsis(s, MAX_META_VALUE_LEN)) +} + +fn encode_span_event( + ser: S, + event: &SpanEvent, +) -> Result { + let mut map = ser.serialize_map(None)?; + let name: &str = event.name.borrow(); + map.serialize_entry("name", name)?; + map.serialize_entry("time_unix_nano", &event.time_unix_nano)?; + if !event.attributes.is_empty() { + map.serialize_entry( + "attributes", + &ser_fn!( |ser, event: &'a SpanEvent| { + let mut attrs = ser.serialize_map(Some(event.attributes.len()))?; + for (k, v) in event.attributes.iter() { + let key: &str = k.borrow(); + attrs.serialize_entry(key, &ser_fn!( |ser, v: &'a AttributeAnyValue | { + match v { + AttributeAnyValue::SingleValue(v) => serialize_scalar(ser, v), + AttributeAnyValue::Array(values) => { + let mut seq = ser.serialize_seq(Some(values.len()))?; + for v in values { + seq.serialize_element(&ser_fn!( |ser, v: &'a AttributeArrayValue| { + serialize_scalar(ser, v) + }))?; + } + seq.end() + } + } + }))?; + } + attrs.end() + }), + )?; + } + map.end() +} + +fn serialize_scalar( + ser: S, + s: &AttributeArrayValue, +) -> Result { + match s { + AttributeArrayValue::String(s) => { + let s: &str = s.borrow(); + ser.serialize_str(s) + } + AttributeArrayValue::Boolean(b) => ser.serialize_bool(*b), + AttributeArrayValue::Integer(i) => ser.serialize_i64(*i), + AttributeArrayValue::Double(d) => { + if d.is_finite() { + ser.serialize_f64(*d) + } else { + // NaN/Inf become JSON null. + ser.serialize_unit() + } + } + } +} + +/// Truncate `s` to at most `max_len` bytes, appending `"..."` when truncation occurs. +fn truncate_with_ellipsis(s: String, max_len: usize) -> String { + if s.len() <= max_len { + return s; + } + let suffix_len = TRUNCATION_SUFFIX.len(); + let cut = max_len.saturating_sub(suffix_len); + // Find the previous char boundary so we don't slice in the middle of a UTF-8 sequence. + let mut end = cut; + while end > 0 && !s.is_char_boundary(end) { + end -= 1; + } + let mut truncated = String::with_capacity(end + suffix_len); + truncated.push_str(&s[..end]); + truncated.push_str(TRUNCATION_SUFFIX); + truncated +} + +#[cfg(test)] +mod tests; diff --git a/libdd-trace-utils/src/agentless_encoder/tests.rs b/libdd-trace-utils/src/agentless_encoder/tests.rs new file mode 100644 index 0000000000..073f026045 --- /dev/null +++ b/libdd-trace-utils/src/agentless_encoder/tests.rs @@ -0,0 +1,271 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use super::encode_payload; +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; +use crate::span::BytesData; +use crate::tracer_metadata::TracerMetadata; +use libdd_tinybytes::BytesString; +use serde_json::Value; +use std::collections::HashMap; + +fn bs(s: &'static str) -> BytesString { + BytesString::from_static(s) +} + +fn base_metadata() -> TracerMetadata { + TracerMetadata { + hostname: "host-1".to_string(), + env: "prod".to_string(), + runtime_id: "rt-1".to_string(), + service: "svc".to_string(), + tracer_version: "1.2.3".to_string(), + language: "nodejs".to_string(), + language_version: "v20.11.0".to_string(), + ..Default::default() + } +} + +fn json_from_bytes(b: &[u8]) -> Value { + serde_json::from_slice(b).expect("payload must be valid JSON") +} + +#[test] +fn top_level_payload_shape_and_metadata() { + let span: Span = Span { + service: bs("svc"), + name: bs("op"), + resource: bs("res"), + trace_id: 0xdeadbeef_u128, + span_id: 1, + parent_id: 0, + start: 2_500_000_000, + duration: 1_000_000, + metrics: HashMap::from_iter([("_top_level".into(), 1.0)]), + ..Default::default() + }; + let bytes = encode_payload(&[vec![span]], &base_metadata()).unwrap(); + let v = json_from_bytes(&bytes); + + assert!(v.is_object()); + let traces = v.get("traces").unwrap().as_array().unwrap(); + assert_eq!(traces.len(), 1); + + let t = &traces[0]; + assert_eq!(t["hostname"], "host-1"); + assert_eq!(t["env"], "prod"); + assert_eq!(t["languageName"], "nodejs"); + assert_eq!(t["languageVersion"], "v20.11.0"); + assert_eq!(t["tracerVersion"], "1.2.3"); + assert_eq!(t["runtimeID"], "rt-1"); + + let spans = t["spans"].as_array().unwrap(); + assert_eq!(spans.len(), 1); + let s = &spans[0]; + assert_eq!(s["trace_id"], "00000000deadbeef"); + assert_eq!(s["span_id"], "0000000000000001"); + assert_eq!(s["parent_id"], "0000000000000000"); + assert_eq!(s["name"], "op"); + assert_eq!(s["resource"], "res"); + assert_eq!(s["service"], "svc"); + assert_eq!(s["error"], 0); + assert_eq!(s["start"], 2_500_000_000_i64); + assert_eq!(s["duration"], 1_000_000); + + // Root span gets `_trace_root`, top-level (no parent), and first span gets + // compute_stats. + let metrics = s["metrics"].as_object().unwrap(); + assert_eq!(metrics["_trace_root"], 1); + assert_eq!(metrics["_top_level"], 1); + let meta = s["meta"].as_object().unwrap(); + assert_eq!(meta["_dd.compute_stats"], "1"); +} + +#[test] +fn resource_defaults_to_name_when_empty() { + let span: Span = Span { + service: bs("svc"), + name: bs("op"), + // resource omitted (default empty) + trace_id: 1, + span_id: 1, + parent_id: 0, + start: 0, + duration: 1, + ..Default::default() + }; + let bytes = encode_payload(&[vec![span]], &base_metadata()).unwrap(); + let v = json_from_bytes(&bytes); + let s = &v["traces"][0]["spans"][0]; + assert_eq!(s["resource"], "op"); +} + +#[test] +fn keeps_existing_dd_p_tid_in_meta() { + // When the tracer already supplies `_dd.p.tid`, the encoder must pass it + // through unchanged and must NOT auto-inject a second value. + let mut span: Span = Span { + service: bs("svc"), + name: bs("op"), + // 64-bit trace_id — upper 64 bits are zero, so no auto-inject would fire. + trace_id: 0x1234_5678_9abc_def0_u128, + span_id: 2, + parent_id: 0, + start: 0, + duration: 1, + ..Default::default() + }; + span.meta.insert(bs("_dd.p.tid"), bs("5b8efff798038103")); + span.meta.insert(bs("some.tag"), bs("kept")); + + let v = json_from_bytes(&encode_payload(&[vec![span]], &base_metadata()).unwrap()); + let s = &v["traces"][0]["spans"][0]; + // Only the low 64 bits appear in `trace_id`. + assert_eq!(s["trace_id"], "123456789abcdef0"); + let meta = s["meta"].as_object().unwrap(); + // The tracer-supplied `_dd.p.tid` is preserved as-is. + assert_eq!(meta["_dd.p.tid"], "5b8efff798038103"); + assert_eq!(meta["some.tag"], "kept"); +} + +#[test] +fn span_links_serialised_into_meta_as_json_string() { + // Span links are JSON-stringified and stored in meta["_dd.span_links"]; + // no top-level `span_links` field is emitted. + let link = SpanLink:: { + trace_id: 0x9abc_def0_1234_5678, + trace_id_high: 0x0011_2233_4455_6677, + span_id: 0xfeed_face_dead_beef, + attributes: HashMap::from([(bs("link.name"), bs("scheduled_by"))]), + flags: 1, + tracestate: bs("dd=s:1"), + }; + let span: Span = Span { + service: bs("svc"), + name: bs("op"), + trace_id: 1, + span_id: 1, + parent_id: 0, + start: 0, + duration: 1, + span_links: vec![link], + ..Default::default() + }; + let v = json_from_bytes(&encode_payload(&[vec![span]], &base_metadata()).unwrap()); + let s = &v["traces"][0]["spans"][0]; + // No top-level `span_links` field. + assert!(s.get("span_links").is_none_or(|v| v.is_null())); + // Links are stored as a JSON string in meta["_dd.span_links"]. + let raw = s["meta"]["_dd.span_links"] + .as_str() + .expect("meta[_dd.span_links] must be a string"); + let links: serde_json::Value = serde_json::from_str(raw).expect("must be valid JSON"); + let link_obj = &links[0]; + // 32-char lowercase hex full 128-bit trace ID. + let expected_trace_id = format!( + "{:032x}", + ((0x0011_2233_4455_6677u128) << 64) | 0x9abc_def0_1234_5678_u128 + ); + assert_eq!(link_obj["trace_id"], expected_trace_id); + assert_eq!(expected_trace_id.len(), 32); + assert_eq!(link_obj["span_id"], "feedfacedeadbeef"); + assert_eq!(link_obj["attributes"]["link.name"], "scheduled_by"); + assert_eq!(link_obj["flags"], 1); + assert_eq!(link_obj["tracestate"], "dd=s:1"); +} + +#[test] +fn span_events_serialised_into_meta_as_json_string() { + // Span events are JSON-stringified and stored in meta["events"]; + // no top-level `span_events` field is emitted. + let event = SpanEvent:: { + time_unix_nano: 1_700_000_000_000_000_000, + name: bs("exception"), + attributes: HashMap::from([( + bs("exception.message"), + AttributeAnyValue::SingleValue(AttributeArrayValue::String(bs("timeout"))), + )]), + }; + let span: Span = Span { + service: bs("svc"), + name: bs("op"), + trace_id: 1, + span_id: 1, + parent_id: 0, + start: 0, + duration: 1, + span_events: vec![event], + ..Default::default() + }; + let v = json_from_bytes(&encode_payload(&[vec![span]], &base_metadata()).unwrap()); + let s = &v["traces"][0]["spans"][0]; + // No top-level `span_events` field. + assert!(s.get("span_events").is_none_or(|v| v.is_null())); + // Events are stored as a JSON string in meta["events"]. + let raw = s["meta"]["events"] + .as_str() + .expect("meta[events] must be a string"); + let events: serde_json::Value = serde_json::from_str(raw).expect("must be valid JSON"); + let evt = &events[0]; + assert_eq!(evt["name"], "exception"); + assert_eq!(evt["time_unix_nano"], 1_700_000_000_000_000_000_u64); + assert_eq!(evt["attributes"]["exception.message"], "timeout"); +} + +#[test] +fn top_level_only_for_first_span_when_parent_in_other_service() { + // Trace with two spans, parent in different service. + let parent: Span = Span { + service: bs("svc-a"), + name: bs("op"), + trace_id: 1, + span_id: 10, + parent_id: 0, + start: 0, + duration: 1, + metrics: HashMap::from_iter([("_top_level".into(), 1.0)]), + ..Default::default() + }; + let child_same_service: Span = Span { + service: bs("svc-a"), + name: bs("op"), + trace_id: 1, + span_id: 11, + parent_id: 10, + start: 0, + duration: 1, + ..Default::default() + }; + let child_other_service: Span = Span { + service: bs("svc-b"), + name: bs("op"), + trace_id: 1, + span_id: 12, + parent_id: 10, + start: 0, + duration: 1, + metrics: HashMap::from_iter([("_top_level".into(), 1.0)]), + ..Default::default() + }; + let v = json_from_bytes( + &encode_payload( + &[vec![parent, child_same_service, child_other_service]], + &base_metadata(), + ) + .unwrap(), + ); + let spans = v["traces"][0]["spans"].as_array().unwrap(); + // Parent (root) is top-level + trace_root. + assert_eq!(spans[0]["metrics"]["_top_level"], 1); + assert_eq!(spans[0]["metrics"]["_trace_root"], 1); + // Child in same service: NOT top-level. + assert!(spans[1]["metrics"].get("_top_level").is_none()); + assert!(spans[1]["metrics"].get("_trace_root").is_none()); + // Child in other service: top-level (parent service differs). + assert_eq!(spans[2]["metrics"]["_top_level"], 1); + assert!(spans[2]["metrics"].get("_trace_root").is_none()); + // Only the first span in the chunk gets _dd.compute_stats. + assert_eq!(spans[0]["meta"]["_dd.compute_stats"], "1"); + assert!(spans[1]["meta"].get("_dd.compute_stats").is_none()); + assert!(spans[2]["meta"].get("_dd.compute_stats").is_none()); +} diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index aa8d93c887..2ada15701f 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -7,6 +7,7 @@ #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] +pub mod agentless_encoder; pub mod config_utils; pub mod msgpack_decoder; pub mod msgpack_encoder;