From 2e41da107011d9070d266005186be0c64aace7a0 Mon Sep 17 00:00:00 2001 From: lukas Date: Wed, 15 Apr 2026 11:09:07 +0200 Subject: [PATCH 1/5] richat: add subscribe handshake observability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add three metrics to diagnose 3-5s zero-byte stalls observed on new gRPC subscribers in EWR/LAX that cause clients to send RST_STREAM before any data reaches them. No behavioral fix yet — we first want one deploy's worth of data to tell us which latency actually dominates. Metrics added: - grpc_subscribe_filter_parse_seconds (histogram, labeled by x_subscription_id): seconds from subscribe2() entry until the SubscribeRequest is read off the wire and parsed into a Filter. - grpc_subscribe_time_to_first_message_seconds (histogram, labeled by x_subscription_id): seconds from the filter being applied until the worker loop pushes the first data message to the client. This is the latency the client actually perceives as "time to first byte of data". - grpc_subscribe_handshake_abandoned_total (counter, labeled by x_subscription_id): incremented when the client's request stream ends before a filter is ever set — i.e. client disconnected mid-handshake. Tells us how many stalls abandon pre-filter vs post-filter. Plus a one-shot warn! log inside the ping task when a client has been connected for >3s without a filter set. Threshold matches the observed client timeout window. Only the initial unset -> set transition is recorded for both histograms; subsequent filter updates (commitment change, etc.) are not part of the subscribe handshake and would skew the tails. 
--- richat/src/grpc/server.rs | 74 +++++++++++++++++++++++++++++++++++++-- richat/src/metrics.rs | 10 +++++- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/richat/src/grpc/server.rs b/richat/src/grpc/server.rs index e5c3f38b..aa44e1e7 100644 --- a/richat/src/grpc/server.rs +++ b/richat/src/grpc/server.rs @@ -6,7 +6,7 @@ use { metrics::{self, GrpcSubscribeMessage}, version::VERSION, }, - ::metrics::{Gauge, counter, gauge}, + ::metrics::{Gauge, counter, gauge, histogram}, crossbeam_queue::SegQueue, futures::{ future::{FutureExt, TryFutureExt, ready, try_join_all}, @@ -392,6 +392,20 @@ impl GrpcServer { .increment(duration_to_seconds(ts.elapsed())); ticks_without_messages = 0; } + + // Observe the time from filter set -> first data message pushed + // to the client. This is the subscribe handshake latency the + // clients actually perceive. Taken-once: only the first push + // after the filter was applied records the histogram. + if pushed { + if let Some(ts_filter_set) = state.ts_filter_set.take() { + histogram!( + metrics::GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS, + "x_subscription_id" => Arc::clone(&state.x_subscription_id) + ) + .record(duration_to_seconds(ts_filter_set.elapsed())); + } + } drop(state); if pushed { @@ -430,6 +444,7 @@ impl GrpcServer { ) .increment(1); + let ts_subscribe_entry = Instant::now(); let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed); let client = SubscribeClient::new( id, @@ -443,8 +458,10 @@ impl GrpcServer { let shutdown = self.shutdown.clone(); let ping_interval = self.ping_interval; let client = client.clone(); + let x_subscription_id = Arc::clone(&x_subscription_id); async move { let mut ts_latest = Instant::now(); + let mut slow_filter_warned = false; loop { tokio::select! 
{ () = shutdown.cancelled() => { @@ -457,9 +474,28 @@ impl GrpcServer { if state.finished { break } + let filter_set = state.filter.is_some(); drop(state); let ts = Instant::now(); + + // Warn once when a client has been connected for >3s + // without a filter. Matches observed client timeouts. + if !filter_set + && !slow_filter_warned + && ts.duration_since(ts_subscribe_entry) > Duration::from_secs(3) + { + slow_filter_warned = true; + warn!( + id, + x_subscription_id = x_subscription_id.as_ref(), + elapsed_ms = ts + .duration_since(ts_subscribe_entry) + .as_millis() as u64, + "subscribe: filter not set after 3s" + ); + } + if ts.duration_since(ts_latest) > ping_interval { ts_latest = ts; let data = SubscribeClientState::create_ping(); @@ -477,7 +513,9 @@ impl GrpcServer { let limits = Arc::clone(&self.filter_limits); let client = client.clone(); let messages = self.messages.clone(); + let x_subscription_id_for_task = Arc::clone(&x_subscription_id); async move { + let mut filter_ever_set = false; loop { match stream.message().await { Ok(Some(message)) => { @@ -490,6 +528,7 @@ impl GrpcServer { let (subscribe_from_slot, new_filter) = get_filter(&limits, message); let mut state = client.state_lock(); + let was_unset = state.filter.is_none(); if let Err(error) = new_filter.and_then(|filter| { if filter.contains_blocks() && subscribe_from_slot.is_some() { return Err(Status::invalid_argument( @@ -512,7 +551,7 @@ impl GrpcServer { { let metric_cpu_usage = gauge!( metrics::GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL, - "x_subscription_id" => Arc::clone(&x_subscription_id) + "x_subscription_id" => Arc::clone(&x_subscription_id_for_task) ); messages .replay_from_storage(client.clone(), metric_cpu_usage) @@ -520,12 +559,31 @@ impl GrpcServer { } } state.filter = Some(filter); + if was_unset { + state.ts_filter_set = Some(Instant::now()); + } Ok::<(), Status>(()) }) { warn!(id, %error, "failed to handle request"); drop(state); client.push_error(error); } else { + // Record 
filter-parse latency only on the initial + // transition unset -> set. Subsequent filter + // updates (commitment change, etc.) are not + // part of the handshake and would skew the + // histogram. + if was_unset { + filter_ever_set = true; + histogram!( + metrics::GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS, + "x_subscription_id" => Arc::clone(&x_subscription_id_for_task) + ) + .record(duration_to_seconds( + ts_subscribe_entry.elapsed(), + )); + } + drop(state); info!(id, "set new filter"); continue; } @@ -535,6 +593,13 @@ impl GrpcServer { }; break; } + if !filter_ever_set { + counter!( + metrics::GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL, + "x_subscription_id" => Arc::clone(&x_subscription_id_for_task) + ) + .increment(1); + } info!(id, "drop client tx stream"); } }); @@ -822,6 +887,10 @@ pub struct SubscribeClientState { pub head: IndexLocation, pub filter: Option, metric_cpu_usage: Gauge, + /// Taken once, when the worker pushes the first data message to this + /// client after a filter has been applied. Observes + /// `grpc_subscribe_time_to_first_message_seconds`. 
+ pub ts_filter_set: Option<Instant>, } impl Drop for SubscribeClientState { @@ -859,6 +928,7 @@ impl SubscribeClientState { head: IndexLocation::Unknown, filter: None, metric_cpu_usage, + ts_filter_set: None, } } diff --git a/richat/src/metrics.rs b/richat/src/metrics.rs index 3f31e24a..322cb342 100644 --- a/richat/src/metrics.rs +++ b/richat/src/metrics.rs @@ -1,6 +1,6 @@ use { crate::version::VERSION as VERSION_INFO, - ::metrics::{counter, describe_counter, describe_gauge}, + ::metrics::{counter, describe_counter, describe_gauge, describe_histogram}, metrics_exporter_prometheus::{BuildError, PrometheusBuilder, PrometheusHandle}, richat_filter::filter::FilteredUpdateType, richat_metrics::ConfigMetrics, @@ -57,6 +57,11 @@ pub const GRPC_SUBSCRIBE_MESSAGES_BYTES_TOTAL: &str = "grpc_subscribe_messages_b pub const GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL: &str = "grpc_subscribe_cpu_seconds_total"; // x_subscription_id pub const GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL: &str = "grpc_subscribe_replay_disk_cpu_seconds_total"; // x_subscription_id +pub const GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS: &str = "grpc_subscribe_filter_parse_seconds"; // x_subscription_id +pub const GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS: &str = + "grpc_subscribe_time_to_first_message_seconds"; // x_subscription_id +pub const GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL: &str = + "grpc_subscribe_handshake_abandoned_total"; // x_subscription_id pub const PUBSUB_SLOT: &str = "pubsub_slot"; // commitment pub const PUBSUB_CACHED_SIGNATURES_TOTAL: &str = "pubsub_cached_signatures_total"; pub const PUBSUB_STORED_MESSAGES_COUNT_TOTAL: &str = "pubsub_stored_messages_count_total"; @@ -127,6 +132,9 @@ pub fn setup() -> Result { describe_counter!(GRPC_SUBSCRIBE_MESSAGES_BYTES_TOTAL, "Total size of gRPC messages in subscriptions by type"); describe_gauge!(GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL, "CPU consumption of gRPC filters in subscriptions"); describe_gauge!(GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL, "CPU consumption of 
gRPC filters in subscriptions on replay from disk"); + describe_histogram!(GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS, "Seconds between subscribe handshake start and the moment the client's SubscribeRequest is parsed into a filter"); + describe_histogram!(GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS, "Seconds between filter being applied and the first data message pushed to the client"); + describe_counter!(GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL, "Subscribe handshakes where the client stream ended before a filter was ever set"); describe_gauge!(PUBSUB_SLOT, "Latest slot handled in PubSub by commitment"); describe_gauge!(PUBSUB_CACHED_SIGNATURES_TOTAL, "Number of cached signatures"); describe_gauge!(PUBSUB_STORED_MESSAGES_COUNT_TOTAL, "Number of stored filtered messages in cache"); From addf2aa549f17016f52637d68a1b85665092f8be Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Tue, 19 May 2026 09:47:12 -0500 Subject: [PATCH 2/5] rm extra Arc::clone --- richat/src/grpc/server.rs | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/richat/src/grpc/server.rs b/richat/src/grpc/server.rs index aa44e1e7..2d83a8ec 100644 --- a/richat/src/grpc/server.rs +++ b/richat/src/grpc/server.rs @@ -481,19 +481,17 @@ impl GrpcServer { // Warn once when a client has been connected for >3s // without a filter. Matches observed client timeouts. 
- if !filter_set - && !slow_filter_warned - && ts.duration_since(ts_subscribe_entry) > Duration::from_secs(3) - { - slow_filter_warned = true; - warn!( - id, - x_subscription_id = x_subscription_id.as_ref(), - elapsed_ms = ts - .duration_since(ts_subscribe_entry) - .as_millis() as u64, - "subscribe: filter not set after 3s" - ); + if !filter_set && !slow_filter_warned { + let elapsed_ms = ts.duration_since(ts_subscribe_entry).as_millis() as u64; + if elapsed_ms > 3_000 { + slow_filter_warned = true; + warn!( + id, + x_subscription_id = x_subscription_id.as_ref(), + elapsed_ms, + "subscribe: filter not set after 3s" + ); + } } if ts.duration_since(ts_latest) > ping_interval { @@ -513,7 +511,6 @@ impl GrpcServer { let limits = Arc::clone(&self.filter_limits); let client = client.clone(); let messages = self.messages.clone(); - let x_subscription_id_for_task = Arc::clone(&x_subscription_id); async move { let mut filter_ever_set = false; loop { @@ -551,7 +548,7 @@ impl GrpcServer { { let metric_cpu_usage = gauge!( metrics::GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL, - "x_subscription_id" => Arc::clone(&x_subscription_id_for_task) + "x_subscription_id" => Arc::clone(&x_subscription_id) ); messages .replay_from_storage(client.clone(), metric_cpu_usage) @@ -577,11 +574,9 @@ impl GrpcServer { filter_ever_set = true; histogram!( metrics::GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS, - "x_subscription_id" => Arc::clone(&x_subscription_id_for_task) + "x_subscription_id" => Arc::clone(&x_subscription_id) ) - .record(duration_to_seconds( - ts_subscribe_entry.elapsed(), - )); + .record(duration_to_seconds(ts_subscribe_entry.elapsed())); } drop(state); info!(id, "set new filter"); @@ -596,7 +591,7 @@ impl GrpcServer { if !filter_ever_set { counter!( metrics::GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL, - "x_subscription_id" => Arc::clone(&x_subscription_id_for_task) + "x_subscription_id" => Arc::clone(&x_subscription_id) ) .increment(1); } From d53f93f7e1d2fcdebb74faed2a5aa096877b0a46 Mon Sep 
17 00:00:00 2001 From: Kirill Fomichev Date: Tue, 19 May 2026 09:50:31 -0500 Subject: [PATCH 3/5] bump version --- CHANGELOG.md | 8 ++++++++ Cargo.lock | 2 +- richat/Cargo.toml | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96b5f251..7ceb6ed0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +## 2026-05-19 + +- richat-v10.1.0 + +### Features + +- richat: add subscribe handshake observability ([#207](https://github.com/lamports-dev/richat/pull/207)) + ## 2026-04-30 - richat-v10.0.0 diff --git a/Cargo.lock b/Cargo.lock index d9f25e14..2e9ffb90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3217,7 +3217,7 @@ dependencies = [ [[package]] name = "richat" -version = "10.0.0" +version = "10.1.0" dependencies = [ "affinity-linux", "agave-reserved-account-keys", diff --git a/richat/Cargo.toml b/richat/Cargo.toml index eb0c1c5a..e6929548 100644 --- a/richat/Cargo.toml +++ b/richat/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "richat" -version = "10.0.0" +version = "10.1.0" authors = { workspace = true } edition = { workspace = true } description = "Richat App" From e565d0d28f32ffdf9e7891085e8152dce305d41f Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Tue, 19 May 2026 09:56:32 -0500 Subject: [PATCH 4/5] write metric only for abandoned handshake --- richat/src/grpc/server.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/richat/src/grpc/server.rs b/richat/src/grpc/server.rs index 2d83a8ec..4b66bec0 100644 --- a/richat/src/grpc/server.rs +++ b/richat/src/grpc/server.rs @@ -513,6 +513,7 @@ impl GrpcServer { let messages = self.messages.clone(); async move { let mut filter_ever_set = false; + let mut handshake_abandoned = false; loop { match stream.message().await { Ok(Some(message)) => { @@ -583,12 +584,15 @@ impl GrpcServer { continue; } } - Ok(None) => info!(id, "tx stream finished"), 
+ Ok(None) => { + handshake_abandoned = !filter_ever_set; + info!(id, "tx stream finished"); + } Err(error) => warn!(id, %error, "error to receive new filter"), }; break; } - if !filter_ever_set { + if handshake_abandoned { counter!( metrics::GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL, "x_subscription_id" => Arc::clone(&x_subscription_id) From a74ab3932c586a3d433730dfd62b51320461f5f9 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Tue, 19 May 2026 09:59:11 -0500 Subject: [PATCH 5/5] observe from storage replay too --- richat/src/grpc/server.rs | 19 ++++++++++++------- richat/src/storage/mod.rs | 3 +++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/richat/src/grpc/server.rs b/richat/src/grpc/server.rs index 4b66bec0..fb2e540d 100644 --- a/richat/src/grpc/server.rs +++ b/richat/src/grpc/server.rs @@ -398,13 +398,7 @@ impl GrpcServer { // clients actually perceive. Taken-once: only the first push // after the filter was applied records the histogram. if pushed { - if let Some(ts_filter_set) = state.ts_filter_set.take() { - histogram!( - metrics::GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS, - "x_subscription_id" => Arc::clone(&state.x_subscription_id) - ) - .record(duration_to_seconds(ts_filter_set.elapsed())); - } + state.observe_time_to_first_message(); } drop(state); @@ -950,6 +944,17 @@ impl SubscribeClientState { fn create_pong(id: i32) -> Vec { Self::serialize_ping_pong(UpdateOneof::Pong(SubscribeUpdatePong { id })) } + + /// Observe the first data message pushed after a filter is applied. 
+ pub fn observe_time_to_first_message(&mut self) { + if let Some(ts_filter_set) = self.ts_filter_set.take() { + histogram!( + metrics::GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS, + "x_subscription_id" => Arc::clone(&self.x_subscription_id) + ) + .record(duration_to_seconds(ts_filter_set.elapsed())); + } + } } #[derive(Debug)] diff --git a/richat/src/storage/mod.rs b/richat/src/storage/mod.rs index 852c44fc..672d039a 100644 --- a/richat/src/storage/mod.rs +++ b/richat/src/storage/mod.rs @@ -170,6 +170,9 @@ impl Storage { locked_state.head = IndexLocation::Storage(current_head); req.state.head = Some(current_head); + if pushed { + locked_state.observe_time_to_first_message(); + } if req.state.read_finished && req.state.messages.is_empty() { if let Some(head) = req.messages.get_head_by_replay_index(current_head + 1) {