diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dd0a5d89..ec199128f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - dogstatsd generator can be configured to use metric names, tag names and tag values from a configured list. Configured with `metric_names`, `tag_names` and `tag_values`. +- All blackholes now emit a `total_bytes_received` metric (no labels), + providing an aggregated byte count across all blackhole types and IDs. ## Fixed - Fixed a race condition in `lading_signal` that caused lading to hang on shutdown. diff --git a/lading/src/blackhole/datadog.rs b/lading/src/blackhole/datadog.rs index 1340dd0ae..501f524b1 100644 --- a/lading/src/blackhole/datadog.rs +++ b/lading/src/blackhole/datadog.rs @@ -5,6 +5,12 @@ //! //! All other endpoints return `202 Accepted` without processing. //! +//! ## Metrics +//! +//! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types +//! `requests_received`: Total requests received +//! //! # Payload //! //! The V2 protobuf format is defined in `proto/agent_payload.proto`. @@ -212,7 +218,9 @@ async fn handle_request( } }; - counter!("bytes_received", labels).increment(whole_body.len() as u64); + let body_len = whole_body.len() as u64; + counter!("bytes_received", labels).increment(body_len); + counter!("total_bytes_received").increment(body_len); let content_type = headers .get(header::CONTENT_TYPE) diff --git a/lading/src/blackhole/datadog_stateful_logs.rs b/lading/src/blackhole/datadog_stateful_logs.rs index 2e5a91b5e..17de278b8 100644 --- a/lading/src/blackhole/datadog_stateful_logs.rs +++ b/lading/src/blackhole/datadog_stateful_logs.rs @@ -6,6 +6,7 @@ //! ## Metrics //! //! - `bytes_received`: Total bytes received +//! - `total_bytes_received`: Aggregated bytes received across all blackhole types //! - `streams_received`: Total streams received //! - `batches_received`: Total batches received //! - `data_items_received`: Total data items in batches @@ -183,9 +184,10 @@ impl StatefulLogsService for StatefulLogsServiceImpl { match result { Ok(batch) => { let batch_id = batch.batch_id; - let size = batch.encoded_len(); + let size = batch.encoded_len() as u64; - counter!("bytes_received", &labels).increment(size as u64); + counter!("bytes_received", &labels).increment(size); + counter!("total_bytes_received").increment(size); counter!("batches_received", &labels).increment(1); // Count data items in the batch diff --git a/lading/src/blackhole/http.rs b/lading/src/blackhole/http.rs index a627ff925..cd1018500 100644 --- a/lading/src/blackhole/http.rs +++ b/lading/src/blackhole/http.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `bytes_received_distr`: Distribution of compressed bytes per request (with `path` label) //! `decoded_bytes_received`: Total decoded bytes received //! `decoded_bytes_received_distr`: Distribution of decompressed bytes per request (with `path` label) @@ -160,7 +161,9 @@ async fn srv( let body: Bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &metric_labels).increment(body.len() as u64); + let body_len = body.len() as u64; + counter!("bytes_received", &metric_labels).increment(body_len); + counter!("total_bytes_received").increment(body_len); let mut labels_with_path = metric_labels.clone(); labels_with_path.push(("path".to_string(), path)); diff --git a/lading/src/blackhole/otlp.rs b/lading/src/blackhole/otlp.rs index 20106700e..d8cd3d52b 100644 --- a/lading/src/blackhole/otlp.rs +++ b/lading/src/blackhole/otlp.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! `metrics_received`: Number of metric data points received //! `spans_received`: Number of spans received diff --git a/lading/src/blackhole/otlp/grpc.rs b/lading/src/blackhole/otlp/grpc.rs index e86c215c6..ab5e3abc1 100644 --- a/lading/src/blackhole/otlp/grpc.rs +++ b/lading/src/blackhole/otlp/grpc.rs @@ -83,9 +83,10 @@ impl MetricsService for OtlpMetricsService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received").increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_points: u64 = 0; @@ -132,9 +133,10 @@ impl TraceService for OtlpTracesService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received").increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_spans: u64 = 0; @@ -171,9 +173,10 @@ impl LogsService for OtlpLogsService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received").increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_logs: u64 = 0; diff --git a/lading/src/blackhole/otlp/http.rs b/lading/src/blackhole/otlp/http.rs index 92a334918..9085dfe46 100644 --- a/lading/src/blackhole/otlp/http.rs +++ b/lading/src/blackhole/otlp/http.rs @@ -197,6 +197,7 @@ impl OtlpHttpHandler { && length == 0 { counter!("bytes_received", &self.labels).increment(0); + counter!("total_bytes_received").increment(0); let (response_bytes, content_type) = match (path_ref, response_format) { ("/v1/metrics", ResponseFormat::Json) => ( @@ -236,7 +237,9 @@ impl OtlpHttpHandler { let body_bytes = body.collect().await?.to_bytes(); - counter!("bytes_received", &self.labels).increment(body_bytes.len() as u64); + let body_len = body_bytes.len() as u64; + counter!("bytes_received", &self.labels).increment(body_len); + counter!("total_bytes_received").increment(body_len); let response_bytes = match crate::codec::decode(content_encoding.as_ref(), body_bytes.clone()) { diff --git a/lading/src/blackhole/splunk_hec.rs b/lading/src/blackhole/splunk_hec.rs index 1599fbc70..ba73a74c8 100644 --- a/lading/src/blackhole/splunk_hec.rs +++ b/lading/src/blackhole/splunk_hec.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! @@ -102,7 +103,9 @@ async fn srv( let (parts, body) = req.into_parts(); let bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &*labels).increment(bytes.len() as u64); + let bytes_len = bytes.len() as u64; + counter!("bytes_received", &*labels).increment(bytes_len); + counter!("total_bytes_received").increment(bytes_len); match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) { Err(response) => Ok(*response), diff --git a/lading/src/blackhole/sqs.rs b/lading/src/blackhole/sqs.rs index 2cae450f2..af0c05040 100644 --- a/lading/src/blackhole/sqs.rs +++ b/lading/src/blackhole/sqs.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total messages received //! @@ -235,7 +236,9 @@ async fn srv( let (_, body) = req.into_parts(); let bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &metric_labels).increment(bytes.len() as u64); + let bytes_len = bytes.len() as u64; + counter!("bytes_received", &metric_labels).increment(bytes_len); + counter!("total_bytes_received").increment(bytes_len); let action = match serde_qs::from_bytes::(&bytes) { Ok(a) => a, diff --git a/lading/src/blackhole/tcp.rs b/lading/src/blackhole/tcp.rs index b8a0603c1..38d27c906 100644 --- a/lading/src/blackhole/tcp.rs +++ b/lading/src/blackhole/tcp.rs @@ -4,6 +4,7 @@ //! //! `connection_accepted`: Incoming connections received //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `message_received`: Total messages received //! @@ -85,7 +86,9 @@ impl Tcp { while let Some(msg) = stream.next().await { counter!("message_received", labels).increment(1); if let Ok(msg) = msg { - counter!("bytes_received", labels).increment(msg.len() as u64); + let len = msg.len() as u64; + counter!("bytes_received", labels).increment(len); + counter!("total_bytes_received").increment(len); } } } diff --git a/lading/src/blackhole/udp.rs b/lading/src/blackhole/udp.rs index 896a33f50..4331af942 100644 --- a/lading/src/blackhole/udp.rs +++ b/lading/src/blackhole/udp.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `packet_received`: Total packets received //! @@ -109,6 +110,7 @@ impl Udp { })?; counter!("packet_received", &self.metric_labels).increment(1); counter!("bytes_received", &self.metric_labels).increment(bytes as u64); + counter!("total_bytes_received").increment(bytes as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_datagram.rs b/lading/src/blackhole/unix_datagram.rs index 88c494b85..c9bc93d31 100644 --- a/lading/src/blackhole/unix_datagram.rs +++ b/lading/src/blackhole/unix_datagram.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! use std::{io, path::PathBuf}; @@ -108,6 +109,7 @@ impl UnixDatagram { source: Box::new(source), })?; counter!("bytes_received", &self.metric_labels).increment(n as u64); + counter!("total_bytes_received").increment(n as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_stream.rs b/lading/src/blackhole/unix_stream.rs index a1e90a2b9..2f42027bd 100644 --- a/lading/src/blackhole/unix_stream.rs +++ b/lading/src/blackhole/unix_stream.rs @@ -4,6 +4,7 @@ //! //! `connection_accepted`: Incoming connections received //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! @@ -127,7 +128,9 @@ impl UnixStream { while let Some(msg) = stream.next().await { counter!("message_received", labels).increment(1); if let Ok(msg) = msg { - counter!("bytes_received", labels).increment(msg.len() as u64); + let len = msg.len() as u64; + counter!("bytes_received", labels).increment(len); + counter!("total_bytes_received").increment(len); } } }