From 70619f1be40d60c3bc97053c350775e5218fa0b6 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 2 Feb 2026 09:31:57 -0500 Subject: [PATCH 1/4] enhancement(agent-data-plane): move internal observability/control plane to supervisor scheme --- bin/agent-data-plane/src/cli/run.rs | 46 ++- .../src/internal/control_plane.rs | 287 ++++++++++++------ bin/agent-data-plane/src/internal/mod.rs | 101 +++--- .../src/internal/observability.rs | 151 ++++++--- lib/saluki-core/src/runtime/shutdown.rs | 20 ++ 5 files changed, 410 insertions(+), 195 deletions(-) diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index e863783a38..17d375c25a 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -27,6 +27,7 @@ use saluki_components::{ }, }; use saluki_config::{ConfigurationLoader, GenericConfiguration}; +use saluki_core::runtime::SupervisorError; use saluki_core::topology::TopologyBlueprint; use saluki_env::EnvironmentProvider as _; use saluki_error::{generic_error, ErrorContext as _, GenericError}; @@ -39,7 +40,7 @@ use crate::{ apm_onboarding::ApmOnboardingConfiguration, ottl_filter_processor::OttlFilterConfiguration, ottl_transform_processor::OttlTransformConfiguration, }, - internal::{remote_agent::RemoteAgentBootstrap, spawn_control_plane, spawn_internal_observability_topology}, + internal::{create_internal_supervisor, remote_agent::RemoteAgentBootstrap}, }; use crate::{config::DataPlaneConfiguration, env_provider::ADPEnvironmentProvider}; @@ -143,9 +144,8 @@ pub async fn handle_run_command( ) .await?; - spawn_internal_observability_topology(&dp_config, &component_registry, health_registry.clone()) - .error_context("Failed to spawn internal observability topology.")?; - spawn_control_plane( + // Create the internal supervisor (control plane + observability) + let mut internal_supervisor = create_internal_supervisor( &config, &dp_config, &component_registry, @@ -154,8 +154,12 @@ pub async fn 
handle_run_command( dsd_stats_config, ra_bootstrap, ) - .await - .error_context("Failed to spawn control plane.")?; + .error_context("Failed to create internal supervisor.")?; + + // Create shutdown channel for the internal supervisor - we'll drive it in the main select loop + let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::oneshot::channel(); + let internal_supervisor_fut = internal_supervisor.run_with_shutdown(internal_shutdown_rx); + tokio::pin!(internal_supervisor_fut); // Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any. let bounds_config = MemoryBoundsConfiguration::try_from_config(&config)?; @@ -210,6 +214,27 @@ pub async fn handle_run_command( let mut finished_with_error = false; select! { + result = &mut internal_supervisor_fut => { + match result { + Err(SupervisorError::FailedToInitialize { source, child_name }) => { + error!(process_name = child_name, "Internal supervisor failed to initialize: {}. Shutting down...", source); + finished_with_error = true; + } + // If we haven't hit an initialization error -- which implies an error we can't really recover from -- + // then just log for now, until we fully migrate everything over to the supervisor-based approach and + // can dial in our supervisor configuration. + // + // For right now, this matches the previous behavior where the process would exit if we couldn't + // configure/spawn the control plane or internal observability pipeline, but the process is unaffected + // if either of those components fail at _runtime_. + Err(e) => { + warn!("Internal supervisor exited: {}", e); + } + Ok(()) => { + warn!("Internal supervisor exited unexpectedly."); + } + } + } _ = running_topology.wait_for_unexpected_finish() => { error!("Component unexpectedly finished. 
Shutting down..."); finished_with_error = true; @@ -219,7 +244,14 @@ pub async fn handle_run_command( } } - match running_topology.shutdown_with_timeout(Duration::from_secs(30)).await { + // Shutdown the primary topology + let topology_result = running_topology.shutdown_with_timeout(Duration::from_secs(30)).await; + + // Signal the internal supervisor to shutdown (if still running) and drive it to completion + let _ = internal_shutdown_tx.send(()); + let _ = internal_supervisor_fut.await; + + match topology_result { Ok(()) => { if finished_with_error { warn!("Topology shutdown complete despite error(s).") diff --git a/bin/agent-data-plane/src/internal/control_plane.rs b/bin/agent-data-plane/src/internal/control_plane.rs index 11c1a70fbe..5dd922f053 100644 --- a/bin/agent-data-plane/src/internal/control_plane.rs +++ b/bin/agent-data-plane/src/internal/control_plane.rs @@ -1,22 +1,26 @@ -use std::{future::pending, path::PathBuf}; +use std::path::PathBuf; +use async_trait::async_trait; use memory_accounting::ComponentRegistry; use saluki_app::{ api::APIBuilder, config::ConfigAPIHandler, logging::acquire_logging_api_handler, metrics::acquire_metrics_api_handler, }; -use saluki_common::task::spawn_traced_named; use saluki_components::destinations::DogStatsDStatisticsConfiguration; use saluki_config::GenericConfiguration; -use saluki_error::{ErrorContext as _, GenericError}; +use saluki_core::runtime::{ + InitializationError, ProcessShutdown, RestartStrategy, RuntimeConfiguration, Supervisable, Supervisor, + SupervisorFuture, +}; +use saluki_error::{generic_error, ErrorContext as _, GenericError}; use saluki_health::HealthRegistry; -use saluki_io::net::{build_datadog_agent_server_tls_config, get_ipc_cert_file_path, ListenAddress}; -use tracing::{error, info}; +use saluki_io::net::{build_datadog_agent_server_tls_config, get_ipc_cert_file_path}; +use tracing::info; use crate::{ config::DataPlaneConfiguration, env_provider::ADPEnvironmentProvider, - 
internal::{initialize_and_launch_runtime, platform::PlatformSettings, remote_agent::RemoteAgentBootstrap}, + internal::{platform::PlatformSettings, remote_agent::RemoteAgentBootstrap}, }; /// Gets the IPC certificate file path from the configuration. @@ -39,101 +43,206 @@ fn get_cert_path_from_config(config: &GenericConfiguration) -> Result, -) -> Result<(), GenericError> { - // Build our unprivileged and privileged API server. - // - // The unprivileged API is purely for things like health checks or read-only information. The privileged API is - // meant for sensitive information or actions that require elevated permissions. - let unprivileged_api = APIBuilder::new() - .with_handler(health_registry.api_handler()) - .with_handler(component_registry.api_handler()); - - // Build the privileged API with certificate-based TLS configuration - let cert_path = get_cert_path_from_config(config)?; - let tls_config = build_datadog_agent_server_tls_config(cert_path).await?; - - let privileged_api = APIBuilder::new() - .with_tls_config(tls_config) - .with_optional_handler(acquire_logging_api_handler()) - .with_optional_handler(acquire_metrics_api_handler()) - .with_handler(ConfigAPIHandler::new(config.clone())) - .with_optional_handler(env_provider.workload_api_handler()) - .with_handler(dsd_stats_config.api_handler()); - - let dp_config = dp_config.clone(); - let init = async move { - // Handle any final configuration of our API endpoints and spawn them. - configure_and_spawn_api_endpoints(dp_config, unprivileged_api, privileged_api, ra_bootstrap).await?; - - health_registry.spawn().await?; - - Ok(()) - }; - - initialize_and_launch_runtime("rt-control-plane", init, |_| pending()) +/// A worker that runs the health registry. 
+pub struct HealthRegistryWorker { + health_registry: HealthRegistry, } -async fn configure_and_spawn_api_endpoints( - dp_config: DataPlaneConfiguration, unprivileged_api: APIBuilder, mut privileged_api: APIBuilder, - ra_bootstrap: Option, -) -> Result<(), GenericError> { - // If we're not in standalone mode and we've gotten some bootstrapped remote agent state, - // wire it up to the privileged API so the Core Agent can communicate with us. - if let Some(ra_bootstrap) = ra_bootstrap { - privileged_api = privileged_api.with_grpc_service(ra_bootstrap.create_status_service()); - privileged_api = privileged_api.with_grpc_service(ra_bootstrap.create_flare_service()); - if let Some(telemetry_service) = ra_bootstrap.create_telemetry_service() { - privileged_api = privileged_api.with_grpc_service(telemetry_service); - } +impl HealthRegistryWorker { + /// Creates a new `HealthRegistryWorker`. + pub fn new(health_registry: HealthRegistry) -> Self { + Self { health_registry } } +} - spawn_unprivileged_api(unprivileged_api, dp_config.api_listen_address().clone()).await?; - spawn_privileged_api(privileged_api, dp_config.secure_api_listen_address().clone()).await?; +#[async_trait] +impl Supervisable for HealthRegistryWorker { + fn name(&self) -> &str { + "health-registry" + } - Ok(()) + async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result { + // Spawn the health registry runner - it runs forever until the process exits + // + // TODO: This pattern doesn't allow us to gracefully restart the health registry worker if the supervisor itself + // is restarted. We could do something simpler like `HealthRegistry` having a mutex to ensure only one instance is + // running at a time, and that way the worker can be restarted gracefully, picking up where it left off. + let handle = self + .health_registry + .clone() + .spawn() + .await + .map_err(|e| InitializationError::Failed { source: e })?; + + Ok(Box::pin(async move { + tokio::select! 
{ + result = handle => { + // The health registry task completed (shouldn't happen normally) + if let Err(e) = result { + return Err(generic_error!("Health registry task panicked: {}", e)); + } + Ok(()) + } + _ = process_shutdown.wait_for_shutdown() => { + // On shutdown, we just let the spawned task continue until the runtime shuts down. + // The health registry doesn't have a graceful shutdown mechanism. + Ok(()) + } + } + })) + } } -async fn spawn_unprivileged_api( - api_builder: APIBuilder, api_listen_address: ListenAddress, -) -> Result<(), GenericError> { - // TODO: Use something better than `pending()`... perhaps something like a more generalized - // `ComponentShutdownCoordinator` that allows for triggering and waiting for all attached tasks to signal that - // they've shutdown. - spawn_traced_named("adp-unprivileged-http-api", async move { - info!("Serving unprivileged API on {}.", api_listen_address); - - if let Err(e) = api_builder.serve(api_listen_address, pending()).await { - error!("Failed to serve unprivileged API: {}", e); +/// A worker that serves the unprivileged HTTP API. +pub struct UnprivilegedApiWorker { + dp_config: DataPlaneConfiguration, + health_registry: HealthRegistry, + component_registry: ComponentRegistry, +} + +impl UnprivilegedApiWorker { + /// Creates a new `UnprivilegedApiWorker`. 
+ pub fn new( + dp_config: DataPlaneConfiguration, health_registry: HealthRegistry, component_registry: ComponentRegistry, + ) -> Self { + Self { + dp_config, + health_registry, + component_registry, } - }); + } +} + +#[async_trait] +impl Supervisable for UnprivilegedApiWorker { + fn name(&self) -> &str { + "unprivileged-api" + } + + async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { + let api_builder = APIBuilder::new() + .with_handler(self.health_registry.api_handler()) + .with_handler(self.component_registry.api_handler()); + + let listen_address = self.dp_config.api_listen_address().clone(); - Ok(()) + Ok(Box::pin(async move { + info!("Serving unprivileged API on {}.", listen_address); + api_builder.serve(listen_address, process_shutdown).await + })) + } } -async fn spawn_privileged_api(api_builder: APIBuilder, api_listen_address: ListenAddress) -> Result<(), GenericError> { - // TODO: Use something better than `pending()`... perhaps something like a more generalized - // `ComponentShutdownCoordinator` that allows for triggering and waiting for all attached tasks to signal that - // they've shutdown. - spawn_traced_named("adp-privileged-http-api", async move { - info!("Serving privileged API on {}.", api_listen_address); +/// A worker that serves the privileged HTTP API with TLS. +/// +/// This worker also handles remote agent registration when not in standalone mode. The remote agent gRPC services are +/// registered on the privileged API, and a background task periodically refreshes the registration with the Datadog +/// Agent. 
+pub struct PrivilegedApiWorker { + config: GenericConfiguration, + dp_config: DataPlaneConfiguration, + env_provider: ADPEnvironmentProvider, + dsd_stats_config: DogStatsDStatisticsConfiguration, + ra_bootstrap: Option, +} - if let Err(e) = api_builder.serve(api_listen_address, pending()).await { - error!("Failed to serve privileged API: {}", e); +impl PrivilegedApiWorker { + /// Creates a new `PrivilegedApiWorker`. + pub fn new( + config: GenericConfiguration, dp_config: DataPlaneConfiguration, env_provider: ADPEnvironmentProvider, + dsd_stats_config: DogStatsDStatisticsConfiguration, ra_bootstrap: Option, + ) -> Self { + Self { + config, + dp_config, + env_provider, + dsd_stats_config, + ra_bootstrap, } - }); + } +} + +#[async_trait] +impl Supervisable for PrivilegedApiWorker { + fn name(&self) -> &str { + "privileged-api" + } + + async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { + // Load our TLS configuration. + // + // TODO: should this need to happen during process init or could we do it once when creating `PrivilegedApiWorker` + // and simplify things? + let cert_path = + get_cert_path_from_config(&self.config).map_err(|e| InitializationError::Failed { source: e })?; + let tls_config = build_datadog_agent_server_tls_config(cert_path) + .await + .map_err(|e| InitializationError::Failed { source: e })?; + + let mut api_builder = APIBuilder::new() + .with_tls_config(tls_config) + // TODO: make these handlers cloneable and move them up to the config for the worker so they can + // be cloned for each initialization + .with_optional_handler(acquire_logging_api_handler()) + .with_optional_handler(acquire_metrics_api_handler()) + .with_handler(ConfigAPIHandler::new(self.config.clone())) + .with_optional_handler(self.env_provider.workload_api_handler()) + .with_handler(self.dsd_stats_config.api_handler()); + + // If we bootstrapped ourselves as a remote agent, add the necessary gRPC services to the API. 
+ if let Some(ra_bootstrap) = &self.ra_bootstrap { + api_builder = api_builder.with_grpc_service(ra_bootstrap.create_status_service()); + api_builder = api_builder.with_grpc_service(ra_bootstrap.create_flare_service()); + + // Only register the telemetry service if telemetry is actually enabled. + if let Some(telemetry_service) = ra_bootstrap.create_telemetry_service() { + api_builder = api_builder.with_grpc_service(telemetry_service); + } + } + + let listen_address = self.dp_config.secure_api_listen_address().clone(); - Ok(()) + Ok(Box::pin(async move { + info!("Serving privileged API on {}.", listen_address); + api_builder.serve(listen_address, process_shutdown).await + })) + } +} + +/// Creates the control plane supervisor. +/// +/// This supervisor manages the health registry, unprivileged and privileged APIs, and optionally the remote agent +/// registration task. It runs on a dedicated single-threaded runtime. +/// +/// # Errors +/// +/// If the supervisor cannot be created, an error is returned. +pub fn create_control_plane_supervisor( + config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, component_registry: &ComponentRegistry, + health_registry: HealthRegistry, env_provider: ADPEnvironmentProvider, + dsd_stats_config: DogStatsDStatisticsConfiguration, ra_bootstrap: Option, +) -> Result { + let mut supervisor = Supervisor::new("ctrl-pln")? 
+ .with_dedicated_runtime(RuntimeConfiguration::single_threaded()) + .with_restart_strategy(RestartStrategy::one_to_one()); + + // TODO: just make the API handler for `ComponentRegistry` cloneable so we can create/hold on to it in `UnprivilegedApiWorker` + // without having to create a scoped one here just to maintain the ownership necessary + let scoped_registry = component_registry.get_or_create("control-plane"); + + supervisor.add_worker(HealthRegistryWorker::new(health_registry.clone())); + supervisor.add_worker(UnprivilegedApiWorker::new( + dp_config.clone(), + health_registry, + scoped_registry, + )); + supervisor.add_worker(PrivilegedApiWorker::new( + config.clone(), + dp_config.clone(), + env_provider, + dsd_stats_config, + ra_bootstrap, + )); + + Ok(supervisor) } diff --git a/bin/agent-data-plane/src/internal/mod.rs b/bin/agent-data-plane/src/internal/mod.rs index a276987935..58d29fa7db 100644 --- a/bin/agent-data-plane/src/internal/mod.rs +++ b/bin/agent-data-plane/src/internal/mod.rs @@ -1,78 +1,55 @@ -use std::future::Future; - -use saluki_app::metrics::collect_runtime_metrics; -use saluki_error::{generic_error, ErrorContext as _, GenericError}; +use memory_accounting::ComponentRegistry; +use saluki_components::destinations::DogStatsDStatisticsConfiguration; +use saluki_config::GenericConfiguration; +use saluki_core::runtime::Supervisor; +use saluki_error::GenericError; +use saluki_health::HealthRegistry; mod control_plane; -pub use self::control_plane::spawn_control_plane; +pub use self::control_plane::create_control_plane_supervisor; mod observability; -pub use self::observability::spawn_internal_observability_topology; +pub use self::observability::create_observability_supervisor; pub mod platform; pub mod remote_agent; +use self::remote_agent::RemoteAgentBootstrap; +use crate::{config::DataPlaneConfiguration, env_provider::ADPEnvironmentProvider}; -/// Creates a single-threaded Tokio runtime, initializing it and driving it to completion. 
+/// Creates the root internal supervisor containing control plane and observability subsystems. /// -/// A dedicated background thread is spawned on which the runtime executes. The `init` future is run within the context -/// of the runtime and is expected to return a `Result` that indicates that initialization has either -/// succeeded or failed. `main_task` is used to create the future which the runtime will ultimately drive to completion. +/// The internal supervisor manages: +/// - **Control plane**: Health registry, unprivileged and privileged APIs, remote agent registration +/// - **Observability**: Internal telemetry topology (Prometheus metrics endpoint) /// -/// If initialization succeeds, `main_task` is called the result from `init` to create the main task future, and this -/// function returns `Ok(())`. If initialization fails, this function returns `Err(e)`. +/// Each subsystem runs on its own dedicated single-threaded runtime for isolation. /// /// # Errors /// -/// If the current thread runtime cannot be created, or the background thread for the runtime cannot be created, or an -/// error is returned from the execution of `init`, an error will be returned. -fn initialize_and_launch_runtime(name: &str, init: F, main_task: F2) -> Result<(), GenericError> -where - F: Future> + Send + 'static, - F2: FnOnce(T) -> T2 + Send + 'static, - T2: Future, -{ - let mut builder = tokio::runtime::Builder::new_current_thread(); - let runtime = builder - .enable_all() - .max_blocking_threads(2) - .build() - .error_context("Failed to build current thread runtime.")?; - - let runtime_id = name.to_string(); - let (init_tx, init_rx) = std::sync::mpsc::channel(); - std::thread::Builder::new() - .name(name.to_string()) - .spawn(move || { - // Run the initialization routine within the context of the runtime. 
- match runtime.block_on(init) { - Ok(init_value) => { - // Initialization succeeded, so inform the main thread that the runtime has been initialized and - // will continue running, and pass whatever we got back from initialization and drive the main - // task to completion. - init_tx.send(Ok(())).unwrap(); - - // Start collecting runtime metrics. - runtime.spawn(async move { - collect_runtime_metrics(&runtime_id).await; - }); - - runtime.block_on(main_task(init_value)); - } - Err(e) => { - // Initialization failed, so send the error back to the main thread. - init_tx.send(Err(e)).unwrap(); - } - } - }) - .with_error_context(|| format!("Failed to spawn thread for runtime '{}'.", name))?; - - // Wait for the initialization to complete and forward back the result if we get one. - match init_rx.recv() { - Ok(Ok(())) => Ok(()), - Ok(Err(e)) => Err(e), - Err(_) => Err(generic_error!( - "Initialization result channel closed unexpectedly. Runtime likely in an unexpected/corrupted state." - )), +/// If the supervisor cannot be created, an error is returned. +pub fn create_internal_supervisor( + config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, component_registry: &ComponentRegistry, + health_registry: HealthRegistry, env_provider: ADPEnvironmentProvider, + dsd_stats_config: DogStatsDStatisticsConfiguration, ra_bootstrap: Option, +) -> Result { + let mut root = Supervisor::new("internal-sup")?; + + // Add control plane supervisor (dedicated single-threaded runtime) + root.add_worker(create_control_plane_supervisor( + config, + dp_config, + component_registry, + health_registry.clone(), + env_provider, + dsd_stats_config, + ra_bootstrap, + )?); + + // Add observability supervisor if telemetry is enabled (dedicated single-threaded runtime) + if let Some(observability_sup) = create_observability_supervisor(dp_config, component_registry, health_registry)? 
{ + root.add_worker(observability_sup); } + + Ok(root) } diff --git a/bin/agent-data-plane/src/internal/observability.rs b/bin/agent-data-plane/src/internal/observability.rs index 2279ff9e42..8b9574aad4 100644 --- a/bin/agent-data-plane/src/internal/observability.rs +++ b/bin/agent-data-plane/src/internal/observability.rs @@ -1,51 +1,128 @@ -use std::num::NonZeroUsize; +use std::{num::NonZeroUsize, time::Duration}; +use async_trait::async_trait; use memory_accounting::{ComponentRegistry, MemoryLimiter}; use saluki_components::{destinations::PrometheusConfiguration, sources::InternalMetricsConfiguration}; -use saluki_core::topology::{RunningTopology, TopologyBlueprint}; -use saluki_error::GenericError; +use saluki_core::{ + runtime::{ + InitializationError, ProcessShutdown, RestartStrategy, RuntimeConfiguration, Supervisable, Supervisor, + SupervisorFuture, + }, + topology::TopologyBlueprint, +}; +use saluki_error::{generic_error, GenericError}; use saluki_health::HealthRegistry; -use tracing::{info, warn}; +use tracing::info; -use crate::{config::DataPlaneConfiguration, internal::initialize_and_launch_runtime}; +use crate::config::DataPlaneConfiguration; // SAFETY: This value is clearly non-zero. const DEFAULT_INTERCONNECT_CAPACITY: NonZeroUsize = NonZeroUsize::new(4).unwrap(); -pub fn spawn_internal_observability_topology( +/// A worker that runs the internal telemetry topology. +/// +/// This topology collects internal metrics and exposes them via a Prometheus scrape endpoint. +pub struct InternalTelemetryWorker { + dp_config: DataPlaneConfiguration, + component_registry: ComponentRegistry, + health_registry: HealthRegistry, +} + +impl InternalTelemetryWorker { + /// Creates a new `InternalTelemetryWorker`. 
+ pub fn new( + dp_config: DataPlaneConfiguration, component_registry: ComponentRegistry, health_registry: HealthRegistry, + ) -> Self { + Self { + dp_config, + component_registry, + health_registry, + } + } +} + +#[async_trait] +impl Supervisable for InternalTelemetryWorker { + fn name(&self) -> &str { + "internal-telemetry" + } + + async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result { + // Build the internal telemetry topology blueprint + let int_metrics_config = InternalMetricsConfiguration; + let prometheus_config = + PrometheusConfiguration::from_listen_address(self.dp_config.telemetry_listen_addr().clone()); + + info!( + "Internal telemetry enabled. Spawning Prometheus scrape endpoint on {}.", + self.dp_config.telemetry_listen_addr() + ); + + let mut blueprint = TopologyBlueprint::new("internal", &self.component_registry); + blueprint.with_interconnect_capacity(DEFAULT_INTERCONNECT_CAPACITY); + blueprint + .add_source("internal_metrics_in", int_metrics_config) + .map_err(|e| InitializationError::Failed { source: e })? + .add_destination("internal_metrics_out", prometheus_config) + .map_err(|e| InitializationError::Failed { source: e })? + .connect_component("internal_metrics_out", ["internal_metrics_in"]) + .map_err(|e| InitializationError::Failed { source: e })?; + + // Build and spawn the topology + let built_topology = blueprint + .build() + .await + .map_err(|e| InitializationError::Failed { source: e })?; + let mut running_topology = built_topology + .spawn(&self.health_registry, MemoryLimiter::noop()) + .await + .map_err(|e| InitializationError::Failed { source: e })?; + + Ok(Box::pin(async move { + tokio::select! 
{ + _ = running_topology.wait_for_unexpected_finish() => { + Err(generic_error!("Internal telemetry topology finished unexpectedly.")) + } + _ = process_shutdown.wait_for_shutdown() => { + running_topology + .shutdown_with_timeout(Duration::from_secs(5)) + .await + } + } + })) + } +} + +/// Creates the observability supervisor. +/// +/// This supervisor manages the internal telemetry topology (Prometheus metrics endpoint). +/// It runs on a dedicated single-threaded runtime. +/// +/// Returns `None` if telemetry is not enabled. +/// +/// # Errors +/// +/// If the supervisor cannot be created, an error is returned. +pub fn create_observability_supervisor( dp_config: &DataPlaneConfiguration, component_registry: &ComponentRegistry, health_registry: HealthRegistry, -) -> Result<(), GenericError> { - // When telemetry is enabled, we need to collect internal metrics, so add those components and route them here. +) -> Result, GenericError> { if !dp_config.telemetry_enabled() { - info!("Internal telemetry disabled. Skipping internal observability topology."); - return Ok(()); + info!("Internal telemetry disabled. Skipping observability supervisor."); + return Ok(None); } - // Build the internal telemetry topology. - let int_metrics_config = InternalMetricsConfiguration; - let prometheus_config = PrometheusConfiguration::from_listen_address(dp_config.telemetry_listen_addr().clone()); - - info!( - "Internal telemetry enabled. Spawning Prometheus scrape endpoint on {}.", - dp_config.telemetry_listen_addr() - ); - - let mut blueprint = TopologyBlueprint::new("internal", component_registry); - blueprint.with_interconnect_capacity(DEFAULT_INTERCONNECT_CAPACITY); - blueprint - .add_source("internal_metrics_in", int_metrics_config)? - .add_destination("internal_metrics_out", prometheus_config)? 
- .connect_component("internal_metrics_out", ["internal_metrics_in"])?; - - let init = async move { - let built_topology = blueprint.build().await?; - built_topology.spawn(&health_registry, MemoryLimiter::noop()).await - }; - - let main = |mut topology: RunningTopology| async move { - topology.wait_for_unexpected_finish().await; - warn!("Internal telemetry topology finished unexpectedly."); - }; - - initialize_and_launch_runtime("rt-internal-telemetry", init, main) + let mut supervisor = Supervisor::new("int-o11y")? + .with_dedicated_runtime(RuntimeConfiguration::single_threaded()) + .with_restart_strategy(RestartStrategy::one_to_one()); + + // Get a scoped component registry for internal observability + let scoped_registry = component_registry.get_or_create("internal-observability"); + + supervisor.add_worker(InternalTelemetryWorker::new( + dp_config.clone(), + scoped_registry, + health_registry, + )); + + Ok(Some(supervisor)) } diff --git a/lib/saluki-core/src/runtime/shutdown.rs b/lib/saluki-core/src/runtime/shutdown.rs index 49b006ab23..a4fe468dd2 100644 --- a/lib/saluki-core/src/runtime/shutdown.rs +++ b/lib/saluki-core/src/runtime/shutdown.rs @@ -1,8 +1,10 @@ use std::{ future::{pending, Future}, pin::Pin, + task::{Context, Poll}, }; +use futures::FutureExt as _; use tokio::sync::oneshot; /// A shutdown signal for a process. @@ -66,6 +68,24 @@ impl ProcessShutdown { } } +impl Future for ProcessShutdown { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some(mut shutdown_rx) = self.shutdown.take() { + match shutdown_rx.poll_unpin(cx) { + Poll::Pending => { + self.shutdown = Some(shutdown_rx); + Poll::Pending + } + Poll::Ready(()) => Poll::Ready(()), + } + } else { + Poll::Ready(()) + } + } +} + impl ShutdownHandle { /// Triggers the process to shutdown. 
pub fn trigger(self) { From 4ff325bd101a7468f0910f41f209b7c1b0c53884 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Fri, 20 Feb 2026 21:11:25 -0500 Subject: [PATCH 2/4] fix --- bin/agent-data-plane/src/cli/run.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index 17d375c25a..b32d5fae8f 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -216,8 +216,8 @@ pub async fn handle_run_command( select! { result = &mut internal_supervisor_fut => { match result { - Err(SupervisorError::FailedToInitialize { source, child_name }) => { - error!(process_name = child_name, "Internal supervisor failed to initialize: {}. Shutting down...", source); + Err(SupervisorError::FailedToInitialize { child_name, source }) => { + error!(child_name, "Internal supervisor failed to initialize: {}. Shutting down...", source); finished_with_error = true; } // If we haven't hit an initialization error -- which implies an error we can't really recover from -- From 0bf7badb9e93aa1428b8ab51bd296129e5b625fb Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Thu, 26 Feb 2026 10:07:12 -0500 Subject: [PATCH 3/4] cleanup --- bin/agent-data-plane/src/cli/run.rs | 4 +- .../src/internal/control_plane.rs | 14 +--- .../src/internal/observability.rs | 8 +- lib/saluki-core/src/runtime/dedicated.rs | 33 +++++--- lib/saluki-core/src/runtime/shutdown.rs | 7 ++ lib/saluki-core/src/runtime/supervisor.rs | 76 ++++++++++++++----- 6 files changed, 95 insertions(+), 47 deletions(-) diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index b32d5fae8f..f06e29a865 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -247,7 +247,9 @@ pub async fn handle_run_command( // Shutdown the primary topology let topology_result = running_topology.shutdown_with_timeout(Duration::from_secs(30)).await; - // Signal the 
internal supervisor to shutdown (if still running) and drive it to completion + // Signal the internal supervisor to shutdown (if still running) and drive it to completion. + // If the supervisor already exited (i.e., the select! above matched its branch), both the send + // and await resolve immediately — the send is a no-op and the future is already complete. let _ = internal_shutdown_tx.send(()); let _ = internal_supervisor_fut.await; diff --git a/bin/agent-data-plane/src/internal/control_plane.rs b/bin/agent-data-plane/src/internal/control_plane.rs index 5dd922f053..42f34fd061 100644 --- a/bin/agent-data-plane/src/internal/control_plane.rs +++ b/bin/agent-data-plane/src/internal/control_plane.rs @@ -67,12 +67,7 @@ impl Supervisable for HealthRegistryWorker { // TODO: This pattern doesn't allow us to gracefully restart the health registry worker if the supervisor itself // is restarted. We could do something simpler like `HealthRegistry` having a mutex to ensure only one instance is // running at a time, and that way the worker can be restarted gracefully, picking up where it left off. - let handle = self - .health_registry - .clone() - .spawn() - .await - .map_err(|e| InitializationError::Failed { source: e })?; + let handle = self.health_registry.clone().spawn().await?; Ok(Box::pin(async move { tokio::select! { @@ -173,11 +168,8 @@ impl Supervisable for PrivilegedApiWorker { // // TODO: should this need to happen during process init or could we do it once when creating `PrivilegedApiWorker` // and simplify things? 
- let cert_path = - get_cert_path_from_config(&self.config).map_err(|e| InitializationError::Failed { source: e })?; - let tls_config = build_datadog_agent_server_tls_config(cert_path) - .await - .map_err(|e| InitializationError::Failed { source: e })?; + let cert_path = get_cert_path_from_config(&self.config)?; + let tls_config = build_datadog_agent_server_tls_config(cert_path).await?; let mut api_builder = APIBuilder::new() .with_tls_config(tls_config) diff --git a/bin/agent-data-plane/src/internal/observability.rs b/bin/agent-data-plane/src/internal/observability.rs index 8b9574aad4..172a47fb72 100644 --- a/bin/agent-data-plane/src/internal/observability.rs +++ b/bin/agent-data-plane/src/internal/observability.rs @@ -69,14 +69,10 @@ impl Supervisable for InternalTelemetryWorker { .map_err(|e| InitializationError::Failed { source: e })?; // Build and spawn the topology - let built_topology = blueprint - .build() - .await - .map_err(|e| InitializationError::Failed { source: e })?; + let built_topology = blueprint.build().await?; let mut running_topology = built_topology .spawn(&self.health_registry, MemoryLimiter::noop()) - .await - .map_err(|e| InitializationError::Failed { source: e })?; + .await?; Ok(Box::pin(async move { tokio::select! { diff --git a/lib/saluki-core/src/runtime/dedicated.rs b/lib/saluki-core/src/runtime/dedicated.rs index 84834a2581..7042cf6c8a 100644 --- a/lib/saluki-core/src/runtime/dedicated.rs +++ b/lib/saluki-core/src/runtime/dedicated.rs @@ -15,7 +15,10 @@ use std::{ use saluki_error::{generic_error, GenericError}; use tokio::sync::oneshot; -use super::{shutdown::ProcessShutdown, supervisor::Supervisor}; +use super::{ + shutdown::ProcessShutdown, + supervisor::{Supervisor, SupervisorError}, +}; /// Configuration for a dedicated Tokio runtime. #[derive(Clone, Debug)] @@ -80,16 +83,20 @@ pub enum RuntimeMode { /// /// Allows capturing any runtime initialization failures as well as the result of the supervisor's execution. 
 pub(crate) struct DedicatedRuntimeHandle {
+    supervisor_id: String,
     init_rx: Option<oneshot::Receiver<Option<GenericError>>>,
-    result_rx: oneshot::Receiver<Result<(), GenericError>>,
+    result_rx: oneshot::Receiver<Result<(), SupervisorError>>,
     thread_handle: Option<std::thread::JoinHandle<()>>,
 }
 
 impl Future for DedicatedRuntimeHandle {
-    type Output = Result<(), GenericError>;
+    type Output = Result<(), SupervisorError>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         // First, check if initialization is still pending.
+        //
+        // NOTE: This is runtime-level initialization (building the Tokio runtime and OS thread), not
+        // supervisor child initialization. These errors are always fatal and non-restartable.
         if let Some(init_rx) = self.init_rx.as_mut() {
             let init_result = ready!(Pin::new(init_rx).poll(cx));
             let maybe_init_error = match init_result {
@@ -108,13 +115,19 @@ impl Future for DedicatedRuntimeHandle {
                     let _ = handle.join();
                 }
 
-                return Poll::Ready(Err(error));
+                return Poll::Ready(Err(SupervisorError::FailedToInitialize {
+                    child_name: self.supervisor_id.clone(),
+                    source: error.into(),
+                }));
             }
         }
 
-        // Check for a final result from the supervisor.
-        let result = ready!(Pin::new(&mut self.result_rx).poll(cx))
-            .map_err(|_| generic_error!("no supervisor result received; supervisor likely panicked"))?;
+        // Check for a final result from the supervisor. The structured `SupervisorError` is preserved
+        // so the parent can distinguish init failures from runtime failures.
+        //
+        // If the channel is closed without a result, the runtime thread panicked or exited after
+        // successful init — this is a runtime failure, not an initialization failure.
+        let result = ready!(Pin::new(&mut self.result_rx).poll(cx)).unwrap_or_else(|_| Err(SupervisorError::Shutdown));
 
         // Join on the thread to clean up.
if let Some(handle) = self.thread_handle.take() { @@ -143,7 +156,8 @@ pub(crate) fn spawn_dedicated_runtime( let (init_tx, init_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel(); - let thread_name = format!("{}-sup-rt", supervisor.id()); + let supervisor_id = supervisor.id().to_string(); + let thread_name = format!("{}-sup-rt", supervisor_id); let thread_handle = std::thread::Builder::new() .name(thread_name.clone()) .spawn(move || { @@ -166,11 +180,12 @@ pub(crate) fn spawn_dedicated_runtime( // // We ignore failures with sending the result because we can't do anything about it anyways. let result = runtime.block_on(supervisor.run_with_process_shutdown(process_shutdown)); - let _ = result_tx.send(result.map_err(|e| e.into())); + let _ = result_tx.send(result); }) .map_err(|e| generic_error!("Failed to spawn dedicated runtime thread '{}': {}", thread_name, e))?; Ok(DedicatedRuntimeHandle { + supervisor_id, init_rx: Some(init_rx), result_rx, thread_handle: Some(thread_handle), diff --git a/lib/saluki-core/src/runtime/shutdown.rs b/lib/saluki-core/src/runtime/shutdown.rs index a4fe468dd2..4a5f3d7b01 100644 --- a/lib/saluki-core/src/runtime/shutdown.rs +++ b/lib/saluki-core/src/runtime/shutdown.rs @@ -61,6 +61,9 @@ impl ProcessShutdown { /// /// If the shutdown signal has been received during a previous call to this function, this function will return /// immediately for all subsequent calls. + /// + /// `ProcessShutdown` also implements [`Future`] directly, which can be useful when passing it to APIs + /// that accept a generic future (e.g., as a shutdown signal parameter). pub async fn wait_for_shutdown(&mut self) { if let Some(shutdown_rx) = self.shutdown.take() { let _ = shutdown_rx.await; @@ -68,6 +71,10 @@ impl ProcessShutdown { } } +/// Implements [`Future`] for direct use in `select!` or as a generic shutdown signal. 
+/// This is equivalent to calling [`ProcessShutdown::wait_for_shutdown`] — once the shutdown signal is received
+/// (or has been received previously), the future resolves immediately.
 impl Future for ProcessShutdown {
     type Output = ();
 
diff --git a/lib/saluki-core/src/runtime/supervisor.rs b/lib/saluki-core/src/runtime/supervisor.rs
index 183ba7e77d..4b61139ebf 100644
--- a/lib/saluki-core/src/runtime/supervisor.rs
+++ b/lib/saluki-core/src/runtime/supervisor.rs
@@ -2,7 +2,7 @@ use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
 
 use async_trait::async_trait;
 use saluki_common::collections::FastIndexMap;
-use saluki_error::{ErrorContext as _, GenericError};
+use saluki_error::GenericError;
 use snafu::{OptionExt as _, Snafu};
 use tokio::{
     pin, select,
@@ -34,12 +34,34 @@ type WorkerFuture = Pin<Box<dyn Future<Output = Result<(), WorkerError>> + Send>
 #[derive(Debug)]
 enum WorkerError {
     /// The worker failed during async initialization.
-    Initialization(InitializationError),
+    ///
+    /// The optional `child_name` carries the name of the original failing child when the error originates from a
+    /// nested supervisor. This allows the parent to include it in its own `FailedToInitialize` error for better
+    /// diagnostics across supervision tree levels.
+    Initialization {
+        child_name: Option<String>,
+        source: InitializationError,
+    },
 
     /// The worker failed during runtime execution.
     Runtime(GenericError),
 }
 
+impl From<SupervisorError> for WorkerError {
+    fn from(err: SupervisorError) -> Self {
+        match err {
+            // Propagate initialization failures so the parent supervisor does NOT attempt to restart.
+            // Preserve the original child name so the parent can include it in diagnostics.
+            SupervisorError::FailedToInitialize { child_name, source } => WorkerError::Initialization {
+                child_name: Some(child_name),
+                source,
+            },
+            // All other supervisor errors (shutdown, no children, invalid name) are runtime-level.
+            other => WorkerError::Runtime(other.into()),
+        }
+    }
+}
+
 /// Process errors.
 #[derive(Debug, Snafu)]
 pub enum ProcessError {
@@ -73,12 +95,12 @@ pub enum InitializationError {
         /// The underlying error that caused initialization to fail.
         source: GenericError,
     },
+}
 
-    /// The process is permanently unavailable and cannot be initialized.
-    ///
-    /// This is for cases where initialization is structurally impossible, not due to a transient error.
-    #[snafu(display("Process is permanently unavailable"))]
-    PermanentlyUnavailable,
+impl From<GenericError> for InitializationError {
+    fn from(source: GenericError) -> Self {
+        Self::Failed { source }
+    }
 }
 
 /// Strategy for shutting down a process.
@@ -107,6 +129,10 @@ pub trait Supervisable: Send + Sync {
     /// same runtime that is used for running the process is used for initialization. The resulting future is expected
     /// to complete as soon as reasonably possible after `process_shutdown` resolves.
     ///
+    /// **Important:** The `process_shutdown` signal must be moved into the returned [`SupervisorFuture`] so the
+    /// worker can respond to supervisor-initiated shutdown. If `process_shutdown` is dropped during initialization,
+    /// the worker will be unable to shut down gracefully and will be forcefully aborted after the shutdown timeout.
+    ///
     /// # Errors
     ///
     /// If the process cannot be initialized, an error is returned.
@@ -204,10 +230,14 @@ impl ChildSpecification { Self::Worker(worker) => { let worker = Arc::clone(worker); Ok(Box::pin(async move { - let run_future = worker - .initialize(process_shutdown) - .await - .map_err(WorkerError::Initialization)?; + let run_future = + worker + .initialize(process_shutdown) + .await + .map_err(|source| WorkerError::Initialization { + child_name: None, + source, + })?; run_future.await.map_err(WorkerError::Runtime) })) } @@ -223,10 +253,10 @@ impl ChildSpecification { let handle = spawn_dedicated_runtime(sup.inner_clone(), config.clone(), process_shutdown) .map_err(|e| SupervisorError::FailedToInitialize { child_name, - source: InitializationError::Failed { source: e }, + source: e.into(), })?; - Ok(Box::pin(async move { handle.await.map_err(WorkerError::Runtime) })) + Ok(Box::pin(async move { handle.await.map_err(WorkerError::from) })) } } } @@ -416,12 +446,19 @@ impl Supervisor { let child_spec = self.get_child_spec(child_spec_idx); // Initialization failures are not eligible for restart -- they propagate immediately. - if let Err(WorkerError::Initialization(e)) = worker_result { - error!(supervisor_id = %self.supervisor_id, worker_name = child_spec.name(), "Child process failed to initialize: {}", e); + if let Err(WorkerError::Initialization { child_name, source }) = worker_result { + // If the error came from a nested supervisor, include the original child name + // to make the error chain more informative (e.g., "ctrl-pln/privileged-api"). 
+ let full_name = match child_name { + Some(inner) => format!("{}/{}", child_spec.name(), inner), + None => child_spec.name().to_string(), + }; + + error!(supervisor_id = %self.supervisor_id, worker_name = full_name, "Child process failed to initialize: {}", source); worker_state.shutdown_workers().await; return Err(SupervisorError::FailedToInitialize { - child_name: child_spec.name().to_string(), - source: e, + child_name: full_name, + source, }); } @@ -429,7 +466,7 @@ impl Supervisor { let worker_result = worker_result .map_err(|e| match e { WorkerError::Runtime(e) => ProcessError::Terminated { source: e }, - WorkerError::Initialization(_) => unreachable!("handled above"), + WorkerError::Initialization { .. } => unreachable!("handled above"), }); match restart_state.evaluate_restart() { @@ -470,8 +507,7 @@ impl Supervisor { Box::pin(async move { sup.run_inner(process, process_shutdown) .await - .error_context("Nested supervisor failed to exit cleanly.") - .map_err(WorkerError::Runtime) + .map_err(WorkerError::from) }) } From e7ac7df3455c4fb01c6875ace49d19c147ef95c7 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Wed, 25 Mar 2026 08:46:54 -0400 Subject: [PATCH 4/4] cleanup --- .../src/internal/control_plane.rs | 26 +++++++++---------- .../src/internal/observability.rs | 12 ++++----- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/bin/agent-data-plane/src/internal/control_plane.rs b/bin/agent-data-plane/src/internal/control_plane.rs index 42f34fd061..5aa0364428 100644 --- a/bin/agent-data-plane/src/internal/control_plane.rs +++ b/bin/agent-data-plane/src/internal/control_plane.rs @@ -25,13 +25,11 @@ use crate::{ /// Gets the IPC certificate file path from the configuration. fn get_cert_path_from_config(config: &GenericConfiguration) -> Result { - // Try to get the auth token file path first let auth_token_file_path = config .try_get_typed::("auth_token_file_path") .error_context("Failed to get Agent auth token file path.")? 
.unwrap_or_else(PlatformSettings::get_auth_token_path); - // Try to get the explicit IPC cert file path let ipc_cert_file_path = config .try_get_typed::>("ipc_cert_file_path") .error_context("Failed to get Agent IPC cert file path.")? @@ -65,8 +63,8 @@ impl Supervisable for HealthRegistryWorker { // Spawn the health registry runner - it runs forever until the process exits // // TODO: This pattern doesn't allow us to gracefully restart the health registry worker if the supervisor itself - // is restarted. We could do something simpler like `HealthRegistry` having a mutex to ensure only one instance is - // running at a time, and that way the worker can be restarted gracefully, picking up where it left off. + // is restarted. We could do something simpler like `HealthRegistry` having a mutex to ensure only one instance + // is running at a time, and that way the worker can be restarted gracefully, picking up where it left off. let handle = self.health_registry.clone().spawn().await?; Ok(Box::pin(async move { @@ -79,8 +77,8 @@ impl Supervisable for HealthRegistryWorker { Ok(()) } _ = process_shutdown.wait_for_shutdown() => { - // On shutdown, we just let the spawned task continue until the runtime shuts down. - // The health registry doesn't have a graceful shutdown mechanism. + // On shutdown, we just let the spawned task continue until the runtime shuts down. The health + // registry doesn't have a graceful shutdown mechanism. Ok(()) } } @@ -166,15 +164,15 @@ impl Supervisable for PrivilegedApiWorker { async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { // Load our TLS configuration. // - // TODO: should this need to happen during process init or could we do it once when creating `PrivilegedApiWorker` - // and simplify things? + // TODO: should this need to happen during process init or could we do it once when creating + // `PrivilegedApiWorker` and simplify things? 
let cert_path = get_cert_path_from_config(&self.config)?; let tls_config = build_datadog_agent_server_tls_config(cert_path).await?; let mut api_builder = APIBuilder::new() .with_tls_config(tls_config) - // TODO: make these handlers cloneable and move them up to the config for the worker so they can - // be cloned for each initialization + // TODO: make these handlers cloneable and move them up to the config for the worker so they can be cloned + // for each initialization .with_optional_handler(acquire_logging_api_handler()) .with_optional_handler(acquire_metrics_api_handler()) .with_handler(ConfigAPIHandler::new(self.config.clone())) @@ -204,7 +202,9 @@ impl Supervisable for PrivilegedApiWorker { /// Creates the control plane supervisor. /// /// This supervisor manages the health registry, unprivileged and privileged APIs, and optionally the remote agent -/// registration task. It runs on a dedicated single-threaded runtime. +/// registration task. +/// +/// It runs on a dedicated single-threaded runtime. 
/// /// # Errors /// @@ -218,8 +218,8 @@ pub fn create_control_plane_supervisor( .with_dedicated_runtime(RuntimeConfiguration::single_threaded()) .with_restart_strategy(RestartStrategy::one_to_one()); - // TODO: just make the API handler for `ComponentRegistry` cloneable so we can create/hold on to it in `UnprivilegedApiWorker` - // without having to create a scoped one here just to maintain the ownership necessary + // TODO: just make the API handler for `ComponentRegistry` cloneable so we can create/hold on to it in + // `UnprivilegedApiWorker` without having to create a scoped one here just to maintain the ownership necessary let scoped_registry = component_registry.get_or_create("control-plane"); supervisor.add_worker(HealthRegistryWorker::new(health_registry.clone())); diff --git a/bin/agent-data-plane/src/internal/observability.rs b/bin/agent-data-plane/src/internal/observability.rs index 172a47fb72..ed579440e6 100644 --- a/bin/agent-data-plane/src/internal/observability.rs +++ b/bin/agent-data-plane/src/internal/observability.rs @@ -61,12 +61,9 @@ impl Supervisable for InternalTelemetryWorker { let mut blueprint = TopologyBlueprint::new("internal", &self.component_registry); blueprint.with_interconnect_capacity(DEFAULT_INTERCONNECT_CAPACITY); blueprint - .add_source("internal_metrics_in", int_metrics_config) - .map_err(|e| InitializationError::Failed { source: e })? - .add_destination("internal_metrics_out", prometheus_config) - .map_err(|e| InitializationError::Failed { source: e })? - .connect_component("internal_metrics_out", ["internal_metrics_in"]) - .map_err(|e| InitializationError::Failed { source: e })?; + .add_source("internal_metrics_in", int_metrics_config)? + .add_destination("internal_metrics_out", prometheus_config)? 
+ .connect_component("internal_metrics_out", ["internal_metrics_in"])?; // Build and spawn the topology let built_topology = blueprint.build().await?; @@ -92,9 +89,10 @@ impl Supervisable for InternalTelemetryWorker { /// Creates the observability supervisor. /// /// This supervisor manages the internal telemetry topology (Prometheus metrics endpoint). +/// /// It runs on a dedicated single-threaded runtime. /// -/// Returns `None` if telemetry is not enabled. +/// Returns `Ok(None)` if telemetry is not enabled. /// /// # Errors ///