diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d2031391..59a710e02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## v0.14.11 (TBD) +- Replaced blocking-in-async operations in the validator, remote prover, and ntx-builder with `spawn_blocking` to avoid starving the Tokio runtime ([#2041](https://github.com/0xMiden/node/pull/2041)). - Implement persistent RocksDB backend for `AccountStateForest`, improving startup time ([#2020](https://github.com/0xMiden/node/pull/2020)). ## v0.14.10 (2026-05-29) diff --git a/bin/remote-prover/src/server/prover.rs b/bin/remote-prover/src/server/prover.rs index 2931cc70f..f8c552138 100644 --- a/bin/remote-prover/src/server/prover.rs +++ b/bin/remote-prover/src/server/prover.rs @@ -112,8 +112,14 @@ impl ProveRequest for LocalBatchProver { type Output = ProvenBatch; async fn prove(&self, input: Self::Input) -> Result { - self.prove(input) - .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch"))) + let prover = self.clone(); + tokio::task::spawn_blocking(move || { + prover + .prove(input) + .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch"))) + }) + .await + .map_err(|e| tonic::Status::internal(e.as_report_context("batch prover task panicked")))? } } @@ -123,8 +129,14 @@ impl ProveRequest for LocalBlockProver { type Output = BlockProof; async fn prove(&self, input: Self::Input) -> Result { + let prover = self.clone(); let BlockProofRequest { tx_batches, block_header, block_inputs } = input; - self.prove(tx_batches, &block_header, block_inputs) - .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block"))) + tokio::task::spawn_blocking(move || { + prover + .prove(tx_batches, &block_header, block_inputs) + .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block"))) + }) + .await + .map_err(|e| tonic::Status::internal(e.as_report_context("block prover task panicked")))? } } diff --git a/crates/ntx-builder/src/actor/execute.rs b/crates/ntx-builder/src/actor/execute.rs index 2888e1b77..f001883dc 100644 --- a/crates/ntx-builder/src/actor/execute.rs +++ b/crates/ntx-builder/src/actor/execute.rs @@ -194,25 +194,35 @@ impl NtxContext { async move { Box::pin(async move { - let data_store = NtxDataStore::new( - account, - chain_tip_header, - chain_mmr, - self.store.clone(), - self.script_cache.clone(), - self.db.clone(), - ); - - // Filter notes. let notes = notes.into_iter().map(Note::from).collect::>(); - let (successful_notes, failed_notes) = - self.filter_notes(&data_store, notes).await?; - // Execute transaction. - let executed_tx = Box::pin(self.execute(&data_store, successful_notes)).await?; - - // Collect scripts fetched from the remote store during execution. - let scripts_to_cache = data_store.take_fetched_scripts(); + // VM execution (note filtering + transaction execution) is CPU-intensive and + // may not yield between await points. Run on a dedicated blocking thread, + // using the parent runtime handle so that async data-store callbacks (gRPC + // calls to the store) are driven by the existing I/O driver. + let ctx = self.clone(); + let handle = tokio::runtime::Handle::current(); + let (executed_tx, failed_notes, scripts_to_cache) = + tokio::task::spawn_blocking(move || { + let data_store = NtxDataStore::new( + account, + chain_tip_header, + chain_mmr, + ctx.store.clone(), + ctx.script_cache.clone(), + ctx.db.clone(), + ); + handle.block_on(async { + let (successful_notes, failed_notes) = + ctx.filter_notes(&data_store, notes).await?; + let executed_tx = + Box::pin(ctx.execute(&data_store, successful_notes)).await?; + let scripts_to_cache = data_store.take_fetched_scripts(); + Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache)) + }) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; // Prove transaction. let tx_inputs: TransactionInputs = executed_tx.into(); @@ -316,13 +326,21 @@ impl NtxContext { #[instrument(target = COMPONENT, name = "ntx.execute_transaction.prove", skip_all, err)] async fn prove(&self, tx_inputs: &TransactionInputs) -> NtxResult { if let Some(remote) = &self.prover { - remote.prove(tx_inputs).await + remote.prove(tx_inputs).await.map_err(NtxError::Proving) } else { - // Only perform tx inputs clone for local proving. + // ZK proof generation is CPU-intensive; run it on a dedicated blocking thread. let tx_inputs = tx_inputs.clone(); - LocalTransactionProver::default().prove(tx_inputs).await + tokio::task::spawn_blocking(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime") + .block_on(LocalTransactionProver::default().prove(tx_inputs)) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())) + .map_err(NtxError::Proving) } - .map_err(NtxError::Proving) } /// Submits the transaction to the block producer. diff --git a/crates/validator/src/signers/mod.rs b/crates/validator/src/signers/mod.rs index 21bbeaa7a..6e6092350 100644 --- a/crates/validator/src/signers/mod.rs +++ b/crates/validator/src/signers/mod.rs @@ -36,7 +36,16 @@ impl ValidatorSigner { Ok(sig) }, Self::Local(signer) => { - let sig = ::sign(signer, header).await?; + let signer = signer.clone(); + let header = header.clone(); + let sig = tokio::task::spawn_blocking(move || { + tokio::runtime::Builder::new_current_thread() + .build() + .expect("failed to build tokio runtime") + .block_on(::sign(&signer, &header)) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; Ok(sig) }, } diff --git a/crates/validator/src/tx_validation/mod.rs b/crates/validator/src/tx_validation/mod.rs index f2d1250a2..d33c0934a 100644 --- a/crates/validator/src/tx_validation/mod.rs +++ b/crates/validator/src/tx_validation/mod.rs @@ -39,23 +39,41 @@ pub async fn validate_transaction( proven_tx: ProvenTransaction, tx_inputs: TransactionInputs, ) -> Result { - // First, verify the transaction proof - info_span!("verify").in_scope(|| { - let tx_verifier = TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL); - tx_verifier.verify(&proven_tx) - })?; + // Proof verification is CPU-intensive; run it on a dedicated blocking thread. + let proven_tx_clone = proven_tx.clone(); + tokio::task::spawn_blocking(move || { + info_span!("verify").in_scope(|| { + TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL).verify(&proven_tx_clone) + }) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; // Create a DataStore from the transaction inputs. let data_store = TransactionInputsDataStore::new(tx_inputs.clone()); - // Execute the transaction. + // VM execution may not yield; run it on a dedicated blocking thread. let (account, block_header, _, input_notes, tx_args) = tx_inputs.into_parts(); - let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> = - TransactionExecutor::new(&data_store); - let executed_tx = executor - .execute_transaction(account.id(), block_header.block_num(), input_notes, tx_args) - .instrument(info_span!("execute")) - .await?; + let execute_span = info_span!("execute"); + let executed_tx = tokio::task::spawn_blocking(move || { + let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> = + TransactionExecutor::new(&data_store); + tokio::runtime::Builder::new_current_thread() + .build() + .expect("failed to build tokio runtime") + .block_on( + executor + .execute_transaction( + account.id(), + block_header.block_num(), + input_notes, + tx_args, + ) + .instrument(execute_span), + ) + }) + .await + .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; // Validate that the executed transaction matches the submitted transaction. let executed_tx_header: TransactionHeader = (&executed_tx).into();