diff --git a/Cargo.toml b/Cargo.toml index 4927c5e..79170cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ otlp = [ ] journald = ["dep:tracing-journald"] log-control = ["dep:axum"] -tokio-metrics = ["otlp"] +tokio-metrics = ["dep:opentelemetry"] [dependencies] tracing = "0.1" @@ -36,7 +36,7 @@ tracing-core = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "registry"] } thiserror = "2.0" serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.52", features = ["net", "rt", "sync", "time"] } +tokio = { version = "1.52", features = ["net", "rt", "rt-multi-thread", "sync", "time"] } opentelemetry = { version = "0.31", optional = true } opentelemetry-otlp = { version = "0.31", features = ["http-proto", "trace", "metrics", "logs"], optional = true } opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "metrics", "logs"], optional = true } @@ -61,3 +61,19 @@ name = "stdout_custom_filter" toml = "1.1" tokio = { version = "1.52", features = ["macros", "rt-multi-thread", "time"] } tower = { version = "0.5", features = ["util"] } + +[[test]] +name = "init_otlp" +required-features = ["otlp"] + +[[test]] +name = "init_otlp_idempotent" +required-features = ["otlp"] + +[[test]] +name = "init_log_control" +required-features = ["log-control"] + +[[test]] +name = "init_otlp_log_control" +required-features = ["otlp", "log-control"] diff --git a/README.md b/README.md index 217ccfb..8084f4a 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Opinionated shared telemetry setup for workspace Rust services. - optional OTLP export for traces, logs, and metrics - optional `journald` output - optional localhost-only log-level control API -- optional Tokio runtime metrics exported through the OTLP/OpenTelemetry pipeline +- optional Tokio runtime metrics exported through the OpenTelemetry global meter pipeline The API is intentionally small and opinionated. @@ -46,9 +46,15 @@ same process is unsupported. For graceful shutdown, call `TelemetryGuard::shutdown().await` during teardown. Dropping the guard without calling `shutdown` first performs a best-effort fallback. -When OTLP is enabled, provider shutdown is blocking, so drop avoids running that -blocking shutdown path on an active Tokio runtime thread and may skip the final -OTLP flush. Explicit shutdown during teardown is strongly preferred. +When OTLP is enabled, drop attempts provider shutdown through +`tokio::task::block_in_place` on a multi-thread Tokio runtime. This crate enables +Tokio's `rt-multi-thread` feature to support that drop path. Consumers running a +`current_thread`-only runtime should call `TelemetryGuard::shutdown().await` +explicitly rather than relying on `Drop`. On a `current_thread` runtime the drop +fallback is unavailable, so it emits a stderr warning and may skip the final +OTLP flush. Explicit shutdown during teardown is strongly preferred so +background tasks can finish and final telemetry can flush in a predictable +order. ## Optional features @@ -57,7 +63,7 @@ OTLP flush. Explicit shutdown during teardown is strongly preferred. - `journald`: `tracing-journald` output; use `TelemetryBuilder::enable_journald()` - `log-control`: HTTP endpoints on `127.0.0.1` for runtime filter changes; exposes `LogControlConfig` and `TelemetryBuilder::with_log_control(...)` -- `tokio-metrics`: Tokio runtime gauges exported through OTLP/OpenTelemetry; implies `otlp`; use `TelemetryBuilder::enable_tokio_metrics()` +- `tokio-metrics`: Tokio runtime gauges exported through OpenTelemetry; use `TelemetryBuilder::enable_tokio_metrics()` GreptimeDB export does not require a dedicated crate feature. Configure GreptimeDB OTLP headers explicitly through `OtlpConfig::headers`; see @@ -69,9 +75,9 @@ This example assumes the `otlp`, `log-control`, `journald`, and `tokio-metrics` crate features are enabled. ```rust -use std::collections::HashMap; -use telemetry_setup::{LogControlConfig, OtlpConfig, OtlpHeadersConfig, TelemetryBuilder}; - +# use std::collections::HashMap; +# use telemetry_setup::{LogControlConfig, OtlpConfig, OtlpHeadersConfig, TelemetryBuilder}; +# fn example() -> Result<(), telemetry_setup::TelemetryError> { let _telemetry = TelemetryBuilder::new("controller") .with_stdout_filter("info") .with_otlp_config(OtlpConfig { @@ -97,6 +103,8 @@ let _telemetry = TelemetryBuilder::new("controller") .enable_journald() .enable_tokio_metrics() .init()?; +# Ok(()) +# } ``` Defaults: @@ -104,7 +112,8 @@ Defaults: - local filter comes from `RUST_LOG`, falling back to `info` - log-control port defaults to `6669` - OTLP defaults are local-first and target `http://localhost:4318` for OTLP/HTTP -- OTLP metric export and Tokio runtime metric collection default to a 5 second interval and can be overridden with `OtlpConfig::metrics_interval` +- OTLP metric export defaults to a 5 second interval and can be overridden with `OtlpConfig::metrics_interval` between 1 second and 1 day +- Tokio runtime metric collection defaults to a 5 second interval and can be overridden with `TelemetryBuilder::with_tokio_metrics_interval(...)` ## Documentation and examples @@ -121,10 +130,10 @@ Compilable example applications live in `examples/`: Notes: -- Tokio metrics are OTLP-only and require both the `tokio-metrics` crate feature, the `otlp` feature, and an OTLP configuration at runtime +- Tokio metrics require the `tokio-metrics` crate feature and any installed OpenTelemetry meter provider - Tokio metrics also require compiling the process with `RUSTFLAGS="--cfg tokio_unstable"` - call `TelemetryGuard::shutdown().await` to gracefully stop background tasks before OTLP providers are shut down so final telemetry can flush cleanly -- dropping `TelemetryGuard` without calling `shutdown` first falls back to best-effort teardown and aborts any tasks that are still running +- dropping `TelemetryGuard` without calling `shutdown` first falls back to best-effort teardown, aborts any tasks that are still running, and then attempts provider shutdown via `tokio::task::block_in_place` on multi-thread runtimes - the OTLP log rate limit is intentionally approximate at Unix-second boundaries under contention; it is a best-effort overload guard, not exact accounting - OTLP rate-limit warnings use at most one helper thread at a time to avoid spawning a new thread on every overflow transition during sustained overload - use `TelemetryBuilder::without_env_var()` when the process environment must not override the fallback stdout filter diff --git a/examples/greptime_otlp.rs b/examples/greptime_otlp.rs index c2a4c1a..3374f20 100644 --- a/examples/greptime_otlp.rs +++ b/examples/greptime_otlp.rs @@ -6,7 +6,7 @@ use telemetry_setup::{OtlpConfig, TelemetryBuilder}; async fn main() -> Result<(), Box> { let otlp_config: OtlpConfig = toml::from_str(include_str!("greptime_otlp.toml"))?; - let telemetry = TelemetryBuilder::new("controller") + let mut telemetry = TelemetryBuilder::new("controller") .with_otlp_config(otlp_config) .init()?; diff --git a/src/builder/feature_checks.rs b/src/builder/feature_checks.rs index 99b6eab..5dabbb8 100644 --- a/src/builder/feature_checks.rs +++ b/src/builder/feature_checks.rs @@ -50,33 +50,6 @@ pub(crate) fn reject_tokio_metrics_without_feature(enabled: bool) -> Result<(), Ok(()) } -/// Rejects Tokio metrics when OTLP export cannot be configured. -/// -/// # Arguments -/// -/// * `enabled` - Whether the builder requested Tokio runtime metric collection. -/// * `otlp_configured` - Whether the builder has an OTLP configuration. -/// -/// # Returns -/// -/// `Ok(())` when Tokio metrics are disabled or OTLP is configured. -/// -/// # Errors -/// -/// Returns [`TelemetryError::TokioMetricsRequiresOtlp`] when Tokio metrics are enabled -/// without a runtime OTLP configuration. -#[cfg(feature = "tokio-metrics")] -pub(crate) fn reject_tokio_metrics_without_otlp_config( - enabled: bool, - otlp_configured: bool, -) -> Result<(), TelemetryError> { - if enabled && !otlp_configured { - return Err(TelemetryError::TokioMetricsRequiresOtlp); - } - - Ok(()) -} - #[cfg(test)] mod tests { use super::{reject_journald_without_feature, reject_tokio_metrics_without_feature}; @@ -86,10 +59,4 @@ mod tests { assert!(reject_journald_without_feature(false).is_ok()); assert!(reject_tokio_metrics_without_feature(false).is_ok()); } - - #[cfg(feature = "tokio-metrics")] - #[test] - fn tokio_metrics_with_otlp_config_is_accepted() { - assert!(super::reject_tokio_metrics_without_otlp_config(true, true).is_ok()); - } } diff --git a/src/builder/filter.rs b/src/builder/filter.rs index 7fa2401..fe183cf 100644 --- a/src/builder/filter.rs +++ b/src/builder/filter.rs @@ -1,15 +1,10 @@ // SPDX-License-Identifier: MIT -use std::sync::Arc; - -pub(crate) type EnvLookup = Arc Option + Send + Sync>; - /// Resolves the local filter from the configured environment variable or fallback string. /// /// # Arguments /// /// * `env_var_name` - Optional environment variable name to query first. -/// * `env_lookup` - Lookup callback used to read environment values. /// * `stdout_filter` - Optional explicit fallback filter. /// /// # Returns @@ -17,11 +12,36 @@ pub(crate) type EnvLookup = Arc Option + Send + Sync>; /// The environment-provided filter, explicit fallback filter, or `info` default. pub(crate) fn resolve_stdout_filter( env_var_name: &Option, - env_lookup: &EnvLookup, stdout_filter: &Option, ) -> String { if let Some(env_var_name) = env_var_name - && let Some(filter) = env_lookup(env_var_name) + && let Ok(filter) = std::env::var(env_var_name) + { + return filter; + } + + stdout_filter.clone().unwrap_or_else(|| "info".to_string()) +} + +/// Resolves the local filter using a caller-provided lookup function. +/// +/// # Arguments +/// +/// * `env_var_name` - Optional environment variable name to query first. +/// * `lookup` - Function used to resolve environment values for tests. +/// * `stdout_filter` - Optional explicit fallback filter. +/// +/// # Returns +/// +/// The lookup-provided filter, explicit fallback filter, or `info` default. +#[cfg(test)] +pub(crate) fn resolve_stdout_filter_with_lookup( + env_var_name: &Option, + lookup: impl Fn(&str) -> Option, + stdout_filter: &Option, +) -> String { + if let Some(env_var_name) = env_var_name + && let Some(filter) = lookup(env_var_name) { return filter; } diff --git a/src/builder/layers.rs b/src/builder/layers.rs index b83c287..2a186e2 100644 --- a/src/builder/layers.rs +++ b/src/builder/layers.rs @@ -1,16 +1,25 @@ // SPDX-License-Identifier: MIT +//! Construction helpers for the individual subscriber layers used by +//! [`TelemetryBuilder::init`](crate::TelemetryBuilder::init). +//! +//! Every helper returns its layer with concrete `impl Layer` types so the +//! subscriber can be composed via static `.with(...)` calls in `init()`. When +//! runtime log control is enabled, helpers that expose mutable filters wrap +//! those filters in [`tracing_subscriber::reload::Layer`] so the active filter +//! can be updated at runtime. + +use tracing_subscriber::Layer; use tracing_subscriber::filter::EnvFilter; use tracing_subscriber::reload; -use tracing_subscriber::{Layer, Registry}; -#[cfg(any(feature = "journald", feature = "otlp"))] +#[cfg(any(feature = "journald", feature = "otlp", feature = "log-control"))] use crate::error::TelemetryError; -pub(crate) type BoxedLayer = Box + Send + Sync>; -pub(crate) type FilterReloadHandle = reload::Handle; +#[cfg(feature = "log-control")] +use crate::log_control::ReloadCallback; -/// Builds the stdout formatting layer and its reload handle. +/// Builds the stdout formatting layer and its filter reload handle. /// /// # Arguments /// @@ -18,14 +27,42 @@ pub(crate) type FilterReloadHandle = reload::Handle; /// /// # Returns /// -/// A boxed subscriber layer and the reload handle used to update its filter. -pub(crate) fn build_fmt_layer(stdout_filter: EnvFilter) -> (BoxedLayer, FilterReloadHandle) { +/// A subscriber layer composable onto any subscriber `S` and the reload handle +/// used to update its filter at runtime. +#[cfg(feature = "log-control")] +pub(crate) fn build_fmt_layer( + stdout_filter: EnvFilter, +) -> (impl Layer + Send + Sync + 'static, ReloadCallback) +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ let (filter_layer, filter_handle) = reload::Layer::new(stdout_filter); let layer = tracing_subscriber::fmt::layer().with_filter(filter_layer); - (Box::new(layer), filter_handle) + let reload = build_reload_callback(filter_handle); + (layer, reload) } -/// Builds the journald layer and its reload handle. +#[cfg(not(feature = "log-control"))] +pub(crate) fn build_fmt_layer( + stdout_filter: EnvFilter, +) -> (impl Layer + Send + Sync + 'static, ()) +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ + let (filter_layer, _) = reload::Layer::new(stdout_filter); + let layer = tracing_subscriber::fmt::layer().with_filter(filter_layer); + (layer, ()) +} + +/// Builds the journald layer and its filter reload handle. /// /// # Arguments /// @@ -33,40 +70,60 @@ pub(crate) fn build_fmt_layer(stdout_filter: EnvFilter) -> (BoxedLayer, FilterRe /// /// # Returns /// -/// A boxed journald layer and the reload handle used to update its filter. +/// A journald layer composable onto any subscriber `S` and the reload handle +/// used to update its filter at runtime. /// /// # Errors /// /// Returns [`TelemetryError`] when the filter cannot be parsed or the journald /// layer cannot be constructed. -#[cfg(feature = "journald")] -pub(crate) fn build_journald_layer( +#[cfg(all(feature = "journald", feature = "log-control"))] +pub(crate) fn build_journald_layer( filter_spec: &str, -) -> Result<(BoxedLayer, FilterReloadHandle), TelemetryError> { +) -> Result<(impl Layer + Send + Sync + 'static, ReloadCallback), TelemetryError> +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ let journald_filter = EnvFilter::try_new(filter_spec).map_err(TelemetryError::subscriber)?; let (filter_layer, filter_handle) = reload::Layer::new(journald_filter); let layer = tracing_journald::layer() .map_err(TelemetryError::subscriber)? .with_filter(filter_layer); - Ok((Box::new(layer), filter_handle)) + let reload = build_reload_callback(filter_handle); + Ok((layer, reload)) +} + +#[cfg(all(feature = "journald", not(feature = "log-control")))] +pub(crate) fn build_journald_layer( + filter_spec: &str, +) -> Result<(impl Layer + Send + Sync + 'static, ()), TelemetryError> +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ + let journald_filter = EnvFilter::try_new(filter_spec).map_err(TelemetryError::subscriber)?; + let (filter_layer, _) = reload::Layer::new(journald_filter); + let layer = tracing_journald::layer() + .map_err(TelemetryError::subscriber)? + .with_filter(filter_layer); + Ok((layer, ())) } -/// Built OTLP layers, reload handles, and providers that must be kept alive. +/// Built OTLP providers. #[cfg(feature = "otlp")] pub(crate) struct OtlpLayerParts { - /// Trace export layer. - pub trace_layer: BoxedLayer, - /// Log export layer. - pub log_layer: BoxedLayer, - /// Reload handle for the trace layer filter. - pub trace_reload_handle: FilterReloadHandle, - /// Reload handle for the log layer filter. - pub log_reload_handle: FilterReloadHandle, - /// Providers backing the OTLP layers. + /// Providers backing the OTLP layers; kept alive by the guard. pub providers: crate::otlp::BuiltProviders, } -/// Builds OTLP trace and log layers, their reload handles, and backing providers. +/// Builds OTLP providers. /// /// # Arguments /// @@ -75,49 +132,130 @@ pub(crate) struct OtlpLayerParts { /// /// # Returns /// -/// Constructed OTLP layers plus provider guard parts. +/// Constructed OTLP providers. /// /// # Errors /// -/// Returns [`TelemetryError`] when providers cannot be built or filters cannot be parsed. +/// Returns [`TelemetryError`] when providers cannot be built. #[cfg(feature = "otlp")] -pub(crate) fn build_otlp_layers( +pub(crate) fn build_otlp_parts( service_name: &str, otlp_config: &crate::otlp::OtlpConfig, ) -> Result { + let providers = crate::otlp::build_providers(service_name, otlp_config)?; + + Ok(OtlpLayerParts { providers }) +} + +/// Builds the OTLP trace export layer for subscriber `S`. +#[cfg(all(feature = "otlp", feature = "log-control"))] +pub(crate) fn build_otlp_trace_layer( + tracer_provider: &opentelemetry_sdk::trace::SdkTracerProvider, + filter_spec: &str, +) -> Result<(impl Layer + Send + Sync + 'static, ReloadCallback), TelemetryError> +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ use opentelemetry::trace::TracerProvider; - use tracing_subscriber::filter::FilterExt; - let providers = crate::otlp::build_providers(service_name, otlp_config)?; - let tracer = providers - .tracer_provider - .tracer(providers.effective_service_name.clone()); - - let otlp_trace_filter = - EnvFilter::try_new(otlp_config.log_level.as_str()).map_err(TelemetryError::subscriber)?; - let otlp_log_filter = - EnvFilter::try_new(otlp_config.log_level.as_str()).map_err(TelemetryError::subscriber)?; - let (trace_filter_layer, trace_filter_handle) = reload::Layer::new(otlp_trace_filter); - let (log_filter_layer, log_filter_handle) = reload::Layer::new(otlp_log_filter); - - let trace_layer = tracing_opentelemetry::layer() + let trace_filter = EnvFilter::try_new(filter_spec).map_err(TelemetryError::subscriber)?; + let (filter_layer, filter_handle) = reload::Layer::new(trace_filter); + let tracer = tracer_provider.tracer("telemetry-setup"); + let layer = tracing_opentelemetry::layer() .with_tracer(tracer) - .with_filter(trace_filter_layer); + .with_filter(filter_layer); + let reload = build_reload_callback(filter_handle); + Ok((layer, reload)) +} + +/// Builds the OTLP trace export layer for subscriber `S`. +#[cfg(all(feature = "otlp", not(feature = "log-control")))] +pub(crate) fn build_otlp_trace_layer( + tracer_provider: &opentelemetry_sdk::trace::SdkTracerProvider, + filter_spec: &str, +) -> Result + Send + Sync + 'static, TelemetryError> +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ + use opentelemetry::trace::TracerProvider; + + let trace_filter = EnvFilter::try_new(filter_spec).map_err(TelemetryError::subscriber)?; + let tracer = tracer_provider.tracer("telemetry-setup"); + Ok(tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(trace_filter)) +} + +/// Builds the OTLP log export layer for subscriber `S`. +#[cfg(all(feature = "otlp", feature = "log-control"))] +pub(crate) fn build_otlp_log_layer( + logger_provider: &opentelemetry_sdk::logs::SdkLoggerProvider, + filter_spec: &str, + rate_limit_per_sec: Option, +) -> Result<(impl Layer + Send + Sync + 'static, ReloadCallback), TelemetryError> +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ + use tracing_subscriber::filter::FilterExt; - let log_layer = opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new( - &providers.logger_provider, + let log_filter = EnvFilter::try_new(filter_spec).map_err(TelemetryError::subscriber)?; + let (filter_layer, filter_handle) = reload::Layer::new(log_filter); + let layer = + opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(logger_provider) + .with_filter(filter_layer.and(crate::otlp::RateLimitFilter::new_optional( + rate_limit_per_sec, + ))); + let reload = build_reload_callback(filter_handle); + Ok((layer, reload)) +} + +/// Builds the OTLP log export layer for subscriber `S`. +#[cfg(all(feature = "otlp", not(feature = "log-control")))] +pub(crate) fn build_otlp_log_layer( + logger_provider: &opentelemetry_sdk::logs::SdkLoggerProvider, + filter_spec: &str, + rate_limit_per_sec: Option, +) -> Result + Send + Sync + 'static, TelemetryError> +where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, +{ + use tracing_subscriber::filter::FilterExt; + + let log_filter = EnvFilter::try_new(filter_spec).map_err(TelemetryError::subscriber)?; + Ok( + opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(logger_provider) + .with_filter(log_filter.and(crate::otlp::RateLimitFilter::new_optional( + rate_limit_per_sec, + ))), ) - .with_filter( - log_filter_layer.and(crate::otlp::RateLimitFilter::new_optional( - otlp_config.log_rate_limit_per_sec, - )), - ); - - Ok(OtlpLayerParts { - trace_layer: Box::new(trace_layer), - log_layer: Box::new(log_layer), - trace_reload_handle: trace_filter_handle, - log_reload_handle: log_filter_handle, - providers, +} + +#[cfg(feature = "log-control")] +fn build_reload_callback(handle: reload::Handle) -> ReloadCallback +where + S: tracing::Subscriber + Send + Sync + 'static, +{ + std::sync::Arc::new(move |spec: String| { + let filter = EnvFilter::try_new(spec.as_str()) + .map_err(|error| TelemetryError::subscriber(error).to_string())?; + handle + .reload(filter) + .map_err(|error| TelemetryError::subscriber(error).to_string()) }) } diff --git a/src/builder/mod.rs b/src/builder/mod.rs index e3b3920..40fa025 100644 --- a/src/builder/mod.rs +++ b/src/builder/mod.rs @@ -16,7 +16,9 @@ //! - [`TelemetryBuilder::enable_journald`] enables journald output when compiled //! with the `journald` feature. //! - [`TelemetryBuilder::enable_tokio_metrics`] enables Tokio runtime metrics when -//! compiled with `tokio-metrics` and OTLP is configured. +//! compiled with `tokio-metrics`. +//! - [`TelemetryBuilder::with_tokio_metrics_interval`] sets the Tokio runtime +//! metrics sampling cadence. //! - `with_otlp_config` and `with_log_control` are available behind their //! matching crate features. //! @@ -26,12 +28,12 @@ mod feature_checks; mod filter; pub(crate) mod layers; +use std::time::Duration; + use tracing_subscriber::filter::EnvFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use std::sync::Arc; - use tokio_util::sync::CancellationToken; use crate::error::TelemetryError; @@ -45,8 +47,6 @@ use crate::log_control::{ #[cfg(feature = "otlp")] use crate::otlp::OtlpConfig; -use self::filter::EnvLookup; - /// Builder for installing local logging, optional OTLP export, and related helpers. pub struct TelemetryBuilder { #[cfg_attr(not(feature = "otlp"), allow(dead_code))] @@ -55,11 +55,36 @@ pub struct TelemetryBuilder { otlp_config: Option, stdout_filter: Option, env_var_name: Option, - env_lookup: EnvLookup, #[cfg(feature = "log-control")] log_control_config: Option, enable_journald: bool, enable_tokio_metrics: bool, + tokio_metrics_interval: Duration, +} + +#[cfg(feature = "log-control")] +type StdoutReload = crate::log_control::ReloadCallback; +#[cfg(all(feature = "log-control", feature = "otlp"))] +type OtlpReload = Option; + +#[cfg(feature = "otlp")] +struct InitializedOtlpProviders { + tracer_provider: opentelemetry_sdk::trace::SdkTracerProvider, + logger_provider: opentelemetry_sdk::logs::SdkLoggerProvider, + meter_provider: opentelemetry_sdk::metrics::SdkMeterProvider, +} + +#[cfg(feature = "log-control")] +struct LogControlParts { + stdout_reload: StdoutReload, + otlp_reload: Option, +} + +struct InstalledSubscriber { + #[cfg(feature = "otlp")] + providers: Option, + #[cfg(feature = "log-control")] + log_control: LogControlParts, } impl std::fmt::Debug for TelemetryBuilder { @@ -74,6 +99,7 @@ impl std::fmt::Debug for TelemetryBuilder { debug.field("log_control_config", &self.log_control_config); debug.field("enable_journald", &self.enable_journald); debug.field("enable_tokio_metrics", &self.enable_tokio_metrics); + debug.field("tokio_metrics_interval", &self.tokio_metrics_interval); debug.finish_non_exhaustive() } } @@ -96,11 +122,11 @@ impl TelemetryBuilder { otlp_config: None, stdout_filter: None, env_var_name: Some("RUST_LOG".to_string()), - env_lookup: Arc::new(|name| std::env::var(name).ok()), #[cfg(feature = "log-control")] log_control_config: None, enable_journald: false, enable_tokio_metrics: false, + tokio_metrics_interval: Duration::from_secs(5), } } @@ -160,24 +186,6 @@ impl TelemetryBuilder { self } - /// Sets the environment lookup used to source the initial local filter in tests. - /// - /// # Arguments - /// - /// * `lookup` - Function that returns the environment value for a variable name. - /// - /// # Returns - /// - /// The updated builder. - #[cfg(test)] - fn with_env_lookup( - mut self, - lookup: impl Fn(&str) -> Option + Send + Sync + 'static, - ) -> Self { - self.env_lookup = Arc::new(lookup); - self - } - /// Enables the localhost log-control server using the provided configuration. /// /// # Arguments @@ -214,27 +222,6 @@ impl TelemetryBuilder { self } - /// Enables or disables journald output when the crate is built with that feature. - /// - /// Prefer [`TelemetryBuilder::enable_journald`] or - /// [`TelemetryBuilder::disable_journald`] in new code. - /// - /// # Arguments - /// - /// * `enable` - `true` to add a journald layer, or `false` to leave it disabled. - /// - /// # Returns - /// - /// The updated builder. - #[deprecated( - since = "0.1.1", - note = "use enable_journald() or disable_journald() instead" - )] - pub fn with_journald(mut self, enable: bool) -> Self { - self.enable_journald = enable; - self - } - /// Enables Tokio runtime metric collection. /// /// # Returns @@ -255,25 +242,17 @@ impl TelemetryBuilder { self } - /// Enables or disables Tokio runtime metric collection. - /// - /// Prefer [`TelemetryBuilder::enable_tokio_metrics`] or - /// [`TelemetryBuilder::disable_tokio_metrics`] in new code. + /// Sets the Tokio runtime metrics collection interval. /// /// # Arguments /// - /// * `enable` - `true` to spawn Tokio runtime metric collection, or `false` - /// to leave it disabled. + /// * `interval` - Delay between Tokio runtime metrics snapshots. /// /// # Returns /// /// The updated builder. - #[deprecated( - since = "0.1.1", - note = "use enable_tokio_metrics() or disable_tokio_metrics() instead" - )] - pub fn with_tokio_metrics(mut self, enable: bool) -> Self { - self.enable_tokio_metrics = enable; + pub fn with_tokio_metrics_interval(mut self, interval: Duration) -> Self { + self.tokio_metrics_interval = interval; self } @@ -298,34 +277,9 @@ impl TelemetryBuilder { feature_checks::reject_journald_without_feature(self.enable_journald)?; feature_checks::reject_tokio_metrics_without_feature(self.enable_tokio_metrics)?; - #[cfg(all(feature = "tokio-metrics", feature = "otlp"))] - feature_checks::reject_tokio_metrics_without_otlp_config( - self.enable_tokio_metrics, - self.otlp_config.is_some(), - )?; - #[cfg(all(feature = "tokio-metrics", not(feature = "otlp")))] - feature_checks::reject_tokio_metrics_without_otlp_config(self.enable_tokio_metrics, false)?; - let stdout_spec = self.resolve_stdout_filter(); let stdout_filter = EnvFilter::try_new(stdout_spec.as_str()).map_err(TelemetryError::subscriber)?; - let (fmt_layer, fmt_filter_handle) = layers::build_fmt_layer(stdout_filter); - #[cfg(not(feature = "log-control"))] - let _ = &fmt_filter_handle; - #[allow(unused_mut)] - let mut subscriber_layers = vec![fmt_layer]; - - #[cfg(feature = "journald")] - let journald_reload_handle = if self.enable_journald { - let (journald_layer, journald_reload_handle) = - layers::build_journald_layer(stdout_spec.as_str())?; - subscriber_layers.push(journald_layer); - Some(journald_reload_handle) - } else { - None - }; - #[cfg(all(feature = "journald", not(feature = "log-control")))] - let _ = &journald_reload_handle; #[cfg(all(feature = "log-control", feature = "otlp"))] let otlp_current = self @@ -335,39 +289,14 @@ impl TelemetryBuilder { #[cfg(all(feature = "log-control", not(feature = "otlp")))] let otlp_current = None; - #[cfg(feature = "otlp")] - let mut otlp_guard_parts = None; - #[cfg(feature = "otlp")] - let mut otlp_reload_handles = None; - #[cfg(feature = "otlp")] - if let Some(otlp_config) = self.otlp_config.as_ref() { - let otlp_layers = layers::build_otlp_layers(&self.service_name, otlp_config)?; - let layers::OtlpLayerParts { - trace_layer, - log_layer, - trace_reload_handle, - log_reload_handle, - providers, - } = otlp_layers; - subscriber_layers.push(trace_layer); - subscriber_layers.push(log_layer); - otlp_reload_handles = Some((trace_reload_handle, log_reload_handle)); - otlp_guard_parts = Some(providers); - } - #[cfg(all(feature = "otlp", not(feature = "log-control")))] - let _ = &otlp_reload_handles; - - tracing_subscriber::registry() - .with(subscriber_layers) - .try_init() - .map_err(TelemetryError::subscriber)?; + let _installed = self.install_subscriber(stdout_filter, stdout_spec.as_str())?; let cancel_token = CancellationToken::new(); #[allow(unused_mut)] let mut guard = TelemetryGuard::new(cancel_token); #[cfg(feature = "otlp")] - if let Some(providers) = otlp_guard_parts { + if let Some(providers) = _installed.providers { guard.tracer_provider = Some(providers.tracer_provider); guard.logger_provider = Some(providers.logger_provider); guard.meter_provider = Some(providers.meter_provider); @@ -375,56 +304,268 @@ impl TelemetryBuilder { #[cfg(feature = "tokio-metrics")] if self.enable_tokio_metrics { - let interval = self - .otlp_config - .as_ref() - .map(|config| config.metrics_interval) - .unwrap_or_else(|| std::time::Duration::from_secs(5)); let task = crate::tokio_metrics::start_tokio_metrics_monitoring( guard.cancel_token.child_token(), - interval, + self.tokio_metrics_interval, ); guard.background_tasks.push(task); } #[cfg(feature = "log-control")] if let Some(config) = self.log_control_config { + let state = ReloadState::new( + stdout_spec, + otlp_current, + _installed.log_control.stdout_reload, + _installed.log_control.otlp_reload, + ); + let task = spawn_log_control_server(config, state, guard.cancel_token.child_token())?; + guard.background_tasks.push(task); + } + + Ok(guard) + } + + fn install_subscriber( + &self, + stdout_filter: EnvFilter, + stdout_spec: &str, + ) -> Result { + #[cfg(not(feature = "journald"))] + let _ = stdout_spec; + if self.enable_journald { #[cfg(feature = "journald")] - let journald_reload_handle = journald_reload_handle.clone(); + { + self.install_with_journald(stdout_filter, stdout_spec) + } #[cfg(not(feature = "journald"))] - let journald_reload_handle = None; + unreachable!("journald feature checked before subscriber installation") + } else { + self.install_without_journald(stdout_filter) + } + } - let stdout_reload = - stdout_reload_callback(fmt_filter_handle.clone(), journald_reload_handle); + #[cfg(all(feature = "otlp", feature = "journald"))] + fn install_with_journald( + &self, + stdout_filter: EnvFilter, + stdout_spec: &str, + ) -> Result { + let (fmt_layer, _fmt_reload_or_unit) = + layers::build_fmt_layer::(stdout_filter); + let subscriber = tracing_subscriber::registry().with(fmt_layer); + let (journald_layer, _journald_reload_or_unit) = + layers::build_journald_layer::<_>(stdout_spec)?; + let subscriber = subscriber.with(journald_layer); + + if let Some(otlp_config) = self.otlp_config.as_ref() { + let installed_otlp = self.install_otlp_layers(subscriber, otlp_config)?; + + #[cfg(feature = "log-control")] + let log_control = LogControlParts { + stdout_reload: stdout_reload_callback( + _fmt_reload_or_unit, + Some(_journald_reload_or_unit), + ), + otlp_reload: installed_otlp.otlp_reload, + }; + Ok(InstalledSubscriber { + providers: installed_otlp.providers, + #[cfg(feature = "log-control")] + log_control, + }) + } else { + subscriber.try_init().map_err(TelemetryError::subscriber)?; + + #[cfg(feature = "log-control")] + let log_control = LogControlParts { + stdout_reload: stdout_reload_callback( + _fmt_reload_or_unit, + Some(_journald_reload_or_unit), + ), + otlp_reload: None, + }; + + Ok(InstalledSubscriber { + providers: None, + #[cfg(feature = "log-control")] + log_control, + }) + } + } + + #[cfg(all(not(feature = "otlp"), feature = "journald"))] + fn install_with_journald( + &self, + stdout_filter: EnvFilter, + stdout_spec: &str, + ) -> Result { + let (fmt_layer, _fmt_reload_or_unit) = + layers::build_fmt_layer::(stdout_filter); + let subscriber = tracing_subscriber::registry().with(fmt_layer); + let (journald_layer, _journald_reload_or_unit) = + layers::build_journald_layer::<_>(stdout_spec)?; + subscriber + .with(journald_layer) + .try_init() + .map_err(TelemetryError::subscriber)?; + + #[cfg(feature = "log-control")] + let log_control = LogControlParts { + stdout_reload: stdout_reload_callback( + _fmt_reload_or_unit, + Some(_journald_reload_or_unit), + ), + otlp_reload: None, + }; + + Ok(InstalledSubscriber { #[cfg(feature = "otlp")] - let otlp_reload = match otlp_reload_handles { - Some((trace_handle, log_handle)) => { - otlp_reload_callback(Some(trace_handle), Some(log_handle)) - } - None => None, + providers: None, + #[cfg(feature = "log-control")] + log_control, + }) + } + + #[cfg(feature = "otlp")] + fn install_without_journald( + &self, + stdout_filter: EnvFilter, + ) -> Result { + let (fmt_layer, _fmt_reload_or_unit) = + layers::build_fmt_layer::(stdout_filter); + let subscriber = tracing_subscriber::registry().with(fmt_layer); + + if let Some(otlp_config) = self.otlp_config.as_ref() { + let installed_otlp = self.install_otlp_layers(subscriber, otlp_config)?; + + #[cfg(feature = "log-control")] + let log_control = LogControlParts { + stdout_reload: stdout_reload_callback(_fmt_reload_or_unit, None), + otlp_reload: installed_otlp.otlp_reload, }; - #[cfg(not(feature = "otlp"))] - let otlp_reload = None; - let state = ReloadState::new(stdout_spec, otlp_current, stdout_reload, otlp_reload); - let task = spawn_log_control_server(config, state, guard.cancel_token.child_token())?; - guard.background_tasks.push(task); + Ok(InstalledSubscriber { + providers: installed_otlp.providers, + #[cfg(feature = "log-control")] + log_control, + }) + } else { + subscriber.try_init().map_err(TelemetryError::subscriber)?; + + #[cfg(feature = "log-control")] + let log_control = LogControlParts { + stdout_reload: stdout_reload_callback(_fmt_reload_or_unit, None), + otlp_reload: None, + }; + + Ok(InstalledSubscriber { + providers: None, + #[cfg(feature = "log-control")] + log_control, + }) } + } - Ok(guard) + #[cfg(not(feature = "otlp"))] + fn install_without_journald( + &self, + stdout_filter: EnvFilter, + ) -> Result { + let (fmt_layer, _fmt_reload_or_unit) = + layers::build_fmt_layer::(stdout_filter); + tracing_subscriber::registry() + .with(fmt_layer) + .try_init() + .map_err(TelemetryError::subscriber)?; + + #[cfg(feature = "log-control")] + let log_control = LogControlParts { + stdout_reload: stdout_reload_callback(_fmt_reload_or_unit, None), + otlp_reload: None, + }; + + Ok(InstalledSubscriber { + #[cfg(feature = "otlp")] + providers: None, + #[cfg(feature = "log-control")] + log_control, + }) + } + + #[cfg(feature = "otlp")] + fn install_otlp_layers( + &self, + subscriber: S, + otlp_config: &OtlpConfig, + ) -> Result + where + S: tracing::Subscriber + + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup> + + Send + + Sync + + 'static, + { + let layers::OtlpLayerParts { providers } = + layers::build_otlp_parts(&self.service_name, otlp_config)?; + + #[cfg(feature = "log-control")] + let (trace_layer, trace_reload) = layers::build_otlp_trace_layer::<_>( + &providers.tracer_provider, + otlp_config.log_level.as_str(), + )?; + #[cfg(not(feature = "log-control"))] + let trace_layer = layers::build_otlp_trace_layer::<_>( + &providers.tracer_provider, + otlp_config.log_level.as_str(), + )?; + let subscriber = subscriber.with(trace_layer); + #[cfg(feature = "log-control")] + let (log_layer, log_reload) = layers::build_otlp_log_layer::<_>( + &providers.logger_provider, + otlp_config.log_level.as_str(), + otlp_config.log_rate_limit_per_sec, + )?; + #[cfg(not(feature = "log-control"))] + let log_layer = layers::build_otlp_log_layer::<_>( + &providers.logger_provider, + otlp_config.log_level.as_str(), + otlp_config.log_rate_limit_per_sec, + )?; + subscriber + .with(log_layer) + .try_init() + .map_err(TelemetryError::subscriber)?; + + Ok(InstalledOtlp { + providers: Some(InitializedOtlpProviders { + tracer_provider: providers.tracer_provider, + logger_provider: providers.logger_provider, + meter_provider: providers.meter_provider, + }), + #[cfg(feature = "log-control")] + otlp_reload: otlp_reload_callback(Some(trace_reload), Some(log_reload)), + }) } /// Resolves the local filter from the configured environment variable or fallback string. fn resolve_stdout_filter(&self) -> String { - filter::resolve_stdout_filter(&self.env_var_name, &self.env_lookup, &self.stdout_filter) + filter::resolve_stdout_filter(&self.env_var_name, &self.stdout_filter) } } +#[cfg(feature = "otlp")] +struct InstalledOtlp { + providers: Option, + #[cfg(feature = "log-control")] + otlp_reload: OtlpReload, +} + #[cfg(test)] mod tests { use super::TelemetryBuilder; - use crate::error::TelemetryError; + use crate::builder::filter::resolve_stdout_filter_with_lookup; #[test] fn stdout_filter_defaults_to_info() { @@ -434,14 +575,19 @@ mod tests { #[test] fn stdout_filter_prefers_environment_override() { - let builder = TelemetryBuilder::new("controller") - .with_env_var("TELEMETRY_TEST_RUST_LOG") - .with_env_lookup(|name| { - (name == "TELEMETRY_TEST_RUST_LOG").then(|| "controller=debug".to_string()) - }) - .with_stdout_filter("info"); - - assert_eq!(builder.resolve_stdout_filter(), "controller=debug"); + let env_var_name = Some("TELEMETRY_TEST_RUST_LOG".to_string()); + let stdout_filter = Some("info".to_string()); + + assert_eq!( + resolve_stdout_filter_with_lookup( + &env_var_name, + |name| { + (name == "TELEMETRY_TEST_RUST_LOG").then(|| "controller=debug".to_string()) + }, + &stdout_filter, + ), + "controller=debug" + ); } #[cfg(not(feature = "journald"))] @@ -451,7 +597,7 @@ mod tests { assert!(matches!( result, - Err(TelemetryError::JournaldFeatureDisabled) + Err(crate::TelemetryError::JournaldFeatureDisabled) )); } @@ -464,33 +610,7 @@ mod tests { assert!(matches!( result, - Err(TelemetryError::TokioMetricsFeatureDisabled) - )); - } - - #[cfg(all(feature = "tokio-metrics", feature = "otlp"))] - #[test] - fn init_rejects_tokio_metrics_without_otlp_config() { - let result = TelemetryBuilder::new("controller") - .enable_tokio_metrics() - .init(); - - assert!(matches!( - result, - Err(TelemetryError::TokioMetricsRequiresOtlp) - )); - } - - #[cfg(all(feature = "tokio-metrics", not(feature = "otlp")))] - #[test] - fn init_rejects_tokio_metrics_when_otlp_feature_is_absent() { - let result = TelemetryBuilder::new("controller") - .enable_tokio_metrics() - .init(); - - assert!(matches!( - result, - Err(TelemetryError::TokioMetricsRequiresOtlp) + Err(crate::TelemetryError::TokioMetricsFeatureDisabled) )); } } diff --git a/src/error.rs b/src/error.rs index 4266783..ea9328f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -31,12 +31,15 @@ pub enum TelemetryError { /// Tokio metrics were requested without compiling the `tokio-metrics` feature. #[error("the telemetry crate was built without the `tokio-metrics` feature")] TokioMetricsFeatureDisabled, - /// Tokio metrics were enabled without a runtime OTLP configuration. - #[error("Tokio metrics require OTLP export to be configured")] - TokioMetricsRequiresOtlp, /// An OTLP endpoint URL was invalid. - #[error("invalid OTLP endpoint URL: {0}")] - OtlpEndpoint(#[source] Box), + #[error("invalid OTLP {signal} endpoint URL: {source}")] + OtlpEndpoint { + /// Signal whose endpoint URL failed to parse. + signal: &'static str, + /// Underlying URL parse error. + #[source] + source: Box, + }, /// A tracked telemetry background task failed during shutdown. #[error("telemetry background task failed: {0}")] BackgroundTask(#[from] BackgroundTaskError), @@ -87,11 +90,14 @@ impl TelemetryError { } #[cfg(feature = "otlp")] - pub(crate) fn otlp_endpoint(error: E) -> Self + pub(crate) fn otlp_endpoint(signal: &'static str, error: E) -> Self where E: std::error::Error + Send + Sync + 'static, { - Self::OtlpEndpoint(Box::new(error)) + Self::OtlpEndpoint { + signal, + source: Box::new(error), + } } } @@ -129,7 +135,7 @@ mod tests { let trace = TelemetryError::trace_provider(TestError("trace boom")); let log = TelemetryError::log_provider(TestError("log boom")); let meter = TelemetryError::meter_provider(TestError("meter boom")); - let endpoint = TelemetryError::otlp_endpoint(TestError("endpoint boom")); + let endpoint = TelemetryError::otlp_endpoint("traces", TestError("endpoint boom")); assert_eq!( trace.to_string(), @@ -145,12 +151,12 @@ mod tests { ); assert_eq!( endpoint.to_string(), - "invalid OTLP endpoint URL: endpoint boom" + "invalid OTLP traces endpoint URL: endpoint boom" ); assert!(matches!(trace, TelemetryError::TraceProvider(_))); assert!(matches!(log, TelemetryError::LogProvider(_))); assert!(matches!(meter, TelemetryError::MeterProvider(_))); - assert!(matches!(endpoint, TelemetryError::OtlpEndpoint(_))); + assert!(matches!(endpoint, TelemetryError::OtlpEndpoint { .. })); } #[cfg(feature = "log-control")] diff --git a/src/guard.rs b/src/guard.rs index 9487bcc..23fda3c 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -18,10 +18,11 @@ use tokio_util::sync::CancellationToken; /// providers. Dropping the guard without calling `shutdown` first performs a /// best-effort fallback that cancels the token and aborts any remaining tasks. /// -/// When OTLP is enabled, provider shutdown is blocking. Drop avoids running -/// that blocking shutdown path when the guard is dropped on an active Tokio -/// runtime thread and logs a warning instead, which may skip final OTLP flushes. -/// Prefer calling [`TelemetryGuard::shutdown`] explicitly during teardown. +/// When OTLP is enabled, provider shutdown is blocking. During drop, the guard +/// attempts to flush providers via `tokio::task::block_in_place` when dropped on +/// a multi-thread Tokio runtime. On a current-thread runtime that fallback is +/// unavailable, so drop emits a warning to stderr and may skip the final OTLP +/// flush. Prefer calling [`TelemetryGuard::shutdown`] explicitly during teardown. #[must_use = "keep TelemetryGuard alive for the process lifetime, or call shutdown().await during teardown"] #[derive(Debug)] pub struct TelemetryGuard { @@ -37,13 +38,6 @@ pub struct TelemetryGuard { pub(crate) background_tasks: Vec>, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum DropShutdownBehavior { - SkipCompleted, - SkipOnRuntime, - ShutdownProviders, -} - impl TelemetryGuard { /// Creates an empty guard populated by [`crate::TelemetryBuilder`]. /// @@ -66,9 +60,9 @@ impl TelemetryGuard { /// Requests graceful shutdown, waits for background tasks to finish, and /// then shuts down OTLP providers. /// - /// This is the preferred teardown path because it allows background tasks - /// to exit cleanly and performs OTLP provider shutdown outside the degraded - /// drop fallback. + /// This method is idempotent. After the first call starts shutdown, later + /// calls return `Ok(())` immediately and do not repeat task or provider + /// teardown. /// /// # Returns /// @@ -80,11 +74,14 @@ impl TelemetryGuard { /// Returns [`crate::TelemetryError::BackgroundTask`] if a background task /// fails or is cancelled unexpectedly. Remaining tasks and providers are /// still shut down before the error is returned. - pub async fn shutdown(mut self) -> Result<(), crate::TelemetryError> { + pub async fn shutdown(&mut self) -> Result<(), crate::TelemetryError> { + if self.shutdown_completed.swap(true, Ordering::Relaxed) { + return Ok(()); + } + self.request_shutdown(); let task_result = self.wait_for_background_tasks().await; self.shutdown_providers(); - self.shutdown_completed.store(true, Ordering::Relaxed); task_result.map_err(crate::TelemetryError::background_task) } @@ -123,20 +120,6 @@ impl TelemetryGuard { task.await } - /// Determines how drop should handle OTLP provider shutdown. - fn drop_shutdown_behavior(&self) -> DropShutdownBehavior { - if self.shutdown_completed.load(Ordering::Relaxed) { - return DropShutdownBehavior::SkipCompleted; - } - - #[cfg(feature = "otlp")] - if tokio::runtime::Handle::try_current().is_ok() { - return DropShutdownBehavior::SkipOnRuntime; - } - - DropShutdownBehavior::ShutdownProviders - } - /// Shuts down any configured OTLP providers. fn shutdown_providers(&mut self) { #[cfg(feature = "otlp")] @@ -158,6 +141,25 @@ impl TelemetryGuard { } } } + + /// Runs drop-time provider shutdown. + /// + /// When dropping on a multi-thread Tokio runtime, provider shutdown is run + /// inside `tokio::task::block_in_place` so exporter teardown can block + /// without stalling the async scheduler. Outside a Tokio runtime, shutdown + /// runs directly. On a current-thread runtime, `block_in_place` panics and + /// the caller can fall back to a stderr warning. + fn run_drop_shutdown(shutdown: impl FnOnce()) -> Result<(), ()> { + if tokio::runtime::Handle::try_current().is_err() { + shutdown(); + return Ok(()); + } + + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + tokio::task::block_in_place(shutdown); + })) + .map_err(|_| ()) + } } impl Drop for TelemetryGuard { @@ -170,14 +172,17 @@ impl Drop for TelemetryGuard { } } - match self.drop_shutdown_behavior() { - DropShutdownBehavior::SkipCompleted => {} - DropShutdownBehavior::SkipOnRuntime => { - tracing::warn!( - "TelemetryGuard dropped before shutdown() completed; skipping blocking OTLP provider shutdown on an active Tokio runtime" - ); - } - DropShutdownBehavior::ShutdownProviders => self.shutdown_providers(), + if self.shutdown_completed.load(Ordering::Relaxed) { + return; + } + + if Self::run_drop_shutdown(|| self.shutdown_providers()).is_err() { + // The tracing subscriber may already be tearing down here, so emit + // this fallback warning directly to stderr instead of through + // `tracing`. + eprintln!( + "telemetry-setup: TelemetryGuard dropped without shutdown() on a current_thread runtime; OTLP providers were not flushed" + ); } } } @@ -190,7 +195,7 @@ mod tests { use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; - use super::{DropShutdownBehavior, TelemetryGuard}; + use super::TelemetryGuard; #[tokio::test] async fn shutdown_cancels_token_and_awaits_background_task() { @@ -210,6 +215,15 @@ mod tests { assert!(observed_cancel.load(Ordering::SeqCst)); } + #[tokio::test] + async fn shutdown_is_idempotent() { + let cancel_token = CancellationToken::new(); + let mut guard = TelemetryGuard::new(cancel_token); + + assert!(guard.shutdown().await.is_ok()); + assert!(guard.shutdown().await.is_ok()); + } + #[tokio::test] async fn shutdown_returns_first_join_error_after_awaiting_remaining_tasks() { let cancel_token = CancellationToken::new(); @@ -282,52 +296,28 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn drop_on_tokio_runtime_after_shutdown_does_not_abort_completed_task() { - let cancel_token = CancellationToken::new(); - let (completed_sender, completed_receiver) = oneshot::channel(); - let task_token = cancel_token.child_token(); - let mut guard = TelemetryGuard::new(cancel_token); - guard.background_tasks.push(tokio::spawn(async move { - task_token.cancelled().await; - let _ = completed_sender.send(()); - })); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn drop_shutdown_uses_block_in_place_on_multi_thread_runtime() { + let ran = Arc::new(AtomicBool::new(false)); + let ran_in_shutdown = ran.clone(); - let result = guard.shutdown().await; + let result = TelemetryGuard::run_drop_shutdown(move || { + ran_in_shutdown.store(true, Ordering::SeqCst); + }); assert!(result.is_ok()); - assert!(completed_receiver.await.is_ok()); + assert!(ran.load(Ordering::SeqCst)); } #[test] - fn drop_shutdown_behavior_skips_provider_shutdown_after_graceful_shutdown() { - let guard = TelemetryGuard::new(CancellationToken::new()); - guard.shutdown_completed.store(true, Ordering::Relaxed); - - assert_eq!( - guard.drop_shutdown_behavior(), - DropShutdownBehavior::SkipCompleted - ); - } - - #[cfg(feature = "otlp")] - #[tokio::test] - async fn drop_shutdown_behavior_skips_provider_shutdown_on_tokio_runtime() { - let guard = TelemetryGuard::new(CancellationToken::new()); - - assert_eq!( - guard.drop_shutdown_behavior(), - DropShutdownBehavior::SkipOnRuntime - ); - } + fn drop_shutdown_returns_error_on_current_thread_runtime() { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); - #[test] - fn drop_shutdown_behavior_shuts_down_providers_without_runtime() { - let guard = TelemetryGuard::new(CancellationToken::new()); + let result = runtime.block_on(async { TelemetryGuard::run_drop_shutdown(|| {}) }); - assert_eq!( - guard.drop_shutdown_behavior(), - DropShutdownBehavior::ShutdownProviders - ); + assert!(result.is_err()); } } diff --git a/src/lib.rs b/src/lib.rs index 0c578b3..8e4db29 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,10 @@ //! does not discover or load repository-local config files automatically. //! Initialize telemetry with [`TelemetryBuilder`] and keep the returned //! [`TelemetryGuard`] alive for the process lifetime. For graceful teardown, -//! call [`TelemetryGuard::shutdown`] before the runtime exits. +//! call [`TelemetryGuard::shutdown`] before the runtime exits. Dropping the +//! guard without explicit shutdown is best-effort: on a multi-thread Tokio +//! runtime, OTLP providers are flushed through `tokio::task::block_in_place`, +//! while on a `current_thread` runtime drop may skip the final OTLP flush. //! //! # Stdout-only setup //! @@ -22,7 +25,7 @@ //! use telemetry_setup::TelemetryBuilder; //! //! # async fn run() -> Result<(), Box> { -//! let telemetry = TelemetryBuilder::new("controller").init()?; +//! let mut telemetry = TelemetryBuilder::new("controller").init()?; //! tracing::info!("started"); //! telemetry.shutdown().await?; //! # Ok(()) @@ -53,8 +56,7 @@ //! - `otlp`: OTLP trace, log, and metric export. //! - `journald`: `tracing-journald` output. //! - `log-control`: localhost-only runtime filter update endpoints. -//! - `tokio-metrics`: Tokio runtime gauges exported through OTLP/OpenTelemetry; -//! implies `otlp`. +//! - `tokio-metrics`: Tokio runtime gauges exported through OpenTelemetry. //! //! # Prerequisites //! diff --git a/src/log_control/mod.rs b/src/log_control/mod.rs index 79e9f3c..05e34d7 100644 --- a/src/log_control/mod.rs +++ b/src/log_control/mod.rs @@ -31,5 +31,5 @@ pub use config::LogControlConfig; #[cfg(feature = "otlp")] pub(crate) use reload::otlp_reload_callback; -pub(crate) use reload::{ReloadState, stdout_reload_callback}; +pub(crate) use reload::{ReloadCallback, ReloadState, stdout_reload_callback}; pub(crate) use server::spawn_log_control_server; diff --git a/src/log_control/reload.rs b/src/log_control/reload.rs index d9f1810..a1a9d4d 100644 --- a/src/log_control/reload.rs +++ b/src/log_control/reload.rs @@ -1,10 +1,6 @@ // SPDX-License-Identifier: MIT -use std::sync::{Arc, Mutex, RwLock}; - -use tracing_subscriber::filter::EnvFilter; - -use crate::builder::layers::FilterReloadHandle; +use std::sync::{Arc, Mutex}; pub(crate) type ReloadCallback = Arc Result<(), String> + Send + Sync>; @@ -12,13 +8,9 @@ pub(crate) type ReloadCallback = Arc Result<(), String> + Send /// Shared state used by the runtime log-control HTTP server. pub(crate) struct ReloadState { /// The currently active local stdout and journald filter string. - pub stdout_filter: Arc>, - /// Serializes stdout filter reloads with state updates. - pub stdout_update_lock: Arc>, + pub stdout_filter: Arc>, /// The currently active OTLP filter string, when OTLP export is enabled. - pub otlp_filter: Arc>>, - /// Serializes OTLP filter reloads with state updates. - pub otlp_update_lock: Arc>, + pub otlp_filter: Arc>>, /// Callback used to reload the local stdout and journald filter domain. pub stdout_reload: ReloadCallback, /// Callback used to reload OTLP trace and log filters. @@ -45,10 +37,8 @@ impl ReloadState { otlp_reload: Option, ) -> Self { Self { - stdout_filter: Arc::new(RwLock::new(stdout_filter)), - stdout_update_lock: Arc::new(Mutex::new(())), - otlp_filter: Arc::new(RwLock::new(otlp_filter)), - otlp_update_lock: Arc::new(Mutex::new(())), + stdout_filter: Arc::new(Mutex::new(stdout_filter)), + otlp_filter: Arc::new(Mutex::new(otlp_filter)), stdout_reload, otlp_reload, } @@ -59,30 +49,21 @@ impl ReloadState { /// /// # Arguments /// -/// * `fmt_filter_handle` - Reload handle for the stdout formatting layer. -/// * `journald_reload_handle` - Optional reload handle for the journald layer. +/// * `fmt_reload` - Callback for the stdout formatting layer. +/// * `journald_reload` - Optional callback for the journald layer. /// /// # Returns /// /// A callback suitable for [`ReloadState::stdout_reload`]. pub(crate) fn stdout_reload_callback( - fmt_filter_handle: FilterReloadHandle, - journald_reload_handle: Option, + fmt_reload: ReloadCallback, + journald_reload: Option, ) -> ReloadCallback { Arc::new(move |spec: String| { - let fmt_filter = EnvFilter::try_new(spec.as_str()).map_err(|error| error.to_string())?; - let journald_filter = if journald_reload_handle.is_some() { - Some(EnvFilter::try_new(spec.as_str()).map_err(|error| error.to_string())?) - } else { - None - }; - - fmt_filter_handle - .reload(fmt_filter) - .map_err(|error| error.to_string())?; - - if let (Some(handle), Some(filter)) = (journald_reload_handle.clone(), journald_filter) { - handle.reload(filter).map_err(|error| error.to_string())?; + fmt_reload(spec.clone())?; + + if let Some(reload) = &journald_reload { + reload(spec)?; } Ok(()) @@ -93,78 +74,61 @@ pub(crate) fn stdout_reload_callback( /// /// # Arguments /// -/// * `trace_handle` - Optional reload handle for the OTLP trace layer. -/// * `log_handle` - Optional reload handle for the OTLP log layer. +/// * `trace_reload` - Optional callback for the OTLP trace layer. +/// * `log_reload` - Optional callback for the OTLP log layer. /// /// # Returns /// /// A callback when both handles are present, or `None` when OTLP filtering is disabled. #[cfg(feature = "otlp")] pub(crate) fn otlp_reload_callback( - trace_handle: Option, - log_handle: Option, + trace_reload: Option, + log_reload: Option, ) -> Option { - match (trace_handle, log_handle) { - (Some(trace_handle), Some(log_handle)) => Some(Arc::new(move |spec: String| { - let trace_filter = - EnvFilter::try_new(spec.as_str()).map_err(|error| error.to_string())?; - let log_filter = - EnvFilter::try_new(spec.as_str()).map_err(|error| error.to_string())?; - trace_handle - .reload(trace_filter) - .map_err(|error| error.to_string())?; - log_handle - .reload(log_filter) - .map_err(|error| error.to_string())?; + match (trace_reload, log_reload) { + (Some(trace_reload), Some(log_reload)) => Some(Arc::new(move |spec: String| { + trace_reload(spec.clone())?; + log_reload(spec)?; Ok(()) })), _ => None, } } -/// Reads the current value from `lock`, tolerating poisoned locks. -pub(super) fn read_lock(lock: &RwLock) -> T { - match lock.read() { +/// Clones the current value from `lock`, tolerating poisoned locks. +pub(super) fn clone_mutex_value(lock: &Mutex) -> T { + match lock.lock() { Ok(guard) => guard.clone(), Err(poisoned) => poisoned.into_inner().clone(), } } -/// Writes `value` into `lock`, tolerating poisoned locks. -pub(super) fn write_lock(lock: &RwLock, value: T) { - match lock.write() { - Ok(mut guard) => *guard = value, - Err(poisoned) => *poisoned.into_inner() = value, - } -} - #[cfg(test)] mod tests { - use std::sync::RwLock; + use std::sync::Mutex; use tracing_subscriber::filter::EnvFilter; - use super::{read_lock, stdout_reload_callback, write_lock}; + use super::{clone_mutex_value, stdout_reload_callback}; - fn reload_handle() -> super::FilterReloadHandle { - let (layer, handle) = crate::builder::layers::build_fmt_layer(EnvFilter::new("info")); - // The reload handle points back to its layer. Keep the layer alive for - // the duration of the process so callback tests exercise successful - // reloads rather than `SubscriberGone` errors. - let _leaked_layer: &'static _ = Box::leak(layer); - handle + fn reload_callback() -> super::ReloadCallback { + let (layer, reload) = crate::builder::layers::build_fmt_layer::( + EnvFilter::new("info"), + ); + let _leaked_layer: &'static _ = Box::leak(Box::new(layer)); + reload } #[test] fn stdout_reload_callback_reloads_stdout_and_journald_filters() { - let callback = stdout_reload_callback(reload_handle(), Some(reload_handle())); + let callback = stdout_reload_callback(reload_callback(), Some(reload_callback())); assert!(callback("debug".to_string()).is_ok()); } #[test] fn stdout_reload_callback_reports_malformed_filters() { - let callback = stdout_reload_callback(reload_handle(), Some(reload_handle())); + let callback = stdout_reload_callback(reload_callback(), Some(reload_callback())); let error = callback("[".to_string()).unwrap_err(); @@ -174,8 +138,9 @@ mod tests { #[cfg(feature = "otlp")] #[test] fn otlp_reload_callback_reloads_trace_and_log_filters() { - let callback = super::otlp_reload_callback(Some(reload_handle()), Some(reload_handle())) - .expect("both handles should create callback"); + let callback = + super::otlp_reload_callback(Some(reload_callback()), Some(reload_callback())) + .expect("both handles should create callback"); assert!(callback("debug".to_string()).is_ok()); } @@ -183,8 +148,9 @@ mod tests { #[cfg(feature = "otlp")] #[test] fn otlp_reload_callback_reports_malformed_filters() { - let callback = super::otlp_reload_callback(Some(reload_handle()), Some(reload_handle())) - .expect("both handles should create callback"); + let callback = + super::otlp_reload_callback(Some(reload_callback()), Some(reload_callback())) + .expect("both handles should create callback"); let error = callback("[".to_string()).unwrap_err(); @@ -192,27 +158,13 @@ mod tests { } #[test] - fn read_lock_recovers_value_from_poisoned_lock() { - let lock = RwLock::new("info".to_string()); - let _ = std::panic::catch_unwind(|| { - let _guard = lock.write().unwrap(); - panic!("poison lock"); - }); - - assert_eq!(read_lock(&lock), "info"); - } - - #[test] - fn write_lock_recovers_and_updates_poisoned_lock() { - let lock = RwLock::new("info".to_string()); + fn clone_mutex_value_recovers_value_from_poisoned_lock() { + let lock = Mutex::new("info".to_string()); let _ = std::panic::catch_unwind(|| { - let mut guard = lock.write().unwrap(); - *guard = "warn".to_string(); + let _guard = lock.lock().unwrap(); panic!("poison lock"); }); - write_lock(&lock, "debug".to_string()); - - assert_eq!(read_lock(&lock), "debug"); + assert_eq!(clone_mutex_value(&lock), "info"); } } diff --git a/src/log_control/router.rs b/src/log_control/router.rs index f6999ae..f564345 100644 --- a/src/log_control/router.rs +++ b/src/log_control/router.rs @@ -6,7 +6,7 @@ use axum::routing::{get, put}; use axum::{Json, Router}; use serde::{Deserialize, Serialize}; -use super::reload::{ReloadState, read_lock, write_lock}; +use super::reload::{ReloadState, clone_mutex_value}; #[derive(Debug, Deserialize)] struct FilterUpdate { @@ -39,8 +39,8 @@ pub(super) fn build_router(state: ReloadState) -> Router { /// Returns the current stdout and OTLP filter strings. async fn get_filters(State(state): State) -> Json { Json(FiltersResponse { - stdout: read_lock(&state.stdout_filter), - otlp: read_lock(&state.otlp_filter), + stdout: clone_mutex_value(&state.stdout_filter), + otlp: clone_mutex_value(&state.otlp_filter), }) } @@ -49,16 +49,20 @@ async fn update_stdout_filter( State(state): State, Json(update): Json, ) -> Result, (StatusCode, String)> { - let _guard = state - .stdout_update_lock - .lock() - .unwrap_or_else(|poisoned| poisoned.into_inner()); - (state.stdout_reload)(update.filter.clone()) - .map_err(|error| (StatusCode::BAD_REQUEST, error))?; - write_lock(&state.stdout_filter, update.filter); + let stdout = { + let mut stdout_filter = state + .stdout_filter + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + (state.stdout_reload)(update.filter.clone()) + .map_err(|error| (StatusCode::BAD_REQUEST, error))?; + *stdout_filter = update.filter; + stdout_filter.clone() + }; + Ok(Json(FiltersResponse { - stdout: read_lock(&state.stdout_filter), - otlp: read_lock(&state.otlp_filter), + stdout, + otlp: clone_mutex_value(&state.otlp_filter), })) } @@ -74,15 +78,19 @@ async fn update_otlp_filter( )); }; - let _guard = state - .otlp_update_lock - .lock() - .unwrap_or_else(|poisoned| poisoned.into_inner()); - reload(update.filter.clone()).map_err(|error| (StatusCode::BAD_REQUEST, error))?; - write_lock(&state.otlp_filter, Some(update.filter)); + let otlp = { + let mut otlp_filter = state + .otlp_filter + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + reload(update.filter.clone()).map_err(|error| (StatusCode::BAD_REQUEST, error))?; + *otlp_filter = Some(update.filter); + otlp_filter.clone() + }; + Ok(Json(FiltersResponse { - stdout: read_lock(&state.stdout_filter), - otlp: read_lock(&state.otlp_filter), + stdout: clone_mutex_value(&state.stdout_filter), + otlp, })) } diff --git a/src/otlp/config.rs b/src/otlp/config.rs index 3953ec4..6a632b8 100644 --- a/src/otlp/config.rs +++ b/src/otlp/config.rs @@ -52,7 +52,8 @@ pub struct OtlpConfig { pub headers: OtlpHeadersConfig, /// Interval between OTLP metric export cycles. /// - /// The default is five seconds. + /// The default is five seconds. Values must be between one second and one + /// day. #[serde(with = "duration_seconds")] pub metrics_interval: std::time::Duration, } @@ -150,6 +151,38 @@ mod tests { assert_eq!(decoded.log_rate_limit_per_sec, None); assert_eq!(decoded.metrics_interval, std::time::Duration::from_secs(5)); } + + #[test] + fn otlp_config_rejects_zero_metrics_interval() { + let error = toml::from_str::( + r#" + metrics_interval = 0 + "#, + ) + .expect_err("zero metrics interval must be rejected"); + + assert!( + error + .to_string() + .contains("metrics_interval must be between 1 and 86400 seconds") + ); + } + + #[test] + fn otlp_config_rejects_excessive_metrics_interval() { + let error = toml::from_str::( + r#" + metrics_interval = 86401 + "#, + ) + .expect_err("excessive metrics interval must be rejected"); + + assert!( + error + .to_string() + .contains("metrics_interval must be between 1 and 86400 seconds") + ); + } } mod duration_seconds { @@ -157,6 +190,8 @@ mod duration_seconds { use serde::{Deserialize, Deserializer, Serializer}; + const MAX_METRICS_INTERVAL_SECONDS: u64 = 86_400; + pub fn serialize(value: &Duration, serializer: S) -> Result where S: Serializer, @@ -169,6 +204,11 @@ mod duration_seconds { D: Deserializer<'de>, { let seconds = u64::deserialize(deserializer)?; + if !(1..=MAX_METRICS_INTERVAL_SECONDS).contains(&seconds) { + return Err(serde::de::Error::custom(format!( + "metrics_interval must be between 1 and {MAX_METRICS_INTERVAL_SECONDS} seconds" + ))); + } Ok(Duration::from_secs(seconds)) } } diff --git a/src/otlp/endpoint.rs b/src/otlp/endpoint.rs index 3648fe0..db7f8f1 100644 --- a/src/otlp/endpoint.rs +++ b/src/otlp/endpoint.rs @@ -8,8 +8,13 @@ use super::config::OtlpConfig; use crate::error::TelemetryError; /// Returns the per-signal OTLP endpoint derived from `base_url` and `suffix`. -pub(super) fn signal_endpoint(base_url: &str, suffix: &str) -> Result { - let parsed = Url::parse(base_url).map_err(TelemetryError::otlp_endpoint)?; +pub(super) fn signal_endpoint( + base_url: &str, + suffix: &str, + signal: &'static str, +) -> Result { + let parsed = + Url::parse(base_url).map_err(|error| TelemetryError::otlp_endpoint(signal, error))?; if parsed.path().ends_with(suffix) { return Ok(base_url.to_string()); } @@ -53,19 +58,25 @@ mod tests { #[test] fn signal_endpoint_appends_suffix_once() { assert_eq!( - signal_endpoint("http://localhost:4318", "v1/traces").unwrap(), + signal_endpoint("http://localhost:4318", "v1/traces", "traces").unwrap(), "http://localhost:4318/v1/traces" ); assert_eq!( - signal_endpoint("http://localhost:4318/v1/traces", "v1/traces").unwrap(), + signal_endpoint("http://localhost:4318/v1/traces", "v1/traces", "traces").unwrap(), "http://localhost:4318/v1/traces" ); } #[test] fn signal_endpoint_reports_invalid_urls_as_endpoint_errors() { - let error = signal_endpoint("://not-a-url", "v1/traces").unwrap_err(); - assert!(matches!(error, TelemetryError::OtlpEndpoint(_))); + let error = signal_endpoint("://not-a-url", "v1/traces", "traces").unwrap_err(); + assert!(matches!( + error, + TelemetryError::OtlpEndpoint { + signal: "traces", + .. + } + )); } #[test] diff --git a/src/otlp/providers.rs b/src/otlp/providers.rs index 703434a..b44ea96 100644 --- a/src/otlp/providers.rs +++ b/src/otlp/providers.rs @@ -20,6 +20,7 @@ pub(crate) struct BuiltProviders { /// Meter provider installed into the OpenTelemetry global meter registry. pub meter_provider: SdkMeterProvider, /// The final `service.name` value applied to all emitted resources. + #[cfg(test)] pub effective_service_name: String, } @@ -83,6 +84,7 @@ pub(crate) fn build_providers( tracer_provider, logger_provider, meter_provider, + #[cfg(test)] effective_service_name, }) } @@ -94,7 +96,7 @@ fn build_trace_exporter( opentelemetry_otlp::SpanExporter::builder() .with_http() .with_protocol(Protocol::HttpBinary) - .with_endpoint(signal_endpoint(&config.url, "v1/traces")?) + .with_endpoint(signal_endpoint(&config.url, "v1/traces", "traces")?) .with_headers(signal_headers(config, &config.headers.traces)) .build() .map_err(TelemetryError::trace_provider) @@ -107,7 +109,7 @@ fn build_log_exporter( opentelemetry_otlp::LogExporter::builder() .with_http() .with_protocol(Protocol::HttpBinary) - .with_endpoint(signal_endpoint(&config.url, "v1/logs")?) + .with_endpoint(signal_endpoint(&config.url, "v1/logs", "logs")?) .with_headers(signal_headers(config, &config.headers.logs)) .build() .map_err(TelemetryError::log_provider) @@ -120,7 +122,7 @@ fn build_metric_exporter( opentelemetry_otlp::MetricExporter::builder() .with_http() .with_protocol(Protocol::HttpBinary) - .with_endpoint(signal_endpoint(&config.url, "v1/metrics")?) + .with_endpoint(signal_endpoint(&config.url, "v1/metrics", "metrics")?) .with_headers(signal_headers(config, &config.headers.metrics)) .build() .map_err(TelemetryError::meter_provider) @@ -178,6 +180,12 @@ mod tests { }, ); - assert!(matches!(result, Err(TelemetryError::OtlpEndpoint(_)))); + assert!(matches!( + result, + Err(TelemetryError::OtlpEndpoint { + signal: "traces", + .. + }) + )); } } diff --git a/src/tokio_metrics.rs b/src/tokio_metrics.rs index c6a70d6..baba802 100644 --- a/src/tokio_metrics.rs +++ b/src/tokio_metrics.rs @@ -15,7 +15,7 @@ struct TokioRuntimeMetrics { global_queue_depth: u64, } -/// Starts a background task that records Tokio runtime gauges every five seconds. +/// Starts a background task that records Tokio runtime gauges at the provided interval. /// /// This monitor must be started from within an active Tokio runtime because it /// uses [`tokio::runtime::Handle::current()`]. The underlying Tokio runtime @@ -29,11 +29,12 @@ struct TokioRuntimeMetrics { /// # Arguments /// /// * `cancel_token` - Token that signals the task to stop collecting metrics. +/// * `interval` - Delay between recording cycles. /// /// # Returns /// /// A join handle for the spawned background task. -pub fn start_tokio_metrics_monitoring( +pub(crate) fn start_tokio_metrics_monitoring( cancel_token: CancellationToken, interval: Duration, ) -> tokio::task::JoinHandle<()> { diff --git a/tests/init_log_control.rs b/tests/init_log_control.rs new file mode 100644 index 0000000..45a1427 --- /dev/null +++ b/tests/init_log_control.rs @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: MIT + +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; + +use telemetry_setup::{LogControlConfig, TelemetryBuilder}; + +fn free_port() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let port = listener.local_addr().expect("read local addr").port(); + drop(listener); + port +} + +async fn raw_http_request(port: u16, request: String) -> String { + tokio::task::spawn_blocking(move || { + let mut stream = TcpStream::connect(("127.0.0.1", port)).expect("connect log-control"); + stream.write_all(request.as_bytes()).expect("write request"); + stream.flush().expect("flush request"); + + let mut response = String::new(); + stream.read_to_string(&mut response).expect("read response"); + response + }) + .await + .expect("join blocking request") +} + +async fn raw_http_get(port: u16, path: &str) -> String { + raw_http_request( + port, + format!("GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n"), + ) + .await +} + +async fn raw_http_put_json(port: u16, path: &str, body: &str) -> String { + raw_http_request( + port, + format!( + "PUT {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", + body.len() + ), + ) + .await +} + +#[tokio::test] +async fn log_control_server_accepts_filter_update_after_init() { + let port = free_port(); + let mut guard = TelemetryBuilder::new("test-lc") + .without_env_var() + .with_stdout_filter("info") + .with_log_control(LogControlConfig { port }) + .init() + .expect("init with log-control should succeed"); + + let put_response = raw_http_put_json(port, "/filters/stdout", r#"{"filter":"debug"}"#).await; + assert!(put_response.starts_with("HTTP/1.1 200 OK\r\n")); + assert!(put_response.contains("\"stdout\":\"debug\"")); + + let get_response = raw_http_get(port, "/filters").await; + assert!(get_response.starts_with("HTTP/1.1 200 OK\r\n")); + assert!(get_response.contains("\"stdout\":\"debug\"")); + + guard.shutdown().await.expect("shutdown should succeed"); +} diff --git a/tests/init_otlp.rs b/tests/init_otlp.rs new file mode 100644 index 0000000..395a194 --- /dev/null +++ b/tests/init_otlp.rs @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: MIT + +// This test must live in `tests/` instead of a unit test module because +// `TelemetryBuilder::init()` installs a process-global tracing subscriber. +// Successful initialization can only happen once per process, so an +// integration test gives this case its own fresh test binary. +use std::net::TcpListener; + +use telemetry_setup::{OtlpConfig, TelemetryBuilder}; + +fn unused_local_url() -> String { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("read local addr"); + drop(listener); + format!("http://{addr}") +} + +#[tokio::test] +async fn otlp_init_and_shutdown_succeeds_against_unavailable_collector() { + let mut guard = TelemetryBuilder::new("test-otlp") + .without_env_var() + .with_stdout_filter("info") + .with_otlp_config(OtlpConfig { + url: unused_local_url(), + ..OtlpConfig::default() + }) + .init() + .expect("OTLP init should succeed even with unreachable collector"); + + tracing::info!("test event"); + + guard + .shutdown() + .await + .expect("OTLP shutdown should succeed"); +} diff --git a/tests/init_otlp_idempotent.rs b/tests/init_otlp_idempotent.rs new file mode 100644 index 0000000..da6b4e5 --- /dev/null +++ b/tests/init_otlp_idempotent.rs @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: MIT + +// This test must live in `tests/` instead of a unit test module because +// `TelemetryBuilder::init()` installs a process-global tracing subscriber. +// Successful initialization can only happen once per process, so an +// integration test gives this case its own fresh test binary. +use std::net::TcpListener; + +use telemetry_setup::{OtlpConfig, TelemetryBuilder}; + +fn unused_local_url() -> String { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("read local addr"); + drop(listener); + format!("http://{addr}") +} + +#[tokio::test] +async fn otlp_shutdown_is_idempotent() { + let mut guard = TelemetryBuilder::new("test-otlp") + .without_env_var() + .with_stdout_filter("info") + .with_otlp_config(OtlpConfig { + url: unused_local_url(), + ..OtlpConfig::default() + }) + .init() + .expect("OTLP init should succeed"); + + guard + .shutdown() + .await + .expect("first shutdown should succeed"); + guard + .shutdown() + .await + .expect("second shutdown should succeed"); +} diff --git a/tests/init_otlp_log_control.rs b/tests/init_otlp_log_control.rs new file mode 100644 index 0000000..64c1a4d --- /dev/null +++ b/tests/init_otlp_log_control.rs @@ -0,0 +1,95 @@ +// SPDX-License-Identifier: MIT + +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::time::Duration; + +use telemetry_setup::{LogControlConfig, OtlpConfig, TelemetryBuilder}; +use tracing::Level; + +fn free_port() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let port = listener.local_addr().expect("read local addr").port(); + drop(listener); + port +} + +fn unused_local_url() -> String { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("read local addr"); + drop(listener); + format!("http://{addr}") +} + +async fn raw_http_request(port: u16, request: String) -> String { + tokio::task::spawn_blocking(move || { + let mut stream = TcpStream::connect(("127.0.0.1", port)).expect("connect log-control"); + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("set read timeout"); + stream.write_all(request.as_bytes()).expect("write request"); + stream.flush().expect("flush request"); + + let mut response = String::new(); + stream.read_to_string(&mut response).expect("read response"); + response + }) + .await + .expect("join blocking request") +} + +async fn raw_http_get(port: u16, path: &str) -> String { + raw_http_request( + port, + format!("GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n"), + ) + .await +} + +async fn raw_http_put_json(port: u16, path: &str, body: &str) -> String { + raw_http_request( + port, + format!( + "PUT {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", + body.len() + ), + ) + .await +} + +#[tokio::test] +async fn otlp_filter_update_reloads_live_filter_behavior() { + let port = free_port(); + let mut guard = TelemetryBuilder::new("test-otlp-log-control") + .without_env_var() + .with_stdout_filter("error") + .with_otlp_config(OtlpConfig { + url: unused_local_url(), + log_level: "warn".to_string(), + ..OtlpConfig::default() + }) + .with_log_control(LogControlConfig { port }) + .init() + .expect("init with otlp and log-control should succeed"); + + assert!(!tracing::enabled!(target: "telemetry_setup_otlp_reload_probe", Level::INFO)); + + let put_response = raw_http_put_json( + port, + "/filters/otlp", + r#"{"filter":"warn,telemetry_setup_otlp_reload_probe=info"}"#, + ) + .await; + assert!(put_response.starts_with("HTTP/1.1 200 OK\r\n")); + assert!(put_response.contains("\"stdout\":\"error\"")); + assert!(put_response.contains("\"otlp\":\"warn,telemetry_setup_otlp_reload_probe=info\"")); + + let get_response = raw_http_get(port, "/filters").await; + assert!(get_response.starts_with("HTTP/1.1 200 OK\r\n")); + assert!(get_response.contains("\"stdout\":\"error\"")); + assert!(get_response.contains("\"otlp\":\"warn,telemetry_setup_otlp_reload_probe=info\"")); + + assert!(tracing::enabled!(target: "telemetry_setup_otlp_reload_probe", Level::INFO)); + + guard.shutdown().await.expect("shutdown should succeed"); +} diff --git a/tests/init_stdout.rs b/tests/init_stdout.rs index 54e981d..113e3bf 100644 --- a/tests/init_stdout.rs +++ b/tests/init_stdout.rs @@ -8,7 +8,7 @@ use telemetry_setup::TelemetryBuilder; #[tokio::test] async fn stdout_only_init_succeeds_and_returns_shutdown_guard() { - let guard = TelemetryBuilder::new("controller") + let mut guard = TelemetryBuilder::new("controller") .without_env_var() .with_stdout_filter("info") .init() diff --git a/tests/readme_common_config.rs b/tests/readme_common_config.rs new file mode 100644 index 0000000..6f1e2f4 --- /dev/null +++ b/tests/readme_common_config.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: MIT + +#[cfg(all( + feature = "otlp", + feature = "log-control", + feature = "journald", + feature = "tokio-metrics" +))] +#[test] +fn readme_common_configuration_compiles() { + let _ = compile_readme_common_configuration; +} + +#[cfg(all( + feature = "otlp", + feature = "log-control", + feature = "journald", + feature = "tokio-metrics" +))] +fn compile_readme_common_configuration() -> Result<(), telemetry_setup::TelemetryError> { + use std::collections::HashMap; + + use telemetry_setup::{LogControlConfig, OtlpConfig, OtlpHeadersConfig, TelemetryBuilder}; + + let _telemetry = TelemetryBuilder::new("controller") + .with_stdout_filter("info") + .with_otlp_config(OtlpConfig { + url: "http://localhost:4318".to_string(), + headers: OtlpHeadersConfig { + common: HashMap::from([("X-Greptime-DB-Name".to_string(), "edge".to_string())]), + traces: HashMap::from([( + "x-greptime-pipeline-name".to_string(), + "greptime_trace_v1".to_string(), + )]), + logs: HashMap::from([( + "x-greptime-pipeline-name".to_string(), + "greptime_identity".to_string(), + )]), + ..OtlpHeadersConfig::default() + }, + ..OtlpConfig::default() + }) + .with_log_control(LogControlConfig { port: 6669 }) + .enable_journald() + .enable_tokio_metrics() + .init()?; + + Ok(()) +}