From 948b64ed0e2a46d12515bf21a62d743066a4203c Mon Sep 17 00:00:00 2001 From: Caleb Metz Date: Tue, 13 Jan 2026 13:58:37 -0500 Subject: [PATCH 1/2] adds `total_bytes_received` metric Signed-off-by: Caleb Metz changelog Signed-off-by: Claude --- CHANGELOG.md | 3 +++ lading/src/blackhole/common.rs | 6 ++++++ lading/src/blackhole/datadog.rs | 11 ++++++++++- lading/src/blackhole/datadog_stateful_logs.rs | 7 +++++-- lading/src/blackhole/http.rs | 7 +++++-- lading/src/blackhole/otlp.rs | 1 + lading/src/blackhole/otlp/grpc.rs | 16 ++++++++++------ lading/src/blackhole/otlp/http.rs | 7 +++++-- lading/src/blackhole/splunk_hec.rs | 7 +++++-- lading/src/blackhole/sqs.rs | 7 +++++-- lading/src/blackhole/tcp.rs | 6 +++++- lading/src/blackhole/udp.rs | 3 +++ lading/src/blackhole/unix_datagram.rs | 3 +++ lading/src/blackhole/unix_stream.rs | 6 +++++- 14 files changed, 71 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dd0a5d89..97908aaa7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - dogstatsd generator can be configured to use metric names, tag names and tag values from a configured list. Configured with `metric_names`, `tag_names` and `tag_values`. +- All blackholes now emit a `total_bytes_received` metric with a single + `component:blackhole` label, providing an aggregated byte count across all + blackhole types and IDs. ## Fixed - Fixed a race condition in `lading_signal` that caused lading to hang on shutdown. diff --git a/lading/src/blackhole/common.rs b/lading/src/blackhole/common.rs index 7ea3b54e6..5fa155adb 100644 --- a/lading/src/blackhole/common.rs +++ b/lading/src/blackhole/common.rs @@ -16,6 +16,12 @@ use tokio::{ }; use tracing::{debug, error, info, warn}; +/// Labels shared by all blackholes to ensure a single aggregated `total_bytes_received` series. +/// +/// Note: the meaning of "bytes" varies by transport — HTTP-based blackholes report wire bytes, +/// while gRPC-based blackholes (OTLP gRPC, Datadog Stateful Logs) report protobuf `encoded_len()`. +pub(super) static COMMON_BLACKHOLE_LABELS: &[(&str, &str)] = &[("component", "blackhole")]; + #[derive(thiserror::Error, Debug)] pub enum Error { /// Wrapper for [`std::io::Error`]. diff --git a/lading/src/blackhole/datadog.rs b/lading/src/blackhole/datadog.rs index 1340dd0ae..8fbec612c 100644 --- a/lading/src/blackhole/datadog.rs +++ b/lading/src/blackhole/datadog.rs @@ -5,6 +5,12 @@ //! //! All other endpoints return `202 Accepted` without processing. //! +//! ## Metrics +//! +//! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types +//! `requests_received`: Total requests received +//! //! # Payload //! //! The V2 protobuf format is defined in `proto/agent_payload.proto`. @@ -39,6 +45,7 @@ use tokio::net::TcpListener; use tracing::{debug, error, info, trace, warn}; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; use crate::proto::datadog::intake::metrics::MetricPayload; #[derive(thiserror::Error, Debug)] @@ -212,7 +219,9 @@ async fn handle_request( } }; - counter!("bytes_received", labels).increment(whole_body.len() as u64); + let body_len = whole_body.len() as u64; + counter!("bytes_received", labels).increment(body_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); let content_type = headers .get(header::CONTENT_TYPE) diff --git a/lading/src/blackhole/datadog_stateful_logs.rs b/lading/src/blackhole/datadog_stateful_logs.rs index 2e5a91b5e..2a641b991 100644 --- a/lading/src/blackhole/datadog_stateful_logs.rs +++ b/lading/src/blackhole/datadog_stateful_logs.rs @@ -6,6 +6,7 @@ //! ## Metrics //! //! - `bytes_received`: Total bytes received +//! - `total_bytes_received`: Aggregated bytes received across all blackhole types //! - `streams_received`: Total streams received //! - `batches_received`: Total batches received //! - `data_items_received`: Total data items in batches @@ -29,6 +30,7 @@ use tonic::{Request, Response, Status, transport}; use tracing::{error, info}; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`DatadogStatefulLogs`]. @@ -183,9 +185,10 @@ impl StatefulLogsService for StatefulLogsServiceImpl { match result { Ok(batch) => { let batch_id = batch.batch_id; - let size = batch.encoded_len(); + let size = batch.encoded_len() as u64; - counter!("bytes_received", &labels).increment(size as u64); + counter!("bytes_received", &labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("batches_received", &labels).increment(1); // Count data items in the batch diff --git a/lading/src/blackhole/http.rs b/lading/src/blackhole/http.rs index a627ff925..c2c258dc7 100644 --- a/lading/src/blackhole/http.rs +++ b/lading/src/blackhole/http.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `bytes_received_distr`: Distribution of compressed bytes per request (with `path` label) //! `decoded_bytes_received`: Total decoded bytes received //! `decoded_bytes_received_distr`: Distribution of decompressed bytes per request (with `path` label) @@ -19,7 +20,7 @@ use std::{net::SocketAddr, time::Duration}; use tracing::error; use super::General; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; fn default_concurrent_requests_max() -> usize { 100 @@ -160,7 +161,9 @@ async fn srv( let body: Bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &metric_labels).increment(body.len() as u64); + let body_len = body.len() as u64; + counter!("bytes_received", &metric_labels).increment(body_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); let mut labels_with_path = metric_labels.clone(); labels_with_path.push(("path".to_string(), path)); diff --git a/lading/src/blackhole/otlp.rs b/lading/src/blackhole/otlp.rs index 20106700e..d8cd3d52b 100644 --- a/lading/src/blackhole/otlp.rs +++ b/lading/src/blackhole/otlp.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! `metrics_received`: Number of metric data points received //! `spans_received`: Number of spans received diff --git a/lading/src/blackhole/otlp/grpc.rs b/lading/src/blackhole/otlp/grpc.rs index e86c215c6..e1cd6772d 100644 --- a/lading/src/blackhole/otlp/grpc.rs +++ b/lading/src/blackhole/otlp/grpc.rs @@ -1,5 +1,6 @@ //! gRPC implementation of the OTLP blackhole. +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; use metrics::counter; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, @@ -83,9 +84,10 @@ impl MetricsService for OtlpMetricsService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_points: u64 = 0; @@ -132,9 +134,10 @@ impl TraceService for OtlpTracesService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_spans: u64 = 0; @@ -171,9 +174,10 @@ impl LogsService for OtlpLogsService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_logs: u64 = 0; diff --git a/lading/src/blackhole/otlp/http.rs b/lading/src/blackhole/otlp/http.rs index 92a334918..bd5c30573 100644 --- a/lading/src/blackhole/otlp/http.rs +++ b/lading/src/blackhole/otlp/http.rs @@ -21,7 +21,7 @@ use std::time::Duration; use tokio::task::JoinHandle; use tracing::{error, info}; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; /// Run the HTTP server for OTLP pub(crate) fn run_server( @@ -197,6 +197,7 @@ impl OtlpHttpHandler { && length == 0 { counter!("bytes_received", &self.labels).increment(0); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(0); let (response_bytes, content_type) = match (path_ref, response_format) { ("/v1/metrics", ResponseFormat::Json) => ( @@ -236,7 +237,9 @@ impl OtlpHttpHandler { let body_bytes = body.collect().await?.to_bytes(); - counter!("bytes_received", &self.labels).increment(body_bytes.len() as u64); + let body_len = body_bytes.len() as u64; + counter!("bytes_received", &self.labels).increment(body_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); let response_bytes = match crate::codec::decode(content_encoding.as_ref(), body_bytes.clone()) { diff --git a/lading/src/blackhole/splunk_hec.rs b/lading/src/blackhole/splunk_hec.rs index 1599fbc70..4f4ab9151 100644 --- a/lading/src/blackhole/splunk_hec.rs +++ b/lading/src/blackhole/splunk_hec.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! @@ -19,7 +20,7 @@ use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use super::General; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; static ACK_ID: AtomicU64 = AtomicU64::new(0); @@ -102,7 +103,9 @@ async fn srv( let (parts, body) = req.into_parts(); let bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &*labels).increment(bytes.len() as u64); + let bytes_len = bytes.len() as u64; + counter!("bytes_received", &*labels).increment(bytes_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes_len); match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) { Err(response) => Ok(*response), diff --git a/lading/src/blackhole/sqs.rs b/lading/src/blackhole/sqs.rs index 2cae450f2..ee33d2826 100644 --- a/lading/src/blackhole/sqs.rs +++ b/lading/src/blackhole/sqs.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total messages received //! @@ -16,7 +17,7 @@ use std::{fmt::Write, net::SocketAddr}; use tracing::{debug, error}; use super::General; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Sqs`] @@ -235,7 +236,9 @@ async fn srv( let (_, body) = req.into_parts(); let bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &metric_labels).increment(bytes.len() as u64); + let bytes_len = bytes.len() as u64; + counter!("bytes_received", &metric_labels).increment(bytes_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes_len); let action = match serde_qs::from_bytes::(&bytes) { Ok(a) => a, diff --git a/lading/src/blackhole/tcp.rs b/lading/src/blackhole/tcp.rs index b8a0603c1..a211678bd 100644 --- a/lading/src/blackhole/tcp.rs +++ b/lading/src/blackhole/tcp.rs @@ -4,6 +4,7 @@ //! //! `connection_accepted`: Incoming connections received //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `message_received`: Total messages received //! @@ -17,6 +18,7 @@ use tokio_util::io::ReaderStream; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors emitted by [`Tcp`] @@ -85,7 +87,9 @@ impl Tcp { while let Some(msg) = stream.next().await { counter!("message_received", labels).increment(1); if let Ok(msg) = msg { - counter!("bytes_received", labels).increment(msg.len() as u64); + let len = msg.len() as u64; + counter!("bytes_received", labels).increment(len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(len); } } } diff --git a/lading/src/blackhole/udp.rs b/lading/src/blackhole/udp.rs index 896a33f50..92fa2310d 100644 --- a/lading/src/blackhole/udp.rs +++ b/lading/src/blackhole/udp.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `packet_received`: Total packets received //! @@ -14,6 +15,7 @@ use tokio::net::UdpSocket; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Udp`]. @@ -109,6 +111,7 @@ impl Udp { })?; counter!("packet_received", &self.metric_labels).increment(1); counter!("bytes_received", &self.metric_labels).increment(bytes as u64); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_datagram.rs b/lading/src/blackhole/unix_datagram.rs index 88c494b85..cba28f107 100644 --- a/lading/src/blackhole/unix_datagram.rs +++ b/lading/src/blackhole/unix_datagram.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! use std::{io, path::PathBuf}; @@ -14,6 +15,7 @@ use tokio::net; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`UnixDatagram`]. @@ -108,6 +110,7 @@ impl UnixDatagram { source: Box::new(source), })?; counter!("bytes_received", &self.metric_labels).increment(n as u64); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(n as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_stream.rs b/lading/src/blackhole/unix_stream.rs index a1e90a2b9..73fcfea8d 100644 --- a/lading/src/blackhole/unix_stream.rs +++ b/lading/src/blackhole/unix_stream.rs @@ -4,6 +4,7 @@ //! //! `connection_accepted`: Incoming connections received //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! @@ -17,6 +18,7 @@ use tokio_util::io::ReaderStream; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`UnixStream`]. @@ -127,7 +129,9 @@ impl UnixStream { while let Some(msg) = stream.next().await { counter!("message_received", labels).increment(1); if let Ok(msg) = msg { - counter!("bytes_received", labels).increment(msg.len() as u64); + let len = msg.len() as u64; + counter!("bytes_received", labels).increment(len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(len); } } } From 80e2e24686562c78ae472636243fde7e8d678805 Mon Sep 17 00:00:00 2001 From: Caleb Metz Date: Tue, 17 Feb 2026 09:28:24 -0500 Subject: [PATCH 2/2] removed labels Signed-off-by: Caleb Metz --- CHANGELOG.md | 5 ++--- lading/src/blackhole/common.rs | 6 ------ lading/src/blackhole/datadog.rs | 3 +-- lading/src/blackhole/datadog_stateful_logs.rs | 3 +-- lading/src/blackhole/http.rs | 4 ++-- lading/src/blackhole/otlp/grpc.rs | 7 +++---- lading/src/blackhole/otlp/http.rs | 6 +++--- lading/src/blackhole/splunk_hec.rs | 4 ++-- lading/src/blackhole/sqs.rs | 4 ++-- lading/src/blackhole/tcp.rs | 3 +-- lading/src/blackhole/udp.rs | 3 +-- lading/src/blackhole/unix_datagram.rs | 3 +-- lading/src/blackhole/unix_stream.rs | 3 +-- 13 files changed, 20 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97908aaa7..ec199128f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,9 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - dogstatsd generator can be configured to use metric names, tag names and tag values from a configured list. Configured with `metric_names`, `tag_names` and `tag_values`. -- All blackholes now emit a `total_bytes_received` metric with a single - `component:blackhole` label, providing an aggregated byte count across all - blackhole types and IDs. +- All blackholes now emit a `total_bytes_received` metric (no labels), + providing an aggregated byte count across all blackhole types and IDs. ## Fixed - Fixed a race condition in `lading_signal` that caused lading to hang on shutdown. diff --git a/lading/src/blackhole/common.rs b/lading/src/blackhole/common.rs index 5fa155adb..7ea3b54e6 100644 --- a/lading/src/blackhole/common.rs +++ b/lading/src/blackhole/common.rs @@ -16,12 +16,6 @@ use tokio::{ }; use tracing::{debug, error, info, warn}; -/// Labels shared by all blackholes to ensure a single aggregated `total_bytes_received` series. -/// -/// Note: the meaning of "bytes" varies by transport — HTTP-based blackholes report wire bytes, -/// while gRPC-based blackholes (OTLP gRPC, Datadog Stateful Logs) report protobuf `encoded_len()`. -pub(super) static COMMON_BLACKHOLE_LABELS: &[(&str, &str)] = &[("component", "blackhole")]; - #[derive(thiserror::Error, Debug)] pub enum Error { /// Wrapper for [`std::io::Error`]. diff --git a/lading/src/blackhole/datadog.rs b/lading/src/blackhole/datadog.rs index 8fbec612c..501f524b1 100644 --- a/lading/src/blackhole/datadog.rs +++ b/lading/src/blackhole/datadog.rs @@ -45,7 +45,6 @@ use tokio::net::TcpListener; use tracing::{debug, error, info, trace, warn}; use super::General; -use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; use crate::proto::datadog::intake::metrics::MetricPayload; #[derive(thiserror::Error, Debug)] @@ -221,7 +220,7 @@ async fn handle_request( let body_len = whole_body.len() as u64; counter!("bytes_received", labels).increment(body_len); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); + counter!("total_bytes_received").increment(body_len); let content_type = headers .get(header::CONTENT_TYPE) diff --git a/lading/src/blackhole/datadog_stateful_logs.rs b/lading/src/blackhole/datadog_stateful_logs.rs index 2a641b991..17de278b8 100644 --- a/lading/src/blackhole/datadog_stateful_logs.rs +++ b/lading/src/blackhole/datadog_stateful_logs.rs @@ -30,7 +30,6 @@ use tonic::{Request, Response, Status, transport}; use tracing::{error, info}; use super::General; -use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`DatadogStatefulLogs`]. @@ -188,7 +187,7 @@ impl StatefulLogsService for StatefulLogsServiceImpl { let size = batch.encoded_len() as u64; counter!("bytes_received", &labels).increment(size); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); + counter!("total_bytes_received").increment(size); counter!("batches_received", &labels).increment(1); // Count data items in the batch diff --git a/lading/src/blackhole/http.rs b/lading/src/blackhole/http.rs index c2c258dc7..cd1018500 100644 --- a/lading/src/blackhole/http.rs +++ b/lading/src/blackhole/http.rs @@ -20,7 +20,7 @@ use std::{net::SocketAddr, time::Duration}; use tracing::error; use super::General; -use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; +use crate::blackhole::common; fn default_concurrent_requests_max() -> usize { 100 @@ -163,7 +163,7 @@ async fn srv( let body_len = body.len() as u64; counter!("bytes_received", &metric_labels).increment(body_len); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); + counter!("total_bytes_received").increment(body_len); let mut labels_with_path = metric_labels.clone(); labels_with_path.push(("path".to_string(), path)); diff --git a/lading/src/blackhole/otlp/grpc.rs b/lading/src/blackhole/otlp/grpc.rs index e1cd6772d..ab5e3abc1 100644 --- a/lading/src/blackhole/otlp/grpc.rs +++ b/lading/src/blackhole/otlp/grpc.rs @@ -1,6 +1,5 @@ //! gRPC implementation of the OTLP blackhole. -use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; use metrics::counter; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, @@ -87,7 +86,7 @@ impl MetricsService for OtlpMetricsService { let size = request.encoded_len() as u64; counter!("bytes_received", &self.labels).increment(size); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); + counter!("total_bytes_received").increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_points: u64 = 0; @@ -137,7 +136,7 @@ impl TraceService for OtlpTracesService { let size = request.encoded_len() as u64; counter!("bytes_received", &self.labels).increment(size); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); + counter!("total_bytes_received").increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_spans: u64 = 0; @@ -177,7 +176,7 @@ impl LogsService for OtlpLogsService { let size = request.encoded_len() as u64; counter!("bytes_received", &self.labels).increment(size); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); + counter!("total_bytes_received").increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_logs: u64 = 0; diff --git a/lading/src/blackhole/otlp/http.rs b/lading/src/blackhole/otlp/http.rs index bd5c30573..9085dfe46 100644 --- a/lading/src/blackhole/otlp/http.rs +++ b/lading/src/blackhole/otlp/http.rs @@ -21,7 +21,7 @@ use std::time::Duration; use tokio::task::JoinHandle; use tracing::{error, info}; -use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; +use crate::blackhole::common; /// Run the HTTP server for OTLP pub(crate) fn run_server( @@ -197,7 +197,7 @@ impl OtlpHttpHandler { && length == 0 { counter!("bytes_received", &self.labels).increment(0); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(0); + counter!("total_bytes_received").increment(0); let (response_bytes, content_type) = match (path_ref, response_format) { ("/v1/metrics", ResponseFormat::Json) => ( @@ -239,7 +239,7 @@ impl OtlpHttpHandler { let body_len = body_bytes.len() as u64; counter!("bytes_received", &self.labels).increment(body_len); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); + counter!("total_bytes_received").increment(body_len); let response_bytes = match crate::codec::decode(content_encoding.as_ref(), body_bytes.clone()) { diff --git a/lading/src/blackhole/splunk_hec.rs b/lading/src/blackhole/splunk_hec.rs index 4f4ab9151..ba73a74c8 100644 --- a/lading/src/blackhole/splunk_hec.rs +++ b/lading/src/blackhole/splunk_hec.rs @@ -20,7 +20,7 @@ use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use super::General; -use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; +use crate::blackhole::common; static ACK_ID: AtomicU64 = AtomicU64::new(0); @@ -105,7 +105,7 @@ async fn srv( let bytes = body.boxed().collect().await?.to_bytes(); let bytes_len = bytes.len() as u64; counter!("bytes_received", &*labels).increment(bytes_len); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes_len); + counter!("total_bytes_received").increment(bytes_len); match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) { Err(response) => Ok(*response), diff --git a/lading/src/blackhole/sqs.rs b/lading/src/blackhole/sqs.rs index ee33d2826..af0c05040 100644 --- a/lading/src/blackhole/sqs.rs +++ b/lading/src/blackhole/sqs.rs @@ -17,7 +17,7 @@ use std::{fmt::Write, net::SocketAddr}; use tracing::{debug, error}; use super::General; -use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; +use crate::blackhole::common; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Sqs`] @@ -238,7 +238,7 @@ async fn srv( let bytes = body.boxed().collect().await?.to_bytes(); let bytes_len = bytes.len() as u64; counter!("bytes_received", &metric_labels).increment(bytes_len); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes_len); + counter!("total_bytes_received").increment(bytes_len); let action = match serde_qs::from_bytes::(&bytes) { Ok(a) => a, diff --git a/lading/src/blackhole/tcp.rs b/lading/src/blackhole/tcp.rs index a211678bd..38d27c906 100644 --- a/lading/src/blackhole/tcp.rs +++ b/lading/src/blackhole/tcp.rs @@ -18,7 +18,6 @@ use tokio_util::io::ReaderStream; use tracing::info; use super::General; -use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors emitted by [`Tcp`] @@ -89,7 +88,7 @@ impl Tcp { if let Ok(msg) = msg { let len = msg.len() as u64; counter!("bytes_received", labels).increment(len); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(len); + counter!("total_bytes_received").increment(len); } } } diff --git a/lading/src/blackhole/udp.rs b/lading/src/blackhole/udp.rs index 92fa2310d..4331af942 100644 --- a/lading/src/blackhole/udp.rs +++ b/lading/src/blackhole/udp.rs @@ -15,7 +15,6 @@ use tokio::net::UdpSocket; use tracing::info; use super::General; -use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Udp`]. @@ -111,7 +110,7 @@ impl Udp { })?; counter!("packet_received", &self.metric_labels).increment(1); counter!("bytes_received", &self.metric_labels).increment(bytes as u64); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes as u64); + counter!("total_bytes_received").increment(bytes as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_datagram.rs b/lading/src/blackhole/unix_datagram.rs index cba28f107..c9bc93d31 100644 --- a/lading/src/blackhole/unix_datagram.rs +++ b/lading/src/blackhole/unix_datagram.rs @@ -15,7 +15,6 @@ use tokio::net; use tracing::info; use super::General; -use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`UnixDatagram`]. @@ -110,7 +109,7 @@ impl UnixDatagram { source: Box::new(source), })?; counter!("bytes_received", &self.metric_labels).increment(n as u64); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(n as u64); + counter!("total_bytes_received").increment(n as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_stream.rs b/lading/src/blackhole/unix_stream.rs index 73fcfea8d..2f42027bd 100644 --- a/lading/src/blackhole/unix_stream.rs +++ b/lading/src/blackhole/unix_stream.rs @@ -18,7 +18,6 @@ use tokio_util::io::ReaderStream; use tracing::info; use super::General; -use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`UnixStream`]. @@ -131,7 +130,7 @@ impl UnixStream { if let Ok(msg) = msg { let len = msg.len() as u64; counter!("bytes_received", labels).increment(len); - counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(len); + counter!("total_bytes_received").increment(len); } } }