Skip to content

Commit 562471f

Browse files
authored
feat: fetch unknown blocks from peers (#61)
* feat: add BlocksByRootCodec * refactor: unify both Codecs * refactor: restructure code in gossipsub and req_resp modules * refactor: remove the messages module * refactor: handle response result explicitly * refactor: make Response fields public * refactor: move req-resp handlers to new module * fix: make response include only a single block * refactor: move handle_outgoing_gossip to gossipsub module * refactor: add new FetchBlock P2PMessage * refactor: rename to handle_p2p_message * feat: request missing blocks * refactor: move block fetching helper to req_resp::handlers * refactor: store event loop state in struct * feat: log req-resp failures * fix: treat encoded-length as uncompressed size * fix: split req-resp behaviour libp2p::request_response expects protocols to be equivalent between them, and negotiates with the peer which one to use. Since we specified the status first in the list of protocols, it seems to default to that one when we send a request. * refactor: use imports * Revert "fix: split req-resp behaviour" This reverts commit b4ffabd. * fix: implement protocol selection in libp2p fork * feat: add block fetch request retrying * refactor: move bookkeeping to fetch_block_from_peer * refactor: remove BlockchainServer.in_flight_requests * chore: tweak retries, initial backoff, and multiplier * fix: avoid calling store::on_block if parent is missing
1 parent 0950a33 commit 562471f

18 files changed

Lines changed: 1036 additions & 624 deletions

File tree

Cargo.lock

Lines changed: 62 additions & 115 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/blockchain/src/lib.rs

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::{Duration, SystemTime};
33

44
use ethlambda_state_transition::is_proposer;
55
use ethlambda_storage::Store;
6+
use ethlambda_types::primitives::H256;
67
use ethlambda_types::{
78
attestation::{Attestation, AttestationData, SignedAttestation},
89
block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
@@ -14,21 +15,23 @@ use spawned_concurrency::tasks::{
1415
CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
1516
};
1617
use tokio::sync::mpsc;
17-
use tracing::{error, info, warn};
18+
use tracing::{error, info, trace, warn};
1819

1920
use crate::store::StoreError;
2021

2122
pub mod key_manager;
2223
pub mod metrics;
2324
pub mod store;
2425

25-
/// Messages sent from the blockchain to the P2P layer for publishing.
26+
/// Messages sent from the blockchain to the P2P layer.
2627
#[derive(Clone, Debug)]
27-
pub enum OutboundGossip {
28+
pub enum P2PMessage {
2829
/// Publish an attestation to the gossip network.
2930
PublishAttestation(SignedAttestation),
3031
/// Publish a block to the gossip network.
3132
PublishBlock(SignedBlockWithAttestation),
33+
/// Fetch a block by its root hash.
34+
FetchBlock(H256),
3235
}
3336

3437
pub struct BlockChain {
@@ -41,7 +44,7 @@ pub const SECONDS_PER_SLOT: u64 = 4;
4144
impl BlockChain {
4245
pub fn spawn(
4346
store: Store,
44-
p2p_tx: mpsc::UnboundedSender<OutboundGossip>,
47+
p2p_tx: mpsc::UnboundedSender<P2PMessage>,
4548
validator_keys: HashMap<u64, ValidatorSecretKey>,
4649
) -> BlockChain {
4750
let genesis_time = store.config().genesis_time;
@@ -50,6 +53,7 @@ impl BlockChain {
5053
store,
5154
p2p_tx,
5255
key_manager,
56+
pending_blocks: HashMap::new(),
5357
}
5458
.start();
5559
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
@@ -84,8 +88,11 @@ impl BlockChain {
8488

8589
struct BlockChainServer {
8690
store: Store,
87-
p2p_tx: mpsc::UnboundedSender<OutboundGossip>,
91+
p2p_tx: mpsc::UnboundedSender<P2PMessage>,
8892
key_manager: key_manager::KeyManager,
93+
94+
// Pending blocks waiting for their parent
95+
pending_blocks: HashMap<H256, Vec<SignedBlockWithAttestation>>,
8996
}
9097

9198
impl BlockChainServer {
@@ -173,7 +180,7 @@ impl BlockChainServer {
173180
// Publish to gossip network
174181
let Ok(_) = self
175182
.p2p_tx
176-
.send(OutboundGossip::PublishAttestation(signed_attestation))
183+
.send(P2PMessage::PublishAttestation(signed_attestation))
177184
.inspect_err(
178185
|err| error!(%slot, %validator_id, %err, "Failed to publish attestation"),
179186
)
@@ -244,7 +251,7 @@ impl BlockChainServer {
244251
// Publish to gossip network
245252
let Ok(()) = self
246253
.p2p_tx
247-
.send(OutboundGossip::PublishBlock(signed_block))
254+
.send(P2PMessage::PublishBlock(signed_block))
248255
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block"))
249256
else {
250257
return;
@@ -268,8 +275,60 @@ impl BlockChainServer {
268275

269276
fn on_block(&mut self, signed_block: SignedBlockWithAttestation) {
270277
let slot = signed_block.message.block.slot;
271-
if let Err(err) = self.process_block(signed_block) {
272-
warn!(%slot, %err, "Failed to process block");
278+
let block_root = signed_block.message.block.tree_hash_root();
279+
let parent_root = signed_block.message.block.parent_root;
280+
281+
// Check if parent block exists before attempting to process
282+
if !self.store.contains_block(&parent_root) {
283+
info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending");
284+
285+
// Store block for later processing
286+
self.pending_blocks
287+
.entry(parent_root)
288+
.or_default()
289+
.push(signed_block);
290+
291+
// Request missing parent from network
292+
self.request_missing_block(parent_root);
293+
return;
294+
}
295+
296+
// Parent exists, proceed with processing
297+
match self.process_block(signed_block) {
298+
Ok(_) => {
299+
info!(%slot, "Block processed successfully");
300+
301+
// Check if any pending blocks can now be processed
302+
self.process_pending_children(block_root);
303+
}
304+
Err(err) => {
305+
warn!(%slot, %err, "Failed to process block");
306+
}
307+
}
308+
}
309+
310+
fn request_missing_block(&mut self, block_root: H256) {
311+
// Send request to P2P layer (deduplication handled by P2P module)
312+
if let Err(err) = self.p2p_tx.send(P2PMessage::FetchBlock(block_root)) {
313+
error!(%block_root, %err, "Failed to send FetchBlock message to P2P");
314+
} else {
315+
info!(%block_root, "Requested missing block from network");
316+
}
317+
}
318+
319+
fn process_pending_children(&mut self, parent_root: H256) {
320+
// Remove and process all blocks that were waiting for this parent
321+
if let Some(children) = self.pending_blocks.remove(&parent_root) {
322+
info!(%parent_root, num_children=%children.len(),
323+
"Processing pending blocks after parent arrival");
324+
325+
for child_block in children {
326+
let slot = child_block.message.block.slot;
327+
trace!(%parent_root, %slot, "Processing pending child block");
328+
329+
// Process recursively - might unblock more descendants
330+
self.on_block(child_block);
331+
}
273332
}
274333
}
275334

crates/blockchain/src/store.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,9 @@ pub fn on_block(
299299
return Ok(());
300300
}
301301

302-
// Verify parent chain is available
303-
// TODO: sync parent chain if parent is missing
302+
// Verify parent state is available
303+
// Note: Parent block existence is checked by the caller before calling this function.
304+
// This check ensures the state has been computed for the parent block.
304305
let parent_state =
305306
store
306307
.get_state(&block.parent_root)

crates/net/p2p/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@ ethlambda-types.workspace = true
1717

1818
async-trait = "0.1"
1919

20-
libp2p = { version = "0.56", features = ["full"] }
20+
# Fork with request-response feature for outbound protocol selection
21+
libp2p = { git = "https://github.com/lambdaclass/rust-libp2p.git", rev = "cd6cc3b1e5db2c5e23e133c2201c23b063fc4895", features = [
22+
"full",
23+
] }
2124

2225
snap = "1.1"
2326

2427
tokio.workspace = true
2528
tracing.workspace = true
2629

30+
rand = "0.8"
31+
2732
# Required for NodeEnr parsing
2833
ethrex-p2p = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" }
2934
ethrex-rlp = { git = "https://github.com/lambdaclass/ethrex", rev = "1af63a4de7c93eb7413b9b003df1be82e1484c69" }

crates/net/p2p/src/gossipsub.rs

Lines changed: 0 additions & 91 deletions
This file was deleted.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/// Decompress data using raw snappy format (for gossipsub messages).
2+
pub fn decompress_message(data: &[u8]) -> snap::Result<Vec<u8>> {
3+
let uncompressed_size = snap::raw::decompress_len(data)?;
4+
let mut uncompressed_data = vec![0u8; uncompressed_size];
5+
snap::raw::Decoder::new().decompress(data, &mut uncompressed_data)?;
6+
Ok(uncompressed_data)
7+
}
8+
9+
/// Compress data using raw snappy format (for gossipsub messages).
10+
pub fn compress_message(data: &[u8]) -> Vec<u8> {
11+
let max_compressed_len = snap::raw::max_compress_len(data.len());
12+
let mut compressed = vec![0u8; max_compressed_len];
13+
let compressed_len = snap::raw::Encoder::new()
14+
.compress(data, &mut compressed)
15+
.expect("snappy compression should not fail");
16+
compressed.truncate(compressed_len);
17+
compressed
18+
}
19+
20+
#[cfg(test)]
21+
mod tests {
22+
use ethlambda_types::block::SignedBlockWithAttestation;
23+
use ssz::Decode;
24+
25+
#[test]
26+
#[ignore = "Test data uses old BlockSignatures field order (proposer_signature, attestation_signatures). Needs regeneration with correct order (attestation_signatures, proposer_signature)."]
27+
fn test_decode_block() {
28+
// Sample uncompressed block sent by Zeam (commit b153373806aa49f65aadc47c41b68ead4fab7d6e)
29+
let block_bytes = include_bytes!("../../test_data/signed_block_with_attestation.ssz");
30+
let _block = SignedBlockWithAttestation::from_ssz_bytes(block_bytes).unwrap();
31+
}
32+
}

0 commit comments

Comments
 (0)