Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions datadog-opentelemetry/src/core/configuration/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use libdd_telemetry::data::Configuration;
use std::collections::{HashSet, VecDeque};
use std::fmt::Display;
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand Down Expand Up @@ -92,6 +93,8 @@ pub const TRACER_VERSION: &str = env!("CARGO_PKG_VERSION");

const DATADOG_TAGS_MAX_LENGTH: usize = 512;
const RC_DEFAULT_POLL_INTERVAL: f64 = 5.0; // 5 seconds is the highest interval allowed by the spec
const DEFAULT_UNIX_TRACE_AGENT_URL: &str = "/var/run/datadog/apm.socket";
const DEFAULT_UNIX_DOGSTATSD_AGENT_URL: &str = "/var/run/datadog/dsd.socket";

/// OTLP protocol types for OTLP export.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -274,7 +277,6 @@ impl<T: Clone + ConfigurationValueProvider> ConfigItem<T> {
}

/// Gets the source of the current value
#[allow(dead_code)] // Used in tests and will be used for remote configuration
fn source(&self) -> ConfigSourceOrigin {
if self.code_value.is_some() {
ConfigSourceOrigin::Code
Expand All @@ -285,6 +287,10 @@ impl<T: Clone + ConfigurationValueProvider> ConfigItem<T> {
}
}

fn is_default_value(&self) -> bool {
self.source() == ConfigSourceOrigin::Default
}

fn build_configurations_list(&self, calculated_value: Option<String>) -> Vec<Configuration> {
let mut configurations = Vec::new();
// Always include the default value
Expand Down Expand Up @@ -1913,21 +1919,43 @@ impl ConfigBuilder {
// resolve trace_agent_url
// this will send the the config through telemetry with `calculated` origin.
if config.trace_agent_url.value().is_empty() {
let host = &config.agent_host.value();
let port = *config.trace_agent_port.value();
config
.trace_agent_url
.set_calculated(Cow::Owned(format!("http://{host}:{port}")));
let uds_is_alive = Path::new(DEFAULT_UNIX_TRACE_AGENT_URL)
.try_exists()
.unwrap_or(false);

// if user hasn't provided agent_host nor agent_port and UDS is alive, use it
let url = if config.agent_host.is_default_value()
&& config.trace_agent_port.is_default_value()
&& uds_is_alive
{
Cow::Owned(format!("unix://{DEFAULT_UNIX_TRACE_AGENT_URL}"))
} else {
let host = &config.agent_host.value();
let port = *config.trace_agent_port.value();
Cow::Owned(format!("http://{host}:{port}"))
};
config.trace_agent_url.set_calculated(url);
}

// resolve dogstatsd_agent_url
// this will send the the config through telemetry with `calculated` origin.
if config.dogstatsd_agent_url.value().is_empty() {
let host = &config.dogstatsd_agent_host.value();
let port = *config.dogstatsd_agent_port.value();
config
.dogstatsd_agent_url
.set_calculated(Cow::Owned(format!("http://{host}:{port}")));
let uds_is_alive = Path::new(DEFAULT_UNIX_DOGSTATSD_AGENT_URL)
.try_exists()
.unwrap_or(false);

// if user hasn't provided agent_host nor agent_port and UDS is alive, use it
let url = if config.agent_host.is_default_value()
&& config.trace_agent_port.is_default_value()
&& uds_is_alive
{
Cow::Owned(format!("unix://{DEFAULT_UNIX_DOGSTATSD_AGENT_URL}"))
} else {
let host = &config.dogstatsd_agent_host.value();
let port = *config.dogstatsd_agent_port.value();
Cow::Owned(format!("http://{host}:{port}"))
};
config.dogstatsd_agent_url.set_calculated(url);
}

config
Expand Down
50 changes: 24 additions & 26 deletions datadog-opentelemetry/src/core/configuration/remote_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ use crate::core::utils::{ShutdownSignaler, WorkerHandle};

use anyhow::Result;
use core::fmt;
use libdd_common::http_common::{self};
use libdd_common::{connector::Connector::Http, Endpoint, HttpClient};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread::{self};
use std::time::{Duration, Instant};

// HTTP client imports
use http_body_util::{BodyExt, Full};
use hyper::{body::Bytes, Method, Request};
use http_body_util::BodyExt;
use hyper::Method;
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;

Expand Down Expand Up @@ -411,22 +413,22 @@ struct RemoteConfigClient {
/// Different from runtime_id - each RemoteConfigClient gets its own UUID
client_id: String,
config: Arc<Config>,
agent_url: hyper::Uri,
// HTTP client timeout configuration
client_timeout: Duration,
agent_endpoint: Endpoint,
state: Arc<Mutex<ClientState>>,
capabilities: ClientCapabilities,
poll_interval: Duration,
// Cache of successfully applied configurations
cached_target_files: Vec<CachedTargetFile>,
// Registry of product handlers for processing different config types
product_registry: ProductRegistry,
// default http client
http_client: HttpClient,
}

impl RemoteConfigClient {
/// Creates a new remote configuration client
pub fn new(config: Arc<Config>) -> Result<Self, RemoteConfigClientError> {
let agent_url = hyper::Uri::from_maybe_shared(config.trace_agent_url().to_string())
let agent_url = libdd_common::parse_uri(&config.trace_agent_url())
.map_err(|_| RemoteConfigClientError::InvalidAgentUri)?;
let mut parts = agent_url.into_parts();
parts.path_and_query = Some(
Expand All @@ -437,6 +439,8 @@ impl RemoteConfigClient {
let agent_url =
hyper::Uri::from_parts(parts).map_err(|_| RemoteConfigClientError::InvalidAgentUri)?;

let agent_endpoint = libdd_common::Endpoint::from_url(agent_url);

let state = Arc::new(Mutex::new(ClientState {
root_version: 1, // Agent requires >= 1 (base TUF director root)
targets_version: 0,
Expand All @@ -448,50 +452,44 @@ impl RemoteConfigClient {

let poll_interval = Duration::from_secs_f64(config.remote_config_poll_interval());

// Create HTTP connector with timeout configuration
let mut connector = HttpConnector::new();
connector.set_connect_timeout(Some(DEFAULT_TIMEOUT));

Ok(Self {
client_id: uuid::Uuid::new_v4().to_string(),
config,
agent_url,
client_timeout: DEFAULT_TIMEOUT,
agent_endpoint,
state,
capabilities: ClientCapabilities::new(),
poll_interval,
cached_target_files: Vec::new(),
product_registry: ProductRegistry::new(),
http_client: Client::builder(TokioExecutor::default()).build(Http(connector)),
})
}

/// Fetches configuration from the agent and applies it
async fn fetch_and_apply_config(&mut self) -> Result<()> {
// Create HTTP connector with timeout configuration
let mut connector = HttpConnector::new();
connector.set_connect_timeout(Some(self.client_timeout));

// Create HTTP client for this request
// TODO(paullgdc): this doesn't support UDS
// We should instead use the client in ddcommon::hyper_migration and the helper methods
// to encode the URI the way it expects for UDS
let client = Client::builder(TokioExecutor::new()).build(connector);

let request_payload = self.build_request()?;

// Serialize the request to JSON
let json_body = serde_json::to_string(&request_payload)
.map_err(|e| anyhow::anyhow!("Failed to serialize request: {}", e))?;

// Parse the agent URL
let req_builder = self
.agent_endpoint
.to_request_builder("dd-trace-rs")
.map_err(|e| anyhow::anyhow!("Failed to build request builder: {}", e))?;

// Build HTTP request
let req = Request::builder()
let req = req_builder
.method(Method::POST)
.uri(self.agent_url.clone())
.header("content-type", "application/json")
.header("user-agent", "dd-trace-rs")
.body(Full::new(Bytes::from(json_body)))
.body(http_common::Body::from(json_body))
.map_err(|e| anyhow::anyhow!("Failed to build request: {}", e))?;

// Send request to agent
let response = client
let response = self
.http_client
.request(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send request: {}", e))?;
Expand Down
3 changes: 1 addition & 2 deletions datadog-opentelemetry/src/core/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ fn make_telemetry_worker(
custom_handle: Option<Box<dyn TelemetryHandle>>,
) -> Result<Box<dyn TelemetryHandle>, Error> {
if custom_handle.is_none() {
let mut builder = worker::TelemetryWorkerBuilder::new(
config.trace_agent_url().to_string(),
let mut builder = worker::TelemetryWorkerBuilder::new_fetch_host(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

config.service().to_string(),
config.language().to_string(),
config.language_version().to_string(),
Expand Down
Loading