Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions metrics/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub async fn spawn_server(
gather_metrics: impl Fn() -> Vec<u8> + Clone + Send + 'static,
is_health_check: impl Fn() -> bool + Clone + Send + 'static,
is_ready_check: impl Fn() -> bool + Clone + Send + 'static,
get_subscribers: impl Fn() -> usize + Clone + Send + 'static,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> std::io::Result<impl Future<Output = Result<(), JoinError>>> {
let listener = TcpListener::bind(endpoint).await?;
Expand All @@ -46,6 +47,7 @@ pub async fn spawn_server(
let gather_metrics = gather_metrics.clone();
let is_health_check = is_health_check.clone();
let is_ready_check = is_ready_check.clone();
let get_subscribers = get_subscribers.clone();
tokio::spawn(async move {
if let Err(error) = ServerBuilder::new(TokioExecutor::new())
.serve_connection(
Expand All @@ -54,6 +56,7 @@ pub async fn spawn_server(
let gather_metrics = gather_metrics.clone();
let is_health_check = is_health_check.clone();
let is_ready_check = is_ready_check.clone();
let get_subscribers = get_subscribers.clone();
async move {
let (status, bytes) = match req.uri().path() {
"/health" => {
Expand All @@ -77,6 +80,10 @@ pub async fn spawn_server(
)
}
}
"/subscribers" => (
StatusCode::OK,
Bytes::from(get_subscribers().to_string()),
),
_ => (StatusCode::NOT_FOUND, Bytes::new()),
};

Expand Down
1 change: 1 addition & 0 deletions plugin-agave/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub async fn spawn_server(
move || handle.render().into_bytes(), // metrics
|| true, // health
|| true, // ready
|| 0, // subscribers (not applicable to plugin)
shutdown,
)
.await
Expand Down
2 changes: 2 additions & 0 deletions richat/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ impl Drop for SubscribeClientState {
);
gauge!(metrics::GRPC_SUBSCRIBE_TOTAL, "x_subscription_id" => Arc::clone(&self.x_subscription_id))
.decrement(1);
metrics::GRPC_SUBSCRIBE_COUNT.fetch_sub(1, Ordering::Relaxed);
}
}

Expand All @@ -845,6 +846,7 @@ impl SubscribeClientState {
);
gauge!(metrics::GRPC_SUBSCRIBE_TOTAL, "x_subscription_id" => Arc::clone(&x_subscription_id))
.increment(1);
metrics::GRPC_SUBSCRIBE_COUNT.fetch_add(1, Ordering::Relaxed);

let metric_cpu_usage = gauge!(
metrics::GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL,
Expand Down
9 changes: 8 additions & 1 deletion richat/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
future::Future,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
},
},
tokio::{
Expand All @@ -20,6 +20,12 @@ use {
tracing::error,
};

/// Active gRPC subscribers. Incremented in `SubscribeClientState::new()`,
/// decremented in its `Drop`. Read by the `/subscribers` HTTP endpoint so
/// an external load-balancer weighting loop can size origin weights by
/// current load.
pub static GRPC_SUBSCRIBE_COUNT: AtomicUsize = AtomicUsize::new(0);

pub const BLOCK_MESSAGE_FAILED: &str = "block_message_failed"; // reason
pub const CHANNEL_EVENTS_RECEIVED: &str = "channel_events_received"; // source, type
pub const CHANNEL_SLOT: &str = "channel_slot"; // commitment
Expand Down Expand Up @@ -159,6 +165,7 @@ pub async fn spawn_server(
move || handle.render().into_bytes(), // metrics
|| true, // health
move || is_ready.load(Ordering::Relaxed), // ready
|| GRPC_SUBSCRIBE_COUNT.load(Ordering::Relaxed), // subscribers
shutdown,
)
.await
Expand Down
Loading