diff --git a/metrics/src/server.rs b/metrics/src/server.rs index cd6e75e9..2742b5c3 100644 --- a/metrics/src/server.rs +++ b/metrics/src/server.rs @@ -20,6 +20,7 @@ pub async fn spawn_server( gather_metrics: impl Fn() -> Vec + Clone + Send + 'static, is_health_check: impl Fn() -> bool + Clone + Send + 'static, is_ready_check: impl Fn() -> bool + Clone + Send + 'static, + get_subscribers: impl Fn() -> usize + Clone + Send + 'static, shutdown: impl Future + Send + 'static, ) -> std::io::Result>> { let listener = TcpListener::bind(endpoint).await?; @@ -46,6 +47,7 @@ pub async fn spawn_server( let gather_metrics = gather_metrics.clone(); let is_health_check = is_health_check.clone(); let is_ready_check = is_ready_check.clone(); + let get_subscribers = get_subscribers.clone(); tokio::spawn(async move { if let Err(error) = ServerBuilder::new(TokioExecutor::new()) .serve_connection( @@ -54,6 +56,7 @@ pub async fn spawn_server( let gather_metrics = gather_metrics.clone(); let is_health_check = is_health_check.clone(); let is_ready_check = is_ready_check.clone(); + let get_subscribers = get_subscribers.clone(); async move { let (status, bytes) = match req.uri().path() { "/health" => { @@ -77,6 +80,10 @@ pub async fn spawn_server( ) } } + "/subscribers" => ( + StatusCode::OK, + Bytes::from(get_subscribers().to_string()), + ), _ => (StatusCode::NOT_FOUND, Bytes::new()), }; diff --git a/plugin-agave/src/metrics.rs b/plugin-agave/src/metrics.rs index 2e9e1e06..fcd1cbf8 100644 --- a/plugin-agave/src/metrics.rs +++ b/plugin-agave/src/metrics.rs @@ -62,6 +62,7 @@ pub async fn spawn_server( move || handle.render().into_bytes(), // metrics || true, // health || true, // ready + || 0, // subscribers (not applicable to plugin) shutdown, ) .await diff --git a/richat/src/grpc/server.rs b/richat/src/grpc/server.rs index e5c3f38b..86287008 100644 --- a/richat/src/grpc/server.rs +++ b/richat/src/grpc/server.rs @@ -833,6 +833,7 @@ impl Drop for SubscribeClientState { ); gauge!(metrics::GRPC_SUBSCRIBE_TOTAL, "x_subscription_id" => Arc::clone(&self.x_subscription_id)) .decrement(1); + metrics::GRPC_SUBSCRIBE_COUNT.fetch_sub(1, Ordering::Relaxed); } } @@ -845,6 +846,7 @@ impl SubscribeClientState { ); gauge!(metrics::GRPC_SUBSCRIBE_TOTAL, "x_subscription_id" => Arc::clone(&x_subscription_id)) .increment(1); + metrics::GRPC_SUBSCRIBE_COUNT.fetch_add(1, Ordering::Relaxed); let metric_cpu_usage = gauge!( metrics::GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL, diff --git a/richat/src/metrics.rs b/richat/src/metrics.rs index 3f31e24a..4a38cd46 100644 --- a/richat/src/metrics.rs +++ b/richat/src/metrics.rs @@ -10,7 +10,7 @@ use { future::Future, sync::{ Arc, - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, }, }, tokio::{ @@ -20,6 +20,12 @@ use { tracing::error, }; +/// Active gRPC subscribers. Incremented in `SubscribeClientState::new()`, +/// decremented in its `Drop`. Read by the `/subscribers` HTTP endpoint so +/// an external load-balancer weighting loop can size origin weights by +/// current load. +pub static GRPC_SUBSCRIBE_COUNT: AtomicUsize = AtomicUsize::new(0); + pub const BLOCK_MESSAGE_FAILED: &str = "block_message_failed"; // reason pub const CHANNEL_EVENTS_RECEIVED: &str = "channel_events_received"; // source, type pub const CHANNEL_SLOT: &str = "channel_slot"; // commitment @@ -159,6 +165,7 @@ pub async fn spawn_server( move || handle.render().into_bytes(), // metrics || true, // health move || is_ready.load(Ordering::Relaxed), // ready + || GRPC_SUBSCRIBE_COUNT.load(Ordering::Relaxed), // subscribers shutdown, ) .await