From d8b44814676a62142282baad5c19d84ed2c53981 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Wed, 3 Dec 2025 01:46:11 +0000 Subject: [PATCH 1/2] Increase reconstruct channel size (#62) add --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- proxy/src/forwarder.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7faed71a..e4d8dfe7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2746,7 +2746,7 @@ dependencies = [ [[package]] name = "jito-protos" -version = "0.2.12" +version = "0.2.13" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", @@ -2757,7 +2757,7 @@ dependencies = [ [[package]] name = "jito-shredstream-proxy" -version = "0.2.12" +version = "0.2.13" dependencies = [ "ahash 0.8.11", "arc-swap", diff --git a/Cargo.toml b/Cargo.toml index 12dc7faf..cfc4a9ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["examples", "jito_protos", "proxy"] resolver = "2" [workspace.package] -version = "0.2.12" +version = "0.2.13" description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://docs.jito.wtf/lowlatencytxnfeed/ for details." authors = ["Jito Team "] homepage = "https://jito.wtf/" diff --git a/proxy/src/forwarder.rs b/proxy/src/forwarder.rs index 2c436618..0fdb5f16 100644 --- a/proxy/src/forwarder.rs +++ b/proxy/src/forwarder.rs @@ -682,7 +682,7 @@ mod tests { thread::spawn(move || listen_and_collect(socket, to_receive)); }); - let (reconstruct_tx, _reconstruct_rx) = crossbeam_channel::bounded(1_024); + let (reconstruct_tx, _reconstruct_rx) = crossbeam_channel::bounded(10_240); // send packets recv_from_channel_and_send_multiple_dest( packet_receiver.recv(), From c571e087978a40ffeac420dd32e55c32f6ca832c Mon Sep 17 00:00:00 2001 From: leafaar Date: Fri, 23 Jan 2026 10:57:53 -0300 Subject: [PATCH 2/2] feat(grpc): add subscribe-only mode without forwarding --- proxy/src/heartbeat.rs | 113 +++++++++++++++++++++++++++++++++++++++++ proxy/src/main.rs | 103 +++++++++++++++++++++++++++++++++++++ 2 files changed, 216 insertions(+) diff --git a/proxy/src/heartbeat.rs b/proxy/src/heartbeat.rs index 0f6da5c8..b7ea5680 100644 --- a/proxy/src/heartbeat.rs +++ b/proxy/src/heartbeat.rs @@ -196,6 +196,119 @@ pub fn heartbeat_loop_thread( }).unwrap() } +/// Simplified heartbeat loop that only performs authentication and sends heartbeats. +/// Does not check for received shreds or track metrics. Used for subscribe-only mode. +#[allow(clippy::too_many_arguments)] +pub fn heartbeat_loop_subscribe_only( + block_engine_url: String, + auth_url: String, + auth_keypair: Arc, + desired_regions: Vec, + recv_socket: SocketAddr, + runtime: Runtime, + service_name: String, + shutdown_receiver: Receiver<()>, + exit: Arc, +) -> JoinHandle<()> { + Builder::new() + .name("ssPxyHbeatSub".to_string()) + .spawn(move || { + let heartbeat_socket = jito_protos::shared::Socket { + ip: recv_socket.ip().to_string(), + port: recv_socket.port() as i64, + }; + let mut heartbeat_interval = Duration::from_secs(1); + let mut heartbeat_tick = crossbeam_channel::tick(heartbeat_interval); + let mut client_restart_count_cumulative = 0u64; + let mut successful_heartbeat_count_cumulative = 0u64; + let mut failed_heartbeat_count_cumulative = 0u64; + + while !exit.load(Ordering::Relaxed) { + let per_con_exit = ScopedAtomicBool::default(); + info!("Starting heartbeat client (subscribe-only mode)"); + let shredstream_client_res = runtime.block_on(get_grpc_client( + block_engine_url.clone(), + auth_url.clone(), + auth_keypair.clone(), + service_name.clone(), + per_con_exit.get_inner_clone(), + )); + + let (mut shredstream_client, _refresh_thread_hdl) = match shredstream_client_res { + Ok(c) => c, + Err(e) => { + warn!("Failed to connect to block engine, retrying. Error: {e}"); + client_restart_count_cumulative += 1; + datapoint_warn!( + "shredstream_proxy-heartbeat_client_error", + "block_engine_url" => block_engine_url, + ("errors", 1, i64), + ("error_str", e.to_string(), String), + ); + sleep(Duration::from_secs(5)); + continue; + } + }; + + let mut successful_heartbeat_count = 0u64; + let mut failed_heartbeat_count = 0u64; + + while !exit.load(Ordering::Relaxed) { + crossbeam_channel::select! { + recv(heartbeat_tick) -> _ => { + let heartbeat_result = runtime.block_on( + shredstream_client.send_heartbeat(Heartbeat { + socket: Some(heartbeat_socket.clone()), + regions: desired_regions.clone(), + }) + ); + + match heartbeat_result { + Ok(hb) => { + let new_interval = + Duration::from_millis((hb.get_ref().ttl_ms / 3) as u64); + if heartbeat_interval != new_interval { + info!("Sending heartbeat every {new_interval:?}."); + heartbeat_interval = new_interval; + heartbeat_tick = crossbeam_channel::tick(new_interval); + } + successful_heartbeat_count += 1; + } + Err(err) => { + if err.code() == Code::InvalidArgument { + panic!("Invalid arguments: {err}."); + }; + warn!("Error sending heartbeat: {err}"); + failed_heartbeat_count += 1; + // Restart client on repeated failures + if failed_heartbeat_count > 5 { + warn!("Too many heartbeat failures, restarting client."); + break; + } + } + } + } + + recv(shutdown_receiver) -> _ => { + break; + } + } + } + + successful_heartbeat_count_cumulative += successful_heartbeat_count; + failed_heartbeat_count_cumulative += failed_heartbeat_count; + } + + info!( + "Exiting heartbeat thread (subscribe-only), sent {} successful, {} failed heartbeats. Client restarted {} times.", + successful_heartbeat_count_cumulative, + failed_heartbeat_count_cumulative, + client_restart_count_cumulative + ); + }) + .unwrap() +} + pub async fn get_grpc_client( block_engine_url: String, auth_url: String, diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 8bcae1c7..769c13f0 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -56,6 +56,10 @@ enum ProxySubcommands { /// Does not request shreds from Jito. Sends anything received on `src-bind-addr`:`src-bind-port` to all destinations. ForwardOnly(CommonArgs), + + /// Requests shreds from Jito to be sent to a specific port without binding a socket or forwarding. + /// Only performs authentication and heartbeat. Useful when another process is listening on the destination port. + SubscribeOnly(SubscribeOnlyArgs), } #[derive(clap::Args, Clone, Debug)] @@ -82,6 +86,36 @@ struct ShredstreamArgs { common_args: CommonArgs, } +#[derive(clap::Args, Clone, Debug)] +struct SubscribeOnlyArgs { + /// Address for Jito Block Engine. + /// See https://jito-labs.gitbook.io/mev/searcher-resources/block-engine#connection-details + #[arg(long, env)] + block_engine_url: String, + + /// Manual override for auth service address. For internal use. + #[arg(long, env)] + auth_url: Option, + + /// Path to keypair file used to authenticate with the backend. + #[arg(long, env)] + auth_keypair: PathBuf, + + /// Desired regions to receive heartbeats from. + /// Receives `n` different streams. Requires at least 1 region, comma separated. + #[arg(long, env, value_delimiter = ',', required(true))] + desired_regions: Vec, + + /// Port to tell Jito to send shreds to. This process will NOT listen on this port. + #[arg(long, env)] + dest_port: u16, + + /// Public IP address to use. + /// Overrides value fetched from `ifconfig.me`. + #[arg(long, env)] + public_ip: Option, +} + #[derive(clap::Args, Clone, Debug)] struct CommonArgs { /// Address where Shredstream proxy listens. @@ -221,10 +255,17 @@ fn main() -> Result<(), ShredstreamProxyError> { let all_args: Args = Args::parse(); let shredstream_args = all_args.shredstream_args.clone(); + + // Handle SubscribeOnly mode separately - it only needs heartbeat + if let ProxySubcommands::SubscribeOnly(ref sub_args) = shredstream_args { + return run_subscribe_only_mode(sub_args.clone()); + } + // common args let args = match all_args.shredstream_args { ProxySubcommands::Shredstream(x) => x.common_args, ProxySubcommands::ForwardOnly(x) => x, + ProxySubcommands::SubscribeOnly(_) => unreachable!(), }; set_host_id(hostname::get()?.into_string().unwrap()); if (args.endpoint_discovery_url.is_none() && args.discovered_endpoints_port.is_some()) @@ -376,6 +417,68 @@ fn main() -> Result<(), ShredstreamProxyError> { Ok(()) } +fn run_subscribe_only_mode(args: SubscribeOnlyArgs) -> Result<(), ShredstreamProxyError> { + set_host_id(hostname::get()?.into_string().unwrap()); + + let exit = Arc::new(AtomicBool::new(false)); + let (shutdown_sender, shutdown_receiver) = + shutdown_notifier(exit.clone()).expect("Failed to set up signal handler"); + let panic_hook = panic::take_hook(); + { + let exit = exit.clone(); + panic::set_hook(Box::new(move |panic_info| { + exit.store(true, Ordering::SeqCst); + let _ = shutdown_sender.send(()); + error!("exiting process"); + sleep(Duration::from_secs(1)); + panic_hook(panic_info); + })); + } + + let runtime = Runtime::new()?; + + if args.desired_regions.len() > 2 { + warn!( + "Too many regions requested, only regions: {:?} will be used", + &args.desired_regions[..2] + ); + } + + let auth_keypair = Arc::new( + read_keypair_file(Path::new(&args.auth_keypair)).unwrap_or_else(|e| { + panic!( + "Unable to parse keypair file. Ensure that file {:?} is readable. Error: {e}", + args.auth_keypair + ) + }), + ); + + let public_ip = args.public_ip.unwrap_or_else(|| get_public_ip().unwrap()); + let recv_socket = SocketAddr::new(public_ip, args.dest_port); + + info!( + "Starting subscribe-only mode. Requesting shreds to be sent to {}", + recv_socket + ); + + let heartbeat_hdl = heartbeat::heartbeat_loop_subscribe_only( + args.block_engine_url.clone(), + args.auth_url.unwrap_or(args.block_engine_url), + auth_keypair, + args.desired_regions, + recv_socket, + runtime, + "shredstream_proxy_subscribe".to_string(), + shutdown_receiver, + exit, + ); + + heartbeat_hdl.join().expect("heartbeat thread panicked"); + + info!("Exiting subscribe-only mode."); + Ok(()) +} + fn start_heartbeat( args: ShredstreamArgs, exit: &Arc,