-
Notifications
You must be signed in to change notification settings - Fork 21
feat(data-pipeline): add agentless export #2081
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Example: send a trace directly to the Datadog agentless intake. | ||
| //! | ||
| //! Reads the API key from the `DD_API_KEY` environment variable and POSTs a | ||
| //! small trace to `https://public-trace-http-intake.logs.{DD_SITE}/v1/input` | ||
| //! (defaulting to `datadoghq.com`). | ||
| //! | ||
| //! Usage: | ||
| //! DD_API_KEY=<key> [DD_SITE=datadoghq.eu] \ | ||
| //! cargo run --example send-traces-agentless -p libdd-data-pipeline | ||
|
|
||
| use clap::Parser; | ||
| use libdd_capabilities_impl::NativeCapabilities; | ||
| use libdd_data_pipeline::trace_exporter::{ | ||
| TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat, | ||
| }; | ||
| use libdd_log::logger::{ | ||
| logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget, | ||
| }; | ||
| use libdd_shared_runtime::SharedRuntime; | ||
| use libdd_trace_utils::span::v04::{SpanBytes, SpanEvent, SpanLink}; | ||
| use rand::random; | ||
| use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH}; | ||
|
|
||
| fn get_span(now: i64, trace_id: u128, span_id: u64) -> SpanBytes { | ||
| let duration = 1_000_000 * span_id as i64; | ||
| SpanBytes { | ||
| trace_id, | ||
| span_id, | ||
| parent_id: span_id.saturating_sub(1), | ||
| duration: 1_000_000 * span_id as i64, | ||
| start: now + duration, | ||
| service: "data-pipeline-agentless-example".into(), | ||
| name: "agentless.example".into(), | ||
| resource: "resource".into(), | ||
| error: 0, | ||
| metrics: HashMap::from([("_sampling_priority_v1".into(), 1.0)]), | ||
| span_events: vec![SpanEvent { | ||
| time_unix_nano: now as u64, | ||
| name: "event".into(), | ||
| attributes: HashMap::new(), | ||
| }], | ||
| span_links: vec![SpanLink { | ||
| trace_id: 10101010101, | ||
| trace_id_high: 1010101, | ||
| span_id, | ||
| ..Default::default() | ||
| }], | ||
| ..Default::default() | ||
| } | ||
| } | ||
|
|
||
| #[derive(Parser)] | ||
| #[command(name = "send-traces-agentless")] | ||
| #[command(about = "Send a trace to the Datadog agentless intake")] | ||
| struct Args { | ||
| /// Override the intake URL. Defaults to | ||
| /// `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`. | ||
| #[arg(long = "url")] | ||
| url: Option<String>, | ||
| } | ||
|
|
||
| fn main() { | ||
| logger_configure_std(StdConfig { | ||
| target: StdTarget::Out, | ||
| }) | ||
| .expect("Failed to configure logger"); | ||
| logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level"); | ||
|
|
||
| let args = Args::parse(); | ||
|
|
||
| let api_key = std::env::var("DD_API_KEY") | ||
| .expect("DD_API_KEY environment variable must be set for agentless export"); | ||
| let site = std::env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string()); | ||
| let intake_url = args | ||
| .url | ||
| .unwrap_or_else(|| format!("https://public-trace-http-intake.logs.{site}/v1/input")); | ||
|
|
||
| let shared_runtime = Arc::new(SharedRuntime::new().expect("Failed to create runtime")); | ||
|
|
||
| let mut builder = TraceExporter::<NativeCapabilities>::builder(); | ||
| builder | ||
| .set_hostname("COMP-N661JFW6JN") | ||
| .set_env("prod") | ||
| .set_app_version(env!("CARGO_PKG_VERSION")) | ||
| .set_service("data-pipeline-agentless-example") | ||
| .set_tracer_version(env!("CARGO_PKG_VERSION")) | ||
| .set_language("nodejs") | ||
| .set_language_version(env!("CARGO_PKG_RUST_VERSION")) | ||
| .set_input_format(TraceExporterInputFormat::V04) | ||
| .set_output_format(TraceExporterOutputFormat::V04) | ||
| .set_shared_runtime(shared_runtime.clone()) | ||
| .set_agentless_endpoint(&intake_url, &api_key); | ||
|
|
||
| let exporter = builder | ||
| .build::<NativeCapabilities>() | ||
| .expect("Failed to build TraceExporter"); | ||
|
|
||
| let now = UNIX_EPOCH | ||
| .elapsed() | ||
| .expect("Failed to read time") | ||
| .as_nanos() as i64; | ||
|
|
||
| let trace_id = random(); | ||
| let trace: Vec<_> = (1..=3).map(|i| get_span(now, trace_id, i)).collect(); | ||
| let traces = vec![trace]; | ||
|
|
||
| exporter | ||
| .send_trace_chunks(traces) | ||
| .expect("Failed to send traces"); | ||
| println!("Trace sent to agentless intake at {intake_url}"); | ||
|
|
||
| shared_runtime | ||
| .shutdown(None) | ||
| .expect("Failed to shutdown runtime"); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Agentless APM trace export configuration. | ||
|
|
||
| use std::{fmt::Debug, time::Duration}; | ||
|
|
||
/// Request timeout used for agentless export when no explicit timeout is configured
/// (15 seconds).
pub const DEFAULT_AGENTLESS_TIMEOUT: Duration = Duration::from_millis(15_000);
|
|
||
/// Configuration for the agentless trace exporter.
///
/// `Debug` is implemented by hand so that the API key is never printed.
#[derive(Clone)]
pub struct AgentlessTraceConfig {
    /// Full URL that traces are POSTed to (e.g.
    /// `https://public-trace-http-intake.logs.datadoghq.com/v1/input`).
    pub endpoint_url: String,
    /// Datadog API key used for the `dd-api-key` header.
    pub api_key: String,
    /// Request timeout.
    pub timeout: Duration,
}

