Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## v0.14.11 (TBD)

- Replaced blocking-in-async operations in the validator, remote prover, and ntx-builder with `spawn_blocking` to avoid starving the Tokio runtime ([#2041](https://github.com/0xMiden/node/pull/2041)).
- Implement persistent RocksDB backend for `AccountStateForest`, improving startup time ([#2020](https://github.com/0xMiden/node/pull/2020)).

## v0.14.10 (2026-05-29)
Expand Down
20 changes: 16 additions & 4 deletions bin/remote-prover/src/server/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,14 @@ impl ProveRequest for LocalBatchProver {
type Output = ProvenBatch;

async fn prove(&self, input: Self::Input) -> Result<Self::Output, tonic::Status> {
self.prove(input)
.map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch")))
let prover = self.clone();
tokio::task::spawn_blocking(move || {
prover
.prove(input)
.map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch")))
})
.await
.map_err(|e| tonic::Status::internal(e.as_report_context("batch prover task panicked")))?
}
}

Expand All @@ -123,8 +129,14 @@ impl ProveRequest for LocalBlockProver {
type Output = BlockProof;

async fn prove(&self, input: Self::Input) -> Result<Self::Output, tonic::Status> {
let prover = self.clone();
let BlockProofRequest { tx_batches, block_header, block_inputs } = input;
self.prove(tx_batches, &block_header, block_inputs)
.map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block")))
tokio::task::spawn_blocking(move || {
prover
.prove(tx_batches, &block_header, block_inputs)
.map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block")))
})
.await
.map_err(|e| tonic::Status::internal(e.as_report_context("block prover task panicked")))?
}
}
60 changes: 39 additions & 21 deletions crates/ntx-builder/src/actor/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,25 +194,35 @@ impl NtxContext {

async move {
Box::pin(async move {
let data_store = NtxDataStore::new(
account,
chain_tip_header,
chain_mmr,
self.store.clone(),
self.script_cache.clone(),
self.db.clone(),
);

// Filter notes.
let notes = notes.into_iter().map(Note::from).collect::<Vec<_>>();
let (successful_notes, failed_notes) =
self.filter_notes(&data_store, notes).await?;

// Execute transaction.
let executed_tx = Box::pin(self.execute(&data_store, successful_notes)).await?;

// Collect scripts fetched from the remote store during execution.
let scripts_to_cache = data_store.take_fetched_scripts();
// VM execution (note filtering + transaction execution) is CPU-intensive and
// may not yield between await points. Run on a dedicated blocking thread,
// using the parent runtime handle so that async data-store callbacks (gRPC
// calls to the store) are driven by the existing I/O driver.
let ctx = self.clone();
let handle = tokio::runtime::Handle::current();
let (executed_tx, failed_notes, scripts_to_cache) =
tokio::task::spawn_blocking(move || {
let data_store = NtxDataStore::new(
account,
chain_tip_header,
chain_mmr,
ctx.store.clone(),
ctx.script_cache.clone(),
ctx.db.clone(),
);
handle.block_on(async {
let (successful_notes, failed_notes) =
ctx.filter_notes(&data_store, notes).await?;
let executed_tx =
Box::pin(ctx.execute(&data_store, successful_notes)).await?;
let scripts_to_cache = data_store.take_fetched_scripts();
Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache))
})
})
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?;

// Prove transaction.
let tx_inputs: TransactionInputs = executed_tx.into();
Expand Down Expand Up @@ -316,13 +326,21 @@ impl NtxContext {
#[instrument(target = COMPONENT, name = "ntx.execute_transaction.prove", skip_all, err)]
async fn prove(&self, tx_inputs: &TransactionInputs) -> NtxResult<ProvenTransaction> {
if let Some(remote) = &self.prover {
remote.prove(tx_inputs).await
remote.prove(tx_inputs).await.map_err(NtxError::Proving)
} else {
// Only perform tx inputs clone for local proving.
// ZK proof generation is CPU-intensive; run it on a dedicated blocking thread.
let tx_inputs = tx_inputs.clone();
LocalTransactionProver::default().prove(tx_inputs).await
tokio::task::spawn_blocking(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime")
.block_on(LocalTransactionProver::default().prove(tx_inputs))
})
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))
.map_err(NtxError::Proving)
}
.map_err(NtxError::Proving)
}

/// Submits the transaction to the block producer.
Expand Down
11 changes: 10 additions & 1 deletion crates/validator/src/signers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,16 @@ impl ValidatorSigner {
Ok(sig)
},
Self::Local(signer) => {
let sig = <SecretKey as BlockSigner>::sign(signer, header).await?;
let signer = signer.clone();
let header = header.clone();
let sig = tokio::task::spawn_blocking(move || {
tokio::runtime::Builder::new_current_thread()
.build()
.expect("failed to build tokio runtime")
.block_on(<SecretKey as BlockSigner>::sign(&signer, &header))
})
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?;
Ok(sig)
},
}
Expand Down
42 changes: 30 additions & 12 deletions crates/validator/src/tx_validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,41 @@ pub async fn validate_transaction(
proven_tx: ProvenTransaction,
tx_inputs: TransactionInputs,
) -> Result<ValidatedTransaction, TransactionValidationError> {
// First, verify the transaction proof
info_span!("verify").in_scope(|| {
let tx_verifier = TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL);
tx_verifier.verify(&proven_tx)
})?;
// Proof verification is CPU-intensive; run it on a dedicated blocking thread.
let proven_tx_clone = proven_tx.clone();
tokio::task::spawn_blocking(move || {
info_span!("verify").in_scope(|| {
TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL).verify(&proven_tx_clone)
})
})
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?;

// Create a DataStore from the transaction inputs.
let data_store = TransactionInputsDataStore::new(tx_inputs.clone());

// Execute the transaction.
// VM execution may not yield; run it on a dedicated blocking thread.
let (account, block_header, _, input_notes, tx_args) = tx_inputs.into_parts();
let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> =
TransactionExecutor::new(&data_store);
let executed_tx = executor
.execute_transaction(account.id(), block_header.block_num(), input_notes, tx_args)
.instrument(info_span!("execute"))
.await?;
let execute_span = info_span!("execute");
let executed_tx = tokio::task::spawn_blocking(move || {
let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> =
TransactionExecutor::new(&data_store);
tokio::runtime::Builder::new_current_thread()
.build()
.expect("failed to build tokio runtime")
.block_on(
executor
.execute_transaction(
account.id(),
block_header.block_num(),
input_notes,
tx_args,
)
.instrument(execute_span),
)
})
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?;

// Validate that the executed transaction matches the submitted transaction.
let executed_tx_header: TransactionHeader = (&executed_tx).into();
Expand Down
Loading