Skip to content

Commit 59f0cd4

Browse files
feat: enable prefetch on the new engine
1 parent 819f149 commit 59f0cd4

10 files changed

Lines changed: 216 additions & 61 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/blockchain-tree/src/chain.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use std::{
3030
clone::Clone,
3131
collections::BTreeMap,
3232
ops::{Deref, DerefMut},
33-
sync::Arc,
3433
time::Instant,
3534
};
3635

@@ -231,14 +230,9 @@ impl AppendableChain {
231230

232231
let initial_execution_outcome = ExecutionOutcome::from((state, block.number));
233232

234-
// stop the prefetch task.
235-
if let Some(interrupt_tx) = interrupt_tx {
236-
let _ = interrupt_tx.send(());
237-
}
238-
239233
// check state root if the block extends the canonical chain __and__ if state root
240234
// validation was requested.
241-
if block_validation_kind.is_exhaustive() {
235+
let result = if block_validation_kind.is_exhaustive() {
242236
// calculate and check state root
243237
let start = Instant::now();
244238
let (state_root, trie_updates) = if block_attachment.is_canonical() {
@@ -283,7 +277,14 @@ impl AppendableChain {
283277
Ok((initial_execution_outcome, trie_updates))
284278
} else {
285279
Ok((initial_execution_outcome, None))
286-
}
280+
};
281+
282+
// stop the prefetch task.
283+
if let Some(interrupt_tx) = interrupt_tx {
284+
let _ = interrupt_tx.send(());
285+
};
286+
287+
result
287288
}
288289

289290
/// Validate and execute the given block, and append it to this chain.
@@ -356,18 +357,11 @@ impl AppendableChain {
356357
let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel();
357358

358359
let mut trie_prefetch = TriePrefetch::new();
359-
let consistent_view = if let Ok(view) =
360-
ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())
361-
{
362-
view
363-
} else {
364-
tracing::debug!("Failed to create consistent view for trie prefetch");
365-
return (None, None)
366-
};
360+
let provider_factory = externals.provider_factory.clone();
367361

368362
tokio::spawn({
369363
async move {
370-
trie_prefetch.run(Arc::new(consistent_view), prefetch_rx, interrupt_rx).await;
364+
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
371365
}
372366
});
373367

crates/engine/service/src/service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ where
8080
invalid_block_hook: Box<dyn InvalidBlockHook>,
8181
sync_metrics_tx: MetricEventsSender,
8282
skip_state_root_validation: bool,
83+
enable_prefetch: bool,
8384
) -> Self {
8485
let engine_kind =
8586
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
@@ -104,6 +105,7 @@ where
104105
invalid_block_hook,
105106
engine_kind,
106107
skip_state_root_validation,
108+
enable_prefetch,
107109
);
108110

109111
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
@@ -217,6 +219,7 @@ mod tests {
217219
Box::new(NoopInvalidBlockHook::default()),
218220
sync_metrics_tx,
219221
false,
222+
false,
220223
);
221224
}
222225
}

crates/engine/tree/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ reth-stages-api.workspace = true
3333
reth-tasks.workspace = true
3434
reth-trie.workspace = true
3535
reth-trie-parallel.workspace = true
36+
reth-trie-prefetch.workspace = true
3637

3738
# alloy
3839
alloy-primitives.workspace = true

crates/engine/tree/src/tree/mod.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use reth_payload_builder::PayloadBuilderHandle;
3333
use reth_payload_primitives::{PayloadAttributes, PayloadBuilder, PayloadBuilderAttributes};
3434
use reth_payload_validator::ExecutionPayloadValidator;
3535
use reth_primitives::{
36-
Block, GotExpected, Header, SealedBlock, SealedBlockWithSenders, SealedHeader,
36+
revm_primitives::EvmState, Block, GotExpected, Header, SealedBlock,
37+
SealedBlockWithSenders, SealedHeader,
3738
};
3839
use reth_provider::{
3940
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
@@ -44,6 +45,7 @@ use reth_revm::database::StateProviderDatabase;
4445
use reth_stages_api::ControlFlow;
4546
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
4647
use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError};
48+
use reth_trie_prefetch::TriePrefetch;
4749
use std::{
4850
cmp::Ordering,
4951
collections::{btree_map, hash_map, BTreeMap, VecDeque},
@@ -505,6 +507,8 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes, Spec> {
505507
engine_kind: EngineApiKind,
506508
/// Flag indicating whether the state root validation should be skipped.
507509
skip_state_root_validation: bool,
510+
/// Flag indicating whether to enable prefetch.
511+
enable_prefetch: bool,
508512
}
509513

510514
impl<P: Debug, E: Debug, T: EngineTypes + Debug, Spec: Debug> std::fmt::Debug
@@ -527,6 +531,8 @@ impl<P: Debug, E: Debug, T: EngineTypes + Debug, Spec: Debug> std::fmt::Debug
527531
.field("metrics", &self.metrics)
528532
.field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
529533
.field("engine_kind", &self.engine_kind)
534+
.field("skip_state_root_validation", &self.skip_state_root_validation)
535+
.field("enable_prefetch", &self.enable_prefetch)
530536
.finish()
531537
}
532538
}
@@ -555,6 +561,7 @@ where
555561
config: TreeConfig,
556562
engine_kind: EngineApiKind,
557563
skip_state_root_validation: bool,
564+
enable_prefetch: bool,
558565
) -> Self {
559566
let (incoming_tx, incoming) = std::sync::mpsc::channel();
560567

@@ -577,6 +584,7 @@ where
577584
invalid_block_hook: Box::new(NoopInvalidBlockHook),
578585
engine_kind,
579586
skip_state_root_validation,
587+
enable_prefetch,
580588
}
581589
}
582590

