diff --git a/Cargo.lock b/Cargo.lock index fff06dbaa3..8ccd0cae6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -397,10 +397,10 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "itoa", + "itoa 1.0.11", "matchit", "memchr", "mime", @@ -421,8 +421,8 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -1226,7 +1226,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" dependencies = [ "csv-core", - "itoa", + "itoa 1.0.11", "ryu", "serde", ] @@ -1447,7 +1447,7 @@ dependencies = [ "bytes", "constcat", "futures", - "http", + "http 1.1.0", "http-body-util", "libdd-common", "libdd-data-pipeline", @@ -1506,7 +1506,7 @@ dependencies = [ "datadog-live-debugger", "datadog-sidecar-macros", "futures", - "http", + "http 1.1.0", "http-body-util", "httpmock", "libc", @@ -1558,7 +1558,7 @@ dependencies = [ "datadog-ipc", "datadog-live-debugger", "datadog-sidecar", - "http", + "http 1.1.0", "libc", "libdd-common", "libdd-common-ffi", @@ -1636,6 +1636,15 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "derp" +version = "0.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9b84cfd9b6fa437e498215e5625e9e3ae3bf9bb54d623028a181c40820db169" +dependencies = [ + "untrusted 0.7.1", +] + [[package]] name = "diff" version = "0.1.13" @@ -2135,7 +2144,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.1.0", "indexmap 2.12.1", "slab", "tokio", @@ -2223,7 +2232,7 @@ dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http", + "http 1.1.0", "httpdate", "mime", "sha1", @@ -2235,7 +2244,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http", + "http 1.1.0", ] [[package]] @@ -2314,6 +2323,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa 1.0.11", +] + [[package]] name = "http" version = "1.1.0" @@ -2322,7 +2342,18 @@ checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", - "itoa", + "itoa 1.0.11", +] + +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", ] [[package]] @@ -2332,7 +2363,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.1.0", ] [[package]] @@ -2343,8 +2374,8 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2377,9 +2408,9 @@ dependencies = [ "futures-timer", "futures-util", "headers", - "http", + "http 1.1.0", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "lazy_static", "log", @@ -2402,6 +2433,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa 1.0.11", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.6.0" @@ -2412,11 +2466,11 @@ dependencies = [ "futures-channel", "futures-util", "h2", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "httparse", "httpdate", - "itoa", + "itoa 1.0.11", "pin-project-lite", "smallvec", "tokio", @@ -2429,8 +2483,8 @@ version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.1.0", + "hyper 1.6.0", "hyper-util", "rustls", "rustls-native-certs", @@ -2447,7 +2501,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -2465,9 +2519,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.6.0", "ipnet", "libc", "percent-encoding", @@ -2734,6 +2788,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "itoa" version = "1.0.11" @@ -2846,7 +2906,7 @@ version = "2.0.0" dependencies = [ "anyhow", "bytes", - "http", + "http 1.1.0", "thiserror 1.0.68", ] @@ -2855,7 +2915,7 @@ name = "libdd-capabilities-impl" version = "2.0.0" dependencies = [ "bytes", - "http", + "http 1.1.0", "http-body-util", "libdd-capabilities", "libdd-common", @@ -2875,11 +2935,11 @@ dependencies = [ "futures-core", "futures-util", "hex", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "httparse", - "hyper", + "hyper 1.6.0", "hyper-rustls", "hyper-util", "indexmap 2.12.1", @@ -2916,7 +2976,7 @@ dependencies = [ "chrono", "crossbeam-queue", "function_name", - "hyper", + "hyper 1.6.0", "libdd-common", "serde", ] @@ -2934,7 +2994,7 @@ dependencies = [ "cxx-build", "errno", "goblin", - "http", + "http 1.1.0", "libc", "libdd-common", "libdd-libunwind-sys", @@ -2990,7 +3050,7 @@ dependencies = [ "duplicate 2.0.1", "either", "getrandom 0.2.15", - "http", + "http 1.1.0", "http-body-util", "httpmock", "libdd-capabilities", @@ -3060,7 +3120,7 @@ version = "3.0.0" dependencies = [ "anyhow", "cadence", - "http", + "http 1.1.0", "libdd-common", "serde", "tokio", @@ -3075,7 +3135,7 @@ dependencies = [ "fastrand", "http-body-util", "httpmock", - "hyper", + "hyper 1.6.0", "hyper-util", "libdd-common", "reqwest", @@ -3182,7 +3242,7 @@ dependencies = [ "cxx-build", "futures", "hashbrown 0.16.1", - "http", + "http 1.1.0", "http-body-util", "httparse", "indexmap 2.12.1", @@ -3221,7 +3281,7 @@ dependencies = [ "function_name", "futures", "http-body-util", - "hyper", + "hyper 1.6.0", "libc", "libdd-common", "libdd-common-ffi", @@ -3257,14 +3317,17 @@ dependencies = [ "futures", "futures-util", "hashbrown 0.15.1", - "http", + "http 1.1.0", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", + "libdd-capabilities", + "libdd-capabilities-impl", "libdd-common", "libdd-remote-config", "libdd-trace-protobuf", "manual_future", + "prost", "serde", "serde_json", "serde_with", @@ -3274,6 +3337,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "tuf", "uuid", ] @@ -3324,7 +3388,7 @@ dependencies = [ "bytes", "futures", "hashbrown 0.15.1", - "http", + "http 1.1.0", "http-body-util", "httpmock", "libc", @@ -3423,7 +3487,7 @@ dependencies = [ "async-trait", "criterion", "hashbrown 0.15.1", - "http", + "http 1.1.0", "httpmock", "libdd-capabilities", "libdd-capabilities-impl", @@ -3455,11 +3519,11 @@ dependencies = [ "flate2", "futures", "getrandom 0.2.15", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "httpmock", - "hyper", + "hyper 1.6.0", "indexmap 2.12.1", "libdd-capabilities", "libdd-capabilities-impl", @@ -3488,7 +3552,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", - "http", + "http 1.1.0", "httpmock", "libdd-common", "libdd-remote-config", @@ -3764,7 +3828,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 1.1.0", "httparse", "memchr", "mime", @@ -4762,10 +4826,10 @@ dependencies = [ "futures-core", "futures-util", "hickory-resolver", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-rustls", "hyper-util", "js-sys", @@ -4805,7 +4869,7 @@ dependencies = [ "cfg-if", "getrandom 0.2.15", "libc", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -4969,7 +5033,7 @@ dependencies = [ "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -5213,7 +5277,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" dependencies = [ "indexmap 2.12.1", - "itoa", + "itoa 1.0.11", "memchr", "ryu", "serde", @@ -5275,7 +5339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ "indexmap 2.12.1", - "itoa", + "itoa 1.0.11", "ryu", "serde", "unsafe-libyaml", @@ -5534,7 +5598,7 @@ version = "2.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe17b8deb33a9441280b4266c2d257e166bafbaea6e66b4b34ca139c91766d9" dependencies = [ - "itoa", + "itoa 1.0.11", "ryu", "sval", ] @@ -5545,7 +5609,7 @@ version = "2.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "854addb048a5bafb1f496c98e0ab5b9b581c3843f03ca07c034ae110d3b7c623" dependencies = [ - "itoa", + "itoa 1.0.11", "ryu", "sval", ] @@ -5829,7 +5893,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" dependencies = [ "deranged", - "itoa", + "itoa 1.0.11", "num-conv", "powerfmt", "serde", @@ -6018,10 +6082,10 @@ dependencies = [ "base64 0.22.1", "bytes", "h2", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -6089,8 +6153,8 @@ dependencies = [ "bitflags", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower", @@ -6202,6 +6266,31 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tuf" +version = "0.3.0-beta10" +source = "git+https://github.com/DataDog/rust-tuf/?tag=0.3.0-beta10-opw-3#9e8d6077b0e67f13233ad0a347bb7d640705da04" +dependencies = [ + "chrono", + "data-encoding", + "derp", + "futures-io", + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "itoa 0.4.8", + "log", + "percent-encoding", + "ring", + "serde", + "serde_derive", + "serde_json", + "tempfile", + "thiserror 1.0.68", + "untrusted 0.7.1", + "url", +] + [[package]] name = "twox-hash" version = "1.6.3" @@ -6260,6 +6349,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -6362,7 +6457,7 @@ checksum = "9170e001f458781e92711d2ad666110f153e4e50bfd5cbd02db6547625714187" dependencies = [ "float-cmp", "halfbrown", - "itoa", + "itoa 1.0.11", "ryu", ] diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index f3c5c97254..c04cc39ea6 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -266,6 +266,7 @@ pub unsafe extern "C" fn ddog_remote_config_reader_for_endpoint<'a>( language: language.to_utf8_lossy().into(), tracer_version: tracer_version.to_utf8_lossy().into(), endpoint: endpoint.clone(), + agentless: None, }, &Arc::new(Target { service: service_name.to_utf8_lossy().into(), diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 5b2d27b801..b21156a26e 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -779,6 +779,7 @@ impl SidecarInterface for ConnectionSidecarHandler { language: config.language, tracer_version: config.tracer_version, endpoint: config.endpoint, + agentless: None, }, products: config.remote_config_products, capabilities: config.remote_config_capabilities, diff --git a/libdd-common/src/lib.rs b/libdd-common/src/lib.rs index dbd2e4a090..adfb568b84 100644 --- a/libdd-common/src/lib.rs +++ b/libdd-common/src/lib.rs @@ -7,7 +7,8 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] use anyhow::Context; -use http::uri; +use http::uri::PathAndQuery; +use http::{uri, Uri}; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -332,6 +333,24 @@ impl Endpoint { /// Default value for the timeout field in milliseconds. pub const DEFAULT_TIMEOUT: u64 = 3_000; + pub fn agentless(site: &str, api_key: String) -> anyhow::Result { + Ok(Self { + url: Uri::builder() + .scheme("https") + .authority( + uri::Authority::try_from(site) + .with_context(|| format!("dd_site is an invalid url: {site}"))?, + ) + .path_and_query(PathAndQuery::from_static("")) + .build() + .with_context(|| format!("rc url is invalid for site: {site}"))?, + api_key: Some(api_key.into()), + timeout_ms: Self::DEFAULT_TIMEOUT, + test_token: None, + use_system_resolver: true, + }) + } + /// Returns an iterator of optional endpoint-specific headers (api-key, test-token) /// as (header_name, header_value) string tuples for any that are available. pub fn get_optional_headers(&self) -> impl Iterator { diff --git a/libdd-remote-config/Cargo.toml b/libdd-remote-config/Cargo.toml index ae7988ceef..1655877efb 100644 --- a/libdd-remote-config/Cargo.toml +++ b/libdd-remote-config/Cargo.toml @@ -23,7 +23,7 @@ client = [ "tokio-util", "manual_future", "time", - "tracing" + "tracing", ] regex-lite = ["libdd-common/regex-lite"] @@ -36,24 +36,33 @@ test = ["hyper/server", "hyper-util"] [dependencies] anyhow = { version = "1.0" } libdd-common = { path = "../libdd-common", version = "4.2.0", default-features = false } +libdd-capabilities = { path = "../libdd-capabilities" } +libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl", features = ["https"]} libdd-trace-protobuf = { path = "../libdd-trace-protobuf", version = "3.0.2", optional = true } hyper = { workspace = true, optional = true, default-features = false } -http-body-util = {version = "0.1", optional = true } +http-body-util = { version = "0.1", optional = true } http = { version = "1.1", optional = true } base64 = { version = "0.22.1", optional = true } sha2 = { version = "0.10", optional = true } uuid = { version = "1.7.0", features = ["v4"], optional = true } futures-util = { version = "0.3", optional = true } tokio = { version = "1.36.0", optional = true } -tokio-util = { version = "0.7.10", optional = true } +tokio-util = { version = "0.7.10", optional = true } manual_future = { version = "0.1.1", optional = true } -time = { version = "0.3", features = ["parsing", "serde", "formatting"], optional = true } +time = { version = "0.3", features = [ + "parsing", + "serde", + "formatting", +], optional = true } tracing = { version = "0.1", default-features = false, optional = true } serde = "1.0" serde_json = { version = "1.0", features = ["raw_value"] } serde_with = "3" thiserror = "2" hashbrown = "0.15" +tuf = { git = "https://github.com/DataDog/rust-tuf/", tag = "0.3.0-beta10-opw-3" } +prost = "0.14.1" +futures = { version = "0.3", features = ["executor"] } # Test feature hyper-util = { workspace = true, features = ["service"], optional = true } diff --git a/libdd-remote-config/examples/remote_config_fetch.rs b/libdd-remote-config/examples/remote_config_fetch.rs index eb0c79ae95..765a03dc26 100644 --- a/libdd-remote-config/examples/remote_config_fetch.rs +++ b/libdd-remote-config/examples/remote_config_fetch.rs @@ -3,12 +3,13 @@ use libdd_common::tag::Tag; use libdd_common::Endpoint; +use libdd_remote_config::agentless_client::AgentlessConfig; use libdd_remote_config::fetch::{ConfigInvariants, ConfigOptions, SingleChangesFetcher}; use libdd_remote_config::file_change_tracker::{Change, FilePath}; use libdd_remote_config::file_storage::ParsedFileStorage; use libdd_remote_config::RemoteConfigProduct::ApmTracing; use libdd_remote_config::{RemoteConfigParsed, Target}; -use std::time::Duration; +use std::process::Command; use tokio::time::sleep; const RUNTIME_ID: &str = "23e76587-5ae1-410c-a05c-137cae600a10"; @@ -16,8 +17,51 @@ const SERVICE: &str = "testservice"; const ENV: &str = "testenv"; const VERSION: &str = "1.2.3"; +fn get_hostname() -> String { + Command::new("hostname") + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .map(|s| s.trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()) +} + #[tokio::main(flavor = "current_thread")] async fn main() { + let hostname = get_hostname(); + println!("Hostname: {hostname}"); + + let dd_api_key = std::env::var("DD_API_KEY").ok(); + let dd_site = std::env::var("DD_SITE").ok(); + + let (endpoint, agentless) = match (dd_api_key, dd_site) { + (Some(api_key), Some(site)) => { + println!("DD_API_KEY and DD_SITE are set — enabling agentless mode (site: {site})"); + let endpoint = Endpoint::agentless(&site, api_key) + .expect("Failed to build agentless endpoint from DD_SITE"); + ( + endpoint, + Some(AgentlessConfig { + hostname, + ..Default::default() + }), + ) + } + _ => { + println!("DD_API_KEY / DD_SITE not set — connecting to local agent"); + ( + Endpoint { + url: http::Uri::from_static("http://localhost:8126"), + api_key: None, + timeout_ms: 5000, // custom timeout, defaults to 3 seconds + test_token: None, + ..Default::default() + }, + None, + ) + } + }; + // SingleChangesFetcher is ideal for a single static (runtime_id, service, env, version) tuple // Otherwise a SharedFetcher (or even a MultiTargetFetcher for a potentially high number of // targets) for multiple targets is needed. These can be manually wired together with a @@ -40,18 +84,15 @@ async fn main() { invariants: ConfigInvariants { language: "awesomelang".to_string(), tracer_version: "99.10.5".to_string(), - endpoint: Endpoint { - url: http::Uri::from_static("http://localhost:8126"), - api_key: None, - timeout_ms: 5000, // custom timeout, defaults to 3 seconds - test_token: None, - ..Default::default() - }, + endpoint, + agentless, }, products: vec![ApmTracing], capabilities: vec![], }, - ); + ) + .await + .expect("Failed to create SingleChangesFetcher"); loop { match fetcher.fetch_changes().await { @@ -82,7 +123,7 @@ async fn main() { } } - sleep(Duration::from_secs(1)).await; + sleep(fetcher.get_refresh_interval()).await; } } diff --git a/libdd-remote-config/roots/gov/config_root.json b/libdd-remote-config/roots/gov/config_root.json new file mode 100644 index 0000000000..40402b5e70 --- /dev/null +++ b/libdd-remote-config/roots/gov/config_root.json @@ -0,0 +1,73 @@ +{ + "signed": { + "_type": "root", + "spec_version": "1.0", + "version": 1, + "expires": "0001-01-01T00:00:00Z", + "keys": { + "8907affe5835f969ee7680dda2e5b0ece95d839611d481483d89c22b2df42993": { + "keytype": "ecdsa", + "scheme": "ecdsa-sha2-nistp256", + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keyval": { + "public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAES2cvfa8r3HZ4AQeDUurdth7xqFk3\nqOuYtR877knUfOtJe+xU/F/ESVrl4B0ZMcyF3TaucgMsae4OVlc2lAW3Nw==\n-----END PUBLIC KEY-----\n" + } + }, + "fe9b1451a0446f049888c4ece57fa4c8127f50cc2401d0bb15712e9367953425": { + "keytype": "ecdsa", + "scheme": "ecdsa-sha2-nistp256", + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keyval": { + "public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAETSYtYSgft/owrcf+DsvGzdl+wpSr\nAVe0hmZL/hvdC0oogI98nYTYzycP0B5M5xBeP4ZfJm/mlFFsqCHHosQWkA==\n-----END PUBLIC KEY-----\n" + } + } + }, + "roles": { + "root": { + "keyids": [ + "fe9b1451a0446f049888c4ece57fa4c8127f50cc2401d0bb15712e9367953425", + "8907affe5835f969ee7680dda2e5b0ece95d839611d481483d89c22b2df42993" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "fe9b1451a0446f049888c4ece57fa4c8127f50cc2401d0bb15712e9367953425", + "8907affe5835f969ee7680dda2e5b0ece95d839611d481483d89c22b2df42993" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "fe9b1451a0446f049888c4ece57fa4c8127f50cc2401d0bb15712e9367953425", + "8907affe5835f969ee7680dda2e5b0ece95d839611d481483d89c22b2df42993" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "fe9b1451a0446f049888c4ece57fa4c8127f50cc2401d0bb15712e9367953425", + "8907affe5835f969ee7680dda2e5b0ece95d839611d481483d89c22b2df42993" + ], + "threshold": 2 + } + }, + "consistent_snapshot": true + }, + "signatures": [ + { + "keyid": "fe9b1451a0446f049888c4ece57fa4c8127f50cc2401d0bb15712e9367953425", + "sig": "3046022100f693d8c4ec048f6ac08ab01f5a7cc641c92d3ccf9787b949897da91e57b5c7240221009841eb205814b96b31447bc2893234f691c9ef16d54676f24b94138954cb4d23" + }, + { + "keyid": "8907affe5835f969ee7680dda2e5b0ece95d839611d481483d89c22b2df42993", + "sig": "3044022060b37818fb24ddca63a7bde900572c833cbf4b157c856734ead79c2f5c28775902207a423e9207db7767aff5863817a089f330af20aa9d21cdeb7f0cc9de337a404f" + } + ] +} \ No newline at end of file diff --git a/libdd-remote-config/roots/gov/director_root.json b/libdd-remote-config/roots/gov/director_root.json new file mode 100644 index 0000000000..058ff2a3b4 --- /dev/null +++ b/libdd-remote-config/roots/gov/director_root.json @@ -0,0 +1,73 @@ +{ + "signed": { + "_type": "root", + "spec_version": "1.0", + "version": 1, + "expires": "0001-01-01T00:00:00Z", + "keys": { + "1bd43b99872bee114b2b1c33cff7afbdb8ccfc799751aaa11c2336f3540d1c10": { + "keytype": "ecdsa", + "scheme": "ecdsa-sha2-nistp256", + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keyval": { + "public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUDdWZozMy6DojzrxkhevLhLzom0E\nnW0C7JWPXgnoL58OHhqDTHhkiUP5H3+fGdVKZ33Vca686aWWSwZUY6xSRQ==\n-----END PUBLIC KEY-----\n" + } + }, + "3360f9a30c063542b2d193fe01854ea3b7ae92c641812b97ce01180bf150c835": { + "keytype": "ecdsa", + "scheme": "ecdsa-sha2-nistp256", + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keyval": { + "public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEE/Evc+4Qx1yPwe0SyvP52C9z8inY\ncTH0eCXHRu+mzShDx7Ne8gyA/vU696i9jcc4pfsOwo1WpIkJsXuqP0jG6A==\n-----END PUBLIC KEY-----\n" + } + } + }, + "roles": { + "root": { + "keyids": [ + "1bd43b99872bee114b2b1c33cff7afbdb8ccfc799751aaa11c2336f3540d1c10", + "3360f9a30c063542b2d193fe01854ea3b7ae92c641812b97ce01180bf150c835" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "1bd43b99872bee114b2b1c33cff7afbdb8ccfc799751aaa11c2336f3540d1c10", + "3360f9a30c063542b2d193fe01854ea3b7ae92c641812b97ce01180bf150c835" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "1bd43b99872bee114b2b1c33cff7afbdb8ccfc799751aaa11c2336f3540d1c10", + "3360f9a30c063542b2d193fe01854ea3b7ae92c641812b97ce01180bf150c835" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "1bd43b99872bee114b2b1c33cff7afbdb8ccfc799751aaa11c2336f3540d1c10", + "3360f9a30c063542b2d193fe01854ea3b7ae92c641812b97ce01180bf150c835" + ], + "threshold": 2 + } + }, + "consistent_snapshot": true + }, + "signatures": [ + { + "keyid": "1bd43b99872bee114b2b1c33cff7afbdb8ccfc799751aaa11c2336f3540d1c10", + "sig": "3045022016088517105a41506c4465e636483b2eb782d03322da78273a28dd427f2acc78022100cfd965ae1e32c86761d6256a67708e5808f3f98413976373febe23bfefec8da1" + }, + { + "keyid": "3360f9a30c063542b2d193fe01854ea3b7ae92c641812b97ce01180bf150c835", + "sig": "3045022007f880ebf48676740f6cb9d216c91e0f51427ef9f44a73f4a7de8da6e4b4b53d022100b0f87441a0761422d7ac57582215d9570de78a0c30dd86479accedf7a00a6a1b" + } + ] +} \ No newline at end of file diff --git a/libdd-remote-config/roots/prod/config_root.json b/libdd-remote-config/roots/prod/config_root.json new file mode 100644 index 0000000000..d5a5eb7894 --- /dev/null +++ b/libdd-remote-config/roots/prod/config_root.json @@ -0,0 +1,63 @@ +{ + "signed": { + "_type": "root", + "spec_version": "1.0", + "version": 16, + "expires": "2026-10-31T17:00:00Z", + "keys": { + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "91d413c791907aae0be739d94a1e5e59c5d5ba65a8bbc1fb2153a5680f2d5958" } + }, + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "9323800f89d833ee263d3661c2616da89e405b92beeec334f21d54b5f60fbd85" } + } + }, + "roles": { + "root": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + } + }, + "consistent_snapshot": true + }, + "signatures": [ + { + "keyid": "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "sig": "a8b4ee59576c82bc1bc944df014bbeb90f5cba4ffbd8b7878461da2c934fd3bf93ac4c3b85a7936584da4a5a0cfe93b7150b559fc96423a98a70a11fc844f208" + }, + { + "keyid": "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db", + "sig": "3332e240a023dc267e87e210c7b46b9fa5772932d84936e3a7a5b5018b0f45fbf068ce60b97beb6e7e6c0c12a68d68a44461e590a934b577c71d4ff6dd94db09" + } + ] +} diff --git a/libdd-remote-config/roots/prod/director_root.json b/libdd-remote-config/roots/prod/director_root.json new file mode 100644 index 0000000000..f3882d62a0 --- /dev/null +++ b/libdd-remote-config/roots/prod/director_root.json @@ -0,0 +1,63 @@ +{ + "signed": { + "_type": "root", + "spec_version": "1.0", + "version": 15, + "expires": "2026-10-31T17:00:00Z", + "keys": { + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "286d6ae328365afec0f92519ceab68cd627e34072cde90b2f5d167badea970f2" } + }, + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "afdd68be53815d67f8fa99cf101aac4589a358c660adf7dd4e179fe96834d3c9" } + } + }, + "roles": { + "root": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + } + }, + "consistent_snapshot": true + }, + "signatures": [ + { + "keyid": "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8", + "sig": "ccbe8cdd7dfb9a9d6b4bef8075a7aaf9baafe69a07100f22c04677a9737a23b24055ac3a0776c7021ae6a2fd175a251c0604164ea6705a0a896844766d2ecd07" + }, + { + "keyid": "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "sig": "068a2e37e93688702e75ebb328b74cd8879832a63179ba1c54976aae4ee03a5e936c7b7274d4a6aa6755c27cfe800097984d94c83be901bde72103dccebcc008" + } + ] +} diff --git a/libdd-remote-config/roots/staging/config_root.json b/libdd-remote-config/roots/staging/config_root.json new file mode 100644 index 0000000000..9d44633729 --- /dev/null +++ b/libdd-remote-config/roots/staging/config_root.json @@ -0,0 +1,73 @@ +{ + "signatures": [ + { + "keyid": "bd3ea764afdf757f07bab1e9e501a5fda1d49a8da3eaddc53a50dbe2aff92545", + "sig": "928d0b9de72a1a1c2fad453e52950509a434814ca0dc5fb43db5100fdbd732461b38b522051ffedc7c226426ce102c245bc69895fde0f0ca0d9615f84027c60f" + }, + { + "keyid": "6aac6a51efedb4e54915bf9fbd2cfb49fbf428d46052bcaf3c72409c33ecdf5e", + "sig": "146d301f5dd97125ddd34d13ad5c7b1f071bbd249d7c86d17a095c0fbfd680ed21737f45997361e14e79be973914cfb35da39c02ce58f81df12afd9eb49d0003" + } + ], + "signed": { + "_type": "root", + "consistent_snapshot": true, + "expires": "2025-12-01T17:00:00Z", + "keys": { + "6aac6a51efedb4e54915bf9fbd2cfb49fbf428d46052bcaf3c72409c33ecdf5e": { + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keytype": "ed25519", + "keyval": { + "public": "09402247ef6252018e52c7ba6a3a484936f14dad6ae921c556a1d092f4a68f0f" + }, + "scheme": "ed25519" + }, + "bd3ea764afdf757f07bab1e9e501a5fda1d49a8da3eaddc53a50dbe2aff92545": { + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keytype": "ed25519", + "keyval": { + "public": "cf248bc222a5dfc9676a2a3ef90526c84adb09649db56686705f69f42908d7d8" + }, + "scheme": "ed25519" + } + }, + "roles": { + "root": { + "keyids": [ + "6aac6a51efedb4e54915bf9fbd2cfb49fbf428d46052bcaf3c72409c33ecdf5e", + "bd3ea764afdf757f07bab1e9e501a5fda1d49a8da3eaddc53a50dbe2aff92545" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "6aac6a51efedb4e54915bf9fbd2cfb49fbf428d46052bcaf3c72409c33ecdf5e", + "bd3ea764afdf757f07bab1e9e501a5fda1d49a8da3eaddc53a50dbe2aff92545" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "6aac6a51efedb4e54915bf9fbd2cfb49fbf428d46052bcaf3c72409c33ecdf5e", + "bd3ea764afdf757f07bab1e9e501a5fda1d49a8da3eaddc53a50dbe2aff92545" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "6aac6a51efedb4e54915bf9fbd2cfb49fbf428d46052bcaf3c72409c33ecdf5e", + "bd3ea764afdf757f07bab1e9e501a5fda1d49a8da3eaddc53a50dbe2aff92545" + ], + "threshold": 2 + } + }, + "spec_version": "1.0", + "version": 29 + } +} \ No newline at end of file diff --git a/libdd-remote-config/roots/staging/director_root.json b/libdd-remote-config/roots/staging/director_root.json new file mode 100644 index 0000000000..7361c7b9e9 --- /dev/null +++ b/libdd-remote-config/roots/staging/director_root.json @@ -0,0 +1,73 @@ +{ + "signatures": [ + { + "keyid": "233a529fe7c63b5b9081f6e0e2681cc227f85e04ad434d0a165a2f69b87255a6", + "sig": "6d7ddf4bcbd1ce223b5352cae4671ef42800d79f0c94dda905cf0dd8a6198ba69795a19201dc7230e4bd872cf109e827233678bf76389910933472417488320e" + }, + { + "keyid": "6ca796e7b4883af3bb3d522dc0009984dcbf5ad2a6c9ea354d30acc32d8b75d1", + "sig": "a1236d12903e1c4024fc6340c50a0f2fe9972e967eb2bace8d6594e156f0466f772bfc0c9f30e07067904073c0d7ba7d48ad00341405312daf0d7bc502ccc50f" + } + ], + "signed": { + "_type": "root", + "consistent_snapshot": true, + "expires": "1970-01-01T00:00:00Z", + "keys": { + "233a529fe7c63b5b9081f6e0e2681cc227f85e04ad434d0a165a2f69b87255a6": { + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keytype": "ed25519", + "keyval": { + "public": "f7c278f32e69ce7d5ca5b81bd2cbe2b4b44177eee36ed025ec06bd19e47eaefe" + }, + "scheme": "ed25519" + }, + "6ca796e7b4883af3bb3d522dc0009984dcbf5ad2a6c9ea354d30acc32d8b75d1": { + "keyid_hash_algorithms": [ + "sha256", + "sha512" + ], + "keytype": "ed25519", + "keyval": { + "public": "47be15ec10499208aa5ef9a1e32010cc05c047a98d18ad084d6e4e51baa1b93c" + }, + "scheme": "ed25519" + } + }, + "roles": { + "root": { + "keyids": [ + "6ca796e7b4883af3bb3d522dc0009984dcbf5ad2a6c9ea354d30acc32d8b75d1", + "233a529fe7c63b5b9081f6e0e2681cc227f85e04ad434d0a165a2f69b87255a6" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "6ca796e7b4883af3bb3d522dc0009984dcbf5ad2a6c9ea354d30acc32d8b75d1", + "233a529fe7c63b5b9081f6e0e2681cc227f85e04ad434d0a165a2f69b87255a6" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "6ca796e7b4883af3bb3d522dc0009984dcbf5ad2a6c9ea354d30acc32d8b75d1", + "233a529fe7c63b5b9081f6e0e2681cc227f85e04ad434d0a165a2f69b87255a6" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "6ca796e7b4883af3bb3d522dc0009984dcbf5ad2a6c9ea354d30acc32d8b75d1", + "233a529fe7c63b5b9081f6e0e2681cc227f85e04ad434d0a165a2f69b87255a6" + ], + "threshold": 2 + } + }, + "spec_version": "1.0", + "version": 1 + } +} \ No newline at end of file diff --git a/libdd-remote-config/src/agentless_client/mod.rs b/libdd-remote-config/src/agentless_client/mod.rs new file mode 100644 index 0000000000..a2f8d7704f --- /dev/null +++ b/libdd-remote-config/src/agentless_client/mod.rs @@ -0,0 +1,1001 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + fmt, + ops::RangeInclusive, + path::PathBuf, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use anyhow::{bail, format_err}; +use base64::Engine; +use futures::AsyncReadExt as _; +use hashbrown::{HashMap, HashSet}; +use http::{ + header, + uri::{Authority, PathAndQuery}, + Method, Request, Uri, +}; +use libdd_capabilities::{Bytes, HttpClientCapability}; +use libdd_common::Endpoint; +use libdd_trace_protobuf::remoteconfig; +use prost::Message; +use serde_json::Value; +use tracing::debug; +use tuf::repository::RepositoryStorage; +use tuf::{ + metadata::{ + Metadata, MetadataPath, MetadataVersion, RawSignedMetadata, TargetDescription, TargetPath, + }, + repository::RepositoryProvider as _, +}; + +// Embedded TUF trust roots, per site +const PROD_CONFIG_ROOT: &[u8] = include_bytes!("../../roots/prod/config_root.json"); +const PROD_CONFIG_ROOT_VERSION: u64 = 16; + +const PROD_DIRECTOR_ROOT: &[u8] = include_bytes!("../../roots/prod/director_root.json"); +const PROD_DIRECTOR_ROOT_VERSION: u64 = 15; + +const STAGING_CONFIG_ROOT: &[u8] = include_bytes!("../../roots/staging/config_root.json"); +const STAGING_CONFIG_ROOT_VERSION: u64 = 29; + +const STAGING_DIRECTOR_ROOT: &[u8] = include_bytes!("../../roots/staging/director_root.json"); +const STAGING_DIRECTOR_ROOT_VERSION: u64 = 1; + +const GOV_CONFIG_ROOT: &[u8] = include_bytes!("../../roots/gov/config_root.json"); +const GOV_CONFIG_ROOT_VERSION: u64 = 1; + +const GOV_DIRECTOR_ROOT: &[u8] = include_bytes!("../../roots/gov/director_root.json"); +const GOV_DIRECTOR_ROOT_VERSION: u64 = 1; + +/// Datadog site selection used to pick a default TUF trust-root pair. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum Site { + Prod, + Staging, + Gov, +} + +impl Site { + /// Map an endpoint authority/host to a Datadog site. + /// + /// The configured agentless endpoint authority looks like `config.` + /// (see `make_agentless_configs_endpoint`), so we strip a leading + /// `config.` prefix and apply the same rules the agent uses. + fn from_host(host: &str) -> Self { + let site = host.strip_prefix("config.").unwrap_or(host); + if site == "datad0g.com" || site.ends_with(".datad0g.com") { + Site::Staging + } else if site == "ddog-gov.com" || site.ends_with(".ddog-gov.com") { + Site::Gov + } else { + Site::Prod + } + } + + fn embedded_config_root(self) -> (&'static [u8], u64) { + match self { + Site::Prod => (PROD_CONFIG_ROOT, PROD_CONFIG_ROOT_VERSION), + Site::Staging => (STAGING_CONFIG_ROOT, STAGING_CONFIG_ROOT_VERSION), + Site::Gov => (GOV_CONFIG_ROOT, GOV_CONFIG_ROOT_VERSION), + } + } + + fn embedded_director_root(self) -> (&'static [u8], u64) { + match self { + Site::Prod => (PROD_DIRECTOR_ROOT, PROD_DIRECTOR_ROOT_VERSION), + Site::Staging => (STAGING_DIRECTOR_ROOT, STAGING_DIRECTOR_ROOT_VERSION), + Site::Gov => (GOV_DIRECTOR_ROOT, GOV_DIRECTOR_ROOT_VERSION), + } + } +} + +/// Extract the `version` integer from a signed TUF root JSON document. Used +/// only when loading an override root from disk; the embedded roots have +/// their versions hardcoded above. +fn parse_root_version(raw: &[u8]) -> anyhow::Result { + let v: Value = serde_json::from_slice(raw)?; + v.get("signed") + .and_then(|s| s.get("version")) + .and_then(Value::as_u64) + .ok_or_else(|| format_err!("missing or invalid signed.version in TUF root")) +} + +/// Read a TUF root override from disk, returning the bytes and their parsed +/// version +fn load_root(override_path: &std::path::Path) -> anyhow::Result<(Vec, u64)> { + let bytes = std::fs::read(override_path) + .map_err(|e| format_err!("failed to read TUF root override at {override_path:?}: {e}"))?; + let version = parse_root_version(&bytes)?; + Ok((bytes, version)) +} + +const FAKE_AGENT_VERSION: &str = "7.78.4"; + +type TUFRepo = tuf::repository::EphemeralRepository; +type TUFClient = tuf::client::Client; + +// Make a remote config API endpoint from and endpoint where `e.url` is the base dd site +// If the endpoint is not suitable (api key not set, not https), returns N +pub fn make_agentless_configs_endpoint(e: &Endpoint) -> Option { + let e = e.clone(); + dbg!(&e); + if !(e.url.scheme_str().is_some_and(|s| s == "https") + && e.url.authority().is_some() + && e.api_key.is_some()) + { + return None; + } + + let mut parts = e.url.into_parts(); + parts.authority = + Some(Authority::try_from(format!("config.{}", parts.authority?.as_str())).ok()?); + parts.path_and_query = Some(PathAndQuery::from_static("/api/v0.1/configurations")); + + Some(Endpoint { + url: Uri::from_parts(parts).ok()?, + ..e + }) +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Default)] +pub struct AgentlessConfig { + /// Hostname reported to the RC backend in `LatestConfigsRequest.hostname`. + /// Required (must be non-empty) in agentless mode; an empty value causes + /// `ConfigFetcherState::new` to downgrade to agent mode. + pub hostname: String, + /// Optional path to a TUF config-repo root JSON to use instead of the + /// embedded one. Useful for staging/private deployments where the trust + /// chain differs from the published defaults. + pub config_root_override_path: Option, + /// Optional path to a TUF director-repo root JSON to use instead of the + /// embedded one. + pub director_root_override_path: Option, +} + +pub type NativeAgentlessFetcher = AgentlessFetcher; + +pub struct AgentlessFetcher { + http: C, + initialized: bool, + opaque_backend_state: Vec, + director_client: TUFClient, + config_client: TUFClient, + initial_config_root_version: u64, + initial_director_root_version: u64, + hostname: String, + products: HashSet, + refresh_interval: Duration, + /// Number of consecutive `fetch_config` failures. Reset to 0 on success. + consecutive_failures: u32, + endpoint: Endpoint, + // TODO: Not sure this is needed if the wrapped client already caches files? + target_cache: HashMap, +} + +struct CachedFile { + hashes: Vec<(&'static tuf::crypto::HashAlgorithm, tuf::crypto::HashValue)>, + target_file: Vec, + version: u64, +} + +pub struct ClientTargetResponse<'a> { + pub path: &'a str, + pub version: u64, + pub hashes: &'a [(&'static tuf::crypto::HashAlgorithm, tuf::crypto::HashValue)], + pub content: &'a [u8], +} + +pub struct ClientResponse<'a> { + pub root_version: u64, + pub target_version: u64, + pub opaque_backend_state: &'a [u8], + pub targets: Vec>, + pub refresh_interval: Duration, +} + +struct BorrowedTufTarget<'a> { + pub path: &'a tuf::metadata::TargetPath, + pub desc: &'a tuf::metadata::TargetDescription, +} + +const CUSTOM_METADATA_EXPIRY_PATH: &str = "expires"; + +impl<'a> BorrowedTufTarget<'a> { + pub fn try_create(path: &'a TargetPath, desc: &'a TargetDescription) -> anyhow::Result { + if let Some(expiry) = desc.custom().get(CUSTOM_METADATA_EXPIRY_PATH) { + let expiry_ts = expiry + .as_u64() + .ok_or_else(|| format_err!("expiry not a number"))?; + + if expiry_ts * 1000 <= now_unix_milli_ts() { + bail!("expired target at path: {path}") + } + } + + Ok(Self { path, desc }) + } +} + +enum FetchTargetResult { + Cached, + New(CachedFile), +} + +impl AgentlessFetcher { + /// Create a new `AgentlessFetcher` client. + /// + /// # Errors + /// Returns an error if TUF root initialization fails. + /// This can happen for instance if the trust root certificates have expired + pub async fn new(cfg: AgentlessConfig, endpoint: Endpoint) -> anyhow::Result { + // Pick the default trust roots based on the endpoint's host. Overrides + // (if any) take precedence. + let site = endpoint + .url + .authority() + .map(|a| Site::from_host(a.as_str())) + .unwrap_or(Site::Prod); + + let (config_root_bytes, initial_config_root_version) = + match cfg.config_root_override_path.as_deref() { + Some(p) => load_root(p)?, + None => { + let (embedded, version) = site.embedded_config_root(); + (embedded.to_vec(), version) + } + }; + let (director_root_bytes, initial_director_root_version) = + match cfg.director_root_override_path.as_deref() { + Some(p) => load_root(p)?, + None => { + let (embedded, version) = site.embedded_director_root(); + (embedded.to_vec(), version) + } + }; + + Ok(Self { + endpoint, + http: C::new_client(), + director_client: TUFClient::with_trusted_root( + tuf::client::Config::default(), + &RawSignedMetadata::new(director_root_bytes), + TUFRepo::new(), + TUFRepo::new(), + ) + .await?, + config_client: TUFClient::with_trusted_root( + tuf::client::Config::default(), + &RawSignedMetadata::new(config_root_bytes), + TUFRepo::new(), + TUFRepo::new(), + ) + .await?, + initial_config_root_version, + initial_director_root_version, + hostname: cfg.hostname, + products: HashSet::new(), + target_cache: HashMap::new(), + + opaque_backend_state: Vec::new(), + refresh_interval: Duration::from_secs(60), + consecutive_failures: 0, + initialized: false, + }) + } + + /// Number of consecutive failed `fetch_config` calls. `0` after a success. + pub fn consecutive_failures(&self) -> u32 { + self.consecutive_failures + } + + /// Recommended delay before the next `fetch_config` attempt given the + /// current consecutive-failure count. Returns `None` when no backoff + /// applies (i.e. either no failures yet, or only a single one). + pub fn next_backoff(&self) -> Option { + compute_backoff(self.consecutive_failures) + } + + /// Return the value of a particular target , checking both its length and + /// hashes against the metadata in the config repo. + /// + /// If it is already in the cache, return `Cached` + async fn fetch_target( + &self, + target: &BorrowedTufTarget<'_>, + ) -> anyhow::Result { + let expected_hashes = tuf::crypto::retain_supported_hashes(target.desc.hashes()); + if expected_hashes.is_empty() { + bail!("no supported hash for path: {}", target.path); + } + let (target_hash_algo, target_hash) = &expected_hashes[0]; + let target_path = target.path; + + let version = target + .desc + .custom() + .get("v") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + + if let Some(item) = self.target_cache.get(target_path) { + if item + .hashes + .iter() + .find(|(alg, _)| alg == target_hash_algo) + .is_some_and(|(_, h)| h == target_hash) + && item.target_file.len() as u64 == target.desc.length() + { + return Ok(FetchTargetResult::Cached); + } + } + + // Fetch from the content from the remote __Unverified__ repo + // This is fine as we are comparing the (hash + len) with a validated + // target + let mut read = self + .director_client + .remote_repo() + .fetch_target(target_path) + .await?; + let mut buf = Vec::new(); + read.read_to_end(&mut buf).await?; + + let expected_len = target.desc.length() as usize; + if buf.len() != expected_len { + bail!("bad length for file at path: {}", target.path) + } + + { + let hash_algs = expected_hashes + .iter() + .map(|(alg, _val)| (*alg).clone()) + .collect::>(); + let actual_hashes = + tuf::crypto::calculate_hashes_from_slice(&buf, hash_algs.as_slice())?; + let expected: HashMap<_, _> = expected_hashes + .iter() + .map(|(alg, val)| (alg, val)) + .collect(); + + if !(actual_hashes.len() == expected.len() + && actual_hashes + .iter() + .all(|(k, v)| expected.get(&k).is_some_and(|e| *e == v))) + { + bail!("hash did not match: {}", target.path) + } + } + + Ok(FetchTargetResult::New(CachedFile { + hashes: expected_hashes, + target_file: buf, + version, + })) + } + + pub async fn fetch_config( + &mut self, + c: remoteconfig::Client, + ) -> anyhow::Result> { + let ( + current_config_snapshot_version, + current_config_root_version, + current_director_root_version, + ) = if self.initialized { + ( + u64::from( + self.config_client + .database() + .trusted_snapshot() + .ok_or(anyhow::anyhow!("Missing snapshot data"))? + .version(), + ), + u64::from(self.config_client.database().trusted_root().version()), + u64::from(self.director_client.database().trusted_root().version()), + ) + } else { + ( + 0, + self.initial_config_root_version, + self.initial_director_root_version, + ) + }; + + let all_products = c.products.iter().fold(HashSet::new(), |mut acc, p| { + acc.get_or_insert_with(p, String::clone); + acc + }); + let new_products = all_products + .difference(&self.products) + .cloned() + .collect::>(); + let old_products = self + .products + .intersection(&all_products) + .cloned() + .collect::>(); + + let now = now_unix_milli_ts(); + + let (has_error, error) = match c.state.as_ref() { + Some(state) if state.has_error => (true, state.error.clone()), + _ => (false, String::new()), + }; + + let request = remoteconfig::LatestConfigsRequest { + hostname: self.hostname.clone(), + current_config_snapshot_version, + current_config_root_version, + current_director_root_version, + products: old_products, + new_products, + backend_client_state: self.opaque_backend_state.clone(), + active_clients: vec![remoteconfig::Client { + last_seen: now, + ..c + }], + agent_version: FAKE_AGENT_VERSION.to_owned(), + has_error, + error, + trace_agent_env: String::new(), + org_uuid: String::new(), + tags: vec![], + agent_uuid: String::new(), + }; + let response = match self.get_latest_config(request).await { + Ok(r) => r, + Err(e) => { + self.consecutive_failures = self.consecutive_failures.saturating_add(1); + return Err(e); + } + }; + self.consecutive_failures = 0; + + self.apply(&response).await?; + if !self.initialized { + self.initialized = true; + } + self.products = all_products; + + // TODO: + // In the future we will want to query configs for mutliple clients (for PHP, which can have + // many processes use the same rc client) + // This means we will need to dispatch the different files based on filter predicates + // which we currently do not parse + + Ok(ClientResponse { + root_version: u64::from(self.config_client.database().trusted_root().version()), + target_version: u64::from( + self.config_client + .database() + .trusted_targets() + .ok_or(anyhow::anyhow!("Missing target data"))? + .version(), + ), + opaque_backend_state: &self.opaque_backend_state, + targets: self + .target_cache + .iter() + .map(|(p, t)| ClientTargetResponse { + path: p.as_str(), + version: t.version, + hashes: &t.hashes, + content: t.target_file.as_slice(), + }) + .collect(), + refresh_interval: self.refresh_interval, + }) + } + + /// Query the Remote Config org-status endpoint. + /// + /// # Errors + /// Returns an error if the HTTP request fails or the response cannot be decoded. + pub async fn get_org_status(&self) -> anyhow::Result { + let path = PathAndQuery::from_static("/api/v0.1/status"); + let res = self.send_request(Method::GET, path, Bytes::new()).await?; + parse_rc_response(res) + } + + pub async fn get_org_data(&self) -> anyhow::Result { + let path = PathAndQuery::from_static("/api/v0.1/org"); + let res = self.send_request(Method::GET, path, Bytes::new()).await?; + parse_rc_response(res) + } + + /// Fetch the latest Remote Config for this client. + /// + /// # Errors + /// Returns an error if the HTTP request fails or the response cannot be decoded. + async fn get_latest_config( + &self, + req: remoteconfig::LatestConfigsRequest, + ) -> anyhow::Result { + dbg!(&req); + let path = PathAndQuery::from_static("/api/v0.1/configurations"); + let body = Bytes::from(req.encode_to_vec()); + let res = self.send_request(Method::POST, path, body).await?; + let res = parse_rc_response(res)?; + dbg!(debug_latest_configs_response(&res)); + Ok(res) + } + + #[allow(clippy::future_not_send)] + async fn send_request( + &self, + method: Method, + path: PathAndQuery, + body: Bytes, + ) -> anyhow::Result> { + let req = self + .endpoint + .set_standard_headers( + Request::builder(), + concat!("Libdatadog/", env!("CARGO_PKG_VERSION")), + ) + .header(header::CONTENT_TYPE, "application/x-protobuf") + .uri(url_with_path(self.endpoint.url.clone(), path)?) + .method(method) + .body(body)?; + Ok(self.http.request(req).await?) + } + + async fn apply( + &mut self, + response: &remoteconfig::LatestConfigsResponse, + ) -> anyhow::Result<()> { + // At a high level, we're populating the "remote" repos with the metadata + // that we received from upstream (which does not validate it), and then using the clients' + // `update` methods to synchronize that metadata to the "local" repos, during which + // validation is performed. + + let root_path = MetadataPath::root(); + let timestamp_path = MetadataPath::timestamp(); + let snapshot_path = MetadataPath::snapshot(); + let targets_path = MetadataPath::targets(); + + let repo = self.director_client.remote_repo_mut(); + *repo = TUFRepo::new(); + for target_file in &response.target_files { + let trimmed_path = trim_hash_target_path(&target_file.path)?; + repo.store_target( + &TargetPath::new(&trimmed_path)?, + &mut target_file.raw.as_slice(), + ) + .await?; + } + + let config_repo_mut = self.config_client.remote_repo_mut(); + *config_repo_mut = TUFRepo::new(); + let Some(metas) = response.config_metas.as_ref() else { + bail!("missing config meta from LatestConfigsResponse") + }; + + store(config_repo_mut, &root_path, &metas.roots).await?; + store_noversion(config_repo_mut, ×tamp_path, &metas.timestamp).await?; + store(config_repo_mut, &snapshot_path, &metas.snapshot).await?; + store(config_repo_mut, &targets_path, &metas.top_targets).await?; + // TODO: We do not store the delegated targets metadata + // This will need to be revisited in order to support proper Uptane + // verification of the full configuration data. + // store(repo, &targets_path, &metas.delegated_targets).await?; + + let director_remote_repo = self.director_client.remote_repo_mut(); + let Some(metas) = response.director_metas.as_ref() else { + bail!("missing director meta from LatestConfigsResponse") + }; + + store(director_remote_repo, &root_path, &metas.roots).await?; + store_noversion(director_remote_repo, ×tamp_path, &metas.timestamp).await?; + store(director_remote_repo, &snapshot_path, &metas.snapshot).await?; + store(director_remote_repo, &targets_path, &metas.targets).await?; + + self.config_client.update().await?; + self.director_client.update().await?; + + let mut new_target_path_set = HashSet::new(); + for target in trusted_targets(&self.director_client)? { + new_target_path_set.insert(target.path); + match self.fetch_target(&target).await? { + FetchTargetResult::Cached => {} + FetchTargetResult::New(cached_target) => { + self.target_cache.insert(target.path.clone(), cached_target); + } + } + } + self.target_cache + .retain(|key, _| new_target_path_set.contains(key)); + + // The Remote Config service uses a `custom` field at the top-level of the targets metadata + // to store this field which we are supposed to echo back to the server. That `custom` field + // is not explicitly part of the TUF spec, which is why we need to pull it out of the + // `additional_fields` catch-all here. + if let Some((opaque_backend_state, refresh_interval)) = + get_director_custom(&self.director_client) + { + if let Some(opaque_backend_state) = opaque_backend_state { + self.opaque_backend_state = opaque_backend_state; + } + if let Some(refresh_interval) = refresh_interval { + self.refresh_interval = refresh_interval; + } + } + + Ok(()) + } +} + +const REFRESH_INTERVAL_BOUNDS: RangeInclusive = + Duration::from_secs(1)..=Duration::from_secs(60); + +fn get_director_custom(director_client: &TUFClient) -> Option<(Option>, Option)> { + let custom = director_client + .database() + .trusted_targets()? + .additional_fields() + .get("custom")?; + + Some(( + custom + .get("opaque_backend_state") + .and_then(Value::as_str) + .and_then(|s| base64::engine::general_purpose::STANDARD.decode(s).ok()), + custom + .get("agent_refresh_interval") + .and_then(Value::as_u64) + .map(Duration::from_secs) + // Mirror the agent: silently drop values outside `[1s, 1m]` + .filter(|d| REFRESH_INTERVAL_BOUNDS.contains(d)), + )) +} + +fn url_with_path(base: http::Uri, path: PathAndQuery) -> anyhow::Result { + let mut parts = base.into_parts(); + parts.path_and_query = Some(path); + Ok(http::Uri::from_parts(parts)?) +} + +fn parse_rc_response( + response: http::Response, +) -> anyhow::Result { + let status = response.status().as_u16(); + let body = response.into_body(); + if !(200..300).contains(&status) { + bail!( + "Non 2XX status code: {}\n{}", + status, + String::from_utf8_lossy(&body) + ) + } + + Ok(T::decode(body)?) +} + +/// Compute the backoff delay to wait before the next `fetch_config` attempt, +/// given the number of consecutive failures observed so far. +fn compute_backoff(consecutive_failures: u32) -> Option { + match consecutive_failures { + 0 | 1 => None, + 2 => Some(jitter_secs(30, 60)), + 3 => Some(jitter_secs(60, 120)), + _ => Some(Duration::from_secs(120)), + } +} + +/// Pseudo-random duration in `[min_secs, max_secs]`, derived from the current +/// wall-clock subsecond nanos. This is sufficient for jitter purposes (we do +/// not need cryptographic randomness here, and pulling in a `rand` dependency +/// for one usage is overkill). +fn jitter_secs(min_secs: u64, max_secs: u64) -> Duration { + let (lo, hi) = if min_secs <= max_secs { + (min_secs, max_secs) + } else { + (max_secs, min_secs) + }; + let span = hi.saturating_sub(lo); + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| u64::from(d.subsec_nanos())) + .unwrap_or(0); + let offset = if span == 0 { 0 } else { nanos % (span + 1) }; + Duration::from_secs(lo + offset) +} + +fn now_unix_milli_ts() -> u64 { + u64::try_from( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_millis(), + ) + .unwrap_or(u64::MAX) +} + +/// Return the available, unexpired target paths and their descriptions based on the current +/// metadata. +fn trusted_targets( + director_client: &TUFClient, +) -> anyhow::Result> + '_> { + Ok(director_client + .database() + .trusted_targets() + .ok_or_else(|| format_err!("missing targets from TUF director client"))? + .targets() + .iter() + .filter_map(|(path, desc)| { + BorrowedTufTarget::try_create(path, desc) + .inspect_err(|e| { + debug!(%path, "Skipping target: error {}", e); + }) + .ok() + })) +} + +async fn store<'a, T>(repo: &mut TUFRepo, path: &MetadataPath, tms: T) -> anyhow::Result<()> +where + T: IntoIterator + 'a, +{ + for tm in tms { + repo.store_metadata( + path, + MetadataVersion::Number(tm.version as u32), + &mut tm.raw.as_slice(), + ) + .await?; + } + Ok(()) +} + +async fn store_noversion( + repo: &mut TUFRepo, + path: &MetadataPath, + tms: &Option, +) -> anyhow::Result<()> { + if let Some(tm) = tms { + repo.store_metadata(path, MetadataVersion::None, &mut tm.raw.as_slice()) + .await?; + } + Ok(()) +} + +/// Strip the leading `.` prefix from the basename of a TUF target path. +/// For instance "datadog/2///.config"` => `"datadog/2///config"` +/// +/// See https://datadoghq.atlassian.net/browse/RC-1859 for more information. +fn trim_hash_target_path(target_path: &str) -> anyhow::Result { + let (parent, basename) = target_path + .rsplit_once('/') + .ok_or_else(|| format_err!("invalid target: {target_path}"))?; + if basename.is_empty() { + bail!("invalid target: {target_path}") + } + + // Strip the leading `.` component if present. If the basename + // contains no `.`, keep it as-is (matches the previous behaviour). + let basename_trimmed = basename.split_once('.').map_or(basename, |(_, rest)| rest); + + Ok(format!("{parent}/{basename_trimmed}")) +} + +// ── Debug helpers: render `raw: Vec` fields as JSON ──────────────────── + +struct RawJson<'a>(&'a [u8]); + +impl fmt::Debug for RawJson<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let RawJson(bytes) = self; + match serde_json::from_slice::(bytes) { + Ok(v) => write!(f, "{v:#}"), + Err(_) => write!(f, "<{} non-JSON bytes>", bytes.len()), + } + } +} + +struct DebugTopMeta<'a>(&'a remoteconfig::TopMeta); + +impl fmt::Debug for DebugTopMeta<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::TopMeta { version, raw } = self.0; + f.debug_struct("TopMeta") + .field("version", version) + .field("raw", &RawJson(raw)) + .finish() + } +} + +struct DebugDelegatedMeta<'a>(&'a remoteconfig::DelegatedMeta); + +impl fmt::Debug for DebugDelegatedMeta<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::DelegatedMeta { version, role, raw } = self.0; + f.debug_struct("DelegatedMeta") + .field("version", version) + .field("role", role) + .field("raw", &RawJson(raw)) + .finish() + } +} + +struct DebugFile<'a>(&'a remoteconfig::File); + +impl fmt::Debug for DebugFile<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::File { path, raw } = self.0; + f.debug_struct("File") + .field("path", path) + .field("raw", &RawJson(raw)) + .finish() + } +} + +struct DebugConfigMetas<'a>(&'a remoteconfig::ConfigMetas); + +impl fmt::Debug for DebugConfigMetas<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::ConfigMetas { + roots, + timestamp, + snapshot, + top_targets, + delegated_targets, + } = self.0; + f.debug_struct("ConfigMetas") + .field("roots", &roots.iter().map(DebugTopMeta).collect::>()) + .field("timestamp", ×tamp.as_ref().map(DebugTopMeta)) + .field("snapshot", &snapshot.as_ref().map(DebugTopMeta)) + .field("top_targets", &top_targets.as_ref().map(DebugTopMeta)) + .field( + "delegated_targets", + &delegated_targets + .iter() + .map(DebugDelegatedMeta) + .collect::>(), + ) + .finish() + } +} + +struct DebugDirectorMetas<'a>(&'a remoteconfig::DirectorMetas); + +impl fmt::Debug for DebugDirectorMetas<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::DirectorMetas { + roots, + timestamp, + snapshot, + targets, + } = &self.0; + f.debug_struct("DirectorMetas") + .field("roots", &roots.iter().map(DebugTopMeta).collect::>()) + .field("timestamp", ×tamp.as_ref().map(DebugTopMeta)) + .field("snapshot", &snapshot.as_ref().map(DebugTopMeta)) + .field("targets", &targets.as_ref().map(DebugTopMeta)) + .finish() + } +} + +struct DebugLatestConfigsResponse<'a>(&'a remoteconfig::LatestConfigsResponse); + +impl fmt::Debug for DebugLatestConfigsResponse<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::LatestConfigsResponse { + config_metas, + director_metas, + target_files, + } = &self.0; + f.debug_struct("LatestConfigsResponse") + .field("config_metas", &config_metas.as_ref().map(DebugConfigMetas)) + .field( + "director_metas", + &director_metas.as_ref().map(DebugDirectorMetas), + ) + .field( + "target_files", + &target_files.iter().map(DebugFile).collect::>(), + ) + .finish() + } +} + +/// Returns a value that implements [`fmt::Debug`] for [`remoteconfig::LatestConfigsResponse`], +/// rendering every `raw` byte field as a parsed JSON value instead of a raw byte array. +pub fn debug_latest_configs_response( + resp: &remoteconfig::LatestConfigsResponse, +) -> impl fmt::Debug + '_ { + DebugLatestConfigsResponse(resp) +} + +#[cfg(test)] +mod tests { + use super::trim_hash_target_path; + use super::{ + Site, GOV_CONFIG_ROOT, GOV_CONFIG_ROOT_VERSION, GOV_DIRECTOR_ROOT, + GOV_DIRECTOR_ROOT_VERSION, PROD_CONFIG_ROOT, PROD_CONFIG_ROOT_VERSION, PROD_DIRECTOR_ROOT, + PROD_DIRECTOR_ROOT_VERSION, STAGING_CONFIG_ROOT, STAGING_CONFIG_ROOT_VERSION, + STAGING_DIRECTOR_ROOT, STAGING_DIRECTOR_ROOT_VERSION, + }; + + #[test] + fn strips_hash_prefix() { + assert_eq!( + trim_hash_target_path("datadog/2/APM_TRACING/abcd/deadbeef.config").unwrap(), + "datadog/2/APM_TRACING/abcd/config" + ); + } + + #[test] + fn no_hash_prefix_is_kept() { + assert_eq!( + trim_hash_target_path("datadog/2/APM_TRACING/abcd/config").unwrap(), + "datadog/2/APM_TRACING/abcd/config" + ); + } + + #[test] + fn backslash_is_not_a_separator() { + // Windows-style separators must NOT be treated as path separators. + // The whole string is the basename here. + assert!(trim_hash_target_path(r"datadog\2\foo.bar").is_err()); + } + + #[test] + fn empty_or_no_slash_is_error() { + assert!(trim_hash_target_path("").is_err()); + assert!(trim_hash_target_path("deadbeef.config").is_err()); + } + + #[test] + fn trailing_slash_is_error() { + assert!(trim_hash_target_path("datadog/2/foo/").is_err()); + } + + #[test] + fn test_root_versions_match() { + // Every embedded root's hardcoded version must match the + // `signed.version` field inside the JSON. If you bump a root file, + // bump the matching `_VERSION` constant in this module too. + for (raw, expected) in [ + (PROD_CONFIG_ROOT, PROD_CONFIG_ROOT_VERSION), + (PROD_DIRECTOR_ROOT, PROD_DIRECTOR_ROOT_VERSION), + (STAGING_CONFIG_ROOT, STAGING_CONFIG_ROOT_VERSION), + (STAGING_DIRECTOR_ROOT, STAGING_DIRECTOR_ROOT_VERSION), + (GOV_CONFIG_ROOT, GOV_CONFIG_ROOT_VERSION), + (GOV_DIRECTOR_ROOT, GOV_DIRECTOR_ROOT_VERSION), + ] { + let v: serde_json::Value = serde_json::from_slice(raw).unwrap(); + assert_eq!(v["signed"]["version"], expected); + } + } + + #[test] + fn test_compute_backoff() { + use super::compute_backoff; + use std::time::Duration; + + assert_eq!(compute_backoff(0), None); + assert_eq!(compute_backoff(1), None); + + let b2 = compute_backoff(2).unwrap(); + assert!((Duration::from_secs(30)..=Duration::from_secs(60)).contains(&b2)); + + let b3 = compute_backoff(3).unwrap(); + assert!((Duration::from_secs(60)..=Duration::from_secs(120)).contains(&b3)); + + assert_eq!(compute_backoff(4), Some(Duration::from_secs(120))); + assert_eq!(compute_backoff(42), Some(Duration::from_secs(120))); + } + + #[test] + fn test_site_from_host() { + assert_eq!(Site::from_host("config.datadoghq.com"), Site::Prod); + assert_eq!(Site::from_host("config.us3.datadoghq.com"), Site::Prod); + assert_eq!(Site::from_host("config.datadoghq.eu"), Site::Prod); + assert_eq!(Site::from_host("config.datad0g.com"), Site::Staging); + assert_eq!(Site::from_host("datad0g.com"), Site::Staging); + assert_eq!(Site::from_host("config.ddog-gov.com"), Site::Gov); + assert_eq!(Site::from_host("config.foo.ddog-gov.com"), Site::Gov); + } +} diff --git a/libdd-remote-config/src/fetch/fetcher.rs b/libdd-remote-config/src/fetch/fetcher.rs index ffa55def0e..3b3b4e2d56 100644 --- a/libdd-remote-config/src/fetch/fetcher.rs +++ b/libdd-remote-config/src/fetch/fetcher.rs @@ -1,6 +1,10 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::agentless_client::{ + self, make_agentless_configs_endpoint, AgentlessConfig, AgentlessFetcher, + NativeAgentlessFetcher, +}; use crate::targets::{Root, TargetsList}; use crate::{RemoteConfigCapabilities, RemoteConfigPath, RemoteConfigProduct, Target}; use base64::Engine; @@ -51,6 +55,15 @@ pub struct ConfigInvariants { pub language: String, pub tracer_version: String, pub endpoint: Endpoint, + /// Enables and configures agentless mode. If some the fetcher will + /// talk directly to the RC backend + pub agentless: Option, +} + +impl ConfigInvariants { + pub fn agentless_enabled(&self) -> bool { + self.agentless.is_some() + } } struct StoredTargetFile { @@ -154,10 +167,30 @@ impl ConfigFetcherFilesLock<'_, S> { impl ConfigFetcherState { pub fn new(invariants: ConfigInvariants) -> Self { + let (endpoint, agentless) = match &invariants.agentless { + Some(agentless_cfg) => match ( + make_agentless_configs_endpoint(&invariants.endpoint), + agentless_cfg.hostname.is_empty(), + ) { + (Some(e), false) => (e, Some(agentless_cfg.clone())), + (Some(_), true) => { + warn!("rc_config_fetcher: agentless enabled but the hostname is empty. Downgrading to agent endpoint"); + (make_agent_configs_endpoint(&invariants.endpoint), None) + } + (None, _) => { + warn!("rc_config_fetcher: agentless enabled but the endpoint is invalid. Downgrading to agent endpoint"); + (make_agent_configs_endpoint(&invariants.endpoint), None) + } + }, + None => (make_agent_configs_endpoint(&invariants.endpoint), None), + }; ConfigFetcherState { target_files_by_path: Default::default(), - endpoint: get_agent_configs_endpoint(&invariants.endpoint), - invariants, + endpoint, + invariants: ConfigInvariants { + agentless, + ..invariants + }, expire_unused_files: true, } } @@ -201,9 +234,16 @@ impl ConfigFetcherState { } } +#[allow(clippy::large_enum_variant)] +enum FetcherMode { + Agent, + Agentless(NativeAgentlessFetcher), +} + pub struct ConfigFetcher { pub file_storage: S, state: Arc>, + mode: FetcherMode, } pub struct ConfigClientState { @@ -215,6 +255,8 @@ pub struct ConfigClientState { /// Services discovered at runtime. Sent to the agent on each poll so it can route configs /// targeting those services to this client. Updated out-of-band by the consumer extra_services: Vec, + /// Server-recommended interval between consecutive polls. + refresh_interval: Option, } impl Default for ConfigClientState { @@ -226,6 +268,7 @@ impl Default for ConfigClientState { root_version: 1, last_error: None, extra_services: vec![], + refresh_interval: None, } } } @@ -234,14 +277,32 @@ impl ConfigClientState { pub fn set_extra_services(&mut self, services: Vec) { self.extra_services = services; } + + pub fn server_recommended_refresh_interval(&self) -> Option { + self.refresh_interval + } } impl ConfigFetcher { - pub fn new(file_storage: S, state: Arc>) -> Self { - ConfigFetcher { + /// Create a new config fetcher + /// This is guaranteed to be immediate (no await point) if `state.invariants.agentless_enabled` + /// is false + pub async fn new( + file_storage: S, + state: Arc>, + ) -> anyhow::Result { + let mode: FetcherMode = match &state.invariants.agentless { + Some(agentless_cfg) => FetcherMode::Agentless( + AgentlessFetcher::new(agentless_cfg.clone(), state.endpoint.clone()).await?, + ), + None => FetcherMode::Agent, + }; + + Ok(ConfigFetcher { file_storage, state, - } + mode, + }) } /// Sets the apply state on a stored file. @@ -317,39 +378,20 @@ impl ConfigFetcher { client_agent: None, last_seen: 0, capabilities: product_capabilities.encoded_capabilities.clone(), + is_updater: false, + client_updater: None, }), cached_target_files, } } - /// Quite generic fetching implementation: - /// - runs a request against the Remote Config Server, - /// - validates the data, - /// - removes unused files - /// - checks if the files are already known, - /// - stores new files, - /// - returns all currently active files. - /// - /// It also makes sure that old files are dropped before new files are inserted. - /// - /// Returns None if nothing changed. Otherwise Some(active configs). - pub async fn fetch_once( + async fn fetch_agent( &mut self, - runtime_id: &str, + config_req: ClientGetConfigsRequest, target: &Target, - product_capabilities: &ConfigProductCapabilities, - client_id: &str, client_state: &mut ConfigClientState, ) -> anyhow::Result>>> { - let config_req = self.build_config_request( - runtime_id, - target, - product_capabilities, - client_id, - &*client_state, - ); trace!("Submitting remote config request: {config_req:?}"); - let req = self .state .endpoint @@ -555,9 +597,192 @@ impl ConfigFetcher { client_state.last_config_paths = config_paths; Ok(Some(configs)) } + + /// Quite generic fetching implementation: + /// - runs a request against the Remote Config Server, + /// - validates the data, + /// - removes unused files + /// - checks if the files are already known, + /// - stores new files, + /// - returns all currently active files. + /// + /// It also makes sure that old files are dropped before new files are inserted. + /// + /// Returns None if nothing changed. Otherwise Some(active configs). + pub async fn fetch_once( + &mut self, + runtime_id: &str, + target: &Target, + product_capabilities: &ConfigProductCapabilities, + client_id: &str, + client_state: &mut ConfigClientState, + ) -> anyhow::Result>>> { + let config_req = self.build_config_request( + runtime_id, + target, + product_capabilities, + client_id, + &*client_state, + ); + match &mut self.mode { + FetcherMode::Agent => self.fetch_agent(config_req, target, client_state).await, + FetcherMode::Agentless(agentless_fetcher) => { + #[allow(clippy::expect_used)] + let client = config_req.client.expect( + "RC ConfigFetcher::build_config_request should always return a `Some` client", + ); + // Capture errors into `client_state.last_error` so the next + // call propagates `has_error` / `error` to the backend. + let res = match agentless_fetcher.fetch_config(client).await { + Ok(res) => res, + Err(e) => { + client_state.last_error = Some(format!("{e:#}")); + // Surface the recommended backoff to the consumer of + // `ConfigClientState::server_recommended_refresh_interval` + // so it waits before the next attempt. `None` means + // "no extra backoff, use the regular interval". + if let Some(backoff) = agentless_fetcher.next_backoff() { + client_state.refresh_interval = Some(backoff); + } + return Err(e); + } + }; + + client_state.root_version = res.root_version; + client_state.targets_version = res.target_version; + client_state.refresh_interval = Some(res.refresh_interval); + if res.opaque_backend_state != client_state.opaque_backend_state { + client_state.opaque_backend_state = res.opaque_backend_state.to_vec(); + } + client_state.last_error = None; + + let mut target_files = self.state.target_files_by_path.lock_or_panic(); + let mut config_paths = HashSet::new(); + for &agentless_client::ClientTargetResponse { path, .. } in &res.targets { + match RemoteConfigPath::try_parse(path) { + Ok(parsed) => { + config_paths.insert(parsed.into()); + } + Err(e) => warn!("Failed parsing remote config path: {path} - {e:?}"), + } + } + + if self.state.expire_unused_files { + target_files.retain(|k, _| config_paths.contains(k.as_ref())); + } + + for agentless_client::ClientTargetResponse { + path, + content: target_file, + version, + hashes, + } in res.targets + { + let parsed_path = match RemoteConfigPath::try_parse(path) { + Ok(parsed_path) => parsed_path, + Err(e) => { + warn!("Failed parsing remote config path: {path} - {e:?}"); + continue; + } + }; + let Some((_, hash)) = hashes + .iter() + .find(|(h, _)| *h == &tuf::crypto::HashAlgorithm::Sha256) + .or_else(|| { + hashes + .iter() + .find(|(h, _)| *h == &tuf::crypto::HashAlgorithm::Sha512) + }) + else { + // todo no supported hash algorithm? + continue; + }; + let hash = hash.to_string(); + + let handle = if let Some(StoredTargetFile { + hash: old_hash, + handle, + .. + }) = target_files.get(&parsed_path) + { + if old_hash == &hash { + continue; + } + Some(handle.clone()) + } else { + None + }; + + let parsed_path: Arc = Arc::new(parsed_path.into()); + target_files.insert( + parsed_path.clone(), + StoredTargetFile { + hash, + state: ConfigState { + id: parsed_path.config_id.to_string(), + version, + product: parsed_path.product.to_string(), + apply_state: 2, // Acknowledged + apply_error: "".to_string(), + }, + meta: TargetFileMeta { + path: path.to_string(), + length: target_file.len() as i64, + hashes: hashes + .iter() + .map(|(algorithm, hash)| { + Ok(TargetFileHash { + algorithm: match algorithm { + tuf::crypto::HashAlgorithm::Sha256 => { + "sha256".to_string() + } + tuf::crypto::HashAlgorithm::Sha512 => { + "sha512".to_string() + } + tuf::crypto::HashAlgorithm::Unknown(u) => u.clone(), + a => { + anyhow::bail!("unhandled hash algorithm: {a:?}") + } + }, + hash: hash.to_string(), + }) + }) + .collect::>()?, + }, + handle: if let Some(handle) = handle { + self.file_storage + .update(&handle, version, target_file.to_vec())?; + handle + } else { + self.file_storage.store( + version, + parsed_path, + target_file.to_vec(), + )? + }, + expiring: false, + }, + ); + } + let mut configs = Vec::with_capacity(config_paths.len()); + for config in config_paths.iter() { + if let Some(target_file) = target_files.get_mut(config) { + target_file.expiring = false; + configs.push(target_file.handle.clone()); + } else { + anyhow::bail!( + "Found {config} in client_configs response, but it isn't stored." + ); + } + } + client_state.last_config_paths = config_paths; + Ok(Some(configs)) + } + } + } } -fn get_agent_configs_endpoint(endpoint: &Endpoint) -> Endpoint { +fn make_agent_configs_endpoint(endpoint: &Endpoint) -> Endpoint { let mut parts = endpoint.url.clone().into_parts(); parts.path_and_query = Some(PathAndQuery::from_static("/v0.7/config")); #[allow(clippy::unwrap_used)] @@ -689,7 +914,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage.clone(), Arc::new(ConfigFetcherState::new(server.dummy_options().invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); let mut response = http_common::empty_response(Response::builder()).unwrap(); @@ -727,6 +954,7 @@ pub mod tests { language: "php".to_string(), tracer_version: "1.2.3".to_string(), endpoint: server.endpoint.clone(), + agentless: None, }; let product_capabilities = ConfigProductCapabilities::new( vec![ @@ -739,7 +967,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage.clone(), Arc::new(ConfigFetcherState::new(invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); { @@ -923,7 +1153,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage, Arc::new(ConfigFetcherState::new(server.dummy_options().invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); // Default: nothing set, agent receives an empty list. @@ -1021,7 +1253,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage, Arc::new(ConfigFetcherState::new(server.dummy_options().invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); let fetched = fetcher diff --git a/libdd-remote-config/src/fetch/shared.rs b/libdd-remote-config/src/fetch/shared.rs index baf70b2c46..8153aa8899 100644 --- a/libdd-remote-config/src/fetch/shared.rs +++ b/libdd-remote-config/src/fetch/shared.rs @@ -275,7 +275,13 @@ impl SharedFetcher { S::StoredFile: RefcountedFile, { let state = storage.state.clone(); - let mut fetcher = ConfigFetcher::new(storage, state); + let mut fetcher = match ConfigFetcher::new(storage, state).await { + Ok(f) => f, + Err(e) => { + error!("failed to create the fetcher: {:?}", e); + return; + } + }; let mut opaque_state = ConfigClientState::default(); @@ -314,7 +320,9 @@ impl SharedFetcher { }; match fetched { - Ok(None) => clean_inactive(), // nothing changed + Ok(None) => { + clean_inactive(); + } Ok(Some(files)) => { if !files.is_empty() || !last_files.is_empty() { for file in files.iter() { @@ -349,6 +357,12 @@ impl SharedFetcher { } } + if let Some(interval) = opaque_state.server_recommended_refresh_interval() { + // Keep the run-loop interval in sync with the server-provided value + self.interval + .store(interval.as_nanos() as u64, Ordering::Relaxed); + } + select! { _ = self.cancellation.cancelled() => { break; } _ = sleep(Duration::from_nanos(self.interval.load(Ordering::Relaxed))) => {} diff --git a/libdd-remote-config/src/fetch/single.rs b/libdd-remote-config/src/fetch/single.rs index 3d48c69183..4e191591af 100644 --- a/libdd-remote-config/src/fetch/single.rs +++ b/libdd-remote-config/src/fetch/single.rs @@ -8,6 +8,9 @@ use crate::fetch::{ use crate::file_change_tracker::{Change, ChangeTracker, FilePath, UpdatedFiles}; use crate::{RemoteConfigCapabilities, RemoteConfigPath, RemoteConfigProduct, Target}; use std::sync::Arc; +use std::time::Duration; + +const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(5); /// Simple implementation pub struct SingleFetcher { @@ -16,7 +19,7 @@ pub struct SingleFetcher { product_capabilities: ConfigProductCapabilities, runtime_id: String, client_id: String, - opaque_state: ConfigClientState, + client_state: ConfigClientState, } #[derive(Clone, Debug)] @@ -27,12 +30,40 @@ pub struct ConfigOptions { } impl SingleFetcher { - pub fn new(sink: S, target: Target, runtime_id: String, options: ConfigOptions) -> Self { - SingleFetcher { + pub async fn new( + sink: S, + target: Target, + runtime_id: String, + options: ConfigOptions, + ) -> anyhow::Result { + Ok(SingleFetcher { fetcher: ConfigFetcher::new( sink, Arc::new(ConfigFetcherState::new(options.invariants)), + ) + .await?, + target: Arc::new(target), + product_capabilities: ConfigProductCapabilities::new( + options.products, + options.capabilities, ), + runtime_id, + client_id: uuid::Uuid::new_v4().to_string(), + client_state: ConfigClientState::default(), + }) + } + + pub fn new_no_agentless( + sink: S, + target: Target, + runtime_id: String, + options: ConfigOptions, + ) -> anyhow::Result { + Ok(SingleFetcher { + fetcher: futures::executor::block_on(ConfigFetcher::new( + sink, + Arc::new(ConfigFetcherState::new(options.invariants)), + ))?, target: Arc::new(target), product_capabilities: ConfigProductCapabilities::new( options.products, @@ -40,8 +71,8 @@ impl SingleFetcher { ), runtime_id, client_id: uuid::Uuid::new_v4().to_string(), - opaque_state: ConfigClientState::default(), - } + client_state: ConfigClientState::default(), + }) } pub fn with_client_id(mut self, client_id: String) -> Self { @@ -57,15 +88,24 @@ impl SingleFetcher { &self.target, &self.product_capabilities, self.client_id.as_str(), - &mut self.opaque_state, + &mut self.client_state, ) .await } - pub fn get_client_id(&self) -> &String { + pub fn get_client_id(&self) -> &str { &self.client_id } + /// Returns the server-recommended interval before the next poll. + /// In agentless mode this is updated after every successful fetch. + /// In agent mode it returns the default of 5 seconds. + pub fn get_refresh_interval(&self) -> Duration { + self.client_state + .server_recommended_refresh_interval() + .unwrap_or(DEFAULT_REFRESH_INTERVAL) + } + /// Sets the apply state on a stored file. pub fn set_config_state(&self, file: &RemoteConfigPath, state: ConfigApplyState) { self.fetcher.set_config_state(file, state) @@ -75,7 +115,7 @@ impl SingleFetcher { /// Sent to the agent on each subsequent poll so it can route configs targeting those /// services to this client. Replace-semantics: the new vec fully overrides the previous one. pub fn set_extra_services(&mut self, services: Vec) { - self.opaque_state.set_extra_services(services); + self.client_state.set_extra_services(services); } } @@ -91,11 +131,28 @@ impl SingleChangesFetcher where S::StoredFile: FilePath, { - pub fn new(sink: S, target: Target, runtime_id: String, options: ConfigOptions) -> Self { - SingleChangesFetcher { + pub async fn new( + sink: S, + target: Target, + runtime_id: String, + options: ConfigOptions, + ) -> anyhow::Result { + Ok(SingleChangesFetcher { changes: ChangeTracker::default(), - fetcher: SingleFetcher::new(sink, target, runtime_id, options), - } + fetcher: SingleFetcher::new(sink, target, runtime_id, options).await?, + }) + } + + pub fn new_no_agentless( + sink: S, + target: Target, + runtime_id: String, + options: ConfigOptions, + ) -> anyhow::Result { + Ok(SingleChangesFetcher { + changes: ChangeTracker::default(), + fetcher: SingleFetcher::new_no_agentless(sink, target, runtime_id, options)?, + }) } pub fn with_client_id(mut self, client_id: String) -> Self { @@ -116,10 +173,16 @@ where }) } - pub fn get_client_id(&self) -> &String { + pub fn get_client_id(&self) -> &str { self.fetcher.get_client_id() } + /// Returns the interval before the next poll. + /// See [`SingleFetcher::get_refresh_interval`]. + pub fn get_refresh_interval(&self) -> Duration { + self.fetcher.get_refresh_interval() + } + /// Sets the apply state on a stored file. pub fn set_config_state(&self, file: &S::StoredFile, state: ConfigApplyState) { self.fetcher.set_config_state(file.path(), state) diff --git a/libdd-remote-config/src/fetch/test_server.rs b/libdd-remote-config/src/fetch/test_server.rs index 1ed16ad0f3..cc2d5e751c 100644 --- a/libdd-remote-config/src/fetch/test_server.rs +++ b/libdd-remote-config/src/fetch/test_server.rs @@ -155,6 +155,7 @@ impl RemoteConfigServer { }) .collect(), client_configs: applied_files.keys().map(|k| k.to_string()).collect(), + config_status: 0, }; Response::new(http_common::Body::from( serde_json::to_vec(&response).unwrap(), @@ -215,6 +216,7 @@ impl RemoteConfigServer { language: "php".to_string(), tracer_version: "1.2.3".to_string(), endpoint: self.endpoint.clone(), + agentless: None, }, products: vec![ RemoteConfigProduct::ApmTracing, diff --git a/libdd-remote-config/src/lib.rs b/libdd-remote-config/src/lib.rs index 462362d160..01ca7c4fc7 100644 --- a/libdd-remote-config/src/lib.rs +++ b/libdd-remote-config/src/lib.rs @@ -7,6 +7,8 @@ #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] +pub mod agentless_client; + pub mod config; #[cfg(feature = "client")] pub mod fetch; diff --git a/libdd-trace-protobuf/build.rs b/libdd-trace-protobuf/build.rs index c9c891a681..a8a10fbee1 100644 --- a/libdd-trace-protobuf/build.rs +++ b/libdd-trace-protobuf/build.rs @@ -308,6 +308,11 @@ fn generate_protobuf() { "#[serde(default)]", ); + config.type_attribute("ClientUpdater", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("PackageState", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("PackageStateTask", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("TaskError", "#[derive(Deserialize, Serialize)]"); + config.include_file("_includes.rs"); config diff --git a/libdd-trace-protobuf/src/pb/remoteconfig.proto b/libdd-trace-protobuf/src/pb/remoteconfig.proto index 606bc851e7..cae7be854e 100644 --- a/libdd-trace-protobuf/src/pb/remoteconfig.proto +++ b/libdd-trace-protobuf/src/pb/remoteconfig.proto @@ -6,11 +6,73 @@ option go_package = "pkg/proto/pbgo/core"; // golang // Backend definitions +message ConfigMetas { + repeated TopMeta roots = 1; + TopMeta timestamp = 2; + TopMeta snapshot = 3; + TopMeta topTargets = 4; + repeated DelegatedMeta delegatedTargets = 5; +} + +message DirectorMetas { + repeated TopMeta roots = 1; + TopMeta timestamp = 2; + TopMeta snapshot = 3; + TopMeta targets = 4; +} + +message DelegatedMeta { + uint64 version = 1; + string role = 2; + bytes raw = 3; +} + +message TopMeta { + uint64 version = 1; + bytes raw = 2; +} + message File { string path = 1; bytes raw = 2; } +// Backend queries + +message LatestConfigsRequest { + string hostname = 1; + string agentVersion = 2; + // timestamp and snapshot versions move in tandem so they are the same. + uint64 current_config_snapshot_version = 3; + uint64 current_config_root_version = 9; + uint64 current_director_root_version = 8; + repeated string products = 4; + repeated string new_products = 5; + repeated Client active_clients = 6; + bytes backend_client_state = 10; + bool has_error = 11; + string error = 12; + string trace_agent_env = 13; + string org_uuid = 14; + repeated string tags = 15; + string agent_uuid = 16; +} + +message LatestConfigsResponse { + ConfigMetas config_metas = 1; + DirectorMetas director_metas = 2; + repeated File target_files = 3; +} + +message OrgDataResponse { + string uuid = 1; +} + +message OrgStatusResponse { + bool enabled = 1; + bool authorized = 2; +} + // Client definitions message Client { @@ -24,6 +86,9 @@ message Client { ClientAgent client_agent = 9; uint64 last_seen = 10; bytes capabilities = 11; + reserved 12, 13; + bool is_updater = 14; + ClientUpdater client_updater = 15; } message ClientTracer { @@ -47,6 +112,46 @@ message ClientAgent { repeated string cws_workloads = 5; } +message ClientUpdater { + repeated string tags = 1; + repeated PackageState packages = 2; + uint64 available_disk_space = 3; + string secrets_pub_key = 4; +} + +message PackageState { + string package = 1; + string stable_version = 2; + string experiment_version = 3; + PackageStateTask task = 4; + reserved 5, 6, 7, 8, 9, 10; + string stable_config_version = 11; + string experiment_config_version = 12; + string running_version = 13; + string running_config_version = 14; + uint64 heartbeat_timestamp = 15; + float completion = 16; +} + +message PackageStateTask { + string id = 1; + TaskState state = 2; + TaskError error = 3; +} + +enum TaskState { + IDLE = 0; + RUNNING = 1; + DONE = 2; + INVALID_STATE = 3; + ERROR = 4; +} + +message TaskError { + uint64 code = 1; + string message = 2; +} + message ConfigState { string id = 1; uint64 version = 2; @@ -78,14 +183,119 @@ message TargetFileMeta { repeated TargetFileHash hashes = 3; } +// ConfigSubscriptionProducts is used to targets specific products for tracking +// with a ConfigSubscriptionRequest. +enum ConfigSubscriptionProducts { + INVALID = 0; + + // LIVE_DEBUGGING corresponds to the LIVE_DEBUGING and LIVE_DEBUGGING_SYMBOLDB + // products. + LIVE_DEBUGGING = 1; +} + +// ConfigSubscriptionRequest is used to manage the state of the stream created +// using CreateConfigSubscription. +message ConfigSubscriptionRequest { + + enum Action { + INVALID = 0; + TRACK = 1; + UNTRACK = 2; + } + + + // RuntimeID of the client to track or untrack. + string runtime_id = 1; + + // Action indicates the action to take for the client with the given + // runtime_id. + Action action = 2; + + // If action is TRACK, products indicates the set of products for which the + // client is interested in receiving updates. + ConfigSubscriptionProducts products = 3; +} + +// ConfigSubscriptionResponse is streamed from CreateConfigSubscription with +// updates for matching clients and products. +message ConfigSubscriptionResponse { + + // Client is the client that was tracked or untracked. + Client client = 1; + + // Matched configs are all configs that were matched for the client given + // the subscription request. + // + // If a previously reported config is no longer matched, it will not be + // included in the response. + repeated string matched_configs = 2; + + // Target files are the target files that needs to be sent to the client. + repeated File target_files = 3; +} + message ClientGetConfigsRequest { Client client = 1; repeated TargetFileMeta cached_target_files = 2; } +enum ConfigStatus { + CONFIG_STATUS_OK = 0; + CONFIG_STATUS_EXPIRED = 1; +} + message ClientGetConfigsResponse { repeated bytes roots = 1; bytes targets = 2; repeated File target_files = 3; repeated string client_configs = 4; + ConfigStatus config_status = 5; +} + +// Full state + +message FileMetaState { + uint64 version = 1; + string hash = 2; +} + +message GetStateConfigResponse { + map config_state = 1; + map director_state = 2; + map target_filenames = 3; + repeated Client active_clients = 4; + repeated ConfigSubscriptionState config_subscription_states = 5; +} + +// ConfigSubscriptionState describes the state of a config subscription. +message ConfigSubscriptionState { + message TrackedClient { + string runtime_id = 1; + bool seen_any = 2; + ConfigSubscriptionProducts products = 3; + } + + // SubscriptionID is a process-unique identifier for the subscription. + uint64 subscription_id = 1; + + // TrackedClients is the list of clients that are currently tracked by the + // subscription. + repeated TrackedClient tracked_clients = 2; +} + +message ResetStateConfigResponse {} + + +message TracerPredicateV1 { + string clientID = 1; + string service = 2; + string environment = 3; + string appVersion = 4; + string tracerVersion = 5; + string language = 6; + string runtimeID = 7; +} + +message TracerPredicates { + repeated TracerPredicateV1 tracer_predicates_v1 = 1; } diff --git a/libdd-trace-protobuf/src/remoteconfig.rs b/libdd-trace-protobuf/src/remoteconfig.rs index 9e0a62494a..894da87077 100644 --- a/libdd-trace-protobuf/src/remoteconfig.rs +++ b/libdd-trace-protobuf/src/remoteconfig.rs @@ -3,6 +3,46 @@ use serde::{Deserialize, Serialize}; // This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfigMetas { + #[prost(message, repeated, tag = "1")] + pub roots: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub timestamp: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub snapshot: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub top_targets: ::core::option::Option, + #[prost(message, repeated, tag = "5")] + pub delegated_targets: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DirectorMetas { + #[prost(message, repeated, tag = "1")] + pub roots: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub timestamp: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub snapshot: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub targets: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DelegatedMeta { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(string, tag = "2")] + pub role: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "3")] + pub raw: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TopMeta { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(bytes = "vec", tag = "2")] + pub raw: ::prost::alloc::vec::Vec, +} #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct File { @@ -12,6 +52,61 @@ pub struct File { #[serde(with = "serde_bytes")] pub raw: ::prost::alloc::vec::Vec, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LatestConfigsRequest { + #[prost(string, tag = "1")] + pub hostname: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub agent_version: ::prost::alloc::string::String, + /// timestamp and snapshot versions move in tandem so they are the same. + #[prost(uint64, tag = "3")] + pub current_config_snapshot_version: u64, + #[prost(uint64, tag = "9")] + pub current_config_root_version: u64, + #[prost(uint64, tag = "8")] + pub current_director_root_version: u64, + #[prost(string, repeated, tag = "4")] + pub products: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, repeated, tag = "5")] + pub new_products: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(message, repeated, tag = "6")] + pub active_clients: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "10")] + pub backend_client_state: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "11")] + pub has_error: bool, + #[prost(string, tag = "12")] + pub error: ::prost::alloc::string::String, + #[prost(string, tag = "13")] + pub trace_agent_env: ::prost::alloc::string::String, + #[prost(string, tag = "14")] + pub org_uuid: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "15")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, tag = "16")] + pub agent_uuid: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LatestConfigsResponse { + #[prost(message, optional, tag = "1")] + pub config_metas: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub director_metas: ::core::option::Option, + #[prost(message, repeated, tag = "3")] + pub target_files: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct OrgDataResponse { + #[prost(string, tag = "1")] + pub uuid: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct OrgStatusResponse { + #[prost(bool, tag = "1")] + pub enabled: bool, + #[prost(bool, tag = "2")] + pub authorized: bool, +} #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Client { @@ -34,6 +129,10 @@ pub struct Client { pub last_seen: u64, #[prost(bytes = "vec", tag = "11")] pub capabilities: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "14")] + pub is_updater: bool, + #[prost(message, optional, tag = "15")] + pub client_updater: ::core::option::Option, } #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -74,6 +173,60 @@ pub struct ClientAgent { pub cws_workloads: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ClientUpdater { + #[prost(string, repeated, tag = "1")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(message, repeated, tag = "2")] + pub packages: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub available_disk_space: u64, + #[prost(string, tag = "4")] + pub secrets_pub_key: ::prost::alloc::string::String, +} +#[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PackageState { + #[prost(string, tag = "1")] + pub package: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub stable_version: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub experiment_version: ::prost::alloc::string::String, + #[prost(message, optional, tag = "4")] + pub task: ::core::option::Option, + #[prost(string, tag = "11")] + pub stable_config_version: ::prost::alloc::string::String, + #[prost(string, tag = "12")] + pub experiment_config_version: ::prost::alloc::string::String, + #[prost(string, tag = "13")] + pub running_version: ::prost::alloc::string::String, + #[prost(string, tag = "14")] + pub running_config_version: ::prost::alloc::string::String, + #[prost(uint64, tag = "15")] + pub heartbeat_timestamp: u64, + #[prost(float, tag = "16")] + pub completion: f32, +} +#[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PackageStateTask { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(enumeration = "TaskState", tag = "2")] + pub state: i32, + #[prost(message, optional, tag = "3")] + pub error: ::core::option::Option, +} +#[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TaskError { + #[prost(uint64, tag = "1")] + pub code: u64, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, +} +#[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ConfigState { #[prost(string, tag = "1")] @@ -121,6 +274,82 @@ pub struct TargetFileMeta { #[prost(message, repeated, tag = "3")] pub hashes: ::prost::alloc::vec::Vec, } +/// ConfigSubscriptionRequest is used to manage the state of the stream created +/// using CreateConfigSubscription. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ConfigSubscriptionRequest { + /// RuntimeID of the client to track or untrack. + #[prost(string, tag = "1")] + pub runtime_id: ::prost::alloc::string::String, + /// Action indicates the action to take for the client with the given + /// runtime_id. + #[prost(enumeration = "config_subscription_request::Action", tag = "2")] + pub action: i32, + /// If action is TRACK, products indicates the set of products for which the + /// client is interested in receiving updates. + #[prost(enumeration = "ConfigSubscriptionProducts", tag = "3")] + pub products: i32, +} +/// Nested message and enum types in `ConfigSubscriptionRequest`. +pub mod config_subscription_request { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Action { + Invalid = 0, + Track = 1, + Untrack = 2, + } + impl Action { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Invalid => "INVALID", + Self::Track => "TRACK", + Self::Untrack => "UNTRACK", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "INVALID" => Some(Self::Invalid), + "TRACK" => Some(Self::Track), + "UNTRACK" => Some(Self::Untrack), + _ => None, + } + } + } +} +/// ConfigSubscriptionResponse is streamed from CreateConfigSubscription with +/// updates for matching clients and products. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfigSubscriptionResponse { + /// Client is the client that was tracked or untracked. + #[prost(message, optional, tag = "1")] + pub client: ::core::option::Option, + /// Matched configs are all configs that were matched for the client given + /// the subscription request. + /// + /// If a previously reported config is no longer matched, it will not be + /// included in the response. + #[prost(string, repeated, tag = "2")] + pub matched_configs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Target files are the target files that needs to be sent to the client. + #[prost(message, repeated, tag = "3")] + pub target_files: ::prost::alloc::vec::Vec, +} #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ClientGetConfigsRequest { @@ -146,4 +375,175 @@ pub struct ClientGetConfigsResponse { #[prost(string, repeated, tag = "4")] #[serde(default)] pub client_configs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(enumeration = "ConfigStatus", tag = "5")] + pub config_status: i32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct FileMetaState { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(string, tag = "2")] + pub hash: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetStateConfigResponse { + #[prost(map = "string, message", tag = "1")] + pub config_state: ::std::collections::HashMap< + ::prost::alloc::string::String, + FileMetaState, + >, + #[prost(map = "string, message", tag = "2")] + pub director_state: ::std::collections::HashMap< + ::prost::alloc::string::String, + FileMetaState, + >, + #[prost(map = "string, string", tag = "3")] + pub target_filenames: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + #[prost(message, repeated, tag = "4")] + pub active_clients: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "5")] + pub config_subscription_states: ::prost::alloc::vec::Vec, +} +/// ConfigSubscriptionState describes the state of a config subscription. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfigSubscriptionState { + /// SubscriptionID is a process-unique identifier for the subscription. + #[prost(uint64, tag = "1")] + pub subscription_id: u64, + /// TrackedClients is the list of clients that are currently tracked by the + /// subscription. + #[prost(message, repeated, tag = "2")] + pub tracked_clients: ::prost::alloc::vec::Vec< + config_subscription_state::TrackedClient, + >, +} +/// Nested message and enum types in `ConfigSubscriptionState`. +pub mod config_subscription_state { + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] + pub struct TrackedClient { + #[prost(string, tag = "1")] + pub runtime_id: ::prost::alloc::string::String, + #[prost(bool, tag = "2")] + pub seen_any: bool, + #[prost(enumeration = "super::ConfigSubscriptionProducts", tag = "3")] + pub products: i32, + } +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ResetStateConfigResponse {} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TracerPredicateV1 { + #[prost(string, tag = "1")] + pub client_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub service: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub environment: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub app_version: ::prost::alloc::string::String, + #[prost(string, tag = "5")] + pub tracer_version: ::prost::alloc::string::String, + #[prost(string, tag = "6")] + pub language: ::prost::alloc::string::String, + #[prost(string, tag = "7")] + pub runtime_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TracerPredicates { + #[prost(message, repeated, tag = "1")] + pub tracer_predicates_v1: ::prost::alloc::vec::Vec, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum TaskState { + Idle = 0, + Running = 1, + Done = 2, + InvalidState = 3, + Error = 4, +} +impl TaskState { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Idle => "IDLE", + Self::Running => "RUNNING", + Self::Done => "DONE", + Self::InvalidState => "INVALID_STATE", + Self::Error => "ERROR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "IDLE" => Some(Self::Idle), + "RUNNING" => Some(Self::Running), + "DONE" => Some(Self::Done), + "INVALID_STATE" => Some(Self::InvalidState), + "ERROR" => Some(Self::Error), + _ => None, + } + } +} +/// ConfigSubscriptionProducts is used to targets specific products for tracking +/// with a ConfigSubscriptionRequest. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ConfigSubscriptionProducts { + Invalid = 0, + /// LIVE_DEBUGGING corresponds to the LIVE_DEBUGING and LIVE_DEBUGGING_SYMBOLDB + /// products. + LiveDebugging = 1, +} +impl ConfigSubscriptionProducts { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Invalid => "INVALID", + Self::LiveDebugging => "LIVE_DEBUGGING", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "INVALID" => Some(Self::Invalid), + "LIVE_DEBUGGING" => Some(Self::LiveDebugging), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ConfigStatus { + Ok = 0, + Expired = 1, +} +impl ConfigStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Ok => "CONFIG_STATUS_OK", + Self::Expired => "CONFIG_STATUS_EXPIRED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CONFIG_STATUS_OK" => Some(Self::Ok), + "CONFIG_STATUS_EXPIRED" => Some(Self::Expired), + _ => None, + } + } } diff --git a/libdd-tracer-flare/src/lib.rs b/libdd-tracer-flare/src/lib.rs index 3ff667326f..af3a489e06 100644 --- a/libdd-tracer-flare/src/lib.rs +++ b/libdd-tracer-flare/src/lib.rs @@ -181,6 +181,7 @@ impl TracerFlareManager { language, tracer_version, endpoint: remote_config_endpoint, + agentless: None, }, products: vec![ RemoteConfigProduct::AgentConfig, @@ -189,18 +190,21 @@ impl TracerFlareManager { capabilities: vec![], }; - tracer_flare.listener = Some(SingleChangesFetcher::new( - ParsedFileStorage::default(), - Target { - service, - env, - app_version, - tags: vec![], - process_tags: vec![], - }, - runtime_id, - config_to_fetch, - )); + tracer_flare.listener = Some( + SingleChangesFetcher::new_no_agentless( + ParsedFileStorage::default(), + Target { + service, + env, + app_version, + tags: vec![], + process_tags: vec![], + }, + runtime_id, + config_to_fetch, + ) + .map_err(|e| FlareError::ListeningError(e.to_string()))?, + ); Ok(tracer_flare) }