From e64a0df0a3b15badc8c66226d5d06d4b6f9090c4 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Fri, 1 Aug 2025 13:25:05 -0400 Subject: [PATCH 1/5] experiment(prometheus): refactor for extreme allocs reduction + determinism --- Cargo.lock | 8 + lib/saluki-common/src/strings.rs | 4 +- lib/saluki-components/Cargo.toml | 2 + .../src/destinations/prometheus/api.rs | 46 ++ .../src/destinations/prometheus/mod.rs | 637 ++++++++++-------- 5 files changed, 411 insertions(+), 286 deletions(-) create mode 100644 lib/saluki-components/src/destinations/prometheus/api.rs diff --git a/Cargo.lock b/Cargo.lock index 92d580cbf3..a19d241611 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1013,6 +1013,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dtoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04" + [[package]] name = "dunce" version = "1.0.5" @@ -3174,6 +3180,7 @@ dependencies = [ "chrono", "datadog-protos", "ddsketch-agent", + "dtoa", "float-cmp", "foldhash", "futures", @@ -3186,6 +3193,7 @@ dependencies = [ "hyper", "hyper-http-proxy", "indexmap 2.11.0", + "itoa", "memory-accounting", "metrics", "opentelemetry-semantic-conventions", diff --git a/lib/saluki-common/src/strings.rs b/lib/saluki-common/src/strings.rs index 2648e9508c..05906aeea8 100644 --- a/lib/saluki-common/src/strings.rs +++ b/lib/saluki-common/src/strings.rs @@ -35,7 +35,7 @@ impl StringBuilder<()> { /// Creates a new `StringBuilder` with the given limit. /// - /// Strings that exceed the limit will be discarded. + /// Strings that exceed the limit, in bytes, will be discarded. 
pub fn with_limit(limit: usize) -> Self { Self { buf: String::new(), @@ -190,7 +190,7 @@ mod tests { builder.clear(); assert_eq!(builder.push_str("hello"), Some(())); - assert_eq!(builder.push_str(" "), Some(())); + assert_eq!(builder.push(' '), Some(())); assert_eq!(builder.push_str("world"), Some(())); assert_eq!(builder.as_str(), "hello world"); } diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index b72af71236..f3b6591132 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -20,6 +20,7 @@ bytesize = { workspace = true } chrono = { workspace = true } datadog-protos = { workspace = true } ddsketch-agent = { workspace = true } +dtoa = { workspace = true } float-cmp = { workspace = true, features = ["ratio"] } foldhash = { workspace = true } futures = { workspace = true } @@ -32,6 +33,7 @@ http-serde-ext = { workspace = true } hyper = { workspace = true, features = ["client"] } hyper-http-proxy = { workspace = true } indexmap = { workspace = true, features = ["std"] } +itoa = { workspace = true } memory-accounting = { workspace = true } metrics = { workspace = true } opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } diff --git a/lib/saluki-components/src/destinations/prometheus/api.rs b/lib/saluki-components/src/destinations/prometheus/api.rs new file mode 100644 index 0000000000..8e4c800453 --- /dev/null +++ b/lib/saluki-components/src/destinations/prometheus/api.rs @@ -0,0 +1,46 @@ +use axum::{extract::State, response::IntoResponse, routing::get, Router}; +use http::StatusCode; +use saluki_io::net::{ + listener::ConnectionOrientedListener, + server::http::{ErrorHandle, HttpServer, ShutdownHandle}, + util::hyper::TowerToHyperService, +}; +use tokio::sync::{mpsc, oneshot}; + +#[derive(Clone)] +struct PayloadRequestor { + payload_req_tx: mpsc::Sender>, +} + +impl PayloadRequestor { + async fn try_get_payload(&self) -> Option { + let (payload_resp_tx, 
payload_resp_rx) = oneshot::channel(); + match self.payload_req_tx.send(payload_resp_tx).await { + Ok(()) => match payload_resp_rx.await { + Ok(payload) => Some(payload), + Err(_) => None, + }, + Err(_) => None, + } + } +} + +pub fn spawn_api_server( + listener: ConnectionOrientedListener, payload_req_tx: mpsc::Sender>, +) -> (ShutdownHandle, ErrorHandle) { + let payload_requestor = PayloadRequestor { payload_req_tx }; + let service = Router::new() + .route("/metrics", get(handle_scrape_request)) + .with_state(payload_requestor) + .into_service(); + + let http_server = HttpServer::from_listener(listener, TowerToHyperService::new(service)); + http_server.listen() +} + +async fn handle_scrape_request(State(payload_requestor): State) -> impl IntoResponse { + match payload_requestor.try_get_payload().await { + Some(payload) => (StatusCode::OK, payload), + None => (StatusCode::SERVICE_UNAVAILABLE, "Metrics unavailable.".to_string()), + } +} diff --git a/lib/saluki-components/src/destinations/prometheus/mod.rs b/lib/saluki-components/src/destinations/prometheus/mod.rs index b81f084e88..a4751f36d9 100644 --- a/lib/saluki-components/src/destinations/prometheus/mod.rs +++ b/lib/saluki-components/src/destinations/prometheus/mod.rs @@ -1,40 +1,30 @@ -use std::{ - convert::Infallible, - fmt::Write as _, - num::NonZeroUsize, - sync::{Arc, LazyLock}, -}; +use std::{num::NonZeroUsize, sync::LazyLock}; use async_trait::async_trait; use ddsketch_agent::DDSketch; -use http::{Request, Response}; -use hyper::{body::Incoming, service::service_fn}; +use indexmap::map::Entry; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; -use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator}; +use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator, strings::StringBuilder}; use saluki_config::GenericConfiguration; use saluki_context::{tags::Tag, Context}; use saluki_core::components::{destinations::*, ComponentContext}; use 
saluki_core::data_model::event::{ - metric::{Histogram, Metric, MetricValues}, + metric::{Histogram, MetricValues}, EventType, }; use saluki_error::GenericError; -use saluki_io::net::{ - listener::ConnectionOrientedListener, - server::http::{ErrorHandle, HttpServer, ShutdownHandle}, - ListenAddress, -}; +use saluki_io::net::{listener::ConnectionOrientedListener, ListenAddress}; use serde::Deserialize; -use stringtheory::{ - interning::{FixedSizeInterner, Interner as _}, - MetaString, -}; -use tokio::{select, sync::RwLock}; +use stringtheory::{interning::FixedSizeInterner, MetaString}; +use tokio::{select, sync::mpsc}; use tracing::debug; +mod api; +use self::api::spawn_api_server; + const CONTEXT_LIMIT: usize = 10_000; -const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024; -const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024; +const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 1024 * 1024; +const SUBPAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024; const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048; const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512; @@ -48,7 +38,9 @@ static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTO LazyLock::new(|| histogram_buckets::(1.0, 2.0)); // SAFETY: This is obviously not zero. -const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap(); +const STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(131_072).unwrap(); + +type InternedStringBuilder = StringBuilder>; /// Prometheus destination. 
/// @@ -94,11 +86,6 @@ impl DestinationBuilder for PrometheusConfiguration { async fn build(&self, _context: ComponentContext) -> Result, GenericError> { Ok(Box::new(Prometheus { listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?, - metrics: FastIndexMap::default(), - payload: Arc::new(RwLock::new(String::new())), - payload_buffer: String::with_capacity(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES), - tags_buffer: String::with_capacity(TAGS_BUFFER_SIZE_LIMIT_BYTES), - interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES), })) } } @@ -108,11 +95,7 @@ impl MemoryBounds for PrometheusConfiguration { builder .minimum() // Capture the size of the heap allocation when the component is built. - .with_single_value::("component struct") - // This isn't _really_ bounded since the string buffer could definitely grow larger if the metric name was - // larger, but the default buffer size is far beyond any typical metric name that it should almost never - // grow beyond this initially allocated size. - .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE); + .with_single_value::("component struct"); builder .firm() @@ -120,90 +103,115 @@ impl MemoryBounds for PrometheusConfiguration { // simplifying things here because the ratio of true "contexts" to Prometheus contexts should be very high, // high enough to make this a reasonable approximation. 
.with_map::("state map", CONTEXT_LIMIT) - .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES) - .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES) + .with_fixed_amount("latest payload", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES) + .with_fixed_amount("payload buffer size", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES) + .with_fixed_amount("subpayload buffer size", SUBPAYLOAD_BUFFER_SIZE_LIMIT_BYTES) + .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE) .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES); } } struct Prometheus { listener: ConnectionOrientedListener, - metrics: FastIndexMap>, - payload: Arc>, - payload_buffer: String, - tags_buffer: String, - interner: FixedSizeInterner<1>, } #[async_trait] impl Destination for Prometheus { async fn run(mut self: Box, mut context: DestinationContext) -> Result<(), GenericError> { - let Self { - listener, - mut metrics, - payload, - mut payload_buffer, - mut tags_buffer, - interner, - } = *self; + let Self { listener } = *self; + + let interner = FixedSizeInterner::new(STRING_INTERNER_BYTES); + let mut name_builder = + StringBuilder::with_limit(NAME_NORMALIZATION_BUFFER_SIZE).with_interner(interner.clone()); + let mut tags_builder = StringBuilder::with_limit(TAGS_BUFFER_SIZE_LIMIT_BYTES).with_interner(interner); + + let mut latest_payload = String::new(); + let mut payload_builder = StringBuilder::with_limit(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES); + let mut subpayload_builder = StringBuilder::with_limit(SUBPAYLOAD_BUFFER_SIZE_LIMIT_BYTES); + let mut metrics = FastIndexMap::default(); let mut health = context.take_health_handle(); - let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload)); + let (payload_req_tx, mut payload_req_rx) = mpsc::channel(2); + let (http_shutdown, mut http_error) = spawn_api_server(listener, payload_req_tx); health.mark_ready(); debug!("Prometheus destination started."); let mut contexts = 0; - let mut name_buf = 
String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE); let mut tags_deduplicator = ReusableDeduplicator::new(); + let mut render_required = false; loop { select! { _ = health.live() => continue, - maybe_events = context.events().next() => match maybe_events { - Some(events) => { - // Process each metric event in the batch, either merging it with the existing value or - // inserting it for the first time. - for event in events { - if let Some(metric) = event.try_into_metric() { - // Break apart our metric into its constituent parts, and then normalize it for - // Prometheus: adjust the name if necessary, figuring out the equivalent Prometheus - // metric type, and so on. - let prom_context = match into_prometheus_metric(&metric, &mut name_buf, &interner) { - Some(prom_context) => prom_context, + maybe_events = context.events().next() => { + let events = match maybe_events { + Some(events) => events, + None => break, + }; + + // Process each metric event in the batch, either merging it with the existing value or + // inserting it for the first time. + for event in events { + let (context, values, _) = match event.try_into_metric() { + Some(metric) => metric.into_parts(), + None => continue, + }; + + // We don't support all internal metric types through Prometheus. + let prom_type = match PrometheusType::try_from_values(&values) { + Some(prom_type) => prom_type, + None => continue, + }; + + // Create an entry for the context if we don't already have one, obeying our configured context limit. + let grouped_values = match metrics.entry(context.name().clone()) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + let prom_name = match try_context_to_prom_name(&context, &mut name_builder) { + Some(prom_name) => prom_name, None => continue, }; - let (context, values, _) = metric.into_parts(); - - // Create an entry for the context if we don't already have one, obeying our configured context limit. 
- let existing_contexts = metrics.entry(prom_context.clone()).or_default(); - match existing_contexts.get_mut(&context) { - Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value), - None => { - if contexts >= CONTEXT_LIMIT { - debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name()); - continue - } - - let mut new_prom_value = get_prom_value_for_prom_context(&prom_context); - merge_metric_values_with_prom_value(values, &mut new_prom_value); - - existing_contexts.insert(context, new_prom_value); - contexts += 1; - } - } + let new_grouped_values = GroupedValues::new(prom_name, prom_type); + + entry.insert(new_grouped_values) } + }; + if !grouped_values.update(&context, &values) { + if contexts >= CONTEXT_LIMIT { + debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name()); + continue + } + let prom_tags = match try_context_to_prom_tags(&context, &mut tags_builder, &mut tags_deduplicator) { + Some(prom_tags) => prom_tags, + None => continue, + }; + + contexts += 1; + + grouped_values.insert(context, prom_tags, values); } - // Regenerate the scrape payload. - regenerate_payload(&metrics, &payload, &mut payload_buffer, &mut tags_buffer, &mut tags_deduplicator).await; - }, - None => break, + render_required = true; + } }, - error = &mut http_error => { - if let Some(error) = error { + Some(payload_resp_tx) = payload_req_rx.recv() => { + // Check if the payload needs to be (re)rendered first. 
+ if render_required { + render_required = false; + + render_payload(&metrics, &mut payload_builder, &mut subpayload_builder); + + latest_payload.clear(); + latest_payload.push_str(payload_builder.as_str()); + } + + let _ = payload_resp_tx.send(latest_payload.clone()); + } + maybe_error = &mut http_error => { + if let Some(error) = maybe_error { debug!(%error, "HTTP server error."); } break; @@ -221,55 +229,38 @@ impl Destination for Prometheus { } } -fn spawn_prom_scrape_service( - listener: ConnectionOrientedListener, payload: Arc>, -) -> (ShutdownHandle, ErrorHandle) { - let service = service_fn(move |_: Request| { - let payload = Arc::clone(&payload); - async move { - let payload = payload.read().await; - Ok::, Infallible>(Response::new(payload.to_string())) - } - }); - - let http_server = HttpServer::from_listener(listener, service); - http_server.listen() -} - -#[allow(clippy::mutable_key_type)] -async fn regenerate_payload( - metrics: &FastIndexMap>, payload: &Arc>, - payload_buffer: &mut String, tags_buffer: &mut String, tags_deduplicator: &mut ReusableDeduplicator, +fn render_payload( + metrics: &FastIndexMap, payload_builder: &mut StringBuilder, + subpayload_builder: &mut StringBuilder, ) { - let mut payload = payload.write().await; - payload.clear(); + payload_builder.clear(); let mut metrics_written = 0; let metrics_total = metrics.len(); - for (prom_context, contexts) in metrics { - if write_metrics(payload_buffer, tags_buffer, prom_context, contexts, tags_deduplicator) { - if payload.len() + payload_buffer.len() > PAYLOAD_SIZE_LIMIT_BYTES { - debug!( - metrics_written, - metrics_total, - payload_len = payload.len(), - "Writing additional metrics would exceed payload size limit. Skipping remaining metrics." - ); - break; - } - - // If we've already written some metrics, add a newline between each grouping. 
- if metrics_written > 0 { - payload.push('\n'); - } - - payload.push_str(payload_buffer); + for (metric_name, grouped_values) in metrics { + // Write this single metric out to the subpayload builder, which will include all of the individual contexts/tagsets. + if write_metrics(grouped_values, subpayload_builder).is_none() { + debug!( + contexts_len = grouped_values.len(), + "Failed to render contexts for metric '{}'. Skipping.", metric_name, + ); + continue; + } - metrics_written += 1; - } else { - debug!("Failed to write metric to payload. Continuing..."); + // Push the subpayload builder's string into the payload builder. + if payload_builder.push_str(subpayload_builder.as_str()).is_none() { + debug!( + metrics_written, + metrics_total, + payload_len = payload_builder.len(), + "Failed to include metric '{}' in payload due to insufficient space. Skipping remaining metrics.", + metric_name + ); + break; } + + metrics_written += 1; } } @@ -298,160 +289,150 @@ fn get_help_text(metric_name: &str) -> Option<&'static str> { } } -fn write_metrics( - payload_buffer: &mut String, tags_buffer: &mut String, prom_context: &PrometheusContext, - contexts: &FastIndexMap, tags_deduplicator: &mut ReusableDeduplicator, -) -> bool { - if contexts.is_empty() { - debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name); - return true; +fn write_metrics(grouped_values: &GroupedValues, builder: &mut StringBuilder) -> Option<()> { + builder.clear(); + + if grouped_values.is_empty() { + debug!("No contexts for metric '{}'. Skipping.", grouped_values.prom_name); + return Some(()); } - payload_buffer.clear(); + let mut integer_writer = itoa::Buffer::new(); + let mut float_writer = dtoa::Buffer::new(); - // Write HELP if available - if let Some(help_text) = get_help_text(prom_context.metric_name.as_ref()) { - writeln!(payload_buffer, "# HELP {} {}", prom_context.metric_name, help_text).unwrap(); + // Write HELP if available. 
+ if let Some(help_text) = get_help_text(&grouped_values.prom_name) { + builder.push_str("# HELP ")?; + builder.push_str(&grouped_values.prom_name)?; + builder.push_str(" ")?; + builder.push_str(help_text)?; + builder.push_str("\n")?; } + // Write the metric header. - writeln!( - payload_buffer, - "# TYPE {} {}", - prom_context.metric_name, - prom_context.metric_type.as_str() - ) - .unwrap(); - - for (context, values) in contexts { - if payload_buffer.len() > PAYLOAD_BUFFER_SIZE_LIMIT_BYTES { - debug!("Payload buffer size limit exceeded. Additional contexts for this metric will be truncated."); - break; - } + builder.push_str("# TYPE ")?; + builder.push_str(&grouped_values.prom_name)?; + builder.push_str(" ")?; + builder.push_str(grouped_values.prom_type.as_str())?; + builder.push_str("\n")?; - tags_buffer.clear(); - - // Format/encode the tags. - if !format_tags(tags_buffer, context, tags_deduplicator) { - return false; - } + for (_, (tags, values)) in &grouped_values.groups { + let metric_name = &grouped_values.prom_name; // Write the metric value itself. match values { PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => { + let value_str = float_writer.format(*value); + // No metric type-specific tags for counters or gauges, so just write them straight out. - payload_buffer.push_str(&prom_context.metric_name); - if !tags_buffer.is_empty() { - payload_buffer.push('{'); - payload_buffer.push_str(tags_buffer); - payload_buffer.push('}'); - } - writeln!(payload_buffer, " {}", value).unwrap(); + write_metric_line(builder, metric_name, None, &tags, None, value_str)?; } PrometheusValue::Histogram(histogram) => { // Write the histogram buckets. 
- for (upper_bound_str, count) in histogram.buckets() { - write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap(); - if !tags_buffer.is_empty() { - payload_buffer.push(','); - } - writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap(); + for (le_str, count) in histogram.buckets() { + let count_str = integer_writer.format(count); + write_metric_line( + builder, + metric_name, + Some("_bucket"), + &tags, + Some(("le", le_str)), + count_str, + )?; } // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram. - write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap(); - if !tags_buffer.is_empty() { - payload_buffer.push(','); - } - writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap(); + let count_str = integer_writer.format(histogram.count); + write_metric_line( + builder, + metric_name, + Some("_bucket"), + &tags, + Some(("le", "+Inf")), + count_str, + )?; // Write the histogram sum and count. 
- write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap(); - if !tags_buffer.is_empty() { - payload_buffer.push('{'); - payload_buffer.push_str(tags_buffer); - payload_buffer.push('}'); - } - writeln!(payload_buffer, " {}", histogram.sum).unwrap(); - - write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap(); - if !tags_buffer.is_empty() { - payload_buffer.push('{'); - payload_buffer.push_str(tags_buffer); - payload_buffer.push('}'); - } - writeln!(payload_buffer, " {}", histogram.count).unwrap(); + let sum_str = float_writer.format(histogram.sum); + let count_str = integer_writer.format(histogram.count); + write_metric_line(builder, &metric_name, Some("_sum"), &tags, None, sum_str)?; + write_metric_line(builder, &metric_name, Some("_count"), &tags, None, count_str)?; } PrometheusValue::Summary(sketch) => { // We take a fixed set of quantiles from the sketch, which is hard-coded but should generally represent // the quantiles people generally care about. - for quantile in [0.1, 0.25, 0.5, 0.95, 0.99, 0.999] { - let q_value = sketch.quantile(quantile).unwrap_or_default(); - - write!(payload_buffer, "{}{{{}", &prom_context.metric_name, tags_buffer).unwrap(); - if !tags_buffer.is_empty() { - payload_buffer.push(','); - } - writeln!(payload_buffer, "quantile=\"{}\"}} {}", quantile, q_value).unwrap(); + for (q, q_str) in [ + (0.1, "0.1"), + (0.25, "0.25"), + (0.5, "0.5"), + (0.95, "0.95"), + (0.99, "0.99"), + (0.999, "0.999"), + ] { + let q_value = sketch.quantile(q).unwrap_or_default(); + let q_value_str = float_writer.format(q_value); + + write_metric_line( + builder, + metric_name, + None, + &tags, + Some(("quantile", q_str)), + q_value_str, + )?; } - write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap(); - if !tags_buffer.is_empty() { - payload_buffer.push('{'); - payload_buffer.push_str(tags_buffer); - payload_buffer.push('}'); - } - writeln!(payload_buffer, " {}", sketch.sum().unwrap_or_default()).unwrap(); + let sum_str = 
float_writer.format(sketch.sum().unwrap_or_default()); + write_metric_line(builder, metric_name, Some("_sum"), &tags, None, sum_str)?; - write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap(); - if !tags_buffer.is_empty() { - payload_buffer.push('{'); - payload_buffer.push_str(tags_buffer); - payload_buffer.push('}'); - } - writeln!(payload_buffer, " {}", sketch.count()).unwrap(); + let count_str = integer_writer.format(sketch.count()); + write_metric_line(builder, metric_name, Some("_count"), &tags, None, count_str)?; } } } - true + Some(()) } -fn format_tags(tags_buffer: &mut String, context: &Context, tags_deduplicator: &mut ReusableDeduplicator) -> bool { - let mut has_tags = false; +fn write_metric_line( + builder: &mut StringBuilder, metric_name: &str, suffix: Option<&str>, primary_tags: &str, + secondary_tag: Option<(&str, &str)>, value: &str, +) -> Option<()> { + // We handle some different things here: + // - the metric name can be suffixed (used for things like `_bucket`, `_count`, `_sum`, in histograms and summaries) + // - writing out the "primary" set of tags + // - passing in a single tag key/value pair (used for `le` in histograms, or `quantile` in summaries) - let chained_tags = context.tags().into_iter().chain(context.origin_tags()); - let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags); + let has_tags = !primary_tags.is_empty() || secondary_tag.is_some(); - for tag in deduplicated_tags { - // If we're not the first tag to be written, add a comma to separate the tags. 
- if has_tags { - tags_buffer.push(','); - } + builder.push_str(metric_name)?; + if let Some(suffix) = suffix { + builder.push_str(suffix)?; + } - let tag_name = tag.name(); - let tag_value = match tag.value() { - Some(value) => value, - None => { - debug!("Skipping bare tag."); - continue; - } - }; + if has_tags { + builder.push('{')?; + builder.push_str(primary_tags)?; - has_tags = true; + if let Some((tag_key, tag_value)) = secondary_tag { + if !primary_tags.is_empty() { + builder.push(',')?; + } + builder.push_str(tag_key)?; + builder.push_str("=\"")?; - // Can't exceed the tags buffer size limit: we calculate the addition as tag name/value length plus three bytes - // to account for having to format it as `name="value",`. - if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES { - debug!("Tags buffer size limit exceeded. Tags may be missing from this metric."); - return false; + builder.push_str(tag_value)?; + builder.push_str("\"")?; } - write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap(); + builder.push_str("} ")?; } - true + builder.push_str(value)?; + builder.push('\n') } +/// Prometheus metric type. #[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] enum PrometheusType { Counter, @@ -461,6 +442,16 @@ enum PrometheusType { } impl PrometheusType { + fn try_from_values(values: &MetricValues) -> Option { + match values { + MetricValues::Counter(_) => Some(Self::Counter), + MetricValues::Gauge(_) | MetricValues::Set(_) => Some(Self::Gauge), + MetricValues::Histogram(_) => Some(Self::Histogram), + MetricValues::Distribution(_) => Some(Self::Summary), + _ => None, + } + } + fn as_str(&self) -> &'static str { match self { Self::Counter => "counter", @@ -471,12 +462,9 @@ impl PrometheusType { } } -#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)] -struct PrometheusContext { - metric_name: MetaString, - metric_type: PrometheusType, -} - +/// Prometheus metric value. 
+/// +/// Holds the value of a Prometheus metric of the same type. enum PrometheusValue { Counter(f64), Gauge(f64), @@ -484,45 +472,95 @@ enum PrometheusValue { Summary(DDSketch), } -fn into_prometheus_metric( - metric: &Metric, name_buf: &mut String, interner: &FixedSizeInterner<1>, -) -> Option { - // Normalize the metric name first, since we might fail due to the interner being full. - let metric_name = match normalize_metric_name(metric.context().name(), name_buf, interner) { - Some(name) => name, +fn try_context_to_prom_name(context: &Context, builder: &mut InternedStringBuilder) -> Option { + match normalize_metric_name(context, builder) { + Some(name) => Some(name), + None => { + debug!( + "Failed to normalize metric name '{}'. Name either too long or interner is full.", + context.name() + ); + None + } + } +} + +fn try_context_to_prom_tags( + context: &Context, builder: &mut InternedStringBuilder, tags_deduplicator: &mut ReusableDeduplicator, +) -> Option { + match format_tags(context, builder, tags_deduplicator) { + Some(tags) => Some(tags), None => { debug!( - "Failed to intern normalized metric name. Skipping metric '{}'.", - metric.context().name() + "Failed to format tags for metric '{}'. Tags either too long or interner is full.", + context.name() ); - return None; + None } - }; - - let metric_type = match metric.values() { - MetricValues::Counter(_) => PrometheusType::Counter, - MetricValues::Gauge(_) | MetricValues::Set(_) => PrometheusType::Gauge, - MetricValues::Histogram(_) => PrometheusType::Histogram, - MetricValues::Distribution(_) => PrometheusType::Summary, - _ => return None, - }; - - Some(PrometheusContext { - metric_name, - metric_type, - }) + } +} + +/// Per-context values of a Prometheus metric. +/// +/// Holds the values attached to a specific context (metric name + tags), grouped by the _original_ +/// `Context` from which the values were extracted. Allows for efficient lookups of existing contexts. 
+struct GroupedValues { + prom_name: MetaString, + prom_type: PrometheusType, + groups: FastIndexMap, } -fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue { - match prom_context.metric_type { - PrometheusType::Counter => PrometheusValue::Counter(0.0), - PrometheusType::Gauge => PrometheusValue::Gauge(0.0), - PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)), - PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()), +impl GroupedValues { + fn new(prom_name: MetaString, prom_type: PrometheusType) -> Self { + Self { + prom_name, + prom_type, + groups: FastIndexMap::default(), + } + } + + fn is_empty(&self) -> bool { + self.groups.is_empty() + } + + fn len(&self) -> usize { + self.groups.len() + } + + fn get_new_prom_value(&self) -> PrometheusValue { + match self.prom_type { + PrometheusType::Counter => PrometheusValue::Counter(0.0), + PrometheusType::Gauge => PrometheusValue::Gauge(0.0), + PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&self.prom_name)), + PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()), + } + } + + /// Inserts the given context and values. + /// + /// This overwrites the values if the context already exists. + fn insert(&mut self, context: Context, prom_tags: MetaString, values: MetricValues) { + let mut new_prom_values = self.get_new_prom_value(); + merge_metric_values_with_prom_value(&values, &mut new_prom_values); + + self.groups.insert(context, (prom_tags, new_prom_values)); + } + + /// Updates the given context by merging in the provided values. + /// + /// Returns `true` if the context exists and was updated, `false` otherwise. 
+ fn update(&mut self, context: &Context, values: &MetricValues) -> bool { + match self.groups.get_mut(context) { + Some((_, prom_values)) => { + merge_metric_values_with_prom_value(values, prom_values); + true + } + None => false, + } } } -fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) { +fn merge_metric_values_with_prom_value(values: &MetricValues, prom_value: &mut PrometheusValue) { match (values, prom_value) { (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => { for (_, value) in counter_values { @@ -547,37 +585,68 @@ fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut Pr } (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => { for (_, value) in histogram_values { - prom_histogram.merge_histogram(&value); + prom_histogram.merge_histogram(value); } } (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => { for (_, value) in distribution_values { - prom_summary.merge(&value); + prom_summary.merge(value); } } _ => panic!("Mismatched metric types"), } } -fn normalize_metric_name(name: &str, name_buf: &mut String, interner: &FixedSizeInterner<1>) -> Option { - name_buf.clear(); +fn normalize_metric_name(context: &Context, builder: &mut InternedStringBuilder) -> Option { + builder.clear(); // Normalize the metric name to a valid Prometheus metric name. - for (i, c) in name.chars().enumerate() { + for (i, c) in context.name().chars().enumerate() { if i == 0 && is_valid_name_start_char(c) || i != 0 && is_valid_name_char(c) { - name_buf.push(c); + builder.push(c)?; } else { // Convert periods to a set of two underscores, and anything else to a single underscore. // // This lets us ensure that the normal separators we use in metrics (periods) are converted in a way // where they can be distinguished on the collector side to potentially reconstitute them back to their // original form. 
- name_buf.push_str(if c == '.' { "__" } else { "_" }); + builder.push_str(if c == '.' { "__" } else { "_" })?; + } + } + + builder.try_intern() +} + +fn format_tags( + context: &Context, builder: &mut InternedStringBuilder, tags_deduplicator: &mut ReusableDeduplicator, +) -> Option { + builder.clear(); + + let chained_tags = context.tags().into_iter().chain(context.origin_tags()); + let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags); + + for tag in deduplicated_tags { + // If we're not the first tag to be written, add a comma to separate the tags. + if !builder.is_empty() { + builder.push(',')?; } + + let tag_name = tag.name(); + let tag_value = match tag.value() { + Some(value) => value, + None => { + debug!("Skipping bare tag."); + continue; + } + }; + + builder.push_str(tag_name)?; + builder.push_str("=\"")?; + builder.push_str(tag_value)?; + builder.push('"')?; } - // Now try and intern the normalized name. - interner.try_intern(name_buf).map(MetaString::from) + builder.try_intern() } #[inline] From 32e72d51e21f8d5ce96b80198015acc0e306a472 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 4 Aug 2025 12:58:13 -0400 Subject: [PATCH 2/5] switch to StringBuilder native method for writing numerics --- Cargo.lock | 8 --- lib/saluki-components/Cargo.toml | 2 - .../src/destinations/prometheus/mod.rs | 49 +++++++------------ 3 files changed, 18 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a19d241611..92d580cbf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1013,12 +1013,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dtoa" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04" - [[package]] name = "dunce" version = "1.0.5" @@ -3180,7 +3174,6 @@ dependencies = [ "chrono", "datadog-protos", "ddsketch-agent", - "dtoa", "float-cmp", "foldhash", "futures", @@ -3193,7 +3186,6 @@ dependencies = [ "hyper", 
"hyper-http-proxy", "indexmap 2.11.0", - "itoa", "memory-accounting", "metrics", "opentelemetry-semantic-conventions", diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index f3b6591132..b72af71236 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -20,7 +20,6 @@ bytesize = { workspace = true } chrono = { workspace = true } datadog-protos = { workspace = true } ddsketch-agent = { workspace = true } -dtoa = { workspace = true } float-cmp = { workspace = true, features = ["ratio"] } foldhash = { workspace = true } futures = { workspace = true } @@ -33,7 +32,6 @@ http-serde-ext = { workspace = true } hyper = { workspace = true, features = ["client"] } hyper-http-proxy = { workspace = true } indexmap = { workspace = true, features = ["std"] } -itoa = { workspace = true } memory-accounting = { workspace = true } metrics = { workspace = true } opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } diff --git a/lib/saluki-components/src/destinations/prometheus/mod.rs b/lib/saluki-components/src/destinations/prometheus/mod.rs index a4751f36d9..34bc871e2d 100644 --- a/lib/saluki-components/src/destinations/prometheus/mod.rs +++ b/lib/saluki-components/src/destinations/prometheus/mod.rs @@ -297,9 +297,6 @@ fn write_metrics(grouped_values: &GroupedValues, builder: &mut StringBuilder) -> return Some(()); } - let mut integer_writer = itoa::Buffer::new(); - let mut float_writer = dtoa::Buffer::new(); - // Write HELP if available. if let Some(help_text) = get_help_text(&grouped_values.prom_name) { builder.push_str("# HELP ")?; @@ -322,41 +319,35 @@ fn write_metrics(grouped_values: &GroupedValues, builder: &mut StringBuilder) -> // Write the metric value itself. 
match values { PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => { - let value_str = float_writer.format(*value); - // No metric type-specific tags for counters or gauges, so just write them straight out. - write_metric_line(builder, metric_name, None, &tags, None, value_str)?; + write_metric_line(builder, metric_name, None, &tags, None, *value)?; } PrometheusValue::Histogram(histogram) => { // Write the histogram buckets. for (le_str, count) in histogram.buckets() { - let count_str = integer_writer.format(count); write_metric_line( builder, metric_name, Some("_bucket"), &tags, Some(("le", le_str)), - count_str, + count, )?; } // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram. - let count_str = integer_writer.format(histogram.count); write_metric_line( builder, metric_name, Some("_bucket"), &tags, Some(("le", "+Inf")), - count_str, + histogram.count, )?; // Write the histogram sum and count. - let sum_str = float_writer.format(histogram.sum); - let count_str = integer_writer.format(histogram.count); - write_metric_line(builder, &metric_name, Some("_sum"), &tags, None, sum_str)?; - write_metric_line(builder, &metric_name, Some("_count"), &tags, None, count_str)?; + write_metric_line(builder, &metric_name, Some("_sum"), &tags, None, histogram.sum)?; + write_metric_line(builder, &metric_name, Some("_count"), &tags, None, histogram.count)?; } PrometheusValue::Summary(sketch) => { // We take a fixed set of quantiles from the sketch, which is hard-coded but should generally represent @@ -370,23 +361,19 @@ fn write_metrics(grouped_values: &GroupedValues, builder: &mut StringBuilder) -> (0.999, "0.999"), ] { let q_value = sketch.quantile(q).unwrap_or_default(); - let q_value_str = float_writer.format(q_value); - write_metric_line( - builder, - metric_name, - None, - &tags, - Some(("quantile", q_str)), - q_value_str, - )?; + write_metric_line(builder, metric_name, None, &tags, Some(("quantile", q_str)), 
q_value)?; } - let sum_str = float_writer.format(sketch.sum().unwrap_or_default()); - write_metric_line(builder, metric_name, Some("_sum"), &tags, None, sum_str)?; - - let count_str = integer_writer.format(sketch.count()); - write_metric_line(builder, metric_name, Some("_count"), &tags, None, count_str)?; + write_metric_line( + builder, + metric_name, + Some("_sum"), + &tags, + None, + sketch.sum().unwrap_or_default(), + )?; + write_metric_line(builder, metric_name, Some("_count"), &tags, None, sketch.count())?; } } } @@ -394,9 +381,9 @@ fn write_metrics(grouped_values: &GroupedValues, builder: &mut StringBuilder) -> Some(()) } -fn write_metric_line( +fn write_metric_line( builder: &mut StringBuilder, metric_name: &str, suffix: Option<&str>, primary_tags: &str, - secondary_tag: Option<(&str, &str)>, value: &str, + secondary_tag: Option<(&str, &str)>, value: N, ) -> Option<()> { // We handle some different things here: // - the metric name can be suffixed (used for things like `_bucket`, `_count`, `_sum`, in histograms and summaries) @@ -428,7 +415,7 @@ fn write_metric_line( builder.push_str("} ")?; } - builder.push_str(value)?; + builder.push_numeric(value)?; builder.push('\n') } From 95f9c67f0bf225968040c77df88f4f2795875d66 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 18 Aug 2025 10:08:50 -0400 Subject: [PATCH 3/5] switch to write macro-based approach to make code saner --- .../src/destinations/prometheus/api.rs | 5 +- .../src/destinations/prometheus/mod.rs | 121 +++++++++--------- 2 files changed, 58 insertions(+), 68 deletions(-) diff --git a/lib/saluki-components/src/destinations/prometheus/api.rs b/lib/saluki-components/src/destinations/prometheus/api.rs index 8e4c800453..6f14e06a64 100644 --- a/lib/saluki-components/src/destinations/prometheus/api.rs +++ b/lib/saluki-components/src/destinations/prometheus/api.rs @@ -16,10 +16,7 @@ impl PayloadRequestor { async fn try_get_payload(&self) -> Option { let (payload_resp_tx, payload_resp_rx) = 
oneshot::channel(); match self.payload_req_tx.send(payload_resp_tx).await { - Ok(()) => match payload_resp_rx.await { - Ok(payload) => Some(payload), - Err(_) => None, - }, + Ok(()) => payload_resp_rx.await.ok(), Err(_) => None, } } diff --git a/lib/saluki-components/src/destinations/prometheus/mod.rs b/lib/saluki-components/src/destinations/prometheus/mod.rs index 34bc871e2d..6dc934486b 100644 --- a/lib/saluki-components/src/destinations/prometheus/mod.rs +++ b/lib/saluki-components/src/destinations/prometheus/mod.rs @@ -1,4 +1,8 @@ -use std::{num::NonZeroUsize, sync::LazyLock}; +use std::{ + fmt::{self, Display, Write}, + num::NonZeroUsize, + sync::LazyLock, +}; use async_trait::async_trait; use ddsketch_agent::DDSketch; @@ -28,6 +32,15 @@ const SUBPAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024; const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048; const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512; +macro_rules! quantile_strs { + ($($q:literal),*) => { &[$(($q, stringify!($q))),*] }; +} + +const HISTOGRAM_QUANTILES: &[(f64, &str); 6] = quantile_strs!(0.1, 0.25, 0.5, 0.95, 0.99, 0.999); +const SUFFIX_BUCKET: Option<&str> = Some("_bucket"); +const SUFFIX_COUNT: Option<&str> = Some("_count"); +const SUFFIX_SUM: Option<&str> = Some("_sum"); + // Histogram-related constants and pre-calculated buckets. const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30; static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> = @@ -240,7 +253,7 @@ fn render_payload( for (metric_name, grouped_values) in metrics { // Write this single metric out to the subpayload builder, which will include all of the individual contexts/tagsets. - if write_metrics(grouped_values, subpayload_builder).is_none() { + if write_metrics(grouped_values, subpayload_builder).is_err() { debug!( contexts_len = grouped_values.len(), "Failed to render contexts for metric '{}'. 
Skipping.", metric_name, @@ -289,102 +302,79 @@ fn get_help_text(metric_name: &str) -> Option<&'static str> { } } -fn write_metrics(grouped_values: &GroupedValues, builder: &mut StringBuilder) -> Option<()> { +fn write_metrics(values: &GroupedValues, builder: &mut StringBuilder) -> fmt::Result { builder.clear(); - if grouped_values.is_empty() { - debug!("No contexts for metric '{}'. Skipping.", grouped_values.prom_name); - return Some(()); + if values.is_empty() { + debug!("No contexts for metric '{}'. Skipping.", values.name()); + return Ok(()); } + let metric_name = values.name(); + // Write HELP if available. - if let Some(help_text) = get_help_text(&grouped_values.prom_name) { - builder.push_str("# HELP ")?; - builder.push_str(&grouped_values.prom_name)?; - builder.push_str(" ")?; - builder.push_str(help_text)?; - builder.push_str("\n")?; + if let Some(help_text) = get_help_text(metric_name) { + writeln!(builder, "# HELP {} {}", metric_name, help_text)?; } // Write the metric header. - builder.push_str("# TYPE ")?; - builder.push_str(&grouped_values.prom_name)?; - builder.push_str(" ")?; - builder.push_str(grouped_values.prom_type.as_str())?; - builder.push_str("\n")?; - - for (_, (tags, values)) in &grouped_values.groups { - let metric_name = &grouped_values.prom_name; + writeln!(builder, "# TYPE {} {}", metric_name, values.type_str())?; + for (_, (tags, value)) in &values.groups { // Write the metric value itself. - match values { + match value { PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => { // No metric type-specific tags for counters or gauges, so just write them straight out. - write_metric_line(builder, metric_name, None, &tags, None, *value)?; + write_metric_line(builder, metric_name, None, tags, None, *value)?; } PrometheusValue::Histogram(histogram) => { // Write the histogram buckets. 
- for (le_str, count) in histogram.buckets() { - write_metric_line( - builder, - metric_name, - Some("_bucket"), - &tags, - Some(("le", le_str)), - count, - )?; + for (le, count) in histogram.buckets() { + write_metric_line(builder, metric_name, SUFFIX_BUCKET, tags, Some(("le", le)), count)?; } // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram. write_metric_line( builder, metric_name, - Some("_bucket"), - &tags, + SUFFIX_BUCKET, + tags, Some(("le", "+Inf")), histogram.count, )?; // Write the histogram sum and count. - write_metric_line(builder, &metric_name, Some("_sum"), &tags, None, histogram.sum)?; - write_metric_line(builder, &metric_name, Some("_count"), &tags, None, histogram.count)?; + write_metric_line(builder, metric_name, SUFFIX_SUM, tags, None, histogram.sum)?; + write_metric_line(builder, metric_name, SUFFIX_COUNT, tags, None, histogram.count)?; } PrometheusValue::Summary(sketch) => { // We take a fixed set of quantiles from the sketch, which is hard-coded but should generally represent // the quantiles people generally care about. 
- for (q, q_str) in [ - (0.1, "0.1"), - (0.25, "0.25"), - (0.5, "0.5"), - (0.95, "0.95"), - (0.99, "0.99"), - (0.999, "0.999"), - ] { - let q_value = sketch.quantile(q).unwrap_or_default(); - - write_metric_line(builder, metric_name, None, &tags, Some(("quantile", q_str)), q_value)?; + for (q, q_str) in HISTOGRAM_QUANTILES { + let q_value = sketch.quantile(*q).unwrap_or_default(); + write_metric_line(builder, metric_name, None, tags, Some(("quantile", q_str)), q_value)?; } write_metric_line( builder, metric_name, - Some("_sum"), - &tags, + SUFFIX_SUM, + tags, None, sketch.sum().unwrap_or_default(), )?; - write_metric_line(builder, metric_name, Some("_count"), &tags, None, sketch.count())?; + write_metric_line(builder, metric_name, SUFFIX_COUNT, tags, None, sketch.count())?; } } } - Some(()) + Ok(()) } -fn write_metric_line( +fn write_metric_line( builder: &mut StringBuilder, metric_name: &str, suffix: Option<&str>, primary_tags: &str, secondary_tag: Option<(&str, &str)>, value: N, -) -> Option<()> { +) -> fmt::Result { // We handle some different things here: // - the metric name can be suffixed (used for things like `_bucket`, `_count`, `_sum`, in histograms and summaries) // - writing out the "primary" set of tags @@ -392,31 +382,26 @@ fn write_metric_line( let has_tags = !primary_tags.is_empty() || secondary_tag.is_some(); - builder.push_str(metric_name)?; + write!(builder, "{metric_name}")?; if let Some(suffix) = suffix { - builder.push_str(suffix)?; + write!(builder, "{suffix}")?; } if has_tags { - builder.push('{')?; - builder.push_str(primary_tags)?; + write!(builder, "{{{primary_tags}")?; if let Some((tag_key, tag_value)) = secondary_tag { if !primary_tags.is_empty() { - builder.push(',')?; + builder.push(',').ok_or(fmt::Error)?; } - builder.push_str(tag_key)?; - builder.push_str("=\"")?; - builder.push_str(tag_value)?; - builder.push_str("\"")?; + write!(builder, "{tag_key}=\"{tag_value}\"")?; } - builder.push_str("} ")?; + write!(builder, "}}")?; } - 
builder.push_numeric(value)?; - builder.push('\n') + writeln!(builder, " {value}") } /// Prometheus metric type. @@ -514,6 +499,14 @@ impl GroupedValues { self.groups.len() } + fn name(&self) -> &str { + &self.prom_name + } + + fn type_str(&self) -> &str { + self.prom_type.as_str() + } + fn get_new_prom_value(&self) -> PrometheusValue { match self.prom_type { PrometheusType::Counter => PrometheusValue::Counter(0.0), From fd06647fab38a19029f40b8a7a38337b93712cf8 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 18 Aug 2025 11:00:25 -0400 Subject: [PATCH 4/5] point to the new scrape endpoint --- .../quality_gates_idle_rss/lading/lading.yaml | 2 +- .../quality_gates_rss/lading/lading.yaml | 2 +- .../lading/lading.yaml | 39 +++++++++++++++++-- .../lading/lading.yaml | 39 +++++++++++++++++-- .../lading/lading.yaml | 39 +++++++++++++++++-- .../lading/lading.yaml | 39 +++++++++++++++++-- .../lading/lading.yaml | 39 +++++++++++++++++-- .../lading/lading.yaml | 39 +++++++++++++++++-- .../lading/lading.yaml | 38 +++++++++++++++++- .../lading/lading.yaml | 38 +++++++++++++++++- .../lading/lading.yaml | 39 +++++++++++++++++-- .../lading/lading.yaml | 38 +++++++++++++++++- .../lading/lading.yaml | 38 +++++++++++++++++- .../lading/lading.yaml | 39 +++++++++++++++++-- .../quality_gates_idle_rss/lading/lading.yaml | 2 +- 15 files changed, 435 insertions(+), 35 deletions(-) diff --git a/test/smp/regression/adp-checks-agent/cases/quality_gates_idle_rss/lading/lading.yaml b/test/smp/regression/adp-checks-agent/cases/quality_gates_idle_rss/lading/lading.yaml index e55c957ff6..5c3ac1efdc 100644 --- a/test/smp/regression/adp-checks-agent/cases/quality_gates_idle_rss/lading/lading.yaml +++ b/test/smp/regression/adp-checks-agent/cases/quality_gates_idle_rss/lading/lading.yaml @@ -6,4 +6,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git 
a/test/smp/regression/adp-checks-agent/cases/quality_gates_rss/lading/lading.yaml b/test/smp/regression/adp-checks-agent/cases/quality_gates_rss/lading/lading.yaml index e55c957ff6..5c3ac1efdc 100644 --- a/test/smp/regression/adp-checks-agent/cases/quality_gates_rss/lading/lading.yaml +++ b/test/smp/regression/adp-checks-agent/cases/quality_gates_rss/lading/lading.yaml @@ -6,4 +6,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_100mb_250k_contexts/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_100mb_250k_contexts/lading/lading.yaml index 06057cff0a..3eb1dba872 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_100mb_250k_contexts/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_100mb_250k_contexts/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -48,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts/lading/lading.yaml index f0954331fa..21e6a3d16d 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, - 59, 61, 67, 
71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -48,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts_distributions_only/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts_distributions_only/lading/lading.yaml index 3737b94e53..e7f30937d6 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts_distributions_only/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_100mb_3k_contexts_distributions_only/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -47,4 +80,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_10mb_3k_contexts/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_10mb_3k_contexts/lading/lading.yaml index c6e4cc441b..2c67a5bdb6 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_10mb_3k_contexts/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_10mb_3k_contexts/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 
31, 37, 41, 43, 47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -48,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts/lading/lading.yaml index dbb663890c..f153cc0d1a 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -48,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts_dualship/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts_dualship/lading/lading.yaml index a149f75312..7dce6cf0fa 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts_dualship/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_1mb_3k_contexts_dualship/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 
47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -50,4 +83,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts/lading/lading.yaml index 02cafac333..8aafb280d8 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts/lading/lading.yaml @@ -1,6 +1,40 @@ generator: - unix_datagram: - seed: [5, 15, 17, 20, 22, 24, 48, 52, 61, 65, 73, 81, 97, 104, 109, 119, 147, 149, 153, 156, 158, 168, 175, 186, 193, 201, 216, 219, 224, 230, 232, 249] + seed: + [ + 5, + 15, + 17, + 20, + 22, + 24, + 48, + 52, + 61, + 65, + 73, + 81, + 97, + 104, + 109, + 119, + 147, + 149, + 153, + 156, + 158, + 168, + 175, + 186, + 193, + 201, + 216, + 219, + 224, + 230, + 232, + 249, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: &variant @@ -47,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts_memlimit/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts_memlimit/lading/lading.yaml index 02cafac333..8aafb280d8 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts_memlimit/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_1mb_50k_contexts_memlimit/lading/lading.yaml @@ -1,6 +1,40 @@ generator: - unix_datagram: - seed: [5, 15, 17, 20, 22, 24, 
48, 52, 61, 65, 73, 81, 97, 104, 109, 119, 147, 149, 153, 156, 158, 168, 175, 186, 193, 201, 216, 219, 224, 230, 232, 249] + seed: + [ + 5, + 15, + 17, + 20, + 22, + 24, + 48, + 52, + 61, + 65, + 73, + 81, + 97, + 104, + 109, + 119, + 147, + 149, + 153, + 156, + 158, + 168, + 175, + 186, + 193, + 201, + 216, + 219, + 224, + 230, + 232, + 249, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: &variant @@ -47,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_500mb_3k_contexts/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_500mb_3k_contexts/lading/lading.yaml index 76836adb43..372e41cda1 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_500mb_3k_contexts/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_500mb_3k_contexts/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -48,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining/lading/lading.yaml index f5b4203fe3..589b57df63 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining/lading/lading.yaml @@ -1,6 +1,40 @@ generator: 
- unix_datagram: - seed: [5, 15, 17, 20, 22, 24, 48, 52, 61, 65, 73, 81, 97, 104, 109, 119, 147, 149, 153, 156, 158, 168, 175, 186, 193, 201, 216, 219, 224, 230, 232, 249] + seed: + [ + 5, + 15, + 17, + 20, + 22, + 24, + 48, + 52, + 61, + 65, + 73, + 81, + 97, + 104, + 109, + 119, + 147, + 149, + 153, + 156, + 158, + 168, + 175, + 186, + 193, + 201, + 216, + 219, + 224, + 230, + 232, + 249, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -47,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining_no_allocs/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining_no_allocs/lading/lading.yaml index f5b4203fe3..589b57df63 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining_no_allocs/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/dsd_uds_50mb_10k_contexts_no_inlining_no_allocs/lading/lading.yaml @@ -1,6 +1,40 @@ generator: - unix_datagram: - seed: [5, 15, 17, 20, 22, 24, 48, 52, 61, 65, 73, 81, 97, 104, 109, 119, 147, 149, 153, 156, 158, 168, 175, 186, 193, 201, 216, 219, 224, 230, 232, 249] + seed: + [ + 5, + 15, + 17, + 20, + 22, + 24, + 48, + 52, + 61, + 65, + 73, + 81, + 97, + 104, + 109, + 119, + 147, + 149, + 153, + 156, + 158, + 168, + 175, + 186, + 193, + 201, + 216, + 219, + 224, + 230, + 232, + 249, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -47,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/dsd_uds_512kb_3k_contexts/lading/lading.yaml b/test/smp/regression/saluki/cases/dsd_uds_512kb_3k_contexts/lading/lading.yaml index d8d3ca36e2..a49560d603 100644 --- a/test/smp/regression/saluki/cases/dsd_uds_512kb_3k_contexts/lading/lading.yaml +++ 
b/test/smp/regression/saluki/cases/dsd_uds_512kb_3k_contexts/lading/lading.yaml @@ -1,7 +1,40 @@ generator: - unix_datagram: - seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + seed: + [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 89, + 97, + 101, + 103, + 107, + 109, + 113, + 127, + 131, + ] path: "/tmp/adp-dogstatsd-dgram.sock" block_cache_method: Fixed variant: @@ -48,4 +81,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" diff --git a/test/smp/regression/saluki/cases/quality_gates_idle_rss/lading/lading.yaml b/test/smp/regression/saluki/cases/quality_gates_idle_rss/lading/lading.yaml index e55c957ff6..5c3ac1efdc 100644 --- a/test/smp/regression/saluki/cases/quality_gates_idle_rss/lading/lading.yaml +++ b/test/smp/regression/saluki/cases/quality_gates_idle_rss/lading/lading.yaml @@ -6,4 +6,4 @@ blackhole: target_metrics: - prometheus: - uri: "http://127.0.0.1:5102/scrape" + uri: "http://127.0.0.1:5102/metrics" From 116a40a4f15642c912a1310bdf85c3f4bc072274 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Tue, 23 Sep 2025 16:20:53 -0400 Subject: [PATCH 5/5] slight reorganization --- .../src/destinations/prometheus/mod.rs | 364 +++++++++--------- 1 file changed, 175 insertions(+), 189 deletions(-) diff --git a/lib/saluki-components/src/destinations/prometheus/mod.rs b/lib/saluki-components/src/destinations/prometheus/mod.rs index 6dc934486b..2339f6c730 100644 --- a/lib/saluki-components/src/destinations/prometheus/mod.rs +++ b/lib/saluki-components/src/destinations/prometheus/mod.rs @@ -242,6 +242,167 @@ impl Destination for Prometheus { } } +#[derive(Clone)] +struct PrometheusHistogram { + sum: f64, + count: u64, + buckets: Vec<(f64, &'static str, u64)>, +} + +impl PrometheusHistogram { + fn new(metric_name: 
&str) -> Self { + // Super hacky but effective way to decide when to switch to the time-oriented buckets. + let base_buckets = if metric_name.ends_with("_seconds") { + &TIME_HISTOGRAM_BUCKETS[..] + } else { + &NON_TIME_HISTOGRAM_BUCKETS[..] + }; + + let buckets = base_buckets + .iter() + .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0)) + .collect(); + + Self { + sum: 0.0, + count: 0, + buckets, + } + } + + fn merge_histogram(&mut self, histogram: &Histogram) { + for sample in histogram.samples() { + self.add_sample(sample.value.into_inner(), sample.weight); + } + } + + fn add_sample(&mut self, value: f64, weight: u64) { + self.sum += value * weight as f64; + self.count += weight; + + // Prometheus buckets are cumulative, so increment every bucket whose upper bound the value falls under. + for (upper_bound, _, count) in &mut self.buckets { + if value <= *upper_bound { + *count += weight; + } + } + } + + fn buckets(&self) -> impl Iterator + '_ { + self.buckets + .iter() + .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count)) + } +} + +/// Prometheus metric type. +#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] +enum PrometheusType { + Counter, + Gauge, + Histogram, + Summary, +} + +impl PrometheusType { + fn try_from_values(values: &MetricValues) -> Option { + match values { + MetricValues::Counter(_) => Some(Self::Counter), + MetricValues::Gauge(_) | MetricValues::Set(_) => Some(Self::Gauge), + MetricValues::Histogram(_) => Some(Self::Histogram), + MetricValues::Distribution(_) => Some(Self::Summary), + _ => None, + } + } + + fn as_str(&self) -> &'static str { + match self { + Self::Counter => "counter", + Self::Gauge => "gauge", + Self::Histogram => "histogram", + Self::Summary => "summary", + } + } +} + +/// Prometheus metric value. +/// +/// Holds the value of a Prometheus metric of the same type.
+enum PrometheusValue { + Counter(f64), + Gauge(f64), + Histogram(PrometheusHistogram), + Summary(DDSketch), +} + +/// Per-context values of a Prometheus metric. +/// +/// Holds the values attached to a specific context (metric name + tags), grouped by the _original_ +/// `Context` from which the values were extracted. Allows for efficient lookups of existing contexts. +struct GroupedValues { + prom_name: MetaString, + prom_type: PrometheusType, + groups: FastIndexMap, +} + +impl GroupedValues { + fn new(prom_name: MetaString, prom_type: PrometheusType) -> Self { + Self { + prom_name, + prom_type, + groups: FastIndexMap::default(), + } + } + + fn is_empty(&self) -> bool { + self.groups.is_empty() + } + + fn len(&self) -> usize { + self.groups.len() + } + + fn name(&self) -> &str { + &self.prom_name + } + + fn type_str(&self) -> &str { + self.prom_type.as_str() + } + + fn get_new_prom_value(&self) -> PrometheusValue { + match self.prom_type { + PrometheusType::Counter => PrometheusValue::Counter(0.0), + PrometheusType::Gauge => PrometheusValue::Gauge(0.0), + PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&self.prom_name)), + PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()), + } + } + + /// Inserts the given context and values. + /// + /// This overwrites the values if the context already exists. + fn insert(&mut self, context: Context, prom_tags: MetaString, values: MetricValues) { + let mut new_prom_values = self.get_new_prom_value(); + merge_metric_values_with_prom_value(&values, &mut new_prom_values); + + self.groups.insert(context, (prom_tags, new_prom_values)); + } + + /// Updates the given context by merging in the provided values. + /// + /// Returns `true` if the context exists and was updated, `false` otherwise. 
+ fn update(&mut self, context: &Context, values: &MetricValues) -> bool { + match self.groups.get_mut(context) { + Some((_, prom_values)) => { + merge_metric_values_with_prom_value(values, prom_values); + true + } + None => false, + } + } +} + fn render_payload( metrics: &FastIndexMap, payload_builder: &mut StringBuilder, subpayload_builder: &mut StringBuilder, @@ -302,8 +463,8 @@ fn get_help_text(metric_name: &str) -> Option<&'static str> { } } -fn write_metrics(values: &GroupedValues, builder: &mut StringBuilder) -> fmt::Result { - builder.clear(); +fn write_metrics(values: &GroupedValues, buf: &mut StringBuilder) -> fmt::Result { + buf.clear(); if values.is_empty() { debug!("No contexts for metric '{}'. Skipping.", values.name()); @@ -314,56 +475,42 @@ fn write_metrics(values: &GroupedValues, builder: &mut StringBuilder) -> fmt::Re // Write HELP if available. if let Some(help_text) = get_help_text(metric_name) { - writeln!(builder, "# HELP {} {}", metric_name, help_text)?; + writeln!(buf, "# HELP {} {}", metric_name, help_text)?; } // Write the metric header. - writeln!(builder, "# TYPE {} {}", metric_name, values.type_str())?; + writeln!(buf, "# TYPE {} {}", metric_name, values.type_str())?; for (_, (tags, value)) in &values.groups { // Write the metric value itself. match value { PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => { // No metric type-specific tags for counters or gauges, so just write them straight out. - write_metric_line(builder, metric_name, None, tags, None, *value)?; + write_metric_line(buf, metric_name, None, tags, None, *value)?; } - PrometheusValue::Histogram(histogram) => { + PrometheusValue::Histogram(hist) => { // Write the histogram buckets. 
- for (le, count) in histogram.buckets() { - write_metric_line(builder, metric_name, SUFFIX_BUCKET, tags, Some(("le", le)), count)?; + for (le, count) in hist.buckets() { + write_metric_line(buf, metric_name, SUFFIX_BUCKET, tags, Some(("le", le)), count)?; } // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram. - write_metric_line( - builder, - metric_name, - SUFFIX_BUCKET, - tags, - Some(("le", "+Inf")), - histogram.count, - )?; + write_metric_line(buf, metric_name, SUFFIX_BUCKET, tags, Some(("le", "+Inf")), hist.count)?; // Write the histogram sum and count. - write_metric_line(builder, metric_name, SUFFIX_SUM, tags, None, histogram.sum)?; - write_metric_line(builder, metric_name, SUFFIX_COUNT, tags, None, histogram.count)?; + write_metric_line(buf, metric_name, SUFFIX_SUM, tags, None, hist.sum)?; + write_metric_line(buf, metric_name, SUFFIX_COUNT, tags, None, hist.count)?; } PrometheusValue::Summary(sketch) => { // We take a fixed set of quantiles from the sketch, which is hard-coded but should generally represent // the quantiles people generally care about. for (q, q_str) in HISTOGRAM_QUANTILES { let q_value = sketch.quantile(*q).unwrap_or_default(); - write_metric_line(builder, metric_name, None, tags, Some(("quantile", q_str)), q_value)?; + write_metric_line(buf, metric_name, None, tags, Some(("quantile", q_str)), q_value)?; } - write_metric_line( - builder, - metric_name, - SUFFIX_SUM, - tags, - None, - sketch.sum().unwrap_or_default(), - )?; - write_metric_line(builder, metric_name, SUFFIX_COUNT, tags, None, sketch.count())?; + write_metric_line(buf, metric_name, SUFFIX_SUM, tags, None, sketch.sum().unwrap_or(0.0))?; + write_metric_line(buf, metric_name, SUFFIX_COUNT, tags, None, sketch.count())?; } } } @@ -404,46 +551,6 @@ fn write_metric_line( writeln!(builder, " {value}") } -/// Prometheus metric type. 
-#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] -enum PrometheusType { - Counter, - Gauge, - Histogram, - Summary, -} - -impl PrometheusType { - fn try_from_values(values: &MetricValues) -> Option { - match values { - MetricValues::Counter(_) => Some(Self::Counter), - MetricValues::Gauge(_) | MetricValues::Set(_) => Some(Self::Gauge), - MetricValues::Histogram(_) => Some(Self::Histogram), - MetricValues::Distribution(_) => Some(Self::Summary), - _ => None, - } - } - - fn as_str(&self) -> &'static str { - match self { - Self::Counter => "counter", - Self::Gauge => "gauge", - Self::Histogram => "histogram", - Self::Summary => "summary", - } - } -} - -/// Prometheus metric value. -/// -/// Holds the value of a Prometheus metric of the same type. -enum PrometheusValue { - Counter(f64), - Gauge(f64), - Histogram(PrometheusHistogram), - Summary(DDSketch), -} - fn try_context_to_prom_name(context: &Context, builder: &mut InternedStringBuilder) -> Option { match normalize_metric_name(context, builder) { Some(name) => Some(name), @@ -472,74 +579,6 @@ fn try_context_to_prom_tags( } } -/// Per-context values of a Prometheus metric. -/// -/// Holds the values attached to a specific context (metric name + tags), grouped by the _original_ -/// `Context` from which the values were extracted. Allows for efficient lookups of existing contexts. 
-struct GroupedValues { - prom_name: MetaString, - prom_type: PrometheusType, - groups: FastIndexMap, -} - -impl GroupedValues { - fn new(prom_name: MetaString, prom_type: PrometheusType) -> Self { - Self { - prom_name, - prom_type, - groups: FastIndexMap::default(), - } - } - - fn is_empty(&self) -> bool { - self.groups.is_empty() - } - - fn len(&self) -> usize { - self.groups.len() - } - - fn name(&self) -> &str { - &self.prom_name - } - - fn type_str(&self) -> &str { - self.prom_type.as_str() - } - - fn get_new_prom_value(&self) -> PrometheusValue { - match self.prom_type { - PrometheusType::Counter => PrometheusValue::Counter(0.0), - PrometheusType::Gauge => PrometheusValue::Gauge(0.0), - PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&self.prom_name)), - PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()), - } - } - - /// Inserts the given context and values. - /// - /// This overwrites the values if the context already exists. - fn insert(&mut self, context: Context, prom_tags: MetaString, values: MetricValues) { - let mut new_prom_values = self.get_new_prom_value(); - merge_metric_values_with_prom_value(&values, &mut new_prom_values); - - self.groups.insert(context, (prom_tags, new_prom_values)); - } - - /// Updates the given context by merging in the provided values. - /// - /// Returns `true` if the context exists and was updated, `false` otherwise. 
- fn update(&mut self, context: &Context, values: &MetricValues) -> bool { - match self.groups.get_mut(context) { - Some((_, prom_values)) => { - merge_metric_values_with_prom_value(values, prom_values); - true - } - None => false, - } - } -} - fn merge_metric_values_with_prom_value(values: &MetricValues, prom_value: &mut PrometheusValue) { match (values, prom_value) { (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => { @@ -641,59 +680,6 @@ fn is_valid_name_char(c: char) -> bool { c.is_ascii_alphanumeric() || c == '_' || c == ':' } -#[derive(Clone)] -struct PrometheusHistogram { - sum: f64, - count: u64, - buckets: Vec<(f64, &'static str, u64)>, -} - -impl PrometheusHistogram { - fn new(metric_name: &str) -> Self { - // Super hacky but effective way to decide when to switch to the time-oriented buckets. - let base_buckets = if metric_name.ends_with("_seconds") { - &TIME_HISTOGRAM_BUCKETS[..] - } else { - &NON_TIME_HISTOGRAM_BUCKETS[..] - }; - - let buckets = base_buckets - .iter() - .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0)) - .collect(); - - Self { - sum: 0.0, - count: 0, - buckets, - } - } - - fn merge_histogram(&mut self, histogram: &Histogram) { - for sample in histogram.samples() { - self.add_sample(sample.value.into_inner(), sample.weight); - } - } - - fn add_sample(&mut self, value: f64, weight: u64) { - self.sum += value * weight as f64; - self.count += weight; - - // Add the value to each bucket that it falls into, up to the maximum number of buckets. - for (upper_bound, _, count) in &mut self.buckets { - if value <= *upper_bound { - *count += weight; - } - } - } - - fn buckets(&self) -> impl Iterator + '_ { - self.buckets - .iter() - .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count)) - } -} - fn histogram_buckets(base: f64, scale: f64) -> [(f64, &'static str); N] { // We generate a set of "log-linear" buckets: logarithmically spaced values which are then subdivided linearly. //