@@ -17,7 +17,7 @@ use reth_trie_parallel::{parallel_root::ParallelStateRootError, StorageRootTarge
1717use std:: { collections:: HashMap , sync:: Arc } ;
1818use thiserror:: Error ;
1919use tokio:: {
20- sync:: { mpsc:: UnboundedReceiver , oneshot:: Receiver } ,
20+ sync:: { mpsc:: UnboundedReceiver , oneshot:: Receiver , Mutex } ,
2121 task:: JoinSet ,
2222} ;
2323use tracing:: { debug, trace} ;
@@ -29,11 +29,19 @@ pub struct TriePrefetch {
2929 cached_accounts : HashMap < B256 , bool > ,
3030 /// Cached storages.
3131 cached_storages : HashMap < B256 , HashMap < B256 , bool > > ,
32+ global_stats : Arc < Mutex < GlobalStats > > ,
3233 /// State trie metrics.
3334 #[ cfg( feature = "metrics" ) ]
3435 metrics : TrieRootMetrics ,
3536}
3637
38+ #[ derive( Default , Debug ) ]
39+ pub struct GlobalStats {
40+ pub branch_count : usize ,
41+ pub leaf_count : usize ,
42+ pub missing_count : usize ,
43+ }
44+
3745impl Default for TriePrefetch {
3846 fn default ( ) -> Self {
3947 Self :: new ( )
@@ -48,6 +56,7 @@ impl TriePrefetch {
4856 cached_storages : HashMap :: new ( ) ,
4957 #[ cfg( feature = "metrics" ) ]
5058 metrics : TrieRootMetrics :: default ( ) ,
59+ global_stats : Arc :: new ( Mutex :: new ( GlobalStats :: default ( ) ) ) ,
5160 }
5261 }
5362
@@ -70,8 +79,9 @@ impl TriePrefetch {
7079 let hashed_state = self . deduplicate_and_update_cached( state) ;
7180
7281 let self_clone = Arc :: new( self . clone( ) ) ;
82+ let global_stats = Arc :: clone( & self . global_stats) ;
7383 join_set. spawn( async move {
74- if let Err ( e) = self_clone. prefetch_once:: <DB >( consistent_view, hashed_state) . await {
84+ if let Err ( e) = self_clone. prefetch_once:: <DB >( consistent_view, hashed_state, global_stats ) . await {
7585 debug!( target: "trie::trie_prefetch" , ?e, "Error while prefetching trie storage" ) ;
7686 } ;
7787 } ) ;
@@ -80,6 +90,7 @@ impl TriePrefetch {
8090 _ = & mut interrupt_rx => {
8191 debug!( target: "trie::trie_prefetch" , "Interrupted trie prefetch task. Unprocessed tx {:?}" , prefetch_rx. len( ) ) ;
8292 join_set. abort_all( ) ;
93+ debug!( target: "trie::trie_prefetch" , "test info: prefetch trie node count: {:?}" , self . global_stats. lock( ) . await ) ;
8394 return
8495 }
8596 }
@@ -141,6 +152,7 @@ impl TriePrefetch {
141152 self : Arc < Self > ,
142153 consistent_view : Arc < ConsistentDbView < DB , ProviderFactory < DB > > > ,
143154 hashed_state : HashedPostState ,
155+ global_stats : Arc < Mutex < GlobalStats > > ,
144156 ) -> Result < ( ) , TriePrefetchError >
145157 where
146158 DB : Database ,
@@ -201,22 +213,33 @@ impl TriePrefetch {
201213 match node {
202214 TrieElement :: Branch ( _) => {
203215 tracker. inc_branch ( ) ;
216+ let mut stats = global_stats. lock ( ) . await ;
217+ stats. branch_count += 1 ;
204218 }
205219 TrieElement :: Leaf ( hashed_address, _) => {
206220 match storage_roots. remove ( & hashed_address) {
207- Some ( result) => result,
221+ Some ( result) => {
222+ let mut stats = global_stats. lock ( ) . await ;
223+ stats. leaf_count += 1 ;
224+ result
225+ }
208226 // Since we do not store all intermediate nodes in the database, there might
209227 // be a possibility of re-adding a non-modified leaf to the hash builder.
210- None => StorageRoot :: new_hashed (
211- trie_cursor_factory. clone ( ) ,
212- hashed_cursor_factory. clone ( ) ,
213- hashed_address,
214- #[ cfg( feature = "metrics" ) ]
215- self . metrics . clone ( ) ,
216- )
217- . prefetch ( )
218- . ok ( )
219- . unwrap_or_default ( ) ,
228+ None => {
229+ let mut stats = global_stats. lock ( ) . await ;
230+ stats. missing_count += 1 ;
231+
232+ StorageRoot :: new_hashed (
233+ trie_cursor_factory. clone ( ) ,
234+ hashed_cursor_factory. clone ( ) ,
235+ hashed_address,
236+ #[ cfg( feature = "metrics" ) ]
237+ self . metrics . clone ( ) ,
238+ )
239+ . prefetch ( )
240+ . ok ( )
241+ . unwrap_or_default ( )
242+ }
220243 } ;
221244 tracker. inc_leaf ( ) ;
222245 }
0 commit comments