Skip to content

Commit 9325f73

Browse files
authored
feat: dial again to disconnected peers (#65)
## Summary

- Add automatic redial functionality for disconnected bootnodes
- Redials occur every 12 seconds (`PEER_REDIAL_INTERVAL_SECS`) until reconnection succeeds
- Handles both clean disconnections and connection errors

## Changes

- Introduced `RetryMessage` enum to handle both block fetch retries and peer redials
- Store bootnode addresses during initialization for later redial attempts
- Schedule redial on `ConnectionClosed` and `OutgoingConnectionError` events
- Implement `handle_peer_redial()` to attempt reconnection with retry logic

## Test Plan

- ✅ All tests pass (`make test`)
- ✅ No clippy warnings (`make lint`)
- Manual testing: Run devnet, kill a bootnode, observe redial logs every 12s, restart bootnode to verify reconnection
1 parent c276928 commit 9325f73

2 files changed

Lines changed: 68 additions & 7 deletions

File tree

crates/net/p2p/src/lib.rs

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ pub use metrics::populate_name_registry;
4141
const MAX_FETCH_RETRIES: u32 = 5;
4242
const INITIAL_BACKOFF_MS: u64 = 10;
4343
const BACKOFF_MULTIPLIER: u64 = 4;
44+
/// Delay, in seconds, that `schedule_peer_redial` waits before asking the
/// event loop to redial a disconnected bootnode.
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
45+
46+
/// Message carried on the server's retry channel (`retry_tx` / `retry_rx`).
///
/// The event loop receives these and dispatches on the variant, so one
/// channel serves both kinds of delayed retry.
enum RetryMessage {
47+
/// Retry a block fetch; the payload is the root handed to `handle_retry`.
BlockFetch(H256),
48+
/// Redial a peer; the payload is the peer handed to `handle_peer_redial`.
PeerRedial(PeerId),
49+
}
4450

4551
pub(crate) struct PendingRequest {
4652
pub(crate) attempts: u32,
@@ -122,6 +128,7 @@ pub async fn start_p2p(
122128
})
123129
.build();
124130
let local_peer_id = *swarm.local_peer_id();
131+
let mut bootnode_addrs = HashMap::new();
125132
for bootnode in bootnodes {
126133
let peer_id = PeerId::from_public_key(&bootnode.public_key);
127134
if peer_id == local_peer_id {
@@ -133,6 +140,7 @@ pub async fn start_p2p(
133140
.with(Protocol::QuicV1)
134141
.with_p2p(peer_id)
135142
.expect("failed to add peer ID to multiaddr");
143+
bootnode_addrs.insert(peer_id, addr.clone());
136144
swarm.dial(addr).unwrap();
137145
}
138146
let addr = Multiaddr::empty()
@@ -159,7 +167,7 @@ pub async fn start_p2p(
159167
"/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"
160168
));
161169

162-
info!("P2P node started on {listening_socket}");
170+
info!(socket=%listening_socket, "P2P node started");
163171

164172
let (retry_tx, retry_rx) = mpsc::unbounded_channel();
165173

@@ -173,6 +181,7 @@ pub async fn start_p2p(
173181
connected_peers: HashSet::new(),
174182
pending_requests: HashMap::new(),
175183
request_id_map: HashMap::new(),
184+
bootnode_addrs,
176185
retry_tx,
177186
retry_rx,
178187
};
@@ -197,8 +206,11 @@ pub(crate) struct P2PServer {
197206
pub(crate) connected_peers: HashSet<PeerId>,
198207
pub(crate) pending_requests: HashMap<ethlambda_types::primitives::H256, PendingRequest>,
199208
pub(crate) request_id_map: HashMap<OutboundRequestId, ethlambda_types::primitives::H256>,
200-
retry_tx: mpsc::UnboundedSender<ethlambda_types::primitives::H256>,
201-
retry_rx: mpsc::UnboundedReceiver<ethlambda_types::primitives::H256>,
209+
/// Bootnode addresses for redialing when disconnected
210+
bootnode_addrs: HashMap<PeerId, Multiaddr>,
211+
/// Channel for scheduling retries (block fetches and peer redials)
212+
pub(crate) retry_tx: mpsc::UnboundedSender<RetryMessage>,
213+
retry_rx: mpsc::UnboundedReceiver<RetryMessage>,
202214
}
203215

204216
/// Event loop for the P2P crate.
@@ -220,8 +232,11 @@ async fn event_loop(mut server: P2PServer) {
220232
};
221233
handle_swarm_event(&mut server, event).await;
222234
}
223-
Some(root) = server.retry_rx.recv() => {
224-
handle_retry(&mut server, root).await;
235+
Some(msg) = server.retry_rx.recv() => {
236+
match msg {
237+
RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await,
238+
RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await,
239+
}
225240
}
226241
}
227242
}
@@ -302,13 +317,20 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent<BehaviourE
302317
server.connected_peers.remove(&peer_id);
303318
let peer_count = server.connected_peers.len();
304319
metrics::notify_peer_disconnected(&Some(peer_id), direction, reason);
320+
305321
info!(
306322
%peer_id,
307323
%direction,
308324
%reason,
309325
peer_count,
310326
"Peer disconnected"
311327
);
328+
329+
// Schedule redial if this is a bootnode
330+
if server.bootnode_addrs.contains_key(&peer_id) {
331+
schedule_peer_redial(server.retry_tx.clone(), peer_id);
332+
info!(%peer_id, "Scheduled bootnode redial in {}s", PEER_REDIAL_INTERVAL_SECS);
333+
}
312334
} else {
313335
info!(%peer_id, %direction, %reason, "Peer connection closed but other connections remain");
314336
}
@@ -321,6 +343,15 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent<BehaviourE
321343
};
322344
metrics::notify_peer_connected(&peer_id, "outbound", result);
323345
warn!(?peer_id, %error, "Outgoing connection error");
346+
347+
// Schedule redial if this was a bootnode
348+
if let Some(pid) = peer_id
349+
&& server.bootnode_addrs.contains_key(&pid)
350+
&& !server.connected_peers.contains(&pid)
351+
{
352+
schedule_peer_redial(server.retry_tx.clone(), pid);
353+
info!(%pid, "Scheduled bootnode redial after connection error");
354+
}
324355
}
325356
SwarmEvent::IncomingConnectionError { peer_id, error, .. } => {
326357
metrics::notify_peer_connected(&peer_id, "inbound", "error");
@@ -369,6 +400,33 @@ async fn handle_retry(server: &mut P2PServer, root: H256) {
369400
}
370401
}
371402