@@ -603,6 +611,7 @@ where
603611
invalid_block_hook: Box<dyn InvalidBlockHook>,
604612
kind: EngineApiKind,
605613
skip_state_root_validation: bool,
614+
enable_prefetch: bool,
606615
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
607616
let best_block_number = provider.best_block_number().unwrap_or(0);
608617
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
@@ -634,10 +643,20 @@ where
634643
config,
635644
kind,
636645
skip_state_root_validation,
646+
enable_prefetch,
637647
);
638648
task.set_invalid_block_hook(invalid_block_hook);
639649
let incoming = task.incoming_tx.clone();
640-
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
650+
std::thread::Builder::new()
651+
.name("Tree Task".to_string())
652+
.spawn(move || {
653+
let runtime =
654+
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
655+
runtime.block_on(async {
656+
task.run();
657+
});
658+
})
659+
.unwrap();
641660
(incoming, outgoing)
642661
}
643662

@@ -2175,8 +2194,16 @@ where
21752194
}
21762195

21772196
trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
2178-
let executor =
2179-
self.executor_provider.executor(StateProviderDatabase::new(&state_provider), None);
2197+
let (prefetch_tx, interrupt_tx) =
2198+
if self.enable_prefetch && !self.skip_state_root_validation {
2199+
self.setup_prefetch()
2200+
} else {
2201+
(None, None)
2202+
};
2203+
2204+
let executor = self
2205+
.executor_provider
2206+
.executor(StateProviderDatabase::new(&state_provider), prefetch_tx);
21802207

21812208
let block_number = block.number;
21822209
let block_hash = block.hash();
@@ -2247,6 +2274,11 @@ where
22472274
state_provider.state_root_with_updates(hashed_state.clone())?
22482275
};
22492276

2277+
// stop the prefetch task.
2278+
if let Some(interrupt_tx) = interrupt_tx {
2279+
let _ = interrupt_tx.send(());
2280+
};
2281+
22502282
if state_root != block.state_root {
22512283
// call post-block hook
22522284
self.invalid_block_hook.on_invalid_block(
@@ -2575,6 +2607,22 @@ where
25752607
);
25762608
Ok(())
25772609
}
2610+
2611+
fn setup_prefetch(&self) -> (Option<UnboundedSender<EvmState>>, Option<oneshot::Sender<()>>) {
2612+
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
2613+
let (interrupt_tx, interrupt_rx) = oneshot::channel();
2614+
2615+
let mut trie_prefetch = TriePrefetch::new();
2616+
let provider_factory = self.provider.clone();
2617+
2618+
tokio::spawn({
2619+
async move {
2620+
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
2621+
}
2622+
});
2623+
2624+
(Some(prefetch_tx), Some(interrupt_tx))
2625+
}
25782626
}
25792627

25802628
/// This is an error that can come from advancing persistence. Either this can be a
@@ -2728,6 +2776,7 @@ mod tests {
27282776
TreeConfig::default(),
27292777
EngineApiKind::Ethereum,
27302778
false,
2779+
false,
27312780
);
27322781

27332782
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());

crates/node/builder/src/launch/engine.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ where
237237
ctx.invalid_block_hook()?,
238238
ctx.sync_metrics_tx(),
239239
ctx.node_config().skip_state_root_validation,
240+
ctx.node_config().enable_prefetch,
240241
);
241242
eth_service
242243
}
@@ -271,6 +272,7 @@ where
271272
ctx.invalid_block_hook()?,
272273
ctx.sync_metrics_tx(),
273274
ctx.node_config().skip_state_root_validation,
275+
ctx.node_config().enable_prefetch,
274276
);
275277
eth_service
276278
}

crates/trie/parallel/src/parallel_root.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,17 @@ where
148148
hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?,
149149
);
150150

151+
let account_tree_start = std::time::Instant::now();
151152
let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
152153
let mut account_rlp = Vec::with_capacity(128);
153154
while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
154155
match node {
155156
TrieElement::Branch(node) => {
157+
tracker.inc_branch();
156158
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
157159
}
158160
TrieElement::Leaf(hashed_address, account) => {
161+
tracker.inc_leaf();
159162
let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
160163
Some(rx) => rx.recv().map_err(|_| {
161164
ParallelStateRootError::StorageRoot(StorageRootError::Database(
@@ -199,15 +202,18 @@ where
199202
prefix_sets.destroyed_accounts,
200203
);
201204

205+
let account_tree_duration = account_tree_start.elapsed();
202206
let stats = tracker.finish();
203207

204208
#[cfg(feature = "metrics")]
205209
self.metrics.record_state_trie(stats);
206210

207-
trace!(
211+
debug!(
208212
target: "trie::parallel_state_root",
209213
%root,
210214
duration = ?stats.duration(),
215+
account_tree_duration = ?account_tree_duration,
216+
storage_trees_duration = ?(stats.duration() - account_tree_duration),
211217
branches_added = stats.branches_added(),
212218
leaves_added = stats.leaves_added(),
213219
missed_leaves = stats.missed_leaves(),

crates/trie/prefetch/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
)]
88
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
99

10+
pub use prefetch::TriePrefetch;
1011
pub use reth_trie_parallel::StorageRootTargets;
1112

13+
/// Trie prefetch stats.
14+
pub mod stats;
15+
1216
/// Implementation of trie prefetch.
13-
mod prefetch;
14-
pub use prefetch::TriePrefetch;
17+
pub mod prefetch;

0 commit comments

Comments
 (0)