diff --git a/Cargo.lock b/Cargo.lock
index d5813be584a..0b3b74ea02e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4865,6 +4865,7 @@ dependencies = [
"image",
"key-wallet",
"key-wallet-manager",
+ "parking_lot",
"platform-encryption",
"rand 0.8.5",
"serde_json",
diff --git a/packages/rs-platform-wallet/Cargo.toml b/packages/rs-platform-wallet/Cargo.toml
index 05e5eb0547c..01f48f06fc5 100644
--- a/packages/rs-platform-wallet/Cargo.toml
+++ b/packages/rs-platform-wallet/Cargo.toml
@@ -47,6 +47,12 @@ image = { version = "0.25", default-features = false, features = ["png", "jpeg",
# Security
zeroize = "1"
+# Sync primitives used by the optional `lock-stats` feature for the
+# per-tag breakdown map. Plain `parking_lot::Mutex` (sync, fast) is the
+# right shape here — the per-tag map is touched on the lock acquire /
+# release path, never across an `.await`.
+parking_lot = { version = "0.12", optional = true }
+
# Shielded pool (optional, behind `shielded` feature)
grovedb-commitment-tree = { git = "https://github.com/dashpay/grovedb", rev = "8f25b20d04bfc0e8bdfb3870676d647a0d74918b", optional = true }
zip32 = { version = "0.2.0", default-features = false, optional = true }
@@ -65,6 +71,16 @@ default = ["bls", "eddsa"]
bls = ["key-wallet/bls", "key-wallet-manager/bls"]
eddsa = ["key-wallet/eddsa", "key-wallet-manager/eddsa"]
shielded = ["dep:grovedb-commitment-tree", "dep:zip32", "dash-sdk/shielded", "dpp/shielded-client"]
+
+# Off by default. When enabled, the per-wallet `wallet_manager` RwLock is
+# wrapped with an `InstrumentedRwLock` that records acquisition counts,
+# wait time, and hold time per call site (using `read_at("tag")` /
+# `write_at("tag")`). With the feature off the wrapper is a transparent
+# type alias for `tokio::sync::RwLock` and there is zero runtime cost.
+# See `crate::diagnostics::instrumented_lock` for the API and
+# `LockStats` for the snapshot shape.
+lock-stats = ["dep:parking_lot"]
+
# Forward to the upstream `key-wallet` / `key-wallet-manager`
# `keep-finalized-transactions` feature. With it OFF (the default),
# chainlocked transactions are evicted from the in-memory
diff --git a/packages/rs-platform-wallet/src/changeset/core_bridge.rs b/packages/rs-platform-wallet/src/changeset/core_bridge.rs
index b2d9761ac2b..f5c04f35bce 100644
--- a/packages/rs-platform-wallet/src/changeset/core_bridge.rs
+++ b/packages/rs-platform-wallet/src/changeset/core_bridge.rs
@@ -33,12 +33,17 @@ use key_wallet::transaction_checking::TransactionContext;
use key_wallet::Utxo;
use key_wallet_manager::{WalletEvent, WalletId, WalletManager};
use tokio::sync::broadcast::error::RecvError;
-use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::changeset::changeset::{CoreChangeSet, PlatformWalletChangeSet};
use crate::changeset::traits::PlatformWalletPersistence;
+// `InstrumentedRwLockExt` is unused when `lock-stats` is on — the
+// wrapper has the same methods inherent and method resolution prefers
+// inherent over trait. Suppress the warning so the call sites can
+// import the trait unconditionally.
+#[allow(unused_imports)]
+use crate::diagnostics::{InstrumentedRwLock, InstrumentedRwLockExt};
use crate::wallet::platform_wallet::PlatformWalletInfo;
/// Spawn the wallet-event subscriber task.
@@ -54,7 +59,7 @@ use crate::wallet::platform_wallet::PlatformWalletInfo;
/// `Arc
` (not the `Arc`
/// coercion) to actually realize the static-dispatch win.
pub fn spawn_wallet_event_adapter(
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
persister: Arc,
cancel: CancellationToken,
) -> JoinHandle<()>
@@ -63,7 +68,11 @@ where
{
tokio::spawn(async move {
let mut receiver = {
- let guard = wallet_manager.read().await;
+ // Subscribe-time read; happens once at task start. Tagged
+ // so the `lock-stats` feature can confirm the one-shot
+ // nature of this acquisition vs. the per-event probe in
+ // `is_chain_locked`.
+ let guard = wallet_manager.read_at("event_adapter::subscribe").await;
guard.subscribe_events()
};
tracing::debug!("wallet-event adapter task started");
@@ -120,7 +129,7 @@ where
/// Project an upstream [`WalletEvent`] into a [`CoreChangeSet`] suitable
/// for atomic persistence.
async fn build_core_changeset(
- wallet_manager: &Arc>>,
+ wallet_manager: &Arc>>,
event: &WalletEvent,
) -> CoreChangeSet {
match event {
@@ -201,11 +210,18 @@ async fn build_core_changeset(
/// Returns `true` when the wallet's stored record for `txid` is in a
/// chain-locked block. Used to gate IS-lock projection.
async fn is_chain_locked(
- wallet_manager: &Arc>>,
+ wallet_manager: &Arc>>,
wallet_id: &WalletId,
txid: &dashcore::Txid,
) -> bool {
- let guard = wallet_manager.read().await;
+ // Tagged so the `lock-stats` feature can attribute this site's
+ // contribution to wallet-manager contention. The event adapter
+ // touches this lock once per `TransactionInstantLocked` event;
+ // tagging lets a perf audit distinguish the IS-lock finality
+ // probe from generic identity / token / address-sync reads.
+ let guard = wallet_manager
+ .read_at("event_adapter::is_chain_locked")
+ .await;
let Some(info) = guard.get_wallet_info(wallet_id) else {
return false;
};
diff --git a/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs
new file mode 100644
index 00000000000..94414cacf02
--- /dev/null
+++ b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs
@@ -0,0 +1,594 @@
+//! `InstrumentedRwLock` — opt-in instrumented wrapper around
+//! [`tokio::sync::RwLock`].
+//!
+//! # Build modes
+//!
+//! - **`lock-stats` feature OFF (default)** — `InstrumentedRwLock` is
+//! a literal type alias for [`tokio::sync::RwLock`]. There is no
+//! wrapper struct, no extra `Arc`, and no `Drop` glue. The tagged
+//! methods (`read_at` / `write_at` / `try_*_at` / `blocking_*_at`)
+//! plus `raw_arc` are provided by zero-cost extension traits
+//! ([`InstrumentedRwLockExt`] and [`InstrumentedArcExt`]) whose
+//! methods are `#[inline]` and drop the tag at the call site. After
+//! inlining, every call collapses to the equivalent inherent
+//! [`tokio::sync::RwLock`] method.
+//!
+//! - **`lock-stats` feature ON** — `InstrumentedRwLock` is a
+//! wrapper struct holding `Arc>` plus
+//! `Arc<`[`LockStats`]`>`. Each acquisition records the wait time
+//! (until the guard resolves) and the hold time (until the guard
+//! drops). The tagged methods bucket the acquisition under the
+//! given tag; untagged calls bucket into [`UNTAGGED`].
+//!
+//! # Why an inner Arc when the feature is on
+//!
+//! The wrapper has to hand `Arc>` to APIs that
+//! take that concrete type literally (e.g. `dash_spv::DashSpvClient::new`
+//! which takes `Arc>`). With the feature off the wallet-manager
+//! field IS `Arc>`, so [`InstrumentedArcExt::raw_arc`]
+//! reduces to `Arc::clone(self)`. With the feature on the wrapper holds
+//! its tokio lock as `Arc>` internally so the same
+//! `raw_arc()` call extracts the inner Arc. SPV's own acquisitions go
+//! through that inner Arc directly and are NOT seen by the wrapper's
+//! stats — the intentional trade is that platform-side acquisitions
+//! (everything that goes through `wallet_manager.read()` /
+//! `wallet_manager.read_at("…")`) are counted, while upstream's own
+//! `process_block` write isn't. That's the right shape for "what does
+//! platform-wallet contribute to lock pressure?", which is the question
+//! this layer was added to answer.
+//!
+//! # Tagged call sites
+//!
+//! Untagged calls (`lock.read().await`) bucket into [`UNTAGGED`] —
+//! a useful aggregate but not actionable when you're trying to find
+//! the specific code path serializing the lock. Tagging individual
+//! sites (`lock.read_at("event_adapter::is_chain_locked").await`) is
+//! the path to actionable contention numbers. The tag is `&'static str`
+//! so it doesn't allocate on the hot path; with the feature off the
+//! tag is dropped at the `read_at` boundary and the call collapses
+//! into a plain `read().await`.
+//!
+//! # Snapshot shape
+//!
+//! With `lock-stats` enabled, calling [`InstrumentedRwLock::stats`]
+//! hands back the shared `Arc`. From there
+//! [`LockStats::snapshot`] produces a [`Snapshot`] containing the
+//! global counters and the per-tag breakdown — clone the snapshot
+//! wherever you need to print or log it (e.g. an FFI accessor or a
+//! periodic `tracing::info!`).
+
+#![allow(unused_imports)] // some imports are only used under one cfg branch
+
+use std::future::Future;
+use std::sync::Arc;
+
+use tokio::sync::{
+ RwLock as TokioRwLock, RwLockReadGuard as TokioReadGuard, RwLockWriteGuard as TokioWriteGuard,
+ TryLockError,
+};
+
+#[cfg(feature = "lock-stats")]
+mod stats;
+
+#[cfg(feature = "lock-stats")]
+pub use stats::{LockStats, SiteStats, Snapshot};
+
+#[cfg(feature = "lock-stats")]
+use std::time::Instant;
+
+/// The default tag attributed to acquisitions made through the
+/// un-suffixed methods (`read`, `write`, `try_read`, …). Visible in
+/// [`LockStats`] snapshots so it's clear which acquisitions came
+/// from un-tagged sites.
+pub const UNTAGGED: &str = "untagged";
+
+// ---------------------------------------------------------------------------
+// Extension traits
+//
+// Defined unconditionally so call sites import them once and stay agnostic
+// to the feature flag. The impls differ per cfg branch — see below.
+// ---------------------------------------------------------------------------
+
+/// Tagged-acquisition methods on a `RwLock`-shaped lock.
+///
+/// In feature-off mode the impl forwards to the corresponding tokio
+/// inherent method and drops the tag. In feature-on mode the impl
+/// forwards to the wrapper's inherent method, which records the
+/// acquisition under `tag`.
+pub trait InstrumentedRwLockExt {
+ /// Acquire a shared lock, attributing the acquisition to `tag`
+ /// (when `lock-stats` is enabled).
+ fn read_at(&self, tag: &'static str) -> impl Future> + Send;
+ /// Acquire an exclusive lock, attributing the acquisition to `tag`.
+ fn write_at(&self, tag: &'static str) -> impl Future> + Send;
+ /// Try to acquire a shared lock without waiting.
+ fn try_read_at(&self, tag: &'static str) -> Result, TryLockError>;
+ /// Try to acquire an exclusive lock without waiting.
+ fn try_write_at(&self, tag: &'static str) -> Result, TryLockError>;
+ /// Synchronously acquire a shared lock — must NOT be called from a
+ /// tokio runtime thread (will panic).
+ fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T>;
+ /// Synchronously acquire an exclusive lock — must NOT be called
+ /// from a tokio runtime thread.
+ fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T>;
+}
+
+/// `raw_arc()` extension on a shared handle to the lock. Returns the
+/// `Arc>` shape that external APIs take literally
+/// (e.g. `dash_spv::DashSpvClient::new`). With the feature off the
+/// handle IS the tokio Arc, so this is `Arc::clone(self)`. With the
+/// feature on the handle is `Arc` and this extracts the
+/// wrapper's inner Arc.
+pub trait InstrumentedArcExt {
+ /// Cheap clone of the underlying `Arc>`.
+ /// Acquisitions made through the returned Arc bypass the wrapper's
+ /// stats — see the module-level docs for the rationale.
+ fn raw_arc(&self) -> Arc>;
+}
+
+// ---------------------------------------------------------------------------
+// Feature OFF: type aliases — the wrapper IS tokio's RwLock.
+// ---------------------------------------------------------------------------
+
+#[cfg(not(feature = "lock-stats"))]
+mod alias_mode {
+ use super::*;
+
+ /// Type alias for [`tokio::sync::RwLock`] when `lock-stats` is
+ /// off. No wrapper struct, no extra `Arc`, no per-method
+ /// instrumentation cost.
+ pub type InstrumentedRwLock = TokioRwLock;
+
+ /// Type alias for [`tokio::sync::RwLockReadGuard<'a, T>`] when
+ /// `lock-stats` is off.
+ pub type ReadGuard<'a, T> = TokioReadGuard<'a, T>;
+
+ /// Type alias for [`tokio::sync::RwLockWriteGuard<'a, T>`] when
+ /// `lock-stats` is off.
+ pub type WriteGuard<'a, T> = TokioWriteGuard<'a, T>;
+
+ impl InstrumentedRwLockExt for TokioRwLock {
+ #[inline]
+ fn read_at(&self, _tag: &'static str) -> impl Future> + Send {
+ self.read()
+ }
+
+ #[inline]
+ fn write_at(&self, _tag: &'static str) -> impl Future> + Send {
+ self.write()
+ }
+
+ #[inline]
+ fn try_read_at(&self, _tag: &'static str) -> Result, TryLockError> {
+ self.try_read()
+ }
+
+ #[inline]
+ fn try_write_at(&self, _tag: &'static str) -> Result, TryLockError> {
+ self.try_write()
+ }
+
+ #[inline]
+ fn blocking_read_at(&self, _tag: &'static str) -> ReadGuard<'_, T> {
+ self.blocking_read()
+ }
+
+ #[inline]
+ fn blocking_write_at(&self, _tag: &'static str) -> WriteGuard<'_, T> {
+ self.blocking_write()
+ }
+ }
+
+ impl InstrumentedArcExt for Arc> {
+ #[inline]
+ fn raw_arc(&self) -> Arc> {
+ Arc::clone(self)
+ }
+ }
+}
+
+#[cfg(not(feature = "lock-stats"))]
+pub use alias_mode::{InstrumentedRwLock, ReadGuard, WriteGuard};
+
+// ---------------------------------------------------------------------------
+// Feature ON: full wrapper struct with stats.
+// ---------------------------------------------------------------------------
+
+#[cfg(feature = "lock-stats")]
+mod struct_mode {
+ use super::*;
+ use std::ops::{Deref, DerefMut};
+
+ /// Wrapper around [`tokio::sync::RwLock`] that records
+ /// per-call-site acquisition counts plus wait and hold durations.
+ /// See the module-level docs.
+ pub struct InstrumentedRwLock {
+ inner: Arc>,
+ stats: Arc,
+ }
+
+ impl InstrumentedRwLock {
+ /// Construct a new lock holding `value`.
+ pub fn new(value: T) -> Self {
+ Self {
+ inner: Arc::new(TokioRwLock::new(value)),
+ stats: Arc::new(LockStats::new()),
+ }
+ }
+
+ /// Borrow the wrapped tokio lock. Use only for APIs that
+ /// genuinely need a `&TokioRwLock`; prefer the wrapper's
+ /// own methods so acquisitions stay attributed.
+ #[inline]
+ pub fn raw(&self) -> &TokioRwLock {
+ &self.inner
+ }
+
+ /// Cheap clone of the inner `Arc>`. See the
+ /// [`InstrumentedArcExt::raw_arc`] doc for the trade.
+ #[inline]
+ pub fn raw_arc(&self) -> Arc> {
+ Arc::clone(&self.inner)
+ }
+
+ /// Shared handle to the per-lock stats snapshot store.
+ #[inline]
+ pub fn stats(&self) -> Arc {
+ Arc::clone(&self.stats)
+ }
+
+ /// Acquire a shared lock — buckets into [`UNTAGGED`].
+ #[inline]
+ pub async fn read(&self) -> ReadGuard<'_, T> {
+ self.read_at(UNTAGGED).await
+ }
+
+ /// Acquire an exclusive lock — buckets into [`UNTAGGED`].
+ #[inline]
+ pub async fn write(&self) -> WriteGuard<'_, T> {
+ self.write_at(UNTAGGED).await
+ }
+
+ /// Acquire a shared lock with a per-call-site tag.
+ pub async fn read_at(&self, tag: &'static str) -> ReadGuard<'_, T> {
+ let wait_start = Instant::now();
+ let inner = self.inner.read().await;
+ let wait_ns = wait_start.elapsed().as_nanos() as u64;
+ self.stats.record_read_acquired(tag, wait_ns);
+ ReadGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ }
+ }
+
+ /// Acquire an exclusive lock with a per-call-site tag.
+ pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> {
+ let wait_start = Instant::now();
+ let inner = self.inner.write().await;
+ let wait_ns = wait_start.elapsed().as_nanos() as u64;
+ self.stats.record_write_acquired(tag, wait_ns);
+ WriteGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ }
+ }
+
+ /// Try to acquire a shared lock without waiting.
+ #[inline]
+ pub fn try_read(&self) -> Result, TryLockError> {
+ self.try_read_at(UNTAGGED)
+ }
+
+ /// Try to acquire an exclusive lock without waiting.
+ #[inline]
+ pub fn try_write(&self) -> Result, TryLockError> {
+ self.try_write_at(UNTAGGED)
+ }
+
+ /// Tagged variant of [`try_read`](Self::try_read).
+ pub fn try_read_at(&self, tag: &'static str) -> Result, TryLockError> {
+ match self.inner.try_read() {
+ Ok(inner) => {
+ self.stats.record_read_acquired(tag, 0);
+ Ok(ReadGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ })
+ }
+ Err(e) => {
+ self.stats.record_read_contended(tag);
+ Err(e)
+ }
+ }
+ }
+
+ /// Tagged variant of [`try_write`](Self::try_write).
+ pub fn try_write_at(&self, tag: &'static str) -> Result, TryLockError> {
+ match self.inner.try_write() {
+ Ok(inner) => {
+ self.stats.record_write_acquired(tag, 0);
+ Ok(WriteGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ })
+ }
+ Err(e) => {
+ self.stats.record_write_contended(tag);
+ Err(e)
+ }
+ }
+ }
+
+ /// Synchronously acquire a shared lock — must NOT be called from
+ /// a tokio runtime thread.
+ #[inline]
+ pub fn blocking_read(&self) -> ReadGuard<'_, T> {
+ self.blocking_read_at(UNTAGGED)
+ }
+
+ /// Synchronously acquire an exclusive lock — must NOT be called
+ /// from a tokio runtime thread.
+ #[inline]
+ pub fn blocking_write(&self) -> WriteGuard<'_, T> {
+ self.blocking_write_at(UNTAGGED)
+ }
+
+ /// Tagged variant of [`blocking_read`](Self::blocking_read).
+ pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> {
+ let wait_start = Instant::now();
+ let inner = self.inner.blocking_read();
+ let wait_ns = wait_start.elapsed().as_nanos() as u64;
+ self.stats.record_read_acquired(tag, wait_ns);
+ ReadGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ }
+ }
+
+ /// Tagged variant of [`blocking_write`](Self::blocking_write).
+ pub fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> {
+ let wait_start = Instant::now();
+ let inner = self.inner.blocking_write();
+ let wait_ns = wait_start.elapsed().as_nanos() as u64;
+ self.stats.record_write_acquired(tag, wait_ns);
+ WriteGuard {
+ inner,
+ stats: Arc::clone(&self.stats),
+ tag,
+ acquired_at: Instant::now(),
+ }
+ }
+ }
+
+ impl Default for InstrumentedRwLock {
+ fn default() -> Self {
+ Self::new(T::default())
+ }
+ }
+
+ impl std::fmt::Debug for InstrumentedRwLock {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("InstrumentedRwLock")
+ .field("inner", &self.inner)
+ .finish()
+ }
+ }
+
+ /// Shared read guard. `Deref`. Records hold time on
+ /// `Drop`.
+ pub struct ReadGuard<'a, T> {
+ inner: TokioReadGuard<'a, T>,
+ stats: Arc,
+ tag: &'static str,
+ acquired_at: Instant,
+ }
+
+ impl Deref for ReadGuard<'_, T> {
+ type Target = T;
+
+ #[inline]
+ fn deref(&self) -> &T {
+ &self.inner
+ }
+ }
+
+ impl Drop for ReadGuard<'_, T> {
+ fn drop(&mut self) {
+ let held_ns = self.acquired_at.elapsed().as_nanos() as u64;
+ self.stats.record_read_released(self.tag, held_ns);
+ }
+ }
+
+ /// Exclusive write guard. `Deref` + `DerefMut`. Records
+ /// hold time on `Drop`.
+ pub struct WriteGuard<'a, T> {
+ inner: TokioWriteGuard<'a, T>,
+ stats: Arc,
+ tag: &'static str,
+ acquired_at: Instant,
+ }
+
+ impl Deref for WriteGuard<'_, T> {
+ type Target = T;
+
+ #[inline]
+ fn deref(&self) -> &T {
+ &self.inner
+ }
+ }
+
+ impl DerefMut for WriteGuard<'_, T> {
+ #[inline]
+ fn deref_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+ }
+
+ impl Drop for WriteGuard<'_, T> {
+ fn drop(&mut self) {
+ let held_ns = self.acquired_at.elapsed().as_nanos() as u64;
+ self.stats.record_write_released(self.tag, held_ns);
+ }
+ }
+
+ // Trait impls so feature-agnostic call sites that import the Ext
+ // traits keep working. Method resolution prefers inherent methods
+ // when they exist, so these are mostly redundant in feature-on
+ // mode — they're here so a generic helper bounded by
+ // `InstrumentedRwLockExt` can take the wrapper just as easily as
+ // it can take a raw `TokioRwLock`.
+ impl InstrumentedRwLockExt for InstrumentedRwLock {
+ #[inline]
+ fn read_at(&self, tag: &'static str) -> impl Future> + Send {
+ InstrumentedRwLock::read_at(self, tag)
+ }
+
+ #[inline]
+ fn write_at(&self, tag: &'static str) -> impl Future> + Send {
+ InstrumentedRwLock::write_at(self, tag)
+ }
+
+ #[inline]
+ fn try_read_at(&self, tag: &'static str) -> Result, TryLockError> {
+ InstrumentedRwLock::try_read_at(self, tag)
+ }
+
+ #[inline]
+ fn try_write_at(&self, tag: &'static str) -> Result, TryLockError> {
+ InstrumentedRwLock::try_write_at(self, tag)
+ }
+
+ #[inline]
+ fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> {
+ InstrumentedRwLock::blocking_read_at(self, tag)
+ }
+
+ #[inline]
+ fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> {
+ InstrumentedRwLock::blocking_write_at(self, tag)
+ }
+ }
+
+ impl InstrumentedArcExt for Arc> {
+ #[inline]
+ fn raw_arc(&self) -> Arc> {
+ InstrumentedRwLock::raw_arc(self)
+ }
+ }
+}
+
+#[cfg(feature = "lock-stats")]
+pub use struct_mode::{InstrumentedRwLock, ReadGuard, WriteGuard};
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn read_write_smoke() {
+ let lock = InstrumentedRwLock::new(0u32);
+ {
+ let guard = lock.read().await;
+ assert_eq!(*guard, 0);
+ }
+ {
+ let mut guard = lock.write().await;
+ *guard = 42;
+ }
+ let guard = lock.read().await;
+ assert_eq!(*guard, 42);
+ }
+
+ #[tokio::test]
+ async fn try_read_contended() {
+ let lock = InstrumentedRwLock::new(0u32);
+ let _w = lock.write().await;
+ assert!(lock.try_read().is_err());
+ }
+
+ #[tokio::test]
+ async fn read_at_smoke() {
+ // Tagged calls work in both feature modes — feature-off via the
+ // `InstrumentedRwLockExt` trait, feature-on via inherent methods.
+ let lock = InstrumentedRwLock::new(0u32);
+ let guard = lock.read_at("test::tag").await;
+ assert_eq!(*guard, 0);
+ }
+
+ #[tokio::test]
+ async fn raw_arc_smoke() {
+ // `raw_arc()` works in both modes — feature-off via the
+ // `InstrumentedArcExt` trait on `Arc`, feature-on
+ // via the wrapper's inherent method.
+ let lock = Arc::new(InstrumentedRwLock::new(0u32));
+ let raw: Arc> = lock.raw_arc();
+ let guard = raw.read().await;
+ assert_eq!(*guard, 0);
+ }
+
+ #[cfg(feature = "lock-stats")]
+ #[tokio::test]
+ async fn stats_count_and_attribute_to_tag() {
+ let lock = InstrumentedRwLock::new(0u32);
+
+ // Two reads tagged "ours", one write tagged "theirs".
+ {
+ let _r1 = lock.read_at("ours").await;
+ let _r2 = lock.read_at("ours").await;
+ }
+ {
+ let _w = lock.write_at("theirs").await;
+ }
+
+ let snap = lock.stats().snapshot();
+ let ours = snap.per_tag.get("ours").expect("ours tag present");
+ assert_eq!(ours.read_acquired, 2);
+ assert_eq!(ours.write_acquired, 0);
+ let theirs = snap.per_tag.get("theirs").expect("theirs tag present");
+ assert_eq!(theirs.read_acquired, 0);
+ assert_eq!(theirs.write_acquired, 1);
+ assert_eq!(snap.total.read_acquired, 2);
+ assert_eq!(snap.total.write_acquired, 1);
+ }
+
+ // Untagged calls go to the UNTAGGED bucket so the snapshot still
+ // accounts for them — we don't want acquisitions to vanish.
+ #[cfg(feature = "lock-stats")]
+ #[tokio::test]
+ async fn untagged_calls_go_to_untagged_bucket() {
+ let lock = InstrumentedRwLock::new(0u32);
+ {
+ let _r = lock.read().await;
+ }
+ let snap = lock.stats().snapshot();
+ let untagged = snap.per_tag.get(UNTAGGED).expect("UNTAGGED bucket");
+ assert_eq!(untagged.read_acquired, 1);
+ }
+
+ #[cfg(feature = "lock-stats")]
+ #[tokio::test]
+ async fn try_read_failure_records_contention() {
+ let lock = InstrumentedRwLock::new(0u32);
+ let _w = lock.write_at("holder").await;
+ let r = lock.try_read_at("contender");
+ assert!(r.is_err());
+ let snap = lock.stats().snapshot();
+ let contender = snap.per_tag.get("contender").expect("contender tag");
+ assert_eq!(contender.read_contended, 1);
+ assert_eq!(contender.read_acquired, 0);
+ }
+}
diff --git a/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs
new file mode 100644
index 00000000000..d6f5a8dae17
--- /dev/null
+++ b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs
@@ -0,0 +1,238 @@
+//! Stats counters for [`super::InstrumentedRwLock`]. Only compiled when
+//! the `lock-stats` Cargo feature is enabled.
+//!
+//! # Storage shape
+//!
+//! - **Total counters** — atomic `u64`s, lock-free, bumped from every
+//! acquire / release path. Cheap enough that the bump is in the
+//! noise even under heavy contention.
+//! - **Per-tag breakdown** — `BTreeMap<&'static str, SiteStats>` behind
+//! a `parking_lot::Mutex`. The map is touched only on the lock
+//! acquire / release boundary (never across an `.await`), so a sync
+//! mutex is appropriate. The acquire path holds the mutex just long
+//! enough to look up or insert the entry, then bumps the entry's
+//! atomics outside the lock. New tags are inserted lazily on first
+//! use; existing tags re-use the entry.
+
+use std::collections::BTreeMap;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use parking_lot::Mutex;
+
+/// Aggregate lock-acquisition counters maintained by an
+/// [`InstrumentedRwLock`](super::InstrumentedRwLock).
+///
+/// Use [`LockStats::snapshot`] to clone out a readable [`Snapshot`].
+/// The live [`LockStats`] keeps its counters as atomics so reads via
+/// `snapshot` don't race with writers.
+#[derive(Debug)]
+pub struct LockStats {
+ total: SiteCounters,
+ per_tag: Mutex>>,
+}
+
+impl LockStats {
+ pub(super) fn new() -> Self {
+ Self {
+ total: SiteCounters::new(),
+ per_tag: Mutex::new(BTreeMap::new()),
+ }
+ }
+
+ /// Take a snapshot of the current counters. Cheap enough to call
+ /// from a debug UI on every refresh; a periodic logger could call
+ /// it on a 1-second timer without measurable overhead.
+ pub fn snapshot(&self) -> Snapshot {
+ let per_tag: BTreeMap<&'static str, SiteStats> = self
+ .per_tag
+ .lock()
+ .iter()
+ .map(|(tag, counters)| (*tag, counters.snapshot()))
+ .collect();
+ Snapshot {
+ total: self.total.snapshot(),
+ per_tag,
+ }
+ }
+
+ /// Look up (or insert on first use) the [`SiteCounters`] for a tag.
+ /// Holds the per-tag mutex only for the lookup; the returned `Arc`
+ /// lets the caller bump atomics outside the mutex.
+ fn site(&self, tag: &'static str) -> Arc {
+ let mut guard = self.per_tag.lock();
+ if let Some(existing) = guard.get(tag) {
+ return Arc::clone(existing);
+ }
+ let new = Arc::new(SiteCounters::new());
+ guard.insert(tag, Arc::clone(&new));
+ new
+ }
+
+ pub(super) fn record_read_acquired(&self, tag: &'static str, wait_ns: u64) {
+ self.total.read_acquired.fetch_add(1, Ordering::Relaxed);
+ self.total
+ .read_wait_ns_total
+ .fetch_add(wait_ns, Ordering::Relaxed);
+ let site = self.site(tag);
+ site.read_acquired.fetch_add(1, Ordering::Relaxed);
+ site.read_wait_ns_total
+ .fetch_add(wait_ns, Ordering::Relaxed);
+ }
+
+ pub(super) fn record_read_released(&self, tag: &'static str, held_ns: u64) {
+ self.total
+ .read_hold_ns_total
+ .fetch_add(held_ns, Ordering::Relaxed);
+ let site = self.site(tag);
+ site.read_hold_ns_total
+ .fetch_add(held_ns, Ordering::Relaxed);
+ }
+
+ pub(super) fn record_read_contended(&self, tag: &'static str) {
+ self.total.read_contended.fetch_add(1, Ordering::Relaxed);
+ let site = self.site(tag);
+ site.read_contended.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub(super) fn record_write_acquired(&self, tag: &'static str, wait_ns: u64) {
+ self.total.write_acquired.fetch_add(1, Ordering::Relaxed);
+ self.total
+ .write_wait_ns_total
+ .fetch_add(wait_ns, Ordering::Relaxed);
+ let site = self.site(tag);
+ site.write_acquired.fetch_add(1, Ordering::Relaxed);
+ site.write_wait_ns_total
+ .fetch_add(wait_ns, Ordering::Relaxed);
+ }
+
+ pub(super) fn record_write_released(&self, tag: &'static str, held_ns: u64) {
+ self.total
+ .write_hold_ns_total
+ .fetch_add(held_ns, Ordering::Relaxed);
+ let site = self.site(tag);
+ site.write_hold_ns_total
+ .fetch_add(held_ns, Ordering::Relaxed);
+ }
+
+ pub(super) fn record_write_contended(&self, tag: &'static str) {
+ self.total.write_contended.fetch_add(1, Ordering::Relaxed);
+ let site = self.site(tag);
+ site.write_contended.fetch_add(1, Ordering::Relaxed);
+ }
+}
+
+/// Live atomic counters for a single bucket (the global "total" plus
+/// each per-tag site).
+#[derive(Debug)]
+struct SiteCounters {
+ read_acquired: AtomicU64,
+ write_acquired: AtomicU64,
+ read_contended: AtomicU64,
+ write_contended: AtomicU64,
+ read_wait_ns_total: AtomicU64,
+ write_wait_ns_total: AtomicU64,
+ read_hold_ns_total: AtomicU64,
+ write_hold_ns_total: AtomicU64,
+}
+
+impl SiteCounters {
+ fn new() -> Self {
+ Self {
+ read_acquired: AtomicU64::new(0),
+ write_acquired: AtomicU64::new(0),
+ read_contended: AtomicU64::new(0),
+ write_contended: AtomicU64::new(0),
+ read_wait_ns_total: AtomicU64::new(0),
+ write_wait_ns_total: AtomicU64::new(0),
+ read_hold_ns_total: AtomicU64::new(0),
+ write_hold_ns_total: AtomicU64::new(0),
+ }
+ }
+
+ fn snapshot(&self) -> SiteStats {
+ SiteStats {
+ read_acquired: self.read_acquired.load(Ordering::Relaxed),
+ write_acquired: self.write_acquired.load(Ordering::Relaxed),
+ read_contended: self.read_contended.load(Ordering::Relaxed),
+ write_contended: self.write_contended.load(Ordering::Relaxed),
+ read_wait_ns_total: self.read_wait_ns_total.load(Ordering::Relaxed),
+ write_wait_ns_total: self.write_wait_ns_total.load(Ordering::Relaxed),
+ read_hold_ns_total: self.read_hold_ns_total.load(Ordering::Relaxed),
+ write_hold_ns_total: self.write_hold_ns_total.load(Ordering::Relaxed),
+ }
+ }
+}
+
+/// Plain-old-data snapshot of a single bucket (the global total or a
+/// single tag). All durations are in nanoseconds; cumulative across
+/// every acquisition since the lock was created.
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub struct SiteStats {
+ /// Number of times a read guard was successfully acquired.
+ pub read_acquired: u64,
+ /// Number of times a write guard was successfully acquired.
+ pub write_acquired: u64,
+ /// Number of times a `try_read` returned `Err(TryLockError)`.
+ pub read_contended: u64,
+ /// Number of times a `try_write` returned `Err(TryLockError)`.
+ pub write_contended: u64,
+ /// Cumulative wait time before read acquisitions resolved, in ns.
+ pub read_wait_ns_total: u64,
+ /// Cumulative wait time before write acquisitions resolved, in ns.
+ pub write_wait_ns_total: u64,
+ /// Cumulative time read guards were held before drop, in ns.
+ pub read_hold_ns_total: u64,
+ /// Cumulative time write guards were held before drop, in ns.
+ pub write_hold_ns_total: u64,
+}
+
+impl SiteStats {
+ /// Mean wait time for read acquisitions, in nanoseconds. Returns
+ /// `None` if no read acquisitions have completed.
+ pub fn read_wait_ns_mean(&self) -> Option {
+ if self.read_acquired == 0 {
+ None
+ } else {
+ Some(self.read_wait_ns_total / self.read_acquired)
+ }
+ }
+
+ /// Mean wait time for write acquisitions, in nanoseconds.
+ pub fn write_wait_ns_mean(&self) -> Option {
+ if self.write_acquired == 0 {
+ None
+ } else {
+ Some(self.write_wait_ns_total / self.write_acquired)
+ }
+ }
+
+ /// Mean hold time for read acquisitions, in nanoseconds.
+ pub fn read_hold_ns_mean(&self) -> Option {
+ if self.read_acquired == 0 {
+ None
+ } else {
+ Some(self.read_hold_ns_total / self.read_acquired)
+ }
+ }
+
+ /// Mean hold time for write acquisitions, in nanoseconds.
+ pub fn write_hold_ns_mean(&self) -> Option {
+ if self.write_acquired == 0 {
+ None
+ } else {
+ Some(self.write_hold_ns_total / self.write_acquired)
+ }
+ }
+}
+
+/// Snapshot of a [`LockStats`]: aggregate totals plus the per-tag
+/// breakdown. Cheap to clone; suitable for shipping through a debug
+/// UI or a periodic log line.
+#[derive(Debug, Clone, Default)]
+pub struct Snapshot {
+ /// Aggregate counters across every tag (including `UNTAGGED`).
+ pub total: SiteStats,
+ /// Per-tag breakdown. Tags that have never been used don't appear.
+ pub per_tag: BTreeMap<&'static str, SiteStats>,
+}
diff --git a/packages/rs-platform-wallet/src/diagnostics/mod.rs b/packages/rs-platform-wallet/src/diagnostics/mod.rs
new file mode 100644
index 00000000000..e503937a65f
--- /dev/null
+++ b/packages/rs-platform-wallet/src/diagnostics/mod.rs
@@ -0,0 +1,18 @@
+//! Optional runtime diagnostics for platform-wallet.
+//!
+//! Currently a single submodule:
+//!
+//! - [`instrumented_lock`] — an `InstrumentedRwLock` newtype wrapping
+//! [`tokio::sync::RwLock`] that, when the `lock-stats` Cargo feature
+//! is enabled, records per-call-site acquisition counts plus wait /
+//! hold durations. With the feature off the wrapper compiles down to
+//! the underlying tokio lock (zero added overhead in the hot path).
+
+pub mod instrumented_lock;
+
+pub use instrumented_lock::{
+ InstrumentedArcExt, InstrumentedRwLock, InstrumentedRwLockExt, ReadGuard, WriteGuard,
+};
+
+#[cfg(feature = "lock-stats")]
+pub use instrumented_lock::{LockStats, SiteStats, Snapshot};
diff --git a/packages/rs-platform-wallet/src/lib.rs b/packages/rs-platform-wallet/src/lib.rs
index 98b9a609a43..bdb5e402b08 100644
--- a/packages/rs-platform-wallet/src/lib.rs
+++ b/packages/rs-platform-wallet/src/lib.rs
@@ -15,6 +15,7 @@
pub mod address_paths;
pub mod broadcaster;
pub mod changeset;
+pub mod diagnostics;
pub mod error;
pub mod events;
pub mod manager;
diff --git a/packages/rs-platform-wallet/src/manager/mod.rs b/packages/rs-platform-wallet/src/manager/mod.rs
index ac44658e8f3..9e4f5f9bead 100644
--- a/packages/rs-platform-wallet/src/manager/mod.rs
+++ b/packages/rs-platform-wallet/src/manager/mod.rs
@@ -17,6 +17,7 @@ use tokio_util::sync::CancellationToken;
use key_wallet_manager::WalletManager;
use crate::changeset::{spawn_wallet_event_adapter, PlatformWalletPersistence};
+use crate::diagnostics::InstrumentedRwLock;
use crate::events::{PlatformEventHandler, PlatformEventManager};
use crate::manager::identity_sync::IdentitySyncManager;
use crate::manager::platform_address_sync::PlatformAddressSyncManager;
@@ -34,7 +35,7 @@ use crate::wallet::PlatformWallet;
/// [`PlatformEventHandler`]s by reference (no cloning).
pub struct PlatformWalletManager {
pub(super) sdk: Arc,
- pub(super) wallet_manager: Arc>>,
+ pub(super) wallet_manager: Arc>>,
/// Map of registered wallets. Held in an `Arc` so the
/// `BalanceUpdateHandler` can hold a clone and look up wallets to
/// update their lock-free balance atomics from event-handler
@@ -78,7 +79,7 @@ impl PlatformWalletManager {
persister: Arc
,
app_handler: Arc,
) -> Self {
- let wallet_manager = Arc::new(RwLock::new(WalletManager::new(sdk.network)));
+ let wallet_manager = Arc::new(InstrumentedRwLock::new(WalletManager::new(sdk.network)));
let wallets = Arc::new(RwLock::new(std::collections::BTreeMap::new()));
let lock_notify = Arc::new(Notify::new());
diff --git a/packages/rs-platform-wallet/src/spv/runtime.rs b/packages/rs-platform-wallet/src/spv/runtime.rs
index d0c56e48b7a..06401c82a08 100644
--- a/packages/rs-platform-wallet/src/spv/runtime.rs
+++ b/packages/rs-platform-wallet/src/spv/runtime.rs
@@ -15,6 +15,11 @@ use dash_spv::{ClientConfig, DashSpvClient, EventHandler, Hash};
use key_wallet_manager::WalletManager;
+// `InstrumentedArcExt` is unused when `lock-stats` is on — the wrapper
+// has `raw_arc()` inherent and method resolution prefers it.
+// Suppress the warning so the import line stays uniform across modes.
+#[allow(unused_imports)]
+use crate::diagnostics::{InstrumentedArcExt, InstrumentedRwLock};
use crate::error::PlatformWalletError;
use crate::events::PlatformEventManager;
use crate::wallet::platform_wallet::PlatformWalletInfo;
@@ -28,7 +33,7 @@ type SpvClient =
/// handlers by reference (no cloning).
pub struct SpvRuntime {
event_manager: Arc,
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
client: RwLock>,
/// Cancel token for the `run()` task when it was spawned via
/// [`spawn_in_background`]. [`stop`] fires this token and joins
@@ -39,7 +44,7 @@ pub struct SpvRuntime {
impl SpvRuntime {
/// Create a new SPV runtime.
pub fn new(
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
event_manager: Arc,
) -> Self {
Self {
@@ -73,11 +78,16 @@ impl SpvRuntime {
// platform event manager's own handler list).
let event_handlers: Vec> =
vec![Arc::clone(&self.event_manager) as Arc];
+ // Upstream takes `Arc>` literally; hand
+ // it the inner Arc that lives inside our `InstrumentedRwLock`
+ // wrapper. SPV's own acquisitions go through this Arc directly
+ // and are not seen by the wrapper's stats — see the wrapper's
+ // type-level docs for the rationale.
let spv_client = DashSpvClient::new(
config,
network_manager,
storage_manager,
- Arc::clone(&self.wallet_manager),
+ self.wallet_manager.raw_arc(),
event_handlers,
)
.await
diff --git a/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs b/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs
index 3afc0053612..bcc8f89327b 100644
--- a/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs
+++ b/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs
@@ -6,10 +6,11 @@
use std::sync::Arc;
-use tokio::sync::{Notify, RwLock};
+use tokio::sync::Notify;
use crate::broadcaster::TransactionBroadcaster;
use crate::changeset::changeset::AssetLockChangeSet;
+use crate::diagnostics::InstrumentedRwLock;
use crate::wallet::persister::WalletPersister;
use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId};
@@ -30,7 +31,7 @@ pub(super) const DEFAULT_FEE_PER_KB: u64 = 1000;
pub struct AssetLockManager {
pub(super) sdk: Arc,
/// The shared wallet manager lock for all mutable wallet state.
- pub(super) wallet_manager: Arc>>,
+ pub(super) wallet_manager: Arc>>,
/// Identifies which wallet within the manager this manager operates on.
pub(super) wallet_id: WalletId,
/// Notified on InstantLock / ChainLock events by SpvEventForwarder.
@@ -64,7 +65,7 @@ impl AssetLockManager {
/// Create a new `AssetLockManager`.
pub(crate) fn new(
sdk: Arc,
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
wallet_id: WalletId,
lock_notify: Arc,
broadcaster: Arc,
diff --git a/packages/rs-platform-wallet/src/wallet/core/wallet.rs b/packages/rs-platform-wallet/src/wallet/core/wallet.rs
index 5a29db29002..fe282a69e52 100644
--- a/packages/rs-platform-wallet/src/wallet/core/wallet.rs
+++ b/packages/rs-platform-wallet/src/wallet/core/wallet.rs
@@ -5,11 +5,11 @@ use std::sync::Arc;
use super::balance::WalletBalance;
use dashcore::Address as DashAddress;
-use tokio::sync::RwLock;
use key_wallet_manager::WalletManager;
use crate::broadcaster::TransactionBroadcaster;
+use crate::diagnostics::InstrumentedRwLock;
use crate::error::PlatformWalletError;
use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId};
@@ -24,7 +24,7 @@ use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId};
/// through a `dyn` vtable.
pub struct CoreWallet {
pub(crate) sdk: Arc,
- pub(crate) wallet_manager: Arc>>,
+ pub(crate) wallet_manager: Arc>>,
pub(crate) wallet_id: WalletId,
/// Injected broadcaster — delegates to SPV or DAPI depending on how
/// the wallet was constructed by `PlatformWalletManager`.
@@ -36,7 +36,7 @@ pub struct CoreWallet {
impl CoreWallet {
pub(crate) fn new(
sdk: Arc,
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
wallet_id: WalletId,
broadcaster: Arc,
balance: Arc,
diff --git a/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs b/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs
index 80619ef8663..a872a94f4d4 100644
--- a/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs
+++ b/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs
@@ -33,10 +33,10 @@ use key_wallet::dip9::{
use key_wallet::wallet::Wallet;
use key_wallet::Network;
use key_wallet_manager::WalletManager;
-use tokio::sync::RwLock;
use zeroize::Zeroizing;
use crate::broadcaster::{SpvBroadcaster, TransactionBroadcaster};
+use crate::diagnostics::{InstrumentedRwLock, ReadGuard, WriteGuard};
use crate::error::PlatformWalletError;
use crate::wallet::asset_lock::manager::AssetLockManager;
use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId};
@@ -257,7 +257,7 @@ pub(crate) fn derive_identity_auth_key_hash(
pub struct IdentityWallet {
pub(crate) sdk: Arc,
/// Shared wallet manager holding key material and wallet info.
- pub(crate) wallet_manager: Arc>>,
+ pub(crate) wallet_manager: Arc>>,
/// Identifier for the wallet within the wallet manager.
pub(crate) wallet_id: WalletId,
/// Shared asset lock manager for building, broadcasting, and tracking
@@ -381,9 +381,7 @@ impl IdentityWallet {
/// Access wallet info via `wm.get_wallet_info(&wallet_id)` and key material
/// via `wm.get_wallet(&wallet_id)` on the returned guard. The identity
/// manager is on the wallet info: `info.identity_manager`.
- pub async fn wallet_manager_read(
- &self,
- ) -> tokio::sync::RwLockReadGuard<'_, WalletManager> {
+ pub async fn wallet_manager_read(&self) -> ReadGuard<'_, WalletManager> {
self.wallet_manager.read().await
}
@@ -392,9 +390,7 @@ impl IdentityWallet {
/// Access wallet info via `wm.get_wallet_info_mut(&wallet_id)` on the
/// returned guard. This allows callers to mutate managed identities (e.g.
/// adding or updating identities from an external persistence layer).
- pub async fn wallet_manager_write(
- &self,
- ) -> tokio::sync::RwLockWriteGuard<'_, WalletManager> {
+ pub async fn wallet_manager_write(&self) -> WriteGuard<'_, WalletManager> {
self.wallet_manager.write().await
}
@@ -404,7 +400,7 @@ impl IdentityWallet {
/// Useful for synchronous callers that cannot await.
pub fn try_wallet_manager_write(
&self,
- ) -> Option>> {
+ ) -> Option>> {
self.wallet_manager.try_write().ok()
}
diff --git a/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs b/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs
index d5836be9ff1..0eaab15f9e1 100644
--- a/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs
+++ b/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs
@@ -30,12 +30,12 @@ use key_wallet::PlatformP2PKHAddress;
use async_trait::async_trait;
use key_wallet_manager::WalletManager;
+use crate::diagnostics::InstrumentedRwLock;
use crate::error::PlatformWalletError;
use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId};
use dash_sdk::platform::address_sync::{
AddressFunds, AddressIndex, AddressProvider, AddressSyncResult,
};
-use tokio::sync::RwLock;
/// DIP-17 address coordinates used as both the pending-bimap key and
/// the SDK sync engine's `Tag`. Having the SDK carry these three
@@ -151,7 +151,7 @@ pub(crate) type PerWalletInSyncPlatformAddressState =
/// view of pending addresses spanning all wallets.
pub(crate) struct PlatformPaymentAddressProvider {
/// Shared wallet manager for gap-limit extension.
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
/// Committed per-wallet tracked state — xpub + `addresses` bimap
/// + `found` + `absent` from the last successful sync. `found`
/// here is what [`current_balances`](Self::current_balances)
@@ -185,7 +185,7 @@ impl PlatformPaymentAddressProvider {
/// no key derivation happens here. `wallet_ids` not found in the
/// wallet manager are silently skipped.
pub(crate) async fn from_wallets(
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
wallet_ids: impl IntoIterator- ,
) -> Result
{
let mut per_wallet: BTreeMap = BTreeMap::new();
@@ -268,7 +268,7 @@ impl PlatformPaymentAddressProvider {
/// of sync and the caller needs to reconcile rather than silently
/// continue with stale data.
pub async fn from_persisted(
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
per_wallet: BTreeMap,
sync_height: u64,
sync_timestamp: u64,
diff --git a/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs b/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs
index 0c08fc8a425..4b3cf372902 100644
--- a/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs
+++ b/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs
@@ -6,6 +6,7 @@ use dpp::address_funds::PlatformAddress;
use dpp::fee::Credits;
use tokio::sync::RwLock;
+use crate::diagnostics::InstrumentedRwLock;
use crate::error::PlatformWalletError;
use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId};
use key_wallet_manager::WalletManager;
@@ -19,7 +20,7 @@ use super::provider::PlatformPaymentAddressProvider;
pub struct PlatformAddressWallet {
pub(crate) sdk: Arc,
/// The shared wallet manager lock for all mutable wallet state.
- pub(crate) wallet_manager: Arc>>,
+ pub(crate) wallet_manager: Arc>>,
/// Identifies which wallet within the manager this sub-wallet operates on.
pub(crate) wallet_id: WalletId,
/// Single provider covering every platform payment account on the
@@ -37,7 +38,7 @@ impl PlatformAddressWallet {
/// Call [`initialize`] afterwards to build the unified provider.
pub(crate) fn new(
sdk: Arc,
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
wallet_id: WalletId,
persister: WalletPersister,
) -> Self {
diff --git a/packages/rs-platform-wallet/src/wallet/platform_wallet.rs b/packages/rs-platform-wallet/src/wallet/platform_wallet.rs
index dcd9486798e..4116546cf78 100644
--- a/packages/rs-platform-wallet/src/wallet/platform_wallet.rs
+++ b/packages/rs-platform-wallet/src/wallet/platform_wallet.rs
@@ -8,7 +8,10 @@ use dashcore::OutPoint;
use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet::wallet::Wallet;
use key_wallet_manager::WalletManager;
-use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+
+use crate::diagnostics::{
+ InstrumentedRwLock, ReadGuard as InstrumentedReadGuard, WriteGuard as InstrumentedWriteGuard,
+};
use super::asset_lock::manager::AssetLockManager;
use super::asset_lock::tracked::TrackedAssetLock;
@@ -61,7 +64,7 @@ pub struct PlatformWalletInfo {
pub struct PlatformWallet {
wallet_id: WalletId,
pub(crate) sdk: Arc,
- pub(crate) wallet_manager: Arc>>,
+ pub(crate) wallet_manager: Arc>>,
// Sub-wallets that hold a broadcaster are monomorphized with
// `SpvBroadcaster` — the only production broadcaster in use.
// Swapping this out to another broadcaster is a three-line flip
@@ -133,7 +136,7 @@ impl PlatformWallet {
}
/// Get a reference to the shared wallet manager lock.
- pub fn wallet_manager(&self) -> &Arc>> {
+ pub fn wallet_manager(&self) -> &Arc>> {
&self.wallet_manager
}
@@ -226,7 +229,7 @@ impl PlatformWallet {
pub(crate) fn new(
sdk: Arc,
wallet_id: WalletId,
- wallet_manager: Arc>>,
+ wallet_manager: Arc>>,
balance: Arc,
lock_notify: Arc,
persister: Arc,
@@ -502,7 +505,7 @@ impl std::fmt::Debug for PlatformWallet {
/// Read guard that locks `WalletManager` and derefs to this wallet's
/// `PlatformWalletInfo`. Also provides `.wallet()` for key material access.
pub struct WalletStateReadGuard<'a> {
- guard: RwLockReadGuard<'a, WalletManager>,
+ guard: InstrumentedReadGuard<'a, WalletManager>,
wallet_id: WalletId,
}
@@ -527,7 +530,7 @@ impl Deref for WalletStateReadGuard<'_> {
/// Write guard that locks `WalletManager` and derefs to this wallet's
/// `PlatformWalletInfo` (with `DerefMut`). Also provides `.wallet()`.
pub struct WalletStateWriteGuard<'a> {
- guard: RwLockWriteGuard<'a, WalletManager>,
+ guard: InstrumentedWriteGuard<'a, WalletManager>,
wallet_id: WalletId,
}