Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 32 additions & 6 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use zstd::zstd_safe::CompressionLevel;

use datadog_trace_agent::{
aggregator::TraceAggregator,
config, env_verifier, mini_agent, proxy_flusher, stats_flusher, stats_processor,
config, env_verifier, mini_agent, proxy_flusher, stats_concentrator_service, stats_flusher,
stats_generator, stats_processor,
trace_flusher::{self, TraceFlusher},
trace_processor,
};
Expand Down Expand Up @@ -107,6 +108,12 @@ pub async fn main() {
let https_proxy = env::var("DD_PROXY_HTTPS")
.or_else(|_| env::var("HTTPS_PROXY"))
.ok();

let dd_serverless_stats_computation_enabled =
env::var("DD_SERVERLESS_STATS_COMPUTATION_ENABLED")
.map(|val| val.to_lowercase() != "false")
.unwrap_or(true);

debug!("Starting serverless trace mini agent");

let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level);
Expand All @@ -132,11 +139,6 @@ pub async fn main() {

let env_verifier = Arc::new(env_verifier::ServerlessEnvVerifier::default());

let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {});

let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {});
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

let config = match config::Config::new() {
Ok(c) => Arc::new(c),
Err(e) => {
Expand All @@ -145,6 +147,30 @@ pub async fn main() {
}
};

let (stats_concentrator_handle, stats_generator) = if dd_serverless_stats_computation_enabled {
info!("serverless stats computation enabled");
let (service, handle) =
stats_concentrator_service::StatsConcentratorService::new(config.clone());
tokio::spawn(service.run());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to think about any sort of shutdown mechanism for this service, or do we rely on the existing application cleanup at the end?

(
Some(handle.clone()),
Some(Arc::new(stats_generator::StatsGenerator::new(handle))),
)
} else {
info!("serverless stats computation disabled");
(None, None)
};

let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
stats_generator: stats_generator.clone(),
});

let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
stats_concentrator: stats_concentrator_handle.clone(),
force_flush_concentrator: false,
});
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

let trace_aggregator = Arc::new(TokioMutex::new(TraceAggregator::default()));
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
trace_aggregator,
Expand Down
7 changes: 6 additions & 1 deletion crates/datadog-trace-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@ async-trait = "0.1.64"
tracing = { version = "0.1", default-features = false }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0"
thiserror = { version = "1.0.58", default-features = false }
libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", features = [
"mini_agent",
] }
libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
datadog-fips = { path = "../datadog-fips" }
reqwest = { version = "0.12.23", features = ["json", "http2"], default-features = false }
reqwest = { version = "0.12.23", features = [
"json",
"http2",
], default-features = false }
bytes = "1.10.1"

[dev-dependencies]
Expand Down
9 changes: 9 additions & 0 deletions crates/datadog-trace-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub struct Config {
/// timeout for environment verification, in milliseconds
pub verify_env_timeout_ms: u64,
pub proxy_url: Option<String>,
pub service: Option<String>,
pub env: Option<String>,
pub version: Option<String>,
}

impl Config {
Expand Down Expand Up @@ -251,6 +254,9 @@ impl Config {
.or_else(|_| env::var("HTTPS_PROXY"))
.ok(),
tags,
service: env::var("DD_SERVICE").ok(),
env: env::var("DD_ENV").ok(),
version: env::var("DD_VERSION").ok(),
})
}
}
Expand Down Expand Up @@ -721,6 +727,9 @@ pub mod test_helpers {
proxy_request_retry_backoff_base_ms: 100,
verify_env_timeout_ms: 1000,
proxy_url: None,
service: None,
env: None,
version: None,
}
}
}
2 changes: 2 additions & 0 deletions crates/datadog-trace-agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ pub mod env_verifier;
pub mod http_utils;
pub mod mini_agent;
pub mod proxy_flusher;
pub mod stats_concentrator_service;
pub mod stats_flusher;
pub mod stats_generator;
pub mod stats_processor;
pub mod trace_flusher;
pub mod trace_processor;
18 changes: 9 additions & 9 deletions crates/datadog-trace-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{debug, error, warn};
use tracing::{debug, error};

use crate::http_utils::{log_and_create_http_response, verify_request_content_length};
use crate::proxy_flusher::{ProxyFlusher, ProxyRequest};
Expand Down Expand Up @@ -191,14 +191,14 @@ impl MiniAgent {
let sentinel = std::path::Path::new(LAMBDA_LITE_SENTINEL_PATH);
// SAFETY: LAMBDA_LITE_SENTINEL_PATH is a hard-coded absolute path,
// so .parent() always returns Some.
if let Some(parent) = sentinel.parent() {
if let Err(e) = tokio::fs::create_dir_all(parent).await {
error!(
"Could not create parent directory for Lambda Lite sentinel \
if let Some(parent) = sentinel.parent()
&& let Err(e) = tokio::fs::create_dir_all(parent).await
{
error!(
"Could not create parent directory for Lambda Lite sentinel \
file at {}: {}.",
LAMBDA_LITE_SENTINEL_PATH, e
);
}
LAMBDA_LITE_SENTINEL_PATH, e
);
}
if let Err(e) = tokio::fs::write(sentinel, b"").await {
error!(
Expand Down Expand Up @@ -521,7 +521,7 @@ impl MiniAgent {
INFO_ENDPOINT_PATH,
PROFILING_ENDPOINT_PATH
],
"client_drop_p0s": true,
"client_drop_p0s": false,
"config": config_json
}
);
Expand Down
Loading
Loading