diff --git a/CHANGELOG.md b/CHANGELOG.md index bd1fdb784c..4a86053596 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ - Added `StoreReplica` gRPC service with endpoints for streaming blocks and proofs ([#1987](https://github.com/0xMiden/node/pull/1987)). - Replaced the network monitor's JavaScript dashboard with a server-rendered Maud + HTMX frontend ([#2024](https://github.com/0xMiden/node/pull/2024)). - [BREAKING] Removed `CheckNullifiers` endpoint ([#2049](https://github.com/0xMiden/node/pull/2049)). +- Replaced blocking-in-async operations in the validator, remote prover, and ntx-builder with `spawn_blocking` to avoid starving the Tokio runtime ([#2041](https://github.com/0xMiden/node/pull/2041)). +- Implemented persistent RocksDB backend for `AccountStateForest`, improving startup time ([#2020](https://github.com/0xMiden/node/pull/2020)). - [BREAKING] `BlockRange.block_to` is now required for all RPC endpoints ([#2056](https://github.com/0xMiden/node/pull/2056)). ## v0.14.10 (2026-05-29) diff --git a/Cargo.lock b/Cargo.lock index eb5f37ee88..af13b1eaa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3051,6 +3051,7 @@ dependencies = [ "rand_core 0.9.5", "rand_hc", "rayon", + "rocksdb", "serde", "sha2", "sha3", diff --git a/bin/genesis/src/main.rs b/bin/genesis/src/main.rs index 0b66588273..4244ee6b13 100644 --- a/bin/genesis/src/main.rs +++ b/bin/genesis/src/main.rs @@ -205,17 +205,17 @@ mod tests { /// Parses the generated genesis.toml, builds a genesis block, and asserts the bridge account /// is included with nonce=1. 
- async fn assert_valid_genesis_block(dir: &Path) { + fn assert_valid_genesis_block(dir: &Path) { let bridge_id = AccountFile::read(dir.join("bridge.mac")).unwrap().account.id(); let config = GenesisConfig::read_toml_file(&dir.join("genesis.toml")).unwrap(); let signer = SecretKey::read_from_bytes(&[0x01; 32]).unwrap(); - let (state, _) = config.into_state(signer).unwrap(); + let (state, _) = config.into_state(signer.public_key()).unwrap(); let bridge = state.accounts.iter().find(|a| a.id() == bridge_id).unwrap(); assert_eq!(bridge.nonce(), ONE); - state.into_block().await.expect("genesis block should build"); + state.into_block(&signer).expect("genesis block should build"); } #[tokio::test] @@ -229,7 +229,7 @@ mod tests { let ger = AccountFile::read(dir.path().join("ger_manager.mac")).unwrap(); assert_eq!(ger.auth_secret_keys.len(), 1); - assert_valid_genesis_block(dir.path()).await; + assert_valid_genesis_block(dir.path()); } #[tokio::test] @@ -249,6 +249,6 @@ mod tests { let ger = AccountFile::read(dir.path().join("ger_manager.mac")).unwrap(); assert!(ger.auth_secret_keys.is_empty()); - assert_valid_genesis_block(dir.path()).await; + assert_valid_genesis_block(dir.path()); } } diff --git a/bin/node/src/commands/validator.rs b/bin/node/src/commands/validator.rs index 807f5fa129..4658224a23 100644 --- a/bin/node/src/commands/validator.rs +++ b/bin/node/src/commands/validator.rs @@ -6,7 +6,6 @@ use miden_node_store::genesis::config::{AccountFileWithName, GenesisConfig}; use miden_node_utils::clap::GrpcOptionsInternal; use miden_node_utils::fs::ensure_empty_directory; use miden_node_utils::grpc::UrlExt; -use miden_node_utils::signer::BlockSigner; use miden_node_validator::{Validator, ValidatorSigner}; use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey; use miden_protocol::utils::serde::{Deserializable, Serializable}; @@ -195,28 +194,14 @@ impl ValidatorCommand { // Bootstrap with KMS key or local key. 
let signer = validator_key.into_signer().await?; - match signer { - ValidatorSigner::Kms(signer) => { - build_and_write_genesis( - config, - signer, - accounts_directory, - genesis_block_directory, - data_directory, - ) - .await - }, - ValidatorSigner::Local(signer) => { - build_and_write_genesis( - config, - signer, - accounts_directory, - genesis_block_directory, - data_directory, - ) - .await - }, - } + build_and_write_genesis( + config, + signer, + accounts_directory, + genesis_block_directory, + data_directory, + ) + .await } } @@ -224,13 +209,13 @@ impl ValidatorCommand { /// to disk, and initializes the validator's database with the genesis block as the chain tip. async fn build_and_write_genesis( config: GenesisConfig, - signer: impl BlockSigner, + signer: ValidatorSigner, accounts_directory: &Path, genesis_block_directory: &Path, data_directory: &Path, ) -> anyhow::Result<()> { // Build genesis state with the provided signer. - let (genesis_state, secrets) = config.into_state(signer)?; + let (genesis_state, secrets) = config.into_state(signer.public_key())?; // Write account secret files. for item in secrets.as_account_files(&genesis_state) { @@ -246,8 +231,16 @@ async fn build_and_write_genesis( } // Build the signed genesis block. - let genesis_block = - genesis_state.into_block().await.context("failed to build the genesis block")?; + let unsigned_genesis_block = genesis_state + .into_unsigned_block() + .context("failed to build the unsigned genesis block")?; + let signature = signer + .sign(unsigned_genesis_block.header()) + .await + .context("failed to sign the genesis block")?; + let genesis_block = unsigned_genesis_block + .into_block(signature) + .context("failed to build the genesis block")?; // Serialize and write the genesis block to disk. 
let block_bytes = genesis_block.inner().to_bytes(); diff --git a/bin/remote-prover/src/server/prover.rs b/bin/remote-prover/src/server/prover.rs index 2931cc70fe..06aa761e50 100644 --- a/bin/remote-prover/src/server/prover.rs +++ b/bin/remote-prover/src/server/prover.rs @@ -1,6 +1,7 @@ use miden_block_prover::LocalBlockProver; use miden_node_proto::BlockProofRequest; use miden_node_utils::ErrorReport; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::batch::{ProposedBatch, ProvenBatch}; @@ -112,8 +113,15 @@ impl ProveRequest for LocalBatchProver { type Output = ProvenBatch; async fn prove(&self, input: Self::Input) -> Result { - self.prove(input) - .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch"))) + let prover = self.clone(); + + spawn_blocking_in_current_span(move || { + prover + .prove(input) + .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch"))) + }) + .await + .map_err(|e| tonic::Status::internal(e.as_report_context("batch prover task panicked")))? } } @@ -123,8 +131,15 @@ impl ProveRequest for LocalBlockProver { type Output = BlockProof; async fn prove(&self, input: Self::Input) -> Result { + let prover = self.clone(); let BlockProofRequest { tx_batches, block_header, block_inputs } = input; - self.prove(tx_batches, &block_header, block_inputs) - .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block"))) + + spawn_blocking_in_current_span(move || { + prover + .prove(tx_batches, &block_header, block_inputs) + .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block"))) + }) + .await + .map_err(|e| tonic::Status::internal(e.as_report_context("block prover task panicked")))? 
} } diff --git a/bin/stress-test/README.md b/bin/stress-test/README.md index d60a611907..557a3a96b7 100644 --- a/bin/stress-test/README.md +++ b/bin/stress-test/README.md @@ -14,20 +14,42 @@ After building the binary, you can run the following command to generate one mil The store file will then be located at `./data/miden-store.sqlite3`. +The seed data can be tuned for account-detail benchmarks: + +- `--public-accounts-percentage` controls how many generated accounts are public. The default is `0`. +- `--storage-map-entries` adds a deterministic storage map with the given number of entries to every public account. The default is `0`. +- `--vault-entries` adds the given number of distinct fungible assets to every public account's vault. The default is `1`, and the value must fit within the protocol note asset limit. +- `--account-update-blocks` appends the given number of blocks after account initialization. These blocks randomly update existing accounts and rotate updates through the seeded storage-map entries. The default is `0`. + +For example, this creates public accounts with storage maps, multiple vault assets, and additional account-update history: + +```bash +miden-node-stress-test seed-store \ + --data-directory ./data \ + --num-accounts 100000 \ + --public-accounts-percentage 50 \ + --storage-map-entries 128 \ + --vault-entries 5 \ + --account-update-blocks 100 +``` + ## Benchmark Store This command allows to run stress tests against the Store component. These tests use the dump file with accounts ids created when seeding the store, so be sure to run the `seed-store` command beforehand. The endpoints that you can test are: -- `load_state` -- `sync_notes` -- `sync_nullifiers` -- `sync_transactions` +- `load-state` +- `get-account` +- `sync-notes` +- `sync-nullifiers` +- `sync-transactions` - `sync-chain-mmr` -Most benchmarks accept options to control the number of iterations and concurrency level. 
The `load_state` endpoint is different - it simply measures the one-time startup cost of loading the state from disk. +Most benchmarks accept options to control the number of iterations and concurrency level. The `load-state` endpoint is different - it simply measures the one-time startup cost of loading the state from disk. + +The `get-account` benchmark uses the account id dump created by `seed-store`, selects public accounts, and requests account details from the store. Each request asks for vault details and all entries from a storage map slot. By default, it uses the slot created by `--storage-map-entries`: `miden::mock::stress_test::map`. You can request a different slot with `--storage-map-slot`. -**Note on Concurrency**: For the endpoints that support it (`sync_notes`, `sync_nullifiers`), the concurrency parameter controls how many requests are sent in parallel to the store. Since these benchmarks run against a local store (no network overhead), higher concurrency values can help identify bottlenecks in the store's internal processing. The latency measurements exclude network time and represent pure store processing time. +**Note on Concurrency**: For request benchmarks, the concurrency parameter controls how many requests are sent in parallel to the store. Since these benchmarks run against a local store (no network overhead), higher concurrency values can help identify bottlenecks in the store's internal processing. The latency measurements exclude network time and represent pure store processing time. Example usage: @@ -39,6 +61,16 @@ miden-node-stress-test benchmark-store \ sync-notes ``` +To benchmark public account detail loading, seed public accounts first and then run: + +```bash +miden-node-stress-test benchmark-store \ + --data-directory ./data \ + --iterations 10000 \ + --concurrency 16 \ + get-account +``` + ### Results The following results were obtained using a store with 100k accounts, half of which are public. 
@@ -58,49 +90,49 @@ Average DB growth rate: 325.3 KB per block > Note: Each block contains 256 transactions (16 batches * 16 transactions). -| Block | Insert Time (ms) | Get Block Inputs Time (ms) | Get Batch Inputs Time (ms) | Block Size (KB) | DB Size (MB) | -| ------ | ------------------ | ----------------------------- | ------------------------------ | ------------------ | ------------- | -| 0 | 22 | 1 | 0 | 375.6 | 0.3 | -| 50 | 186 | 9 | 1 | 473.6 | 22.2 | -| 100 | 199 | 10 | 1 | 473.6 | 40.7 | -| 150 | 219 | 10 | 1 | 473.6 | 58.1 | -| 200 | 218 | 11 | 1 | 473.6 | 74.8 | -| 250 | 222 | 11 | 1 | 473.6 | 91.6 | -| 300 | 228 | 12 | 1 | 473.6 | 108.1 | -| 350 | 232 | 13 | 1 | 473.6 | 124.4 | +| Block | Insert Time (ms) | Get Block Inputs Time (ms) | Get Batch Inputs Time (ms) | Block Size (KB) | DB Size (MB) | +| ----- | ---------------- | -------------------------- | -------------------------- | --------------- | ------------ | +| 0 | 22 | 1 | 0 | 375.6 | 0.3 | +| 50 | 186 | 9 | 1 | 473.6 | 22.2 | +| 100 | 199 | 10 | 1 | 473.6 | 40.7 | +| 150 | 219 | 10 | 1 | 473.6 | 58.1 | +| 200 | 218 | 11 | 1 | 473.6 | 74.8 | +| 250 | 222 | 11 | 1 | 473.6 | 91.6 | +| 300 | 228 | 12 | 1 | 473.6 | 108.1 | +| 350 | 232 | 13 | 1 | 473.6 | 124.4 | #### Database stats > Note: Database contains 100215 accounts and 100215 notes across all blocks. 
-| Table | Size (MB) | KB/Entry | -| ---------------------------------- | --------------- | ---------- | -| accounts | 26.1 | 0.3 | -| account_deltas | 1.2 | 0.0 | -| account_fungible_asset_deltas | 2.2 | 0.0 | -| account_non_fungible_asset_updates | 0.0 | - | -| account_storage_map_updates | 0.0 | - | -| account_storage_slot_updates | 3.1 | 0.1 | -| block_headers | 0.1 | 0.3 | -| notes | 49.1 | 0.5 | -| note_scripts | 0.0 | 8.0 | -| nullifiers | 4.6 | 0.0 | -| transactions | 6.0 | 0.1 | +| Table | Size (MB) | KB/Entry | +| ---------------------------------- | --------- | -------- | +| accounts | 26.1 | 0.3 | +| account_deltas | 1.2 | 0.0 | +| account_fungible_asset_deltas | 2.2 | 0.0 | +| account_non_fungible_asset_updates | 0.0 | - | +| account_storage_map_updates | 0.0 | - | +| account_storage_slot_updates | 3.1 | 0.1 | +| block_headers | 0.1 | 0.3 | +| notes | 49.1 | 0.5 | +| note_scripts | 0.0 | 8.0 | +| nullifiers | 4.6 | 0.0 | +| transactions | 6.0 | 0.1 | #### Index stats -| Index | Size (MB) | -| ---------------------------------- | --------------- | -| idx_accounts_network_prefix | 0.0 | -| idx_notes_note_id | 4.4 | -| idx_notes_sender | 2.9 | -| idx_notes_tag | 1.6 | -| idx_notes_nullifier | 4.4 | -| idx_unconsumed_network_notes | 1.1 | -| idx_nullifiers_prefix | 4.3 | -| idx_nullifiers_block_num | 4.2 | -| idx_transactions_account_id | 5.6 | -| idx_transactions_block_num | 4.2 | +| Index | Size (MB) | +| ---------------------------- | --------- | +| idx_accounts_network_prefix | 0.0 | +| idx_notes_note_id | 4.4 | +| idx_notes_sender | 2.9 | +| idx_notes_tag | 1.6 | +| idx_notes_nullifier | 4.4 | +| idx_unconsumed_network_notes | 1.1 | +| idx_nullifiers_prefix | 4.3 | +| idx_nullifiers_block_num | 4.2 | +| idx_transactions_account_id | 5.6 | +| idx_transactions_block_num | 4.2 | Current results of the store stress-tests: @@ -175,5 +207,22 @@ Pagination statistics: Average pages per run: 1.00 ``` +- get-account +``` bash +$ miden-node-stress-test 
benchmark-store --data-directory ./data --iterations 10000 --concurrency 16 get-account + +Average request latency: 937.969µs +P50 request latency: 688.332µs +P95 request latency: 932.549µs +P99 request latency: 1.119977ms +P99.9 request latency: 42.992839ms +GetAccount statistics: + Total runs: 10000 + Storage map limit exceeded responses: 0 + Average returned storage map entries: 64.00 + Vault limit exceeded responses: 0 + Average returned vault assets: 2.00 +``` + ## License This project is [MIT licensed](../../LICENSE). diff --git a/bin/stress-test/src/main.rs b/bin/stress-test/src/main.rs index a5cc82f9f4..9e02f4dd55 100644 --- a/bin/stress-test/src/main.rs +++ b/bin/stress-test/src/main.rs @@ -4,6 +4,7 @@ use clap::{Parser, Subcommand}; use miden_node_utils::logging::OpenTelemetry; use seeding::seed_store; use store::{ + bench_get_account, bench_sync_chain_mmr, bench_sync_notes, bench_sync_nullifiers, @@ -39,6 +40,18 @@ pub enum Command { /// private accounts. #[arg(short, long, value_name = "PUBLIC_ACCOUNTS_PERCENTAGE", default_value = "0")] public_accounts_percentage: u8, + + /// Number of entries to add to a deterministic storage map on every public account. + #[arg(long, value_name = "STORAGE_MAP_ENTRIES", default_value = "0")] + storage_map_entries: usize, + + /// Number of distinct vault assets to add to every public account. + #[arg(long, value_name = "VAULT_ENTRIES", default_value = "1")] + vault_entries: usize, + + /// Number of post-initialization blocks to generate with random account updates. + #[arg(long, value_name = "ACCOUNT_UPDATE_BLOCKS", default_value = "0")] + account_update_blocks: usize, }, /// Benchmark the performance of the store endpoints. 
@@ -62,7 +75,7 @@ pub enum Command { }, } -#[derive(Subcommand, Clone, Copy)] +#[derive(Subcommand, Clone)] pub enum Endpoint { #[command(name = "sync-nullifiers")] SyncNullifiers { @@ -89,6 +102,12 @@ pub enum Endpoint { }, #[command(name = "load-state")] LoadState, + #[command(name = "get-account")] + GetAccount { + /// Storage slot name to request with all entries. + #[arg(long, value_name = "SLOT_NAME", default_value = seeding::BENCHMARK_STORAGE_MAP_SLOT_NAME)] + storage_map_slot: String, + }, } #[tokio::main] @@ -103,8 +122,19 @@ async fn main() { data_directory, num_accounts, public_accounts_percentage, + storage_map_entries, + vault_entries, + account_update_blocks, } => { - seed_store(data_directory, num_accounts, public_accounts_percentage).await; + seed_store( + data_directory, + num_accounts, + public_accounts_percentage, + storage_map_entries, + vault_entries, + account_update_blocks, + ) + .await; }, Command::BenchmarkStore { endpoint, @@ -134,6 +164,9 @@ async fn main() { Endpoint::LoadState => { load_state(&data_directory).await; }, + Endpoint::GetAccount { storage_map_slot } => { + bench_get_account(data_directory, iterations, concurrency, storage_map_slot).await; + }, }, } } diff --git a/bin/stress-test/src/seeding/mod.rs b/bin/stress-test/src/seeding/mod.rs index 5d487e0b8f..76015acf38 100644 --- a/bin/stress-test/src/seeding/mod.rs +++ b/bin/stress-test/src/seeding/mod.rs @@ -7,7 +7,7 @@ use metrics::SeedingMetrics; use miden_node_block_producer::store::StoreClient; use miden_node_proto::domain::batch::BatchInputs; use miden_node_proto::generated::store::rpc_client::RpcClient; -use miden_node_store::{DataDirectory, GenesisState, Store}; +use miden_node_store::{DataDirectory, GenesisState, Store, StoreMode}; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::tracing::grpc::OtelInterceptor; use miden_protocol::account::auth::AuthScheme; @@ -15,10 +15,18 @@ use 
miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::account::{ Account, AccountBuilder, + AccountComponent, + AccountComponentMetadata, AccountDelta, AccountId, + AccountStorageDelta, AccountStorageMode, AccountType, + AccountVaultDelta, + StorageMap, + StorageMapKey, + StorageSlot, + StorageSlotName, }; use miden_protocol::asset::{Asset, FungibleAsset, TokenSymbol}; use miden_protocol::batch::{BatchAccountUpdate, BatchId, ProvenBatch}; @@ -34,7 +42,7 @@ use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey as EcdsaSecretKey; use miden_protocol::crypto::dsa::falcon512_poseidon2::{PublicKey, SecretKey}; use miden_protocol::crypto::rand::RandomCoin; use miden_protocol::errors::AssetError; -use miden_protocol::note::{Note, NoteHeader, NoteId, NoteInclusionProof}; +use miden_protocol::note::{Note, NoteAssets, NoteHeader, NoteId, NoteInclusionProof}; use miden_protocol::transaction::{ InputNote, InputNoteCommitment, @@ -59,8 +67,10 @@ use miden_standards::account::policies::{ TokenPolicyManager, }; use miden_standards::account::wallets::BasicWallet; +use miden_standards::code_builder::CodeBuilder; use miden_standards::note::P2idNote; use rand::Rng; +use rand::seq::SliceRandom; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::prelude::ParallelSlice; use tokio::io::AsyncWriteExt; @@ -71,6 +81,8 @@ use tonic::transport::Channel; use url::Url; mod metrics; +#[cfg(test)] +mod tests; // CONSTANTS // ================================================================================================ @@ -80,6 +92,8 @@ const TRANSACTIONS_PER_BATCH: usize = 16; pub const ACCOUNTS_FILENAME: &str = "accounts.txt"; +pub const BENCHMARK_STORAGE_MAP_SLOT_NAME: &str = "miden::mock::stress_test::map"; + // SEED STORE // ================================================================================================ @@ -88,8 +102,16 @@ pub async fn seed_store( data_directory: PathBuf, num_accounts: usize, public_accounts_percentage: u8, + 
storage_map_entries: usize, + vault_entries: usize, + account_update_blocks: usize, ) { let start = Instant::now(); + assert!( + vault_entries <= NoteAssets::MAX_NUM_ASSETS, + "--vault-entries must be at most {}", + NoteAssets::MAX_NUM_ASSETS + ); // Recreate the data directory (it should be empty for store bootstrapping). // @@ -98,14 +120,15 @@ pub async fn seed_store( fs_err::create_dir_all(&data_directory).expect("created data directory"); // generate the faucet account and the genesis state - let faucet = create_faucet(); + let benchmark_faucets = create_benchmark_faucets(vault_entries); + let faucet = benchmark_faucets[0].clone(); + let asset_faucet_ids = benchmark_faucets.iter().map(Account::id).collect::>(); let fee_params = FeeParameters::new(faucet.id(), 0).unwrap(); let signer = EcdsaSecretKey::new(); - let genesis_state = GenesisState::new(vec![faucet.clone()], fee_params, 1, 1, signer.clone()); + let genesis_state = GenesisState::new(benchmark_faucets, fee_params, 1, 1, signer.public_key()); let genesis_block = genesis_state .clone() - .into_block() - .await + .into_block(&signer) .expect("genesis block should be created"); Store::bootstrap(genesis_block, &data_directory).expect("store should bootstrap"); @@ -117,16 +140,20 @@ pub async fn seed_store( let accounts_filepath = data_directory.join(ACCOUNTS_FILENAME); let data_directory = miden_node_store::DataDirectory::load(data_directory).expect("data directory should exist"); - let genesis_block = genesis_state.into_block().await.unwrap().into_inner(); + let genesis_header = genesis_state.into_block(&signer).unwrap().into_inner(); let metrics = generate_blocks( num_accounts, public_accounts_percentage, faucet, - genesis_block, + genesis_header, &store_client, data_directory, accounts_filepath, &signer, + storage_map_entries, + vault_entries, + account_update_blocks, + asset_faucet_ids, ) .await; @@ -139,6 +166,7 @@ pub async fn seed_store( /// The first transaction in each batch sends assets from the 
faucet to 255 accounts. /// The rest of the transactions consume the notes created by the faucet in the previous block. #[expect(clippy::too_many_arguments)] +#[expect(clippy::too_many_lines)] async fn generate_blocks( num_accounts: usize, public_accounts_percentage: u8, @@ -148,6 +176,10 @@ async fn generate_blocks( data_directory: DataDirectory, accounts_filepath: PathBuf, signer: &EcdsaSecretKey, + storage_map_entries: usize, + vault_entries: usize, + account_update_blocks: usize, + asset_faucet_ids: Vec, ) -> SeedingMetrics { // Each block is composed of [`BATCHES_PER_BLOCK`] batches, and each batch is composed of // [`TRANSACTIONS_PER_BATCH`] txs. The first note of the block is always a send assets tx @@ -158,8 +190,10 @@ async fn generate_blocks( let mut account_ids = vec![]; let mut note_nullifiers = vec![]; + let mut account_states: BTreeMap = BTreeMap::new(); - let mut consume_notes_txs = vec![]; + let mut consume_notes_txs: Vec = vec![]; + let mut pending_consumed_accounts: Vec = vec![]; let consumes_per_block = TRANSACTIONS_PER_BATCH * BATCHES_PER_BLOCK - 1; #[expect(clippy::cast_sign_loss, clippy::cast_precision_loss)] @@ -190,8 +224,10 @@ async fn generate_blocks( AccountStorageMode::Public, &key_pair, &rng, - faucet.id(), + &asset_faucet_ids, i, + storage_map_entries, + vault_entries, ); // create private accounts and notes that mint assets for these accounts @@ -200,8 +236,10 @@ async fn generate_blocks( AccountStorageMode::Private, &key_pair, &rng, - faucet.id(), + &asset_faucet_ids, i, + storage_map_entries, + vault_entries, ); let notes = [pub_notes, priv_notes].concat(); @@ -228,6 +266,8 @@ async fn generate_blocks( // update blocks prev_block_header = apply_block(batches, block_inputs, store_client, &mut metrics, signer).await; + account_states + .extend(pending_consumed_accounts.into_iter().map(|account| (account.id(), account))); if current_anchor_header.block_epoch() != prev_block_header.block_epoch() { current_anchor_header = 
prev_block_header.clone(); } @@ -235,8 +275,13 @@ async fn generate_blocks( // create the consume notes txs to be used in the next block let batch_inputs = get_batch_inputs(store_client, &prev_block_header, ¬es, &mut metrics).await; - consume_notes_txs = - create_consume_note_txs(&prev_block_header, accounts, notes, &batch_inputs.note_proofs); + (pending_consumed_accounts, consume_notes_txs) = create_consume_note_txs( + &prev_block_header, + accounts, + notes, + &batch_inputs.note_proofs, + None, + ); // track store size every 50 blocks if i % 50 == 0 { @@ -244,6 +289,67 @@ async fn generate_blocks( } } + let update_note_faucet_ids = + asset_faucet_ids.iter().take(vault_entries).copied().collect::>(); + let mut random = rand::rng(); + for update_block_index in 0..account_update_blocks { + let mut block_txs = Vec::with_capacity(BATCHES_PER_BLOCK * TRANSACTIONS_PER_BATCH); + + let selected_account_ids = select_random_account_ids_for_update_notes( + &account_states, + &pending_consumed_accounts, + consumes_per_block, + &mut random, + ); + let notes = { + let mut note_rng = rng.lock().unwrap(); + selected_account_ids + .iter() + .map(|account_id| create_note(&update_note_faucet_ids, *account_id, &mut note_rng)) + .collect::>() + }; + + let emit_note_tx = create_emit_note_tx(&prev_block_header, &mut faucet, notes.clone()); + block_txs.push(emit_note_tx); + block_txs.extend(consume_notes_txs); + + let batches: Vec = block_txs + .par_chunks(TRANSACTIONS_PER_BATCH) + .map(|txs| create_batch(txs, &prev_block_header)) + .collect(); + + let block_inputs = get_block_inputs(store_client, &batches, &mut metrics).await; + + prev_block_header = + apply_block(batches, block_inputs, store_client, &mut metrics, signer).await; + account_states + .extend(pending_consumed_accounts.into_iter().map(|account| (account.id(), account))); + if current_anchor_header.block_epoch() != prev_block_header.block_epoch() { + current_anchor_header = prev_block_header.clone(); + } + + let batch_inputs 
= + get_batch_inputs(store_client, &prev_block_header, ¬es, &mut metrics).await; + let accounts = selected_account_ids + .iter() + .filter_map(|account_id| account_states.get(account_id).cloned()) + .collect::>(); + (pending_consumed_accounts, consume_notes_txs) = create_consume_note_txs( + &prev_block_header, + accounts, + notes, + &batch_inputs.note_proofs, + Some(BenchmarkStorageUpdate { + block_index: update_block_index, + storage_map_entries, + }), + ); + + if update_block_index % 50 == 0 { + metrics.record_store_size(); + } + } + // dump account ids to a file let mut file = fs::File::create(accounts_filepath).await.unwrap(); for id in account_ids { @@ -296,14 +402,28 @@ fn fee_from_block(block_ref: &BlockHeader) -> Result /// Returns a tuple with: /// - The list of new accounts /// - The list of new notes +#[expect(clippy::too_many_arguments)] fn create_accounts_and_notes( num_accounts: usize, storage_mode: AccountStorageMode, key_pair: &SecretKey, rng: &Arc>, - faucet_id: AccountId, + asset_faucet_ids: &[AccountId], block_num: usize, + storage_map_entries: usize, + vault_entries: usize, ) -> (Vec, Vec) { + assert!( + !asset_faucet_ids.is_empty(), + "at least one faucet id is required to create benchmark notes" + ); + let note_faucet_ids = match storage_mode { + AccountStorageMode::Public => { + asset_faucet_ids.iter().take(vault_entries).copied().collect() + }, + AccountStorageMode::Private | AccountStorageMode::Network => vec![asset_faucet_ids[0]], + }; + (0..num_accounts) .into_par_iter() .map(|account_index| { @@ -311,24 +431,29 @@ fn create_accounts_and_notes( key_pair.public_key(), ((block_num * num_accounts) + account_index) as u64, storage_mode, + storage_map_entries, ); let note = { let mut rng = rng.lock().unwrap(); - create_note(faucet_id, account.id(), &mut rng) + create_note(¬e_faucet_ids, account.id(), &mut rng) }; (account, note) }) .collect() } -/// Creates a public P2ID note containing 10 tokens of the fungible asset associated with the -/// 
specified `faucet_id` and sent to the specified target account. -fn create_note(faucet_id: AccountId, target_id: AccountId, rng: &mut RandomCoin) -> Note { - let asset = Asset::Fungible(FungibleAsset::new(faucet_id, 10).unwrap()); +/// Creates a public P2ID note containing 10 tokens for each requested fungible asset and sends it +/// to the specified target account. +fn create_note(faucet_ids: &[AccountId], target_id: AccountId, rng: &mut RandomCoin) -> Note { + let assets = faucet_ids + .iter() + .map(|faucet_id| Asset::Fungible(FungibleAsset::new(*faucet_id, 10).unwrap())) + .collect(); + let sender = faucet_ids.first().copied().unwrap_or(target_id); P2idNote::create( - faucet_id, + sender, target_id, - vec![asset], + assets, miden_protocol::note::NoteType::Public, miden_protocol::note::NoteAttachment::default(), rng, @@ -336,25 +461,122 @@ fn create_note(faucet_id: AccountId, target_id: AccountId, rng: &mut RandomCoin) .expect("note creation failed") } -/// Creates a new private account with a given public key and anchor block. Generates the seed from -/// the given index. 
-fn create_account(public_key: PublicKey, index: u64, storage_mode: AccountStorageMode) -> Account { +fn select_random_account_ids_for_update_notes( + account_states: &BTreeMap, + pending_accounts: &[Account], + max_accounts: usize, + rng: &mut R, +) -> Vec { + let mut account_ids = account_states.keys().copied().collect::>(); + for account in pending_accounts { + let account_id = account.id(); + if !account_states.contains_key(&account_id) { + account_ids.push(account_id); + } + } + + account_ids.shuffle(rng); + account_ids.truncate(max_accounts); + account_ids +} + +#[derive(Clone, Copy)] +struct BenchmarkStorageUpdate { + block_index: usize, + storage_map_entries: usize, +} + +fn benchmark_storage_map_update_value(block_index: usize, tx_index: usize, key_index: u32) -> Word { + Word::from([ + Felt::ZERO, + Felt::from(u32::try_from(block_index).expect("update block index fits into u32")), + Felt::from(u32::try_from(tx_index).expect("transaction index fits into u32")), + Felt::from(key_index), + ]) +} + +fn update_benchmark_storage_map_entry( + account: &mut Account, + block_index: usize, + tx_index: usize, + storage_map_entries: usize, +) -> bool { + if !account.is_public() || storage_map_entries == 0 { + return false; + } + + let key_index = + u32::try_from((tx_index % storage_map_entries) + 1).expect("storage map key fits into u32"); + let key = StorageMapKey::from_index(key_index); + let value = benchmark_storage_map_update_value(block_index, tx_index, key_index); + + account + .storage_mut() + .set_map_item(&benchmark_storage_map_slot(), key, value) + .is_ok() +} + +/// Creates a new account with a given public key and storage mode. Generates the seed from the +/// given index. 
+pub fn benchmark_storage_map_slot() -> StorageSlotName { + StorageSlotName::new(BENCHMARK_STORAGE_MAP_SLOT_NAME).unwrap() +} + +fn create_account( + public_key: PublicKey, + index: u64, + storage_mode: AccountStorageMode, + storage_map_entries: usize, +) -> Account { let init_seed: Vec<_> = index.to_be_bytes().into_iter().chain([0u8; 24]).collect(); - AccountBuilder::new(init_seed.try_into().unwrap()) + let mut builder = AccountBuilder::new(init_seed.try_into().unwrap()) .account_type(AccountType::RegularAccountImmutableCode) .storage_mode(storage_mode) .with_auth_component(AuthSingleSig::new(public_key.into(), AuthScheme::Falcon512Poseidon2)) - .with_component(BasicWallet) - .build() - .unwrap() + .with_component(BasicWallet); + + if storage_mode == AccountStorageMode::Public && storage_map_entries > 0 { + let entries = (1..=storage_map_entries) + .map(|i| { + let i = u32::try_from(i).expect("storage map entry index fits into u32"); + ( + StorageMapKey::from_index(i), + Word::from([Felt::ZERO, Felt::ZERO, Felt::ZERO, Felt::from(i)]), + ) + }) + .collect::>(); + let storage_map = StorageMap::with_entries(entries).unwrap(); + let component_storage = + vec![StorageSlot::with_map(benchmark_storage_map_slot(), storage_map)]; + let component_code = CodeBuilder::default() + .compile_component_code("benchmark::storage_map", "pub proc noop push.0 drop end") + .unwrap(); + let component = AccountComponent::new( + component_code, + component_storage, + AccountComponentMetadata::new( + "benchmark_storage_map", + [AccountType::RegularAccountImmutableCode], + ), + ) + .unwrap(); + builder = builder.with_component(component); + } + + builder.build().unwrap() +} + +fn create_benchmark_faucets(vault_entries: usize) -> Vec { + (0..vault_entries.max(1)) + .map(|index| create_faucet_with_seed(index as u64)) + .collect() } -/// Creates a new faucet account. 
-fn create_faucet() -> Account { +fn create_faucet_with_seed(index: u64) -> Account { let coin_seed: [u64; 4] = rand::rng().random(); let mut rng = RandomCoin::new(coin_seed.map(Felt::new).into()); let key_pair = SecretKey::with_rng(&mut rng); - let init_seed = [0_u8; 32]; + let init_seed: Vec<_> = index.to_be_bytes().into_iter().chain([0u8; 24]).collect(); let token_symbol = TokenSymbol::new("TEST").unwrap(); let token_metadata = FungibleTokenMetadata::builder( @@ -365,7 +587,8 @@ fn create_faucet() -> Account { ) .build() .unwrap(); - AccountBuilder::new(init_seed) + + AccountBuilder::new(init_seed.try_into().unwrap()) .account_type(AccountType::FungibleFaucet) .storage_mode(AccountStorageMode::Private) .with_component(token_metadata) @@ -410,19 +633,22 @@ fn create_consume_note_txs( accounts: Vec, notes: Vec, note_proofs: &BTreeMap, -) -> Vec { + storage_update: Option, +) -> (Vec, Vec) { accounts .into_iter() .zip(notes) - .map(|(account, note)| { + .enumerate() + .map(|(tx_index, (account, note))| { let inclusion_proof = note_proofs.get(¬e.id()).unwrap(); create_consume_note_tx( block_ref, account, InputNote::authenticated(note, inclusion_proof.clone()), + storage_update.map(|update| (update, tx_index)), ) }) - .collect() + .unzip() } /// Creates a transaction that creates an account and consumes the given input note. 
@@ -432,17 +658,32 @@ fn create_consume_note_tx( block_ref: &BlockHeader, mut account: Account, input_note: InputNote, -) -> ProvenTransaction { + storage_update: Option<(BenchmarkStorageUpdate, usize)>, +) -> (Account, ProvenTransaction) { let init_hash = account.initial_commitment(); + let is_new_account = account.is_new(); input_note.note().assets().iter().for_each(|asset| { account.vault_mut().add_asset(*asset).unwrap(); }); + if let Some((storage_update, tx_index)) = storage_update { + update_benchmark_storage_map_entry( + &mut account, + storage_update.block_index, + tx_index, + storage_update.storage_map_entries, + ); + } + account.increment_nonce(ONE).unwrap(); let (details, account_delta_commitment) = if account.is_public() { - let account_delta = AccountDelta::try_from(account.clone()).unwrap(); + let account_delta = if is_new_account { + AccountDelta::try_from(account.clone()).unwrap() + } else { + create_existing_account_delta(&account, input_note.note().assets(), storage_update) + }; let commitment = account_delta.clone().to_commitment(); (AccountUpdateDetails::Delta(account_delta), commitment) } else { @@ -457,7 +698,7 @@ fn create_consume_note_tx( details, ) .unwrap(); - ProvenTransaction::new( + let transaction = ProvenTransaction::new( account_update, vec![InputNoteCommitment::from(input_note)], Vec::::new(), @@ -467,7 +708,43 @@ fn create_consume_note_tx( u32::MAX.into(), ExecutionProof::new_dummy(), ) - .unwrap() + .unwrap(); + + (account, transaction) +} + +fn create_existing_account_delta( + account: &Account, + note_assets: &NoteAssets, + storage_update: Option<(BenchmarkStorageUpdate, usize)>, +) -> AccountDelta { + let mut vault_delta = AccountVaultDelta::default(); + for asset in note_assets.iter() { + vault_delta.add_asset(*asset).unwrap(); + } + + let mut storage_delta = AccountStorageDelta::new(); + if let Some((storage_update, tx_index)) = storage_update { + if storage_update.storage_map_entries > 0 + && 
account.storage().get(&benchmark_storage_map_slot()).is_some() + { + let key_index = u32::try_from((tx_index % storage_update.storage_map_entries) + 1) + .expect("storage map key fits into u32"); + storage_delta + .set_map_item( + benchmark_storage_map_slot(), + StorageMapKey::from_index(key_index), + benchmark_storage_map_update_value( + storage_update.block_index, + tx_index, + key_index, + ), + ) + .unwrap(); + } + } + + AccountDelta::new(account.id(), storage_delta, vault_delta, ONE).unwrap() } /// Creates a transaction from the faucet that creates the given output notes. @@ -589,7 +866,7 @@ pub async fn start_store( task::spawn(async move { Store { rpc_listener, - mode: miden_node_store::StoreMode::BlockProducer { + mode: StoreMode::BlockProducer { block_producer_listener, ntx_builder_listener, block_prover_url: None, diff --git a/bin/stress-test/src/seeding/tests.rs b/bin/stress-test/src/seeding/tests.rs new file mode 100644 index 0000000000..3084d2a8be --- /dev/null +++ b/bin/stress-test/src/seeding/tests.rs @@ -0,0 +1,136 @@ +use miden_protocol::account::StorageSlotContent; + +use super::*; + +fn benchmark_fungible_faucet_ids(vault_entries: usize) -> Vec { + create_benchmark_faucets(vault_entries) + .into_iter() + .map(|account| account.id()) + .collect() +} + +#[test] +fn public_account_can_be_created_with_large_storage_map() { + let coin_seed = [1, 2, 3, 4].map(Felt::new); + let mut rng = RandomCoin::new(coin_seed.into()); + let key_pair = SecretKey::with_rng(&mut rng); + + let account = create_account(key_pair.public_key(), 42, AccountStorageMode::Public, 128); + + let map_slot = account + .storage() + .slots() + .iter() + .find(|slot| slot.name() == &benchmark_storage_map_slot()) + .expect("benchmark storage map slot should exist"); + + let StorageSlotContent::Map(storage_map) = map_slot.content() else { + panic!("benchmark slot should be a storage map"); + }; + + assert_eq!(storage_map.num_entries(), 128); +} + +#[test] +fn 
private_account_ignores_large_storage_map_entries() { + let coin_seed = [1, 2, 3, 4].map(Felt::new); + let mut rng = RandomCoin::new(coin_seed.into()); + let key_pair = SecretKey::with_rng(&mut rng); + + let account = create_account(key_pair.public_key(), 42, AccountStorageMode::Private, 128); + + assert!( + account + .storage() + .slots() + .iter() + .all(|slot| slot.name() != &benchmark_storage_map_slot()) + ); +} + +#[test] +fn public_account_note_contains_requested_distinct_vault_assets() { + let coin_seed = [1, 2, 3, 4].map(Felt::new); + let rng = Arc::new(Mutex::new(RandomCoin::new(coin_seed.into()))); + let mut key_rng = rng.lock().unwrap(); + let key_pair = SecretKey::with_rng(&mut *key_rng); + drop(key_rng); + + let faucet_ids = benchmark_fungible_faucet_ids(5); + let (_, notes) = create_accounts_and_notes( + 1, + AccountStorageMode::Public, + &key_pair, + &rng, + &faucet_ids, + 0, + 0, + 5, + ); + + let assets = notes[0].assets(); + assert_eq!(assets.num_assets(), 5); + + let distinct_vault_keys = + assets.iter().map(Asset::vault_key).collect::>(); + assert_eq!(distinct_vault_keys.len(), 5); +} + +#[test] +fn private_account_note_keeps_single_vault_asset() { + let coin_seed = [1, 2, 3, 4].map(Felt::new); + let rng = Arc::new(Mutex::new(RandomCoin::new(coin_seed.into()))); + let mut key_rng = rng.lock().unwrap(); + let key_pair = SecretKey::with_rng(&mut *key_rng); + drop(key_rng); + + let faucet_ids = benchmark_fungible_faucet_ids(5); + let (_, notes) = create_accounts_and_notes( + 1, + AccountStorageMode::Private, + &key_pair, + &rng, + &faucet_ids, + 0, + 0, + 5, + ); + + assert_eq!(notes[0].assets().num_assets(), 1); +} + +#[test] +fn public_account_storage_map_entry_can_be_updated_for_benchmark_blocks() { + let coin_seed = [1, 2, 3, 4].map(Felt::new); + let mut rng = RandomCoin::new(coin_seed.into()); + let key_pair = SecretKey::with_rng(&mut rng); + let mut account = create_account(key_pair.public_key(), 42, AccountStorageMode::Public, 4); + + let 
key = StorageMapKey::from_index(2); + let old_value = account + .storage() + .get_map_item(&benchmark_storage_map_slot(), key.into()) + .unwrap(); + + let updated = update_benchmark_storage_map_entry(&mut account, 3, 9, 4); + + let new_value = account + .storage() + .get_map_item(&benchmark_storage_map_slot(), key.into()) + .unwrap(); + assert!(updated); + assert_ne!(new_value, old_value); + assert_eq!(new_value, benchmark_storage_map_update_value(3, 9, 2)); +} + +#[test] +fn private_account_storage_map_update_is_skipped() { + let coin_seed = [1, 2, 3, 4].map(Felt::new); + let mut rng = RandomCoin::new(coin_seed.into()); + let key_pair = SecretKey::with_rng(&mut rng); + let mut account = create_account(key_pair.public_key(), 42, AccountStorageMode::Private, 4); + + let updated = update_benchmark_storage_map_entry(&mut account, 3, 9, 4); + + assert!(!updated); +} diff --git a/bin/stress-test/src/store/mod.rs b/bin/stress-test/src/store/mod.rs index 8cbda83cf6..e631bf06e9 100644 --- a/bin/stress-test/src/store/mod.rs +++ b/bin/stress-test/src/store/mod.rs @@ -7,6 +7,7 @@ use miden_node_proto::generated::{self as proto}; use miden_node_store::state::State; use miden_node_utils::clap::StorageOptions; use miden_node_utils::tracing::grpc::OtelInterceptor; +use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::note::{NoteDetails, NoteTag}; use miden_protocol::utils::serde::{Deserializable, Serializable}; @@ -34,6 +35,182 @@ const NOTE_IDS_PER_NULLIFIERS_CHECK: usize = 20; /// Number of attempts the benchmark will make to reach the store before proceeding. const STORE_STATUS_RETRIES: usize = 10; +// GET ACCOUNT +// ================================================================================================ + +/// Sends multiple `get_account` requests to the store and prints the performance. 
+/// +/// Each request asks for all entries in `storage_map_slot`, which is intended to exercise the +/// storage-map reconstruction path for public accounts seeded by this stress-test tool. +pub async fn bench_get_account( + data_directory: PathBuf, + iterations: usize, + concurrency: usize, + storage_map_slot: String, +) { + let accounts_file = data_directory.join(ACCOUNTS_FILENAME); + let accounts = fs::read_to_string(&accounts_file) + .await + .unwrap_or_else(|e| panic!("missing file {}: {e:?}", accounts_file.display())); + let mut account_ids: Vec<AccountId> = accounts + .lines() + .map(|a| AccountId::from_hex(a).expect("invalid account id")) + .filter(AccountId::has_public_state) + .collect(); + + assert!( + !account_ids.is_empty(), + "no public accounts found in {}; seed with --public-accounts-percentage > 0", + accounts_file.display() + ); + + let mut rng = rand::rng(); + account_ids.shuffle(&mut rng); + let mut account_ids = account_ids.into_iter().cycle(); + + let (store_client, _) = start_store(data_directory).await; + + wait_for_store(&store_client).await.unwrap(); + + let request = |_| { + let mut client = store_client.clone(); + let account_id = account_ids.next().expect("cycled public account ids never end"); + let storage_map_slot = storage_map_slot.clone(); + tokio::spawn(async move { get_account(&mut client, account_id, storage_map_slot).await }) + }; + + let results = stream::iter(0..iterations) + .map(request) + .buffer_unordered(concurrency) + .map(|res| res.unwrap()) + .collect::<Vec<_>>() + .await; + + let timers_accumulator: Vec<Duration> = results.iter().map(|r| r.duration).collect(); + print_summary(&timers_accumulator); + + let total_runs = results.len(); + let storage_map_limit_exceeded = + results.iter().filter(|r| r.storage_map_limit_exceeded).count(); + let vault_limit_exceeded = results.iter().filter(|r| r.vault_limit_exceeded).count(); + #[expect(clippy::cast_precision_loss)] + let average_storage_map_entries = if total_runs > 0 { + results.iter().map(|r| 
r.storage_map_entries as f64).sum::<f64>() / total_runs as f64 + } else { + 0.0 + }; + #[expect(clippy::cast_precision_loss)] + let average_vault_assets = if total_runs > 0 { + results.iter().map(|r| r.vault_assets as f64).sum::<f64>() / total_runs as f64 + } else { + 0.0 + }; + + println!("GetAccount statistics:"); + println!(" Total runs: {total_runs}"); + println!(" Storage map limit exceeded responses: {storage_map_limit_exceeded}"); + println!(" Average returned storage map entries: {average_storage_map_entries:.2}"); + println!(" Vault limit exceeded responses: {vault_limit_exceeded}"); + println!(" Average returned vault assets: {average_vault_assets:.2}"); +} + +#[derive(Clone)] +struct GetAccountRun { + duration: Duration, + storage_map_entries: usize, + storage_map_limit_exceeded: bool, + vault_assets: usize, + vault_limit_exceeded: bool, +} + +async fn get_account( + api_client: &mut RpcClient<InterceptedService<Channel, OtelInterceptor>>, + account_id: AccountId, + storage_map_slot: String, +) -> GetAccountRun { + use proto::rpc::account_storage_details::account_storage_map_details::Entries; + + let request = get_account_request(account_id, storage_map_slot); + + let start = Instant::now(); + let response = api_client.get_account(request).await.unwrap().into_inner(); + let duration = start.elapsed(); + + let details = response.details; + let map_details = details + .as_ref() + .and_then(|details| details.storage_details.as_ref()) + .and_then(|storage_details| storage_details.map_details.first()); + let (storage_map_entries, storage_map_limit_exceeded) = match map_details { + Some(details) if details.too_many_entries => (0, true), + Some(details) => match &details.entries { + Some(Entries::AllEntries(entries)) => (entries.entries.len(), false), + _ => (0, false), + }, + None => (0, false), + }; + + let vault_details = details.and_then(|details| details.vault_details); + let (vault_assets, vault_limit_exceeded) = match vault_details { + Some(details) if details.too_many_assets => (0, true), + Some(details) => 
(details.assets.len(), false), + None => (0, false), + }; + + GetAccountRun { + duration, + storage_map_entries, + storage_map_limit_exceeded, + vault_assets, + vault_limit_exceeded, + } +} + +fn get_account_request( + account_id: AccountId, + storage_map_slot: String, +) -> proto::rpc::AccountRequest { + use proto::rpc::account_request::AccountDetailRequest; + use proto::rpc::account_request::account_detail_request::StorageMapDetailRequest; + use proto::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData; + + proto::rpc::AccountRequest { + account_id: Some(proto::account::AccountId { id: account_id.to_bytes() }), + block_num: None, + details: Some(AccountDetailRequest { + code_commitment: None, + asset_vault_commitment: Some(proto::primitives::Digest::from(Word::empty())), + storage_maps: vec![StorageMapDetailRequest { + slot_name: storage_map_slot, + slot_data: Some(SlotData::AllEntries(true)), + }], + }), + } +} + +#[cfg(test)] +mod tests { + use miden_protocol::testing::account_id::ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE; + + use super::*; + + #[test] + fn get_account_request_includes_vault_details() { + let account_id = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE) + .expect("test account id should be valid"); + let request = get_account_request( + account_id, + crate::seeding::BENCHMARK_STORAGE_MAP_SLOT_NAME.to_string(), + ); + + let details = request.details.expect("details should be requested"); + assert!( + details.asset_vault_commitment.is_some(), + "benchmark get-account should request vault asset details" + ); + } +} + // SYNC NOTES // ================================================================================================ diff --git a/clippy.toml b/clippy.toml index 2a5815cec4..9ee9a79b1b 100644 --- a/clippy.toml +++ b/clippy.toml @@ -29,4 +29,7 @@ disallowed-methods = [ { path = "std::path::Path::read_link", reason = "Use fs_err::path::PathExt methods" }, { path = 
"std::path::Path::symlink_metadata", reason = "Use fs_err::path::PathExt methods" }, { path = "std::path::Path::try_exists", reason = "Use fs_err::path::PathExt methods" }, + + # Use our own `spawn_blocking` wrapper so that the tracing span is correctly propagated + { path = "tokio::task::spawn_blocking", replacement = "miden_node_utils::spawn::spawn_blocking_in_current_span" }, ] diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index 549d76261f..86f7e49d5e 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -6,6 +6,7 @@ use std::time::Duration; use futures::never::Never; use futures::{FutureExt, TryFutureExt}; use miden_node_proto::domain::batch::BatchInputs; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::batch::{BatchId, ProposedBatch, ProvenBatch}; @@ -249,12 +250,14 @@ impl BatchJob { .prove(proposed_batch) .await .map_err(BuildBatchError::RemoteProverClientError), - BatchProver::Local(prover) => tokio::task::spawn_blocking({ + BatchProver::Local(prover) => { let prover = prover.clone(); - move || prover.prove(proposed_batch).map_err(BuildBatchError::ProveBatchError) - }) - .await - .map_err(BuildBatchError::JoinError)?, + spawn_blocking_in_current_span(move || { + prover.prove(proposed_batch).map_err(BuildBatchError::ProveBatchError) + }) + .await + .map_err(BuildBatchError::JoinError)? 
+ }, }?; if proven_batch.proof_security_level() < MIN_PROOF_SECURITY_LEVEL { diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 85acb62bff..3d1613c6cd 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::Context; use futures::FutureExt; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::batch::{OrderedBatches, ProvenBatch}; use miden_protocol::block::{BlockInputs, BlockNumber, ProposedBlock, ProvenBlock, SignedBlock}; @@ -225,7 +226,7 @@ impl BlockBuilder { proposed_block: ProposedBlock, ) -> Result<(OrderedBatches, SignedBlock), BuildBlockError> { // Concurrently build the block and validate it via the validator. - let build_result = tokio::task::spawn_blocking({ + let build_result = spawn_blocking_in_current_span({ let proposed_block = proposed_block.clone(); move || proposed_block.into_header_and_body() }); diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs index c3b1d40039..963151cb2b 100644 --- a/crates/block-producer/src/server/tests.rs +++ b/crates/block-producer/src/server/tests.rs @@ -5,6 +5,7 @@ use miden_node_proto::generated::block_producer::api_client as block_producer_cl use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, GenesisState, Store, StoreMode}; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::fee::test_fee_params; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_validator::{Validator, ValidatorSigner}; use miden_protocol::testing::random_secret_key::random_secret_key; use tokio::net::TcpListener; @@ -130,11 +131,11 @@ async fn start_store( store_addr: std::net::SocketAddr, data_directory: &std::path::Path, ) -> runtime::Runtime { - let genesis_state = 
GenesisState::new(vec![], test_fee_params(), 1, 1, random_secret_key()); + let signer = random_secret_key(); + let genesis_state = GenesisState::new(vec![], test_fee_params(), 1, 1, signer.public_key()); let genesis_block = genesis_state .clone() - .into_block() - .await + .into_block(&signer) .expect("genesis block should be created"); Store::bootstrap(genesis_block, data_directory).expect("store should bootstrap"); @@ -174,9 +175,11 @@ async fn start_store( /// Shuts down the store runtime properly to allow the database to flush before the temp directory /// is deleted. async fn shutdown_store(store_runtime: runtime::Runtime) { - task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_millis(500))) - .await - .expect("shutdown should complete"); + spawn_blocking_in_current_span(move || { + store_runtime.shutdown_timeout(Duration::from_millis(500)); + }) + .await + .expect("shutdown should complete"); } /// Sends a status request to the block producer to verify connectivity. 
diff --git a/crates/ntx-builder/src/actor/execute.rs b/crates/ntx-builder/src/actor/execute.rs index fca948e103..6648cacd75 100644 --- a/crates/ntx-builder/src/actor/execute.rs +++ b/crates/ntx-builder/src/actor/execute.rs @@ -1,8 +1,9 @@ use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; use miden_protocol::account::{ @@ -48,7 +49,6 @@ use miden_tx::{ TransactionMastStore, TransactionProverError, }; -use tokio::sync::Mutex; use tracing::{Instrument, instrument}; use crate::COMPONENT; @@ -195,26 +195,40 @@ impl NtxContext { async move { Box::pin(async move { - let data_store = NtxDataStore::new( - account, - chain_tip_header, - chain_mmr, - self.store.clone(), - self.script_cache.clone(), - self.db.clone(), - ); - - // Filter notes. let notes = notes.into_iter().map(AccountTargetNetworkNote::into_note).collect::>(); - let (successful_notes, failed_notes) = - self.filter_notes(&data_store, notes).await?; - // Execute transaction. - let executed_tx = Box::pin(self.execute(&data_store, successful_notes)).await?; - - // Collect scripts fetched from the remote store during execution. - let scripts_to_cache = data_store.take_fetched_scripts().await; + // VM execution (note filtering + transaction execution) is CPU-intensive and may + // not yield between await points. Run it on a dedicated blocking thread while using + // the parent runtime handle to drive async store callbacks. 
+ let ctx = self.clone(); + let handle = tokio::runtime::Handle::current(); + let span = tracing::Span::current(); + + let (executed_tx, failed_notes, scripts_to_cache) = + spawn_blocking_in_current_span(move || { + let data_store = NtxDataStore::new( + account, + chain_tip_header, + chain_mmr, + ctx.store.clone(), + ctx.script_cache.clone(), + ctx.db.clone(), + ); + handle.block_on( + async { + let (successful_notes, failed_notes) = + ctx.filter_notes(&data_store, notes).await?; + let executed_tx = + Box::pin(ctx.execute(&data_store, successful_notes)).await?; + let scripts_to_cache = data_store.take_fetched_scripts(); + Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache)) + } + .instrument(span), + ) + }) + .await + .unwrap_or_else(|err| std::panic::resume_unwind(err.into_panic()))?; // Prove transaction. let tx_inputs: TransactionInputs = executed_tx.into(); @@ -325,7 +339,17 @@ impl NtxContext { } else { // Only perform tx inputs clone for local proving. let tx_inputs = tx_inputs.clone(); - LocalTransactionProver::default().prove(tx_inputs).await + let span = tracing::Span::current(); + + spawn_blocking_in_current_span(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime") + .block_on(LocalTransactionProver::default().prove(tx_inputs).instrument(span)) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())) } .map_err(NtxError::Proving) } @@ -430,19 +454,23 @@ impl NtxDataStore { } /// Returns the list of note scripts fetched from the remote store during execution. - async fn take_fetched_scripts(&self) -> Vec<(Word, NoteScript)> { - self.fetched_scripts.lock().await.drain(..).collect() + fn take_fetched_scripts(&self) -> Vec<(Word, NoteScript)> { + self.fetched_scripts + .lock() + .expect("fetched scripts lock poisoned") + .drain(..) + .collect() } /// Registers storage map slot names for the given account ID and storage header. 
/// /// These slot names are subsequently used to query for storage map witnesses against the store. - async fn register_storage_map_slots( + fn register_storage_map_slots( &self, account_id: AccountId, storage_header: &AccountStorageHeader, ) { - let mut storage_slots = self.storage_slots.lock().await; + let mut storage_slots = self.storage_slots.lock().expect("storage slots lock poisoned"); for slot_header in storage_header.slots() { if let StorageSlotType::Map = slot_header.slot_type() { storage_slots.insert((account_id, slot_header.value()), slot_header.name().clone()); @@ -471,8 +499,7 @@ impl DataStore for NtxDataStore { } // Register slot names from the native account for later use. - self.register_storage_map_slots(account_id, &self.account.storage().to_header()) - .await; + self.register_storage_map_slots(account_id, &self.account.storage().to_header()); let partial_account = PartialAccount::from(&self.account); Ok((partial_account, self.reference_block.clone(), (*self.chain_mmr).clone())) @@ -498,8 +525,7 @@ impl DataStore for NtxDataStore { self.mast_store.load_account_code(account_inputs.code()); // Register slot names from the foreign account for later use. - self.register_storage_map_slots(foreign_account_id, account_inputs.storage().header()) - .await; + self.register_storage_map_slots(foreign_account_id, account_inputs.storage().header()); Ok(account_inputs) } @@ -536,11 +562,14 @@ impl DataStore for NtxDataStore { async move { // The slot name that corresponds to the given account ID and map root must have been // registered during previous calls of this data store. 
- let storage_slots = self.storage_slots.lock().await; - let Some(slot_name) = storage_slots.get(&(account_id, map_root)) else { - return Err(DataStoreError::other( - "requested storage slot has not been registered", - )); + let slot_name = { + let storage_slots = self.storage_slots.lock().expect("storage slots lock poisoned"); + let Some(slot_name) = storage_slots.get(&(account_id, map_root)) else { + return Err(DataStoreError::other( + "requested storage slot has not been registered", + )); + }; + slot_name.clone() }; let ref_block = self.reference_block.block_num(); @@ -548,7 +577,7 @@ impl DataStore for NtxDataStore { // Get storage map witness from the store. let witness = self .store - .get_storage_map_witness(account_id, slot_name.clone(), map_key, Some(ref_block)) + .get_storage_map_witness(account_id, slot_name, map_key, Some(ref_block)) .await .map_err(|err| { DataStoreError::other_with_source("failed to get storage map witness", err) @@ -594,7 +623,10 @@ impl DataStore for NtxDataStore { if let Some(script) = maybe_script { // Collect for later persistence by the coordinator. 
- self.fetched_scripts.lock().await.push((script_root, script.clone())); + self.fetched_scripts + .lock() + .expect("fetched scripts lock poisoned") + .push((script_root, script.clone())); self.script_cache.put(script_root, script.clone()); Ok(Some(script)) } else { diff --git a/crates/ntx-builder/src/actor/mod.rs b/crates/ntx-builder/src/actor/mod.rs index 1b67f353b5..4609bebecd 100644 --- a/crates/ntx-builder/src/actor/mod.rs +++ b/crates/ntx-builder/src/actor/mod.rs @@ -17,10 +17,10 @@ use miden_protocol::note::{NoteScript, Nullifier}; use miden_protocol::transaction::TransactionId; use miden_remote_prover_client::RemoteTransactionProver; use miden_tx::FailedNote; -use tokio::sync::{Notify, RwLock, Semaphore, mpsc}; +use tokio::sync::{Notify, Semaphore, mpsc}; use crate::NoteError; -use crate::chain_state::ChainState; +use crate::chain_state::{ChainState, SharedChainState}; use crate::clients::{BlockProducerClient, StoreClient, ValidatorClient}; use crate::db::Db; @@ -65,7 +65,7 @@ pub struct State { /// Local database for account state, notes, and transaction tracking. pub db: Db, /// The latest chain state. A single chain state is shared among all actors. - pub chain: Arc>, + pub chain: Arc, /// Shared LRU cache for storing retrieved note scripts to avoid repeated store calls. pub script_cache: LruCache, } @@ -104,17 +104,16 @@ impl AccountActorContext { /// but this is sufficient for testing coordinator logic (registry, deactivation, etc.). 
pub fn test(db: &crate::db::Db) -> Self { use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr}; - use tokio::sync::RwLock; use url::Url; - use crate::chain_state::ChainState; + use crate::chain_state::SharedChainState; use crate::clients::StoreClient; use crate::test_utils::mock_block_header; let url = Url::parse("http://127.0.0.1:1").unwrap(); let block_header = mock_block_header(0_u32.into()); let chain_mmr = PartialMmr::from_peaks(MmrPeaks::new(Forest::new(0), vec![]).unwrap()); - let chain_state = Arc::new(RwLock::new(ChainState::new(block_header, chain_mmr))); + let chain_state = Arc::new(SharedChainState::new(block_header, chain_mmr)); let (request_tx, _request_rx) = mpsc::channel(1); Self { @@ -234,7 +233,7 @@ impl AccountActor { } // Determine initial mode by checking DB for available notes. - let block_num = self.state.chain.read().await.chain_tip_header.block_num(); + let block_num = self.state.chain.chain_tip_block_number(); let has_notes = self .state .db @@ -292,7 +291,7 @@ impl AccountActor { let _permit = permit.context("semaphore closed")?; // Read the chain state. - let chain_state = self.state.chain.read().await.clone(); + let chain_state = self.state.chain.get_cloned(); // Query DB for latest account and available notes. 
let tx_candidate = self.select_candidate_from_db( diff --git a/crates/ntx-builder/src/builder.rs b/crates/ntx-builder/src/builder.rs index 12bc442a8b..5acad269f6 100644 --- a/crates/ntx-builder/src/builder.rs +++ b/crates/ntx-builder/src/builder.rs @@ -8,14 +8,14 @@ use miden_node_proto::domain::mempool::MempoolEvent; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::block::BlockHeader; use tokio::net::TcpListener; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; use tonic::Status; use crate::NtxBuilderConfig; use crate::actor::{AccountActorContext, ActorRequest}; -use crate::chain_state::ChainState; +use crate::chain_state::SharedChainState; use crate::clients::StoreClient; use crate::coordinator::Coordinator; use crate::db::Db; @@ -51,7 +51,7 @@ pub struct NetworkTransactionBuilder { /// Database for persistent state. db: Db, /// Shared chain state updated by the event loop and read by actors. - chain_state: Arc>, + chain_state: Arc, /// Context shared with all account actors. actor_context: AccountActorContext, /// Stream of mempool events from the block producer. @@ -70,7 +70,7 @@ impl NetworkTransactionBuilder { coordinator: Coordinator, store: StoreClient, db: Db, - chain_state: Arc>, + chain_state: Arc, actor_context: AccountActorContext, mempool_events: MempoolEventStream, actor_request_rx: mpsc::Receiver, @@ -197,7 +197,7 @@ impl NetworkTransactionBuilder { .context("failed to load account from store")? 
.context("account should exist in store")?; - let block_num = self.chain_state.read().await.chain_tip_header.block_num(); + let block_num = self.chain_state.chain_tip_block_number(); let notes = self .store .get_unconsumed_network_notes(account_id, block_num.as_u32()) @@ -248,7 +248,7 @@ impl NetworkTransactionBuilder { .await .context("failed to write BlockCommitted to DB")?; - self.update_chain_tip(header.as_ref().clone()).await; + self.update_chain_tip(header.as_ref().clone()); self.coordinator.notify_accounts(&result.accounts_to_notify); Ok(()) }, @@ -289,35 +289,7 @@ impl NetworkTransactionBuilder { } /// Updates the chain tip and prunes old blocks from the MMR. - async fn update_chain_tip(&mut self, tip: BlockHeader) { - let mut chain_state = self.chain_state.write().await; - - // Skip blocks already reflected in the chain state. A `BlockCommitted` event may arrive - // for a block whose state was already loaded from the store during startup: the mempool - // subscription is established first and then the chain tip is fetched, so any block - // committed in that window produces an event for state we have already ingested. - if tip.block_num() <= chain_state.chain_tip_header.block_num() { - tracing::debug!( - event_block = %tip.block_num(), - current_tip = %chain_state.chain_tip_header.block_num(), - "skipping BlockCommitted event for block already in chain state", - ); - return; - } - - // Update MMR which lags by one block. - let mmr_tip = chain_state.chain_tip_header.clone(); - Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true); - - // Set the new tip. - chain_state.chain_tip_header = tip; - - // Keep MMR pruned. 
- let pruned_block_height = (chain_state - .chain_mmr - .chain_length() - .as_usize() - .saturating_sub(self.config.max_block_count)) as u32; - Arc::make_mut(&mut chain_state.chain_mmr).prune_to(..pruned_block_height.into()); + fn update_chain_tip(&mut self, tip: BlockHeader) { + self.chain_state.update_chain_tip(tip, self.config.max_block_count); } } diff --git a/crates/ntx-builder/src/chain_state.rs b/crates/ntx-builder/src/chain_state.rs index 287c0ba291..12d5b79c57 100644 --- a/crates/ntx-builder/src/chain_state.rs +++ b/crates/ntx-builder/src/chain_state.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; +use std::sync::{Arc, RwLock}; -use miden_protocol::block::BlockHeader; +use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::crypto::merkle::mmr::PartialMmr; use miden_protocol::transaction::PartialBlockchain; @@ -46,4 +46,58 @@ impl ChainState { pub fn into_parts(self) -> (BlockHeader, Arc) { (self.chain_tip_header, self.chain_mmr) } + + /// Updates the chain tip and prunes old blocks from the MMR. + fn update_chain_tip(&mut self, tip: BlockHeader, max_block_count: usize) { + // Skip blocks already reflected in the chain state. A `BlockCommitted` event may arrive + // for a block whose state was already loaded from the store during startup: the mempool + // subscription is established first and then the chain tip is fetched, so any block + // committed in that window produces an event for state we have already ingested. + if tip.block_num() <= self.chain_tip_header.block_num() { + tracing::debug!( + event_block = %tip.block_num(), + current_tip = %self.chain_tip_header.block_num(), + "skipping BlockCommitted event for block already in chain state", + ); + return; + } + + // Update MMR which lags by one block. + let mmr_tip = self.chain_tip_header.clone(); + Arc::make_mut(&mut self.chain_mmr).add_block(&mmr_tip, true); + + // Set the new tip. + self.chain_tip_header = tip; + + // Keep MMR pruned. 
+ let pruned_block_height = + (self.chain_mmr.chain_length().as_usize().saturating_sub(max_block_count)) as u32; + Arc::make_mut(&mut self.chain_mmr).prune_to(..pruned_block_height.into()); + } +} + +/// A thread-safe wrapper around [`ChainState`] that can be shared across multiple actors. +/// +/// The API guarantees that the lock cannot be held across await points. +pub struct SharedChainState(RwLock); + +impl SharedChainState { + pub fn new(chain_tip_header: BlockHeader, chain_mmr: PartialMmr) -> Self { + Self(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))) + } + + pub(crate) fn chain_tip_block_number(&self) -> BlockNumber { + self.0.read().expect("chain state lock poisoned").chain_tip_header.block_num() + } + + pub(crate) fn update_chain_tip(&self, tip: BlockHeader, max_block_count: usize) { + self.0 + .write() + .expect("chain state lock poisoned") + .update_chain_tip(tip, max_block_count); + } + + pub(crate) fn get_cloned(&self) -> ChainState { + self.0.read().expect("chain state lock poisoned").clone() + } } diff --git a/crates/ntx-builder/src/lib.rs b/crates/ntx-builder/src/lib.rs index 4b70137380..4331428415 100644 --- a/crates/ntx-builder/src/lib.rs +++ b/crates/ntx-builder/src/lib.rs @@ -6,7 +6,7 @@ use std::time::Duration; use actor::{AccountActorContext, ActorConfig, GrpcClients, State}; use anyhow::Context; use builder::MempoolEventStream; -use chain_state::ChainState; +use chain_state::SharedChainState; use clients::{BlockProducerClient, StoreClient, ValidatorClient}; use coordinator::Coordinator; use db::Db; @@ -14,7 +14,7 @@ use futures::TryStreamExt; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; use miden_remote_prover_client::RemoteTransactionProver; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::mpsc; use url::Url; pub(crate) type NoteError = Arc; @@ -289,7 +289,7 @@ impl NtxBuilderConfig { .await .context("failed to upsert chain state")?; - let chain_state = 
Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))); + let chain_state = Arc::new(SharedChainState::new(chain_tip_header, chain_mmr)); let (request_tx, actor_request_rx) = mpsc::channel(1); diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index 8f5547dd41..303a3868d9 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -294,7 +294,25 @@ impl api_server::Api for RpcService { &self, request: Request, ) -> Result, Status> { - debug!(target: COMPONENT, request = ?request.get_ref()); + let request_ref = request.get_ref(); + + let span = Span::current(); + span.set_attribute("block_range.from", request_ref.block_from); + match request_ref.upper_bound { + Some(proto::rpc::sync_chain_mmr_request::UpperBound::BlockNum(block_num)) => { + span.set_attribute("block_range.to", block_num); + }, + Some(proto::rpc::sync_chain_mmr_request::UpperBound::ChainTip(chain_tip)) => { + let chain_tip = proto::rpc::ChainTip::try_from(chain_tip) + .unwrap_or(proto::rpc::ChainTip::Unspecified); + span.set_attribute("sync.target", chain_tip.as_str_name()); + }, + None => { + span.set_attribute("sync.target", "CHAIN_TIP_COMMITTED"); + }, + } + + debug!(target: COMPONENT, request = ?request_ref); self.store.clone().sync_chain_mmr(request).await } diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index 058495b53a..ba7e40814e 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -18,6 +18,7 @@ use miden_node_utils::limiter::{ QueryParamNoteTagLimit, QueryParamNullifierPrefixLimit, }; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_protocol::Word; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::account::{ @@ -527,12 +528,12 @@ async fn start_store(store_listener: TcpListener) -> (Runtime, TempDir, Word, So let config = GenesisConfig::default(); let signer = SecretKey::new(); - let (genesis_state, _) = config.into_state(signer).unwrap(); + let 
(genesis_state, _) = config.into_state(signer.public_key()).unwrap(); let genesis_block = genesis_state .clone() - .into_block() - .await + .into_block(&signer) .expect("genesis block should be created"); + let genesis_commitment = genesis_block.inner().header().commitment(); Store::bootstrap(genesis_block, data_directory.path()).expect("store should bootstrap"); let dir = data_directory.path().to_path_buf(); let store_addr = @@ -565,18 +566,13 @@ async fn start_store(store_listener: TcpListener) -> (Runtime, TempDir, Word, So .await .expect("store should start serving"); }); - ( - store_runtime, - data_directory, - genesis_state.into_block().await.unwrap().inner().header().commitment(), - store_addr, - ) + (store_runtime, data_directory, genesis_commitment, store_addr) } /// Shuts down the store runtime properly to allow `RocksDB` to flush before the temp directory is /// deleted. async fn shutdown_store(store_runtime: Runtime) { - task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_secs(3))) + spawn_blocking_in_current_span(move || store_runtime.shutdown_timeout(Duration::from_secs(3))) .await .expect("shutdown should complete"); // Give RocksDB time to release its lock file after the runtime shutdown diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index fc90d7e1c7..2c72ffc7ff 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -75,7 +75,7 @@ tempfile = { workspace = true } [features] default = ["rocksdb"] -rocksdb = ["dep:miden-large-smt-backend-rocksdb", "miden-node-utils/rocksdb"] +rocksdb = ["dep:miden-large-smt-backend-rocksdb", "miden-crypto/persistent-forest", "miden-node-utils/rocksdb"] [[bench]] harness = false diff --git a/crates/store/src/account_state_forest/mod.rs b/crates/store/src/account_state_forest/mod.rs index 0c9adceb3a..26513dc9cd 100644 --- a/crates/store/src/account_state_forest/mod.rs +++ b/crates/store/src/account_state_forest/mod.rs @@ -1,9 +1,13 @@ use std::collections::BTreeSet; 
+use std::num::NonZeroUsize; use miden_crypto::hash::rpo::Rpo256; -use miden_crypto::merkle::smt::ForestInMemoryBackend; +#[cfg(feature = "rocksdb")] +use miden_crypto::merkle::smt::ForestPersistentBackend; +use miden_crypto::merkle::smt::{Backend, ForestInMemoryBackend}; use miden_node_proto::domain::account::{AccountStorageMapDetails, AccountVaultDetails}; use miden_node_utils::ErrorReport; +use miden_node_utils::lru_cache::LruCache; use miden_protocol::account::delta::{AccountDelta, AccountStorageDelta, AccountVaultDelta}; use miden_protocol::account::{ AccountId, @@ -37,6 +41,8 @@ pub use crate::db::models::queries::HISTORICAL_BLOCK_RETENTION; #[cfg(test)] mod tests; +const HASHED_STORAGE_MAP_KEY_CACHE_CAPACITY: usize = 65_536; + // ERRORS // ================================================================================================ @@ -60,34 +66,81 @@ pub enum WitnessError { AssetError(#[from] AssetError), } +#[cfg(feature = "rocksdb")] +pub(crate) type AccountStateForestBackend = ForestPersistentBackend; +#[cfg(not(feature = "rocksdb"))] +pub(crate) type AccountStateForestBackend = ForestInMemoryBackend; + +const fn empty_smt_root() -> Word { + *EmptySubtreeRoots::entry(SMT_DEPTH, 0) +} + // ACCOUNT STATE FOREST // ================================================================================================ +/// Result of retrieving storage map details for all entries in a storage map. +#[derive(Debug, PartialEq)] +pub enum AccountStorageMapResult { + NotFound, + CannotReconstructKeysFromCache, + Details(AccountStorageMapDetails), +} + /// Container for forest-related state that needs to be updated atomically. -pub(crate) struct AccountStateForest { +pub(crate) struct AccountStateForest { /// `LargeSmtForest` for efficient account storage reconstruction. /// Populated during block import with storage and vault SMTs. - forest: LargeSmtForest, + forest: LargeSmtForest, + + /// Reverse lookup from hashed SMT storage keys to raw storage map keys. 
+ /// + /// Ideally this would be a mapping from `StorageMapKeyHash` to `StorageMapKey` but + /// unfortunately `StorageMapKeyHash` does not implement `Hash`. + storage_map_key_cache: LruCache, } -impl AccountStateForest { +#[cfg(test)] +impl AccountStateForest { pub(crate) fn new() -> Self { - Self { forest: Self::create_forest() } + Self { + forest: Self::create_forest(), + storage_map_key_cache: LruCache::new( + NonZeroUsize::new(HASHED_STORAGE_MAP_KEY_CACHE_CAPACITY) + .expect("storage map key cache capacity must be non-zero"), + ), + } + } + + /// Returns the root of an empty SMT. + pub(crate) const fn empty_smt_root() -> Word { + empty_smt_root() } fn create_forest() -> LargeSmtForest { let backend = ForestInMemoryBackend::new(); LargeSmtForest::new(backend).expect("in-memory backend should initialize") } +} - // HELPERS - // -------------------------------------------------------------------------------------------- +impl AccountStateForest { + pub(crate) fn from_backend(backend: B) -> Result { + Ok(Self { + forest: LargeSmtForest::new(backend)?, + storage_map_key_cache: LruCache::new( + NonZeroUsize::new(HASHED_STORAGE_MAP_KEY_CACHE_CAPACITY) + .expect("storage map key cache capacity must be non-zero"), + ), + }) + } - /// Returns the root of an empty SMT. 
- const fn empty_smt_root() -> Word { - *EmptySubtreeRoots::entry(SMT_DEPTH, 0) + #[cfg(feature = "rocksdb")] + pub(crate) fn lineage_count(&self) -> usize { + self.forest.lineage_count() } + // HELPERS + // -------------------------------------------------------------------------------------------- + #[cfg(test)] fn tree_id_for_root( &self, @@ -136,6 +189,24 @@ impl AccountStateForest { .collect() } + fn cache_storage_map_keys_from_delta(&mut self, delta: &AccountDelta) { + let raw_keys = delta + .storage() + .maps() + .flat_map(|(_slot_name, map_delta)| map_delta.entries().keys().copied()); + self.cache_storage_map_keys(raw_keys); + } + + pub(crate) fn cache_storage_map_keys(&self, raw_keys: impl IntoIterator) { + self.storage_map_key_cache + .put_many(raw_keys.into_iter().map(|raw_key| (raw_key.hash().into(), raw_key))); + } + + #[cfg(test)] + fn clear_storage_map_key_cache(&self) { + self.storage_map_key_cache.clear(); + } + fn apply_forest_updates( &mut self, lineage: LineageId, @@ -314,6 +385,84 @@ impl AccountStateForest { Some(proofs.map(|proofs| AccountStorageMapDetails::from_proofs(slot_name, proofs))) } + /// Enumerates a storage map as it is stored in the SMT. + /// + /// Storage map keys are hashed before insertion, so returned keys are hashed SMT keys rather + /// than the raw [`StorageMapKey`] values supplied by users. + /// + /// Returns `None` when no storage root is tracked for this account/slot/block combination. + /// Returns at most `limit` entries. 
+ fn get_storage_map_entries( + &self, + account_id: AccountId, + slot_name: &StorageSlotName, + block_num: BlockNumber, + limit: usize, + ) -> Option, MerkleError>> { + let lineage = Self::storage_lineage_id(account_id, slot_name); + let tree = self.get_tree_id(lineage, block_num)?; + + Some( + self.forest + .entries(tree) + .map_err(Self::map_forest_error) + .map(|entries| entries.take(limit).map(|entry| (entry.key, entry.value)).collect()), + ) + } + + /// Returns all storage map entries when the forest and reverse-key cache contain enough data. + /// + /// Returns `AccountStorageMapResult::NotFound` when no storage root is tracked for this + /// account/slot/block combination. + /// Returns `AccountStorageMapResult::CannotReconstructKeysFromCache` when the forest has hashed + /// entries but at least one raw key is missing from the reverse-key cache, so the caller + /// should fall back to database reconstruction. + #[instrument(target = COMPONENT, skip_all)] + pub(crate) fn get_storage_map_details_for_all_entries( + &self, + account_id: AccountId, + slot_name: StorageSlotName, + block_num: BlockNumber, + ) -> Result { + let Some(hashed_entries) = self + .get_storage_map_entries( + account_id, + &slot_name, + block_num, + AccountStorageMapDetails::MAX_RETURN_ENTRIES + 1, + ) + .transpose()? 
+ else { + return Ok(AccountStorageMapResult::NotFound); + }; + + if hashed_entries.len() > AccountStorageMapDetails::MAX_RETURN_ENTRIES { + return Ok(AccountStorageMapResult::Details(AccountStorageMapDetails { + slot_name, + entries: miden_node_proto::domain::account::StorageMapEntries::LimitExceeded, + })); + } + + let raw_keys = self + .storage_map_key_cache + .get_many(hashed_entries.iter().map(|(hashed_key, _)| hashed_key)); + if raw_keys.iter().any(Option::is_none) { + return Ok(AccountStorageMapResult::CannotReconstructKeysFromCache); + } + + let mut entries = raw_keys + .into_iter() + .flatten() + .zip(hashed_entries) + .map(|(raw_key, (_hashed_key, value))| (raw_key, value)) + .collect::>(); + entries.sort_by(|(key_a, _), (key_b, _)| key_a.cmp(key_b)); + + Ok(AccountStorageMapResult::Details(AccountStorageMapDetails::from_forest_entries( + slot_name, entries, + ))) + } + // PUBLIC INTERFACE // -------------------------------------------------------------------------------------------- @@ -387,6 +536,8 @@ impl AccountStateForest { self.update_account_storage(block_num, account_id, delta.storage()); } + self.cache_storage_map_keys_from_delta(delta); + Ok(()) } @@ -395,9 +546,9 @@ impl AccountStateForest { /// Retrieves the most recent vault SMT root for an account. If no vault root is found for the /// account, returns an empty SMT root. - fn get_latest_vault_root(&self, account_id: AccountId) -> Word { + pub(crate) fn get_latest_vault_root(&self, account_id: AccountId) -> Word { let lineage = Self::vault_lineage_id(account_id); - self.forest.latest_root(lineage).unwrap_or_else(Self::empty_smt_root) + self.forest.latest_root(lineage).unwrap_or_else(empty_smt_root) } /// Inserts asset vault data into the forest for the specified account. 
Assumes that asset @@ -410,7 +561,7 @@ impl AccountStateForest { ) -> Result<(), AccountStateForestError> { let prev_root = self.get_latest_vault_root(account_id); let lineage = Self::vault_lineage_id(account_id); - assert_eq!(prev_root, Self::empty_smt_root(), "account should not be in the forest"); + assert_eq!(prev_root, empty_smt_root(), "account should not be in the forest"); assert!( self.forest.latest_version(lineage).is_none(), "account should not be in the forest" @@ -480,7 +631,7 @@ impl AccountStateForest { for (slot_name, map_delta) in storage_delta.maps() { // get the latest root for this map, and make sure the root is for an empty tree let prev_root = self.get_latest_storage_map_root(account_id, slot_name); - assert_eq!(prev_root, Self::empty_smt_root(), "account should not be in the forest"); + assert_eq!(prev_root, empty_smt_root(), "account should not be in the forest"); let raw_map_entries: Vec<(StorageMapKey, Word)> = Vec::from_iter(map_delta.entries().iter().filter_map(|(&key, &value)| { @@ -612,13 +763,13 @@ impl AccountStateForest { // -------------------------------------------------------------------------------------------- /// Retrieves the most recent storage map SMT root for an account slot. - fn get_latest_storage_map_root( + pub(crate) fn get_latest_storage_map_root( &self, account_id: AccountId, slot_name: &StorageSlotName, ) -> Word { let lineage = Self::storage_lineage_id(account_id, slot_name); - self.forest.latest_root(lineage).unwrap_or_else(Self::empty_smt_root) + self.forest.latest_root(lineage).unwrap_or_else(empty_smt_root) } /// Updates the forest with storage map changes from a delta. 
diff --git a/crates/store/src/account_state_forest/tests.rs b/crates/store/src/account_state_forest/tests.rs index 6d5dd7011c..fb252f09d6 100644 --- a/crates/store/src/account_state_forest/tests.rs +++ b/crates/store/src/account_state_forest/tests.rs @@ -661,6 +661,82 @@ fn storage_map_open_returns_proofs() { }); } +#[test] +fn storage_map_all_entries_returns_raw_keys_after_update() { + use std::collections::BTreeMap; + + use miden_protocol::account::delta::{StorageMapDelta, StorageSlotDelta}; + + let mut forest = AccountStateForest::new(); + let account_id = dummy_account(); + let slot_name = StorageSlotName::mock(6); + let block_num = BlockNumber::GENESIS.child(); + let raw_key = StorageMapKey::from_index(42); + let value = Word::from([42u32, 0, 0, 0]); + + let mut map_delta = StorageMapDelta::default(); + map_delta.insert(raw_key, value); + let raw = BTreeMap::from_iter([(slot_name.clone(), StorageSlotDelta::Map(map_delta))]); + let storage_delta = AccountStorageDelta::from_raw(raw); + let delta = dummy_partial_delta(account_id, AccountVaultDelta::default(), storage_delta); + forest.update_account(block_num, &delta).unwrap(); + + let result = forest + .get_storage_map_details_for_all_entries(account_id, slot_name.clone(), block_num) + .expect("forest lookup should not fail"); + + assert_eq!( + result, + AccountStorageMapResult::Details(AccountStorageMapDetails::from_forest_entries( + slot_name, + vec![(raw_key, value)] + )) + ); +} + +#[test] +fn storage_map_all_entries_returns_cache_miss_when_raw_key_is_not_cached() { + use std::collections::BTreeMap; + + use miden_protocol::account::delta::{StorageMapDelta, StorageSlotDelta}; + + let mut forest = AccountStateForest::new(); + let account_id = dummy_account(); + let slot_name = StorageSlotName::mock(7); + let block_num = BlockNumber::GENESIS.child(); + let raw_key = StorageMapKey::from_index(43); + let value = Word::from([43u32, 0, 0, 0]); + + let mut map_delta = StorageMapDelta::default(); + 
map_delta.insert(raw_key, value); + let raw = BTreeMap::from_iter([(slot_name.clone(), StorageSlotDelta::Map(map_delta))]); + let storage_delta = AccountStorageDelta::from_raw(raw); + let delta = dummy_partial_delta(account_id, AccountVaultDelta::default(), storage_delta); + forest.update_account(block_num, &delta).unwrap(); + + forest.clear_storage_map_key_cache(); + + let result = forest + .get_storage_map_details_for_all_entries(account_id, slot_name.clone(), block_num) + .expect("forest lookup should not fail"); + + assert_eq!(result, AccountStorageMapResult::CannotReconstructKeysFromCache); + + forest.cache_storage_map_keys([raw_key]); + + let result = forest + .get_storage_map_details_for_all_entries(account_id, slot_name.clone(), block_num) + .expect("forest lookup should not fail"); + + assert_eq!( + result, + AccountStorageMapResult::Details(AccountStorageMapDetails::from_forest_entries( + slot_name, + vec![(raw_key, value)] + )) + ); +} + #[test] fn storage_map_key_hashing_and_raw_entries_are_consistent() { use std::collections::BTreeMap; diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 89b36a7fae..7653e2537c 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -5,11 +5,10 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::Context; -use diesel::{Connection, QueryableByName, RunQueryDsl, SqliteConnection}; +use diesel::{Connection, SqliteConnection}; use miden_node_proto::domain::account::AccountInfo; use miden_node_proto::{BlockProofRequest, generated as proto}; use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES; -use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; use miden_protocol::account::{AccountHeader, AccountId, AccountStorageHeader, StorageMapKey}; use miden_protocol::asset::{Asset, AssetVaultKey}; @@ -35,6 +34,7 @@ pub use crate::db::models::queries::{ AccountCommitmentsPage, NullifiersPage, PublicAccountIdsPage, + PublicAccountStateRootsPage, }; use 
crate::db::models::queries::{BlockHeaderCommitment, StorageMapValuesPage}; use crate::db::models::{Page, queries}; @@ -409,6 +409,19 @@ impl Db { .await } + /// Returns a page of public account state roots for forest consistency verification. + #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] + pub async fn select_public_account_state_roots_paged( + &self, + page_size: std::num::NonZeroUsize, + after_account_id: Option, + ) -> Result { + self.transact("read public account state roots paged", move |conn| { + queries::select_public_account_state_roots_paged(conn, page_size, after_account_id) + }) + .await + } + /// Loads public account details from the DB. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn select_account(&self, id: AccountId) -> Result { @@ -735,47 +748,6 @@ impl Db { }) } - /// Emits size metrics for each table in the database, and the entire database. - #[instrument(target = COMPONENT, skip_all, err)] - pub async fn analyze_table_sizes(&self) -> Result<(), DatabaseError> { - self.transact("db analysis", |conn| { - #[derive(QueryableByName)] - struct TotalSize { - #[diesel(sql_type = diesel::sql_types::BigInt)] - size: i64, - } - - #[derive(QueryableByName)] - struct Table { - #[diesel(sql_type = diesel::sql_types::Text)] - name: String, - #[diesel(sql_type = diesel::sql_types::BigInt)] - size: i64, - } - - let tables = - diesel::sql_query("SELECT name, sum(payload) AS size FROM dbstat GROUP BY name") - .load::(conn)?; - - let span = tracing::Span::current(); - for Table { name, size } in tables { - span.set_attribute(format!("database.table.{name}.size"), size); - } - - let total = diesel::sql_query( - "SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()", - ) - .get_result::(conn)?; - span.set_attribute("database.total.size", total.size); - - Result::<_, DatabaseError>::Ok(()) - }) - .await - .inspect_err(|err| 
tracing::Span::current().set_error(err))?; - - Ok(()) - } - /// Loads the network notes for an account that are unconsumed by a specified block number. /// Pagination is used to limit the number of notes returned. pub(crate) async fn select_unconsumed_network_notes( diff --git a/crates/store/src/db/models/queries/accounts.rs b/crates/store/src/db/models/queries/accounts.rs index 2ac0bd847b..f69846a58a 100644 --- a/crates/store/src/db/models/queries/accounts.rs +++ b/crates/store/src/db/models/queries/accounts.rs @@ -340,6 +340,24 @@ pub struct PublicAccountIdsPage { pub next_cursor: Option, } +/// Latest account state forest roots for a public account. +#[derive(Debug)] +pub struct PublicAccountStateRoots { + pub account_id: AccountId, + pub vault_root: Word, + pub storage_header: AccountStorageHeader, +} + +/// Page of public account state roots returned by +/// [`select_public_account_state_roots_paged`]. +#[derive(Debug)] +pub struct PublicAccountStateRootsPage { + /// The public account state roots in this page. + pub accounts: Vec, + /// If `Some`, there are more results. Use this as the `after_account_id` for the next page. + pub next_cursor: Option, +} + /// Selects public account IDs with pagination. /// /// Returns up to `page_size` public account IDs, starting after `after_account_id` if provided. @@ -400,6 +418,94 @@ pub(crate) fn select_public_account_ids_paged( Ok(PublicAccountIdsPage { account_ids, next_cursor }) } +/// Selects public account vault roots and storage headers with pagination. +/// +/// Returns up to `page_size` public account states, starting after `after_account_id` if provided. +/// Results are ordered by `account_id` for stable pagination. +/// +/// Public accounts are those with `AccountStorageMode::Public` or `AccountStorageMode::Network`. 
+/// We identify them by checking `code_commitment IS NOT NULL` - public accounts store their full +/// state (including `code_commitment`), while private accounts only store the `account_commitment`. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT +/// account_id, +/// vault_root, +/// storage_header +/// FROM +/// accounts +/// WHERE +/// is_latest = 1 +/// AND code_commitment IS NOT NULL +/// AND (account_id > :after_account_id OR :after_account_id IS NULL) +/// ORDER BY +/// account_id ASC +/// LIMIT :page_size + 1 +/// ``` +pub(crate) fn select_public_account_state_roots_paged( + conn: &mut SqliteConnection, + page_size: NonZeroUsize, + after_account_id: Option, +) -> Result { + #[expect(clippy::cast_possible_wrap)] + let limit = (page_size.get() + 1) as i64; + + let mut query = SelectDsl::select( + schema::accounts::table, + ( + schema::accounts::account_id, + schema::accounts::vault_root, + schema::accounts::storage_header, + ), + ) + .filter(schema::accounts::is_latest.eq(true)) + .filter(schema::accounts::code_commitment.is_not_null()) + .order_by(schema::accounts::account_id.asc()) + .limit(limit) + .into_boxed(); + + if let Some(cursor) = after_account_id { + query = query.filter(schema::accounts::account_id.gt(cursor.to_bytes())); + } + + let raw = query.load::<(Vec, Option>, Option>)>(conn)?; + + let mut accounts: Vec = Result::from_iter(raw.into_iter().map( + |(account_id_bytes, vault_root_bytes, storage_header_bytes)| { + let account_id = AccountId::read_from_bytes(&account_id_bytes) + .map_err(DatabaseError::DeserializationError)?; + let vault_root_bytes = vault_root_bytes.ok_or_else(|| { + DatabaseError::DataCorrupted(format!( + "public account {account_id} is missing a vault root" + )) + })?; + let storage_header_bytes = storage_header_bytes.ok_or_else(|| { + DatabaseError::DataCorrupted(format!( + "public account {account_id} is missing a storage header" + )) + })?; + + Ok::<_, DatabaseError>(PublicAccountStateRoots { + account_id, + vault_root: 
Word::read_from_bytes(&vault_root_bytes)?, + storage_header: AccountStorageHeader::read_from_bytes(&storage_header_bytes)?, + }) + }, + ))?; + + // If we got more than page_size, there are more results. + let next_cursor = if accounts.len() > page_size.get() { + accounts.pop(); + accounts.last().map(|account| account.account_id) + } else { + None + }; + + Ok(PublicAccountStateRootsPage { accounts, next_cursor }) +} + /// Select account vault assets within a block range (inclusive). /// /// # Parameters diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index 1c582066ae..7cf6720182 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -2008,9 +2008,10 @@ async fn genesis_with_account_assets() { .build_existing() .unwrap(); + let signer = random_secret_key(); let genesis_state = - GenesisState::new(vec![account], test_fee_params(), 1, 0, random_secret_key()); - let genesis_block = genesis_state.into_block().await.unwrap(); + GenesisState::new(vec![account], test_fee_params(), 1, 0, signer.public_key()); + let genesis_block = genesis_state.into_block(&signer).unwrap(); crate::db::Db::bootstrap(":memory:".into(), genesis_block).unwrap(); } @@ -2063,9 +2064,10 @@ async fn genesis_with_account_storage_map() { .build_existing() .unwrap(); + let signer = random_secret_key(); let genesis_state = - GenesisState::new(vec![account], test_fee_params(), 1, 0, random_secret_key()); - let genesis_block = genesis_state.into_block().await.unwrap(); + GenesisState::new(vec![account], test_fee_params(), 1, 0, signer.public_key()); + let genesis_block = genesis_state.into_block(&signer).unwrap(); crate::db::Db::bootstrap(":memory:".into(), genesis_block).unwrap(); } @@ -2116,9 +2118,10 @@ async fn genesis_with_account_assets_and_storage() { .build_existing() .unwrap(); + let signer = random_secret_key(); let genesis_state = - GenesisState::new(vec![account], test_fee_params(), 1, 0, random_secret_key()); - let genesis_block = 
genesis_state.into_block().await.unwrap(); + GenesisState::new(vec![account], test_fee_params(), 1, 0, signer.public_key()); + let genesis_block = genesis_state.into_block(&signer).unwrap(); crate::db::Db::bootstrap(":memory:".into(), genesis_block).unwrap(); } @@ -2207,14 +2210,15 @@ async fn genesis_with_multiple_accounts() { .build_existing() .unwrap(); + let signer = random_secret_key(); let genesis_state = GenesisState::new( vec![account1, account2, account3], test_fee_params(), 1, 0, - random_secret_key(), + signer.public_key(), ); - let genesis_block = genesis_state.into_block().await.unwrap(); + let genesis_block = genesis_state.into_block(&signer).unwrap(); crate::db::Db::bootstrap(":memory:".into(), genesis_block).unwrap(); } diff --git a/crates/store/src/errors.rs b/crates/store/src/errors.rs index 12df2d5bee..30dc17176f 100644 --- a/crates/store/src/errors.rs +++ b/crates/store/src/errors.rs @@ -120,6 +120,8 @@ pub enum StateInitializationError { AccountTreeIoError(String), #[error("nullifier tree IO error: {0}")] NullifierTreeIoError(String), + #[error("account state forest IO error: {0}")] + AccountStateForestIoError(String), #[error("database error")] DatabaseError(#[from] DatabaseError), #[error("failed to create nullifier tree")] @@ -145,6 +147,17 @@ pub enum StateInitializationError { tree_root: Word, block_root: Word, }, + #[error( + "account state forest root ({forest_root}) does not match SQLite root \ + ({database_root}) for account {account_id}, slot {slot_name:?}. Delete the account \ + state forest storage directory and restart the node to rebuild from the database." 
+    )]
+    AccountStateForestStorageDiverged {
+        account_id: AccountId,
+        slot_name: Option<String>,
+        forest_root: Word,
+        database_root: Word,
+    },
     #[error("public account {0} is missing details in database")]
     PublicAccountMissingDetails(AccountId),
     #[error("failed to convert account to delta: {0}")]
diff --git a/crates/store/src/genesis/config/mod.rs b/crates/store/src/genesis/config/mod.rs
index 66c53307d9..5b14c651d5 100644
--- a/crates/store/src/genesis/config/mod.rs
+++ b/crates/store/src/genesis/config/mod.rs
@@ -6,7 +6,6 @@ use std::str::FromStr;
 
 use indexmap::IndexMap;
 use miden_node_utils::crypto::get_rpo_random_coin;
-use miden_node_utils::signer::BlockSigner;
 use miden_protocol::account::auth::{AuthScheme, AuthSecretKey};
 use miden_protocol::account::{
     Account,
@@ -23,6 +22,7 @@ use miden_protocol::account::{
 };
 use miden_protocol::asset::{FungibleAsset, TokenSymbol};
 use miden_protocol::block::FeeParameters;
+use miden_protocol::crypto::dsa::ecdsa_k256_keccak::PublicKey;
 use miden_protocol::crypto::dsa::falcon512_poseidon2::SecretKey as RpoSecretKey;
 use miden_protocol::errors::TokenSymbolError;
 use miden_protocol::{Felt, ONE};
@@ -147,10 +147,10 @@ impl GenesisConfig {
     ///
     /// Also returns the set of secrets for the generated accounts.
#[expect(clippy::too_many_lines)] - pub fn into_state( + pub fn into_state( self, - signer: S, - ) -> Result<(GenesisState, AccountSecrets), GenesisConfigError> { + validator_key: PublicKey, + ) -> Result<(GenesisState, AccountSecrets), GenesisConfigError> { let GenesisConfig { version, timestamp, @@ -341,7 +341,7 @@ impl GenesisConfig { accounts: all_accounts, version, timestamp, - block_signer: signer, + validator_key, }, AccountSecrets { secrets }, )) @@ -544,7 +544,7 @@ impl AccountSecrets { /// and the index in pub fn as_account_files( &self, - genesis_state: &GenesisState, + genesis_state: &GenesisState, ) -> impl Iterator> + '_ { let account_lut = IndexMap::::from_iter( genesis_state.accounts.iter().map(|account| (account.id(), account.clone())), diff --git a/crates/store/src/genesis/config/tests.rs b/crates/store/src/genesis/config/tests.rs index 6641bf4fe4..3ad58f0032 100644 --- a/crates/store/src/genesis/config/tests.rs +++ b/crates/store/src/genesis/config/tests.rs @@ -27,7 +27,8 @@ fn parsing_yields_expected_default_values() -> TestResult { let config_path = write_toml_file(temp_dir.path(), sample_content); let gcfg = GenesisConfig::read_toml_file(&config_path)?; - let (state, _secrets) = gcfg.into_state(SecretKey::new())?; + let signer = SecretKey::new(); + let (state, _secrets) = gcfg.into_state(signer.public_key())?; let _ = state; // faucets always precede wallet accounts let native_faucet = state.accounts[0].clone(); @@ -70,14 +71,15 @@ fn parsing_yields_expected_default_values() -> TestResult { #[miden_node_test_macro::enable_logging] async fn genesis_accounts_have_nonce_one() -> TestResult { let gcfg = GenesisConfig::default(); - let (state, secrets) = gcfg.into_state(SecretKey::new()).unwrap(); + let signer = SecretKey::new(); + let (state, secrets) = gcfg.into_state(signer.public_key()).unwrap(); let mut iter = secrets.as_account_files(&state); let AccountFileWithName { account_file: status_quo, .. 
} = iter.next().unwrap().unwrap(); assert!(iter.next().is_none()); assert_eq!(status_quo.account.nonce(), ONE); - let _block = state.into_block().await?; + let _block = state.into_block(&signer)?; Ok(()) } @@ -134,7 +136,8 @@ path = "test_account.mac" let gcfg = GenesisConfig::read_toml_file(&config_path)?; // Convert to state and verify the account is included - let (state, _secrets) = gcfg.into_state(SecretKey::new())?; + let signer = SecretKey::new(); + let (state, _secrets) = gcfg.into_state(signer.public_key())?; assert!(state.accounts.iter().any(|a| a.id() == account_id)); Ok(()) @@ -210,7 +213,8 @@ verification_base_fee = 0 let gcfg = GenesisConfig::read_toml_file(&config_path)?; // Convert to state and verify the native faucet is included - let (state, secrets) = gcfg.into_state(SecretKey::new())?; + let signer = SecretKey::new(); + let (state, secrets) = gcfg.into_state(signer.public_key())?; assert!(state.accounts.iter().any(|a| a.id() == faucet_id)); // No secrets should be generated for file-loaded native faucet @@ -269,7 +273,7 @@ verification_base_fee = 0 let gcfg = GenesisConfig::read_toml_file(&config_path)?; // into_state should fail with NativeFaucetNotFungible error when loading the file - let result = gcfg.into_state(SecretKey::new()); + let result = gcfg.into_state(SecretKey::new().public_key()); assert!(result.is_err()); let err = result.unwrap_err(); assert!( @@ -302,7 +306,7 @@ path = "does_not_exist.mac" let gcfg = GenesisConfig::read_toml_file(&config_path).unwrap(); // into_state should fail with AccountFileRead error when loading the file - let result = gcfg.into_state(SecretKey::new()); + let result = gcfg.into_state(SecretKey::new().public_key()); assert!(result.is_err()); let err = result.unwrap_err(); assert!( @@ -321,7 +325,8 @@ async fn parsing_agglayer_sample_with_account_files() -> TestResult { .join("src/genesis/config/samples/02-with-account-files.toml"); let gcfg = GenesisConfig::read_toml_file(&sample_path)?; - let (state, 
secrets) = gcfg.into_state(SecretKey::new())?; + let signer = SecretKey::new(); + let (state, secrets) = gcfg.into_state(signer.public_key())?; // Should have 4 accounts: // 1. Native faucet (MIDEN) - built from parameters @@ -373,7 +378,7 @@ async fn parsing_agglayer_sample_with_account_files() -> TestResult { assert_eq!(secrets.secrets.len(), 1, "Only native faucet should generate a secret"); // Verify the genesis state can be converted to a block - let block = state.into_block().await?; + let block = state.into_block(&signer)?; // Verify that non-private accounts (Public and Network) get full Delta details. for update in block.inner().body().updated_accounts() { diff --git a/crates/store/src/genesis/mod.rs b/crates/store/src/genesis/mod.rs index b875aa5f31..33759ebd27 100644 --- a/crates/store/src/genesis/mod.rs +++ b/crates/store/src/genesis/mod.rs @@ -1,4 +1,3 @@ -use miden_node_utils::signer::BlockSigner; use miden_protocol::Word; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::account::{Account, AccountDelta}; @@ -12,6 +11,7 @@ use miden_protocol::block::{ FeeParameters, SignedBlock, }; +use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, SecretKey, Signature}; use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks}; use miden_protocol::crypto::merkle::smt::{LargeSmt, MemoryStorage, Smt}; use miden_protocol::errors::AccountError; @@ -25,18 +25,39 @@ pub mod config; /// Represents the state at genesis, which will be used to derive the genesis block. #[derive(Clone, Debug, PartialEq, Eq)] -pub struct GenesisState { +pub struct GenesisState { pub accounts: Vec, pub fee_parameters: FeeParameters, pub version: u32, pub timestamp: u32, - pub block_signer: S, + pub validator_key: PublicKey, } /// A type-safety wrapper ensuring that genesis block data can only be created from /// [`GenesisState`] or validated from a [`SignedBlock`] via [`GenesisBlock::try_from`]. 
pub struct GenesisBlock(SignedBlock); +/// A genesis block with all data except the validator signature. +pub struct UnsignedGenesisBlock { + header: BlockHeader, + body: BlockBody, +} + +impl UnsignedGenesisBlock { + pub fn header(&self) -> &BlockHeader { + &self.header + } + + pub fn into_block(self, signature: Signature) -> anyhow::Result { + anyhow::ensure!( + signature.verify(self.header.commitment(), self.header.validator_key()), + "genesis block signature verification failed", + ); + + Ok(GenesisBlock(SignedBlock::new(self.header, self.body, signature)?)) + } +} + impl GenesisBlock { pub fn inner(&self) -> &SignedBlock { &self.0 @@ -68,27 +89,25 @@ impl TryFrom for GenesisBlock { } } -impl GenesisState { +impl GenesisState { pub fn new( accounts: Vec, fee_parameters: FeeParameters, version: u32, timestamp: u32, - signer: S, + validator_key: PublicKey, ) -> Self { Self { accounts, fee_parameters, version, timestamp, - block_signer: signer, + validator_key, } } -} -impl GenesisState { - /// Returns the block header and the account SMT. - pub async fn into_block(self) -> anyhow::Result { + /// Returns the unsigned genesis block. + pub fn into_unsigned_block(self) -> anyhow::Result { let accounts: Vec = self .accounts .iter() @@ -139,7 +158,7 @@ impl GenesisState { empty_block_note_tree.root(), Word::empty(), TransactionKernel.to_commitment(), - self.block_signer.public_key(), + self.validator_key, self.fee_parameters, self.timestamp, ); @@ -151,11 +170,13 @@ impl GenesisState { empty_transactions, ); - // Sign and assert verification for sanity (no mismatch between frontend and backend signing - // impls). - let signature = self.block_signer.sign(&header).await?; - assert!(signature.verify(header.commitment(), &self.block_signer.public_key())); - let signed_block = SignedBlock::new(header, body, signature)?; - Ok(GenesisBlock(signed_block)) + Ok(UnsignedGenesisBlock { header, body }) + } + + /// Builds and signs the genesis block with a local secret key. 
+ pub fn into_block(self, signer: &SecretKey) -> anyhow::Result { + let unsigned_block = self.into_unsigned_block()?; + let signature = signer.sign(unsigned_block.header().commitment()); + unsigned_block.into_block(signature) } } diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 135e6fd34a..4d9283c445 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -9,13 +9,15 @@ use miden_node_proto::generated::store; use miden_node_proto_build::store_api_descriptor; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn}; +use miden_node_utils::spawn::spawn_blocking_in_span; +use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_node_utils::tracing::grpc::grpc_trace_fn; use tokio::net::TcpListener; use tokio::sync::watch; use tokio::task::JoinSet; use tokio_stream::wrappers::TcpListenerStream; use tower_http::trace::TraceLayer; -use tracing::{info, instrument}; +use tracing::{info, info_span, instrument}; use url::Url; use crate::blocks::BlockStore; @@ -128,6 +130,7 @@ impl Store { State::load(&self.data_directory, self.storage_options, termination_ask) .await .context("failed to load state")?; + let _disk_monitor_task = Self::spawn_disk_monitor(self.data_directory.clone()); let ModeSetup { mut grpc_servers, mode_task } = match self.mode { StoreMode::BlockProducer { @@ -284,7 +287,7 @@ impl Store { (handle, chain_tip_tx) } - /// Spawns the gRPC servers for block-producer mode and the DB maintenance task. + /// Spawns the gRPC servers for block-producer mode. /// /// Starts three listeners: Rpc+StoreReplica (shared), `NtxBuilder`, and `BlockProducer`. 
fn spawn_block_producer_grpc_servers( @@ -296,7 +299,6 @@ impl Store { block_producer_listener: TcpListener, ) -> anyhow::Result>> { let mut join_set = JoinSet::new(); - Self::spawn_db_maintenance(&mut join_set, &store_api.state); let rpc_service = store::rpc_server::RpcServer::new(store_api.clone()); let replica_service = @@ -343,7 +345,7 @@ impl Store { Ok(join_set) } - /// Spawns the gRPC servers for replica mode and the DB maintenance task. + /// Spawns the gRPC servers for replica mode. /// /// Only the Rpc and `StoreReplica` services are exposed — no `BlockProducer`, `NtxBuilder`, or /// proof scheduler. @@ -353,7 +355,6 @@ impl Store { rpc_listener: TcpListener, ) -> anyhow::Result>> { let mut join_set = JoinSet::new(); - Self::spawn_db_maintenance(&mut join_set, &store_api.state); let rpc_service = store::rpc_server::RpcServer::new(store_api.clone()); let replica_service = store::store_replica_server::StoreReplicaServer::new(store_api); @@ -376,24 +377,95 @@ impl Store { Ok(join_set) } - - fn spawn_db_maintenance( - join_set: &mut JoinSet>, - state: &Arc, - ) { - let state = Arc::clone(state); - join_set.spawn(async move { - // Manual tests on testnet indicate each iteration takes ~2s once things are OS cached. - // - // 5 minutes seems like a reasonable interval, where this should have minimal database - // IO impact while providing a decent view into table growth over time. + /// Spawns a background task that periodically records the on-disk size of every store data + /// path as `OTel` span attributes. 
+ fn spawn_disk_monitor(data_directory: PathBuf) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_mins(5)); loop { interval.tick().await; - let _ = state.analyze_table_sizes().await; + let dir = data_directory.clone(); + let span = info_span!(target: COMPONENT, "measure_disk_space_usage"); + let result = + spawn_blocking_in_span(move || measure_disk_usage_bytes(&dir), span.clone()) + .await; + match result { + Ok(usage) => { + span.set_attribute("db.sqlite.size", usage.sqlite_db); + span.set_attribute("db.sqlite.wal.size", usage.sqlite_wal); + span.set_attribute("db.block_store.size", usage.block_store); + #[cfg(feature = "rocksdb")] + { + span.set_attribute("db.account_tree.size", usage.account_tree); + span.set_attribute("db.nullifier_tree.size", usage.nullifier_tree); + span.set_attribute( + "db.account_state_forest.size", + usage.account_state_forest, + ); + } + }, + Err(err) => span.set_error(&err), + } } - }); + }) + } +} + +// DISK USAGE HELPERS +// ================================================================================================ + +/// Byte counts for each on-disk storage component. +struct DiskUsage { + sqlite_db: u64, + sqlite_wal: u64, + block_store: u64, + #[cfg(feature = "rocksdb")] + account_tree: u64, + #[cfg(feature = "rocksdb")] + nullifier_tree: u64, + #[cfg(feature = "rocksdb")] + account_state_forest: u64, +} + +/// Collects on-disk byte sizes for every store data path under `data_dir`. 
+fn measure_disk_usage_bytes(data_dir: &Path) -> DiskUsage { + DiskUsage { + sqlite_db: path_size_bytes(&data_dir.join("miden-store.sqlite3")), + sqlite_wal: path_size_bytes(&data_dir.join("miden-store.sqlite3-wal")), + block_store: dir_size_bytes(&data_dir.join("blocks")), + #[cfg(feature = "rocksdb")] + account_tree: dir_size_bytes(&data_dir.join("accounttree")), + #[cfg(feature = "rocksdb")] + nullifier_tree: dir_size_bytes(&data_dir.join("nullifiertree")), + #[cfg(feature = "rocksdb")] + account_state_forest: dir_size_bytes(&data_dir.join("accountstateforest")), + } +} + +/// Returns the byte length of the file at `path`, or `0` if it does not exist. +fn path_size_bytes(path: &Path) -> u64 { + fs_err::metadata(path).map(|m| m.len()).unwrap_or(0) +} + +/// Returns the total byte length of all files in `path` iteratively, or `0` on any error. +fn dir_size_bytes(path: &Path) -> u64 { + let mut to_process = vec![path.to_path_buf()]; + let mut total = 0u64; + while let Some(dir) = to_process.pop() { + let Ok(entries) = fs_err::read_dir(&dir) else { + continue; + }; + for entry in entries.flatten() { + if let Ok(meta) = entry.metadata() { + if meta.is_dir() { + to_process.push(entry.path()); + } else { + total += meta.len(); + } + } + } } + total } /// Represents the store's data-directory and its content paths. 
diff --git a/crates/store/src/state/loader.rs b/crates/store/src/state/loader.rs index 181578d6db..4400121f38 100644 --- a/crates/store/src/state/loader.rs +++ b/crates/store/src/state/loader.rs @@ -13,10 +13,14 @@ use std::num::NonZeroUsize; use std::path::Path; use miden_crypto::merkle::mmr::Mmr; +use miden_crypto::merkle::smt::{Backend, ForestInMemoryBackend}; +#[cfg(feature = "rocksdb")] +use miden_crypto::merkle::smt::{ForestPersistentBackend, PersistentBackendConfig}; #[cfg(feature = "rocksdb")] use miden_large_smt_backend_rocksdb::RocksDbStorage; #[cfg(feature = "rocksdb")] use miden_node_utils::clap::RocksDbOptions; +use miden_protocol::account::{AccountId, AccountStorageHeader, StorageSlotType}; use miden_protocol::block::account_tree::{AccountIdKey, AccountTree}; use miden_protocol::block::nullifier_tree::NullifierTree; use miden_protocol::block::{BlockNumber, Blockchain}; @@ -43,6 +47,9 @@ pub const ACCOUNT_TREE_STORAGE_DIR: &str = "accounttree"; /// Directory name for the nullifier tree storage within the data directory. pub const NULLIFIER_TREE_STORAGE_DIR: &str = "nullifiertree"; +/// Directory name for the account state forest storage within the data directory. +pub const ACCOUNT_STATE_FOREST_STORAGE_DIR: &str = "accountstateforest"; + /// Page size for loading account commitments from the database during tree rebuilding. /// This limits memory usage when rebuilding trees with millions of accounts. const ACCOUNT_COMMITMENTS_PAGE_SIZE: NonZeroUsize = NonZeroUsize::new(10_000).unwrap(); @@ -91,7 +98,7 @@ fn block_num_to_nullifier_leaf(block_num: BlockNumber) -> Word { Word::from([Felt::from(block_num), Felt::ZERO, Felt::ZERO, Felt::ZERO]) } -// STORAGE LOADER TRAIT +// TREE STORAGE LOADER TRAIT // ================================================================================================ /// Trait for loading trees from storage. 
@@ -102,7 +109,7 @@ fn block_num_to_nullifier_leaf(block_num: BlockNumber) -> Word { /// Missing or corrupted storage is handled by the `verify_tree_consistency` check after loading, /// which detects divergence between persistent storage and the database. If divergence is detected, /// the user should manually delete the tree storage directories and restart the node. -pub trait StorageLoader: SmtStorage + Sized { +pub trait TreeStorageLoader: SmtStorage + Sized { /// A configuration type for the implementation. type Config: std::fmt::Debug + std::default::Default; /// Creates a storage backend for the given domain. @@ -125,11 +132,38 @@ pub trait StorageLoader: SmtStorage + Sized { ) -> impl Future>, StateInitializationError>> + Send; } +// ACCOUNT FOREST LOADER TRAIT +// ================================================================================================ + +/// Trait for loading account state forests from storage. +/// +/// For `ForestInMemoryBackend`, the forest is rebuilt from database entries on each startup. For +/// `ForestPersistentBackend`, the forest is loaded directly from disk if data exists, otherwise it +/// is rebuilt from the database and persisted. +pub trait AccountForestLoader: Backend + Sized { + /// A configuration type for the implementation. + type Config: std::fmt::Debug + std::default::Default; + + /// Creates a forest backend for the given domain. + fn create( + data_dir: &Path, + storage_options: &Self::Config, + domain: &'static str, + ) -> Result; + + /// Loads the account state forest, either from persistent storage or by rebuilding from DB. 
+ fn load_account_state_forest( + self, + db: &mut Db, + block_num: BlockNumber, + ) -> impl Future, StateInitializationError>> + Send; +} + // MEMORY STORAGE IMPLEMENTATION // ================================================================================================ #[cfg(not(feature = "rocksdb"))] -impl StorageLoader for MemoryStorage { +impl TreeStorageLoader for MemoryStorage { type Config = (); fn create( _data_dir: &Path, @@ -222,7 +256,7 @@ impl StorageLoader for MemoryStorage { // ================================================================================================ #[cfg(feature = "rocksdb")] -impl StorageLoader for RocksDbStorage { +impl TreeStorageLoader for RocksDbStorage { type Config = RocksDbOptions; fn create( data_dir: &Path, @@ -336,6 +370,83 @@ impl StorageLoader for RocksDbStorage { } } +// ACCOUNT FOREST BACKEND IMPLEMENTATIONS +// ================================================================================================ + +impl AccountForestLoader for ForestInMemoryBackend { + type Config = (); + + fn create( + _data_dir: &Path, + _storage_options: &Self::Config, + _domain: &'static str, + ) -> Result { + Ok(ForestInMemoryBackend::new()) + } + + #[instrument(target = COMPONENT, skip_all, fields(block.number = %block_num))] + async fn load_account_state_forest( + self, + db: &mut Db, + block_num: BlockNumber, + ) -> Result, StateInitializationError> { + let mut forest = AccountStateForest::from_backend(self) + .map_err(|e| StateInitializationError::AccountStateForestIoError(e.to_string()))?; + rebuild_account_state_forest(&mut forest, db, block_num).await?; + Ok(forest) + } +} + +#[cfg(feature = "rocksdb")] +impl AccountForestLoader for ForestPersistentBackend { + type Config = RocksDbOptions; + + fn create( + data_dir: &Path, + storage_options: &Self::Config, + domain: &'static str, + ) -> Result { + let storage_path = data_dir.join(domain); + fs_err::create_dir_all(&storage_path) + .map_err(|e| 
StateInitializationError::AccountStateForestIoError(e.to_string()))?; + + let max_open_files = usize::try_from(storage_options.max_open_fds).map_err(|_| { + StateInitializationError::AccountStateForestIoError(format!( + "invalid account state forest RocksDB max_open_fds: {}", + storage_options.max_open_fds + )) + })?; + let config = PersistentBackendConfig::new(&storage_path) + .map_err(|e| StateInitializationError::AccountStateForestIoError(e.to_string()))? + .with_cache_size_bytes(storage_options.cache_size_in_bytes) + .with_max_open_files(max_open_files); + + ForestPersistentBackend::load(config) + .map_err(|e| StateInitializationError::AccountStateForestIoError(e.to_string())) + } + + #[instrument(target = COMPONENT, skip_all, fields(block.number = %block_num))] + async fn load_account_state_forest( + self, + db: &mut Db, + block_num: BlockNumber, + ) -> Result, StateInitializationError> { + let mut forest = AccountStateForest::from_backend(self) + .map_err(|e| StateInitializationError::AccountStateForestIoError(e.to_string()))?; + + if forest.lineage_count() != 0 { + return Ok(forest); + } + + info!( + target: COMPONENT, + "RocksDB account state forest storage is empty, populating from SQLite" + ); + rebuild_account_state_forest(&mut forest, db, block_num).await?; + Ok(forest) + } +} + // HELPER FUNCTIONS // ================================================================================================ @@ -362,15 +473,15 @@ pub async fn load_mmr(db: &mut Db) -> Result, db: &mut Db, block_num: BlockNumber, -) -> Result { +) -> Result<(), StateInitializationError> { use miden_protocol::account::delta::AccountDelta; - let mut forest = AccountStateForest::new(); let mut cursor = None; loop { @@ -403,7 +514,7 @@ pub async fn load_smt_forest( } } - Ok(forest) + Ok(()) } // CONSISTENCY VERIFICATION @@ -457,3 +568,159 @@ pub async fn verify_tree_consistency( Ok(()) } + +/// Verifies that the account state forest matches latest public account roots from SQLite. 
+/// +/// This check ensures persisted account state forest has not diverged from the latest +/// account states in SQLite. When the forest is rebuilt from SQLite, it will naturally +/// match; when loaded from `RocksDB`, this catches corruption or incomplete shutdown. +#[instrument(target = COMPONENT, skip_all)] +pub async fn verify_account_state_forest_consistency( + forest: &AccountStateForest, + db: &mut Db, +) -> Result<(), StateInitializationError> { + let mut cursor = None; + + loop { + let page = db + .select_public_account_state_roots_paged(PUBLIC_ACCOUNT_IDS_PAGE_SIZE, cursor) + .await?; + + if page.accounts.is_empty() { + break; + } + + for account in page.accounts { + verify_account_state_forest_record( + forest, + account.account_id, + account.vault_root, + &account.storage_header, + )?; + } + + cursor = page.next_cursor; + if cursor.is_none() { + break; + } + } + + Ok(()) +} + +fn verify_account_state_forest_record( + forest: &AccountStateForest, + account_id: AccountId, + vault_root: Word, + storage_header: &AccountStorageHeader, +) -> Result<(), StateInitializationError> { + let forest_vault_root = forest.get_latest_vault_root(account_id); + if forest_vault_root != vault_root { + return Err(StateInitializationError::AccountStateForestStorageDiverged { + account_id, + slot_name: None, + forest_root: forest_vault_root, + database_root: vault_root, + }); + } + + for slot in storage_header.slots() { + if slot.slot_type() != StorageSlotType::Map { + continue; + } + + let forest_root = forest.get_latest_storage_map_root(account_id, slot.name()); + let database_root = slot.value(); + if forest_root != database_root { + return Err(StateInitializationError::AccountStateForestStorageDiverged { + account_id, + slot_name: Some(slot.name().to_string()), + forest_root, + database_root, + }); + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use miden_protocol::account::{ + AccountId, + AccountStorageHeader, + StorageSlotHeader, + StorageSlotName, + 
StorageSlotType, + }; + use miden_protocol::testing::account_id::ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE; + + use super::*; + + #[test] + fn account_state_forest_consistency_detects_storage_map_root_mismatch() { + let account_id = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE) + .expect("test account ID should be valid"); + let slot_name = + StorageSlotName::new("account::balances").expect("slot name should be valid"); + let expected_storage_root = Word::from([1, 0, 0, 0u32]); + let storage_header = AccountStorageHeader::new(vec![StorageSlotHeader::new( + slot_name.clone(), + StorageSlotType::Map, + expected_storage_root, + )]) + .expect("storage header should be valid"); + let forest = AccountStateForest::new(); + + let error = verify_account_state_forest_record( + &forest, + account_id, + AccountStateForest::empty_smt_root(), + &storage_header, + ) + .expect_err("storage map root mismatch should be detected"); + + assert_matches::assert_matches!( + error, + StateInitializationError::AccountStateForestStorageDiverged { + account_id: actual_account_id, + slot_name: Some(actual_slot_name), + forest_root, + database_root, + } if actual_account_id == account_id + && actual_slot_name == slot_name.to_string() + && forest_root == AccountStateForest::empty_smt_root() + && database_root == expected_storage_root + ); + } + + #[test] + fn account_state_forest_consistency_detects_vault_root_mismatch() { + let account_id = AccountId::try_from(ACCOUNT_ID_REGULAR_PUBLIC_ACCOUNT_IMMUTABLE_CODE) + .expect("test account ID should be valid"); + let expected_vault_root = Word::from([2, 0, 0, 0u32]); + let storage_header = + AccountStorageHeader::new(Vec::new()).expect("storage header should be valid"); + let forest = AccountStateForest::new(); + + let error = verify_account_state_forest_record( + &forest, + account_id, + expected_vault_root, + &storage_header, + ) + .expect_err("vault root mismatch should be detected"); + + 
assert_matches::assert_matches!( + error, + StateInitializationError::AccountStateForestStorageDiverged { + account_id: actual_account_id, + slot_name: None, + forest_root, + database_root, + } if actual_account_id == account_id + && forest_root == AccountStateForest::empty_smt_root() + && database_root == expected_vault_root + ); + } +} diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index c52cccfe25..ef73851cd1 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -19,12 +19,12 @@ use miden_node_proto::domain::account::{ AccountStorageMapDetails, AccountVaultDetails, SlotData, + StorageMapEntries, StorageMapRequest, }; use miden_node_proto::domain::batch::BatchInputs; use miden_node_utils::clap::StorageOptions; use miden_node_utils::formatting::format_array; -use miden_node_utils::limiter::{QueryParamLimiter, QueryParamStorageMapKeyTotalLimit}; use miden_protocol::Word; use miden_protocol::account::{AccountId, StorageMapKey, StorageMapWitness, StorageSlotName}; use miden_protocol::asset::{AssetVaultKey, AssetWitness}; @@ -38,7 +38,12 @@ use miden_protocol::transaction::PartialBlockchain; use tokio::sync::{Mutex, RwLock, watch}; use tracing::{Instrument, info, instrument}; -use crate::account_state_forest::{AccountStateForest, WitnessError}; +use crate::account_state_forest::{ + AccountStateForest, + AccountStateForestBackend, + AccountStorageMapResult, + WitnessError, +}; use crate::accounts::AccountTreeWithHistory; use crate::blocks::BlockStore; use crate::db::models::Page; @@ -64,12 +69,14 @@ const PROOF_CACHE_CAPACITY: NonZeroUsize = NonZeroUsize::new(512).unwrap(); mod loader; use loader::{ + ACCOUNT_STATE_FOREST_STORAGE_DIR, ACCOUNT_TREE_STORAGE_DIR, + AccountForestLoader, NULLIFIER_TREE_STORAGE_DIR, - StorageLoader, TreeStorage, + TreeStorageLoader, load_mmr, - load_smt_forest, + verify_account_state_forest_consistency, verify_tree_consistency, }; @@ -140,7 +147,7 @@ pub struct State { inner: RwLock>, 
/// Forest-related state `(SmtForest, storage_map_roots, vault_roots)` with its own lock. - forest: RwLock, + forest: RwLock>, /// To allow readers to access the tree data while an update in being performed, and prevent /// TOCTOU issues, there must be no concurrent writers. This locks to serialize the writers. @@ -197,12 +204,15 @@ impl State { let latest_block_num = blockchain.chain_tip().unwrap_or(BlockNumber::GENESIS); #[cfg(feature = "rocksdb")] - let (account_storage_config, nullifier_storage_config) = - (storage_options.account_tree.into(), storage_options.nullifier_tree.into()); + let (account_storage_config, nullifier_storage_config, forest_storage_config) = ( + storage_options.account_tree.into(), + storage_options.nullifier_tree.into(), + storage_options.account_state_forest.into(), + ); #[cfg(not(feature = "rocksdb"))] - let (account_storage_config, nullifier_storage_config) = { + let (account_storage_config, nullifier_storage_config, forest_storage_config) = { let _ = &storage_options; - ((), ()) + ((), (), ()) }; let account_storage = TreeStorage::create(data_path, &account_storage_config, ACCOUNT_TREE_STORAGE_DIR)?; @@ -219,7 +229,13 @@ impl State { let account_tree = AccountTreeWithHistory::new(account_tree, latest_block_num); - let forest = load_smt_forest(&mut db, latest_block_num).await?; + let forest_backend = AccountStateForestBackend::create( + data_path, + &forest_storage_config, + ACCOUNT_STATE_FOREST_STORAGE_DIR, + )?; + let forest = forest_backend.load_account_state_forest(&mut db, latest_block_num).await?; + verify_account_state_forest_consistency(&forest, &mut db).await?; let inner = RwLock::new(InnerState { nullifier_tree, blockchain, account_tree }); @@ -741,6 +757,65 @@ impl State { Ok((block_num, witness)) } + /// Returns storage map details from the forest for a specific account and storage slot. + /// + /// The forest can only be used if all hashed keys in the storage map are known in the + /// reverse-key LRU cache. 
If any hashed key is unknown, the method returns `Ok(None)` to signal + /// that the caller should fall back to reconstructing the storage map details from the + /// database. + #[instrument(target = COMPONENT, skip_all)] + async fn get_storage_map_details_from_forest( + &self, + account_id: AccountId, + slot_name: StorageSlotName, + block_num: BlockNumber, + ) -> Result, DatabaseError> { + let forest_guard = self + .forest + .read() + .instrument(tracing::info_span!("acquire_forest_for_storage_map_entries")) + .await; + match forest_guard + .get_storage_map_details_for_all_entries(account_id, slot_name.clone(), block_num) + .map_err(DatabaseError::MerkleError)? + { + AccountStorageMapResult::NotFound => Err(DatabaseError::StorageRootNotFound { + account_id, + slot_name: slot_name.to_string(), + block_num, + }), + AccountStorageMapResult::Details(details) => Ok(Some(details)), + AccountStorageMapResult::CannotReconstructKeysFromCache => Ok(None), + } + } + + /// Returns storage map details by reconstructing the storage map from the database. + async fn reconstruct_storage_map_details_from_db( + &self, + account_id: AccountId, + slot_name: StorageSlotName, + block_num: BlockNumber, + ) -> Result { + let details = self + .db + .reconstruct_storage_map_from_db( + account_id, + slot_name, + block_num, + Some(AccountStorageMapDetails::MAX_RETURN_ENTRIES), + ) + .await?; + + if let StorageMapEntries::AllEntries(entries) = &details.entries { + self.forest + .write() + .await + .cache_storage_map_keys(entries.iter().map(|(raw_key, _)| *raw_key)); + } + + Ok(details) + } + /// Fetches the account details (code, vault, storage) for a public account at the specified /// block. /// @@ -749,7 +824,8 @@ impl State { /// /// For specific key queries (`SlotData::MapKeys`), the forest is used to provide SMT proofs. /// Returns an error if the forest doesn't have data for the requested slot. - /// All-entries queries (`SlotData::All`) use the forest to request all entries database. 
+ /// All-entries queries (`SlotData::All`) use the forest when all hashed keys are known in the + /// reverse-key LRU cache, otherwise they fall back to database reconstruction. #[expect(clippy::too_many_lines)] #[instrument(target = COMPONENT, skip_all)] async fn fetch_public_account_details( @@ -859,22 +935,17 @@ impl State { } } - // TODO parallelize the read requests for (index, slot_name) in all_entries_requests { - let details = self - .db - .reconstruct_storage_map_from_db( - account_id, - slot_name.clone(), - block_num, - Some( - // TODO unify this with - // `AccountStorageMapDetails::MAX_RETURN_ENTRIES` - // and accumulated the limits - ::LIMIT, - ), - ) - .await?; + let details = match self + .get_storage_map_details_from_forest(account_id, slot_name.clone(), block_num) + .await? + { + Some(details) => details, + None => { + self.reconstruct_storage_map_details_from_db(account_id, slot_name, block_num) + .await? + }, + }; storage_map_details_by_index[index] = Some(details); } @@ -939,11 +1010,6 @@ impl State { self.block_store.load_proof(block_num).await.map_err(Into::into) } - /// Emits metrics for each database table's size. - pub async fn analyze_table_sizes(&self) -> Result<(), DatabaseError> { - self.db.analyze_table_sizes().await - } - /// Returns the network notes for an account that are unconsumed by a specified block number, /// along with the next pagination token. 
pub async fn get_unconsumed_network_notes_for_account( diff --git a/crates/utils/src/clap.rs b/crates/utils/src/clap.rs index b5a93dbf29..ffd0a96be0 100644 --- a/crates/utils/src/clap.rs +++ b/crates/utils/src/clap.rs @@ -149,6 +149,9 @@ pub struct StorageOptions { #[cfg(feature = "rocksdb")] #[clap(flatten)] pub nullifier_tree: NullifierTreeRocksDbOptions, + #[cfg(feature = "rocksdb")] + #[clap(flatten)] + pub account_state_forest: AccountStateForestRocksDbOptions, } impl StorageOptions { @@ -168,7 +171,16 @@ impl StorageOptions { cache_size_in_bytes: DEFAULT_ROCKSDB_CACHE_SIZE, durability_mode: None, }; - Self { account_tree, nullifier_tree } + let account_state_forest = AccountStateForestRocksDbOptions { + max_open_fds: BENCH_ROCKSDB_MAX_OPEN_FDS, + cache_size_in_bytes: DEFAULT_ROCKSDB_CACHE_SIZE, + durability_mode: None, + }; + Self { + account_tree, + nullifier_tree, + account_state_forest, + } } #[cfg(not(feature = "rocksdb"))] Self::default() diff --git a/crates/utils/src/clap/rocksdb.rs b/crates/utils/src/clap/rocksdb.rs index d2af9c83bd..0cdca501d0 100644 --- a/crates/utils/src/clap/rocksdb.rs +++ b/crates/utils/src/clap/rocksdb.rs @@ -87,6 +87,38 @@ impl Default for AccountTreeRocksDbOptions { } } +/// Per usage options for rocksdb configuration +#[derive(clap::Args, Clone, Debug, PartialEq, Eq)] +pub struct AccountStateForestRocksDbOptions { + #[arg( + id = "account_state_forest_rocksdb_max_open_fds", + long = "account_state_forest.rocksdb.max_open_fds", + default_value_t = DEFAULT_ROCKSDB_MAX_OPEN_FDS, + value_name = "ACCOUNT_STATE_FOREST__ROCKSDB__MAX_OPEN_FDS" + )] + pub max_open_fds: i32, + #[arg( + id = "account_state_forest_rocksdb_max_cache_size", + long = "account_state_forest.rocksdb.max_cache_size", + default_value_t = DEFAULT_ROCKSDB_CACHE_SIZE, + value_name = "ACCOUNT_STATE_FOREST__ROCKSDB__CACHE_SIZE" + )] + pub cache_size_in_bytes: usize, + #[arg( + id = "account_state_forest_rocksdb_durability_mode", + long = 
"account_state_forest.rocksdb.durability_mode", + value_enum, + value_name = "ACCOUNT_STATE_FOREST__ROCKSDB__DURABILITY_MODE" + )] + pub durability_mode: Option, +} + +impl Default for AccountStateForestRocksDbOptions { + fn default() -> Self { + RocksDbOptions::default().into() + } +} + /// General confiration options for rocksdb. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct RocksDbOptions { @@ -135,6 +167,21 @@ impl From for RocksDbOptions { } } +impl From for RocksDbOptions { + fn from(value: AccountStateForestRocksDbOptions) -> Self { + let AccountStateForestRocksDbOptions { + max_open_fds, + cache_size_in_bytes, + durability_mode, + } = value; + Self { + max_open_fds, + cache_size_in_bytes, + durability_mode, + } + } +} + impl From for AccountTreeRocksDbOptions { fn from(value: RocksDbOptions) -> Self { let RocksDbOptions { @@ -165,6 +212,21 @@ impl From for NullifierTreeRocksDbOptions { } } +impl From for AccountStateForestRocksDbOptions { + fn from(value: RocksDbOptions) -> Self { + let RocksDbOptions { + max_open_fds, + cache_size_in_bytes, + durability_mode, + } = value; + Self { + max_open_fds, + cache_size_in_bytes, + durability_mode, + } + } +} + impl RocksDbOptions { pub fn with_path(self, path: &Path) -> RocksDbConfig { let mut config = RocksDbConfig::new(path) @@ -178,3 +240,25 @@ impl RocksDbOptions { config } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn account_state_forest_options_roundtrip_general_rocksdb_options() { + let options = AccountStateForestRocksDbOptions { + max_open_fds: 123, + cache_size_in_bytes: 456, + durability_mode: Some(CliRocksDbDurabilityMode::Sync), + }; + + let general = RocksDbOptions::from(options.clone()); + assert_eq!(general.max_open_fds, options.max_open_fds); + assert_eq!(general.cache_size_in_bytes, options.cache_size_in_bytes); + assert_eq!(general.durability_mode, options.durability_mode); + + let roundtrip = AccountStateForestRocksDbOptions::from(general); + assert_eq!(roundtrip, 
options); + } +} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 85fe37b242..8732c72bf6 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -11,7 +11,7 @@ pub mod limiter; pub mod logging; pub mod lru_cache; pub mod panic; -pub mod signer; +pub mod spawn; pub mod tracing; pub trait ErrorReport: std::error::Error { diff --git a/crates/utils/src/lru_cache.rs b/crates/utils/src/lru_cache.rs index 2f25a77845..f0e73f829b 100644 --- a/crates/utils/src/lru_cache.rs +++ b/crates/utils/src/lru_cache.rs @@ -30,6 +30,28 @@ where self.lock().put(key, value); } + /// Retrieves multiple values from the cache while holding the cache lock once. + pub fn get_many<'a>(&self, keys: impl IntoIterator) -> Vec> + where + K: 'a, + { + let mut cache = self.lock(); + keys.into_iter().map(|key| cache.get(key).cloned()).collect() + } + + /// Puts multiple values into the cache while holding the cache lock once. + pub fn put_many(&self, entries: impl IntoIterator) { + let mut cache = self.lock(); + for (key, value) in entries { + cache.put(key, value); + } + } + + /// Clears all entries from the cache. + pub fn clear(&self) { + self.lock().clear(); + } + #[instrument(name = "lru.lock", skip_all)] fn lock(&self) -> MutexGuard<'_, InnerCache> { // SAFETY: The mutex is only held for the duration of the get/put operation diff --git a/crates/utils/src/signer.rs b/crates/utils/src/signer.rs deleted file mode 100644 index 00dbe3ebc3..0000000000 --- a/crates/utils/src/signer.rs +++ /dev/null @@ -1,36 +0,0 @@ -use core::convert::Infallible; -use core::error; - -use miden_protocol::block::BlockHeader; -use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, SecretKey, Signature}; - -// BLOCK SIGNER -// ================================================================================================ - -/// Trait which abstracts the signing of block headers with ECDSA signatures. 
-/// -/// Production-level implementations will involve some sort of secure remote backend. The trait also -/// allows for testing with local and ephemeral signers. -pub trait BlockSigner { - type Error: error::Error + Send + Sync + 'static; - fn sign( - &self, - header: &BlockHeader, - ) -> impl Future> + Send; - fn public_key(&self) -> PublicKey; -} - -// SECRET KEY BLOCK SIGNER -// ================================================================================================ - -impl BlockSigner for SecretKey { - type Error = Infallible; - - async fn sign(&self, header: &BlockHeader) -> Result { - Ok(self.sign(header.commitment())) - } - - fn public_key(&self) -> PublicKey { - self.public_key() - } -} diff --git a/crates/utils/src/spawn.rs b/crates/utils/src/spawn.rs new file mode 100644 index 0000000000..b0dc5438f5 --- /dev/null +++ b/crates/utils/src/spawn.rs @@ -0,0 +1,21 @@ +use tokio::task::JoinHandle; +use tracing::Span; + +/// Spawn a blocking task in the current tracing span. +pub fn spawn_blocking_in_current_span(f: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + spawn_blocking_in_span(f, Span::current()) +} + +/// Spawn a blocking task in a span. +pub fn spawn_blocking_in_span(f: F, span: Span) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + #[expect(clippy::disallowed_methods)] + tokio::task::spawn_blocking(move || span.in_scope(f)) +} diff --git a/crates/utils/src/tracing/span_ext.rs b/crates/utils/src/tracing/span_ext.rs index 07ac008fe3..7f50c82ad8 100644 --- a/crates/utils/src/tracing/span_ext.rs +++ b/crates/utils/src/tracing/span_ext.rs @@ -65,6 +65,12 @@ impl ToValue for usize { } } +impl ToValue for u64 { + fn to_value(&self) -> Value { + i64::try_from(*self).unwrap_or(i64::MAX).into() + } +} + /// Generates `impl ToValue` blocks for types that are `ToString`. macro_rules! 
impl_to_string_to_value { ($($t:ty),*) => { diff --git a/crates/validator/src/server/tests.rs b/crates/validator/src/server/tests.rs index 8d55f6a7ea..3aed9df2ef 100644 --- a/crates/validator/src/server/tests.rs +++ b/crates/validator/src/server/tests.rs @@ -29,8 +29,10 @@ impl TestValidator { async fn new() -> Self { let signer = ValidatorSigner::new_local(random_secret_key()); - let genesis_state = GenesisState::new(vec![], test_fee_params(), 1, 0, random_secret_key()); - let genesis_block = genesis_state.into_block().await.unwrap(); + let genesis_signer = random_secret_key(); + let genesis_state = + GenesisState::new(vec![], test_fee_params(), 1, 0, genesis_signer.public_key()); + let genesis_block = genesis_state.into_block(&genesis_signer).unwrap(); let genesis_header = genesis_block.inner().header().clone(); let dir = tempfile::tempdir().unwrap(); @@ -234,9 +236,10 @@ async fn commitment_mismatch_rejected() { // Build a valid ProposedBlock on a *different* genesis so its prev_block_commitment // won't match the validator's actual chain tip. + let other_genesis_signer = random_secret_key(); let other_genesis_state = - GenesisState::new(vec![], test_fee_params(), 1, 1, random_secret_key()); - let other_genesis_block = other_genesis_state.into_block().await.unwrap(); + GenesisState::new(vec![], test_fee_params(), 1, 1, other_genesis_signer.public_key()); + let other_genesis_block = other_genesis_state.into_block(&other_genesis_signer).unwrap(); let other_genesis_header = other_genesis_block.inner().header().clone(); let mismatched_block = empty_block(&other_genesis_header, &PartialBlockchain::default()); @@ -261,9 +264,10 @@ async fn replacement_commitment_mismatch_rejected() { // Build a replacement block at the same height but using a *different* genesis so its // prev_block_commitment won't match the validator's actual parent of the chain tip. 
+ let other_genesis_signer = random_secret_key(); let other_genesis_state = - GenesisState::new(vec![], test_fee_params(), 1, 1, random_secret_key()); - let other_genesis_block = other_genesis_state.into_block().await.unwrap(); + GenesisState::new(vec![], test_fee_params(), 1, 1, other_genesis_signer.public_key()); + let other_genesis_block = other_genesis_state.into_block(&other_genesis_signer).unwrap(); let other_genesis_header = other_genesis_block.inner().header().clone(); let mismatched_replacement = empty_block(&other_genesis_header, &PartialBlockchain::default()); diff --git a/crates/validator/src/signers/kms.rs b/crates/validator/src/signers/kms.rs index e84576ac1e..f9a5f47b2d 100644 --- a/crates/validator/src/signers/kms.rs +++ b/crates/validator/src/signers/kms.rs @@ -1,8 +1,7 @@ use aws_sdk_kms::error::SdkError; use aws_sdk_kms::operation::sign::SignError; use aws_sdk_kms::types::SigningAlgorithmSpec; -use miden_node_utils::signer::BlockSigner; -use miden_protocol::block::BlockHeader; +use miden_protocol::Word; use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, Signature}; use miden_protocol::crypto::hash::keccak::Keccak256; use miden_protocol::utils::serde::{DeserializationError, Serializable}; @@ -74,18 +73,14 @@ impl KmsSigner { let pub_key = PublicKey::from_der(spki_der)?; Ok(Self { key_id, pub_key, client }) } -} - -impl BlockSigner for KmsSigner { - type Error = KmsSignerError; - async fn sign(&self, header: &BlockHeader) -> Result { + pub async fn sign(&self, commitment: Word) -> Result { // The Validator produces Ethereum-style ECDSA (secp256k1) signatures over Keccak-256 // digests. AWS KMS does not support SHA-3 hashing for ECDSA keys // (ECC_SECG_P256K1 being the corresponding AWS key-spec), so we pre-hash the // message and pass MessageType::Digest. KMS signs the provided 32-byte digest // verbatim. 
- let msg = header.commitment().to_bytes(); + let msg = commitment.to_bytes(); let digest = Keccak256::hash(&msg); // Request signature from KMS backend. @@ -109,14 +104,14 @@ impl BlockSigner for KmsSigner { .map_err(KmsSignerError::SignatureFormatError)?; // Check the returned signature. - if sig.verify(header.commitment(), &self.pub_key) { + if sig.verify(commitment, &self.pub_key) { Ok(sig) } else { Err(KmsSignerError::InvalidSignature) } } - fn public_key(&self) -> PublicKey { + pub fn public_key(&self) -> PublicKey { self.pub_key.clone() } } diff --git a/crates/validator/src/signers/mod.rs b/crates/validator/src/signers/mod.rs index 21bbeaa7ae..2c50b2dbb2 100644 --- a/crates/validator/src/signers/mod.rs +++ b/crates/validator/src/signers/mod.rs @@ -1,8 +1,8 @@ mod kms; pub use kms::KmsSigner; -use miden_node_utils::signer::BlockSigner; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_protocol::block::BlockHeader; -use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{SecretKey, Signature}; +use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, SecretKey, Signature}; // VALIDATOR SIGNER // ================================================================================================= @@ -28,17 +28,27 @@ impl ValidatorSigner { Self::Local(secret_key) } - /// Signs a block header using the configured signer. - pub async fn sign(&self, header: &BlockHeader) -> anyhow::Result { + /// Returns the public key corresponding to the configured signer. + pub fn public_key(&self) -> PublicKey { match self { - Self::Kms(signer) => { - let sig = signer.sign(header).await?; - Ok(sig) - }, - Self::Local(signer) => { - let sig = ::sign(signer, header).await?; - Ok(sig) - }, + Self::Kms(signer) => signer.public_key(), + Self::Local(signer) => signer.public_key(), } } + + /// Signs a block header using the configured signer. 
+ pub async fn sign(&self, header: &BlockHeader) -> anyhow::Result { + let commitment = header.commitment(); + let signature = match self { + Self::Kms(signer) => signer.sign(commitment).await?, + Self::Local(signer) => spawn_blocking_in_current_span({ + let signer = signer.clone(); + move || signer.sign(commitment) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())), + }; + + Ok(signature) + } } diff --git a/crates/validator/src/tx_validation/mod.rs b/crates/validator/src/tx_validation/mod.rs index f2d1250a20..9921532964 100644 --- a/crates/validator/src/tx_validation/mod.rs +++ b/crates/validator/src/tx_validation/mod.rs @@ -2,6 +2,7 @@ mod data_store; mod validated_tx; pub use data_store::TransactionInputsDataStore; +use miden_node_utils::spawn::{spawn_blocking_in_current_span, spawn_blocking_in_span}; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::transaction::{ProvenTransaction, TransactionHeader, TransactionInputs}; use miden_tx::auth::UnreachableAuth; @@ -39,23 +40,40 @@ pub async fn validate_transaction( proven_tx: ProvenTransaction, tx_inputs: TransactionInputs, ) -> Result { - // First, verify the transaction proof - info_span!("verify").in_scope(|| { - let tx_verifier = TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL); - tx_verifier.verify(&proven_tx) - })?; + // Proof verification is CPU-intensive; run it on a dedicated blocking thread. + let proven_tx_clone = proven_tx.clone(); + spawn_blocking_in_span( + move || TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL).verify(&proven_tx_clone), + info_span!("verify"), + ) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; // Create a DataStore from the transaction inputs. let data_store = TransactionInputsDataStore::new(tx_inputs.clone()); - // Execute the transaction. + // VM execution may not yield; run it on a dedicated blocking thread. 
let (account, block_header, _, input_notes, tx_args) = tx_inputs.into_parts(); - let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> = - TransactionExecutor::new(&data_store); - let executed_tx = executor - .execute_transaction(account.id(), block_header.block_num(), input_notes, tx_args) - .instrument(info_span!("execute")) - .await?; + let execute_span = info_span!("execute").or_current(); + let executed_tx = spawn_blocking_in_current_span(move || { + let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> = + TransactionExecutor::new(&data_store); + tokio::runtime::Builder::new_current_thread() + .build() + .expect("failed to build tokio runtime") + .block_on( + executor + .execute_transaction( + account.id(), + block_header.block_num(), + input_notes, + tx_args, + ) + .instrument(execute_span), + ) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; // Validate that the executed transaction matches the submitted transaction. let executed_tx_header: TransactionHeader = (&executed_tx).into();