diff --git a/Cargo.lock b/Cargo.lock index fff06dbaa3..333063960c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3006,6 +3006,7 @@ dependencies = [ "libdd-trace-protobuf", "libdd-trace-stats", "libdd-trace-utils", + "prost", "rand 0.8.5", "regex", "rmp-serde", @@ -3455,6 +3456,7 @@ dependencies = [ "flate2", "futures", "getrandom 0.2.15", + "hex", "http", "http-body", "http-body-util", diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 5271c86e63..bd027d2691 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -9,6 +9,7 @@ use libdd_common_ffi::{ CharSlice, {slice::AsBytes, slice::ByteSlice}, }; +use libdd_data_pipeline::otlp::OtlpProtocol; use libdd_data_pipeline::trace_exporter::{ TelemetryConfig, TelemetryInstrumentationSessions, TraceExporter as GenericTraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat, @@ -83,6 +84,7 @@ pub struct TraceExporterConfig { connection_timeout: Option, shared_runtime: Option>, otlp_endpoint: Option, + otlp_protocol: Option, } #[no_mangle] @@ -498,12 +500,51 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( ) } +/// Sets the OTLP export protocol. Accepts the OTel-standard values `http/json` (default) or +/// `http/protobuf`; `grpc` is rejected as not yet supported. The host language resolves the value +/// (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`). +/// +/// Has no effect unless an OTLP endpoint is also configured via +/// `ddog_trace_exporter_config_set_otlp_endpoint`; without one, traces are sent to the +/// Datadog agent and this protocol selection is ignored. +/// +/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unaccepted +/// value, and `ErrorCode::InvalidInput` for a non-UTF-8 string. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( + config: Option<&mut TraceExporterConfig>, + protocol: CharSlice, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + let value = match sanitize_string(protocol) { + Ok(s) => s, + Err(e) => return Some(e), + }; + // `FromStr` is the single source of truth for string -> OtlpProtocol. The OTLP trace + // exporter is HTTP-only, so we additionally reject `Grpc` here (it parses, but is + // unsupported) rather than storing a value the exporter would refuse at send time. + // The `_` arm also covers any future non_exhaustive variant. + match value.parse::() { + Ok(p @ (OtlpProtocol::HttpJson | OtlpProtocol::HttpProtobuf)) => { + handle.otlp_protocol = Some(p); + None + } + _ => gen_error!(ErrorCode::InvalidArgument), + } + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Create a new TraceExporter instance. /// -/// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces in -/// OTLP HTTP/JSON to that endpoint instead of the Datadog agent. The same payload (e.g. -/// MessagePack) is passed to `ddog_trace_exporter_send`; the library decodes and converts to -/// OTLP when OTLP is enabled. +/// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces to +/// that endpoint in OTLP over HTTP — JSON or protobuf per the configured protocol — instead of +/// to the Datadog agent. The same payload (e.g. MessagePack) is passed to +/// `ddog_trace_exporter_send`; the library decodes and converts it to OTLP when OTLP is enabled. /// /// # Arguments /// @@ -565,6 +606,9 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( if let Some(ref url) = config.otlp_endpoint { builder.set_otlp_endpoint(url); + if let Some(protocol) = config.otlp_protocol { + builder.set_otlp_protocol(protocol); + } } match builder.build() { @@ -1283,6 +1327,95 @@ mod tests { } } + #[test] + fn config_otlp_protocol_test() { + unsafe { + // Null config → InvalidArgument + let error = + ddog_trace_exporter_config_set_otlp_protocol(None, CharSlice::from("http/json")); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // "http/json" → success, stored + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("http/json"), + ); + assert_eq!(error, None); + assert_eq!( + config.as_ref().unwrap().otlp_protocol, + Some(OtlpProtocol::HttpJson) + ); + + // "http/protobuf" → success, stored + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("http/protobuf"), + ); + assert_eq!(error, None); + assert_eq!( + config.as_ref().unwrap().otlp_protocol, + Some(OtlpProtocol::HttpProtobuf) + ); + + // "grpc" → InvalidArgument + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("grpc"), + ); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // Garbage value → InvalidArgument + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from("nonsense"), + ); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // Non-UTF-8 input → InvalidInput + let mut config = Some(TraceExporterConfig::default()); + let invalid: [u8; 2] = [0x80u8, 0xFFu8]; + let error = ddog_trace_exporter_config_set_otlp_protocol( + config.as_mut(), + CharSlice::from_bytes(&invalid), + ); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidInput); + ddog_trace_exporter_error_free(error); + } + } + + #[test] + fn set_otlp_protocol_stores_parsed_enum() { + use libdd_data_pipeline::otlp::OtlpProtocol; + let mut cfg = TraceExporterConfig::default(); + let err = unsafe { + ddog_trace_exporter_config_set_otlp_protocol( + Some(&mut cfg), + CharSlice::from("http/protobuf"), + ) + }; + assert!(err.is_none()); + assert_eq!(cfg.otlp_protocol, Some(OtlpProtocol::HttpProtobuf)); + } + + #[test] + fn set_otlp_protocol_rejects_grpc_and_unknown() { + let mut cfg = TraceExporterConfig::default(); + for bad in ["grpc", "nonsense"] { + let err = unsafe { + ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(bad)) + }; + assert!(err.is_some(), "expected error for {bad}"); + assert_eq!(cfg.otlp_protocol, None, "{bad} must not be stored"); + } + } + #[cfg(all(feature = "catch_panic", panic = "unwind"))] #[test] fn catch_panic_test() { diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index bb93a10a59..85681e902f 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -73,6 +73,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils", features = [ "test-utils", ] } httpmock = "0.8.0-alpha.1" +prost = "0.14.1" rand = "0.8.5" tempfile = "3.3.0" tokio = { version = "1.23", features = [ diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 2b9955ce3d..f34613bd51 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -13,7 +13,7 @@ pub mod agent_info; mod health_metrics; -pub(crate) mod otlp; +pub mod otlp; #[cfg(feature = "telemetry")] pub(crate) mod telemetry; #[cfg(not(target_arch = "wasm32"))] diff --git a/libdd-data-pipeline/src/otlp/config.rs b/libdd-data-pipeline/src/otlp/config.rs index 02d7a45f80..36176f3a28 100644 --- a/libdd-data-pipeline/src/otlp/config.rs +++ b/libdd-data-pipeline/src/otlp/config.rs @@ -6,20 +6,77 @@ use http::HeaderMap; use std::time::Duration; -/// OTLP trace export protocol. HTTP/JSON is currently supported. +/// OTLP trace export protocol. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -pub(crate) enum OtlpProtocol { +#[non_exhaustive] +pub enum OtlpProtocol { /// HTTP with JSON body (Content-Type: application/json). Default for HTTP. #[default] HttpJson, - /// HTTP with protobuf body. (Not supported yet) - #[allow(dead_code)] + /// HTTP with protobuf body (Content-Type: application/x-protobuf). HttpProtobuf, - /// gRPC. (Not supported yet) - #[allow(dead_code)] + /// gRPC. Parsed by `FromStr` so callers get a clean error, but rejected at export time + /// (unsupported). Grpc, } +impl std::str::FromStr for OtlpProtocol { + type Err = String; + fn from_str(s: &str) -> Result { + match s { + "http/json" => Ok(OtlpProtocol::HttpJson), + "http/protobuf" => Ok(OtlpProtocol::HttpProtobuf), + "grpc" => Ok(OtlpProtocol::Grpc), + other => Err(format!("unknown OTLP protocol: {other}")), + } + } +} + +/// The wire encoding actually used to send OTLP traces over HTTP. Internal, closed set: the +/// only encodings the exporter supports. The user-facing [`OtlpProtocol`] (which also carries the +/// unsupported `Grpc`) converts into this at the send boundary via `TryFrom`. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum OtlpWireProtocol { + Json, + Protobuf, +} + +impl std::convert::TryFrom for OtlpWireProtocol { + type Error = OtlpProtocol; + /// Maps the user-facing protocol to a supported wire encoding. `Grpc` is unsupported and + /// returns `Err(Grpc)` so the caller surfaces a clean error instead of silently downgrading. + fn try_from(p: OtlpProtocol) -> Result { + match p { + OtlpProtocol::HttpJson => Ok(OtlpWireProtocol::Json), + OtlpProtocol::HttpProtobuf => Ok(OtlpWireProtocol::Protobuf), + other => Err(other), + } + } +} + +impl OtlpWireProtocol { + /// The HTTP `Content-Type` for this encoding. + pub fn content_type(&self) -> http::HeaderValue { + match self { + OtlpWireProtocol::Json => libdd_common::header::APPLICATION_JSON, + OtlpWireProtocol::Protobuf => libdd_common::header::APPLICATION_PROTOBUF, + } + } + + /// Encode the prost OTLP request to this wire format. + pub fn encode( + &self, + req: &libdd_trace_utils::otlp_encoder::ProtoExportTraceServiceRequest, + ) -> Result, serde_json::Error> { + match self { + OtlpWireProtocol::Json => libdd_trace_utils::otlp_encoder::encode_otlp_json(req), + OtlpWireProtocol::Protobuf => { + Ok(libdd_trace_utils::otlp_encoder::encode_otlp_protobuf(req)) + } + } + } +} + /// Default timeout for OTLP export requests. pub const DEFAULT_OTLP_TIMEOUT: Duration = Duration::from_secs(10); @@ -32,7 +89,51 @@ pub struct OtlpTraceConfig { pub headers: HeaderMap, /// Request timeout. pub timeout: Duration, - /// Protocol (for future use; currently only HttpJson is supported). - #[allow(dead_code)] - pub(crate) protocol: OtlpProtocol, + /// OTLP export protocol (selects body encoding and content-type). + pub protocol: OtlpProtocol, +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + #[test] + fn protocol_from_str() { + assert_eq!( + OtlpProtocol::from_str("http/json").unwrap(), + OtlpProtocol::HttpJson + ); + assert_eq!( + OtlpProtocol::from_str("http/protobuf").unwrap(), + OtlpProtocol::HttpProtobuf + ); + assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); + assert!(OtlpProtocol::from_str("nonsense").is_err()); + } + + #[test] + fn wire_protocol_from_user_protocol() { + use std::convert::TryFrom; + assert_eq!( + OtlpWireProtocol::try_from(OtlpProtocol::HttpJson).unwrap(), + OtlpWireProtocol::Json + ); + assert_eq!( + OtlpWireProtocol::try_from(OtlpProtocol::HttpProtobuf).unwrap(), + OtlpWireProtocol::Protobuf + ); + assert!(OtlpWireProtocol::try_from(OtlpProtocol::Grpc).is_err()); + } + + #[test] + fn wire_protocol_content_types() { + assert_eq!( + OtlpWireProtocol::Json.content_type(), + libdd_common::header::APPLICATION_JSON + ); + assert_eq!( + OtlpWireProtocol::Protobuf.content_type(), + libdd_common::header::APPLICATION_PROTOBUF + ); + } } diff --git a/libdd-data-pipeline/src/otlp/exporter.rs b/libdd-data-pipeline/src/otlp/exporter.rs index 1f4d86a235..fd07df34cf 100644 --- a/libdd-data-pipeline/src/otlp/exporter.rs +++ b/libdd-data-pipeline/src/otlp/exporter.rs @@ -1,9 +1,9 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! OTLP HTTP/JSON trace exporter. +//! OTLP HTTP trace exporter (JSON or protobuf). -use super::config::OtlpTraceConfig; +use super::config::{OtlpTraceConfig, OtlpWireProtocol}; use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; use libdd_capabilities::{HttpClientCapability, SleepCapability}; use libdd_common::Endpoint; @@ -16,7 +16,9 @@ const OTLP_MAX_RETRIES: u32 = 4; /// Initial backoff between retries (milliseconds). const OTLP_RETRY_DELAY_MS: u64 = 100; -/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries. +/// Send an OTLP trace payload to the configured endpoint with retries. +/// +/// The `Content-Type` is derived from `wire`, which already selected the encoding. /// /// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters. /// @@ -25,8 +27,9 @@ const OTLP_RETRY_DELAY_MS: u64 = 100; pub async fn send_otlp_traces_http( capabilities: &C, config: &OtlpTraceConfig, + wire: OtlpWireProtocol, test_token: Option<&str>, - json_body: Vec, + body: Vec, ) -> Result<(), TraceExporterError> { let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( @@ -41,11 +44,10 @@ pub async fn send_otlp_traces_http( ..Endpoint::default() }; + let content_type = wire.content_type(); + let mut headers = config.headers.clone(); - headers.insert( - http::header::CONTENT_TYPE, - libdd_common::header::APPLICATION_JSON, - ); + headers.insert(http::header::CONTENT_TYPE, content_type); if let Some(token) = test_token { if let Ok(val) = http::HeaderValue::from_str(token) { headers.insert( @@ -62,7 +64,7 @@ pub async fn send_otlp_traces_http( None, ); - match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await { + match send_with_retry(capabilities, &target, body, &headers, &retry_strategy).await { Ok(_) => Ok(()), Err(e) => Err(map_send_error(e).await), } diff --git a/libdd-data-pipeline/src/otlp/mod.rs b/libdd-data-pipeline/src/otlp/mod.rs index 658fc13b87..8937a967c9 100644 --- a/libdd-data-pipeline/src/otlp/mod.rs +++ b/libdd-data-pipeline/src/otlp/mod.rs @@ -5,8 +5,9 @@ //! //! When an OTLP endpoint is configured via //! [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`], the trace exporter sends -//! traces in OTLP HTTP/JSON format to that endpoint instead of the Datadog agent. The host language -//! is responsible for resolving the endpoint from its own configuration (e.g. +//! traces in OTLP HTTP format to that endpoint instead of the Datadog agent; the wire encoding +//! (JSON or protobuf) is selected via [`OtlpProtocol`]. The host language is responsible for +//! resolving the endpoint from its own configuration (e.g. //! `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`). //! //! ## Sampling @@ -22,9 +23,12 @@ //! spans from a local trace are closed (i.e. send complete trace chunks). This crate does not //! buffer or flush partially—it exports whatever trace chunks it receives. -pub mod config; -pub mod exporter; +pub(crate) mod config; +pub(crate) mod exporter; -pub use config::OtlpTraceConfig; -pub use exporter::send_otlp_traces_http; +pub use config::{OtlpProtocol, OtlpTraceConfig}; +// Internal: the resolved wire encoding. Callers select via the user-facing `OtlpProtocol`; this +// is derived from it at the send boundary and is not part of the crate's public API. +pub(crate) use config::OtlpWireProtocol; +pub(crate) use exporter::send_otlp_traces_http; pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo}; diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 3c0e1f14b5..31fe5729a0 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -65,6 +65,7 @@ pub struct TraceExporterBuilder { connection_timeout: Option, otlp_endpoint: Option, otlp_headers: Vec<(String, String)>, + otlp_protocol: OtlpProtocol, } impl TraceExporterBuilder { @@ -286,6 +287,17 @@ impl TraceExporterBuilder { self } + /// Selects the OTLP export protocol. Accepts `OtlpProtocol::HttpJson` (default) or + /// `OtlpProtocol::HttpProtobuf`. The host language resolves this from + /// `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`. + /// + /// `OtlpProtocol::Grpc` is not supported; selecting it makes [`build`](Self::build) / + /// [`build_async`](Self::build_async) fail with [`BuilderErrorKind::InvalidConfiguration`]. + pub fn set_otlp_protocol(&mut self, protocol: OtlpProtocol) -> &mut Self { + self.otlp_protocol = protocol; + self + } + /// Sets additional HTTP headers to include in OTLP trace export requests. /// /// Headers should be provided as key-value pairs. The host language is responsible for @@ -429,31 +441,47 @@ impl TraceExporterBuilder { } }; - let otlp_config = self.otlp_endpoint.map(|url| { - let mut headers = http::HeaderMap::new(); - for (key, value) in self.otlp_headers { - match ( - http::HeaderName::from_bytes(key.as_bytes()), - http::HeaderValue::from_str(&value), - ) { - (Ok(name), Ok(val)) => { - headers.insert(name, val); - } - _ => { - tracing::warn!("Skipping invalid OTLP header: {:?}={:?}", key, value); + let otlp_config = match self.otlp_endpoint { + Some(url) => { + // OTLP gRPC export is not implemented. Reject it here — but only when OTLP export + // is actually enabled (an endpoint is configured) — so a `grpc` value resolved + // from the OTel-default `OTEL_EXPORTER_OTLP_PROTOCOL` stays inert for a normal + // Datadog-agent exporter rather than failing its build. Fails fast with the same + // `InvalidConfiguration` category (FFI: `InvalidArgument`) the C FFI setter uses; + // the send-time arm in `send_otlp_traces_inner` remains a defensive guard. + if self.otlp_protocol == OtlpProtocol::Grpc { + return Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration( + "OTLP gRPC export is not supported".to_string(), + ), + )); + } + let mut headers = http::HeaderMap::new(); + for (key, value) in self.otlp_headers { + match ( + http::HeaderName::from_bytes(key.as_bytes()), + http::HeaderValue::from_str(&value), + ) { + (Ok(name), Ok(val)) => { + headers.insert(name, val); + } + _ => { + tracing::warn!("Skipping invalid OTLP header: {:?}={:?}", key, value); + } } } + Some(OtlpTraceConfig { + endpoint_url: url, + headers, + timeout: self + .connection_timeout + .map(Duration::from_millis) + .unwrap_or(DEFAULT_OTLP_TIMEOUT), + protocol: self.otlp_protocol, + }) } - OtlpTraceConfig { - endpoint_url: url, - headers, - timeout: self - .connection_timeout - .map(Duration::from_millis) - .unwrap_or(DEFAULT_OTLP_TIMEOUT), - protocol: OtlpProtocol::HttpJson, - } - }); + None => None, + }; Ok(TraceExporter { endpoint: Endpoint { @@ -672,6 +700,41 @@ mod tests { )); } + // Builds a live agent exporter (spawns the agent_info worker, which makes real syscalls), + // so it can't run under miri. + #[cfg_attr(miri, ignore)] + #[test] + fn test_otlp_grpc_without_endpoint_still_builds() { + // The protocol setting must stay inert when no OTLP endpoint is configured: a `grpc` + // value (e.g. resolved from the OTel-default `OTEL_EXPORTER_OTLP_PROTOCOL`) must NOT + // break a normal Datadog-agent exporter that does no OTLP export at all. + let mut builder = TraceExporterBuilder::default(); + builder.set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + let result = builder.build::(); + assert!( + result.is_ok(), + "grpc protocol without an OTLP endpoint must still build the agent exporter" + ); + } + + #[test] + fn test_otlp_grpc_with_endpoint_rejected_at_build() { + // When OTLP export IS enabled (endpoint set), gRPC is unsupported and must fail fast at + // build time (not on the first send), with the same `InvalidConfiguration` category the + // C FFI setter uses. + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint("http://localhost:4318/v1/traces") + .set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc); + let result = builder.build::(); + assert!(matches!( + result, + Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration(_) + )) + )); + } + #[cfg_attr(miri, ignore)] #[test] fn test_build_with_v1_starts_inactive() { diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 070fc754e0..730d0dec10 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -15,7 +15,9 @@ use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; use crate::agent_info::ResponseObserver; -use crate::otlp::{map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig}; +use crate::otlp::{ + map_traces_to_otlp, send_otlp_traces_http, OtlpResourceInfo, OtlpTraceConfig, OtlpWireProtocol, +}; #[cfg(feature = "telemetry")] use crate::telemetry::{SendPayloadTelemetry, TelemetryClient}; use crate::trace_exporter::agent_response::{ @@ -545,16 +547,25 @@ impl Tra r.runtime_id = self.metadata.runtime_id.clone(); r }; + let wire = OtlpWireProtocol::try_from(config.protocol).map_err(|unsupported| { + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( + "unsupported OTLP protocol for HTTP export: {unsupported:?}" + ))) + })?; + // Single prost OTLP IR; each protocol encodes the same request to its wire format. let request = map_traces_to_otlp(traces, &resource_info); - let json_body = serde_json::to_vec(&request).map_err(|e| { - error!("OTLP JSON serialization error: {e}"); - TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string())) + let body = wire.encode(&request).map_err(|e| { + error!("OTLP serialization error: {e}"); + TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( + "failed to encode OTLP request: {e}" + ))) })?; send_otlp_traces_http( &self.capabilities, config, + wire, self.endpoint.test_token.as_deref(), - json_body, + body, ) .await?; Ok(AgentResponse::Unchanged) diff --git a/libdd-data-pipeline/tests/test_trace_exporter_otlp_protobuf_export.rs b/libdd-data-pipeline/tests/test_trace_exporter_otlp_protobuf_export.rs new file mode 100644 index 0000000000..2d193ed387 --- /dev/null +++ b/libdd-data-pipeline/tests/test_trace_exporter_otlp_protobuf_export.rs @@ -0,0 +1,86 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 +#[cfg(test)] +mod otlp_protobuf_tests { + use libdd_capabilities_impl::NativeCapabilities; + use libdd_data_pipeline::trace_exporter::TraceExporterBuilder; + use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; + use libdd_trace_utils::test_utils::create_test_json_span; + use prost::Message; + use serde_json::json; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use tokio::task; + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn otlp_protobuf_export_sends_decodable_payload() { + use httpmock::MockServer; + + // The httpmock 0.8 alpha API does not expose captured request bodies after the fact, so + // we decode and validate the protobuf body inside a custom request matcher. The matcher + // flips `body_valid` when the payload decodes and carries the expected service.name. + let body_valid = Arc::new(AtomicBool::new(false)); + let matcher_flag = body_valid.clone(); + + let server = MockServer::start_async().await; + let mock = server + .mock_async(move |when, then| { + let flag = matcher_flag.clone(); + when.method("POST") + .path("/v1/traces") + .header("content-type", "application/x-protobuf") + .is_true(move |req: &httpmock::prelude::HttpMockRequest| { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value; + let Ok(decoded) = ExportTraceServiceRequest::decode(req.body_ref()) else { + return false; + }; + let valid = decoded + .resource_spans + .first() + .and_then(|rs| rs.resource.as_ref()) + .map(|resource| { + resource.attributes.iter().any(|kv| { + kv.key == "service.name" + && matches!( + kv.value.as_ref().and_then(|v| v.value.as_ref()), + Some(Value::StringValue(s)) if s == "test" + ) + }) + }) + .unwrap_or(false); + if valid { + flag.store(true, Ordering::SeqCst); + } + valid + }); + then.status(200).body(""); + }) + .await; + + let endpoint = format!("http://localhost:{}/v1/traces", server.port()); + let task_result = task::spawn_blocking(move || { + let mut builder = TraceExporterBuilder::default(); + builder + .set_otlp_endpoint(&endpoint) + .set_otlp_protocol(libdd_data_pipeline::otlp::OtlpProtocol::HttpProtobuf) + .set_language("test-lang") + .set_tracer_version("1.0") + .set_env("test_env") + .set_service("test"); + let exporter = builder.build::().expect("build"); + let mut span = create_test_json_span(1234, 12342, 12341, 1, false); + span["name"] = json!("pb_span"); + let data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); + exporter.send(data.as_ref()).expect("send ok"); + }) + .await; + + assert!(task_result.is_ok()); + assert_eq!(mock.calls_async().await, 1); + assert!( + body_valid.load(Ordering::SeqCst), + "protobuf body did not decode to the expected ExportTraceServiceRequest" + ); + } +} diff --git a/libdd-trace-protobuf/build.rs b/libdd-trace-protobuf/build.rs index c9c891a681..ee60555381 100644 --- a/libdd-trace-protobuf/build.rs +++ b/libdd-trace-protobuf/build.rs @@ -36,6 +36,12 @@ fn generate_protobuf() { config.out_dir(output_path.clone()); + // The vendored OpenTelemetry proto doc comments are kept (generated onto the prost structs). + // The one comment with an indented example block (`Span.attributes` in trace.proto) is fenced + // as a ```text block in the vendored `.proto`, so rustdoc renders it as text rather than + // compiling it as a Rust doctest. Keep new vendored comments doctest-safe (fence example + // blocks) rather than re-adding a blanket `disable_comments`. + // The following prost_build config changes modify the protobuf generated structs in // in the following ways: @@ -62,9 +68,14 @@ fn generate_protobuf() { config.field_attribute(".pb.SpanLink.tracestate", "#[serde(default)]"); config.field_attribute(".pb.SpanLink.flags", "#[serde(default)]"); - config.type_attribute("Span", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("pb.Span", "#[derive(Deserialize, Serialize)]"); config.type_attribute( - "Span", + "pb.Span", + r#"#[cfg_attr(feature = "fuzzing", derive(bolero::TypeGenerator))]"#, + ); + config.type_attribute("pb.idx.Span", "#[derive(Deserialize, Serialize)]"); + config.type_attribute( + "pb.idx.Span", r#"#[cfg_attr(feature = "fuzzing", derive(bolero::TypeGenerator))]"#, ); config.field_attribute( @@ -319,6 +330,8 @@ fn generate_protobuf() { "src/pb/stats.proto", "src/pb/remoteconfig.proto", "src/pb/opentelemetry/proto/common/v1/process_context.proto", + "src/pb/opentelemetry/proto/trace/v1/trace.proto", + "src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto", "src/pb/idx/tracer_payload.proto", "src/pb/idx/span.proto", ], @@ -363,6 +376,14 @@ fn generate_protobuf() { otel_license, &output_path.join("opentelemetry.proto.common.v1.rs"), ); + prepend_to_file( + otel_license, + &output_path.join("opentelemetry.proto.trace.v1.rs"), + ); + prepend_to_file( + otel_license, + &output_path.join("opentelemetry.proto.collector.trace.v1.rs"), + ); } #[cfg(feature = "generate-protobuf")] diff --git a/libdd-trace-protobuf/src/_includes.rs b/libdd-trace-protobuf/src/_includes.rs index 1628f52c39..0377bbe9dc 100644 --- a/libdd-trace-protobuf/src/_includes.rs +++ b/libdd-trace-protobuf/src/_includes.rs @@ -4,6 +4,13 @@ // This file is @generated by prost-build. pub mod opentelemetry { pub mod proto { + pub mod collector { + pub mod trace { + pub mod v1 { + include!("opentelemetry.proto.collector.trace.v1.rs"); + } + } + } pub mod common { pub mod v1 { include!("opentelemetry.proto.common.v1.rs"); @@ -14,6 +21,11 @@ pub mod opentelemetry { include!("opentelemetry.proto.resource.v1.rs"); } } + pub mod trace { + pub mod v1 { + include!("opentelemetry.proto.trace.v1.rs"); + } + } } } pub mod pb { diff --git a/libdd-trace-protobuf/src/opentelemetry.proto.collector.trace.v1.rs b/libdd-trace-protobuf/src/opentelemetry.proto.collector.trace.v1.rs new file mode 100644 index 0000000000..3a1e3db44d --- /dev/null +++ b/libdd-trace-protobuf/src/opentelemetry.proto.collector.trace.v1.rs @@ -0,0 +1,54 @@ +// Copyright 2019, OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExportTraceServiceRequest { + /// An array of ResourceSpans. + /// For data coming from a single resource this array will typically contain one + /// element. Intermediary nodes (such as OpenTelemetry Collector) that receive + /// data from multiple origins typically batch the data before forwarding further and + /// in that case this array will contain multiple elements. + #[prost(message, repeated, tag = "1")] + pub resource_spans: ::prost::alloc::vec::Vec< + super::super::super::trace::v1::ResourceSpans, + >, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ExportTraceServiceResponse { + /// The details of a partially successful export request. + /// + /// If the request is only partially accepted + /// (i.e. when the server accepts only parts of the data and rejects the rest) + /// the server MUST initialize the `partial_success` field and MUST + /// set the `rejected_` with the number of items it rejected. + /// + /// Servers MAY also make use of the `partial_success` field to convey + /// warnings/suggestions to senders even when the request was fully accepted. + /// In such cases, the `rejected_` MUST have a value of `0` and + /// the `error_message` MUST be non-empty. + /// + /// A `partial_success` message with an empty value (rejected_ = 0 and + /// `error_message` = "") is equivalent to it not being set/present. Senders + /// SHOULD interpret it the same way as in the full success case. + #[prost(message, optional, tag = "1")] + pub partial_success: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ExportTracePartialSuccess { + /// The number of rejected spans. + /// + /// A `rejected_` field holding a `0` value indicates that the + /// request was fully accepted. + #[prost(int64, tag = "1")] + pub rejected_spans: i64, + /// A developer-facing human-readable message in English. It should be used + /// either to explain why the server rejected parts of the data during a partial + /// success or to convey warnings/suggestions during a full success. The message + /// should offer guidance on how users can address such issues. + /// + /// error_message is an optional field. An error_message with an empty value + /// is equivalent to it not being set. + #[prost(string, tag = "2")] + pub error_message: ::prost::alloc::string::String, +} diff --git a/libdd-trace-protobuf/src/opentelemetry.proto.trace.v1.rs b/libdd-trace-protobuf/src/opentelemetry.proto.trace.v1.rs new file mode 100644 index 0000000000..ef67bdd27e --- /dev/null +++ b/libdd-trace-protobuf/src/opentelemetry.proto.trace.v1.rs @@ -0,0 +1,440 @@ +// Copyright 2019, OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// This file is @generated by prost-build. +/// TracesData represents the traces data that can be stored in a persistent storage, +/// OR can be embedded by other protocols that transfer OTLP traces data but do +/// not implement the OTLP protocol. +/// +/// The main difference between this message and collector protocol is that +/// in this message there will not be any "control" or "metadata" specific to +/// OTLP protocol. +/// +/// When new fields are added into this message, the OTLP request MUST be updated +/// as well. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TracesData { + /// An array of ResourceSpans. + /// For data coming from a single resource this array will typically contain + /// one element. Intermediary nodes that receive data from multiple origins + /// typically batch the data before forwarding further and in that case this + /// array will contain multiple elements. + #[prost(message, repeated, tag = "1")] + pub resource_spans: ::prost::alloc::vec::Vec, +} +/// A collection of ScopeSpans from a Resource. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceSpans { + /// The resource for the spans in this message. + /// If this field is not set then no resource info is known. + #[prost(message, optional, tag = "1")] + pub resource: ::core::option::Option, + /// A list of ScopeSpans that originate from a resource. + #[prost(message, repeated, tag = "2")] + pub scope_spans: ::prost::alloc::vec::Vec, + /// The Schema URL, if known. This is the identifier of the Schema that the resource data + /// is recorded in. Notably, the last part of the URL path is the version number of the + /// schema: http\[s\]://server\[:port\]/path/. To learn more about Schema URL see + /// + /// This schema_url applies to the data in the "resource" field. It does not apply + /// to the data in the "scope_spans" field which have their own schema_url field. + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, +} +/// A collection of Spans produced by an InstrumentationScope. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScopeSpans { + /// The instrumentation scope information for the spans in this message. + /// Semantically when InstrumentationScope isn't set, it is equivalent with + /// an empty instrumentation scope name (unknown). + #[prost(message, optional, tag = "1")] + pub scope: ::core::option::Option, + /// A list of Spans that originate from an instrumentation scope. + #[prost(message, repeated, tag = "2")] + pub spans: ::prost::alloc::vec::Vec, + /// The Schema URL, if known. This is the identifier of the Schema that the span data + /// is recorded in. Notably, the last part of the URL path is the version number of the + /// schema: http\[s\]://server\[:port\]/path/. To learn more about Schema URL see + /// + /// This schema_url applies to the data in the "scope" field and all spans and span + /// events in the "spans" field. + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, +} +/// A Span represents a single operation performed by a single component of the system. +/// +/// The next available field id is 17. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Span { + /// A unique identifier for a trace. All spans from the same trace share + /// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR + /// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON + /// is zero-length and thus is also invalid). + /// + /// This field is required. + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + /// A unique identifier for a span within a trace, assigned when the span + /// is created. The ID is an 8-byte array. An ID with all zeroes OR of length + /// other than 8 bytes is considered invalid (empty string in OTLP/JSON + /// is zero-length and thus is also invalid). + /// + /// This field is required. + #[prost(bytes = "vec", tag = "2")] + pub span_id: ::prost::alloc::vec::Vec, + /// trace_state conveys information about request position in multiple distributed tracing graphs. + /// It is a trace_state in w3c-trace-context format: + /// See also for more details about this field. + #[prost(string, tag = "3")] + pub trace_state: ::prost::alloc::string::String, + /// The `span_id` of this span's parent span. If this is a root span, then this + /// field must be empty. The ID is an 8-byte array. + #[prost(bytes = "vec", tag = "4")] + pub parent_span_id: ::prost::alloc::vec::Vec, + /// Flags, a bit field. + /// + /// Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + /// Context specification. To read the 8-bit W3C trace flag, use + /// `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + /// + /// See for the flag definitions. + /// + /// Bits 8 and 9 represent the 3 states of whether a span's parent + /// is remote. The states are (unknown, is not remote, is remote). + /// To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + /// To read whether the span is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + /// + /// When creating span messages, if the message is logically forwarded from another source + /// with an equivalent flags fields (i.e., usually another OTLP span message), the field SHOULD + /// be copied as-is. If creating from a source that does not have an equivalent flags field + /// (such as a runtime representation of an OpenTelemetry span), the high 22 bits MUST + /// be set to zero. + /// Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + /// + /// \[Optional\]. + #[prost(fixed32, tag = "16")] + pub flags: u32, + /// A description of the span's operation. + /// + /// For example, the name can be a qualified method name or a file name + /// and a line number where the operation is called. A best practice is to use + /// the same display name at the same call point in an application. + /// This makes it easier to correlate spans in different traces. + /// + /// This field is semantically required to be set to non-empty string. + /// Empty value is equivalent to an unknown span name. + /// + /// This field is required. + #[prost(string, tag = "5")] + pub name: ::prost::alloc::string::String, + /// Distinguishes between spans generated in a particular context. For example, + /// two spans with the same name may be distinguished using `CLIENT` (caller) + /// and `SERVER` (callee) to identify queueing latency associated with the span. + #[prost(enumeration = "span::SpanKind", tag = "6")] + pub kind: i32, + /// The start time of the span. On the client side, this is the time + /// kept by the local machine where the span execution starts. On the server side, this + /// is the time when the server's application handler starts running. + /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + /// + /// This field is semantically required and it is expected that end_time >= start_time. + #[prost(fixed64, tag = "7")] + pub start_time_unix_nano: u64, + /// The end time of the span. On the client side, this is the time + /// kept by the local machine where the span execution ends. On the server side, this + /// is the time when the server application handler stops running. + /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + /// + /// This field is semantically required and it is expected that end_time >= start_time. + #[prost(fixed64, tag = "8")] + pub end_time_unix_nano: u64, + /// A collection of key/value pairs. Note, global attributes + /// like server name can be set using the resource API. Examples of attributes: + /// + /// ```text + /// "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + /// "/http/server_latency": 300 + /// "example.com/myattribute": true + /// "example.com/score": 10.239 + /// ``` + /// + /// Attribute keys MUST be unique (it is not allowed to have more than one + /// attribute with the same key). + /// The behavior of software that receives duplicated keys can be unpredictable. + #[prost(message, repeated, tag = "9")] + pub attributes: ::prost::alloc::vec::Vec, + /// The number of attributes that were discarded. Attributes + /// can be discarded because their keys are too long or because there are too many + /// attributes. If this value is 0, then no attributes were dropped. + #[prost(uint32, tag = "10")] + pub dropped_attributes_count: u32, + /// A collection of Event items. + #[prost(message, repeated, tag = "11")] + pub events: ::prost::alloc::vec::Vec, + /// The number of dropped events. If the value is 0, then no + /// events were dropped. + #[prost(uint32, tag = "12")] + pub dropped_events_count: u32, + /// A collection of Links, which are references from this span to a span + /// in the same or different trace. + #[prost(message, repeated, tag = "13")] + pub links: ::prost::alloc::vec::Vec, + /// The number of dropped links after the maximum size was + /// enforced. If this value is 0, then no links were dropped. + #[prost(uint32, tag = "14")] + pub dropped_links_count: u32, + /// An optional final status for this span. Semantically when Status isn't set, it means + /// span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + #[prost(message, optional, tag = "15")] + pub status: ::core::option::Option, +} +/// Nested message and enum types in `Span`. +pub mod span { + /// Event is a time-stamped annotation of the span, consisting of user-supplied + /// text description and key-value pairs. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Event { + /// The time the event occurred. + #[prost(fixed64, tag = "1")] + pub time_unix_nano: u64, + /// The name of the event. + /// This field is semantically required to be set to non-empty string. + #[prost(string, tag = "2")] + pub name: ::prost::alloc::string::String, + /// A collection of attribute key/value pairs on the event. + /// Attribute keys MUST be unique (it is not allowed to have more than one + /// attribute with the same key). + /// The behavior of software that receives duplicated keys can be unpredictable. + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec< + super::super::super::common::v1::KeyValue, + >, + /// The number of dropped attributes. If the value is 0, + /// then no attributes were dropped. + #[prost(uint32, tag = "4")] + pub dropped_attributes_count: u32, + } + /// A pointer from the current span to another span in the same trace or in a + /// different trace. For example, this can be used in batching operations, + /// where a single batch handler processes multiple requests from different + /// traces or when the handler receives a request from a different project. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Link { + /// A unique identifier of a trace that this linked span is part of. The ID is a + /// 16-byte array. + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + /// A unique identifier for the linked span. The ID is an 8-byte array. + #[prost(bytes = "vec", tag = "2")] + pub span_id: ::prost::alloc::vec::Vec, + /// The trace_state associated with the link. + #[prost(string, tag = "3")] + pub trace_state: ::prost::alloc::string::String, + /// A collection of attribute key/value pairs on the link. + /// Attribute keys MUST be unique (it is not allowed to have more than one + /// attribute with the same key). + /// The behavior of software that receives duplicated keys can be unpredictable. + #[prost(message, repeated, tag = "4")] + pub attributes: ::prost::alloc::vec::Vec< + super::super::super::common::v1::KeyValue, + >, + /// The number of dropped attributes. If the value is 0, + /// then no attributes were dropped. + #[prost(uint32, tag = "5")] + pub dropped_attributes_count: u32, + /// Flags, a bit field. + /// + /// Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + /// Context specification. To read the 8-bit W3C trace flag, use + /// `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + /// + /// See for the flag definitions. + /// + /// Bits 8 and 9 represent the 3 states of whether the link is remote. + /// The states are (unknown, is not remote, is remote). + /// To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + /// To read whether the link is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + /// + /// Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + /// When creating new spans, bits 10-31 (most-significant 22-bits) MUST be zero. + /// + /// \[Optional\]. + #[prost(fixed32, tag = "6")] + pub flags: u32, + } + /// SpanKind is the type of span. Can be used to specify additional relationships between spans + /// in addition to a parent/child relationship. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum SpanKind { + /// Unspecified. Do NOT use as default. + /// Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + Unspecified = 0, + /// Indicates that the span represents an internal operation within an application, + /// as opposed to an operation happening at the boundaries. Default value. + Internal = 1, + /// Indicates that the span covers server-side handling of an RPC or other + /// remote network request. + Server = 2, + /// Indicates that the span describes a request to some remote service. + Client = 3, + /// Indicates that the span describes a producer sending a message to a broker. + /// Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + /// between producer and consumer spans. A PRODUCER span ends when the message was accepted + /// by the broker while the logical processing of the message might span a much longer time. + Producer = 4, + /// Indicates that the span describes consumer receiving a message from a broker. + /// Like the PRODUCER kind, there is often no direct critical path latency relationship + /// between producer and consumer spans. + Consumer = 5, + } + impl SpanKind { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "SPAN_KIND_UNSPECIFIED", + Self::Internal => "SPAN_KIND_INTERNAL", + Self::Server => "SPAN_KIND_SERVER", + Self::Client => "SPAN_KIND_CLIENT", + Self::Producer => "SPAN_KIND_PRODUCER", + Self::Consumer => "SPAN_KIND_CONSUMER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SPAN_KIND_UNSPECIFIED" => Some(Self::Unspecified), + "SPAN_KIND_INTERNAL" => Some(Self::Internal), + "SPAN_KIND_SERVER" => Some(Self::Server), + "SPAN_KIND_CLIENT" => Some(Self::Client), + "SPAN_KIND_PRODUCER" => Some(Self::Producer), + "SPAN_KIND_CONSUMER" => Some(Self::Consumer), + _ => None, + } + } + } +} +/// The Status type defines a logical error model that is suitable for different +/// programming environments, including REST APIs and RPC APIs. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct Status { + /// A developer-facing human readable error message. + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, + /// The status code. + #[prost(enumeration = "status::StatusCode", tag = "3")] + pub code: i32, +} +/// Nested message and enum types in `Status`. +pub mod status { + /// For the semantics of status codes see + /// + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum StatusCode { + /// The default status. + Unset = 0, + /// The Span has been validated by an Application developer or Operator to + /// have completed successfully. + Ok = 1, + /// The Span contains an error. + Error = 2, + } + impl StatusCode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unset => "STATUS_CODE_UNSET", + Self::Ok => "STATUS_CODE_OK", + Self::Error => "STATUS_CODE_ERROR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "STATUS_CODE_UNSET" => Some(Self::Unset), + "STATUS_CODE_OK" => Some(Self::Ok), + "STATUS_CODE_ERROR" => Some(Self::Error), + _ => None, + } + } + } +} +/// SpanFlags represents constants used to interpret the +/// Span.flags field, which is protobuf 'fixed32' type and is to +/// be used as bit-fields. Each non-zero value defined in this enum is +/// a bit-mask. To extract the bit-field, for example, use an +/// expression like: +/// +/// (span.flags & SPAN_FLAGS_TRACE_FLAGS_MASK) +/// +/// See for the flag definitions. +/// +/// Note that Span flags were introduced in version 1.1 of the +/// OpenTelemetry protocol. Older Span producers do not set this +/// field, consequently consumers should not rely on the absence of a +/// particular flag bit to indicate the presence of a particular feature. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum SpanFlags { + /// The zero value for the enum. Should not be used for comparisons. + /// Instead use bitwise "and" with the appropriate mask as shown above. + DoNotUse = 0, + /// Bits 0-7 are used for trace flags. + TraceFlagsMask = 255, + /// Bits 8 and 9 are used to indicate that the parent span or link span is remote. + /// Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known. + /// Bit 9 (`IS_REMOTE`) indicates whether the span or link is remote. + ContextHasIsRemoteMask = 256, + ContextIsRemoteMask = 512, +} +impl SpanFlags { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::DoNotUse => "SPAN_FLAGS_DO_NOT_USE", + Self::TraceFlagsMask => "SPAN_FLAGS_TRACE_FLAGS_MASK", + Self::ContextHasIsRemoteMask => "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK", + Self::ContextIsRemoteMask => "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SPAN_FLAGS_DO_NOT_USE" => Some(Self::DoNotUse), + "SPAN_FLAGS_TRACE_FLAGS_MASK" => Some(Self::TraceFlagsMask), + "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK" => Some(Self::ContextHasIsRemoteMask), + "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK" => Some(Self::ContextIsRemoteMask), + _ => None, + } + } +} diff --git a/libdd-trace-protobuf/src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto b/libdd-trace-protobuf/src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto new file mode 100644 index 0000000000..1e77256209 --- /dev/null +++ b/libdd-trace-protobuf/src/pb/opentelemetry/proto/collector/trace/v1/trace_service.proto @@ -0,0 +1,80 @@ +// This file was vendored from open-telemetry/opentelemetry-proto at commit +// 1e725b853bc8f6b46ee62e8232e4c83017b9536f. + +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.collector.trace.v1; + +import "opentelemetry/proto/trace/v1/trace.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Collector.Trace.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.collector.trace.v1"; +option java_outer_classname = "TraceServiceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/collector/trace/v1"; + +// Service that can be used to push spans between one Application instrumented with +// OpenTelemetry and a collector, or between a collector and a central collector (in this +// case spans are sent/received to/from multiple Applications). +service TraceService { + rpc Export(ExportTraceServiceRequest) returns (ExportTraceServiceResponse) {} +} + +message ExportTraceServiceRequest { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain one + // element. Intermediary nodes (such as OpenTelemetry Collector) that receive + // data from multiple origins typically batch the data before forwarding further and + // in that case this array will contain multiple elements. + repeated opentelemetry.proto.trace.v1.ResourceSpans resource_spans = 1; +} + +message ExportTraceServiceResponse { + // The details of a partially successful export request. + // + // If the request is only partially accepted + // (i.e. when the server accepts only parts of the data and rejects the rest) + // the server MUST initialize the `partial_success` field and MUST + // set the `rejected_` with the number of items it rejected. + // + // Servers MAY also make use of the `partial_success` field to convey + // warnings/suggestions to senders even when the request was fully accepted. + // In such cases, the `rejected_` MUST have a value of `0` and + // the `error_message` MUST be non-empty. + // + // A `partial_success` message with an empty value (rejected_ = 0 and + // `error_message` = "") is equivalent to it not being set/present. Senders + // SHOULD interpret it the same way as in the full success case. + ExportTracePartialSuccess partial_success = 1; +} + +message ExportTracePartialSuccess { + // The number of rejected spans. + // + // A `rejected_` field holding a `0` value indicates that the + // request was fully accepted. + int64 rejected_spans = 1; + + // A developer-facing human-readable message in English. It should be used + // either to explain why the server rejected parts of the data during a partial + // success or to convey warnings/suggestions during a full success. The message + // should offer guidance on how users can address such issues. + // + // error_message is an optional field. An error_message with an empty value + // is equivalent to it not being set. + string error_message = 2; +} \ No newline at end of file diff --git a/libdd-trace-protobuf/src/pb/opentelemetry/proto/trace/v1/trace.proto b/libdd-trace-protobuf/src/pb/opentelemetry/proto/trace/v1/trace.proto new file mode 100644 index 0000000000..277041fd10 --- /dev/null +++ b/libdd-trace-protobuf/src/pb/opentelemetry/proto/trace/v1/trace.proto @@ -0,0 +1,364 @@ +// This file was vendored from open-telemetry/opentelemetry-proto at commit +// 1e725b853bc8f6b46ee62e8232e4c83017b9536f. + +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.trace.v1; + +import "opentelemetry/proto/common/v1/common.proto"; +import "opentelemetry/proto/resource/v1/resource.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Trace.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.trace.v1"; +option java_outer_classname = "TraceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/trace/v1"; + +// TracesData represents the traces data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP traces data but do +// not implement the OTLP protocol. +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message TracesData { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceSpans resource_spans = 1; +} + +// A collection of ScopeSpans from a Resource. +message ResourceSpans { + reserved 1000; + + // The resource for the spans in this message. + // If this field is not set then no resource info is known. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeSpans that originate from a resource. + repeated ScopeSpans scope_spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the resource data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_spans" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationScope. +message ScopeSpans { + // The instrumentation scope information for the spans in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of Spans that originate from an instrumentation scope. + repeated Span spans = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the span data + // is recorded in. Notably, the last part of the URL path is the version number of the + // schema: http[s]://server[:port]/path/. To learn more about Schema URL see + // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url + // This schema_url applies to the data in the "scope" field and all spans and span + // events in the "spans" field. + string schema_url = 3; +} + +// A Span represents a single operation performed by a single component of the system. +// +// The next available field id is 17. +message Span { + // A unique identifier for a trace. All spans from the same trace share + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR + // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes trace_id = 1; + + // A unique identifier for a span within a trace, assigned when the span + // is created. The ID is an 8-byte array. An ID with all zeroes OR of length + // other than 8 bytes is considered invalid (empty string in OTLP/JSON + // is zero-length and thus is also invalid). + // + // This field is required. + bytes span_id = 2; + + // trace_state conveys information about request position in multiple distributed tracing graphs. + // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header + // See also https://github.com/w3c/distributed-tracing for more details about this field. + string trace_state = 3; + + // The `span_id` of this span's parent span. If this is a root span, then this + // field must be empty. The ID is an 8-byte array. + bytes parent_span_id = 4; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether a span's parent + // is remote. The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the span is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // When creating span messages, if the message is logically forwarded from another source + // with an equivalent flags fields (i.e., usually another OTLP span message), the field SHOULD + // be copied as-is. If creating from a source that does not have an equivalent flags field + // (such as a runtime representation of an OpenTelemetry span), the high 22 bits MUST + // be set to zero. + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // + // [Optional]. + fixed32 flags = 16; + + // A description of the span's operation. + // + // For example, the name can be a qualified method name or a file name + // and a line number where the operation is called. A best practice is to use + // the same display name at the same call point in an application. + // This makes it easier to correlate spans in different traces. + // + // This field is semantically required to be set to non-empty string. + // Empty value is equivalent to an unknown span name. + // + // This field is required. + string name = 5; + + // SpanKind is the type of span. Can be used to specify additional relationships between spans + // in addition to a parent/child relationship. + enum SpanKind { + // Unspecified. Do NOT use as default. + // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + SPAN_KIND_UNSPECIFIED = 0; + + // Indicates that the span represents an internal operation within an application, + // as opposed to an operation happening at the boundaries. Default value. + SPAN_KIND_INTERNAL = 1; + + // Indicates that the span covers server-side handling of an RPC or other + // remote network request. + SPAN_KIND_SERVER = 2; + + // Indicates that the span describes a request to some remote service. + SPAN_KIND_CLIENT = 3; + + // Indicates that the span describes a producer sending a message to a broker. + // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + // between producer and consumer spans. A PRODUCER span ends when the message was accepted + // by the broker while the logical processing of the message might span a much longer time. + SPAN_KIND_PRODUCER = 4; + + // Indicates that the span describes consumer receiving a message from a broker. + // Like the PRODUCER kind, there is often no direct critical path latency relationship + // between producer and consumer spans. + SPAN_KIND_CONSUMER = 5; + } + + // Distinguishes between spans generated in a particular context. For example, + // two spans with the same name may be distinguished using `CLIENT` (caller) + // and `SERVER` (callee) to identify queueing latency associated with the span. + SpanKind kind = 6; + + // The start time of the span. On the client side, this is the time + // kept by the local machine where the span execution starts. On the server side, this + // is the time when the server's application handler starts running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 start_time_unix_nano = 7; + + // The end time of the span. On the client side, this is the time + // kept by the local machine where the span execution ends. On the server side, this + // is the time when the server application handler stops running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 end_time_unix_nano = 8; + + // A collection of key/value pairs. Note, global attributes + // like server name can be set using the resource API. Examples of attributes: + // + // ```text + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "example.com/myattribute": true + // "example.com/score": 10.239 + // ``` + // + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 9; + + // The number of attributes that were discarded. Attributes + // can be discarded because their keys are too long or because there are too many + // attributes. If this value is 0, then no attributes were dropped. + uint32 dropped_attributes_count = 10; + + // Event is a time-stamped annotation of the span, consisting of user-supplied + // text description and key-value pairs. + message Event { + // The time the event occurred. + fixed64 time_unix_nano = 1; + + // The name of the event. + // This field is semantically required to be set to non-empty string. + string name = 2; + + // A collection of attribute key/value pairs on the event. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 3; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 4; + } + + // A collection of Event items. + repeated Event events = 11; + + // The number of dropped events. If the value is 0, then no + // events were dropped. + uint32 dropped_events_count = 12; + + // A pointer from the current span to another span in the same trace or in a + // different trace. For example, this can be used in batching operations, + // where a single batch handler processes multiple requests from different + // traces or when the handler receives a request from a different project. + message Link { + // A unique identifier of a trace that this linked span is part of. The ID is a + // 16-byte array. + bytes trace_id = 1; + + // A unique identifier for the linked span. The ID is an 8-byte array. + bytes span_id = 2; + + // The trace_state associated with the link. + string trace_state = 3; + + // A collection of attribute key/value pairs on the link. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 4; + + // The number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 5; + + // Flags, a bit field. + // + // Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace + // Context specification. To read the 8-bit W3C trace flag, use + // `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`. + // + // See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. + // + // Bits 8 and 9 represent the 3 states of whether the link is remote. + // The states are (unknown, is not remote, is remote). + // To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`. + // To read whether the link is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`. + // + // Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero. + // When creating new spans, bits 10-31 (most-significant 22-bits) MUST be zero. + // + // [Optional]. + fixed32 flags = 6; + } + + // A collection of Links, which are references from this span to a span + // in the same or different trace. + repeated Link links = 13; + + // The number of dropped links after the maximum size was + // enforced. If this value is 0, then no links were dropped. + uint32 dropped_links_count = 14; + + // An optional final status for this span. Semantically when Status isn't set, it means + // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + Status status = 15; +} + +// The Status type defines a logical error model that is suitable for different +// programming environments, including REST APIs and RPC APIs. +message Status { + reserved 1; + + // A developer-facing human readable error message. + string message = 2; + + // For the semantics of status codes see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status + enum StatusCode { + // The default status. + STATUS_CODE_UNSET = 0; + // The Span has been validated by an Application developer or Operator to + // have completed successfully. + STATUS_CODE_OK = 1; + // The Span contains an error. + STATUS_CODE_ERROR = 2; + }; + + // The status code. + StatusCode code = 3; +} + +// SpanFlags represents constants used to interpret the +// Span.flags field, which is protobuf 'fixed32' type and is to +// be used as bit-fields. Each non-zero value defined in this enum is +// a bit-mask. To extract the bit-field, for example, use an +// expression like: +// +// (span.flags & SPAN_FLAGS_TRACE_FLAGS_MASK) +// +// See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions. +// +// Note that Span flags were introduced in version 1.1 of the +// OpenTelemetry protocol. Older Span producers do not set this +// field, consequently consumers should not rely on the absence of a +// particular flag bit to indicate the presence of a particular feature. +enum SpanFlags { + // The zero value for the enum. Should not be used for comparisons. + // Instead use bitwise "and" with the appropriate mask as shown above. + SPAN_FLAGS_DO_NOT_USE = 0; + + // Bits 0-7 are used for trace flags. + SPAN_FLAGS_TRACE_FLAGS_MASK = 0x000000FF; + + // Bits 8 and 9 are used to indicate that the parent span or link span is remote. + // Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known. + // Bit 9 (`IS_REMOTE`) indicates whether the span or link is remote. + SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK = 0x00000100; + SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK = 0x00000200; + + // Bits 10-31 are reserved for future use. +} \ No newline at end of file diff --git a/libdd-trace-utils/Cargo.toml b/libdd-trace-utils/Cargo.toml index 1d9fa07fe7..efa2c55571 100644 --- a/libdd-trace-utils/Cargo.toml +++ b/libdd-trace-utils/Cargo.toml @@ -20,6 +20,7 @@ path = "benches/main.rs" [dependencies] anyhow = "1.0" base64 = "0.22" +hex = "0.4" hyper = { workspace = true, optional = true, default-features = false } "http" = "1" "http-body" = "1" diff --git a/libdd-trace-utils/benches/main.rs b/libdd-trace-utils/benches/main.rs index 0d86f25ee5..6ea2ec7dbb 100644 --- a/libdd-trace-utils/benches/main.rs +++ b/libdd-trace-utils/benches/main.rs @@ -10,10 +10,12 @@ use libdd_common::bench_utils::ReportingAllocator; pub static GLOBAL: ReportingAllocator = ReportingAllocator::new(System); mod deserialization; +mod otlp_encoding; mod serialization; criterion_main!( serialization::serialize_benches, deserialization::deserialize_benches, - deserialization::deserialize_alloc_benches + deserialization::deserialize_alloc_benches, + otlp_encoding::otlp_benches ); diff --git a/libdd-trace-utils/benches/otlp_encoding.rs b/libdd-trace-utils/benches/otlp_encoding.rs new file mode 100644 index 0000000000..5da017ad94 --- /dev/null +++ b/libdd-trace-utils/benches/otlp_encoding.rs @@ -0,0 +1,133 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Benchmarks for the OTLP encoder hot paths: mapping native spans to the prost OTLP IR, and +//! serializing that IR to the HTTP/protobuf and HTTP/JSON wire formats. Inputs are decoded from +//! msgpack into borrowed `SpanSlice`s, matching the production exporter path. + +use criterion::{black_box, criterion_group, BatchSize, Criterion}; +use libdd_trace_utils::msgpack_decoder; +use libdd_trace_utils::otlp_encoder::{ + encode_otlp_json, encode_otlp_protobuf, map_traces_to_otlp, OtlpResourceInfo, +}; +use serde_json::{json, Value}; + +/// A realistic OTLP-bound span: a handful of string `meta` tags and a couple of numeric +/// `metrics`, so the per-span attribute work (the dominant cost) is exercised. The chunk root +/// carries `_dd.p.tid` (the 128-bit trace-id high bits, resolved once per chunk). +fn generate_spans(num_spans: usize, trace_id: u64) -> Vec { + let mut spans = Vec::with_capacity(num_spans); + let root_span_id = 100_000_000_000 + (trace_id % 1_000_000); + for i in 0..num_spans { + let span_id = root_span_id + i as u64; + let is_root = i == 0; + let parent_id = if is_root { 0 } else { root_span_id }; + let mut meta = json!({ + "http.method": "GET", + "http.url": "https://example.com/api/v1/users/12345", + "http.status_code": "200", + "env": "production", + "version": "1.2.3", + "component": "net/http", + }); + if is_root { + meta["_dd.p.tid"] = json!("5b8efff798038103"); + } + spans.push(json!({ + "service": "bench-service", + "name": "http.request", + "resource": "GET /api/v1/users", + "trace_id": trace_id, + "span_id": span_id, + "parent_id": parent_id, + "start": 1_544_712_660_000_000_000_i64 + i as i64, + "duration": 1_000_000, + "error": 0, + "meta": meta, + "metrics": { "_sampling_priority_v1": 1, "_dd.top_level": 1 }, + "type": "web", + })); + } + spans +} + +fn generate_trace_chunks(num_chunks: usize, num_spans: usize) -> Vec> { + (0..num_chunks) + .map(|i| generate_spans(num_spans, 100_000_000_000 + i as u64)) + .collect() +} + +fn resource_info() -> OtlpResourceInfo { + // `OtlpResourceInfo` is `#[non_exhaustive]`, so build via Default + field assignment. + let mut info = OtlpResourceInfo::default(); + info.service = "bench-service".to_string(); + info.env = "production".to_string(); + info.app_version = "1.2.3".to_string(); + info.language = "rust".to_string(); + info.tracer_version = "9.9.9".to_string(); + info.runtime_id = "11111111-2222-3333-4444-555555555555".to_string(); + info +} + +pub fn otlp_encoding_benches(c: &mut Criterion) { + let info = resource_info(); + + // (chunks, spans_per_chunk): a single large trace (clean per-span signal) and many small + // traces (the typical agent payload shape). Both ~1000 spans total. + for (num_chunks, num_spans) in [(1usize, 1000usize), (100usize, 10usize)] { + let id = format!("{num_chunks}x{num_spans}"); + let bytes = rmp_serde::to_vec(&generate_trace_chunks(num_chunks, num_spans)) + .expect("serialize fixture"); + // `spans` borrows `bytes`; both live for the rest of this iteration. + let (spans, _) = + msgpack_decoder::v04::from_slice(bytes.as_slice()).expect("decode fixture"); + + // 1) native spans -> prost OTLP IR (the mapper). + c.bench_function(&format!("otlp/map_to_prost/{id}"), |b| { + b.iter_batched( + || spans.clone(), + |s| black_box(map_traces_to_otlp(black_box(s), &info)), + BatchSize::SmallInput, + ) + }); + + // Pre-built IR for the encode-only benches (owned prost; no borrow of `bytes`). + let req = map_traces_to_otlp(spans.clone(), &info); + + // 2) prost IR -> HTTP/protobuf bytes. + c.bench_function(&format!("otlp/encode_protobuf/{id}"), |b| { + b.iter(|| black_box(encode_otlp_protobuf(black_box(&req)))) + }); + + // 3) prost IR -> OTLP/JSON bytes. + c.bench_function(&format!("otlp/encode_json/{id}"), |b| { + b.iter(|| black_box(encode_otlp_json(black_box(&req)).expect("json"))) + }); + + // 4) end-to-end native spans -> protobuf wire (the real protobuf export path). + c.bench_function(&format!("otlp/e2e_protobuf/{id}"), |b| { + b.iter_batched( + || spans.clone(), + |s| { + let req = map_traces_to_otlp(s, &info); + black_box(encode_otlp_protobuf(&req)) + }, + BatchSize::SmallInput, + ) + }); + + // 5) end-to-end native spans -> JSON wire (the real JSON export path). + c.bench_function(&format!("otlp/e2e_json/{id}"), |b| { + b.iter_batched( + || spans.clone(), + |s| { + let req = map_traces_to_otlp(s, &info); + black_box(encode_otlp_json(&req).expect("json")) + }, + BatchSize::SmallInput, + ) + }); + } +} + +criterion_group!(otlp_benches, otlp_encoding_benches); diff --git a/libdd-trace-utils/src/otlp_encoder/json_serializer.rs b/libdd-trace-utils/src/otlp_encoder/json_serializer.rs new file mode 100644 index 0000000000..70ad8a8708 --- /dev/null +++ b/libdd-trace-utils/src/otlp_encoder/json_serializer.rs @@ -0,0 +1,504 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Serializes the generated prost OTLP types to OTLP-spec HTTP/JSON. Trace/span ids are +//! lowercase hex, 64-bit integers (incl. timestamps) are decimal strings, `bytesValue` is +//! base64, enums are integers, field names are lowerCamelCase, and proto3 defaults are omitted. +//! This is the only place the OTLP/JSON wire shape is defined: the prost types are the single +//! source of truth, serialized directly to the OTLP/JSON wire format here. + +use base64::{engine::general_purpose::STANDARD, Engine as _}; +use serde::ser::{Serialize, SerializeMap, Serializer}; + +use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + any_value::Value as ProtoValue, AnyValue, ArrayValue, InstrumentationScope, KeyValue, + KeyValueList, +}; +use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; +use libdd_trace_protobuf::opentelemetry::proto::trace::v1::{ + span::{Event, Link}, + ResourceSpans, ScopeSpans, Span, Status, +}; + +/// Top-level wrapper: `serde_json::to_vec(&OtlpJson(req))` yields the OTLP/JSON body. +pub(crate) struct OtlpJson<'a>(pub &'a ExportTraceServiceRequest); + +pub(crate) fn to_otlp_json_vec(req: &ExportTraceServiceRequest) -> serde_json::Result> { + serde_json::to_vec(&OtlpJson(req)) +} + +/// Serialize a `&[T]` by wrapping each element with `W`. +fn seq<'a, T, W, S>(s: S, items: &'a [T], wrap: fn(&'a T) -> W) -> Result +where + W: Serialize, + S: Serializer, +{ + s.collect_seq(items.iter().map(wrap)) +} + +/// Serializes an OTLP id (`trace_id`/`span_id`/`parent_span_id`) as lowercase hex without +/// allocating a `String`. OTLP ids are 8 or 16 bytes (16/32 hex chars); a 64-byte stack buffer +/// covers them, with an allocating `hex::encode` fallback for any unexpectedly-long input. +struct HexId<'a>(&'a [u8]); +impl Serialize for HexId<'_> { + fn serialize(&self, s: S) -> Result { + let mut buf = [0u8; 64]; + let n = self.0.len() * 2; + if n <= buf.len() && hex::encode_to_slice(self.0, &mut buf[..n]).is_ok() { + if let Ok(hex) = std::str::from_utf8(&buf[..n]) { + return s.serialize_str(hex); + } + } + s.serialize_str(&hex::encode(self.0)) + } +} + +/// Serializes a 64-bit integer as a decimal JSON *string* — OTLP encodes `int64`/`uint64` (incl. +/// nanosecond timestamps) as strings to avoid IEEE-754 precision loss — without allocating. +struct NumStr(T); +impl Serialize for NumStr { + fn serialize(&self, s: S) -> Result { + use core::fmt::Write as _; + // u64::MAX and i64::MIN are at most 20 chars; 24 bytes is ample. + let mut buf = DecimalBuf::default(); + if write!(buf, "{}", self.0).is_ok() { + return s.serialize_str(buf.as_str()); + } + s.serialize_str(&self.0.to_string()) + } +} + +#[derive(Default)] +struct DecimalBuf { + buf: [u8; 24], + len: usize, +} +impl DecimalBuf { + fn as_str(&self) -> &str { + core::str::from_utf8(&self.buf[..self.len]).unwrap_or("") + } +} +impl core::fmt::Write for DecimalBuf { + fn write_str(&mut self, s: &str) -> core::fmt::Result { + let end = self.len + s.len(); + if end > self.buf.len() { + return Err(core::fmt::Error); + } + self.buf[self.len..end].copy_from_slice(s.as_bytes()); + self.len = end; + Ok(()) + } +} + +impl Serialize for OtlpJson<'_> { + fn serialize(&self, s: S) -> Result { + let mut m = s.serialize_map(Some(1))?; + m.serialize_entry("resourceSpans", &ResourceSpansSeq(&self.0.resource_spans))?; + m.end() + } +} + +struct ResourceSpansSeq<'a>(&'a [ResourceSpans]); +impl Serialize for ResourceSpansSeq<'_> { + fn serialize(&self, s: S) -> Result { + seq(s, self.0, ResourceSpansJson) + } +} + +struct ResourceSpansJson<'a>(&'a ResourceSpans); +impl Serialize for ResourceSpansJson<'_> { + fn serialize(&self, s: S) -> Result { + let rs = self.0; + let mut m = s.serialize_map(None)?; + if let Some(r) = &rs.resource { + m.serialize_entry("resource", &ResourceJson(r))?; + } + if !rs.scope_spans.is_empty() { + m.serialize_entry("scopeSpans", &ScopeSpansSeq(&rs.scope_spans))?; + } + if !rs.schema_url.is_empty() { + m.serialize_entry("schemaUrl", &rs.schema_url)?; + } + m.end() + } +} + +struct ResourceJson<'a>(&'a Resource); +impl Serialize for ResourceJson<'_> { + fn serialize(&self, s: S) -> Result { + let r = self.0; + let mut m = s.serialize_map(None)?; + if !r.attributes.is_empty() { + m.serialize_entry("attributes", &KeyValueSeq(&r.attributes))?; + } + if r.dropped_attributes_count != 0 { + m.serialize_entry("droppedAttributesCount", &r.dropped_attributes_count)?; + } + // `entity_refs` is a profiling-signal field, not part of the trace JSON shape — omitted. + m.end() + } +} + +struct ScopeSpansSeq<'a>(&'a [ScopeSpans]); +impl Serialize for ScopeSpansSeq<'_> { + fn serialize(&self, s: S) -> Result { + seq(s, self.0, ScopeSpansJson) + } +} + +struct ScopeSpansJson<'a>(&'a ScopeSpans); +impl Serialize for ScopeSpansJson<'_> { + fn serialize(&self, s: S) -> Result { + let ss = self.0; + let mut m = s.serialize_map(None)?; + if let Some(scope) = &ss.scope { + m.serialize_entry("scope", &ScopeJson(scope))?; + } + if !ss.spans.is_empty() { + m.serialize_entry("spans", &SpanSeq(&ss.spans))?; + } + if !ss.schema_url.is_empty() { + m.serialize_entry("schemaUrl", &ss.schema_url)?; + } + m.end() + } +} + +struct ScopeJson<'a>(&'a InstrumentationScope); +impl Serialize for ScopeJson<'_> { + fn serialize(&self, s: S) -> Result { + let sc = self.0; + let mut m = s.serialize_map(None)?; + if !sc.name.is_empty() { + m.serialize_entry("name", &sc.name)?; + } + if !sc.version.is_empty() { + m.serialize_entry("version", &sc.version)?; + } + if !sc.attributes.is_empty() { + m.serialize_entry("attributes", &KeyValueSeq(&sc.attributes))?; + } + if sc.dropped_attributes_count != 0 { + m.serialize_entry("droppedAttributesCount", &sc.dropped_attributes_count)?; + } + m.end() + } +} + +struct SpanSeq<'a>(&'a [Span]); +impl Serialize for SpanSeq<'_> { + fn serialize(&self, s: S) -> Result { + seq(s, self.0, SpanJson) + } +} + +struct SpanJson<'a>(&'a Span); +impl Serialize for SpanJson<'_> { + fn serialize(&self, s: S) -> Result { + let sp = self.0; + let mut m = s.serialize_map(None)?; + m.serialize_entry("traceId", &HexId(&sp.trace_id))?; + m.serialize_entry("spanId", &HexId(&sp.span_id))?; + if !sp.parent_span_id.is_empty() { + m.serialize_entry("parentSpanId", &HexId(&sp.parent_span_id))?; + } + if !sp.trace_state.is_empty() { + m.serialize_entry("traceState", &sp.trace_state)?; + } + m.serialize_entry("name", &sp.name)?; + m.serialize_entry("kind", &sp.kind)?; + m.serialize_entry("startTimeUnixNano", &NumStr(sp.start_time_unix_nano))?; + m.serialize_entry("endTimeUnixNano", &NumStr(sp.end_time_unix_nano))?; + if !sp.attributes.is_empty() { + m.serialize_entry("attributes", &KeyValueSeq(&sp.attributes))?; + } + if sp.dropped_attributes_count != 0 { + m.serialize_entry("droppedAttributesCount", &sp.dropped_attributes_count)?; + } + if !sp.events.is_empty() { + m.serialize_entry("events", &EventSeq(&sp.events))?; + } + if sp.dropped_events_count != 0 { + m.serialize_entry("droppedEventsCount", &sp.dropped_events_count)?; + } + if !sp.links.is_empty() { + m.serialize_entry("links", &LinkSeq(&sp.links))?; + } + if sp.dropped_links_count != 0 { + m.serialize_entry("droppedLinksCount", &sp.dropped_links_count)?; + } + if let Some(st) = &sp.status { + m.serialize_entry("status", &StatusJson(st))?; + } + if sp.flags != 0 { + m.serialize_entry("flags", &sp.flags)?; + } + m.end() + } +} + +struct StatusJson<'a>(&'a Status); +impl Serialize for StatusJson<'_> { + fn serialize(&self, s: S) -> Result { + let st = self.0; + let mut m = s.serialize_map(None)?; + if !st.message.is_empty() { + m.serialize_entry("message", &st.message)?; + } + m.serialize_entry("code", &st.code)?; + m.end() + } +} + +struct EventSeq<'a>(&'a [Event]); +impl Serialize for EventSeq<'_> { + fn serialize(&self, s: S) -> Result { + seq(s, self.0, EventJson) + } +} + +struct EventJson<'a>(&'a Event); +impl Serialize for EventJson<'_> { + fn serialize(&self, s: S) -> Result { + let e = self.0; + let mut m = s.serialize_map(None)?; + m.serialize_entry("timeUnixNano", &NumStr(e.time_unix_nano))?; + m.serialize_entry("name", &e.name)?; + if !e.attributes.is_empty() { + m.serialize_entry("attributes", &KeyValueSeq(&e.attributes))?; + } + if e.dropped_attributes_count != 0 { + m.serialize_entry("droppedAttributesCount", &e.dropped_attributes_count)?; + } + m.end() + } +} + +struct LinkSeq<'a>(&'a [Link]); +impl Serialize for LinkSeq<'_> { + fn serialize(&self, s: S) -> Result { + seq(s, self.0, LinkJson) + } +} + +struct LinkJson<'a>(&'a Link); +impl Serialize for LinkJson<'_> { + fn serialize(&self, s: S) -> Result { + let l = self.0; + let mut m = s.serialize_map(None)?; + m.serialize_entry("traceId", &HexId(&l.trace_id))?; + m.serialize_entry("spanId", &HexId(&l.span_id))?; + if !l.trace_state.is_empty() { + m.serialize_entry("traceState", &l.trace_state)?; + } + if !l.attributes.is_empty() { + m.serialize_entry("attributes", &KeyValueSeq(&l.attributes))?; + } + if l.dropped_attributes_count != 0 { + m.serialize_entry("droppedAttributesCount", &l.dropped_attributes_count)?; + } + if l.flags != 0 { + m.serialize_entry("flags", &l.flags)?; + } + m.end() + } +} + +struct KeyValueSeq<'a>(&'a [KeyValue]); +impl Serialize for KeyValueSeq<'_> { + fn serialize(&self, s: S) -> Result { + seq(s, self.0, KeyValueJson) + } +} + +struct KeyValueJson<'a>(&'a KeyValue); +impl Serialize for KeyValueJson<'_> { + fn serialize(&self, s: S) -> Result { + let kv = self.0; + let mut m = s.serialize_map(None)?; + m.serialize_entry("key", &kv.key)?; + // `key_ref` is a profiling-signal field, not part of the trace JSON shape — omitted. + if let Some(v) = &kv.value { + m.serialize_entry("value", &AnyValueJson(v))?; + } + m.end() + } +} + +struct AnyValueJson<'a>(&'a AnyValue); +impl Serialize for AnyValueJson<'_> { + fn serialize(&self, s: S) -> Result { + let mut m = s.serialize_map(None)?; + match &self.0.value { + Some(ProtoValue::StringValue(v)) => m.serialize_entry("stringValue", v)?, + Some(ProtoValue::BoolValue(v)) => m.serialize_entry("boolValue", v)?, + // int64 must be a string to avoid precision loss in JSON. + Some(ProtoValue::IntValue(v)) => m.serialize_entry("intValue", &NumStr(*v))?, + Some(ProtoValue::DoubleValue(v)) => m.serialize_entry("doubleValue", v)?, + Some(ProtoValue::BytesValue(v)) => { + m.serialize_entry("bytesValue", &STANDARD.encode(v))? + } + Some(ProtoValue::ArrayValue(a)) => { + m.serialize_entry("arrayValue", &ArrayValueJson(a))? + } + Some(ProtoValue::KvlistValue(kv)) => { + m.serialize_entry("kvlistValue", &KvListJson(kv))? + } + Some(ProtoValue::StringValueRef(_)) | None => {} + } + m.end() + } +} + +struct ArrayValueJson<'a>(&'a ArrayValue); +impl Serialize for ArrayValueJson<'_> { + fn serialize(&self, s: S) -> Result { + let mut m = s.serialize_map(Some(1))?; + m.serialize_entry("values", &AnyValueSeq(&self.0.values))?; + m.end() + } +} + +struct KvListJson<'a>(&'a KeyValueList); +impl Serialize for KvListJson<'_> { + fn serialize(&self, s: S) -> Result { + let mut m = s.serialize_map(Some(1))?; + m.serialize_entry("values", &KeyValueSeq(&self.0.values))?; + m.end() + } +} + +struct AnyValueSeq<'a>(&'a [AnyValue]); +impl Serialize for AnyValueSeq<'_> { + fn serialize(&self, s: S) -> Result { + seq(s, self.0, AnyValueJson) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoReq; + use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + any_value::Value as PV, AnyValue, KeyValue, + }; + use libdd_trace_protobuf::opentelemetry::proto::trace::v1::{ + span::Link, ResourceSpans, ScopeSpans, Span, Status, + }; + + fn span_json(s: Span) -> serde_json::Value { + let req = ProtoReq { + resource_spans: vec![ResourceSpans { + resource: None, + scope_spans: vec![ScopeSpans { + scope: None, + spans: vec![s], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let bytes = to_otlp_json_vec(&req).unwrap(); + serde_json::from_slice::(&bytes).unwrap()["resourceSpans"][0] + ["scopeSpans"][0]["spans"][0] + .clone() + } + + fn base_span() -> Span { + Span { + trace_id: 0x5b8efff798038103_d269b633813fc60c_u128 + .to_be_bytes() + .to_vec(), + span_id: 0xEEE19B7EC3C1B174u64.to_be_bytes().to_vec(), + trace_state: String::new(), + parent_span_id: Vec::new(), + flags: 0, + name: "res".to_string(), + kind: 2, + start_time_unix_nano: 1544712660000000000, + end_time_unix_nano: 1544712661000000000, + attributes: Vec::new(), + dropped_attributes_count: 0, + events: Vec::new(), + dropped_events_count: 0, + links: Vec::new(), + dropped_links_count: 0, + status: None, + } + } + + #[test] + fn ids_are_hex_timestamps_are_strings_kind_is_int() { + let j = span_json(base_span()); + assert_eq!(j["traceId"], "5b8efff798038103d269b633813fc60c"); + assert_eq!(j["spanId"], "eee19b7ec3c1b174"); + assert_eq!(j["startTimeUnixNano"], "1544712660000000000"); + assert_eq!(j["endTimeUnixNano"], "1544712661000000000"); + assert_eq!(j["kind"], 2); + // proto3 defaults omitted + assert!(j.get("parentSpanId").is_none()); + assert!(j.get("traceState").is_none()); + assert!(j.get("flags").is_none()); + assert!(j.get("attributes").is_none()); + assert!(j.get("status").is_none()); + } + + #[test] + fn int_value_is_string_bytes_value_is_base64() { + let mut s = base_span(); + s.attributes = vec![ + KeyValue { + key: "count".into(), + value: Some(AnyValue { + value: Some(PV::IntValue(42)), + }), + key_ref: 0, + }, + KeyValue { + key: "blob".into(), + value: Some(AnyValue { + value: Some(PV::BytesValue(vec![1, 2, 3])), + }), + key_ref: 0, + }, + KeyValue { + key: "name".into(), + value: Some(AnyValue { + value: Some(PV::StringValue("v".into())), + }), + key_ref: 0, + }, + ]; + let j = span_json(s); + let attrs = j["attributes"].as_array().unwrap(); + let by = |k: &str| attrs.iter().find(|a| a["key"] == k).unwrap()["value"].clone(); + assert_eq!(by("count")["intValue"], "42"); // int64 as STRING + assert_eq!(by("blob")["bytesValue"], "AQID"); // base64 + assert_eq!(by("name")["stringValue"], "v"); + } + + #[test] + fn status_and_parent_and_link_emitted() { + let mut s = base_span(); + s.parent_span_id = 0xEEE19B7EC3C1B173u64.to_be_bytes().to_vec(); + s.status = Some(Status { + message: "boom".into(), + code: 2, + }); + s.links = vec![Link { + trace_id: 1u128.to_be_bytes().to_vec(), + span_id: 2u64.to_be_bytes().to_vec(), + trace_state: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + flags: 0, + }]; + let j = span_json(s); + assert_eq!(j["parentSpanId"], "eee19b7ec3c1b173"); + assert_eq!(j["status"]["code"], 2); + assert_eq!(j["status"]["message"], "boom"); + assert_eq!(j["links"][0]["traceId"], "00000000000000000000000000000001"); + assert_eq!(j["links"][0]["spanId"], "0000000000000002"); + } +} diff --git a/libdd-trace-utils/src/otlp_encoder/json_types.rs b/libdd-trace-utils/src/otlp_encoder/json_types.rs deleted file mode 100644 index 34c68c944b..0000000000 --- a/libdd-trace-utils/src/otlp_encoder/json_types.rs +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -//! Minimal serde types for OTLP HTTP/JSON export (ExportTraceServiceRequest). -//! -//! These types mirror the OTLP protobuf schema for the HTTP/JSON wire format. Field names use -//! lowerCamelCase per the Protocol Buffers JSON Mapping spec; trace/span IDs are hex-encoded -//! strings; enum values (SpanKind, StatusCode) are integers. -//! -//! The canonical definitions live in the opentelemetry-proto repository: -//! -//! -//! -//! Hand-rolled serde structs are intentional here: for HTTP/JSON export, duplicating the type -//! definitions is simpler than pulling in `prost`-generated types from the `opentelemetry-proto` -//! crate. When HTTP/protobuf export is added, `opentelemetry-proto` should be introduced as a -//! dependency for that purpose: -//! - -use base64::{engine::general_purpose::STANDARD, Engine as _}; -use serde::{Serialize, Serializer}; - -/// Top-level OTLP trace export request (ExportTraceServiceRequest). -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct ExportTraceServiceRequest { - pub resource_spans: Vec, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct ResourceSpans { - pub resource: Option, - pub scope_spans: Vec, -} - -#[derive(Debug, Default, Serialize)] -pub struct Resource { - pub attributes: Vec, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct ScopeSpans { - pub scope: Option, - pub spans: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - pub schema_url: Option, -} - -#[derive(Debug, Default, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct InstrumentationScope { - #[serde(skip_serializing_if = "Option::is_none")] - pub name: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub version: Option, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct OtlpSpan { - pub trace_id: String, - pub span_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub parent_span_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub trace_state: Option, - pub name: String, - pub kind: i32, - pub start_time_unix_nano: String, - pub end_time_unix_nano: String, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub attributes: Vec, - pub status: Status, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub links: Vec, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub events: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - pub dropped_attributes_count: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub dropped_events_count: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub flags: Option, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct OtlpSpanLink { - pub trace_id: String, - pub span_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub trace_state: Option, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub attributes: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - pub dropped_attributes_count: Option, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct OtlpSpanEvent { - pub time_unix_nano: String, - pub name: String, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub attributes: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - pub dropped_attributes_count: Option, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct KeyValue { - pub key: String, - pub value: AnyValue, -} - -/// A typed value in an OTLP attribute. Each variant serializes as a single-key JSON object -/// matching the OTLP HTTP/JSON wire format (e.g. `{"stringValue":"hello"}`). -/// -/// Per the protobuf JSON mapping spec, `int64` values must be encoded as strings to avoid -/// precision loss (JSON numbers are IEEE 754 doubles, exact only up to 2^53). -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub enum AnyValue { - StringValue(String), - BoolValue(bool), - #[serde(serialize_with = "serialize_int_value_as_string")] - IntValue(i64), - DoubleValue(f64), - #[serde(serialize_with = "serialize_bytes_as_base64")] - BytesValue(Vec), - ArrayValue(ArrayValue), -} - -fn serialize_int_value_as_string(v: &i64, s: S) -> Result { - s.serialize_str(&v.to_string()) -} - -fn serialize_bytes_as_base64(v: &[u8], s: S) -> Result { - s.serialize_str(&STANDARD.encode(v)) -} - -/// OTLP array value — wraps a list of [`AnyValue`] items. -#[derive(Debug, Serialize)] -pub struct ArrayValue { - pub values: Vec, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct Status { - #[serde(skip_serializing_if = "Option::is_none")] - pub message: Option, - pub code: i32, -} - -/// OTLP SpanKind enum values. -pub mod span_kind { - pub const UNSPECIFIED: i32 = 0; - pub const INTERNAL: i32 = 1; - pub const SERVER: i32 = 2; - pub const CLIENT: i32 = 3; - pub const PRODUCER: i32 = 4; - pub const CONSUMER: i32 = 5; -} - -/// OTLP StatusCode enum values. -pub mod status_code { - pub const UNSET: i32 = 0; - pub const OK: i32 = 1; - pub const ERROR: i32 = 2; -} diff --git a/libdd-trace-utils/src/otlp_encoder/mapper.rs b/libdd-trace-utils/src/otlp_encoder/mapper.rs index 0575c20ccf..639658f02c 100644 --- a/libdd-trace-utils/src/otlp_encoder/mapper.rs +++ b/libdd-trace-utils/src/otlp_encoder/mapper.rs @@ -1,21 +1,261 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! Maps Datadog trace/spans to OTLP ExportTraceServiceRequest. +//! Maps Datadog trace/spans directly to the generated prost OTLP types (the IR). +//! +//! The prost `ExportTraceServiceRequest` is the single OTLP representation: from it the +//! HTTP/protobuf wire format is produced by prost encoding and the HTTP/JSON wire format by the +//! serde serializer in `json_serializer`. Attributes are built straight into prost +//! `KeyValue`/`AnyValue` in one pass — there is no intermediate value type to keep the two +//! encoders in sync because there is only one IR. -use super::json_types::{ - self, AnyValue, ExportTraceServiceRequest, InstrumentationScope, KeyValue, OtlpSpan, - OtlpSpanEvent, OtlpSpanLink, Resource, ResourceSpans, ScopeSpans, Status, -}; use super::OtlpResourceInfo; use crate::span::v04::{Span, SpanEvent, SpanLink}; use crate::span::TraceData; use std::borrow::Borrow; +use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoReq; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + any_value::Value as ProtoValue, AnyValue as ProtoAnyValue, ArrayValue as ProtoArrayValue, + InstrumentationScope as ProtoScope, KeyValue as ProtoKeyValue, +}; +use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource as ProtoResource; +use libdd_trace_protobuf::opentelemetry::proto::trace::v1::{ + span::{Event as ProtoEvent, Link as ProtoLink}, + ResourceSpans as ProtoResourceSpans, ScopeSpans as ProtoScopeSpans, Span as ProtoSpan, + Status as ProtoStatus, +}; + /// Maximum number of attributes per span; excess are dropped and counted. -const MAX_ATTRIBUTES_PER_SPAN: usize = 128; +pub(crate) const MAX_ATTRIBUTES_PER_SPAN: usize = 128; + +/// OTLP SpanKind enum values. +mod span_kind { + pub const UNSPECIFIED: i32 = 0; + pub const INTERNAL: i32 = 1; + pub const SERVER: i32 = 2; + pub const CLIENT: i32 = 3; + pub const PRODUCER: i32 = 4; + pub const CONSUMER: i32 = 5; +} + +/// OTLP StatusCode enum values. +mod status_code { + pub const UNSET: i32 = 0; + pub const ERROR: i32 = 2; +} + +// ─── Scalar mapping helpers ────────────────────────────────────────────────── + +/// OTLP status (code, optional message) for a span. ERROR with `error.msg` when `span.error != 0`, +/// otherwise UNSET. +fn span_status(span: &Span) -> (i32, Option) { + if span.error != 0 { + ( + status_code::ERROR, + span.meta.get("error.msg").map(|v| v.borrow().to_string()), + ) + } else { + (status_code::UNSET, None) + } +} + +/// OTLP SpanKind for a span: prefer the explicit `span.kind` meta tag, else the DD span type. +fn span_kind(span: &Span) -> i32 { + span.meta + .get("span.kind") + .map(|v| tag_to_otlp_kind(v.borrow())) + .unwrap_or_else(|| dd_type_to_otlp_kind(span.r#type.borrow())) +} + +/// Resolve the high 64 bits of the chunk's 128-bit trace id (native field or `_dd.p.tid`). +fn chunk_trace_id_high(chunk: &[Span]) -> u64 { + chunk + .iter() + .find_map(|s| { + let high = (s.trace_id >> 64) as u64; + if high != 0 { + return Some(high); + } + s.meta + .get("_dd.p.tid") + .and_then(|v| u64::from_str_radix(v.borrow(), 16).ok()) + }) + .unwrap_or(0) +} + +/// Maps the explicit "span.kind" meta tag (set by OTEL-instrumented tracers) to an OTLP SpanKind. +fn tag_to_otlp_kind(t: &str) -> i32 { + // Case-insensitive match without allocating: these are ASCII keywords, so + // `eq_ignore_ascii_case` avoids the per-span `to_lowercase()` String on the encode hot + // path. + if t.eq_ignore_ascii_case("server") { + span_kind::SERVER + } else if t.eq_ignore_ascii_case("client") { + span_kind::CLIENT + } else if t.eq_ignore_ascii_case("producer") { + span_kind::PRODUCER + } else if t.eq_ignore_ascii_case("consumer") { + span_kind::CONSUMER + } else if t.eq_ignore_ascii_case("internal") { + span_kind::INTERNAL + } else { + span_kind::UNSPECIFIED + } +} + +/// Maps the Datadog span type field (set by DD-instrumented tracers) to an OTLP SpanKind. +fn dd_type_to_otlp_kind(t: &str) -> i32 { + // Case-insensitive match without allocating (see `tag_to_otlp_kind`). + if t.eq_ignore_ascii_case("server") + || t.eq_ignore_ascii_case("web") + || t.eq_ignore_ascii_case("http") + { + span_kind::SERVER + } else if t.eq_ignore_ascii_case("client") { + span_kind::CLIENT + } else if t.eq_ignore_ascii_case("producer") { + span_kind::PRODUCER + } else if t.eq_ignore_ascii_case("consumer") { + span_kind::CONSUMER + } else { + span_kind::INTERNAL + } +} + +// ─── Attribute builders (straight into prost) ───────────────────────────────── + +/// Wrap a prost attribute value as a `KeyValue`. `key_ref` is a profiling-signal field, set to +/// its zero default explicitly (no `..Default::default()`). +fn proto_kv(key: String, value: ProtoValue) -> ProtoKeyValue { + ProtoKeyValue { + key, + value: Some(ProtoAnyValue { value: Some(value) }), + key_ref: 0, + } +} + +/// Collect a span's OTLP attributes directly as prost `KeyValue`s plus the dropped count. +/// Per-span service.name (only when it differs from the resource service), operation.name, +/// span.type, resource.name, then meta (string), metrics (int when integral and in i64 range +/// else double), meta_struct (bytes), capped at `MAX_ATTRIBUTES_PER_SPAN`. +fn collect_span_attributes( + span: &Span, + resource_service: &str, +) -> (Vec, usize) { + // Pre-size to avoid reallocations as attributes accumulate. Upper bound is the 4 synthetic + // attrs plus every meta/metrics/meta_struct entry, clamped to the per-span cap. + let capacity = (4 + span.meta.len() + span.metrics.len() + span.meta_struct.len()) + .min(MAX_ATTRIBUTES_PER_SPAN); + let mut attrs: Vec = Vec::with_capacity(capacity); + let span_service = span.service.borrow(); + let has_per_span_service = !span_service.is_empty() && span_service != resource_service; + if has_per_span_service { + attrs.push(proto_kv( + "service.name".to_string(), + ProtoValue::StringValue(span_service.to_string()), + )); + } + let operation_name = span.name.borrow(); + let has_operation_name = !operation_name.is_empty(); + if has_operation_name { + attrs.push(proto_kv( + "operation.name".to_string(), + ProtoValue::StringValue(operation_name.to_string()), + )); + } + let span_type = span.r#type.borrow(); + let has_span_type = !span_type.is_empty(); + if has_span_type { + attrs.push(proto_kv( + "span.type".to_string(), + ProtoValue::StringValue(span_type.to_string()), + )); + } + let resource_name = span.resource.borrow(); + let has_resource_name = !resource_name.is_empty(); + if has_resource_name { + attrs.push(proto_kv( + "resource.name".to_string(), + ProtoValue::StringValue(resource_name.to_string()), + )); + } + for (k, v) in span.meta.iter() { + if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { + break; + } + attrs.push(proto_kv( + k.borrow().to_string(), + ProtoValue::StringValue(v.borrow().to_string()), + )); + } + for (k, v) in span.metrics.iter() { + if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { + break; + } + let value = if v.fract() == 0.0 && (*v >= i64::MIN as f64 && *v <= i64::MAX as f64) { + ProtoValue::IntValue(*v as i64) + } else { + ProtoValue::DoubleValue(*v) + }; + attrs.push(proto_kv(k.borrow().to_string(), value)); + } + for (k, v) in span.meta_struct.iter() { + if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { + break; + } + attrs.push(proto_kv( + k.borrow().to_string(), + ProtoValue::BytesValue(v.borrow().to_vec()), + )); + } + let total = (has_per_span_service as usize) + + (has_operation_name as usize) + + (has_span_type as usize) + + (has_resource_name as usize) + + span.meta.len() + + span.metrics.len() + + span.meta_struct.len(); + let dropped = total.saturating_sub(attrs.len()); + (attrs, dropped) +} -/// Maps Datadog trace chunks and resource info to an OTLP ExportTraceServiceRequest. +/// A single event/link attribute value → prost (events carry typed single/array values). +fn event_attr_value(av: &crate::span::v04::AttributeArrayValue) -> ProtoValue { + use crate::span::v04::AttributeArrayValue; + match av { + AttributeArrayValue::String(s) => ProtoValue::StringValue(s.borrow().to_string()), + AttributeArrayValue::Boolean(b) => ProtoValue::BoolValue(*b), + AttributeArrayValue::Integer(i) => ProtoValue::IntValue(*i), + AttributeArrayValue::Double(d) => ProtoValue::DoubleValue(*d), + } +} + +fn collect_event_attributes(ev: &SpanEvent) -> Vec { + use crate::span::v04::AttributeAnyValue; + ev.attributes + .iter() + .map(|(k, v)| { + let value = match v { + AttributeAnyValue::SingleValue(av) => event_attr_value(av), + AttributeAnyValue::Array(items) => ProtoValue::ArrayValue(ProtoArrayValue { + values: items + .iter() + .map(|it| ProtoAnyValue { + value: Some(event_attr_value(it)), + }) + .collect(), + }), + }; + proto_kv(k.borrow().to_string(), value) + }) + .collect() +} + +// ─── Public mapper ──────────────────────────────────────────────────────────── + +/// Maps Datadog trace chunks and resource info to a prost OTLP `ExportTraceServiceRequest`, built +/// directly from the native span fields (no hex/decimal round trip — the prost types are the IR). /// /// Resource: SDK-level attributes (service.name, deployment.environment.name, telemetry.sdk.*, /// runtime-id). InstrumentationScope: present but empty (DD SDKs don't have a scope concept). @@ -30,343 +270,179 @@ const MAX_ATTRIBUTES_PER_SPAN: usize = 128; pub fn map_traces_to_otlp( trace_chunks: Vec>>, resource_info: &OtlpResourceInfo, -) -> ExportTraceServiceRequest { +) -> ProtoReq { let resource = build_resource(resource_info); - let mut all_spans: Vec = Vec::new(); + // Pre-size to the total span count so the per-span push loop never reallocates. + let total_spans: usize = trace_chunks.iter().map(|chunk| chunk.len()).sum(); + let mut all_spans: Vec = Vec::with_capacity(total_spans); for chunk in &trace_chunks { // Resolve the high 64 bits of the 128-bit trace ID once per chunk. For each span, // prefer the native u128 `trace_id` field (e.g. Python's native spans hold the full // 128-bit ID there) and fall back to its RFC #85 `_dd.p.tid` meta tag. - let chunk_trace_id_high: u64 = chunk - .iter() - .find_map(|s| { - let high = (s.trace_id >> 64) as u64; - if high != 0 { - return Some(high); - } - s.meta - .get("_dd.p.tid") - .and_then(|v| u64::from_str_radix(v.borrow(), 16).ok()) - }) - .unwrap_or(0); + let high = chunk_trace_id_high(chunk); for span in chunk { - all_spans.push(map_span(span, &resource_info.service, chunk_trace_id_high)); + all_spans.push(map_span(span, &resource_info.service, high)); } } - let scope_spans = ScopeSpans { - scope: Some(InstrumentationScope::default()), - spans: all_spans, - schema_url: None, - }; - let resource_spans = ResourceSpans { - resource: Some(resource), - scope_spans: vec![scope_spans], - }; - ExportTraceServiceRequest { - resource_spans: vec![resource_spans], + ProtoReq { + resource_spans: vec![ProtoResourceSpans { + resource: Some(resource), + scope_spans: vec![ProtoScopeSpans { + scope: Some(ProtoScope { + name: String::new(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + spans: all_spans, + schema_url: String::new(), + }], + schema_url: String::new(), + }], } } -fn build_resource(resource_info: &OtlpResourceInfo) -> Resource { - let mut attributes: Vec = Vec::new(); - if !resource_info.service.is_empty() { - attributes.push(KeyValue { - key: "service.name".to_string(), - value: AnyValue::StringValue(resource_info.service.clone()), - }); - } - if !resource_info.env.is_empty() { - attributes.push(KeyValue { - key: "deployment.environment.name".to_string(), - value: AnyValue::StringValue(resource_info.env.clone()), - }); - } - if !resource_info.app_version.is_empty() { - attributes.push(KeyValue { - key: "service.version".to_string(), - value: AnyValue::StringValue(resource_info.app_version.clone()), - }); +fn push_str_attr(attrs: &mut Vec, k: &str, v: &str) { + if !v.is_empty() { + attrs.push(proto_kv( + k.to_string(), + ProtoValue::StringValue(v.to_string()), + )); } - attributes.push(KeyValue { - key: "telemetry.sdk.name".to_string(), - value: AnyValue::StringValue("datadog".to_string()), - }); - if !resource_info.language.is_empty() { - attributes.push(KeyValue { - key: "telemetry.sdk.language".to_string(), - value: AnyValue::StringValue(resource_info.language.clone()), - }); - } - if !resource_info.tracer_version.is_empty() { - attributes.push(KeyValue { - key: "telemetry.sdk.version".to_string(), - value: AnyValue::StringValue(resource_info.tracer_version.clone()), - }); - } - if !resource_info.runtime_id.is_empty() { - attributes.push(KeyValue { - key: "runtime-id".to_string(), - value: AnyValue::StringValue(resource_info.runtime_id.clone()), - }); +} + +fn build_resource(resource_info: &OtlpResourceInfo) -> ProtoResource { + let mut attributes: Vec = Vec::new(); + push_str_attr(&mut attributes, "service.name", &resource_info.service); + push_str_attr( + &mut attributes, + "deployment.environment.name", + &resource_info.env, + ); + push_str_attr( + &mut attributes, + "service.version", + &resource_info.app_version, + ); + attributes.push(proto_kv( + "telemetry.sdk.name".to_string(), + ProtoValue::StringValue("datadog".to_string()), + )); + push_str_attr( + &mut attributes, + "telemetry.sdk.language", + &resource_info.language, + ); + push_str_attr( + &mut attributes, + "telemetry.sdk.version", + &resource_info.tracer_version, + ); + push_str_attr(&mut attributes, "runtime-id", &resource_info.runtime_id); + // `entity_refs` is a profiling-signal-only field; explicit default. + ProtoResource { + attributes, + dropped_attributes_count: 0, + entity_refs: Vec::new(), } - Resource { attributes } } fn map_span( span: &Span, resource_service: &str, chunk_trace_id_high: u64, -) -> OtlpSpan { +) -> ProtoSpan { // Reconstruct the full 128-bit trace ID. The caller resolves the high 64 bits once per // chunk (from either the native u128 `trace_id` field or the "_dd.p.tid" meta tag). // All spans in a chunk share the same trace ID. let trace_id_128 = ((chunk_trace_id_high as u128) << 64) | (span.trace_id as u64 as u128); - let trace_id_hex = format!("{:032x}", trace_id_128); - let span_id_hex = format!("{:016x}", span.span_id); let parent_span_id = if span.parent_id != 0 { - Some(format!("{:016x}", span.parent_id)) + span.parent_id.to_be_bytes().to_vec() } else { - None + Vec::new() }; - let start_nano = span.start; - let end_nano = span.start + span.duration; - let start_time_unix_nano = start_nano.to_string(); - let end_time_unix_nano = end_nano.to_string(); - // Prefer explicit "span.kind" tag (set by OTEL-instrumented tracers); fall back to - // the Datadog span type field for DD-instrumented spans. - let kind = span - .meta - .get("span.kind") - .map(|v| tag_to_otlp_kind(v.borrow())) - .unwrap_or_else(|| dd_type_to_otlp_kind(span.r#type.borrow())); - let (attributes, dropped_attributes_count) = map_attributes(span, resource_service); - let error_msg = span.meta.get("error.msg").map(|v| v.borrow().to_string()); - let status = if span.error != 0 { - Status { - message: error_msg, - code: json_types::status_code::ERROR, - } - } else { - Status { - message: None, - code: json_types::status_code::UNSET, - } - }; - // Set flags from sampling priority: 1 = sampled/keep, 0 = dropped. + let (attributes, dropped_attributes_count) = collect_span_attributes(span, resource_service); + let (code, message) = span_status(span); let flags = span .metrics .get("_sampling_priority_v1") - .map(|p| if *p >= 1.0 { 1u32 } else { 0u32 }); + .map(|p| if *p >= 1.0 { 1u32 } else { 0u32 }) + .unwrap_or(0); let trace_state = span .meta .get("tracestate") .map(|v| v.borrow().to_string()) - .filter(|s| !s.is_empty()); + .filter(|s| !s.is_empty()) + .unwrap_or_default(); let links = span.span_links.iter().map(map_span_link).collect(); let (events, dropped_events_count) = map_span_events(&span.span_events); - OtlpSpan { - trace_id: trace_id_hex, - span_id: span_id_hex, - parent_span_id, + ProtoSpan { + trace_id: trace_id_128.to_be_bytes().to_vec(), + span_id: span.span_id.to_be_bytes().to_vec(), trace_state, + parent_span_id, + flags, name: span.resource.borrow().to_string(), - kind, - start_time_unix_nano, - end_time_unix_nano, + kind: span_kind(span), + // Clamp negatives to 0 — matches the prior parse_u64 zero-fallback on negative input. + start_time_unix_nano: span.start.max(0) as u64, + end_time_unix_nano: (span.start + span.duration).max(0) as u64, attributes, - status, - links, + dropped_attributes_count: dropped_attributes_count as u32, events, - dropped_attributes_count: if dropped_attributes_count > 0 { - Some(dropped_attributes_count as u32) - } else { - None - }, - dropped_events_count: if dropped_events_count > 0 { - Some(dropped_events_count as u32) - } else { - None - }, - flags, + dropped_events_count: dropped_events_count as u32, + links, + // The mapper enforces no link cap, so dropped links is always 0. + dropped_links_count: 0, + status: Some(ProtoStatus { + message: message.unwrap_or_default(), + code, + }), } } -fn map_span_link(link: &SpanLink) -> OtlpSpanLink { +fn map_span_link(link: &SpanLink) -> ProtoLink { let trace_id_128 = ((link.trace_id_high as u128) << 64) | (link.trace_id as u128); - let trace_id_hex = format!("{:032x}", trace_id_128); - let span_id_hex = format!("{:016x}", link.span_id); - let trace_state = if link.tracestate.borrow().is_empty() { - None - } else { - Some(link.tracestate.borrow().to_string()) - }; - let attributes: Vec = link - .attributes - .iter() - .map(|(k, v)| KeyValue { - key: k.borrow().to_string(), - value: AnyValue::StringValue(v.borrow().to_string()), - }) - .collect(); - OtlpSpanLink { - trace_id: trace_id_hex, - span_id: span_id_hex, - trace_state, - attributes, - dropped_attributes_count: None, + ProtoLink { + trace_id: trace_id_128.to_be_bytes().to_vec(), + span_id: link.span_id.to_be_bytes().to_vec(), + trace_state: { + let ts = link.tracestate.borrow(); + if ts.is_empty() { + String::new() + } else { + ts.to_string() + } + }, + attributes: link + .attributes + .iter() + .map(|(k, v)| { + proto_kv( + k.borrow().to_string(), + ProtoValue::StringValue(v.borrow().to_string()), + ) + }) + .collect(), + dropped_attributes_count: 0, + // `SpanLink` has no flags field; faithful value is 0. + flags: 0, } } -fn map_span_events(events: &[SpanEvent]) -> (Vec, usize) { +fn map_span_events(events: &[SpanEvent]) -> (Vec, usize) { const MAX_EVENTS_PER_SPAN: usize = 128; - let mut otlp_events = Vec::with_capacity(events.len().min(MAX_EVENTS_PER_SPAN)); + let mut out = Vec::with_capacity(events.len().min(MAX_EVENTS_PER_SPAN)); for ev in events.iter().take(MAX_EVENTS_PER_SPAN) { - let attributes: Vec = ev - .attributes - .iter() - .map(|(k, v)| event_attr_to_key_value(k, v)) - .collect(); - otlp_events.push(OtlpSpanEvent { - time_unix_nano: ev.time_unix_nano.to_string(), + out.push(ProtoEvent { + time_unix_nano: ev.time_unix_nano, name: ev.name.borrow().to_string(), - attributes, - dropped_attributes_count: None, + attributes: collect_event_attributes(ev), + dropped_attributes_count: 0, }); } - let dropped = events.len().saturating_sub(otlp_events.len()); - (otlp_events, dropped) -} - -fn event_attr_to_key_value( - k: &T::Text, - v: &crate::span::v04::AttributeAnyValue, -) -> KeyValue { - use crate::span::v04::AttributeArrayValue; - let value = match v { - crate::span::v04::AttributeAnyValue::SingleValue(av) => match av { - AttributeArrayValue::String(s) => AnyValue::StringValue(s.borrow().to_string()), - AttributeArrayValue::Boolean(b) => AnyValue::BoolValue(*b), - AttributeArrayValue::Integer(i) => AnyValue::IntValue(*i), - AttributeArrayValue::Double(d) => AnyValue::DoubleValue(*d), - }, - crate::span::v04::AttributeAnyValue::Array(items) => { - let values = items - .iter() - .map(|item| match item { - AttributeArrayValue::String(s) => AnyValue::StringValue(s.borrow().to_string()), - AttributeArrayValue::Boolean(b) => AnyValue::BoolValue(*b), - AttributeArrayValue::Integer(i) => AnyValue::IntValue(*i), - AttributeArrayValue::Double(d) => AnyValue::DoubleValue(*d), - }) - .collect(); - AnyValue::ArrayValue(crate::otlp_encoder::json_types::ArrayValue { values }) - } - }; - KeyValue { - key: k.borrow().to_string(), - value, - } -} - -/// Maps the explicit "span.kind" meta tag (set by OTEL-instrumented tracers) to an OTLP SpanKind. -fn tag_to_otlp_kind(t: &str) -> i32 { - match t.to_lowercase().as_str() { - "server" => json_types::span_kind::SERVER, - "client" => json_types::span_kind::CLIENT, - "producer" => json_types::span_kind::PRODUCER, - "consumer" => json_types::span_kind::CONSUMER, - "internal" => json_types::span_kind::INTERNAL, - _ => json_types::span_kind::UNSPECIFIED, - } -} - -/// Maps the Datadog span type field (set by DD-instrumented tracers) to an OTLP SpanKind. -fn dd_type_to_otlp_kind(t: &str) -> i32 { - match t.to_lowercase().as_str() { - "server" | "web" | "http" => json_types::span_kind::SERVER, - "client" => json_types::span_kind::CLIENT, - "producer" => json_types::span_kind::PRODUCER, - "consumer" => json_types::span_kind::CONSUMER, - _ => json_types::span_kind::INTERNAL, - } -} - -fn map_attributes(span: &Span, resource_service: &str) -> (Vec, usize) { - let mut attrs: Vec = Vec::new(); - // Add service.name when the span's service differs from the resource-level service. - let span_service = span.service.borrow(); - let has_per_span_service = !span_service.is_empty() && span_service != resource_service; - if has_per_span_service { - attrs.push(KeyValue { - key: "service.name".to_string(), - value: AnyValue::StringValue(span_service.to_string()), - }); - } - let operation_name = span.name.borrow(); - let has_operation_name = !operation_name.is_empty(); - if has_operation_name { - attrs.push(KeyValue { - key: "operation.name".to_string(), - value: AnyValue::StringValue(operation_name.to_string()), - }); - } - let span_type = span.r#type.borrow(); - let has_span_type = !span_type.is_empty(); - if has_span_type { - attrs.push(KeyValue { - key: "span.type".to_string(), - value: AnyValue::StringValue(span_type.to_string()), - }); - } - let resource_name = span.resource.borrow(); - let has_resource_name = !resource_name.is_empty(); - if has_resource_name { - attrs.push(KeyValue { - key: "resource.name".to_string(), - value: AnyValue::StringValue(resource_name.to_string()), - }); - } - for (k, v) in span.meta.iter() { - if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { - break; - } - attrs.push(KeyValue { - key: k.borrow().to_string(), - value: AnyValue::StringValue(v.borrow().to_string()), - }); - } - for (k, v) in span.metrics.iter() { - if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { - break; - } - let value = if v.fract() == 0.0 && (*v >= i64::MIN as f64 && *v <= i64::MAX as f64) { - AnyValue::IntValue(*v as i64) - } else { - AnyValue::DoubleValue(*v) - }; - attrs.push(KeyValue { - key: k.borrow().to_string(), - value, - }); - } - for (k, v) in span.meta_struct.iter() { - if attrs.len() >= MAX_ATTRIBUTES_PER_SPAN { - break; - } - attrs.push(KeyValue { - key: k.borrow().to_string(), - value: AnyValue::BytesValue(v.borrow().to_vec()), - }); - } - let total = (if has_per_span_service { 1 } else { 0 }) - + (if has_operation_name { 1 } else { 0 }) - + (if has_span_type { 1 } else { 0 }) - + (if has_resource_name { 1 } else { 0 }) - + span.meta.len() - + span.metrics.len() - + span.meta_struct.len(); - let dropped = total.saturating_sub(attrs.len()); - (attrs, dropped) + let dropped = events.len().saturating_sub(out.len()); + (out, dropped) } #[cfg(test)] @@ -376,38 +452,115 @@ mod tests { use crate::span::BytesData; #[test] - fn test_trace_id_span_id_format() { + fn maps_native_span_to_prost_ir() { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as PV; let resource_info = OtlpResourceInfo::default(); + let mut span: Span = Span { + trace_id: 0xD269B633813FC60C_u128, + span_id: 0xEEE19B7EC3C1B174, + parent_id: 0xEEE19B7EC3C1B173, + name: libdd_tinybytes::BytesString::from_static("op"), + resource: libdd_tinybytes::BytesString::from_static("res"), + r#type: libdd_tinybytes::BytesString::from_static("web"), + start: 1544712660000000000, + duration: 1000000000, + error: 1, + ..Default::default() + }; + span.meta.insert( + "error.msg".into(), + libdd_tinybytes::BytesString::from_static("boom"), + ); + span.metrics + .insert(libdd_tinybytes::BytesString::from_static("count"), 42.0); + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!(s.trace_id, 0xD269B633813FC60C_u128.to_be_bytes().to_vec()); + assert_eq!(s.span_id, 0xEEE19B7EC3C1B174u64.to_be_bytes().to_vec()); + assert_eq!( + s.parent_span_id, + 0xEEE19B7EC3C1B173u64.to_be_bytes().to_vec() + ); + assert_eq!(s.name, "res"); + assert_eq!(s.kind, 2); // SERVER (from dd type "web") + assert_eq!(s.start_time_unix_nano, 1544712660000000000); + assert_eq!(s.end_time_unix_nano, 1544712661000000000); + let st = s.status.as_ref().unwrap(); + assert_eq!(st.code, 2); + assert_eq!(st.message, "boom"); + let count = s.attributes.iter().find(|a| a.key == "count").unwrap(); + assert!(matches!( + count.value.as_ref().unwrap().value, + Some(PV::IntValue(42)) + )); + } + + #[test] + fn proto_span_uses_raw_id_bytes_and_native_timestamps() { + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; let span: Span = Span { - trace_id: 0xD269B633813FC60C_u128, // low 64 bits only (v04 wire format) + trace_id: 0x5b8efff798038103_d269b633813fc60c_u128, span_id: 0xEEE19B7EC3C1B174, parent_id: 0xEEE19B7EC3C1B173, - name: libdd_tinybytes::BytesString::from_static("test"), - service: libdd_tinybytes::BytesString::from_static("svc"), + name: libdd_tinybytes::BytesString::from_static("op"), resource: libdd_tinybytes::BytesString::from_static("res"), r#type: libdd_tinybytes::BytesString::from_static("web"), start: 1544712660000000000, duration: 1000000000, - error: 0, ..Default::default() }; let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let rs = &req.resource_spans[0]; - let otlp_span = &rs.scope_spans[0].spans[0]; - assert_eq!(otlp_span.trace_id, "0000000000000000d269b633813fc60c"); - assert_eq!(otlp_span.span_id, "eee19b7ec3c1b174"); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!( + s.trace_id, + 0x5b8efff798038103_d269b633813fc60c_u128 + .to_be_bytes() + .to_vec() + ); + assert_eq!(s.span_id, 0xEEE19B7EC3C1B174u64.to_be_bytes().to_vec()); + assert_eq!( + s.parent_span_id, + 0xEEE19B7EC3C1B173u64.to_be_bytes().to_vec() + ); + assert_eq!(s.start_time_unix_nano, 1544712660000000000); + assert_eq!(s.end_time_unix_nano, 1544712661000000000); + assert_eq!(s.name, "res"); + assert_eq!(s.kind, span_kind::SERVER); + } + + #[test] + fn negative_start_clamps_to_zero() { + // Regression test: a span with negative start (malformed input) must map to + // start_time_unix_nano == 0 (and not wrap to u64::MAX), matching the old parse_u64 + // behavior. + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + let span: Span = Span { + trace_id: 1, + span_id: 1, + start: -1, + duration: 0, + ..Default::default() + }; + let req = map_traces_to_otlp(vec![vec![span]], &resource_info); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!( + s.start_time_unix_nano, 0, + "negative start must clamp to 0, not wrap" + ); assert_eq!( - otlp_span.parent_span_id.as_deref(), - Some("eee19b7ec3c1b173") + s.end_time_unix_nano, 0, + "negative start+duration must clamp to 0, not wrap" ); - assert_eq!(otlp_span.kind, json_types::span_kind::SERVER); - assert_eq!(otlp_span.start_time_unix_nano, "1544712660000000000"); - assert_eq!(otlp_span.end_time_unix_nano, "1544712661000000000"); - assert_eq!(rs.scope_spans[0].scope.as_ref().unwrap().name, None); } #[test] - fn test_status_error_message_from_meta() { + fn status_error_message_from_meta() { let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { trace_id: 1, @@ -423,14 +576,15 @@ mod tests { libdd_tinybytes::BytesString::from_static("something broke"), ); let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; - let status = &otlp_span.status; - assert_eq!(status.code, json_types::status_code::ERROR); - assert_eq!(status.message.as_deref(), Some("something broke")); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + let status = s.status.as_ref().unwrap(); + assert_eq!(status.code, status_code::ERROR); + assert_eq!(status.message, "something broke"); } #[test] - fn test_metrics_as_int_or_double() { + fn metrics_as_int_or_double() { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as PV; let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { trace_id: 1, @@ -447,29 +601,23 @@ mod tests { std::f64::consts::PI, ); let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let json = serde_json::to_value(&req).unwrap(); - let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; - let count_kv = attrs - .as_array() - .unwrap() - .iter() - .find(|a| a["key"] == "count") - .unwrap(); - assert_eq!(count_kv["value"]["intValue"], "42"); - let rate_kv = attrs - .as_array() - .unwrap() - .iter() - .find(|a| a["key"] == "rate") - .unwrap(); - let rate = rate_kv["value"]["doubleValue"].as_f64().unwrap(); - assert!((rate - std::f64::consts::PI).abs() < 1e-9); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + let count = s.attributes.iter().find(|a| a.key == "count").unwrap(); + assert!(matches!( + count.value.as_ref().unwrap().value, + Some(PV::IntValue(42)) + )); + let rate = s.attributes.iter().find(|a| a.key == "rate").unwrap(); + match rate.value.as_ref().unwrap().value { + Some(PV::DoubleValue(d)) => assert!((d - std::f64::consts::PI).abs() < 1e-9), + ref other => panic!("expected double, got {other:?}"), + } } #[test] - fn test_128bit_trace_id_from_dd_p_tid() { + fn trace_id_128_from_dd_p_tid() { // When "_dd.p.tid" is present it supplies the high 64 bits of the trace ID. - // Low 64 bits come from span.trace_id; the two are concatenated to form a 128-bit hex ID. + // Low 64 bits come from span.trace_id; the two are concatenated to form a 128-bit ID. let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { trace_id: 0xD269B633813FC60C_u128, // low 64 bits @@ -484,12 +632,17 @@ mod tests { libdd_tinybytes::BytesString::from_static("5b8efff798038103"), ); let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; - assert_eq!(otlp_span.trace_id, "5b8efff798038103d269b633813fc60c"); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!( + s.trace_id, + 0x5b8efff798038103_d269b633813fc60c_u128 + .to_be_bytes() + .to_vec() + ); } #[test] - fn test_128bit_trace_id_from_native_span_field() { + fn trace_id_128_from_native_span_field() { // When the span's u128 `trace_id` field already carries the full 128-bit ID (e.g. // tracers with native spans like Python), the chunk-root meta lookup is skipped and // the field's high 64 bits are propagated to every span in the chunk. @@ -515,13 +668,13 @@ mod tests { }; let req = map_traces_to_otlp(vec![vec![root, child]], &resource_info); let spans = &req.resource_spans[0].scope_spans[0].spans; - let expected = "5b8efff798038103d269b633813fc60c"; + let expected = full.to_be_bytes().to_vec(); assert_eq!(spans[0].trace_id, expected); assert_eq!(spans[1].trace_id, expected); } #[test] - fn test_128bit_trace_id_without_dd_p_tid() { + fn trace_id_128_without_dd_p_tid_defaults_high_to_zero() { // When the entire chunk has no "_dd.p.tid" the high 64 bits default to zero // (legacy 64-bit-only trace IDs). let resource_info = OtlpResourceInfo::default(); @@ -534,12 +687,12 @@ mod tests { ..Default::default() }; let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; - assert_eq!(otlp_span.trace_id, "0000000000000000d269b633813fc60c"); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!(s.trace_id, 0xD269B633813FC60C_u128.to_be_bytes().to_vec()); } #[test] - fn test_128bit_trace_id_propagated_to_chunk_children() { + fn trace_id_128_propagated_to_chunk_children() { // Per RFC #85 dd-trace tracers set "_dd.p.tid" only on the chunk root. // The OTLP mapper must apply that high-bits value to every span in the chunk // so receivers see the full 128-bit trace_id on every span. @@ -578,14 +731,16 @@ mod tests { let req = map_traces_to_otlp(vec![vec![root, child_a, child_b]], &resource_info); let spans = &req.resource_spans[0].scope_spans[0].spans; assert_eq!(spans.len(), 3); - let expected = "5b8efff798038103d269b633813fc60c"; + let expected = 0x5b8efff798038103_d269b633813fc60c_u128 + .to_be_bytes() + .to_vec(); for s in spans { - assert_eq!(s.trace_id, expected, "span {} mismatched", s.span_id); + assert_eq!(s.trace_id, expected); } } #[test] - fn test_128bit_trace_id_isolation_across_chunks() { + fn trace_id_128_isolation_across_chunks() { // The chunk-level high bits must not leak across chunks. Each chunk's spans // get only their own chunk root's "_dd.p.tid". let resource_info = OtlpResourceInfo::default(); @@ -639,9 +794,12 @@ mod tests { ); let spans = &req.resource_spans[0].scope_spans[0].spans; assert_eq!(spans.len(), 4); - // Spans 1, 2 belong to chunk A; spans 3, 4 to chunk B. - let expect_a = "aaaaaaaaaaaaaaaa1111111111111111"; - let expect_b = "bbbbbbbbbbbbbbbb2222222222222222"; + let expect_a = 0xaaaaaaaaaaaaaaaa_1111111111111111_u128 + .to_be_bytes() + .to_vec(); + let expect_b = 0xbbbbbbbbbbbbbbbb_2222222222222222_u128 + .to_be_bytes() + .to_vec(); assert_eq!(spans[0].trace_id, expect_a); assert_eq!(spans[1].trace_id, expect_a); assert_eq!(spans[2].trace_id, expect_b); @@ -649,7 +807,7 @@ mod tests { } #[test] - fn test_chunk_with_malformed_dd_p_tid_on_root_falls_back() { + fn chunk_with_malformed_dd_p_tid_on_root_falls_back() { // If the chunk root's "_dd.p.tid" fails to parse, the scan continues looking for // any other parseable value in the chunk before giving up. This keeps a malformed // tag on one span from poisoning the rest of the trace. @@ -693,14 +851,16 @@ mod tests { let spans = &req.resource_spans[0].scope_spans[0].spans; // The chunk-level scan skips the malformed root and picks up child_valid's tag, // which is then applied to every span in the chunk. - let expected = "ddddddddddddddddd269b633813fc60c"; + let expected = 0xdddddddddddddddd_d269b633813fc60c_u128 + .to_be_bytes() + .to_vec(); assert_eq!(spans[0].trace_id, expected); assert_eq!(spans[1].trace_id, expected); assert_eq!(spans[2].trace_id, expected); } #[test] - fn test_empty_chunk_does_not_panic() { + fn empty_chunk_does_not_panic() { // Defensive: an empty chunk should produce no spans and not panic. let resource_info = OtlpResourceInfo::default(); let empty: Vec>> = vec![vec![]]; @@ -710,7 +870,7 @@ mod tests { } #[test] - fn test_tracestate_from_meta() { + fn tracestate_from_meta() { let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { trace_id: 1, @@ -725,16 +885,14 @@ mod tests { libdd_tinybytes::BytesString::from_static("vendor1=abc,rojo=00f067"), ); let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; - assert_eq!( - otlp_span.trace_state.as_deref(), - Some("vendor1=abc,rojo=00f067") - ); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!(s.trace_state, "vendor1=abc,rojo=00f067"); } #[test] - fn test_meta_struct_as_bytes_value() { + fn meta_struct_as_bytes_value() { use libdd_tinybytes::Bytes; + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as PV; let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { trace_id: 1, @@ -747,20 +905,21 @@ mod tests { span.meta_struct .insert("my_key".into(), Bytes::from(vec![1u8, 2, 3])); let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let json = serde_json::to_value(&req).unwrap(); - let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; - let kv = attrs - .as_array() - .unwrap() + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + let kv = s + .attributes .iter() - .find(|a| a["key"] == "my_key") + .find(|a| a.key == "my_key") .expect("my_key attribute not found"); - // Per the protobuf JSON mapping, bytes are base64-encoded. - assert_eq!(kv["value"]["bytesValue"], "AQID"); + match kv.value.as_ref().unwrap().value { + Some(PV::BytesValue(ref b)) => assert_eq!(b, &vec![1u8, 2, 3]), + ref other => panic!("expected bytes, got {other:?}"), + } } #[test] - fn test_operation_name_attribute() { + fn operation_name_attribute() { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as PV; let resource_info = OtlpResourceInfo::default(); let span: Span = Span { trace_id: 1, @@ -771,19 +930,21 @@ mod tests { ..Default::default() }; let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let json = serde_json::to_value(&req).unwrap(); - let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; - let kv = attrs - .as_array() - .unwrap() + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + let kv = s + .attributes .iter() - .find(|a| a["key"] == "operation.name") + .find(|a| a.key == "operation.name") .expect("operation.name attribute not found"); - assert_eq!(kv["value"]["stringValue"], "my.operation"); + match kv.value.as_ref().unwrap().value { + Some(PV::StringValue(ref v)) => assert_eq!(v, "my.operation"), + ref other => panic!("expected string, got {other:?}"), + } } #[test] - fn test_span_type_attribute() { + fn span_type_attribute() { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as PV; let resource_info = OtlpResourceInfo::default(); let span: Span = Span { trace_id: 1, @@ -795,19 +956,21 @@ mod tests { ..Default::default() }; let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let json = serde_json::to_value(&req).unwrap(); - let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; - let kv = attrs - .as_array() - .unwrap() + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + let kv = s + .attributes .iter() - .find(|a| a["key"] == "span.type") + .find(|a| a.key == "span.type") .expect("span.type attribute not found"); - assert_eq!(kv["value"]["stringValue"], "grpc"); + match kv.value.as_ref().unwrap().value { + Some(PV::StringValue(ref v)) => assert_eq!(v, "grpc"), + ref other => panic!("expected string, got {other:?}"), + } } #[test] - fn test_resource_name_attribute() { + fn resource_name_attribute_and_span_name() { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as PV; let resource_info = OtlpResourceInfo::default(); let span: Span = Span { trace_id: 1, @@ -819,25 +982,24 @@ mod tests { ..Default::default() }; let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let json = serde_json::to_value(&req).unwrap(); - let otlp_span = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]; + let s = &req.resource_spans[0].scope_spans[0].spans[0]; // resource maps to the OTLP span name - assert_eq!(otlp_span["name"], "GET /api/users"); + assert_eq!(s.name, "GET /api/users"); // resource also maps to the resource.name attribute - let kv = otlp_span["attributes"] - .as_array() - .unwrap() + let kv = s + .attributes .iter() - .find(|a| a["key"] == "resource.name") + .find(|a| a.key == "resource.name") .expect("resource.name attribute not found"); - assert_eq!(kv["value"]["stringValue"], "GET /api/users"); + match kv.value.as_ref().unwrap().value { + Some(PV::StringValue(ref v)) => assert_eq!(v, "GET /api/users"), + ref other => panic!("expected string, got {other:?}"), + } } #[test] - fn test_empty_resource_name_not_emitted() { + fn empty_resource_name_not_emitted() { // A span with no resource set should not emit a resource.name attribute. - // In practice DD spans always have a resource, but the mapper is defensive about - // empty fields from the wire. let resource_info = OtlpResourceInfo::default(); let span: Span = Span { trace_id: 1, @@ -849,18 +1011,16 @@ mod tests { ..Default::default() }; let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let json = serde_json::to_value(&req).unwrap(); - let attrs = json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"] - .as_array() - .unwrap(); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; assert!( - !attrs.iter().any(|a| a["key"] == "resource.name"), + !s.attributes.iter().any(|a| a.key == "resource.name"), "resource.name should not be emitted when resource is empty" ); } #[test] - fn test_per_span_service_name_attribute() { + fn per_span_service_name_attribute() { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as PV; // When span.service differs from the resource-level service, service.name is emitted // as a per-span attribute so the receiver can distinguish between services in a trace. let resource_info = OtlpResourceInfo { @@ -877,19 +1037,20 @@ mod tests { ..Default::default() }; let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let json = serde_json::to_value(&req).unwrap(); - let attrs = &json["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["attributes"]; - let kv = attrs - .as_array() - .unwrap() + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + let kv = s + .attributes .iter() - .find(|a| a["key"] == "service.name") + .find(|a| a.key == "service.name") .expect("service.name attribute not found"); - assert_eq!(kv["value"]["stringValue"], "span-svc"); + match kv.value.as_ref().unwrap().value { + Some(PV::StringValue(ref v)) => assert_eq!(v, "span-svc"), + ref other => panic!("expected string, got {other:?}"), + } } #[test] - fn test_unsampled_span_flags_zero() { + fn unsampled_span_flags_zero() { // _sampling_priority_v1 = 0 means explicitly dropped; flags field must be 0. let resource_info = OtlpResourceInfo::default(); let mut span: Span = Span { @@ -902,7 +1063,7 @@ mod tests { }; span.metrics.insert("_sampling_priority_v1".into(), 0.0); let req = map_traces_to_otlp(vec![vec![span]], &resource_info); - let otlp_span = &req.resource_spans[0].scope_spans[0].spans[0]; - assert_eq!(otlp_span.flags, Some(0)); + let s = &req.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!(s.flags, 0); } } diff --git a/libdd-trace-utils/src/otlp_encoder/mod.rs b/libdd-trace-utils/src/otlp_encoder/mod.rs index 782a10e10d..aa47fab61a 100644 --- a/libdd-trace-utils/src/otlp_encoder/mod.rs +++ b/libdd-trace-utils/src/otlp_encoder/mod.rs @@ -1,13 +1,27 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! OTLP HTTP/JSON encoder: maps Datadog spans to ExportTraceServiceRequest. +//! OTLP encoder: maps Datadog spans to the prost OTLP types (the IR), then to the HTTP/protobuf +//! or HTTP/JSON wire format. -pub mod json_types; +pub(crate) mod json_serializer; pub mod mapper; pub use mapper::map_traces_to_otlp; +pub use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoExportTraceServiceRequest; +use prost::Message; + +/// Serialize the prost OTLP request to the HTTP/protobuf wire format. +pub fn encode_otlp_protobuf(req: &ProtoExportTraceServiceRequest) -> Vec { + req.encode_to_vec() +} + +/// Serialize the prost OTLP request to the HTTP/JSON wire format (OTLP/JSON spec). +pub fn encode_otlp_json(req: &ProtoExportTraceServiceRequest) -> serde_json::Result> { + json_serializer::to_otlp_json_vec(req) +} + /// Tracer-level attributes used to populate the OTLP Resource on export. /// /// These are the fields from the tracer's configuration that map to OTLP Resource attributes @@ -23,3 +37,84 @@ pub struct OtlpResourceInfo { pub tracer_version: String, pub runtime_id: String, } + +#[cfg(test)] +mod encode_tests { + use super::*; + use crate::span::v04::Span; + use crate::span::BytesData; + use libdd_trace_protobuf::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest as ProtoReq; + use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value::Value as ProtoValue; + use prost::Message; + + fn sample_native() -> (Vec>>, OtlpResourceInfo) { + let resource_info = OtlpResourceInfo { + service: "svc".to_string(), + ..Default::default() + }; + let mut span: Span = Span { + trace_id: 0x5b8efff798038103_d269b633813fc60c_u128, + span_id: 0xEEE19B7EC3C1B174, + name: libdd_tinybytes::BytesString::from_static("op"), + resource: libdd_tinybytes::BytesString::from_static("res"), + start: 1, + duration: 2, + error: 1, + ..Default::default() + }; + span.meta.insert( + "error.msg".into(), + libdd_tinybytes::BytesString::from_static("boom"), + ); + span.meta.insert( + "http.method".into(), + libdd_tinybytes::BytesString::from_static("GET"), + ); + (vec![vec![span]], resource_info) + } + + #[test] + fn json_and_protobuf_carry_same_span() { + // Decisive guard: JSON and protobuf are encoded from the *same* prost IR, so the two + // wire formats cannot drift. + let (chunks, info) = sample_native(); + let req = map_traces_to_otlp(chunks, &info); + let json = encode_otlp_json(&req).unwrap(); + let pb = encode_otlp_protobuf(&req); + + let jv: serde_json::Value = serde_json::from_slice(&json).unwrap(); + let jspan = &jv["resourceSpans"][0]["scopeSpans"][0]["spans"][0]; + let proto = ProtoReq::decode(pb.as_slice()).unwrap(); + let pspan = &proto.resource_spans[0].scope_spans[0].spans[0]; + + assert_eq!(jspan["name"].as_str().unwrap(), pspan.name); + assert_eq!( + jspan["spanId"].as_str().unwrap(), + hex::encode(&pspan.span_id) + ); + assert_eq!( + jspan["traceId"].as_str().unwrap(), + hex::encode(&pspan.trace_id) + ); + let pst = pspan.status.as_ref().unwrap(); + assert_eq!(jspan["status"]["code"].as_i64().unwrap() as i32, pst.code); + assert_eq!(jspan["status"]["message"].as_str().unwrap(), pst.message); + let jattr = jspan["attributes"] + .as_array() + .unwrap() + .iter() + .find(|a| a["key"] == "http.method") + .unwrap(); + let pattr = pspan + .attributes + .iter() + .find(|a| a.key == "http.method") + .unwrap(); + let pval = match pattr.value.as_ref().unwrap().value.as_ref().unwrap() { + ProtoValue::StringValue(v) => v.as_str(), + other => panic!("expected string, got {other:?}"), + }; + assert_eq!(jattr["value"]["stringValue"].as_str().unwrap(), pval); + assert_eq!(jattr["value"]["stringValue"].as_str().unwrap(), "GET"); + } +}