From 47015fb1fad19f167af99d7e60c8476e24c4e0e7 Mon Sep 17 00:00:00 2001 From: lukas Date: Tue, 21 Apr 2026 18:15:01 +0200 Subject: [PATCH] richat: fall back to live tip when source cannot replay and another can When richat resumes after a restart, every source is asked to stream starting from the last finalized slot persisted in storage. If one source has a shorter backlog than that slot, `subscribe()` returns `ReplayFromSlotNotAvailable` and the existing code simply slept and retried the same unreachable slot. As long as any other source could replay and fill the gap, the short-window source stayed parked forever and never delivered live messages, even though no data was actually missing from the channel. Behavior change. When a replay error is reported and `report_replay_failed(name)` returns `false` (meaning at least one other source is still expected to replay), flip a per-source flag and reconnect with `replay_from_slot = None`, i.e. at live tip. The channel still has no gap because another source is covering the history, and this source resumes delivering future messages. The flag is cleared after every successful subscribe, so a later disconnect will retry the normal replay path first. By that point `global_replay_from_slot` is typically close to live and the source's backlog will cover it, keeping reconnects gap free when possible. Observed on richat-ewr-frontend-5, where QuickNode's Yellowstone-grpc backlog does not reach back to the last finalized slot after a richat restart. Before this change the QN source logged "failed to replay, waiting for other sources" once per second indefinitely. After this change it reconnects at live tip and participates in message races normally. No config surface change. --- richat/src/source.rs | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/richat/src/source.rs b/richat/src/source.rs index 8604fffe..5e46a457 100644 --- a/richat/src/source.rs +++ b/richat/src/source.rs @@ -195,6 +195,7 @@ impl Subscription { config, global_replay_from_slot, None, + false, ), move |mut state: ( Backoff, @@ -202,6 +203,11 @@ impl Subscription { ConfigChannelSourceGeneral, GlobalReplayFromSlot, Option>, + bool, // fallback_to_live: this source could not replay from + // the requested slot, but another source is covering + // the gap. Subsequent subscribe attempts skip replay + // and connect at live tip so we still get future + // messages from this source. )| async move { loop { if let Some(stream) = state.4.as_mut() { @@ -213,7 +219,12 @@ impl Subscription { if state.3.report_replay_failed(name) { return Err(ReceiveError::ReplayFailed); } - error!(name, "failed to replay, waiting for other sources"); + state.5 = true; + error!( + name, + "failed to replay, another source covers the gap, \ + reconnecting at live tip" + ); } Ok(Err(error)) => { error!(name, ?error, "failed to receive") @@ -225,26 +236,45 @@ impl Subscription { state.4 = None; state.0.sleep().await; } else { + let replay_from_slot = if state.5 { + None + } else { + state.3.load() + }; match Subscription::subscribe( name, state.1.clone(), state.2.disable_accounts, state.2.parser, state.2.channel_size, - state.3.load(), + replay_from_slot, ) .await { Ok(stream) => { state.4 = Some(stream); state.0.reset(); + // Reset the fallback flag on every + // successful subscribe. A brief + // disconnect later will normally find + // `global_replay_from_slot` close to + // live, so the replay path has a fair + // chance to succeed and keep the source + // gap free. If replay fails again we + // simply fall back once more. + state.5 = false; } Err(error) => { if error.is_replay_slot_not_available() { if state.3.report_replay_failed(name) { return Err(ReceiveError::ReplayFailed); } - error!(name, "failed to replay at subscribe time, waiting for other sources"); + state.5 = true; + error!( + name, + "failed to replay at subscribe time, another \ + source covers the gap, reconnecting at live tip" + ); } else { error!(name, ?error, "failed to connect"); }