403+
/// Handles a `RetryMessage::PeerRedial` by dialing `peer_id` again.
///
/// Does nothing if the peer is already in `server.connected_peers`, or if no
/// address is stored for it in `server.bootnode_addrs`. When the dial call
/// itself returns an error, a warning is logged and another redial is
/// scheduled through `schedule_peer_redial`.
async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) {
404+
// Skip if already reconnected
405+
if server.connected_peers.contains(&peer_id) {
406+
trace!(%peer_id, "Bootnode reconnected during redial delay, skipping");
407+
return;
408+
}
409+
410+
// Only peers with a stored bootnode address are redialed; any other peer ID
// falls through without logging.
if let Some(addr) = server.bootnode_addrs.get(&peer_id) {
411+
info!(%peer_id, "Redialing disconnected bootnode");
412+
// NOTE: this dial does some checks and adds a pending outbound connection attempt.
413+
// It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event.
414+
if let Err(e) = server.swarm.dial(addr.clone()) {
415+
warn!(%peer_id, %e, "Failed to redial bootnode, will retry");
416+
// The dial was rejected up front, so reschedule here rather than
// waiting for a connection-error event.
417+
schedule_peer_redial(server.retry_tx.clone(), peer_id);
418+
}
419+
}
420+
}
421+
422+
/// Schedules a peer redial after the configured delay interval.
///
/// Spawns a tokio task that sleeps for `PEER_REDIAL_INTERVAL_SECS` seconds and
/// then sends `RetryMessage::PeerRedial(peer_id)` on `retry_tx`. The task's
/// handle is not kept, so this function returns immediately. `retry_tx` is
/// taken by value because it is moved into the spawned task.
423+
pub(crate) fn schedule_peer_redial(retry_tx: mpsc::UnboundedSender<RetryMessage>, peer_id: PeerId) {
424+
tokio::spawn(async move {
425+
tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await;
426+
// A send error is deliberately ignored; presumably it only occurs once the
// receiving side has been dropped — TODO confirm.
let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id));
427+
});
428+
}
429+
372430
pub struct Bootnode {
373431
ip: IpAddr,
374432
quic_port: u16,

crates/net/p2p/src/req_resp/handlers.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use super::{
1111
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload,
1212
ResponseResult, Status,
1313
};
14-
use crate::{BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest};
14+
use crate::{
15+
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
16+
RetryMessage,
17+
};
1518

1619
pub async fn handle_req_resp_message(
1720
server: &mut P2PServer,
@@ -226,6 +229,6 @@ async fn handle_fetch_failure(
226229
let retry_tx = server.retry_tx.clone();
227230
tokio::spawn(async move {
228231
tokio::time::sleep(backoff).await;
229-
let _ = retry_tx.send(root);
232+
let _ = retry_tx.send(RetryMessage::BlockFetch(root));
230233
});
231234
}

0 commit comments

Comments
 (0)