Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions libdd-data-pipeline/src/otlp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,20 @@ pub struct OtlpTraceConfig {
#[allow(dead_code)]
pub(crate) protocol: OtlpProtocol,
}

/// Parsed OTLP trace-metrics exporter configuration.
#[derive(Clone, Debug)]
pub struct OtlpMetricsConfig {
/// Full URL to POST metrics to (e.g. `http://localhost:4318/v1/metrics`).
pub endpoint_url: String,
/// Pre-validated HTTP headers to include in each request.
pub headers: HeaderMap,
/// Request timeout.
pub timeout: Duration,
/// Protocol (for future use; currently only HttpJson is supported).
#[allow(dead_code)]
pub(crate) protocol: OtlpProtocol,
/// When `true`, emit only OTel attributes; omit `dd.*`/`_dd.*` ones
/// (`DD_TRACE_OTEL_SEMANTICS_ENABLED`).
pub otel_trace_semantics_enabled: bool,
}
55 changes: 40 additions & 15 deletions libdd-data-pipeline/src/otlp/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,31 @@

use super::config::OtlpTraceConfig;
use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
use http::HeaderMap;
use libdd_capabilities::{HttpClientCapability, SleepCapability};
use libdd_common::Endpoint;
use libdd_trace_utils::send_with_retry::{
send_with_retry, RetryBackoffType, RetryStrategy, SendWithRetryError,
};
use std::time::Duration;

/// Max retries for OTLP export.
const OTLP_MAX_RETRIES: u32 = 4;
/// Initial backoff between retries (milliseconds).
/// Max retries for OTLP export on transient failures.
pub(crate) const OTLP_MAX_RETRIES: u32 = 4;
/// No retries on shutdown to avoid a long backoff in the shutdown window.
pub(crate) const OTLP_SHUTDOWN_MAX_RETRIES: u32 = 0;
const OTLP_RETRY_DELAY_MS: u64 = 100;

/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries.
///
/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters.
///
/// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests
/// against the Datadog test agent's OTLP endpoint.
pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(
/// POST an OTLP HTTP/JSON payload to `endpoint_url`; `test_token` enables snapshot tests.
pub(crate) async fn send_otlp_http<C: HttpClientCapability + SleepCapability>(
capabilities: &C,
config: &OtlpTraceConfig,
endpoint_url: &str,
config_headers: &HeaderMap,
timeout: Duration,
test_token: Option<&str>,
json_body: Vec<u8>,
max_retries: u32,
) -> Result<(), TraceExporterError> {
let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| {
let url = libdd_common::parse_uri(endpoint_url).map_err(|e| {
TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!(
"Invalid OTLP endpoint URL: {}",
e
Expand All @@ -37,11 +38,11 @@ pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(

let target = Endpoint {
url,
timeout_ms: config.timeout.as_millis() as u64,
timeout_ms: timeout.as_millis() as u64,
..Endpoint::default()
};

let mut headers = config.headers.clone();
let mut headers = config_headers.clone();
headers.insert(
http::header::CONTENT_TYPE,
libdd_common::header::APPLICATION_JSON,
Expand All @@ -56,7 +57,7 @@ pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(
}

let retry_strategy = RetryStrategy::new(
OTLP_MAX_RETRIES,
max_retries,
OTLP_RETRY_DELAY_MS,
RetryBackoffType::Exponential,
None,
Expand All @@ -68,6 +69,30 @@ pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(
}
}

/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries.
///
/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters.
///
/// `test_token` is forwarded as `X-Datadog-Test-Session-Token` when set, enabling snapshot tests
/// against the Datadog test agent's OTLP endpoint.
pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(
capabilities: &C,
config: &OtlpTraceConfig,
test_token: Option<&str>,
json_body: Vec<u8>,
) -> Result<(), TraceExporterError> {
send_otlp_http(
capabilities,
&config.endpoint_url,
&config.headers,
config.timeout,
test_token,
json_body,
OTLP_MAX_RETRIES,
)
.await
}

async fn map_send_error(err: SendWithRetryError) -> TraceExporterError {
match err {
SendWithRetryError::Http(response, _) => {
Expand Down
Loading
Loading