Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions libdd-data-pipeline/examples/send-traces-agentless.rs

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe llm obs uses the test agent as a stand-in for intake in tests. Could we do the same and add integration tests for agentless payloads?

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Example: send a trace directly to the Datadog agentless intake.
//!
//! Reads the API key from the `DD_API_KEY` environment variable and POSTs a
//! small trace to `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`
//! (defaulting to `datadoghq.com`).
//!
//! Usage:
//! DD_API_KEY=<key> [DD_SITE=datadoghq.eu] \
//! cargo run --example send-traces-agentless -p libdd-data-pipeline

use clap::Parser;
use libdd_capabilities_impl::NativeCapabilities;
use libdd_data_pipeline::trace_exporter::{
TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
};
use libdd_log::logger::{
logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget,
};
use libdd_shared_runtime::SharedRuntime;
use libdd_trace_utils::span::v04::{SpanBytes, SpanEvent, SpanLink};
use rand::random;
use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH};

/// Builds one example span belonging to the trace `trace_id`.
///
/// `span_id` also drives the span's shape: its parent is `span_id - 1` (saturating at 0),
/// and both its duration and its start offset from `now` are `span_id` milliseconds,
/// expressed in nanoseconds. `now` is expected to be a Unix timestamp in nanoseconds.
///
/// Every span carries one span event stamped at `now` and one span link with fixed
/// placeholder trace-id values.
fn get_span(now: i64, trace_id: u128, span_id: u64) -> SpanBytes {
    // Computed once and reused for both `duration` and `start` so the two stay in sync.
    let duration = 1_000_000 * span_id as i64;
    SpanBytes {
        trace_id,
        span_id,
        parent_id: span_id.saturating_sub(1),
        duration,
        start: now + duration,
        service: "data-pipeline-agentless-example".into(),
        name: "agentless.example".into(),
        resource: "resource".into(),
        error: 0,
        metrics: HashMap::from([("_sampling_priority_v1".into(), 1.0)]),
        span_events: vec![SpanEvent {
            time_unix_nano: now as u64,
            name: "event".into(),
            attributes: HashMap::new(),
        }],
        span_links: vec![SpanLink {
            trace_id: 10101010101,
            trace_id_high: 1010101,
            span_id,
            ..Default::default()
        }],
        ..Default::default()
    }
}

// Command-line options for the example. The field doc comment below is kept verbatim
// because clap renders it as the `--url` help text.
#[derive(Parser)]
#[command(
    name = "send-traces-agentless",
    about = "Send a trace to the Datadog agentless intake"
)]
struct Args {
    /// Override the intake URL. Defaults to
    /// `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`.
    #[arg(long = "url")]
    url: Option<String>,
}

