diff --git a/.github/workflows/build-datadog-serverless-compat.yml b/.github/workflows/build-datadog-serverless-compat.yml index 0c72908..eca3b2f 100644 --- a/.github/workflows/build-datadog-serverless-compat.yml +++ b/.github/workflows/build-datadog-serverless-compat.yml @@ -56,7 +56,7 @@ jobs: retention-days: 3 - if: ${{ inputs.runner == 'windows-2022' }} shell: bash - run: cargo build --release -p datadog-serverless-compat --features windows-pipes + run: cargo build --release -p datadog-serverless-compat --features windows-pipes,windows-enhanced-metrics - if: ${{ inputs.runner == 'windows-2022' }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2 with: diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index fc640c5..7863afd 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -95,7 +95,7 @@ jobs: - shell: bash run: | if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then - cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes + cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics else cargo nextest run --workspace fi diff --git a/Cargo.lock b/Cargo.lock index 9a7c7be..4268d69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,6 +467,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "datadog-metrics-collector" +version = "0.1.0" +dependencies = [ + "dogstatsd", + "libdd-common 1.1.0", + "num_cpus", + "tracing", +] + [[package]] name = "datadog-opentelemetry" version = "0.3.0" @@ -519,6 +529,7 @@ name = "datadog-serverless-compat" version = "0.1.0" dependencies = [ "datadog-fips", + "datadog-metrics-collector", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils 3.0.0", @@ -1027,6 +1038,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -1409,6 +1426,35 @@ version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "libdd-common" +version = "1.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=d52ee90209cb12a28bdda0114535c1a985a29d95#d52ee90209cb12a28bdda0114535c1a985a29d95" +dependencies = [ + "anyhow", + "cc", + "const_format", + "futures", + "futures-core", + "futures-util", + "hex", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "libc", + "nix", + "pin-project", + "regex", + "serde", + "static_assertions", + "thiserror 1.0.69", + "tokio", + "tower-service", + "windows-sys 0.52.0", +] + [[package]] name = "libdd-common" version = "2.0.1" @@ -1876,6 +1922,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.21.4" diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 2d04b2a..b7890b7 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -85,6 +85,7 @@ headers,https://github.com/hyperium/headers,MIT,Sean McArthur heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,The heck Authors heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,Without Boats +hermit-abi,https://github.com/hermit-os/hermit-rs,MIT OR Apache-2.0,Stefan Lankes hex,https://github.com/KokaKiwi/rust-hex,MIT OR Apache-2.0,KokaKiwi home,https://github.com/rust-lang/cargo,MIT OR Apache-2.0,Brian Anderson http,https://github.com/hyperium/http,MIT OR Apache-2.0,"Alex Crichton , Carl Lerche , Sean McArthur " @@ -147,6 +148,7 @@ nix,https://github.com/nix-rust/nix,MIT,The nix-rust Project Developers nom,https://github.com/Geal/nom,MIT,contact@geoffroycouprie.com nu-ansi-term,https://github.com/nushell/nu-ansi-term,MIT,"ogham@bsago.me, Ryan Scheel (Havvy) , Josh Triplett , The Nushell Project Developers" num-traits,https://github.com/rust-num/num-traits,MIT OR Apache-2.0,The Rust Project Developers +num_cpus,https://github.com/seanmonstar/num_cpus,MIT OR Apache-2.0,Sean McArthur once_cell,https://github.com/matklad/once_cell,MIT OR Apache-2.0,Aleksey Kladov openssl-probe,https://github.com/rustls/openssl-probe,MIT OR Apache-2.0,Alex Crichton opentelemetry,https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry,Apache-2.0,The opentelemetry Authors diff --git a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml new file mode 100644 index 0000000..98ee2a4 --- /dev/null +++ b/crates/datadog-metrics-collector/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "datadog-metrics-collector" +version = "0.1.0" +edition.workspace = true +license.workspace = true +description = "Collector to read, compute, and submit enhanced metrics in Serverless environments" + +[dependencies] +dogstatsd = { path = "../dogstatsd", default-features = true } +num_cpus = "1.16" +tracing = { version = "0.1", default-features = false } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", default-features = false } + +[features] +windows-enhanced-metrics = [] diff --git a/crates/datadog-metrics-collector/src/cpu.rs b/crates/datadog-metrics-collector/src/cpu.rs new file mode 100644 index 0000000..9580201 --- /dev/null +++ b/crates/datadog-metrics-collector/src/cpu.rs @@ -0,0 +1,170 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions +//! +//! This module provides OS-agnostic CPU stats collection, CPU usage +//! and limit computation, and metrics submission to Datadog. +//! +//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). + +use dogstatsd::aggregator::AggregatorHandle; +use dogstatsd::metric::{Metric, MetricValue, SortedTags}; +use libdd_common::azure_app_services; +use std::env; +use tracing::{debug, error}; + +const CPU_USAGE_METRIC: &str = "azure.functions.enhanced.cpu.usage"; +const CPU_LIMIT_METRIC: &str = "azure.functions.enhanced.cpu.limit"; + +/// Computed CPU total and limit metrics +pub struct CpuStats { + pub total: u64, // Cumulative CPU usage in nanoseconds + pub limit: Option, // CPU limit in nanocores + pub defaulted_limit: bool, // Whether CPU limit was defaulted to host CPU count +} + +pub trait CpuStatsReader { + fn read(&self) -> Option; +} + +pub struct CpuMetricsCollector { + reader: Box, + aggregator: AggregatorHandle, + tags: Option, + last_usage_ns: Option, + last_collection_time: std::time::Instant, +} + +impl CpuMetricsCollector { + /// Creates a new CpuMetricsCollector + /// + /// # Arguments + /// + /// * `aggregator` - The aggregator handle to submit metrics to + /// * `tags` - Optional tags to attach to all metrics + pub fn new(aggregator: AggregatorHandle, tags: Option) -> Self { + #[cfg(feature = "windows-enhanced-metrics")] + let reader: Box = Box::new(crate::windows::WindowsCpuStatsReader); + #[cfg(not(feature = "windows-enhanced-metrics"))] + let reader: Box = Box::new(crate::linux::LinuxCpuStatsReader); + Self { + reader, + aggregator, + tags, + last_usage_ns: None, + last_collection_time: std::time::Instant::now(), + } + } + + pub fn collect_and_submit(&mut self) { + if let Some(cpu_stats) = self.reader.read() { + let current_usage_ns = cpu_stats.total; + let now_instant = std::time::Instant::now(); + + // Skip first collection + let Some(last_usage_ns) = self.last_usage_ns else { + debug!("First CPU collection, skipping interval"); + self.last_usage_ns = Some(current_usage_ns); + self.last_collection_time = now_instant; + return; + }; + + if current_usage_ns < last_usage_ns { + debug!("Current CPU usage is less than last usage, skipping interval"); + self.last_usage_ns = Some(current_usage_ns); + self.last_collection_time = now_instant; + return; + } + + let delta_ns = (current_usage_ns - last_usage_ns) as f64; + self.last_usage_ns = Some(current_usage_ns); + let elapsed_secs = now_instant + .duration_since(self.last_collection_time) + .as_secs_f64(); + self.last_collection_time = now_instant; + if elapsed_secs <= 0.0 { + debug!("Elapsed time is less than or equal to 0, skipping interval"); + return; + } + + // Divide nanoseconds delta by elapsed time to get usage rate in nanocores + let usage_rate_nc = delta_ns / elapsed_secs; + + let now = std::time::UNIX_EPOCH + .elapsed() + .map(|d| d.as_secs()) + .unwrap_or(0) + .try_into() + .unwrap_or(0); + + let usage_metric = Metric::new( + CPU_USAGE_METRIC.into(), + MetricValue::distribution(usage_rate_nc), + self.tags.clone(), + Some(now), + ); + + if let Err(e) = self.aggregator.insert_batch(vec![usage_metric]) { + error!("Failed to insert CPU usage metric: {}", e); + } + + if let Some(limit) = cpu_stats.limit { + if cpu_stats.defaulted_limit { + debug!("CPU limit defaulted to host CPU count"); + } + let limit_metric = Metric::new( + CPU_LIMIT_METRIC.into(), + MetricValue::distribution(limit), + self.tags.clone(), + Some(now), + ); + if let Err(e) = self.aggregator.insert_batch(vec![limit_metric]) { + error!("Failed to insert CPU limit metric: {}", e); + } + } + } else { + debug!( + "Skipping CPU metrics collection - could not find data to generate CPU usage and limit enhanced metrics" + ); + } + } +} + +pub fn build_cpu_metrics_tags() -> Option { + let mut tag_parts = Vec::new(); + // Azure tags from libdd_common + if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { + let aas_tags = [ + ("resource_group", aas_metadata.get_resource_group()), + ("subscription_id", aas_metadata.get_subscription_id()), + ("name", aas_metadata.get_site_name()), + ]; + for (name, value) in aas_tags { + if value != "unknown" { + tag_parts.push(format!("{}:{}", name, value)); + } + } + } + + // Tags from env vars (not in libdd_common) - origin tag is added by DogStatsD + for (tag_name, env_var) in [ + ("region", "REGION_NAME"), + ("plan_tier", "WEBSITE_SKU"), + ("service", "DD_SERVICE"), + ("env", "DD_ENV"), + ("version", "DD_VERSION"), + ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), + ] { + if let Ok(val) = env::var(env_var) + && !val.is_empty() + { + tag_parts.push(format!("{}:{}", tag_name, val)); + } + } + + if tag_parts.is_empty() { + return None; + } + SortedTags::parse(&tag_parts.join(",")).ok() +} diff --git a/crates/datadog-metrics-collector/src/lib.rs b/crates/datadog-metrics-collector/src/lib.rs new file mode 100644 index 0000000..aa565e5 --- /dev/null +++ b/crates/datadog-metrics-collector/src/lib.rs @@ -0,0 +1,14 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +pub mod cpu; +#[cfg(not(feature = "windows-enhanced-metrics"))] +pub(crate) mod linux; +#[cfg(feature = "windows-enhanced-metrics")] +pub(crate) mod windows; diff --git a/crates/datadog-metrics-collector/src/linux.rs b/crates/datadog-metrics-collector/src/linux.rs new file mode 100644 index 0000000..1eb072a --- /dev/null +++ b/crates/datadog-metrics-collector/src/linux.rs @@ -0,0 +1,338 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions (Linux) +//! +//! This module provides functionality to read raw CPU statistics from cgroup v1 files +//! and compute the CPU usage and limit in Linux environments. +//! +//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). + +use crate::cpu::{CpuStats, CpuStatsReader}; +use std::fs; +use std::io; +use tracing::debug; + +const CGROUP_CPU_USAGE_PATH: &str = "/sys/fs/cgroup/cpu/cpuacct.usage"; // Reports the total CPU time, in nanoseconds, consumed by all tasks in this cgroup +const CGROUP_CPUSET_CPUS_PATH: &str = "/sys/fs/cgroup/cpuset/cpuset.cpus"; // Specifies the CPUs that tasks in this cgroup are permitted to access +const CGROUP_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; // Specifies a period of time, in microseconds, for how regularly a cgroup's access to CPU resources should be reallocated +const CGROUP_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; // Specifies the total amount of time, in microseconds, for which all tasks in a cgroup can run during one period + +/// Statistics from cgroup v1 files, normalized to nanoseconds +struct CgroupStats { + total: Option, // Cumulative CPU usage (from cpuacct.usage) in nanoseconds + cpu_count: Option, // Number of accessible logical CPUs (from cpuset.cpus) + scheduler_period: Option, // CFS scheduler period (from cpu.cfs_period_us) in nanoseconds + scheduler_quota: Option, // CFS scheduler quota (from cpu.cfs_quota_us) in nanoseconds +} + +pub struct LinuxCpuStatsReader; + +impl CpuStatsReader for LinuxCpuStatsReader { + fn read(&self) -> Option { + let cgroup_stats = read_cgroup_stats(); + build_cpu_stats(&cgroup_stats) + } +} + +/// Builds CPU stats - rate and limit +fn build_cpu_stats(cgroup_stats: &CgroupStats) -> Option { + let total = cgroup_stats.total?; + + let (limit_nc, defaulted) = compute_cpu_limit_nc(cgroup_stats); + + Some(CpuStats { + total, + limit: Some(limit_nc), + defaulted_limit: defaulted, + }) +} + +/// Reads raw CPU statistics from cgroup v1 files and converts to nanoseconds +fn read_cgroup_stats() -> CgroupStats { + let total = fs::read_to_string(CGROUP_CPU_USAGE_PATH) + .ok() + .and_then(|contents| contents.trim().parse::().ok()); + if total.is_none() { + debug!("Could not read CPU usage from {CGROUP_CPU_USAGE_PATH}"); + } + + let cpu_count = read_cpu_count_from_file(CGROUP_CPUSET_CPUS_PATH).ok(); + if cpu_count.is_none() { + debug!("Could not read CPU count from {CGROUP_CPUSET_CPUS_PATH}"); + } + + let scheduler_period = fs::read_to_string(CGROUP_CPU_PERIOD_PATH) + .ok() + .and_then(|contents| contents.trim().parse::().map(|v| v * 1000).ok()); // Convert from microseconds to nanoseconds + if scheduler_period.is_none() { + debug!("Could not read scheduler period from {CGROUP_CPU_PERIOD_PATH}"); + } + + let scheduler_quota = match fs::read_to_string(CGROUP_CPU_QUOTA_PATH) { + Ok(contents) => { + let trimmed = contents.trim(); + match trimmed.parse::() { + Ok(quota) => { + // Convert from microseconds to nanoseconds + if quota == -1 { + debug!("CFS scheduler quota is -1, setting to None"); + None + } else { + Some((quota * 1000) as u64) + } + } + Err(e) => { + debug!("Could not parse scheduler quota from {CGROUP_CPU_QUOTA_PATH}: {e}"); + None + } + } + } + Err(e) => { + debug!("Could not read scheduler quota from {CGROUP_CPU_QUOTA_PATH}: {e}"); + None + } + }; + + CgroupStats { + total, + cpu_count, + scheduler_period, + scheduler_quota, + } +} + +/// Reads CPU count from cpuset.cpus +/// +/// The cpuset.cpus file contains a comma-separated list, with dashes to represent ranges of CPUs, +/// e.g., "0-2,16" represents CPUs 0, 1, 2, and 16 +/// This function returns the count of CPUs, in this case 4. +fn read_cpu_count_from_file(path: &str) -> Result { + let contents = fs::read_to_string(path)?; + let cpuset_str = contents.trim(); + if cpuset_str.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("File {path} is empty"), + )); + } + + let mut cpu_count: u64 = 0; + + for part in cpuset_str.split(',') { + let range: Vec<&str> = part.split('-').collect(); + match range.len() { + 2 => { + // Range like "0-3" + let start: u64 = range[0].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + let end: u64 = range[1].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + if end < start { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid CPU range: {range:?}"), + )); + } + cpu_count += end - start + 1; + } + 1 => { + // Single CPU like "2" + part.parse::().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from cpu index {part:?}: {e}"), + ) + })?; + cpu_count += 1; + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Malformed cpuset segment: {part:?}"), + )); + } + } + } + Ok(cpu_count) +} + +/// Computes the CPU limit in nanocores, with fallback to host CPU count +fn compute_cpu_limit_nc(cgroup_stats: &CgroupStats) -> (f64, bool) { + match compute_cgroup_cpu_limit_nc(cgroup_stats) { + Some(limit) => (limit, false), + None => { + let host_cpu_count = num_cpus::get() as f64; + (host_cpu_count * 1000000000.0, true) // Convert to nanocores + } + } +} + +/// Computes the CPU limit in nanocores from cgroup statistics +/// Limit is computed using min(CPUSet, CFS CPU Quota) +fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option { + let mut limit_nc = None; + + if let Some(cpu_count) = cgroup_stats.cpu_count { + let host_cpu_count = num_cpus::get() as u64; + if cpu_count != host_cpu_count { + let cpuset_limit_nc = cpu_count as f64 * 1000000000.0; // Convert to nanocores + limit_nc = Some(cpuset_limit_nc); + } + } + + if let (Some(scheduler_quota), Some(scheduler_period)) = + (cgroup_stats.scheduler_quota, cgroup_stats.scheduler_period) + { + let quota_limit_nc = 1000000000.0 * (scheduler_quota as f64 / scheduler_period as f64); + match limit_nc { + None => { + limit_nc = Some(quota_limit_nc); + debug!( + "limit_nc is None, setting CPU limit from cfs quota: {} nanocores", + quota_limit_nc + ); + } + Some(current_limit_nc) if quota_limit_nc < current_limit_nc => { + limit_nc = Some(quota_limit_nc); + debug!( + "CPU limit from cfs quota is less than current limit, setting CPU limit from cfs quota: {} nanocores", + quota_limit_nc + ); + } + _ => { + debug!("Keeping cpuset limit: {:?} nanocores", limit_nc); + } + } + } + limit_nc +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_stats( + cpu_count: Option, + scheduler_quota: Option, + scheduler_period: Option, + ) -> CgroupStats { + CgroupStats { + total: Some(0), + cpu_count, + scheduler_quota, + scheduler_period, + } + } + + #[test] + fn test_no_limit_returns_none() { + let stats = make_stats(None, None, None); + assert!(compute_cgroup_cpu_limit_nc(&stats).is_none()); + } + + #[test] + fn test_quota_unlimited_minus_one_returns_none() { + // quota=-1 is filtered out during parsing, so None here means unlimited + let stats = make_stats(None, None, Some(100_000_000)); + assert!(compute_cgroup_cpu_limit_nc(&stats).is_none()); + } + + #[test] + fn test_limited_to_2_cores_by_quota() { + let stats = make_stats(None, Some(200_000_000), Some(100_000_000)); // 200ms / 100ms = 2 cores + let result = compute_cgroup_cpu_limit_nc(&stats); + assert!((result.unwrap() - 2_000_000_000.0).abs() < 1_000.0); // Tolerance of 1,000 nanocores due to floating point arithmetic rounding errors + } + + #[test] + fn test_limited_to_half_core_by_quota() { + let stats = make_stats(None, Some(50_000_000), Some(100_000_000)); // 50ms / 100ms = 0.5 cores + let result = compute_cgroup_cpu_limit_nc(&stats); + assert!((result.unwrap() - 500_000_000.0).abs() < 1_000.0); + } + + #[test] + fn test_read_cpu_count_single_cpu() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_single_cpu.txt"); + std::fs::write(&path, "4\n").unwrap(); + let count = read_cpu_count_from_file(path.to_str().unwrap()).unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn test_read_cpu_count_range() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_range.txt"); + std::fs::write(&path, "0-3\n").unwrap(); + let count = read_cpu_count_from_file(path.to_str().unwrap()).unwrap(); + assert_eq!(count, 4); + } + + #[test] + fn test_read_cpu_count_mixed() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_mixed.txt"); + std::fs::write(&path, "0-2,16\n").unwrap(); + let count = read_cpu_count_from_file(path.to_str().unwrap()).unwrap(); + assert_eq!(count, 4); // 0,1,2 + 16 + } + + #[test] + fn test_read_cpu_count_invalid_range() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_invalid_range.txt"); + std::fs::write(&path, "3-1\n").unwrap(); + assert!(read_cpu_count_from_file(path.to_str().unwrap()).is_err()); + } + + #[test] + fn test_read_cpu_count_malformed_segment() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_malformed.txt"); + std::fs::write(&path, "0-3-5\n").unwrap(); + assert!(read_cpu_count_from_file(path.to_str().unwrap()).is_err()); + } + + #[test] + fn test_read_cpu_count_invalid_index() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_invalid.txt"); + std::fs::write(&path, "foo\n").unwrap(); + assert!(read_cpu_count_from_file(path.to_str().unwrap()).is_err()); + } + + #[test] + fn test_read_cpu_count_empty_file() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_empty.txt"); + std::fs::write(&path, "").unwrap(); + assert!(read_cpu_count_from_file(path.to_str().unwrap()).is_err()); + } + + #[test] + fn test_compute_cpu_limit_nc_with_quota() { + let stats = make_stats(None, Some(200_000_000), Some(100_000_000)); + let (limit, defaulted) = compute_cpu_limit_nc(&stats); + assert!((limit - 2_000_000_000.0).abs() < 1_000.0); + assert!(!defaulted); + } + + #[test] + fn test_compute_cpu_limit_nc_defaults_to_host() { + let stats = make_stats(None, None, None); + let (limit, defaulted) = compute_cpu_limit_nc(&stats); + let expected = num_cpus::get() as f64 * 1_000_000_000.0; + assert!((limit - expected).abs() < 1_000.0); + assert!(defaulted); + } +} diff --git a/crates/datadog-metrics-collector/src/windows.rs b/crates/datadog-metrics-collector/src/windows.rs new file mode 100644 index 0000000..c6451a4 --- /dev/null +++ b/crates/datadog-metrics-collector/src/windows.rs @@ -0,0 +1,20 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions (Windows) +//! +//! NOTE: Windows CPU enhanced metrics are not yet supported. +//! WindowsCpuStatsReader currently always returns None, so no CPU +//! usage or limit information is reported in Windows environments. +//! +//! All CPU metrics will be reported in nanocores (1 core = 1,000,000,000 nanocores). + +use crate::cpu::{CpuStats, CpuStatsReader}; + +pub struct WindowsCpuStatsReader; + +impl CpuStatsReader for WindowsCpuStatsReader { + fn read(&self) -> Option { + None + } +} diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index b41de18..344f8e3 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -8,9 +8,11 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env [features] default = [] windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] +windows-enhanced-metrics = ["datadog-metrics-collector/windows-enhanced-metrics"] [dependencies] datadog-trace-agent = { path = "../datadog-trace-agent" } +datadog-metrics-collector = { path = "../datadog-metrics-collector" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" } datadog-fips = { path = "../datadog-fips", default-features = false } dogstatsd = { path = "../dogstatsd", default-features = true } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index d50798f..ba1ed17 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -23,6 +23,8 @@ use datadog_trace_agent::{ trace_processor, }; +use datadog_metrics_collector::cpu::CpuMetricsCollector; + use libdd_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType}; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; @@ -39,6 +41,7 @@ use dogstatsd::{ use dogstatsd::metric::{EMPTY_TAGS, SortedTags}; use tokio_util::sync::CancellationToken; +const CPU_METRICS_COLLECTION_INTERVAL_SECS: u64 = 1; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; @@ -104,6 +107,17 @@ pub async fn main() { .ok() .and_then(|val| parse_metric_namespace(&val)); + // Only enable enhanced metrics for Linux Azure Functions + #[cfg(not(feature = "windows-enhanced-metrics"))] + let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction + && env::var("DD_ENHANCED_METRICS_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + + // Enhanced metrics are not yet supported in Windows environments + #[cfg(feature = "windows-enhanced-metrics")] + let dd_enhanced_metrics = false; + let https_proxy = env::var("DD_PROXY_HTTPS") .or_else(|_| env::var("HTTPS_PROXY")) .ok(); @@ -170,53 +184,87 @@ pub async fn main() { } }); - let (metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd { - debug!("Starting dogstatsd"); - let (_, metrics_flusher, aggregator_handle) = start_dogstatsd( - dd_dogstatsd_port, - dd_api_key, - dd_site, - https_proxy, - dogstatsd_tags, - dd_statsd_metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_dogstatsd_windows_pipe_name.clone(), - ) - .await; - if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { - info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + let needs_aggregator = dd_use_dogstatsd || dd_enhanced_metrics; + + // The aggregator is shared between dogstatsd and enhanced metrics. + // It is started independently so that either can be enabled without the other. + // Only dogstatsd needs the dogstatsd listener + let (metrics_flusher, aggregator_handle) = if needs_aggregator { + debug!("Creating metrics flusher and aggregator"); + + let (flusher, handle) = + start_aggregator(dd_api_key, dd_site, https_proxy, dogstatsd_tags).await; + + if dd_use_dogstatsd { + debug!("Starting dogstatsd"); + let _ = start_dogstatsd_listener( + dd_dogstatsd_port, + handle.clone(), + dd_statsd_metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] + dd_dogstatsd_windows_pipe_name.clone(), + ) + .await; + if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { + info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + } else { + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + } } else { - info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + info!("dogstatsd disabled"); } - (metrics_flusher, Some(aggregator_handle)) + (flusher, Some(handle)) } else { - info!("dogstatsd disabled"); + info!("dogstatsd and enhanced metrics disabled"); (None, None) }; + let mut cpu_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { + aggregator_handle.as_ref().map(|handle| { + let tags = datadog_metrics_collector::cpu::build_cpu_metrics_tags(); + CpuMetricsCollector::new(handle.clone(), tags) + }) + } else { + if !dd_enhanced_metrics { + info!("Enhanced metrics disabled"); + } else { + info!("Enhanced metrics enabled but metrics flusher not found"); + } + None + }; + let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); + let mut cpu_collection_interval = + interval(Duration::from_secs(CPU_METRICS_COLLECTION_INTERVAL_SECS)); flush_interval.tick().await; // discard first tick, which is instantaneous + cpu_collection_interval.tick().await; loop { - flush_interval.tick().await; - - if let Some(metrics_flusher) = metrics_flusher.as_ref() { - debug!("Flushing dogstatsd metrics"); - metrics_flusher.flush().await; + tokio::select! { + _ = flush_interval.tick() => { + if let Some(metrics_flusher) = metrics_flusher.clone() { + debug!("Flushing dogstatsd metrics"); + tokio::spawn(async move { + metrics_flusher.flush().await; + }); + } + } + _ = cpu_collection_interval.tick() => { + if let Some(ref mut collector) = cpu_collector { + collector.collect_and_submit(); + } + } } } } -async fn start_dogstatsd( - port: u16, +async fn start_aggregator( dd_api_key: Option, dd_site: String, https_proxy: Option, dogstatsd_tags: &str, - metric_namespace: Option, - #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option, -) -> (CancellationToken, Option, AggregatorHandle) { - // 1. Create the aggregator service +) -> (Option, AggregatorHandle) { + // Create the aggregator service #[allow(clippy::expect_used)] let (service, handle) = AggregatorService::new( SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), @@ -224,53 +272,18 @@ async fn start_dogstatsd( ) .expect("Failed to create aggregator service"); - // 2. Start the aggregator service in the background + // Start the aggregator service in the background tokio::spawn(service.run()); - #[cfg(all(windows, feature = "windows-pipes"))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - windows_pipe_name, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - - #[cfg(not(all(windows, feature = "windows-pipes")))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); - - // 3. Use handle in DogStatsD (cheap to clone) - let dogstatsd_client = DogStatsD::new( - &dogstatsd_config, - handle.clone(), - dogstatsd_cancel_token.clone(), - ) - .await; - - tokio::spawn(async move { - dogstatsd_client.spin().await; - }); - let metrics_flusher = match dd_api_key { Some(dd_api_key) => { let client = match build_metrics_client(https_proxy, DOGSTATSD_TIMEOUT_DURATION) { Ok(client) => client, Err(e) => { error!("Failed to build HTTP client: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; - let metrics_intake_url_prefix = match Site::new(dd_site) .map_err(|e| e.to_string()) .and_then(|site| { @@ -279,7 +292,7 @@ async fn start_dogstatsd( Ok(prefix) => prefix, Err(e) => { error!("Failed to create metrics intake URL: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; @@ -299,7 +312,50 @@ async fn start_dogstatsd( } }; - (dogstatsd_cancel_token, metrics_flusher, handle) + (metrics_flusher, handle) +} + +async fn start_dogstatsd_listener( + port: u16, + handle: AggregatorHandle, + metric_namespace: Option, + #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option, +) -> CancellationToken { + #[cfg(all(windows, feature = "windows-pipes"))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + windows_pipe_name, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + + #[cfg(not(all(windows, feature = "windows-pipes")))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + + // Use handle in DogStatsD (cheap to clone) + let dogstatsd_client = DogStatsD::new( + &dogstatsd_config, + handle.clone(), + dogstatsd_cancel_token.clone(), + ) + .await; + + tokio::spawn(async move { + dogstatsd_client.spin().await; + }); + + dogstatsd_cancel_token } fn build_metrics_client( diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index d0c0952..5f869f4 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -18,6 +18,7 @@ const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; const DATADOG_PREFIX: &str = "datadog."; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; @@ -83,15 +84,17 @@ impl Metric { .join("."); // Determine the service based on metric prefix first - let service = if metric_name.starts_with(JVM_PREFIX) - || metric_name.starts_with(RUNTIME_PREFIX) - { - OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { - OriginService::ServerlessEnhanced - } else { - OriginService::ServerlessCustom - }; + let service = + if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) { + OriginService::ServerlessRuntime + } else if metric_prefix == AWS_LAMBDA_PREFIX + || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX + || metric_prefix == AZURE_FUNCTIONS_PREFIX + { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; // Then determine the category based on tags let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") { @@ -327,7 +330,32 @@ mod tests { } #[test] - fn test_find_metric_origin_azure_functions() { + fn test_find_metric_origin_azure_functions_enhanced() { + let tags = SortedTags::parse("origin:azurefunction").unwrap(); + let metric = Metric { + id: 0, + name: "azure.functions.enhanced.cpu.usage".into(), + value: MetricValue::Gauge(1.0), + tags: Some(tags.clone()), + timestamp: 0, + }; + let origin = metric.find_origin(tags).unwrap(); + assert_eq!( + origin.origin_product as u32, + OriginProduct::Serverless as u32 + ); + assert_eq!( + origin.origin_category as u32, + OriginCategory::AzureFunctionsMetrics as u32 + ); + assert_eq!( + origin.origin_service as u32, + OriginService::ServerlessEnhanced as u32 + ); + } + + #[test] + fn test_find_metric_origin_azure_functions_custom() { let tags = SortedTags::parse("origin:azurefunction").unwrap(); let metric = Metric { id: 0,