Skip to content

Commit 47d014a

Browse files
add missing leaves cache
1 parent 59f0cd4 commit 47d014a

9 files changed

Lines changed: 108 additions & 37 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/blockchain-tree/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ metrics.workspace = true
4848
# misc
4949
aquamarine.workspace = true
5050
linked_hash_set.workspace = true
51+
dashmap = "6.1.0"
5152

5253
[dev-dependencies]
5354
reth-chainspec.workspace = true

crates/blockchain-tree/src/chain.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use super::externals::TreeExternals;
77
use crate::BundleStateDataRef;
88
use alloy_eips::ForkBlock;
99
use alloy_primitives::{map::HashMap, BlockHash, BlockNumber, B256, U256};
10+
use dashmap::DashMap;
1011
use reth_blockchain_tree_api::{
1112
error::{BlockchainTreeError, InsertBlockErrorKind},
1213
BlockAttachment, BlockValidationKind,
@@ -23,13 +24,17 @@ use reth_provider::{
2324
FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider,
2425
};
2526
use reth_revm::database::StateProviderDatabase;
26-
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
27+
use reth_trie::{
28+
updates::{StorageTrieUpdates, TrieUpdates},
29+
HashedPostState, TrieInput,
30+
};
2731
use reth_trie_parallel::parallel_root::ParallelStateRoot;
2832
use reth_trie_prefetch::TriePrefetch;
2933
use std::{
3034
clone::Clone,
3135
collections::BTreeMap,
3236
ops::{Deref, DerefMut},
37+
sync::Arc,
3338
time::Instant,
3439
};
3540

@@ -213,8 +218,11 @@ impl AppendableChain {
213218

214219
let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);
215220

216-
let (prefetch_tx, interrupt_tx) =
217-
if enable_prefetch { Self::setup_prefetch(externals) } else { (None, None) };
221+
let (prefetch_tx, interrupt_tx, missing_leaves_cache) = if enable_prefetch {
222+
Self::setup_prefetch(externals)
223+
} else {
224+
(None, None, Default::default())
225+
};
218226

219227
let db = StateProviderDatabase::new(&provider);
220228
let executor = externals.executor_factory.executor(db, prefetch_tx);
@@ -243,7 +251,7 @@ impl AppendableChain {
243251
consistent_view,
244252
TrieInput::from_state(execution_outcome.hash_state_slow()),
245253
)
246-
.incremental_root_with_updates()
254+
.incremental_root_with_updates_and_cache(missing_leaves_cache)
247255
.map(|(root, updates)| (root, Some(updates)))
248256
.map_err(ProviderError::from)?
249257
} else {
@@ -348,6 +356,7 @@ impl AppendableChain {
348356
) -> (
349357
Option<tokio::sync::mpsc::UnboundedSender<EvmState>>,
350358
Option<tokio::sync::oneshot::Sender<()>>,
359+
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
351360
)
352361
where
353362
N: ProviderNodeTypes,
@@ -358,13 +367,17 @@ impl AppendableChain {
358367

359368
let mut trie_prefetch = TriePrefetch::new();
360369
let provider_factory = externals.provider_factory.clone();
370+
let missing_leaves_cache = Arc::new(DashMap::new());
371+
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);
361372

362373
tokio::spawn({
363374
async move {
364-
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
375+
trie_prefetch
376+
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
377+
.await;
365378
}
366379
});
367380

368-
(Some(prefetch_tx), Some(interrupt_tx))
381+
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
369382
}
370383
}

crates/engine/tree/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ reth-metrics = { workspace = true, features = ["common"] }
5151

5252
# misc
5353
tracing.workspace = true
54+
dashmap = "6.1.0"
5455

5556
# optional deps for test-utils
5657
reth-prune-types = { workspace = true, optional = true }

crates/engine/tree/src/tree/mod.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use alloy_rpc_types_engine::{
1313
CancunPayloadFields, ExecutionPayload, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
1414
PayloadValidationError,
1515
};
16+
use dashmap::DashMap;
1617
use reth_beacon_consensus::{
1718
BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache,
1819
OnForkChoiceUpdated, MIN_BLOCKS_FOR_PIPELINE_RUN,
@@ -43,7 +44,10 @@ use reth_provider::{
4344
};
4445
use reth_revm::database::StateProviderDatabase;
4546
use reth_stages_api::ControlFlow;
46-
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
47+
use reth_trie::{
48+
updates::{StorageTrieUpdates, TrieUpdates},
49+
HashedPostState, TrieInput,
50+
};
4751
use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError};
4852
use reth_trie_prefetch::TriePrefetch;
4953
use std::{
@@ -2194,11 +2198,11 @@ where
21942198
}
21952199

21962200
trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
2197-
let (prefetch_tx, interrupt_tx) =
2201+
let (prefetch_tx, interrupt_tx, missing_leaves_cache) =
21982202
if self.enable_prefetch && !self.skip_state_root_validation {
21992203
self.setup_prefetch()
22002204
} else {
2201-
(None, None)
2205+
(None, None, Default::default())
22022206
};
22032207