/// Sends a single three-span trace to the agentless intake, then shuts the runtime down.
///
/// The API key is read from `DD_API_KEY` (required) and the site from `DD_SITE`
/// (defaults to `datadoghq.com`); `--url` overrides the derived intake URL entirely.
/// Any failure aborts the example with a panic carrying a descriptive message.
fn main() {
    // Log to stdout at debug level.
    logger_configure_std(StdConfig {
        target: StdTarget::Out,
    })
    .expect("Failed to configure logger");
    logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level");

    let args = Args::parse();

    let api_key = std::env::var("DD_API_KEY")
        .expect("DD_API_KEY environment variable must be set for agentless export");
    let site = std::env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string());
    // An explicit `--url` wins; otherwise the URL is derived from the site.
    let intake_url = args
        .url
        .unwrap_or_else(|| format!("https://public-trace-http-intake.logs.{site}/v1/input"));

    let shared_runtime = Arc::new(SharedRuntime::new().expect("Failed to create runtime"));

    let mut builder = TraceExporter::<NativeCapabilities>::builder();
    builder
        // Neutral placeholder: an example must not ship a real workstation hostname.
        .set_hostname("agentless-example-host")
        .set_env("prod")
        .set_app_version(env!("CARGO_PKG_VERSION"))
        .set_service("data-pipeline-agentless-example")
        .set_tracer_version(env!("CARGO_PKG_VERSION"))
        // NOTE(review): the language is reported as "nodejs" while the language version is
        // this crate's Rust version — confirm the mismatch is intentional.
        .set_language("nodejs")
        .set_language_version(env!("CARGO_PKG_RUST_VERSION"))
        .set_input_format(TraceExporterInputFormat::V04)
        .set_output_format(TraceExporterOutputFormat::V04)
        .set_shared_runtime(shared_runtime.clone())
        .set_agentless_endpoint(&intake_url, &api_key);

    let exporter = builder
        .build::<NativeCapabilities>()
        .expect("Failed to build TraceExporter");

    // Current Unix time in nanoseconds; used as the base timestamp for every span.
    let now = UNIX_EPOCH
        .elapsed()
        .expect("Failed to read time")
        .as_nanos() as i64;

    // One trace chunk made of spans 1..=3, all sharing a random trace id.
    let trace_id = random();
    let trace: Vec<_> = (1..=3).map(|i| get_span(now, trace_id, i)).collect();
    let traces = vec![trace];

    exporter
        .send_trace_chunks(traces)
        .expect("Failed to send traces");
    println!("Trace sent to agentless intake at {intake_url}");

    shared_runtime
        .shutdown(None)
        .expect("Failed to shutdown runtime");
}
30 changes: 30 additions & 0 deletions libdd-data-pipeline/src/agentless/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Agentless APM trace export configuration.

use std::{fmt::Debug, time::Duration};

/// Default request timeout for agentless trace export: 15 seconds.
pub const DEFAULT_AGENTLESS_TIMEOUT: Duration = Duration::from_secs(15);

/// Agentless trace exporter configuration.
///
/// `Debug` is implemented by hand for this type so that `api_key` is printed as
/// `<redacted>` instead of in clear text.
#[derive(Clone)]
pub struct AgentlessTraceConfig {
/// Full URL to POST traces to (e.g.
/// `https://public-trace-http-intake.logs.datadoghq.com/v1/input`).
pub endpoint_url: String,
/// Datadog API key used for the `dd-api-key` header.
pub api_key: String,
/// Request timeout.
pub timeout: Duration,
}

impl Debug for AgentlessTraceConfig {
    /// Formats the configuration for diagnostics, printing the fixed placeholder
    /// `<redacted>` in place of the API key.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that adding a field forces a decision about how to print it.
        let Self {
            endpoint_url,
            api_key: _,
            timeout,
        } = self;

        let mut out = f.debug_struct("AgentlessTraceConfig");
        out.field("endpoint_url", endpoint_url);
        out.field("api_key", &"<redacted>");
        out.field("timeout", timeout);
        out.finish()
    }
}
102 changes: 102 additions & 0 deletions libdd-data-pipeline/src/agentless/exporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Agentless HTTP/JSON trace exporter.

use super::config::AgentlessTraceConfig;
use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
use http::HeaderMap;
use libdd_capabilities::{HttpClientCapability, SleepCapability};
use libdd_common::Endpoint;
use libdd_trace_utils::send_with_retry::{
send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError,
};
use tracing::error;

/// Attempt budget handed to `RetryStrategy::new` for agentless sends.
// NOTE(review): confirm whether `RetryStrategy` treats this value as total attempts or as
// retries after the first try; the constant name should match that meaning.
const AGENTLESS_MAX_ATTEMPTS: u32 = 3;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After #2047 this should be renamed AGENTLESS_MAX_RETRIES

/// Delay in milliseconds handed to `RetryStrategy::new` for agentless sends, used together
/// with exponential backoff.
const AGENTLESS_RETRY_DELAY_MS: u64 = 1000;

/// Send an agentless trace payload (JSON bytes) to the configured intake with retries.
///
/// `headers` should already contain all required headers (api key, content-type, meta-*,
/// entity, trace-count, etc.). `test_token` is forwarded as `X-Datadog-Test-Session-Token`
/// when set, enabling snapshot tests against a local mock.
pub async fn send_agentless_traces_http<C: HttpClientCapability + SleepCapability>(
capabilities: &C,
config: &AgentlessTraceConfig,
headers: HeaderMap,
json_body: Vec<u8>,
) -> Result<(), TraceExporterError> {
let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| {
TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!(
"Invalid agentless endpoint URL: {e}"
)))
})?;