impl Debug for AgentlessTraceConfig {
    /// Formats the configuration, printing `"<redacted>"` in place of the API key.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces a decision here
        // about whether it is safe to print.
        let Self {
            endpoint_url,
            api_key: _,
            timeout,
        } = self;

        let mut out = f.debug_struct("AgentlessTraceConfig");
        out.field("endpoint_url", endpoint_url);
        out.field("api_key", &"<redacted>");
        out.field("timeout", timeout);
        out.finish()
    }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Agentless HTTP/JSON trace exporter. | ||
|
|
||
| use super::config::AgentlessTraceConfig; | ||
| use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError}; | ||
| use http::HeaderMap; | ||
| use libdd_capabilities::{HttpClientCapability, SleepCapability}; | ||
| use libdd_common::Endpoint; | ||
| use libdd_trace_utils::send_with_retry::{ | ||
| send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError, | ||
| }; | ||
| use tracing::error; | ||
|
|
||
/// Attempt limit handed to the retry strategy for a single agentless payload.
const AGENTLESS_MAX_ATTEMPTS: u32 = 3;
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After #2047 this should be renamed |
||
/// Retry delay, in milliseconds, handed to the retry strategy for agentless sends.
const AGENTLESS_RETRY_DELAY_MS: u64 = 1_000;
|
|
||
| /// Send an agentless trace payload (JSON bytes) to the configured intake with retries. | ||
| /// | ||
| /// `headers` should already contain all required headers (api key, content-type, meta-*, | ||
| /// entity, trace-count, etc.). `test_token` is forwarded as `X-Datadog-Test-Session-Token` | ||
| /// when set, enabling snapshot tests against a local mock. | ||
| pub async fn send_agentless_traces_http<C: HttpClientCapability + SleepCapability>( | ||
| capabilities: &C, | ||
| config: &AgentlessTraceConfig, | ||
| headers: HeaderMap, | ||
| json_body: Vec<u8>, | ||
| ) -> Result<(), TraceExporterError> { | ||
| let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| { | ||
| TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!( | ||
| "Invalid agentless endpoint URL: {e}" | ||
| ))) | ||
| })?; | ||
|
|
||
| let target = Endpoint { | ||
| url, | ||
| timeout_ms: config.timeout.as_millis() as u64, | ||
| ..Endpoint::default() | ||
| }; | ||
|
|
||
| let retry_strategy = RetryStrategy::new( | ||
| AGENTLESS_MAX_ATTEMPTS, | ||
| AGENTLESS_RETRY_DELAY_MS, | ||
| RetryBackoffType::Exponential, | ||
| None, | ||
| ); | ||
|
|
||
| match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await { | ||
| Ok(_) => Ok(()), | ||
| Err(e) => Err(map_send_error(e)), | ||
| } | ||
| } | ||
|
|
||
| fn map_send_error(err: SendWithRetryError) -> TraceExporterError { | ||
| match err { | ||
| SendWithRetryError::Http(response, _) => { | ||
| let status = response.status(); | ||
| let body_str = String::from_utf8_lossy(response.body()); | ||
| match status.as_u16() { | ||
| 401 | 403 => error!( | ||
| status = status.as_u16(), | ||
| body = %body_str, | ||
| "Agentless authentication failed. Verify DD_API_KEY is valid." | ||
| ), | ||
| 404 => error!( | ||
| status = status.as_u16(), | ||
| body = %body_str, | ||
| "Agentless endpoint not found. Verify DD_SITE is correctly configured." | ||
| ), | ||
| 429 => error!( | ||
| status = status.as_u16(), | ||
| body = %body_str, | ||
| "Agentless intake rate-limited the request. Traces were dropped." | ||
| ), | ||
| 500..=599 => error!( | ||
| status = status.as_u16(), | ||
| body = %body_str, | ||
| "Agentless intake returned a server error. Traces were dropped." | ||
| ), | ||
| _ => error!( | ||
| status = status.as_u16(), | ||
| body = %body_str, | ||
| "Agentless intake returned an unexpected status." | ||
| ), | ||
| } | ||
| TraceExporterError::Request(RequestError::new(status, &body_str)) | ||
| } | ||
| SendWithRetryError::Timeout(_) => { | ||
| TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)) | ||
| } | ||
| SendWithRetryError::Network(error, _) => TraceExporterError::from(error), | ||
| SendWithRetryError::ResponseBody(_) => { | ||
| TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState( | ||
| "Failed to read agentless response body".to_string(), | ||
| )) | ||
| } | ||
| SendWithRetryError::Build(_) => TraceExporterError::Internal( | ||
| InternalErrorKind::InvalidWorkerState("Failed to build agentless request".to_string()), | ||
| ), | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Agentless APM trace export for libdatadog. | ||
| //! | ||
| //! When an agentless endpoint is configured via | ||
| //! [`crate::trace_exporter::TraceExporterBuilder::set_agentless_endpoint`], the | ||
| //! trace exporter sends APM trace spans directly to the Datadog HTTP intake | ||
| //! instead of to the local Datadog Agent. | ||
| //! | ||
| //! ## Differences from the regular agent export | ||
| //! | ||
| //! - **Transport**: `POST` to the public HTTP trace intake (default `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`, | ||
| //! or a custom URL) using `dd-api-key` auth, instead of msgpack to the local agent's | ||
| //! `/v0.4/traces`. The host language resolves the URL from `DD_SITE` and supplies the API key; | ||
| //! the exporter reads no environment variables. | ||
| //! - **Encoding**: JSON (see [`libdd_trace_utils::agentless_encoder`]) instead of msgpack v04. See | ||
| //! that module for the payload-shape differences. | ||
| //! - **Retries**: up to 3 attempts with exponential backoff starting at 1 s and no cap (the agent | ||
| //! path uses its own strategy). | ||
| //! - **Mutual exclusion with OTLP**: if both an OTLP and an agentless endpoint are configured on | ||
| //! the builder, OTLP wins: a warning is logged at build time and the agentless config is | ||
| //! dropped. | ||
|
|
||
| pub(crate) mod config; | ||
| pub(crate) mod exporter; | ||
|
|
||
| pub use config::AgentlessTraceConfig; | ||
| pub use exporter::send_agentless_traces_http; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| use crate::agent_info::AgentInfoFetcher; | ||
| use crate::agentless::config::{AgentlessTraceConfig, DEFAULT_AGENTLESS_TIMEOUT}; | ||
| use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT}; | ||
| use crate::otlp::OtlpTraceConfig; | ||
| #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] | ||
|
|
@@ -65,6 +66,9 @@ pub struct TraceExporterBuilder { | |
| connection_timeout: Option<u64>, | ||
| otlp_endpoint: Option<String>, | ||
| otlp_headers: Vec<(String, String)>, | ||
| agentless_endpoint: Option<String>, | ||
| agentless_api_key: Option<String>, | ||
| agentless_timeout: Option<Duration>, | ||
| } | ||
|
|
||
| impl TraceExporterBuilder { | ||
|
|
@@ -296,6 +300,32 @@ impl TraceExporterBuilder { | |
| self | ||
| } | ||
|
|
||
| /// Enables agentless APM trace export and sets the intake URL and API key. | ||
| /// | ||
| /// When set, APM trace spans are sent directly to the Datadog HTTP intake in JSON format | ||
| /// (`POST /v1/input`) instead of through the Datadog Agent. The host language is responsible | ||
| /// for resolving the endpoint URL (default | ||
| /// `https://public-trace-http-intake.logs.{DD_SITE}` or a custom override) and the API key | ||
| /// from its configuration. This crate does not read environment variables. | ||
| /// | ||
| /// If OTLP is also configured via [`Self::set_otlp_endpoint`], OTLP takes precedence and a | ||
| /// warning is logged at build time; the agentless configuration is dropped. | ||
| /// | ||
| /// Example: `set_agentless_endpoint("https://public-trace-http-intake.logs.datadoghq.com/v1/input", "<api-key>")` | ||
| pub fn set_agentless_endpoint(&mut self, url: &str, api_key: &str) -> &mut Self { | ||
| self.agentless_endpoint = Some(url.to_owned()); | ||
| self.agentless_api_key = Some(api_key.to_owned()); | ||
| self | ||
| } | ||
|
|
||
| /// Sets the request timeout used by the agentless intake transport. | ||
| /// | ||
| /// Defaults to 15 seconds when not set. | ||
| pub fn set_agentless_timeout(&mut self, timeout: Duration) -> &mut Self { | ||
| self.agentless_timeout = Some(timeout); | ||
| self | ||
| } | ||
|
|
||
| /// Build the [`TraceExporter`] synchronously. | ||
| /// | ||
| /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. | ||
|
|
@@ -429,7 +459,35 @@ impl TraceExporterBuilder { | |
| } | ||
| }; | ||
|
|
||
| let otlp_config = self.otlp_endpoint.map(|url| { | ||
| // Resolve transport selection: OTLP takes precedence over agentless when both are set. | ||
| let (otlp_endpoint, agentless_endpoint, agentless_api_key) = match ( | ||
| self.otlp_endpoint.as_ref(), | ||
| self.agentless_endpoint.as_ref(), | ||
| ) { | ||
| (Some(_), Some(_)) => { | ||
| tracing::warn!( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the builder fail if it has conflicting configuration? SDK configuration is complex. I'm of the opinion that it's better for the complexity to be concentrated in the SDKs and let them handle things like precedence.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the same topic, the build should probably error out if |
||
| "Both OTLP and agentless trace export are configured; OTLP takes \ | ||
| precedence and the agentless configuration will be ignored." | ||
| ); | ||
| (self.otlp_endpoint, None, None) | ||
| } | ||
| _ => ( | ||
| self.otlp_endpoint, | ||
| self.agentless_endpoint, | ||
| self.agentless_api_key, | ||
| ), | ||
| }; | ||
|
|
||
| let agentless_config = match (agentless_endpoint, agentless_api_key) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agentless config is going to happen after
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the same applies to telemetry? It's built against the agent url.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think all of this also applies to otlp mode for the exporter. |
||
| (Some(url), Some(api_key)) => Some(AgentlessTraceConfig { | ||
| endpoint_url: url, | ||
| api_key, | ||
| timeout: self.agentless_timeout.unwrap_or(DEFAULT_AGENTLESS_TIMEOUT), | ||
| }), | ||
| _ => None, | ||
| }; | ||
|
|
||
| let otlp_config = otlp_endpoint.map(|url| { | ||
| let mut headers = http::HeaderMap::new(); | ||
| for (key, value) in self.otlp_headers { | ||
| match ( | ||
|
|
@@ -514,6 +572,7 @@ impl TraceExporterBuilder { | |
| .agent_rates_payload_version_enabled | ||
| .then(AgentResponsePayloadVersion::new), | ||
| otlp_config, | ||
| agentless_config, | ||
| }) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe llm obs uses the test agent as a stand-in for intake in tests. Could we do the same and add integration tests for agentless payloads?