Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi

### Breaking

## 2026-05-19

- richat-v10.1.0

### Features

- richat: add subscribe handshake observability ([#207](https://github.com/lamports-dev/richat/pull/207))

## 2026-04-30

- richat-v10.0.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion richat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "richat"
version = "10.0.0"
version = "10.1.0"
authors = { workspace = true }
edition = { workspace = true }
description = "Richat App"
Expand Down
78 changes: 76 additions & 2 deletions richat/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
metrics::{self, GrpcSubscribeMessage},
version::VERSION,
},
::metrics::{Gauge, counter, gauge},
::metrics::{Gauge, counter, gauge, histogram},
crossbeam_queue::SegQueue,
futures::{
future::{FutureExt, TryFutureExt, ready, try_join_all},
Expand Down Expand Up @@ -392,6 +392,14 @@ impl GrpcServer {
.increment(duration_to_seconds(ts.elapsed()));
ticks_without_messages = 0;
}

// Observe the time from filter set -> first data message pushed
// to the client. This is the subscribe handshake latency the
// clients actually perceive. Taken-once: only the first push
// after the filter was applied records the histogram.
if pushed {
state.observe_time_to_first_message();
}
drop(state);

if pushed {
Expand Down Expand Up @@ -430,6 +438,7 @@ impl GrpcServer {
)
.increment(1);

let ts_subscribe_entry = Instant::now();
let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);
let client = SubscribeClient::new(
id,
Expand All @@ -443,8 +452,10 @@ impl GrpcServer {
let shutdown = self.shutdown.clone();
let ping_interval = self.ping_interval;
let client = client.clone();
let x_subscription_id = Arc::clone(&x_subscription_id);
async move {
let mut ts_latest = Instant::now();
let mut slow_filter_warned = false;
loop {
tokio::select! {
() = shutdown.cancelled() => {
Expand All @@ -457,9 +468,26 @@ impl GrpcServer {
if state.finished {
break
}
let filter_set = state.filter.is_some();
drop(state);

let ts = Instant::now();

// Warn once when a client has been connected for >3s
// without a filter. Matches observed client timeouts.
if !filter_set && !slow_filter_warned {
let elapsed_ms = ts.duration_since(ts_subscribe_entry).as_millis() as u64;
if elapsed_ms > 3_000 {
slow_filter_warned = true;
warn!(
id,
x_subscription_id = x_subscription_id.as_ref(),
elapsed_ms,
"subscribe: filter not set after 3s"
);
}
}

if ts.duration_since(ts_latest) > ping_interval {
ts_latest = ts;
let data = SubscribeClientState::create_ping();
Expand All @@ -478,6 +506,8 @@ impl GrpcServer {
let client = client.clone();
let messages = self.messages.clone();
async move {
let mut filter_ever_set = false;
let mut handshake_abandoned = false;
loop {
match stream.message().await {
Ok(Some(message)) => {
Expand All @@ -490,6 +520,7 @@ impl GrpcServer {

let (subscribe_from_slot, new_filter) = get_filter(&limits, message);
let mut state = client.state_lock();
let was_unset = state.filter.is_none();
if let Err(error) = new_filter.and_then(|filter| {
if filter.contains_blocks() && subscribe_from_slot.is_some() {
return Err(Status::invalid_argument(
Expand Down Expand Up @@ -520,21 +551,48 @@ impl GrpcServer {
}
}
state.filter = Some(filter);
if was_unset {
state.ts_filter_set = Some(Instant::now());
}
Ok::<(), Status>(())
}) {
warn!(id, %error, "failed to handle request");
drop(state);
client.push_error(error);
} else {
// Record filter-parse latency only on the initial
// transition unset -> set. Subsequent filter
// updates (commitment change, etc.) are not
// part of the handshake and would skew the
// histogram.
if was_unset {
filter_ever_set = true;
histogram!(
metrics::GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS,
"x_subscription_id" => Arc::clone(&x_subscription_id)
)
.record(duration_to_seconds(ts_subscribe_entry.elapsed()));
}
drop(state);
info!(id, "set new filter");
continue;
}
}
Ok(None) => info!(id, "tx stream finished"),
Ok(None) => {
handshake_abandoned = !filter_ever_set;
info!(id, "tx stream finished");
}
Err(error) => warn!(id, %error, "error to receive new filter"),
};
break;
}
if handshake_abandoned {
counter!(
metrics::GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL,
"x_subscription_id" => Arc::clone(&x_subscription_id)
)
.increment(1);
}
info!(id, "drop client tx stream");
}
});
Expand Down Expand Up @@ -822,6 +880,10 @@ pub struct SubscribeClientState {
pub head: IndexLocation,
pub filter: Option<Filter>,
metric_cpu_usage: Gauge,
/// Instant at which a filter was applied while none was previously set.
/// Consumed (reset to `None`) when the first data message is pushed to this
/// client afterwards, which records
/// `grpc_subscribe_time_to_first_message_seconds`.
pub ts_filter_set: Option<Instant>,
}

impl Drop for SubscribeClientState {
Expand Down Expand Up @@ -859,6 +921,7 @@ impl SubscribeClientState {
head: IndexLocation::Unknown,
filter: None,
metric_cpu_usage,
ts_filter_set: None,
}
}

Expand All @@ -881,6 +944,17 @@ impl SubscribeClientState {
/// Builds the serialized bytes of a `Pong` update carrying `id`.
fn create_pong(id: i32) -> Vec<u8> {
    let pong = SubscribeUpdatePong { id };
    Self::serialize_ping_pong(UpdateOneof::Pong(pong))
}

/// Records the time-to-first-message histogram for this client.
///
/// The stored `ts_filter_set` instant is moved out with `take()`, so a
/// sample is recorded only for the first call made while a timestamp is
/// present; calls with no timestamp stored are no-ops.
pub fn observe_time_to_first_message(&mut self) {
    let Some(filter_applied_at) = self.ts_filter_set.take() else {
        // Nothing armed: either no filter was set yet or the sample was
        // already recorded.
        return;
    };

    histogram!(
        metrics::GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS,
        "x_subscription_id" => Arc::clone(&self.x_subscription_id)
    )
    .record(duration_to_seconds(filter_applied_at.elapsed()));
}
}

#[derive(Debug)]
Expand Down
10 changes: 9 additions & 1 deletion richat/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::version::VERSION as VERSION_INFO,
::metrics::{counter, describe_counter, describe_gauge},
::metrics::{counter, describe_counter, describe_gauge, describe_histogram},
metrics_exporter_prometheus::{BuildError, PrometheusBuilder, PrometheusHandle},
richat_filter::filter::FilteredUpdateType,
richat_metrics::ConfigMetrics,
Expand Down Expand Up @@ -57,6 +57,11 @@ pub const GRPC_SUBSCRIBE_MESSAGES_BYTES_TOTAL: &str = "grpc_subscribe_messages_b
pub const GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL: &str = "grpc_subscribe_cpu_seconds_total"; // x_subscription_id
pub const GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL: &str =
"grpc_subscribe_replay_disk_cpu_seconds_total"; // x_subscription_id
// Histogram: seconds from subscribe entry until a filter is first set for the client.
pub const GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS: &str = "grpc_subscribe_filter_parse_seconds"; // x_subscription_id
// Histogram: seconds from the filter being applied until the first data message is pushed.
pub const GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS: &str =
"grpc_subscribe_time_to_first_message_seconds"; // x_subscription_id
// Counter: client request streams that finished before a filter was ever set.
// The emission site attaches only the `x_subscription_id` label.
pub const GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL: &str =
"grpc_subscribe_handshake_abandoned_total"; // x_subscription_id
pub const PUBSUB_SLOT: &str = "pubsub_slot"; // commitment
pub const PUBSUB_CACHED_SIGNATURES_TOTAL: &str = "pubsub_cached_signatures_total";
pub const PUBSUB_STORED_MESSAGES_COUNT_TOTAL: &str = "pubsub_stored_messages_count_total";
Expand Down Expand Up @@ -127,6 +132,9 @@ pub fn setup() -> Result<PrometheusHandle, BuildError> {
describe_counter!(GRPC_SUBSCRIBE_MESSAGES_BYTES_TOTAL, "Total size of gRPC messages in subscriptions by type");
describe_gauge!(GRPC_SUBSCRIBE_CPU_SECONDS_TOTAL, "CPU consumption of gRPC filters in subscriptions");
describe_gauge!(GRPC_SUBSCRIBE_REPLAY_DISK_SECONDS_TOTAL, "CPU consumption of gRPC filters in subscriptions on replay from disk");
describe_histogram!(GRPC_SUBSCRIBE_FILTER_PARSE_SECONDS, "Seconds between subscribe handshake start and the moment the client's SubscribeRequest is parsed into a filter");
describe_histogram!(GRPC_SUBSCRIBE_TIME_TO_FIRST_MESSAGE_SECONDS, "Seconds between filter being applied and the first data message pushed to the client");
describe_counter!(GRPC_SUBSCRIBE_HANDSHAKE_ABANDONED_TOTAL, "Subscribe handshakes where the client stream ended before a filter was ever set");
describe_gauge!(PUBSUB_SLOT, "Latest slot handled in PubSub by commitment");
describe_gauge!(PUBSUB_CACHED_SIGNATURES_TOTAL, "Number of cached signatures");
describe_gauge!(PUBSUB_STORED_MESSAGES_COUNT_TOTAL, "Number of stored filtered messages in cache");
Expand Down
3 changes: 3 additions & 0 deletions richat/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ impl Storage {

locked_state.head = IndexLocation::Storage(current_head);
req.state.head = Some(current_head);
if pushed {
locked_state.observe_time_to_first_message();
}

if req.state.read_finished && req.state.messages.is_empty() {
if let Some(head) = req.messages.get_head_by_replay_index(current_head + 1) {
Expand Down
Loading