diff --git a/CHANGELOG.md b/CHANGELOG.md index 96b5f251..7ceb6ed0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +## 2026-05-19 + +- richat-v10.1.0 + +### Features + +- richat: add subscribe handshake observability ([#207](https://github.com/lamports-dev/richat/pull/207)) + ## 2026-04-30 - richat-v10.0.0 diff --git a/Cargo.lock b/Cargo.lock index d9f25e14..2e9ffb90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3217,7 +3217,7 @@ dependencies = [ [[package]] name = "richat" -version = "10.0.0" +version = "10.1.0" dependencies = [ "affinity-linux", "agave-reserved-account-keys", diff --git a/richat/Cargo.toml b/richat/Cargo.toml index eb0c1c5a..e6929548 100644 --- a/richat/Cargo.toml +++ b/richat/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "richat" -version = "10.0.0" +version = "10.1.0" authors = { workspace = true } edition = { workspace = true } description = "Richat App" diff --git a/richat/src/grpc/server.rs b/richat/src/grpc/server.rs index e5c3f38b..fb2e540d 100644 --- a/richat/src/grpc/server.rs +++ b/richat/src/grpc/server.rs @@ -6,7 +6,7 @@ use { metrics::{self, GrpcSubscribeMessage}, version::VERSION, }, - ::metrics::{Gauge, counter, gauge}, + ::metrics::{Gauge, counter, gauge, histogram}, crossbeam_queue::SegQueue, futures::{ future::{FutureExt, TryFutureExt, ready, try_join_all}, @@ -392,6 +392,14 @@ impl GrpcServer { .increment(duration_to_seconds(ts.elapsed())); ticks_without_messages = 0; } + + // Observe the time from filter set -> first data message pushed + // to the client. This is the subscribe handshake latency the + // clients actually perceive. Taken-once: only the first push + // after the filter was applied records the histogram. 
+ if pushed { + state.observe_time_to_first_message(); + } drop(state); if pushed { @@ -430,6 +438,7 @@ impl GrpcServer { ) .increment(1); + let ts_subscribe_entry = Instant::now(); let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed); let client = SubscribeClient::new( id, @@ -443,8 +452,10 @@ impl GrpcServer { let shutdown = self.shutdown.clone(); let ping_interval = self.ping_interval; let client = client.clone(); + let x_subscription_id = Arc::clone(&x_subscription_id); async move { let mut ts_latest = Instant::now(); + let mut slow_filter_warned = false; loop { tokio::select! { () = shutdown.cancelled() => { @@ -457,9 +468,26 @@ impl GrpcServer { if state.finished { break } + let filter_set = state.filter.is_some(); drop(state); let ts = Instant::now(); + + // Warn once when a client has been connected for >3s + // without a filter. Matches observed client timeouts. + if !filter_set && !slow_filter_warned { + let elapsed_ms = ts.duration_since(ts_subscribe_entry).as_millis() as u64; + if elapsed_ms > 3_000 { + slow_filter_warned = true; + warn!( + id, + x_subscription_id = x_subscription_id.as_ref(), + elapsed_ms, + "subscribe: filter not set after 3s" + ); + } + } + if ts.duration_since(ts_latest) > ping_interval { ts_latest = ts; let data = SubscribeClientState::create_ping(); @@ -478,6 +506,8 @@ impl GrpcServer { let client = client.clone(); let messages = self.messages.clone(); async move { + let mut filter_ever_set = false; + let mut handshake_abandoned = false; loop { match stream.message().await { Ok(Some(message)) => { @@ -490,6 +520,7 @@ impl GrpcServer { let (subscribe_from_slot, new_filter) = get_filter(&limits, message); let mut state = client.state_lock(); + let was_unset = state.filter.is_none(); if let Err(error) = new_filter.and_then(|filter| { if filter.contains_blocks() && subscribe_from_slot.is_some() { return Err(Status::invalid_argument( @@ -520,21 +551,48 @@ impl GrpcServer { } } state.filter = Some(filter); + if was_unset { + 
state.ts_filter_set = Some(Instant::now()); + } Ok::<(), Status>(()) }) { warn!(id, %error, "failed to handle request"); drop(state); client.push_error(error); } else { + // Record filter-parse latency only on the initial + // transition unset -> set. Subsequent filter + // updates (commitment change, etc.) are not + // part of the handshake and would skew the + // histogram. + if was_unset { + filter_ever_set = true; + histogram!( + metrics::GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS, + "x_subscription_id" => Arc::clone(&x_subscription_id) + ) + .record(duration_to_seconds(ts_subscribe_entry.elapsed())); + } + drop(state); info!(id, "set new filter"); continue; } } - Ok(None) => info!(id, "tx stream finished"), + Ok(None) => { + handshake_abandoned = !filter_ever_set; + info!(id, "tx stream finished"); + } Err(error) => warn!(id, %error, "error to receive new filter"), }; break; } + if handshake_abandoned { + counter!( + metrics::GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL, + "x_subscription_id" => Arc::clone(&x_subscription_id) + ) + .increment(1); + } info!(id, "drop client tx stream"); } }); @@ -822,6 +880,10 @@ pub struct SubscribeClientState { pub head: IndexLocation, pub filter: Option, metric_cpu_usage: Gauge, + /// Instant at which the first filter was applied. Taken once, when the + /// first data message is pushed to this client afterwards, to observe + /// `grpc_subscribe_time_to_first_message_seconds`. + pub ts_filter_set: Option<Instant>, } impl Drop for SubscribeClientState { @@ -859,6 +921,7 @@ impl SubscribeClientState { head: IndexLocation::Unknown, filter: None, metric_cpu_usage, + ts_filter_set: None, } } @@ -881,6 +944,17 @@ impl SubscribeClientState { fn create_pong(id: i32) -> Vec { Self::serialize_ping_pong(UpdateOneof::Pong(SubscribeUpdatePong { id })) } + + /// Observe the first data message pushed after a filter is applied. 
+ pub fn observe_time_to_first_message(&mut self) { + if let Some(ts_filter_set) = self.ts_filter_set.take() { + histogram!( + metrics::GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS, + "x_subscription_id" => Arc::clone(&self.x_subscription_id) + ) + .record(duration_to_seconds(ts_filter_set.elapsed())); + } + } } #[derive(Debug)] diff --git a/richat/src/metrics.rs b/richat/src/metrics.rs index 3f31e24a..322cb342 100644 --- a/richat/src/metrics.rs +++ b/richat/src/metrics.rs @@ -1,6 +1,6 @@ use { crate::version::VERSION as VERSION_INFO, - ::metrics::{counter, describe_counter, describe_gauge}, + ::metrics::{counter, describe_counter, describe_gauge, describe_histogram}, metrics_exporter_prometheus::{BuildError, PrometheusBuilder, PrometheusHandle}, richat_filter::filter::FilteredUpdateType, richat_metrics::ConfigMetrics, @@ -57,6 +57,11 @@ pub const GRPC_SUBSCRIBE_MESSAGES_BYTES_TOTAL: &str = "grpc_subscribe_messages_b pub const GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL: &str = "grpc_subscribe_cpu_seconds_total"; // x_subscription_id pub const GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL: &str = "grpc_subscribe_replay_disk_cpu_seconds_total"; // x_subscription_id +pub const GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS: &str = "grpc_subscribe_filter_parse_seconds"; // x_subscription_id +pub const GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS: &str = + "grpc_subscribe_time_to_first_message_seconds"; // x_subscription_id +pub const GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL: &str = + "grpc_subscribe_handshake_abandoned_total"; // x_subscription_id pub const PUBSUB_SLOT: &str = "pubsub_slot"; // commitment pub const PUBSUB_CACHED_SIGNATURES_TOTAL: &str = "pubsub_cached_signatures_total"; pub const PUBSUB_STORED_MESSAGES_COUNT_TOTAL: &str = "pubsub_stored_messages_count_total"; @@ -127,6 +132,9 @@ pub fn setup() -> Result { describe_counter!(GRPC_SUBSCRIBE_MESSAGES_BYTES_TOTAL, "Total size of gRPC messages in subscriptions by type"); describe_gauge!(GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL, "CPU 
consumption of gRPC filters in subscriptions"); describe_gauge!(GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL, "CPU consumption of gRPC filters in subscriptions on replay from disk"); + describe_histogram!(GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS, "Seconds between subscribe handshake start and the moment the client's SubscribeRequest is parsed into a filter"); + describe_histogram!(GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS, "Seconds between filter being applied and the first data message pushed to the client"); + describe_counter!(GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL, "Subscribe handshakes where the client stream ended before a filter was ever set"); describe_gauge!(PUBSUB_SLOT, "Latest slot handled in PubSub by commitment"); describe_gauge!(PUBSUB_CACHED_SIGNATURES_TOTAL, "Number of cached signatures"); describe_gauge!(PUBSUB_STORED_MESSAGES_COUNT_TOTAL, "Number of stored filtered messages in cache"); diff --git a/richat/src/storage/mod.rs b/richat/src/storage/mod.rs index 852c44fc..672d039a 100644 --- a/richat/src/storage/mod.rs +++ b/richat/src/storage/mod.rs @@ -170,6 +170,9 @@ impl Storage { locked_state.head = IndexLocation::Storage(current_head); req.state.head = Some(current_head); + if pushed { + locked_state.observe_time_to_first_message(); + } if req.state.read_finished && req.state.messages.is_empty() { if let Some(head) = req.messages.get_head_by_replay_index(current_head + 1) {