diff --git a/Cargo.lock b/Cargo.lock index 19502fb..4eeed97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2002,6 +2002,7 @@ dependencies = [ "hex", "libssz", "libssz-types", + "rayon", "serde", "serde_json", "spawned-concurrency 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index 95259f9..2bd8a84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ libssz-types = "0.2" # Build-time version info vergen-git2 = { version = "9", features = ["rustc"] } +rayon = "1.11" rand = "0.9" rocksdb = "0.24" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index d565123..d3370e4 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -23,6 +23,7 @@ spawned-concurrency.workspace = true tokio.workspace = true +rayon.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index c1a0535..eb8c4fe 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -1164,40 +1164,58 @@ fn verify_signatures( let validators = &state.validators; let num_validators = validators.len() as u64; - // Verify each attestation's signature proof + // Verify each attestation's signature proof in parallel let aggregated_start = std::time::Instant::now(); - for (attestation, aggregated_proof) in attestations.iter().zip(attestation_signatures) { - if attestation.aggregation_bits != aggregated_proof.participants { - return Err(StoreError::ParticipantsMismatch); - } - let slot: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); - let message = attestation.data.hash_tree_root(); + // Prepare verification inputs sequentially (cheap: bit checks + pubkey lookups) + let verification_inputs: Vec<_> = attestations + .iter() + .zip(attestation_signatures) + .map(|(attestation, aggregated_proof)| { + if attestation.aggregation_bits != aggregated_proof.participants { + return Err(StoreError::ParticipantsMismatch); + } - // Collect public keys with bounds check in a single pass - let public_keys: Vec<_> = validator_indices(&attestation.aggregation_bits) - .map(|vid| { - if vid >= num_validators { - return Err(StoreError::InvalidValidatorIndex); + let slot: u32 = attestation.data.slot.try_into().expect("slot exceeds u32"); + let message = attestation.data.hash_tree_root(); + + let public_keys: Vec<_> = validator_indices(&attestation.aggregation_bits) + .map(|vid| { + if vid >= num_validators { + return Err(StoreError::InvalidValidatorIndex); + } + validators[vid as usize] + .get_pubkey() + .map_err(|_| StoreError::PubkeyDecodingFailed(vid)) + }) + .collect::>()?; + + Ok((&aggregated_proof.proof_data, public_keys, message, slot)) + }) + .collect::>()?; + + // Run expensive signature verification in parallel. + // into_par_iter() moves each tuple, avoiding a clone of public_keys. + use rayon::prelude::*; + verification_inputs.into_par_iter().try_for_each( + |(proof_data, public_keys, message, slot)| { + let result = { + let _timing = metrics::time_pq_sig_aggregated_signatures_verification(); + verify_aggregated_signature(proof_data, public_keys, &message, slot) + }; + match result { + Ok(()) => { + metrics::inc_pq_sig_aggregated_signatures_valid(); + Ok(()) + } + Err(e) => { + metrics::inc_pq_sig_aggregated_signatures_invalid(); + Err(StoreError::AggregateVerificationFailed(e)) } - validators[vid as usize] - .get_pubkey() - .map_err(|_| StoreError::PubkeyDecodingFailed(vid)) - }) - .collect::>()?; - - let verification_result = { - let _timing = metrics::time_pq_sig_aggregated_signatures_verification(); - verify_aggregated_signature(&aggregated_proof.proof_data, public_keys, &message, slot) - }; - match verification_result { - Ok(()) => metrics::inc_pq_sig_aggregated_signatures_valid(), - Err(e) => { - metrics::inc_pq_sig_aggregated_signatures_invalid(); - return Err(StoreError::AggregateVerificationFailed(e)); } - } - } + }, + )?; + let aggregated_elapsed = aggregated_start.elapsed(); let proposer_start = std::time::Instant::now();