Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions bin/remote-prover/src/server/prover.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use miden_block_prover::LocalBlockProver;
use miden_node_proto::BlockProofRequest;
use miden_node_utils::ErrorReport;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::MIN_PROOF_SECURITY_LEVEL;
use miden_protocol::batch::{ProposedBatch, ProvenBatch};
Expand Down Expand Up @@ -113,7 +114,8 @@ impl ProveRequest for LocalBatchProver {

async fn prove(&self, input: Self::Input) -> Result<Self::Output, tonic::Status> {
let prover = self.clone();
tokio::task::spawn_blocking(move || {

spawn_blocking_in_current_span(move || {
prover
.prove(input)
.map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove batch")))
Expand All @@ -131,7 +133,8 @@ impl ProveRequest for LocalBlockProver {
async fn prove(&self, input: Self::Input) -> Result<Self::Output, tonic::Status> {
let prover = self.clone();
let BlockProofRequest { tx_batches, block_header, block_inputs } = input;
tokio::task::spawn_blocking(move || {

spawn_blocking_in_current_span(move || {
prover
.prove(tx_batches, &block_header, block_inputs)
.map_err(|e| tonic::Status::internal(e.as_report_context("failed to prove block")))
Expand Down
3 changes: 3 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ disallowed-methods = [
{ path = "std::path::Path::read_link", reason = "Use fs_err::path::PathExt methods" },
{ path = "std::path::Path::symlink_metadata", reason = "Use fs_err::path::PathExt methods" },
{ path = "std::path::Path::try_exists", reason = "Use fs_err::path::PathExt methods" },

# Use our own `spawn_blocking` wrapper so that the tracing span is correctly propagated
{ path = "tokio::task::spawn_blocking", replacement = "miden_node_utils::spawn::spawn_blocking_in_current_span" },
]
13 changes: 8 additions & 5 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use futures::never::Never;
use futures::{FutureExt, TryFutureExt};
use miden_node_proto::domain::batch::BatchInputs;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::MIN_PROOF_SECURITY_LEVEL;
use miden_protocol::batch::{BatchId, ProposedBatch, ProvenBatch};
Expand Down Expand Up @@ -249,12 +250,14 @@ impl BatchJob {
.prove(proposed_batch)
.await
.map_err(BuildBatchError::RemoteProverClientError),
BatchProver::Local(prover) => tokio::task::spawn_blocking({
BatchProver::Local(prover) => {
let prover = prover.clone();
move || prover.prove(proposed_batch).map_err(BuildBatchError::ProveBatchError)
})
.await
.map_err(BuildBatchError::JoinError)?,
spawn_blocking_in_current_span(move || {
prover.prove(proposed_batch).map_err(BuildBatchError::ProveBatchError)
})
.await
.map_err(BuildBatchError::JoinError)?
},
}?;

if proven_batch.proof_security_level() < MIN_PROOF_SECURITY_LEVEL {
Expand Down
3 changes: 2 additions & 1 deletion crates/block-producer/src/block_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use anyhow::Context;
use futures::FutureExt;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::batch::{OrderedBatches, ProvenBatch};
use miden_protocol::block::{BlockInputs, BlockNumber, ProposedBlock, ProvenBlock, SignedBlock};
Expand Down Expand Up @@ -225,7 +226,7 @@ impl BlockBuilder {
proposed_block: ProposedBlock,
) -> Result<(OrderedBatches, SignedBlock), BuildBlockError> {
// Concurrently build the block and validate it via the validator.
let build_result = tokio::task::spawn_blocking({
let build_result = spawn_blocking_in_current_span({
let proposed_block = proposed_block.clone();
move || proposed_block.into_header_and_body()
});
Expand Down
9 changes: 6 additions & 3 deletions crates/block-producer/src/server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use miden_node_proto::generated::block_producer::api_client as block_producer_cl
use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, GenesisState, Store};
use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions};
use miden_node_utils::fee::test_fee_params;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_validator::{Validator, ValidatorSigner};
use miden_protocol::testing::random_secret_key::random_secret_key;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -172,9 +173,11 @@ async fn start_store(
/// Shuts down the store runtime properly to allow the database to flush before the temp directory
/// is deleted.
async fn shutdown_store(store_runtime: runtime::Runtime) {
task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_millis(500)))
.await
.expect("shutdown should complete");
spawn_blocking_in_current_span(move || {
store_runtime.shutdown_timeout(Duration::from_millis(500));
})
.await
.expect("shutdown should complete");
}

/// Sends a status request to the block producer to verify connectivity.
Expand Down
30 changes: 19 additions & 11 deletions crates/ntx-builder/src/actor/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};

use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::Word;
use miden_protocol::account::{
Expand Down Expand Up @@ -202,8 +203,10 @@ impl NtxContext {
// calls to the store) are driven by the existing I/O driver.
let ctx = self.clone();
let handle = tokio::runtime::Handle::current();
let span = tracing::Span::current();

let (executed_tx, failed_notes, scripts_to_cache) =
tokio::task::spawn_blocking(move || {
spawn_blocking_in_current_span(move || {
let data_store = NtxDataStore::new(
account,
chain_tip_header,
Expand All @@ -212,14 +215,17 @@ impl NtxContext {
ctx.script_cache.clone(),
ctx.db.clone(),
);
handle.block_on(async {
let (successful_notes, failed_notes) =
ctx.filter_notes(&data_store, notes).await?;
let executed_tx =
Box::pin(ctx.execute(&data_store, successful_notes)).await?;
let scripts_to_cache = data_store.take_fetched_scripts();
Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache))
})
handle.block_on(
async {
let (successful_notes, failed_notes) =
ctx.filter_notes(&data_store, notes).await?;
let executed_tx =
Box::pin(ctx.execute(&data_store, successful_notes)).await?;
let scripts_to_cache = data_store.take_fetched_scripts();
Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache))
}
.instrument(span),
)
})
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?;
Expand Down Expand Up @@ -330,12 +336,14 @@ impl NtxContext {
} else {
// ZK proof generation is CPU-intensive; run it on a dedicated blocking thread.
let tx_inputs = tx_inputs.clone();
tokio::task::spawn_blocking(move || {
let span = tracing::Span::current();

spawn_blocking_in_current_span(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime")
.block_on(LocalTransactionProver::default().prove(tx_inputs))
.block_on(LocalTransactionProver::default().prove(tx_inputs).instrument(span))
})
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))
Expand Down
3 changes: 2 additions & 1 deletion crates/rpc/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use miden_node_utils::limiter::{
QueryParamNoteIdLimit,
QueryParamNullifierLimit,
};
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_protocol::Word;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::account::{
Expand Down Expand Up @@ -568,7 +569,7 @@ async fn start_store(store_listener: TcpListener) -> (Runtime, TempDir, Word, So
/// Shuts down the store runtime properly to allow `RocksDB` to flush before the temp directory is
/// deleted.
async fn shutdown_store(store_runtime: Runtime) {
task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_secs(3)))
spawn_blocking_in_current_span(move || store_runtime.shutdown_timeout(Duration::from_secs(3)))
.await
.expect("shutdown should complete");
// Give RocksDB time to release its lock file after the runtime shutdown
Expand Down
11 changes: 6 additions & 5 deletions crates/store/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ use miden_node_proto_build::{
};
use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions};
use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
use miden_node_utils::spawn::spawn_blocking_in_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_node_utils::tracing::grpc::grpc_trace_fn;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::trace::TraceLayer;
use tracing::{Instrument, info, info_span, instrument};
use tracing::{info, info_span, instrument};
use url::Url;

use crate::blocks::BlockStore;
Expand Down Expand Up @@ -208,10 +209,10 @@ impl Store {
loop {
interval.tick().await;
let dir = data_directory.clone();
let span = info_span!(target: COMPONENT, "measure disk space usage");
let result = tokio::task::spawn_blocking(move || measure_disk_usage_bytes(&dir))
.instrument(span.clone())
.await;
let span = info_span!(target: COMPONENT, "measure_disk_space_usage");
let result =
spawn_blocking_in_span(move || measure_disk_usage_bytes(&dir), span.clone())
.await;
match result {
Ok(usage) => {
span.set_attribute("db.sqlite.size", usage.sqlite_db);
Expand Down
1 change: 1 addition & 0 deletions crates/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod limiter;
pub mod logging;
pub mod lru_cache;
pub mod panic;
pub mod spawn;
pub mod tracing;

pub trait ErrorReport: std::error::Error {
Expand Down
21 changes: 21 additions & 0 deletions crates/utils/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use tokio::task::JoinHandle;
use tracing::Span;

/// Spawn a blocking task in the current tracing span.
pub fn spawn_blocking_in_current_span<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
spawn_blocking_in_span(f, Span::current())
}

/// Spawn a blocking task in a span.
pub fn spawn_blocking_in_span<F, R>(f: F, span: Span) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[expect(clippy::disallowed_methods)]
tokio::task::spawn_blocking(move || span.in_scope(f))
}
3 changes: 2 additions & 1 deletion crates/validator/src/signers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod kms;
pub use kms::KmsSigner;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_protocol::block::BlockHeader;
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, SecretKey, Signature};

Expand Down Expand Up @@ -40,7 +41,7 @@ impl ValidatorSigner {
let commitment = header.commitment();
let signature = match self {
Self::Kms(signer) => signer.sign(commitment).await?,
Self::Local(signer) => tokio::task::spawn_blocking({
Self::Local(signer) => spawn_blocking_in_current_span({
let signer = signer.clone();
move || signer.sign(commitment)
})
Expand Down
14 changes: 7 additions & 7 deletions crates/validator/src/tx_validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod data_store;
mod validated_tx;

pub use data_store::TransactionInputsDataStore;
use miden_node_utils::spawn::{spawn_blocking_in_current_span, spawn_blocking_in_span};
use miden_protocol::MIN_PROOF_SECURITY_LEVEL;
use miden_protocol::transaction::{ProvenTransaction, TransactionHeader, TransactionInputs};
use miden_tx::auth::UnreachableAuth;
Expand Down Expand Up @@ -41,11 +42,10 @@ pub async fn validate_transaction(
) -> Result<ValidatedTransaction, TransactionValidationError> {
// Proof verification is CPU-intensive; run it on a dedicated blocking thread.
let proven_tx_clone = proven_tx.clone();
tokio::task::spawn_blocking(move || {
info_span!("verify").in_scope(|| {
TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL).verify(&proven_tx_clone)
})
})
spawn_blocking_in_span(
move || TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL).verify(&proven_tx_clone),
info_span!("verify"),
)
.await
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()))?;

Expand All @@ -54,8 +54,8 @@ pub async fn validate_transaction(

// VM execution may not yield; run it on a dedicated blocking thread.
let (account, block_header, _, input_notes, tx_args) = tx_inputs.into_parts();
let execute_span = info_span!("execute");
let executed_tx = tokio::task::spawn_blocking(move || {
let execute_span = info_span!("execute").or_current();
let executed_tx = spawn_blocking_in_current_span(move || {
let executor: TransactionExecutor<'_, '_, _, UnreachableAuth> =
TransactionExecutor::new(&data_store);
tokio::runtime::Builder::new_current_thread()
Expand Down
Loading