diff --git a/.claude/settings.json b/.claude/settings.json index 061f7e338..6ca8f017f 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -23,7 +23,7 @@ "Bash(gh repo view:*)", "WebFetch(domain:docs.sentry.io)", "WebFetch(domain:develop.sentry.dev)", - "Bash(grep:*)", + "Bash(grep:*)" ], "deny": [] } diff --git a/CHANGELOG.md b/CHANGELOG.md index f5abb9850..09aad8cde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Added a `Envelope::into_items` method, which returns an iterator over owned [`EnvelopeItem`s](https://docs.rs/sentry/0.46.2/sentry/protocol/enum.EnvelopeItem.html) in the [`Envelope`](https://docs.rs/sentry/0.46.2/sentry/struct.Envelope.html) ([#983](https://github.com/getsentry/sentry-rust/pull/983)). - Expose transport utilities ([#949](https://github.com/getsentry/sentry-rust/pull/949)) +- Add support for [trace metrics](https://develop.sentry.dev/sdk/telemetry/metrics/) ([#997](https://github.com/getsentry/sentry-rust/pull/997)) ## 0.46.2 diff --git a/sentry-core/Cargo.toml b/sentry-core/Cargo.toml index 237a2b72c..ef8c03556 100644 --- a/sentry-core/Cargo.toml +++ b/sentry-core/Cargo.toml @@ -25,6 +25,7 @@ client = ["rand"] test = ["client", "release-health"] release-health = [] logs = [] +metrics = [] [dependencies] log = { version = "0.4.8", optional = true, features = ["std"] } diff --git a/sentry-core/src/batcher.rs b/sentry-core/src/batcher.rs new file mode 100644 index 000000000..189df59a1 --- /dev/null +++ b/sentry-core/src/batcher.rs @@ -0,0 +1,316 @@ +//! Generic batching for Sentry envelope items (logs, metrics, etc.). + +use std::sync::{Arc, Condvar, Mutex, MutexGuard}; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +use crate::client::TransportArc; +use crate::protocol::EnvelopeItem; +use crate::Envelope; + +// Flush when there's 100 items in the buffer +const MAX_ITEMS: usize = 100; +// Or when 5 seconds have passed from the last flush +const FLUSH_INTERVAL: Duration = Duration::from_secs(5); + +/// Accumulates items in a queue and submits them through the transport when one of the flushing +/// conditions is met: either the queue reaches [`MAX_ITEMS`] or [`FLUSH_INTERVAL`] has elapsed. +pub(crate) struct Batcher +where + EnvelopeItem: From>, + T: Send + 'static, +{ + transport: TransportArc, + queue: Arc>>, + shutdown: Arc<(Mutex, Condvar)>, + worker: Option>, + name: &'static str, +} + +impl Batcher +where + EnvelopeItem: From>, + T: Send + 'static, +{ + /// Creates a new Batcher that will submit envelopes to the given `transport`. + /// + /// `name` is used for the background thread name and debug logging. + /// `into_envelope_item` converts a batch of items into an [`EnvelopeItem`]. + pub(crate) fn new(transport: TransportArc, name: &'static str) -> Self { + let queue: Arc>> = Arc::new(Mutex::new(Vec::new())); + #[allow(clippy::mutex_atomic)] + let shutdown = Arc::new((Mutex::new(false), Condvar::new())); + + let worker_transport = transport.clone(); + let worker_queue = queue.clone(); + let worker_shutdown = shutdown.clone(); + let worker = std::thread::Builder::new() + .name(format!("sentry-{name}-batcher")) + .spawn(move || { + let (lock, cvar) = worker_shutdown.as_ref(); + let mut shutdown = lock.lock().unwrap(); + // check this immediately, in case the main thread is already shutting down + if *shutdown { + return; + } + let mut last_flush = Instant::now(); + loop { + let timeout = FLUSH_INTERVAL + .checked_sub(last_flush.elapsed()) + .unwrap_or_else(|| Duration::from_secs(0)); + shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0; + if *shutdown { + return; + } + if last_flush.elapsed() >= FLUSH_INTERVAL { + Self::flush_queue_internal( + worker_queue.lock().unwrap(), + &worker_transport, + name, + ); + last_flush = Instant::now(); + } + } + }) + .unwrap(); + + Self { + transport, + queue, + shutdown, + worker: Some(worker), + name, + } + } + + /// Enqueues an item for delayed sending. + /// + /// This will automatically flush the queue if it reaches [`MAX_ITEMS`]. + pub(crate) fn enqueue(&self, item: T) { + let mut queue = self.queue.lock().unwrap(); + queue.push(item); + if queue.len() >= MAX_ITEMS { + Self::flush_queue_internal(queue, &self.transport, self.name); + } + } + + /// Flushes the queue to the transport. + pub(crate) fn flush(&self) { + let queue = self.queue.lock().unwrap(); + Self::flush_queue_internal(queue, &self.transport, self.name); + } + + /// Flushes the queue to the transport. + /// + /// This is a static method as it will be called from both the background + /// thread and the main thread on drop. + fn flush_queue_internal( + mut queue_lock: MutexGuard>, + transport: &TransportArc, + name: &str, + ) { + let items = std::mem::take(&mut *queue_lock); + drop(queue_lock); + + if items.is_empty() { + return; + } + + sentry_debug!("[Batcher({name})] Flushing {} items", items.len()); + + if let Some(ref transport) = *transport.read().unwrap() { + let mut envelope = Envelope::new(); + envelope.add_item(items); + transport.send_envelope(envelope); + } + } +} + +impl Drop for Batcher +where + EnvelopeItem: From>, + T: Send + 'static, +{ + fn drop(&mut self) { + let (lock, cvar) = self.shutdown.as_ref(); + *lock.lock().unwrap() = true; + cvar.notify_one(); + + if let Some(worker) = self.worker.take() { + worker.join().ok(); + } + Self::flush_queue_internal(self.queue.lock().unwrap(), &self.transport, self.name); + } +} + +#[cfg(all(test, feature = "test"))] +mod tests { + use crate::test; + + // ---- Log batching tests ---- + + #[cfg(feature = "logs")] + mod log_tests { + use super::*; + use crate::logger_info; + + #[test] + fn test_logs_batching() { + let envelopes = test::with_captured_envelopes_options( + || { + for i in 0..150 { + logger_info!("test log {}", i); + } + }, + crate::ClientOptions { + enable_logs: true, + ..Default::default() + }, + ); + + assert_eq!(2, envelopes.len()); + + let mut total_logs = 0; + for envelope in &envelopes { + for item in envelope.items() { + if let crate::protocol::EnvelopeItem::ItemContainer( + crate::protocol::ItemContainer::Logs(logs), + ) = item + { + total_logs += logs.len(); + } + } + } + + assert_eq!(150, total_logs); + } + + #[test] + fn test_logs_batcher_flush() { + let envelopes = test::with_captured_envelopes_options( + || { + for i in 0..12 { + logger_info!("test log {}", i); + } + }, + crate::ClientOptions { + enable_logs: true, + ..Default::default() + }, + ); + + assert_eq!(1, envelopes.len()); + + for envelope in &envelopes { + for item in envelope.items() { + if let crate::protocol::EnvelopeItem::ItemContainer( + crate::protocol::ItemContainer::Logs(logs), + ) = item + { + assert_eq!(12, logs.len()); + break; + } + } + } + } + } + + // ---- Metric batching tests ---- + + #[cfg(feature = "metrics")] + mod metric_tests { + use super::*; + use sentry_types::protocol::v7::{TraceId, TraceMetric, TraceMetricType}; + use std::time::SystemTime; + + fn test_metric(name: &str) -> TraceMetric { + TraceMetric { + r#type: TraceMetricType::Counter, + name: name.to_owned(), + value: 1.0, + timestamp: SystemTime::now(), + trace_id: TraceId::default(), + span_id: None, + unit: None, + attributes: Default::default(), + } + } + + #[test] + fn test_metrics_batching() { + let envelopes = test::with_captured_envelopes_options( + || { + for i in 0..150 { + crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}"))); + } + }, + crate::ClientOptions { + enable_metrics: true, + ..Default::default() + }, + ); + + assert_eq!(2, envelopes.len()); + + let mut total_metrics = 0; + for envelope in &envelopes { + for item in envelope.items() { + if let crate::protocol::EnvelopeItem::ItemContainer( + crate::protocol::ItemContainer::TraceMetrics(metrics), + ) = item + { + total_metrics += metrics.len(); + } + } + } + + assert_eq!(150, total_metrics); + } + + #[test] + fn test_metrics_disabled_explicitly() { + let envelopes = test::with_captured_envelopes_options( + || { + for i in 0..10 { + crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}"))); + } + }, + crate::ClientOptions { + enable_metrics: false, + ..Default::default() + }, + ); + + assert_eq!(0, envelopes.len()); + } + + #[test] + fn test_metrics_batcher_flush() { + let envelopes = test::with_captured_envelopes_options( + || { + for i in 0..12 { + crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}"))); + } + }, + crate::ClientOptions { + enable_metrics: true, + ..Default::default() + }, + ); + + assert_eq!(1, envelopes.len()); + + for envelope in &envelopes { + for item in envelope.items() { + if let crate::protocol::EnvelopeItem::ItemContainer( + crate::protocol::ItemContainer::TraceMetrics(metrics), + ) = item + { + assert_eq!(12, metrics.len()); + break; + } + } + } + } + } +} diff --git a/sentry-core/src/client.rs b/sentry-core/src/client.rs index a4b72d080..154a0c286 100644 --- a/sentry-core/src/client.rs +++ b/sentry-core/src/client.rs @@ -1,6 +1,6 @@ use std::any::TypeId; use std::borrow::Cow; -#[cfg(feature = "logs")] +#[cfg(any(feature = "logs", feature = "metrics"))] use std::collections::BTreeMap; use std::fmt; use std::panic::RefUnwindSafe; @@ -12,9 +12,9 @@ use crate::protocol::SessionUpdate; use rand::random; use sentry_types::random_uuid; +#[cfg(any(feature = "logs", feature = "metrics"))] +use crate::batcher::Batcher; use crate::constants::SDK_INFO; -#[cfg(feature = "logs")] -use crate::logs::LogsBatcher; use crate::protocol::{ClientSdkInfo, Event}; #[cfg(feature = "release-health")] use crate::session::SessionFlusher; @@ -24,6 +24,10 @@ use crate::SessionMode; use crate::{ClientOptions, Envelope, Hub, Integration, Scope, Transport}; #[cfg(feature = "logs")] use sentry_types::protocol::v7::Context; +#[cfg(all(feature = "metrics", not(feature = "logs")))] +use sentry_types::protocol::v7::LogAttribute; +#[cfg(feature = "metrics")] +use sentry_types::protocol::v7::TraceMetric; #[cfg(feature = "logs")] use sentry_types::protocol::v7::{Log, LogAttribute}; @@ -58,9 +62,13 @@ pub struct Client { #[cfg(feature = "release-health")] session_flusher: RwLock>, #[cfg(feature = "logs")] - logs_batcher: RwLock>, + logs_batcher: RwLock>>, #[cfg(feature = "logs")] default_log_attributes: Option>, + #[cfg(feature = "metrics")] + metrics_batcher: RwLock>>, + #[cfg(feature = "metrics")] + default_metric_attributes: Option>, integrations: Vec<(TypeId, Arc)>, pub(crate) sdk_info: ClientSdkInfo, } @@ -86,7 +94,14 @@ impl Clone for Client { #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if self.options.enable_logs { - Some(LogsBatcher::new(transport.clone())) + Some(Batcher::new(transport.clone(), "logs")) + } else { + None + }); + + #[cfg(feature = "metrics")] + let metrics_batcher = RwLock::new(if self.options.enable_metrics { + Some(Batcher::new(transport.clone(), "metrics")) } else { None }); @@ -100,6 +115,10 @@ impl Clone for Client { logs_batcher, #[cfg(feature = "logs")] default_log_attributes: self.default_log_attributes.clone(), + #[cfg(feature = "metrics")] + metrics_batcher, + #[cfg(feature = "metrics")] + default_metric_attributes: self.default_metric_attributes.clone(), integrations: self.integrations.clone(), sdk_info: self.sdk_info.clone(), } @@ -171,7 +190,14 @@ impl Client { #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if options.enable_logs { - Some(LogsBatcher::new(transport.clone())) + Some(Batcher::new(transport.clone(), "logs")) + } else { + None + }); + + #[cfg(feature = "metrics")] + let metrics_batcher = RwLock::new(if options.enable_metrics { + Some(Batcher::new(transport.clone(), "metrics")) } else { None }); @@ -186,6 +212,10 @@ impl Client { logs_batcher, #[cfg(feature = "logs")] default_log_attributes: None, + #[cfg(feature = "metrics")] + metrics_batcher, + #[cfg(feature = "metrics")] + default_metric_attributes: None, integrations, sdk_info, }; @@ -193,6 +223,9 @@ impl Client { #[cfg(feature = "logs")] client.cache_default_log_attributes(); + #[cfg(feature = "metrics")] + client.cache_default_metric_attributes(); + client } @@ -247,6 +280,35 @@ impl Client { self.default_log_attributes = Some(attributes); } + #[cfg(feature = "metrics")] + fn cache_default_metric_attributes(&mut self) { + let mut attributes = BTreeMap::new(); + + if let Some(environment) = self.options.environment.as_ref() { + attributes.insert("sentry.environment".to_owned(), environment.clone().into()); + } + + if let Some(release) = self.options.release.as_ref() { + attributes.insert("sentry.release".to_owned(), release.clone().into()); + } + + attributes.insert( + "sentry.sdk.name".to_owned(), + self.sdk_info.name.to_owned().into(), + ); + + attributes.insert( + "sentry.sdk.version".to_owned(), + self.sdk_info.version.to_owned().into(), + ); + + if let Some(server) = &self.options.server_name { + attributes.insert("server.address".to_owned(), server.clone().into()); + } + + self.default_metric_attributes = Some(attributes); + } + pub(crate) fn get_integration(&self) -> Option<&I> where I: Integration, @@ -420,6 +482,10 @@ impl Client { if let Some(ref batcher) = *self.logs_batcher.read().unwrap() { batcher.flush(); } + #[cfg(feature = "metrics")] + if let Some(ref batcher) = *self.metrics_batcher.read().unwrap() { + batcher.flush(); + } if let Some(ref transport) = *self.transport.read().unwrap() { transport.flush(timeout.unwrap_or(self.options.shutdown_timeout)) } else { @@ -439,6 +505,8 @@ impl Client { drop(self.session_flusher.write().unwrap().take()); #[cfg(feature = "logs")] drop(self.logs_batcher.write().unwrap().take()); + #[cfg(feature = "metrics")] + drop(self.metrics_batcher.write().unwrap().take()); let transport_opt = self.transport.write().unwrap().take(); if let Some(transport) = transport_opt { sentry_debug!("client close; request transport to shut down"); @@ -493,6 +561,41 @@ impl Client { Some(log) } + + /// Captures a trace metric and sends it to Sentry. + #[cfg(feature = "metrics")] + pub fn capture_metric(&self, metric: TraceMetric, scope: &Scope) { + if !self.options.enable_metrics { + return; + } + if let Some(metric) = self.prepare_metric(metric, scope) { + if let Some(ref batcher) = *self.metrics_batcher.read().unwrap() { + batcher.enqueue(metric); + } + } + } + + /// Prepares a metric to be sent, setting the `trace_id` and other default attributes, and + /// processing it through `before_send_metric`. + #[cfg(feature = "metrics")] + fn prepare_metric(&self, mut metric: TraceMetric, scope: &Scope) -> Option { + scope.apply_to_metric(&mut metric, self.options.send_default_pii); + + if let Some(default_attributes) = self.default_metric_attributes.as_ref() { + for (key, val) in default_attributes.iter() { + metric + .attributes + .entry(key.to_owned()) + .or_insert(val.clone()); + } + } + + if let Some(ref func) = self.options.before_send_metric { + metric = func(metric)?; + } + + Some(metric) + } } // Make this unwind safe. It's not out of the box because of the diff --git a/sentry-core/src/clientoptions.rs b/sentry-core/src/clientoptions.rs index 02d5d5a98..c81176c31 100644 --- a/sentry-core/src/clientoptions.rs +++ b/sentry-core/src/clientoptions.rs @@ -7,6 +7,8 @@ use crate::constants::USER_AGENT; use crate::performance::TracesSampler; #[cfg(feature = "logs")] use crate::protocol::Log; +#[cfg(feature = "metrics")] +use crate::protocol::TraceMetric; use crate::protocol::{Breadcrumb, Event}; use crate::types::Dsn; use crate::{Integration, IntoDsn, TransportFactory}; @@ -172,6 +174,12 @@ pub struct ClientOptions { /// Determines whether captured structured logs should be sent to Sentry (defaults to false). #[cfg(feature = "logs")] pub enable_logs: bool, + /// Determines whether captured trace metrics should be sent to Sentry (defaults to true). + #[cfg(feature = "metrics")] + pub enable_metrics: bool, + /// Callback that is executed for each TraceMetric being added. + #[cfg(feature = "metrics")] + pub before_send_metric: Option>, // Other options not documented in Unified API /// Disable SSL verification. /// @@ -232,6 +240,12 @@ impl fmt::Debug for ClientOptions { struct BeforeSendLog; self.before_send_log.as_ref().map(|_| BeforeSendLog) }; + #[cfg(feature = "metrics")] + let before_send_metric = { + #[derive(Debug)] + struct BeforeSendMetric; + self.before_send_metric.as_ref().map(|_| BeforeSendMetric) + }; #[derive(Debug)] struct TransportFactory; @@ -278,6 +292,11 @@ impl fmt::Debug for ClientOptions { .field("enable_logs", &self.enable_logs) .field("before_send_log", &before_send_log); + #[cfg(feature = "metrics")] + debug_struct + .field("enable_metrics", &self.enable_metrics) + .field("before_send_metric", &before_send_metric); + debug_struct.field("user_agent", &self.user_agent).finish() } } @@ -317,6 +336,10 @@ impl Default for ClientOptions { enable_logs: true, #[cfg(feature = "logs")] before_send_log: None, + #[cfg(feature = "metrics")] + enable_metrics: true, + #[cfg(feature = "metrics")] + before_send_metric: None, } } } diff --git a/sentry-core/src/hub.rs b/sentry-core/src/hub.rs index ebd70b0ca..bec92db7e 100644 --- a/sentry-core/src/hub.rs +++ b/sentry-core/src/hub.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, RwLock}; -use crate::protocol::{Event, Level, Log, LogAttribute, LogLevel, Map, SessionStatus}; +use crate::protocol::{Event, Level, Log, LogAttribute, LogLevel, Map, SessionStatus, TraceMetric}; use crate::types::Uuid; use crate::{Integration, IntoBreadcrumbs, Scope, ScopeGuard}; @@ -255,4 +255,14 @@ impl Hub { client.capture_log(log, &top.scope); }} } + + /// Captures a trace metric. + #[cfg(feature = "metrics")] + pub fn capture_metric(&self, metric: TraceMetric) { + with_client_impl! {{ + let top = self.inner.with(|stack| stack.top().clone()); + let Some(ref client) = top.client else { return }; + client.capture_metric(metric, &top.scope); + }} + } } diff --git a/sentry-core/src/lib.rs b/sentry-core/src/lib.rs index a39e63ca5..4300f75ff 100644 --- a/sentry-core/src/lib.rs +++ b/sentry-core/src/lib.rs @@ -134,14 +134,16 @@ pub use crate::scope::{Scope, ScopeGuard}; pub use crate::transport::{Transport, TransportFactory}; #[cfg(feature = "logs")] mod logger; // structured logging macros exported with `#[macro_export]` +#[cfg(feature = "metrics")] +mod metrics; // trace metrics macros exported with `#[macro_export]` // client feature +#[cfg(all(feature = "client", any(feature = "logs", feature = "metrics")))] +mod batcher; #[cfg(feature = "client")] mod client; #[cfg(feature = "client")] mod hub_impl; -#[cfg(all(feature = "client", feature = "logs"))] -mod logs; #[cfg(feature = "client")] mod session; diff --git a/sentry-core/src/logs.rs b/sentry-core/src/logs.rs deleted file mode 100644 index 9be3ee335..000000000 --- a/sentry-core/src/logs.rs +++ /dev/null @@ -1,197 +0,0 @@ -//! Batching for Sentry [structured logs](https://docs.sentry.io/product/explore/logs/). - -use std::sync::{Arc, Condvar, Mutex, MutexGuard}; -use std::thread::JoinHandle; -use std::time::{Duration, Instant}; - -use crate::client::TransportArc; -use crate::protocol::EnvelopeItem; -use crate::Envelope; -use sentry_types::protocol::v7::Log; - -// Flush when there's 100 logs in the buffer -const MAX_LOG_ITEMS: usize = 100; -// Or when 5 seconds have passed from the last flush -const FLUSH_INTERVAL: Duration = Duration::from_secs(5); - -#[derive(Debug, Default)] -struct LogQueue { - logs: Vec, -} - -/// Accumulates logs in the queue and submits them through the transport when one of the flushing -/// conditions is met. -pub(crate) struct LogsBatcher { - transport: TransportArc, - queue: Arc>, - shutdown: Arc<(Mutex, Condvar)>, - worker: Option>, -} - -impl LogsBatcher { - /// Creates a new LogsBatcher that will submit envelopes to the given `transport`. - pub(crate) fn new(transport: TransportArc) -> Self { - let queue = Arc::new(Mutex::new(Default::default())); - #[allow(clippy::mutex_atomic)] - let shutdown = Arc::new((Mutex::new(false), Condvar::new())); - - let worker_transport = transport.clone(); - let worker_queue = queue.clone(); - let worker_shutdown = shutdown.clone(); - let worker = std::thread::Builder::new() - .name("sentry-logs-batcher".into()) - .spawn(move || { - let (lock, cvar) = worker_shutdown.as_ref(); - let mut shutdown = lock.lock().unwrap(); - // check this immediately, in case the main thread is already shutting down - if *shutdown { - return; - } - let mut last_flush = Instant::now(); - loop { - let timeout = FLUSH_INTERVAL - .checked_sub(last_flush.elapsed()) - .unwrap_or_else(|| Duration::from_secs(0)); - shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0; - if *shutdown { - return; - } - if last_flush.elapsed() >= FLUSH_INTERVAL { - LogsBatcher::flush_queue_internal( - worker_queue.lock().unwrap(), - &worker_transport, - ); - last_flush = Instant::now(); - } - } - }) - .unwrap(); - - Self { - transport, - queue, - shutdown, - worker: Some(worker), - } - } - - /// Enqueues a log for delayed sending. - /// - /// This will automatically flush the queue if it reaches a size of `BATCH_SIZE`. - pub(crate) fn enqueue(&self, log: Log) { - let mut queue = self.queue.lock().unwrap(); - queue.logs.push(log); - if queue.logs.len() >= MAX_LOG_ITEMS { - LogsBatcher::flush_queue_internal(queue, &self.transport); - } - } - - /// Flushes the queue to the transport. - pub(crate) fn flush(&self) { - let queue = self.queue.lock().unwrap(); - LogsBatcher::flush_queue_internal(queue, &self.transport); - } - - /// Flushes the queue to the transport. - /// - /// This is a static method as it will be called from both the background - /// thread and the main thread on drop. - fn flush_queue_internal(mut queue_lock: MutexGuard, transport: &TransportArc) { - let logs = std::mem::take(&mut queue_lock.logs); - drop(queue_lock); - - if logs.is_empty() { - return; - } - - sentry_debug!("[LogsBatcher] Flushing {} logs", logs.len()); - - if let Some(ref transport) = *transport.read().unwrap() { - let mut envelope = Envelope::new(); - let logs_item: EnvelopeItem = logs.into(); - envelope.add_item(logs_item); - transport.send_envelope(envelope); - } - } -} - -impl Drop for LogsBatcher { - fn drop(&mut self) { - let (lock, cvar) = self.shutdown.as_ref(); - *lock.lock().unwrap() = true; - cvar.notify_one(); - - if let Some(worker) = self.worker.take() { - worker.join().ok(); - } - LogsBatcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport); - } -} - -#[cfg(all(test, feature = "test"))] -mod tests { - use crate::logger_info; - use crate::test; - - // Test that logs are sent in batches - #[test] - fn test_logs_batching() { - let envelopes = test::with_captured_envelopes_options( - || { - for i in 0..150 { - logger_info!("test log {}", i); - } - }, - crate::ClientOptions { - enable_logs: true, - ..Default::default() - }, - ); - - assert_eq!(2, envelopes.len()); - - let mut total_logs = 0; - for envelope in &envelopes { - for item in envelope.items() { - if let crate::protocol::EnvelopeItem::ItemContainer( - crate::protocol::ItemContainer::Logs(logs), - ) = item - { - total_logs += logs.len(); - } - } - } - - assert_eq!(150, total_logs); - } - - // Test that the batcher is flushed on client close - #[test] - fn test_logs_batcher_flush() { - let envelopes = test::with_captured_envelopes_options( - || { - for i in 0..12 { - logger_info!("test log {}", i); - } - }, - crate::ClientOptions { - enable_logs: true, - ..Default::default() - }, - ); - - assert_eq!(1, envelopes.len()); - - for envelope in &envelopes { - for item in envelope.items() { - if let crate::protocol::EnvelopeItem::ItemContainer( - crate::protocol::ItemContainer::Logs(logs), - ) = item - { - assert_eq!(12, logs.len()); - break; - } - } - } - } -} diff --git a/sentry-core/src/metrics.rs b/sentry-core/src/metrics.rs new file mode 100644 index 000000000..194e00470 --- /dev/null +++ b/sentry-core/src/metrics.rs @@ -0,0 +1,182 @@ +//! Macros for Sentry [trace metrics](https://develop.sentry.dev/sdk/telemetry/metrics/). + +// Helper macro to emit a metric at the given type. Should not be used directly. +#[doc(hidden)] +#[macro_export] +macro_rules! metric_emit { + // Name and value only + ($type:expr, $name:expr, $value:expr) => {{ + let metric = $crate::protocol::TraceMetric { + r#type: $type, + name: $name.to_owned(), + value: $value as f64, + timestamp: ::std::time::SystemTime::now(), + trace_id: $crate::protocol::TraceId::default(), + span_id: None, + unit: None, + attributes: $crate::protocol::Map::new(), + }; + $crate::Hub::current().capture_metric(metric) + }}; + + // Attributes entrypoint + ($type:expr, $name:expr, $value:expr, $($rest:tt)+) => {{ + let mut attributes = $crate::protocol::Map::new(); + let mut unit: Option = None; + $crate::metric_emit!(@internal attributes, unit, $type, $name, $value, $($rest)+) + }}; + + // Recursive case: unit = value, followed by more + (@internal $attrs:ident, $unit:ident, $type:expr, $name:expr, $value:expr, unit = $uval:expr, $($rest:tt)+) => {{ + $unit = Some($uval.to_owned()); + $crate::metric_emit!(@internal $attrs, $unit, $type, $name, $value, $($rest)+) + }}; + + // Base case: unit = value (last pair) + (@internal $attrs:ident, $unit:ident, $type:expr, $name:expr, $value:expr, unit = $uval:expr) => {{ + $unit = Some($uval.to_owned()); + let metric = $crate::protocol::TraceMetric { + r#type: $type, + name: $name.to_owned(), + value: $value as f64, + timestamp: ::std::time::SystemTime::now(), + trace_id: $crate::protocol::TraceId::default(), + span_id: None, + #[allow(clippy::redundant_field_names)] + unit: $unit, + #[allow(clippy::redundant_field_names)] + attributes: $attrs, + }; + $crate::Hub::current().capture_metric(metric) + }}; + + // Recursive case: quoted key = value, followed by more + (@internal $attrs:ident, $unit:ident, $type:expr, $name:expr, $value:expr, $key:literal = $aval:expr, $($rest:tt)+) => {{ + $attrs.insert( + $key.to_owned(), + $crate::protocol::LogAttribute($crate::protocol::Value::from($aval)) + ); + $crate::metric_emit!(@internal $attrs, $unit, $type, $name, $value, $($rest)+) + }}; + + // Base case: quoted key = value (last pair) + (@internal $attrs:ident, $unit:ident, $type:expr, $name:expr, $value:expr, $key:literal = $aval:expr) => {{ + $attrs.insert( + $key.to_owned(), + $crate::protocol::LogAttribute($crate::protocol::Value::from($aval)) + ); + let metric = $crate::protocol::TraceMetric { + r#type: $type, + name: $name.to_owned(), + value: $value as f64, + timestamp: ::std::time::SystemTime::now(), + trace_id: $crate::protocol::TraceId::default(), + span_id: None, + #[allow(clippy::redundant_field_names)] + unit: $unit, + #[allow(clippy::redundant_field_names)] + attributes: $attrs, + }; + $crate::Hub::current().capture_metric(metric) + }}; + + // Recursive case: ident key = value, followed by more + (@internal $attrs:ident, $unit:ident, $type:expr, $name:expr, $value:expr, $($key:ident).+ = $aval:expr, $($rest:tt)+) => {{ + $attrs.insert( + stringify!($($key).+).to_owned(), + $crate::protocol::LogAttribute($crate::protocol::Value::from($aval)) + ); + $crate::metric_emit!(@internal $attrs, $unit, $type, $name, $value, $($rest)+) + }}; + + // Base case: ident key = value (last pair) + (@internal $attrs:ident, $unit:ident, $type:expr, $name:expr, $value:expr, $($key:ident).+ = $aval:expr) => {{ + $attrs.insert( + stringify!($($key).+).to_owned(), + $crate::protocol::LogAttribute($crate::protocol::Value::from($aval)) + ); + let metric = $crate::protocol::TraceMetric { + r#type: $type, + name: $name.to_owned(), + value: $value as f64, + timestamp: ::std::time::SystemTime::now(), + trace_id: $crate::protocol::TraceId::default(), + span_id: None, + #[allow(clippy::redundant_field_names)] + unit: $unit, + #[allow(clippy::redundant_field_names)] + attributes: $attrs, + }; + $crate::Hub::current().capture_metric(metric) + }}; +} + +/// Emits a counter metric. Counters track event frequency (e.g., requests, errors). +/// +/// Attributes can be passed with `key = value` or `"key" = value` syntax. +/// A measurement unit can be set with `unit = "..."`. To set an attribute +/// named "unit", quote the key: `"unit" = "..."`. +/// +/// # Examples +/// +/// ``` +/// use sentry::metric_count; +/// +/// // Simple counter +/// metric_count!("api.requests", 1); +/// +/// // With attributes +/// metric_count!("api.requests", 1, route = "/users", method = "GET"); +/// +/// // With unit +/// metric_count!("api.requests", 1, unit = "request"); +/// +/// // Quoted key to set an attribute named "unit" +/// metric_count!("api.requests", 1, "unit" = "request"); +/// ``` +#[macro_export] +macro_rules! metric_count { + ($name:expr, $value:expr $(, $($rest:tt)+)?) => { + $crate::metric_emit!($crate::protocol::TraceMetricType::Counter, $name, $value $(, $($rest)+)?) + }; +} + +/// Emits a gauge metric. Gauges represent current state (e.g., memory usage, pool size). +/// +/// Attributes can be passed with `key = value` or `"key" = value` syntax. +/// A measurement unit can be set with `unit = "..."`. To set an attribute +/// named "unit", quote the key: `"unit" = "..."`. +/// +/// # Examples +/// +/// ``` +/// use sentry::metric_gauge; +/// +/// metric_gauge!("memory.usage", 1024.0, unit = "byte"); +/// ``` +#[macro_export] +macro_rules! metric_gauge { + ($name:expr, $value:expr $(, $($rest:tt)+)?) => { + $crate::metric_emit!($crate::protocol::TraceMetricType::Gauge, $name, $value $(, $($rest)+)?) + }; +} + +/// Emits a distribution metric. Distributions measure statistical spread (e.g., response times). +/// +/// Attributes can be passed with `key = value` or `"key" = value` syntax. +/// A measurement unit can be set with `unit = "..."`. To set an attribute +/// named "unit", quote the key: `"unit" = "..."`. +/// +/// # Examples +/// +/// ``` +/// use sentry::metric_distribution; +/// +/// metric_distribution!("response.time", 150.0, unit = "millisecond", route = "/users"); +/// ``` +#[macro_export] +macro_rules! metric_distribution { + ($name:expr, $value:expr $(, $($rest:tt)+)?) => { + $crate::metric_emit!($crate::protocol::TraceMetricType::Distribution, $name, $value $(, $($rest)+)?) + }; +} diff --git a/sentry-core/src/scope/noop.rs b/sentry-core/src/scope/noop.rs index fc62120b9..af3df0b50 100644 --- a/sentry-core/src/scope/noop.rs +++ b/sentry-core/src/scope/noop.rs @@ -2,6 +2,8 @@ use std::fmt; #[cfg(feature = "logs")] use crate::protocol::Log; +#[cfg(feature = "metrics")] +use crate::protocol::TraceMetric; use crate::protocol::{Context, Event, Level, User, Value}; use crate::TransactionOrSpan; @@ -119,6 +121,14 @@ impl Scope { minimal_unreachable!(); } + /// Applies the contained scoped data to fill a trace metric. + #[cfg(feature = "metrics")] + pub fn apply_to_metric(&self, metric: &mut TraceMetric, send_default_pii: bool) { + let _metric = metric; + let _send_default_pii = send_default_pii; + minimal_unreachable!(); + } + /// Set the given [`TransactionOrSpan`] as the active span for this scope. pub fn set_span(&mut self, span: Option) { let _ = span; diff --git a/sentry-core/src/scope/real.rs b/sentry-core/src/scope/real.rs index 590f39214..a1ccf4b54 100644 --- a/sentry-core/src/scope/real.rs +++ b/sentry-core/src/scope/real.rs @@ -6,11 +6,16 @@ use std::sync::Mutex; use std::sync::{Arc, PoisonError, RwLock}; use crate::performance::TransactionOrSpan; +#[cfg(feature = "logs")] +use crate::protocol::Log; +#[cfg(any(feature = "logs", feature = "metrics"))] +use crate::protocol::LogAttribute; +#[cfg(feature = "metrics")] +use crate::protocol::TraceMetric; use crate::protocol::{ Attachment, Breadcrumb, Context, Event, Level, TraceContext, Transaction, User, Value, }; -#[cfg(feature = "logs")] -use crate::protocol::{Log, LogAttribute}; + #[cfg(feature = "release-health")] use crate::session::Session; use crate::{Client, SentryTrace, TraceHeader, TraceHeadersIter}; @@ -399,6 +404,59 @@ impl Scope { } } + /// Applies the contained scoped data to a trace metric, setting the `trace_id`, `span_id`, + /// and certain default attributes. User PII attributes are only attached when + /// `send_default_pii` is `true`. + #[cfg(feature = "metrics")] + pub fn apply_to_metric(&self, metric: &mut TraceMetric, send_default_pii: bool) { + if let Some(span) = self.span.as_ref() { + metric.trace_id = span.get_trace_context().trace_id; + } else { + metric.trace_id = self.propagation_context.trace_id; + } + + if metric.span_id.is_none() { + if let Some(span) = self.get_span() { + let span_id = match span { + crate::TransactionOrSpan::Transaction(transaction) => { + transaction.get_trace_context().span_id + } + crate::TransactionOrSpan::Span(span) => span.get_span_id(), + }; + metric.span_id = Some(span_id); + } + } + + if send_default_pii { + if let Some(user) = self.user.as_ref() { + if !metric.attributes.contains_key("user.id") { + if let Some(id) = user.id.as_ref() { + metric + .attributes + .insert("user.id".to_owned(), LogAttribute(id.to_owned().into())); + } + } + + if !metric.attributes.contains_key("user.name") { + if let Some(name) = user.username.as_ref() { + metric + .attributes + .insert("user.name".to_owned(), LogAttribute(name.to_owned().into())); + } + } + + if !metric.attributes.contains_key("user.email") { + if let Some(email) = user.email.as_ref() { + metric.attributes.insert( + "user.email".to_owned(), + LogAttribute(email.to_owned().into()), + ); + } + } + } + } + } + /// Set the given [`TransactionOrSpan`] as the active span for this scope. pub fn set_span(&mut self, span: Option) { self.span = Arc::new(span); diff --git a/sentry-types/src/protocol/envelope.rs b/sentry-types/src/protocol/envelope.rs index 040ca91ba..a3bdcca8a 100644 --- a/sentry-types/src/protocol/envelope.rs +++ b/sentry-types/src/protocol/envelope.rs @@ -11,7 +11,7 @@ use super::v7 as protocol; use protocol::{ Attachment, AttachmentType, ClientSdkInfo, DynamicSamplingContext, Event, Log, MonitorCheckIn, - SessionAggregates, SessionUpdate, Transaction, + SessionAggregates, SessionUpdate, TraceMetric, Transaction, }; /// Raised if a envelope cannot be parsed from a given input. @@ -127,6 +127,9 @@ enum EnvelopeItemType { /// A container of Log items. #[serde(rename = "log")] LogsContainer, + /// A container of TraceMetric items. + #[serde(rename = "trace_metric")] + TraceMetricsContainer, } /// An Envelope Item Header. @@ -192,6 +195,8 @@ pub enum EnvelopeItem { pub enum ItemContainer { /// A list of logs. Logs(Vec), + /// A list of trace metrics. + TraceMetrics(Vec), } #[allow(clippy::len_without_is_empty, reason = "is_empty is not needed")] @@ -200,6 +205,7 @@ impl ItemContainer { pub fn len(&self) -> usize { match self { Self::Logs(logs) => logs.len(), + Self::TraceMetrics(metrics) => metrics.len(), } } @@ -207,6 +213,7 @@ impl ItemContainer { pub fn ty(&self) -> &'static str { match self { Self::Logs(_) => "log", + Self::TraceMetrics(_) => "trace_metric", } } @@ -214,6 +221,7 @@ impl ItemContainer { pub fn content_type(&self) -> &'static str { match self { Self::Logs(_) => "application/vnd.sentry.items.log+json", + Self::TraceMetrics(_) => "application/vnd.sentry.items.trace-metric+json", } } } @@ -224,6 +232,12 @@ impl From> for ItemContainer { } } +impl From> for ItemContainer { + fn from(metrics: Vec) -> Self { + Self::TraceMetrics(metrics) + } +} + #[derive(Serialize)] struct LogsSerializationWrapper<'a> { items: &'a [Log], @@ -234,6 +248,16 @@ struct LogsDeserializationWrapper { items: Vec, } +#[derive(Serialize)] +struct TraceMetricsSerializationWrapper<'a> { + items: &'a [TraceMetric], +} + +#[derive(Deserialize)] +struct TraceMetricsDeserializationWrapper { + items: Vec, +} + impl From> for EnvelopeItem { fn from(event: Event<'static>) -> Self { EnvelopeItem::Event(event) @@ -282,6 +306,12 @@ impl From> for EnvelopeItem { } } +impl From> for EnvelopeItem { + fn from(metrics: Vec) -> Self { + EnvelopeItem::ItemContainer(metrics.into()) + } +} + /// An Iterator over the items of an Envelope. #[derive(Clone)] pub struct EnvelopeItemIter<'s> { @@ -505,6 +535,10 @@ impl Envelope { let wrapper = LogsSerializationWrapper { items: logs }; serde_json::to_writer(&mut item_buf, &wrapper)? } + ItemContainer::TraceMetrics(metrics) => { + let wrapper = TraceMetricsSerializationWrapper { items: metrics }; + serde_json::to_writer(&mut item_buf, &wrapper)? + } }, EnvelopeItem::Raw => { continue; @@ -676,6 +710,10 @@ impl Envelope { serde_json::from_slice::(payload) .map(|x| EnvelopeItem::ItemContainer(ItemContainer::Logs(x.items))) } + EnvelopeItemType::TraceMetricsContainer => { + serde_json::from_slice::(payload) + .map(|x| EnvelopeItem::ItemContainer(ItemContainer::TraceMetrics(x.items))) + } } .map_err(EnvelopeError::InvalidItemPayload)?; diff --git a/sentry-types/src/protocol/v7.rs b/sentry-types/src/protocol/v7.rs index b2f8ee99a..fcf52ddfa 100644 --- a/sentry-types/src/protocol/v7.rs +++ b/sentry-types/src/protocol/v7.rs @@ -2368,6 +2368,43 @@ impl<'de> Deserialize<'de> for LogAttribute { } } +/// The type of a [trace metric](https://develop.sentry.dev/sdk/telemetry/metrics/). +#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum TraceMetricType { + /// A counter metric that only increments. + Counter, + /// A gauge metric that can go up and down. + Gauge, + /// A distribution metric for statistical spread measurements. + Distribution, +} + +/// A single [trace metric](https://develop.sentry.dev/sdk/telemetry/metrics/). +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct TraceMetric { + /// The metric type (required). + pub r#type: TraceMetricType, + /// The metric name (required). Uses dot separators for hierarchy. + pub name: String, + /// The numeric value (required). + pub value: f64, + /// The timestamp when recorded (required). + #[serde(with = "ts_seconds_float")] + pub timestamp: SystemTime, + /// The trace ID this metric is associated with (required). + pub trace_id: TraceId, + /// The span ID of the active span, if any. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub span_id: Option, + /// The measurement unit. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub unit: Option, + /// Additional key-value attributes. + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub attributes: Map, +} + /// An ID that identifies an organization in the Sentry backend. #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] pub struct OrganizationId(u64); diff --git a/sentry/Cargo.toml b/sentry/Cargo.toml index 19076bdb2..b31c2a0cb 100644 --- a/sentry/Cargo.toml +++ b/sentry/Cargo.toml @@ -49,6 +49,7 @@ opentelemetry = ["sentry-opentelemetry"] test = ["sentry-core/test"] release-health = ["sentry-core/release-health", "sentry-actix?/release-health"] logs = ["sentry-core/logs", "sentry-tracing?/logs", "sentry-log?/logs"] +metrics = ["sentry-core/metrics"] # transports transport = ["reqwest", "native-tls"] reqwest = ["dep:reqwest", "httpdate", "tokio"] diff --git a/sentry/src/transports/ratelimit.rs b/sentry/src/transports/ratelimit.rs index fbe053590..5a4a47586 100644 --- a/sentry/src/transports/ratelimit.rs +++ b/sentry/src/transports/ratelimit.rs @@ -14,6 +14,7 @@ pub struct RateLimiter { transaction: Option, attachment: Option, log_item: Option, + trace_metric: Option, } impl RateLimiter { @@ -59,6 +60,7 @@ impl RateLimiter { "transaction" => self.transaction = new_time, "attachment" => self.attachment = new_time, "log_item" => self.log_item = new_time, + "trace_metric" => self.trace_metric = new_time, _ => {} } } @@ -93,6 +95,7 @@ impl RateLimiter { RateLimitingCategory::Transaction => self.transaction, RateLimitingCategory::Attachment => self.attachment, RateLimitingCategory::LogItem => self.log_item, + RateLimitingCategory::TraceMetric => self.trace_metric, }?; time_left.duration_since(SystemTime::now()).ok() } @@ -119,6 +122,9 @@ impl RateLimiter { EnvelopeItem::ItemContainer(ItemContainer::Logs(_)) => { RateLimitingCategory::LogItem } + EnvelopeItem::ItemContainer(ItemContainer::TraceMetrics(_)) => { + RateLimitingCategory::TraceMetric + } _ => RateLimitingCategory::Any, }) }) @@ -140,6 +146,8 @@ pub enum RateLimitingCategory { Attachment, /// Rate Limit pertaining to Log Items. LogItem, + /// Rate Limit pertaining to Trace Metrics. + TraceMetric, } #[cfg(test)] @@ -155,6 +163,7 @@ mod tests { assert!(rl.is_disabled(RateLimitingCategory::Session).unwrap() <= Duration::from_secs(60)); assert!(rl.is_disabled(RateLimitingCategory::Transaction).is_none()); assert!(rl.is_disabled(RateLimitingCategory::LogItem).is_none()); + assert!(rl.is_disabled(RateLimitingCategory::TraceMetric).is_none()); assert!(rl.is_disabled(RateLimitingCategory::Any).is_none()); rl.update_from_sentry_header( @@ -185,6 +194,9 @@ mod tests { assert!( rl.is_disabled(RateLimitingCategory::Attachment).unwrap() <= Duration::from_secs(120) ); + assert!( + rl.is_disabled(RateLimitingCategory::TraceMetric).unwrap() <= Duration::from_secs(120) + ); assert!(rl.is_disabled(RateLimitingCategory::Any).unwrap() <= Duration::from_secs(120)); }