22042208
let executor = self
@@ -2256,7 +2260,7 @@ where
22562260
let persistence_in_progress = self.persistence_state.in_progress();
22572261
if !persistence_in_progress {
22582262
state_root_result = match self
2259-
.compute_state_root_parallel(block.parent_hash, &hashed_state)
2263+
.compute_state_root_parallel(block.parent_hash, &hashed_state, missing_leaves_cache)
22602264
{
22612265
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
22622266
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
@@ -2342,6 +2346,7 @@ where
23422346
&self,
23432347
parent_hash: B256,
23442348
hashed_state: &HashedPostState,
2349+
missing_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
23452350
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
23462351
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
23472352
let mut input = TrieInput::default();
@@ -2364,7 +2369,7 @@ where
23642369
// Extend with block we are validating root for.
23652370
input.append_ref(hashed_state);
23662371

2367-
ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2372+
ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates_and_cache(missing_leaves_cache)
23682373
}
23692374

23702375
/// Handles an error that occurred while inserting a block.
@@ -2608,20 +2613,30 @@ where
26082613
Ok(())
26092614
}
26102615

2611-
fn setup_prefetch(&self) -> (Option<UnboundedSender<EvmState>>, Option<oneshot::Sender<()>>) {
2616+
fn setup_prefetch(
2617+
&self,
2618+
) -> (
2619+
Option<UnboundedSender<EvmState>>,
2620+
Option<oneshot::Sender<()>>,
2621+
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
2622+
) {
26122623
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
26132624
let (interrupt_tx, interrupt_rx) = oneshot::channel();
26142625

26152626
let mut trie_prefetch = TriePrefetch::new();
26162627
let provider_factory = self.provider.clone();
2628+
let missing_leaves_cache = Arc::new(DashMap::new());
2629+
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);
26172630

26182631
tokio::spawn({
26192632
async move {
2620-
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
2633+
trie_prefetch
2634+
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
2635+
.await;
26212636
}
26222637
});
26232638

2624-
(Some(prefetch_tx), Some(interrupt_tx))
2639+
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
26252640
}
26262641
}
26272642

crates/trie/parallel/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ tracing.workspace = true
3131
# misc
3232
thiserror.workspace = true
3333
derive_more.workspace = true
34+
dashmap = "6.1.0"
3435
rayon.workspace = true
3536
itertools.workspace = true
3637

crates/trie/parallel/src/parallel_root.rs

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::metrics::ParallelStateRootMetrics;
33
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
44
use alloy_primitives::B256;
55
use alloy_rlp::{BufMut, Encodable};
6+
use dashmap::DashMap;
67
use itertools::Itertools;
78
use reth_execution_errors::StorageRootError;
89
use reth_provider::{
@@ -12,7 +13,7 @@ use reth_trie::{
1213
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
1314
node_iter::{TrieElement, TrieNodeIter},
1415
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
15-
updates::TrieUpdates,
16+
updates::{StorageTrieUpdates, TrieUpdates},
1617
walker::TrieWalker,
1718
HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
1819
};
@@ -61,19 +62,28 @@ where
6162
{
6263
/// Calculate incremental state root in parallel.
6364
pub fn incremental_root(self) -> Result<B256, ParallelStateRootError> {
64-
self.calculate(false).map(|(root, _)| root)
65+
self.calculate(false, None).map(|(root, _)| root)
6566
}
6667

6768
/// Calculate incremental state root with updates in parallel.
6869
pub fn incremental_root_with_updates(
6970
self,
7071
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
71-
self.calculate(true)
72+
self.calculate(true, None)
73+
}
74+
75+
/// Calculate incremental state root with missing leaves cache.
76+
pub fn incremental_root_with_updates_and_cache(
77+
self,
78+
miss_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
79+
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
80+
self.calculate(true, Some(miss_leaves_cache))
7281
}
7382

7483
fn calculate(
7584
self,
7685
retain_updates: bool,
86+
miss_leaves_cache: Option<Arc<DashMap<B256, (B256, StorageTrieUpdates)>>>,
7787
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
7888
let mut tracker = ParallelTrieTracker::default();
7989
let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
@@ -171,14 +181,30 @@ where
171181
// be a possibility of re-adding a non-modified leaf to the hash builder.
172182
None => {
173183
tracker.inc_missed_leaves();
174-
StorageRoot::new_hashed(
175-
trie_cursor_factory.clone(),
176-
hashed_cursor_factory.clone(),
177-
hashed_address,
178-
#[cfg(feature = "metrics")]
179-
self.metrics.storage_trie.clone(),
180-
)
181-
.calculate(retain_updates)?
184+
if let Some(cache) = miss_leaves_cache.clone() {
185+
if let Some(value) = cache.get(&hashed_address) {
186+
let (root, updates) = value.value();
187+
(*root, 0usize, updates.clone())
188+
} else {
189+
StorageRoot::new_hashed(
190+
trie_cursor_factory.clone(),
191+
hashed_cursor_factory.clone(),
192+
hashed_address,
193+
#[cfg(feature = "metrics")]
194+
self.metrics.storage_trie.clone(),
195+
)
196+
.calculate(retain_updates)?
197+
}
198+
} else {
199+
StorageRoot::new_hashed(
200+
trie_cursor_factory.clone(),
201+
hashed_cursor_factory.clone(),
202+
hashed_address,
203+
#[cfg(feature = "metrics")]
204+
self.metrics.storage_trie.clone(),
205+
)
206+
.calculate(retain_updates)?
207+
}
182208
}
183209
};
184210

crates/trie/prefetch/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ tracing.workspace = true
3333
thiserror.workspace = true
3434
derive_more.workspace = true
3535
rayon.workspace = true
36+
dashmap = "6.1.0"
3637

3738
# async
3839
tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] }

0 commit comments

Comments
 (0)