From e9cc5ca25e73279bb3b7d9de9fb9031d89690a79 Mon Sep 17 00:00:00 2001 From: KOVACS Krisztian Date: Thu, 7 May 2026 12:30:00 +0200 Subject: [PATCH 1/2] fix: correctly propagate the current span to blocking tasks --- bin/remote-prover/src/server/prover.rs | 7 +++-- .../block-producer/src/batch_builder/mod.rs | 13 ++++---- .../block-producer/src/block_builder/mod.rs | 3 +- crates/ntx-builder/src/actor/execute.rs | 30 ++++++++++++------- crates/store/src/server/mod.rs | 11 +++---- crates/utils/src/lib.rs | 1 + crates/utils/src/spawn.rs | 20 +++++++++++++ crates/validator/src/signers/mod.rs | 3 +- crates/validator/src/tx_validation/mod.rs | 14 ++++----- 9 files changed, 70 insertions(+), 32 deletions(-) create mode 100644 crates/utils/src/spawn.rs diff --git a/bin/remote-prover/src/server/prover.rs b/bin/remote-prover/src/server/prover.rs index f8c552138..06aa761e5 100644 --- a/bin/remote-prover/src/server/prover.rs +++ b/bin/remote-prover/src/server/prover.rs @@ -1,6 +1,7 @@ use miden_block_prover::LocalBlockProver; use miden_node_proto::BlockProofRequest; use miden_node_utils::ErrorReport; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::batch::{ProposedBatch, ProvenBatch}; @@ -113,7 +114,8 @@ impl ProveRequest for LocalBatchProver { async fn prove(&self, input: Self::Input) -> Result { let prover = self.clone(); - tokio::task::spawn_blocking(move || { + + spawn_blocking_in_current_span(move || { prover .prove(input) .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch"))) @@ -131,7 +133,8 @@ impl ProveRequest for LocalBlockProver { async fn prove(&self, input: Self::Input) -> Result { let prover = self.clone(); let BlockProofRequest { tx_batches, block_header, block_inputs } = input; - tokio::task::spawn_blocking(move || { + + spawn_blocking_in_current_span(move || { prover .prove(tx_batches, &block_header, block_inputs) .map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block"))) diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index 549d76261..86f7e49d5 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -6,6 +6,7 @@ use std::time::Duration; use futures::never::Never; use futures::{FutureExt, TryFutureExt}; use miden_node_proto::domain::batch::BatchInputs; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::batch::{BatchId, ProposedBatch, ProvenBatch}; @@ -249,12 +250,14 @@ impl BatchJob { .prove(proposed_batch) .await .map_err(BuildBatchError::RemoteProverClientError), - BatchProver::Local(prover) => tokio::task::spawn_blocking({ + BatchProver::Local(prover) => { let prover = prover.clone(); - move || prover.prove(proposed_batch).map_err(BuildBatchError::ProveBatchError) - }) - .await - .map_err(BuildBatchError::JoinError)?, + spawn_blocking_in_current_span(move || { + prover.prove(proposed_batch).map_err(BuildBatchError::ProveBatchError) + }) + .await + .map_err(BuildBatchError::JoinError)? + }, }?; if proven_batch.proof_security_level() < MIN_PROOF_SECURITY_LEVEL { diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 85acb62bf..3d1613c6c 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::Context; use futures::FutureExt; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::batch::{OrderedBatches, ProvenBatch}; use miden_protocol::block::{BlockInputs, BlockNumber, ProposedBlock, ProvenBlock, SignedBlock}; @@ -225,7 +226,7 @@ impl BlockBuilder { proposed_block: ProposedBlock, ) -> Result<(OrderedBatches, SignedBlock), BuildBlockError> { // Concurrently build the block and validate it via the validator. - let build_result = tokio::task::spawn_blocking({ + let build_result = spawn_blocking_in_current_span({ let proposed_block = proposed_block.clone(); move || proposed_block.into_header_and_body() }); diff --git a/crates/ntx-builder/src/actor/execute.rs b/crates/ntx-builder/src/actor/execute.rs index f001883dc..2b465adbd 100644 --- a/crates/ntx-builder/src/actor/execute.rs +++ b/crates/ntx-builder/src/actor/execute.rs @@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex}; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; use miden_protocol::account::{ @@ -202,8 +203,10 @@ impl NtxContext { // calls to the store) are driven by the existing I/O driver. let ctx = self.clone(); let handle = tokio::runtime::Handle::current(); + let span = tracing::Span::current(); + let (executed_tx, failed_notes, scripts_to_cache) = - tokio::task::spawn_blocking(move || { + spawn_blocking_in_current_span(move || { let data_store = NtxDataStore::new( account, chain_tip_header, @@ -212,14 +215,17 @@ impl NtxContext { ctx.script_cache.clone(), ctx.db.clone(), ); - handle.block_on(async { - let (successful_notes, failed_notes) = - ctx.filter_notes(&data_store, notes).await?; - let executed_tx = - Box::pin(ctx.execute(&data_store, successful_notes)).await?; - let scripts_to_cache = data_store.take_fetched_scripts(); - Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache)) - }) + handle.block_on( + async { + let (successful_notes, failed_notes) = + ctx.filter_notes(&data_store, notes).await?; + let executed_tx = + Box::pin(ctx.execute(&data_store, successful_notes)).await?; + let scripts_to_cache = data_store.take_fetched_scripts(); + Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache)) + } + .instrument(span), + ) }) .await .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; @@ -330,12 +336,14 @@ impl NtxContext { } else { // ZK proof generation is CPU-intensive; run it on a dedicated blocking thread. let tx_inputs = tx_inputs.clone(); - tokio::task::spawn_blocking(move || { + let span = tracing::Span::current(); + + spawn_blocking_in_current_span(move || { tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("failed to build tokio runtime") - .block_on(LocalTransactionProver::default().prove(tx_inputs)) + .block_on(LocalTransactionProver::default().prove(tx_inputs).instrument(span)) }) .await .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic())) diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index e3afd9738..4e8f59914 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -13,13 +13,14 @@ use miden_node_proto_build::{ }; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn}; +use miden_node_utils::spawn::spawn_blocking_in_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_node_utils::tracing::grpc::grpc_trace_fn; use tokio::net::TcpListener; use tokio::task::JoinSet; use tokio_stream::wrappers::TcpListenerStream; use tower_http::trace::TraceLayer; -use tracing::{Instrument, info, info_span, instrument}; +use tracing::{info, info_span, instrument}; use url::Url; use crate::blocks::BlockStore; @@ -208,10 +209,10 @@ impl Store { loop { interval.tick().await; let dir = data_directory.clone(); - let span = info_span!(target: COMPONENT, "measure disk space usage"); - let result = tokio::task::spawn_blocking(move || measure_disk_usage_bytes(&dir)) - .instrument(span.clone()) - .await; + let span = info_span!(target: COMPONENT, "measure_disk_space_usage"); + let result = + spawn_blocking_in_span(move || measure_disk_usage_bytes(&dir), span.clone()) + .await; match result { Ok(usage) => { span.set_attribute("db.sqlite.size", usage.sqlite_db); diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index ea9e60c5a..ca9977ffb 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -10,6 +10,7 @@ pub mod limiter; pub mod logging; pub mod lru_cache; pub mod panic; +pub mod spawn; pub mod tracing; pub trait ErrorReport: std::error::Error { diff --git a/crates/utils/src/spawn.rs b/crates/utils/src/spawn.rs new file mode 100644 index 000000000..0b870ad57 --- /dev/null +++ b/crates/utils/src/spawn.rs @@ -0,0 +1,20 @@ +use tokio::task::JoinHandle; +use tracing::Span; + +/// Spawn a blocking task in the current tracing span. +pub fn spawn_blocking_in_current_span(f: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + spawn_blocking_in_span(f, Span::current()) +} + +/// Spawn a blocking task in a span. +pub fn spawn_blocking_in_span(f: F, span: Span) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + tokio::task::spawn_blocking(move || span.in_scope(f)) +} diff --git a/crates/validator/src/signers/mod.rs b/crates/validator/src/signers/mod.rs index e4c2192b3..2c50b2dbb 100644 --- a/crates/validator/src/signers/mod.rs +++ b/crates/validator/src/signers/mod.rs @@ -1,5 +1,6 @@ mod kms; pub use kms::KmsSigner; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_protocol::block::BlockHeader; use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, SecretKey, Signature}; @@ -40,7 +41,7 @@ impl ValidatorSigner { let commitment = header.commitment(); let signature = match self { Self::Kms(signer) => signer.sign(commitment).await?, - Self::Local(signer) => tokio::task::spawn_blocking({ + Self::Local(signer) => spawn_blocking_in_current_span({ let signer = signer.clone(); move || signer.sign(commitment) }) diff --git a/crates/validator/src/tx_validation/mod.rs b/crates/validator/src/tx_validation/mod.rs index d33c0934a..992153296 100644 --- a/crates/validator/src/tx_validation/mod.rs +++ b/crates/validator/src/tx_validation/mod.rs @@ -2,6 +2,7 @@ mod data_store; mod validated_tx; pub use data_store::TransactionInputsDataStore; +use miden_node_utils::spawn::{spawn_blocking_in_current_span, spawn_blocking_in_span}; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::transaction::{ProvenTransaction, TransactionHeader, TransactionInputs}; use miden_tx::auth::UnreachableAuth; @@ -41,11 +42,10 @@ pub async fn validate_transaction( ) -> Result { // Proof verification is CPU-intensive; run it on a dedicated blocking thread. let proven_tx_clone = proven_tx.clone(); - tokio::task::spawn_blocking(move || { - info_span!("verify").in_scope(|| { - TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL).verify(&proven_tx_clone) - }) - }) + spawn_blocking_in_span( + move || TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL).verify(&proven_tx_clone), + info_span!("verify"), + ) .await .unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?; @@ -54,8 +54,8 @@ pub async fn validate_transaction( // VM execution may not yield; run it on a dedicated blocking thread. let (account, block_header, _, input_notes, tx_args) = tx_inputs.into_parts(); - let execute_span = info_span!("execute"); - let executed_tx = tokio::task::spawn_blocking(move || { + let execute_span = info_span!("execute").or_current(); + let executed_tx = spawn_blocking_in_current_span(move || { let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> = TransactionExecutor::new(&data_store); tokio::runtime::Builder::new_current_thread() From ef43f307651fa441ec2102237a9fe97efa9efb5e Mon Sep 17 00:00:00 2001 From: KOVACS Krisztian Date: Thu, 7 May 2026 13:56:27 +0200 Subject: [PATCH 2/2] chore(clippy): add `tokio::task::spawn_blocking` to disallowed_methods --- clippy.toml | 3 +++ crates/block-producer/src/server/tests.rs | 9 ++++++--- crates/rpc/src/tests.rs | 3 ++- crates/utils/src/spawn.rs | 1 + 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/clippy.toml b/clippy.toml index 2a5815cec..9ee9a79b1 100644 --- a/clippy.toml +++ b/clippy.toml @@ -29,4 +29,7 @@ disallowed-methods = [ { path = "std::path::Path::read_link", reason = "Use fs_err::path::PathExt methods" }, { path = "std::path::Path::symlink_metadata", reason = "Use fs_err::path::PathExt methods" }, { path = "std::path::Path::try_exists", reason = "Use fs_err::path::PathExt methods" }, + + # Use our own `spawn_blocking` wrapper so that the tracing span is correctly propagated + { path = "tokio::task::spawn_blocking", replacement = "miden_node_utils::spawn::spawn_blocking_in_current_span" }, ] diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs index 73055deff..a5a2e3412 100644 --- a/crates/block-producer/src/server/tests.rs +++ b/crates/block-producer/src/server/tests.rs @@ -5,6 +5,7 @@ use miden_node_proto::generated::block_producer::api_client as block_producer_cl use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, GenesisState, Store}; use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions}; use miden_node_utils::fee::test_fee_params; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_validator::{Validator, ValidatorSigner}; use miden_protocol::testing::random_secret_key::random_secret_key; use tokio::net::TcpListener; @@ -172,9 +173,11 @@ async fn start_store( /// Shuts down the store runtime properly to allow the database to flush before the temp directory /// is deleted. async fn shutdown_store(store_runtime: runtime::Runtime) { - task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_millis(500))) - .await - .expect("shutdown should complete"); + spawn_blocking_in_current_span(move || { + store_runtime.shutdown_timeout(Duration::from_millis(500)); + }) + .await + .expect("shutdown should complete"); } /// Sends a status request to the block producer to verify connectivity. diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index f892316d8..290c4afa9 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -17,6 +17,7 @@ use miden_node_utils::limiter::{ QueryParamNoteIdLimit, QueryParamNullifierLimit, }; +use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_protocol::Word; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::account::{ @@ -568,7 +569,7 @@ async fn start_store(store_listener: TcpListener) -> (Runtime, TempDir, Word, So /// Shuts down the store runtime properly to allow `RocksDB` to flush before the temp directory is /// deleted. async fn shutdown_store(store_runtime: Runtime) { - task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_secs(3))) + spawn_blocking_in_current_span(move || store_runtime.shutdown_timeout(Duration::from_secs(3))) .await .expect("shutdown should complete"); // Give RocksDB time to release its lock file after the runtime shutdown diff --git a/crates/utils/src/spawn.rs b/crates/utils/src/spawn.rs index 0b870ad57..b0dc5438f 100644 --- a/crates/utils/src/spawn.rs +++ b/crates/utils/src/spawn.rs @@ -16,5 +16,6 @@ where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { + #[expect(clippy::disallowed_methods)] tokio::task::spawn_blocking(move || span.in_scope(f)) }