diff --git a/bin/stateless-validator/src/app.rs b/bin/stateless-validator/src/app.rs index 6b1a1b63..981ab4fd 100644 --- a/bin/stateless-validator/src/app.rs +++ b/bin/stateless-validator/src/app.rs @@ -19,6 +19,11 @@ use crate::{metrics, validator_db::ValidatorDB, workers}; /// Database filename for the validator. pub const VALIDATOR_DB_FILENAME: &str = "validator.redb"; +/// Default `--tip-buffer`. The validator races the upstream witness generator, so it stays +/// 3 blocks behind by default. Core's `PipelineConfig::tip_buffer` defaults to `0` because +/// the buffer is opt-in per binary; the validator opts in here. +const DEFAULT_TIP_BUFFER: u64 = 3; + /// Loads or creates a ChainSpec from the database or a genesis file. pub fn load_or_create_chain_spec( validator_db: &ValidatorDB, @@ -116,6 +121,13 @@ pub struct CommandLineArgs { #[clap(long, env = "STATELESS_VALIDATOR_ERROR_RESTART_DELAY_MS")] pub error_restart_delay_ms: Option, + /// Safety margin below the remote tip: the fetcher will not spawn fetches for blocks + /// `> chain_latest - tip_buffer`. Gives the upstream witness generator headroom to + /// finish the very block we'd otherwise race it for. `0` disables the buffer. Defaults + /// to `DEFAULT_TIP_BUFFER`. + #[clap(long, env = "STATELESS_VALIDATOR_TIP_BUFFER")] + pub tip_buffer: Option, + /// Initial round-level RPC retry backoff (milliseconds). Applied after every provider in a /// round has failed; doubles each round up to `--rpc-max-backoff-ms`. #[clap(long, env = "STATELESS_VALIDATOR_RPC_INITIAL_BACKOFF_MS")] @@ -260,9 +272,7 @@ pub async fn run() -> Result<()> { override_ms(args.poll_interval_ms, pipeline_config.poll_interval); pipeline_config.error_restart_delay = override_ms(args.error_restart_delay_ms, pipeline_config.error_restart_delay); - // Stay 3 blocks behind the remote tip so the upstream witness generator has headroom - // to finish the block we'd otherwise race it for. - pipeline_config.tip_buffer = 3; + pipeline_config.tip_buffer = args.tip_buffer.unwrap_or(DEFAULT_TIP_BUFFER); let result = workers::run_with_signals( client, diff --git a/bin/stateless-validator/tests/integration.rs b/bin/stateless-validator/tests/integration.rs index 3e105e41..cf665c4e 100644 --- a/bin/stateless-validator/tests/integration.rs +++ b/bin/stateless-validator/tests/integration.rs @@ -31,6 +31,18 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info}; use tracing_subscriber::EnvFilter; +/// Argv prefix for tests that exercise an *optional* flag — both required endpoints are +/// already supplied so the parse only depends on the flag under test. +const BASE_ARGS: &[&str] = &[ + "stateless-validator", + "--data-dir", + "/tmp/x", + "--rpc-endpoint", + "http://rpc", + "--witness-endpoint", + "http://w", +]; + /// Verifies that an endpoint flag accepts repeated flags, CSV values, and env var — /// ensuring container deployments configured purely via env are not silently limited /// to one endpoint (clap's `value_delimiter` applies to env-var values too). @@ -76,77 +88,59 @@ fn rpc_endpoint_accepts_multiple_forms() { ); } -/// Verifies that a concurrency cap flag parses as `Some(n)` via CLI and env var, -/// and omission leaves the value `None` (unbounded). -fn assert_concurrency_flag( +/// Verifies that an optional numeric flag parses as `Some(n)` via CLI and env var, +/// and omission leaves the value `None`. +fn assert_optional_numeric_flag( flag: &str, env: &str, - base: &[&str], - extract: impl Fn(CommandLineArgs) -> Option, -) { + extract: impl Fn(CommandLineArgs) -> Option, +) where + T: std::str::FromStr + std::fmt::Debug + PartialEq, + T::Err: std::fmt::Debug, +{ let guard = stateless_test_utils::env::env_lock(); let parse = |extra: &[&str]| { - extract(CommandLineArgs::try_parse_from(base.iter().chain(extra)).unwrap()) + extract(CommandLineArgs::try_parse_from(BASE_ARGS.iter().chain(extra)).unwrap()) }; assert_eq!(parse(&[]), None); - assert_eq!(parse(&[flag, "7"]), Some(7)); + assert_eq!(parse(&[flag, "7"]), Some("7".parse().unwrap())); let from_env = stateless_test_utils::env::with_env_var(&guard, env, "12", || parse(&[])); - assert_eq!(from_env, Some(12)); + assert_eq!(from_env, Some("12".parse().unwrap())); } #[test] fn data_max_concurrent_requests_flag_and_env() { - assert_concurrency_flag( + assert_optional_numeric_flag::( "--data-max-concurrent-requests", "STATELESS_VALIDATOR_DATA_MAX_CONCURRENT_REQUESTS", - &[ - "stateless-validator", - "--data-dir", - "/tmp/x", - "--rpc-endpoint", - "http://rpc", - "--witness-endpoint", - "http://w", - ], |a| a.data_max_concurrent_requests, ); } #[test] fn witness_max_concurrent_requests_flag_and_env() { - assert_concurrency_flag( + assert_optional_numeric_flag::( "--witness-max-concurrent-requests", "STATELESS_VALIDATOR_WITNESS_MAX_CONCURRENT_REQUESTS", - &[ - "stateless-validator", - "--data-dir", - "/tmp/x", - "--rpc-endpoint", - "http://rpc", - "--witness-endpoint", - "http://w", - ], |a| a.witness_max_concurrent_requests, ); } +#[test] +fn tip_buffer_flag_and_env() { + assert_optional_numeric_flag::("--tip-buffer", "STATELESS_VALIDATOR_TIP_BUFFER", |a| { + a.tip_buffer + }); +} + /// `canonical_chain_max_length` must reject 0 at parse time. A value of 0 would make /// `advance_chain` prune the entire canonical chain on every successful advance, /// rolling the pipeline back to the anchor each round and looping forever. #[test] fn canonical_chain_max_length_rejects_zero() { - let base = [ - "stateless-validator", - "--data-dir", - "/tmp/x", - "--rpc-endpoint", - "http://rpc", - "--witness-endpoint", - "http://w", - ]; - let parse = |extra: &[&str]| CommandLineArgs::try_parse_from(base.iter().chain(extra)); + let parse = |extra: &[&str]| CommandLineArgs::try_parse_from(BASE_ARGS.iter().chain(extra)); assert_eq!(parse(&[]).unwrap().canonical_chain_max_length, None); assert_eq!(