let target = Endpoint {
url,
timeout_ms: config.timeout.as_millis() as u64,
..Endpoint::default()
};

let retry_strategy = RetryStrategy::new(
AGENTLESS_MAX_ATTEMPTS,
AGENTLESS_RETRY_DELAY_MS,
RetryBackoffType::Exponential,
None,
);

match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await {
Ok(_) => Ok(()),
Err(e) => Err(map_send_error(e)),
}
}

fn map_send_error(err: SendWithRetryError) -> TraceExporterError {
match err {
SendWithRetryError::Http(response, _) => {
let status = response.status();
let body_str = String::from_utf8_lossy(response.body());
match status.as_u16() {
401 | 403 => error!(
status = status.as_u16(),
body = %body_str,
"Agentless authentication failed. Verify DD_API_KEY is valid."
),
404 => error!(
status = status.as_u16(),
body = %body_str,
"Agentless endpoint not found. Verify DD_SITE is correctly configured."
),
429 => error!(
status = status.as_u16(),
body = %body_str,
"Agentless intake rate-limited the request. Traces were dropped."
),
500..=599 => error!(
status = status.as_u16(),
body = %body_str,
"Agentless intake returned a server error. Traces were dropped."
),
_ => error!(
status = status.as_u16(),
body = %body_str,
"Agentless intake returned an unexpected status."
),
}
TraceExporterError::Request(RequestError::new(status, &body_str))
}
SendWithRetryError::Timeout(_) => {
TraceExporterError::Io(std::io::Error::from(std::io::ErrorKind::TimedOut))
}
SendWithRetryError::Network(error, _) => TraceExporterError::from(error),
SendWithRetryError::ResponseBody(_) => {
TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(
"Failed to read agentless response body".to_string(),
))
}
SendWithRetryError::Build(_) => TraceExporterError::Internal(
InternalErrorKind::InvalidWorkerState("Failed to build agentless request".to_string()),
),
}
}
29 changes: 29 additions & 0 deletions libdd-data-pipeline/src/agentless/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Agentless APM trace export for libdatadog.
//!
//! When an agentless endpoint is configured via
//! [`crate::trace_exporter::TraceExporterBuilder::set_agentless_endpoint`], the
//! trace exporter sends APM trace spans directly to the Datadog HTTP intake
//! instead of to the local Datadog Agent.
//!
//! ## Differences from the regular agent export
//!
//! - **Transport**: `POST` to the public HTTP trace intake (default `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`,
//! or a custom URL) using `dd-api-key` auth, instead of msgpack to the local agent's
//! `/v0.4/traces`. The host language resolves the URL from `DD_SITE` and supplies the API key;
//! the exporter reads no environment variables.
//! - **Encoding**: JSON (see [`libdd_trace_utils::agentless_encoder`]) instead of msgpack v04. See
//! that module for the payload-shape differences.
//! - **Retries**: up to 3 attempts with exponential backoff starting at 1 s and no cap (the agent
//! path uses its own strategy).
//! - **Mutual exclusion with OTLP**: if both an OTLP and an agentless endpoint are configured on
//! the builder, OTLP wins and the agentless config is dropped; a warning is logged at build
//! time.

pub(crate) mod config;
pub(crate) mod exporter;

pub use config::AgentlessTraceConfig;
pub use exporter::send_agentless_traces_http;
1 change: 1 addition & 0 deletions libdd-data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! in different languages.

pub mod agent_info;
pub(crate) mod agentless;
mod health_metrics;
pub(crate) mod otlp;
#[cfg(feature = "telemetry")]
Expand Down
61 changes: 60 additions & 1 deletion libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::agent_info::AgentInfoFetcher;
use crate::agentless::config::{AgentlessTraceConfig, DEFAULT_AGENTLESS_TIMEOUT};
use crate::otlp::config::{OtlpProtocol, DEFAULT_OTLP_TIMEOUT};
use crate::otlp::OtlpTraceConfig;
#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
Expand Down Expand Up @@ -65,6 +66,9 @@ pub struct TraceExporterBuilder {
connection_timeout: Option<u64>,
otlp_endpoint: Option<String>,
otlp_headers: Vec<(String, String)>,
agentless_endpoint: Option<String>,
agentless_api_key: Option<String>,
agentless_timeout: Option<Duration>,
}

impl TraceExporterBuilder {
Expand Down Expand Up @@ -296,6 +300,32 @@ impl TraceExporterBuilder {
self
}

/// Switches the exporter to agentless APM trace export, recording the intake URL and the
/// API key to use.
///
/// With this set, trace spans are POSTed as JSON straight to the Datadog HTTP intake
/// (`POST /v1/input`) rather than going through the Datadog Agent. Resolving the URL
/// (by default `https://public-trace-http-intake.logs.{DD_SITE}/v1/input`, or a custom
/// override) and the API key is the host language's job; this crate does not read
/// environment variables.
///
/// OTLP takes precedence: if [`Self::set_otlp_endpoint`] has also been called, the
/// agentless configuration is dropped and a warning is logged at build time.
///
/// Example: `set_agentless_endpoint("https://public-trace-http-intake.logs.datadoghq.com/v1/input", "<api-key>")`
pub fn set_agentless_endpoint(&mut self, url: &str, api_key: &str) -> &mut Self {
    self.agentless_endpoint = Some(String::from(url));
    self.agentless_api_key = Some(String::from(api_key));
    self
}

/// Overrides the request timeout applied by the agentless intake transport.
///
/// If this is never called, a 15-second timeout is used.
pub fn set_agentless_timeout(&mut self, timeout: Duration) -> &mut Self {
    // Any previously configured value is simply superseded.
    let _previous = self.agentless_timeout.replace(timeout);
    self
}

/// Build the [`TraceExporter`] synchronously.
///
/// Sync facade over [`Self::build_async`]; panics inside an existing tokio context.
Expand Down Expand Up @@ -429,7 +459,35 @@ impl TraceExporterBuilder {
}
};

let otlp_config = self.otlp_endpoint.map(|url| {
// Resolve transport selection: OTLP takes precedence over agentless when both are set.
let (otlp_endpoint, agentless_endpoint, agentless_api_key) = match (
self.otlp_endpoint.as_ref(),
self.agentless_endpoint.as_ref(),
) {
(Some(_), Some(_)) => {
tracing::warn!(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the builder fail if it has conflicting configuration? SDK configuration is complex. I'm of the opinion that it's better for the complexity to be concentrated in the SDKs and let them handle things like precedence.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the same topic, the build should probably error out if set_url and output_format is used and agentless is set at the same time.

"Both OTLP and agentless trace export are configured; OTLP takes \
precedence and the agentless configuration will be ignored."
);
(self.otlp_endpoint, None, None)
}
_ => (
self.otlp_endpoint,
self.agentless_endpoint,
self.agentless_api_key,
),
};

let agentless_config = match (agentless_endpoint, agentless_api_key) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agentless config is going to happen after AgentInfoFetcher is spawned here. Won't that fail every 5 minutes when it polls an agent that doesn't exist?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the same applies to telemetry? It's built against the agent url.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all of this also applies to otlp mode for the exporter.

(Some(url), Some(api_key)) => Some(AgentlessTraceConfig {
endpoint_url: url,
api_key,
timeout: self.agentless_timeout.unwrap_or(DEFAULT_AGENTLESS_TIMEOUT),
}),
_ => None,
};

let otlp_config = otlp_endpoint.map(|url| {
let mut headers = http::HeaderMap::new();
for (key, value) in self.otlp_headers {
match (
Expand Down Expand Up @@ -514,6 +572,7 @@ impl TraceExporterBuilder {
.agent_rates_payload_version_enabled
.then(AgentResponsePayloadVersion::new),
otlp_config,
agentless_config,
})
}

Expand Down
Loading
Loading