From cd78b17e6228dbff2038dfb4fde7b0ebbf695470 Mon Sep 17 00:00:00 2001 From: Quantum Explorer Date: Thu, 23 Apr 2026 23:38:32 +0800 Subject: [PATCH 1/3] refactor(wallet-events): atomic events carrying records + balance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace `TransactionReceived` / `TransactionStatusChanged` / `BalanceUpdated` with three self-contained variants — `MempoolTransactionReceived`, `TransactionInstantSendLocked`, `BlockProcessChange` — each carrying the transaction record(s) it pertains to plus the wallet's post-event balance. Consumers can now persist transactions and balance atomically off a single event. - `BlockProcessChange` bundles all records updated by a block into a single per-wallet event; an empty `transactions_updated` signals a balance-only drift (e.g. coinbase maturing as synced height advanced). - `TransactionCheckResult` now exposes `affected_records` (populated for both new records and state-modifying confirmations / IS-locks). - `check_transaction_in_all_wallets` routes events by context; a silent variant lets `process_block` batch per-wallet records into one event. - FFI surface updated (three new callbacks on `FFIWalletEventCallbacks` carrying `FFIBalance*` and record arrays); `ffi_cli` and dashd_sync tests migrated. Co-Authored-By: Claude Opus 4.7 (1M context) --- dash-spv-ffi/src/bin/ffi_cli.rs | 69 ++++-- dash-spv-ffi/src/callbacks.rs | 123 +++++----- dash-spv-ffi/tests/dashd_sync/callbacks.rs | 79 ++++--- dash-spv/tests/dashd_sync/helpers.rs | 2 +- key-wallet-manager/src/accessors.rs | 19 -- key-wallet-manager/src/event_tests.rs | 213 ++++++++++-------- key-wallet-manager/src/events.rs | 135 +++++++---- key-wallet-manager/src/lib.rs | 161 +++++++++++-- key-wallet-manager/src/process_block.rs | 129 ++++++++--- key-wallet-manager/src/test_helpers.rs | 60 +++-- .../transaction_checking/account_checker.rs | 11 +- .../transaction_checking/wallet_checker.rs | 8 +- 12 files changed, 675 insertions(+), 334 deletions(-) diff --git a/dash-spv-ffi/src/bin/ffi_cli.rs b/dash-spv-ffi/src/bin/ffi_cli.rs index 348452d0b..4b602551b 100644 --- a/dash-spv-ffi/src/bin/ffi_cli.rs +++ b/dash-spv-ffi/src/bin/ffi_cli.rs @@ -7,7 +7,7 @@ use clap::{Arg, ArgAction, Command}; use dash_spv_ffi::*; use dashcore::ffi::FFINetwork; use key_wallet_ffi::managed_account::FFITransactionRecord; -use key_wallet_ffi::types::FFITransactionContext; +use key_wallet_ffi::types::FFIBalance; use key_wallet_ffi::wallet_manager::wallet_manager_add_wallet_from_mnemonic; use key_wallet_ffi::FFIError; @@ -157,10 +157,10 @@ extern "C" fn on_peers_updated(connected_count: u32, best_height: u32, _user_dat // Wallet Event Callbacks // ============================================================================ -extern "C" fn on_transaction_received( +extern "C" fn on_mempool_transaction_received( wallet_id: *const c_char, - account_index: u32, record: *const FFITransactionRecord, + balance: *const FFIBalance, _user_data: *mut c_void, ) { let wallet_str = ffi_string_to_rust(wallet_id); @@ -170,36 +170,54 @@ extern "C" fn on_transaction_received( &wallet_str }; if record.is_null() { - println!( - "[Wallet] TX received: wallet={}..., account={}, record=null", - wallet_short, account_index - ); + println!("[Wallet] Mempool TX received: wallet={}..., record=null", wallet_short); return; } let r = unsafe { &*record }; + let b = if balance.is_null() { + FFIBalance::default() + } else { + unsafe { *balance } + }; let txid_hex = hex::encode(r.txid); println!( - "[Wallet] TX received: wallet={}..., txid={}, account={}, amount={} duffs, tx_size={}", - wallet_short, txid_hex, account_index, r.net_amount, r.tx_len + "[Wallet] Mempool TX received: wallet={}..., txid={}, amount={} duffs, balance[confirmed={}, unconfirmed={}]", + wallet_short, txid_hex, r.net_amount, b.confirmed, b.unconfirmed ); } -extern "C" fn on_transaction_status_changed( - _wallet_id: *const c_char, +extern "C" fn on_transaction_instant_send_locked( + wallet_id: *const c_char, txid: *const [u8; 32], - status: FFITransactionContext, + _islock_data: *const u8, + _islock_len: usize, + balance: *const FFIBalance, _user_data: *mut c_void, ) { + let wallet_str = ffi_string_to_rust(wallet_id); + let wallet_short = if wallet_str.len() > 8 { + &wallet_str[..8] + } else { + &wallet_str + }; let txid_hex = unsafe { hex::encode(*txid) }; - println!("[Wallet] TX status changed: txid={}, status={:?}", txid_hex, status); + let b = if balance.is_null() { + FFIBalance::default() + } else { + unsafe { *balance } + }; + println!( + "[Wallet] IS lock: wallet={}..., txid={}, balance[confirmed={}]", + wallet_short, txid_hex, b.confirmed + ); } -extern "C" fn on_balance_updated( +extern "C" fn on_block_process_change( wallet_id: *const c_char, - spendable: u64, - unconfirmed: u64, - immature: u64, - locked: u64, + height: u32, + _transactions_updated: *const FFITransactionRecord, + record_count: u32, + balance: *const FFIBalance, _user_data: *mut c_void, ) { let wallet_str = ffi_string_to_rust(wallet_id); @@ -208,9 +226,14 @@ extern "C" fn on_balance_updated( } else { &wallet_str }; + let b = if balance.is_null() { + FFIBalance::default() + } else { + unsafe { *balance } + }; println!( - "[Wallet] Balance updated: wallet={}..., spendable={}, unconfirmed={}, immature={}, locked={}", - wallet_short, spendable, unconfirmed, immature, locked + "[Wallet] Block processed: wallet={}..., height={}, tx_count={}, balance[confirmed={}, unconfirmed={}, immature={}, locked={}]", + wallet_short, height, record_count, b.confirmed, b.unconfirmed, b.immature, b.locked ); } @@ -435,9 +458,9 @@ fn main() { user_data: ptr::null_mut(), }, wallet: FFIWalletEventCallbacks { - on_transaction_received: Some(on_transaction_received), - on_transaction_status_changed: Some(on_transaction_status_changed), - on_balance_updated: Some(on_balance_updated), + on_mempool_transaction_received: Some(on_mempool_transaction_received), + on_transaction_instant_send_locked: Some(on_transaction_instant_send_locked), + on_block_process_change: Some(on_block_process_change), user_data: ptr::null_mut(), }, error: FFIClientErrorCallback { diff --git a/dash-spv-ffi/src/callbacks.rs b/dash-spv-ffi/src/callbacks.rs index 5ef9103c1..ee2f6393d 100644 --- a/dash-spv-ffi/src/callbacks.rs +++ b/dash-spv-ffi/src/callbacks.rs @@ -12,10 +12,9 @@ use dash_spv::sync::{SyncEvent, SyncProgress}; use dash_spv::EventHandler; use dashcore::hashes::Hash; use key_wallet_ffi::managed_account::FFITransactionRecord; -use key_wallet_ffi::types::FFITransactionContext; +use key_wallet_ffi::types::FFIBalance; use key_wallet_manager::WalletEvent; use std::ffi::CString; -use std::ops::Deref; use std::os::raw::{c_char, c_void}; // ============================================================================ @@ -530,46 +529,57 @@ impl FFINetworkEventCallbacks { // FFIWalletEventCallbacks - One callback per WalletEvent variant // ============================================================================ -/// Callback for WalletEvent::TransactionReceived +/// Callback for `WalletEvent::MempoolTransactionReceived`. /// -/// The `record` pointer is borrowed and only valid for the duration of the -/// callback. Callers must copy any data they need to retain after the callback -/// returns. The record contains all transaction details including serialized -/// transaction bytes, input/output details, and classification metadata. -pub type OnTransactionReceivedCallback = Option< +/// Fires when a wallet-relevant transaction is first seen in the mempool +/// (optionally with an InstantSend lock — in that case the record's context +/// is `InstantSend(..)`). +/// +/// All pointer parameters are borrowed and only valid for the duration of the +/// callback. Callers must copy any data they need to retain. `balance` is the +/// wallet's balance *after* the transaction was recorded. +pub type OnMempoolTransactionReceivedCallback = Option< extern "C" fn( wallet_id: *const c_char, - account_index: u32, record: *const FFITransactionRecord, + balance: *const FFIBalance, user_data: *mut c_void, ), >; -/// Callback for WalletEvent::TransactionStatusChanged +/// Callback for `WalletEvent::TransactionInstantSendLocked`. /// -/// The `wallet_id` string pointer and `txid` hash pointer are borrowed and only -/// valid for the duration of the callback. -pub type OnTransactionStatusChangedCallback = Option< +/// Fires when a previously-seen wallet-relevant transaction is +/// InstantSend-locked. `islock_data` points to consensus-serialized +/// `InstantLock` bytes valid only for the duration of the callback. +pub type OnTransactionInstantSendLockedCallback = Option< extern "C" fn( wallet_id: *const c_char, txid: *const [u8; 32], - status: FFITransactionContext, + islock_data: *const u8, + islock_len: usize, + balance: *const FFIBalance, user_data: *mut c_void, ), >; -/// Callback for WalletEvent::BalanceUpdated +/// Callback for `WalletEvent::BlockProcessChange`. /// -/// The `wallet_id` string pointer is borrowed and only valid for the duration -/// of the callback. Callers must copy the string if they need to retain it -/// after the callback returns. -pub type OnBalanceUpdatedCallback = Option< +/// Fires once per wallet affected by a processed block. `transactions_updated` +/// points to an array of `record_count` records that were newly recorded or +/// had their state updated by this block. The array may be empty when only +/// the wallet's balance shifted (e.g. a coinbase maturing). `balance` is the +/// wallet's balance *after* the block was processed. +/// +/// The `transactions_updated` array and all its contents are borrowed and +/// only valid for the duration of the callback. +pub type OnBlockProcessChangeCallback = Option< extern "C" fn( wallet_id: *const c_char, - confirmed: u64, - unconfirmed: u64, - immature: u64, - locked: u64, + height: u32, + transactions_updated: *const FFITransactionRecord, + record_count: u32, + balance: *const FFIBalance, user_data: *mut c_void, ), >; @@ -578,15 +588,15 @@ pub type OnBalanceUpdatedCallback = Option< /// /// Set only the callbacks you're interested in; unset callbacks will be ignored. /// -/// All pointer parameters passed to callbacks (wallet IDs, txids, addresses) -/// are borrowed and only valid for the duration of the callback invocation. -/// Callers must copy any data they need to retain. +/// All pointer parameters passed to callbacks (wallet IDs, txids, records, +/// balances) are borrowed and only valid for the duration of the callback +/// invocation. Callers must copy any data they need to retain. #[repr(C)] #[derive(Clone)] pub struct FFIWalletEventCallbacks { - pub on_transaction_received: OnTransactionReceivedCallback, - pub on_transaction_status_changed: OnTransactionStatusChangedCallback, - pub on_balance_updated: OnBalanceUpdatedCallback, + pub on_mempool_transaction_received: OnMempoolTransactionReceivedCallback, + pub on_transaction_instant_send_locked: OnTransactionInstantSendLockedCallback, + pub on_block_process_change: OnBlockProcessChangeCallback, pub user_data: *mut c_void, } @@ -597,9 +607,9 @@ unsafe impl Sync for FFIWalletEventCallbacks {} impl Default for FFIWalletEventCallbacks { fn default() -> Self { Self { - on_transaction_received: None, - on_transaction_status_changed: None, - on_balance_updated: None, + on_mempool_transaction_received: None, + on_transaction_instant_send_locked: None, + on_block_process_change: None, user_data: std::ptr::null_mut(), } } @@ -694,60 +704,67 @@ impl FFIWalletEventCallbacks { /// Dispatch a WalletEvent to the appropriate callback. pub fn dispatch(&self, event: &WalletEvent) { match event { - WalletEvent::TransactionReceived { + WalletEvent::MempoolTransactionReceived { wallet_id, - account_index, record, + balance, } => { - if let Some(cb) = self.on_transaction_received { + if let Some(cb) = self.on_mempool_transaction_received { let wallet_id_hex = hex::encode(wallet_id); let c_wallet_id = CString::new(wallet_id_hex).unwrap_or_default(); - - let ffi_record = FFITransactionRecord::from(record.deref()); + let ffi_record = FFITransactionRecord::from(record.as_ref()); + let ffi_balance = FFIBalance::from(*balance); cb( c_wallet_id.as_ptr(), - *account_index, &ffi_record as *const FFITransactionRecord, + &ffi_balance as *const FFIBalance, self.user_data, ); } } - WalletEvent::TransactionStatusChanged { + WalletEvent::TransactionInstantSendLocked { wallet_id, txid, - status, + instant_send_lock, + balance, } => { - if let Some(cb) = self.on_transaction_status_changed { + if let Some(cb) = self.on_transaction_instant_send_locked { let wallet_id_hex = hex::encode(wallet_id); let c_wallet_id = CString::new(wallet_id_hex).unwrap_or_default(); let txid_bytes = txid.as_byte_array(); - let ffi_ctx = FFITransactionContext::from(status.clone()); + let islock_bytes = dashcore::consensus::serialize(instant_send_lock); + let ffi_balance = FFIBalance::from(*balance); cb( c_wallet_id.as_ptr(), txid_bytes as *const [u8; 32], - ffi_ctx, + islock_bytes.as_ptr(), + islock_bytes.len(), + &ffi_balance as *const FFIBalance, self.user_data, ); } } - WalletEvent::BalanceUpdated { + WalletEvent::BlockProcessChange { wallet_id, - confirmed, - unconfirmed, - immature, - locked, + height, + transactions_updated, + balance, } => { - if let Some(cb) = self.on_balance_updated { + if let Some(cb) = self.on_block_process_change { let wallet_id_hex = hex::encode(wallet_id); let c_wallet_id = CString::new(wallet_id_hex).unwrap_or_default(); + let ffi_records: Vec = + transactions_updated.iter().map(FFITransactionRecord::from).collect(); + let ffi_balance = FFIBalance::from(*balance); + cb( c_wallet_id.as_ptr(), - *confirmed, - *unconfirmed, - *immature, - *locked, + *height, + ffi_records.as_ptr(), + ffi_records.len() as u32, + &ffi_balance as *const FFIBalance, self.user_data, ); } diff --git a/dash-spv-ffi/tests/dashd_sync/callbacks.rs b/dash-spv-ffi/tests/dashd_sync/callbacks.rs index 4e4c67ab0..dcd7f6a32 100644 --- a/dash-spv-ffi/tests/dashd_sync/callbacks.rs +++ b/dash-spv-ffi/tests/dashd_sync/callbacks.rs @@ -2,13 +2,14 @@ use std::ffi::CStr; use std::os::raw::{c_char, c_void}; +use std::slice; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use dash_spv_ffi::*; use key_wallet_ffi::managed_account::FFITransactionRecord; -use key_wallet_ffi::types::FFITransactionContext; +use key_wallet_ffi::types::FFIBalance; /// Tracks callback invocations for verification. /// @@ -37,8 +38,13 @@ pub(super) struct CallbackTracker { pub(super) peers_updated_count: AtomicU32, // Wallet event tracking + /// Counts `MempoolTransactionReceived` *and* `BlockProcessChange` + /// events that carried at least one record — i.e. any event through which + /// a new/updated transaction reached the consumer. pub(super) transaction_received_count: AtomicU32, + /// Counts `TransactionInstantSendLocked` events. pub(super) transaction_status_changed_count: AtomicU32, + /// Counts any event that carried a fresh balance (all three variants). pub(super) balance_updated_count: AtomicU32, // Data from callbacks @@ -49,10 +55,12 @@ pub(super) struct CallbackTracker { pub(super) connected_peers: Mutex>, pub(super) errors: Mutex>, - // Transaction data from on_transaction_received (txid, net_amount) + /// Per-record (txid, net_amount) seen via any event. pub(super) received_transactions: Mutex>, - // Balance data from on_balance_updated + // Balance data — `last_spendable` tracks the `confirmed` balance field + // so existing assertions that expect "spendable after funds received" + // continue to hold. pub(super) last_spendable: AtomicU64, pub(super) last_unconfirmed: AtomicU64, @@ -341,10 +349,20 @@ extern "C" fn on_peers_updated(connected_count: u32, best_height: u32, user_data tracing::debug!("on_peers_updated: connected={}, best_height={}", connected_count, best_height); } -extern "C" fn on_transaction_received( +fn record_balance(tracker: &CallbackTracker, balance: *const FFIBalance) { + if balance.is_null() { + return; + } + let b = unsafe { *balance }; + tracker.last_spendable.store(b.confirmed, Ordering::SeqCst); + tracker.last_unconfirmed.store(b.unconfirmed, Ordering::SeqCst); + tracker.balance_updated_count.fetch_add(1, Ordering::SeqCst); +} + +extern "C" fn on_mempool_transaction_received( wallet_id: *const c_char, - account_index: u32, record: *const FFITransactionRecord, + balance: *const FFIBalance, user_data: *mut c_void, ) { let Some(tracker) = (unsafe { tracker_from(user_data) }) else { @@ -357,47 +375,56 @@ extern "C" fn on_transaction_received( .lock() .unwrap_or_else(|e| e.into_inner()) .push((r.txid, r.net_amount)); + tracker.transaction_received_count.fetch_add(1, Ordering::SeqCst); } - tracker.transaction_received_count.fetch_add(1, Ordering::SeqCst); + record_balance(tracker, balance); let wallet_str = unsafe { cstr_or_unknown(wallet_id) }; - tracing::info!("on_transaction_received: wallet={}, account={}", wallet_str, account_index,); + tracing::info!("on_mempool_transaction_received: wallet={}", wallet_str); } -extern "C" fn on_transaction_status_changed( +extern "C" fn on_transaction_instant_send_locked( _wallet_id: *const c_char, _txid: *const [u8; 32], - status: FFITransactionContext, + _islock_data: *const u8, + _islock_len: usize, + balance: *const FFIBalance, user_data: *mut c_void, ) { let Some(tracker) = (unsafe { tracker_from(user_data) }) else { return; }; tracker.transaction_status_changed_count.fetch_add(1, Ordering::SeqCst); - tracing::debug!("on_transaction_status_changed: status={:?}", status); + record_balance(tracker, balance); + tracing::debug!("on_transaction_instant_send_locked"); } -extern "C" fn on_balance_updated( +extern "C" fn on_block_process_change( wallet_id: *const c_char, - spendable: u64, - unconfirmed: u64, - immature: u64, - locked: u64, + height: u32, + transactions_updated: *const FFITransactionRecord, + record_count: u32, + balance: *const FFIBalance, user_data: *mut c_void, ) { let Some(tracker) = (unsafe { tracker_from(user_data) }) else { return; }; - tracker.last_spendable.store(spendable, Ordering::SeqCst); - tracker.last_unconfirmed.store(unconfirmed, Ordering::SeqCst); - tracker.balance_updated_count.fetch_add(1, Ordering::SeqCst); + if !transactions_updated.is_null() && record_count > 0 { + let records = + unsafe { slice::from_raw_parts(transactions_updated, record_count as usize) }; + let mut sink = tracker.received_transactions.lock().unwrap_or_else(|e| e.into_inner()); + for r in records { + sink.push((r.txid, r.net_amount)); + tracker.transaction_received_count.fetch_add(1, Ordering::SeqCst); + } + } + record_balance(tracker, balance); let wallet_str = unsafe { cstr_or_unknown(wallet_id) }; tracing::info!( - "on_balance_updated: wallet={}, spendable={}, unconfirmed={}, immature={}, locked={}", + "on_block_process_change: wallet={}, height={}, records={}", wallet_str, - spendable, - unconfirmed, - immature, - locked, + height, + record_count, ); } @@ -444,9 +471,9 @@ pub(super) fn create_network_callbacks(tracker: &Arc) -> FFINet /// Arc outlives all callback invocations. pub(super) fn create_wallet_callbacks(tracker: &Arc) -> FFIWalletEventCallbacks { FFIWalletEventCallbacks { - on_transaction_received: Some(on_transaction_received), - on_transaction_status_changed: Some(on_transaction_status_changed), - on_balance_updated: Some(on_balance_updated), + on_mempool_transaction_received: Some(on_mempool_transaction_received), + on_transaction_instant_send_locked: Some(on_transaction_instant_send_locked), + on_block_process_change: Some(on_block_process_change), user_data: Arc::as_ptr(tracker) as *mut c_void, } } diff --git a/dash-spv/tests/dashd_sync/helpers.rs b/dash-spv/tests/dashd_sync/helpers.rs index 99069b04b..010ec0d4b 100644 --- a/dash-spv/tests/dashd_sync/helpers.rs +++ b/dash-spv/tests/dashd_sync/helpers.rs @@ -140,7 +140,7 @@ pub(super) async fn wait_for_mempool_tx( _ = &mut timeout => return None, result = receiver.recv() => { match result { - Ok(WalletEvent::TransactionReceived { ref record, .. }) if record.context == TransactionContext::Mempool => return Some(record.txid), + Ok(WalletEvent::MempoolTransactionReceived { ref record, .. }) if record.context == TransactionContext::Mempool => return Some(record.txid), Ok(_) => continue, Err(_) => return None, } diff --git a/key-wallet-manager/src/accessors.rs b/key-wallet-manager/src/accessors.rs index 46e5cf842..4aab75053 100644 --- a/key-wallet-manager/src/accessors.rs +++ b/key-wallet-manager/src/accessors.rs @@ -182,25 +182,6 @@ impl WalletManager { self.wallet_infos.iter().map(|(id, info)| (*id, info.balance())).collect() } - /// Emit `BalanceUpdated` events for wallets whose balance differs from the snapshot. - pub(crate) fn emit_balance_changes(&self, old_balances: &[(WalletId, WalletCoreBalance)]) { - for (wallet_id, old_balance) in old_balances { - if let Some(info) = self.wallet_infos.get(wallet_id) { - let new_balance = info.balance(); - if *old_balance != new_balance { - let event = WalletEvent::BalanceUpdated { - wallet_id: *wallet_id, - confirmed: new_balance.confirmed(), - unconfirmed: new_balance.unconfirmed(), - immature: new_balance.immature(), - locked: new_balance.locked(), - }; - let _ = self.event_sender.send(event); - } - } - } - } - /// Get all outpoints from wallet UTXOs across all managed wallets. /// Used for bloom filter construction to detect spends of our UTXOs. pub fn watched_outpoints(&self) -> Vec { diff --git a/key-wallet-manager/src/event_tests.rs b/key-wallet-manager/src/event_tests.rs index 3e851cad7..471992808 100644 --- a/key-wallet-manager/src/event_tests.rs +++ b/key-wallet-manager/src/event_tests.rs @@ -22,17 +22,19 @@ async fn test_mempool_to_confirmed_event_flow() { manager.check_transaction_in_all_wallets(&tx, TransactionContext::Mempool, true, true).await; let event = assert_single_event(&mut rx); match event { - WalletEvent::TransactionReceived { + WalletEvent::MempoolTransactionReceived { wallet_id: ev_wid, record, - .. + balance, } => { assert_eq!(record.context, TransactionContext::Mempool); assert_eq!(record.txid, tx.txid()); assert_eq!(ev_wid, wallet_id); assert_eq!(record.net_amount, TX_AMOUNT as i64); + assert_eq!(balance.unconfirmed(), TX_AMOUNT); + assert_eq!(balance.confirmed(), 0); } - other => panic!("expected TransactionReceived, got {:?}", other), + other => panic!("expected MempoolTransactionReceived, got {:?}", other), } // Same tx now confirmed in a block @@ -44,23 +46,27 @@ async fn test_mempool_to_confirmed_event_flow() { manager.check_transaction_in_all_wallets(&tx, block_ctx, true, true).await; let event = assert_single_event(&mut rx); match event { - WalletEvent::TransactionStatusChanged { + WalletEvent::BlockProcessChange { wallet_id: ev_wid, - txid: ev_txid, - status, + height, + transactions_updated, + balance, } => { assert_eq!(ev_wid, wallet_id); - assert_eq!(ev_txid, tx.txid()); + assert_eq!(height, 100); + assert_eq!(transactions_updated.len(), 1); + assert_eq!(transactions_updated[0].txid, tx.txid()); assert!( matches!( - status, - TransactionContext::InBlock(info) if info.height() == 100 - ), - "expected InBlock(100), got {:?}", - status + transactions_updated[0].context, + TransactionContext::InBlock(info) if info.height() == 100 + ), + "expected record context InBlock(100)" ); + assert_eq!(balance.confirmed(), TX_AMOUNT); + assert_eq!(balance.unconfirmed(), 0); } - other => panic!("expected TransactionStatusChanged, got {:?}", other), + other => panic!("expected BlockProcessChange, got {:?}", other), } } @@ -188,21 +194,27 @@ async fn test_mempool_tx_emits_balance_updated() { manager.process_mempool_transaction(&tx, None).await; let events = drain_events(&mut rx); - let balance_events: Vec<_> = - events.iter().filter(|e| matches!(e, WalletEvent::BalanceUpdated { .. })).collect(); - assert_eq!(balance_events.len(), 1, "expected exactly 1 BalanceUpdated, got {:?}", events); + let received_events: Vec<_> = events + .iter() + .filter(|e| matches!(e, WalletEvent::MempoolTransactionReceived { .. })) + .collect(); + assert_eq!( + received_events.len(), + 1, + "expected exactly 1 MempoolTransactionReceived, got {:?}", + events + ); assert!( matches!( - balance_events[0], - WalletEvent::BalanceUpdated { + received_events[0], + WalletEvent::MempoolTransactionReceived { wallet_id: wid, - unconfirmed, - confirmed, + balance, .. - } if *wid == wallet_id && *unconfirmed == TX_AMOUNT && *confirmed == 0 + } if *wid == wallet_id && balance.unconfirmed() == TX_AMOUNT && balance.confirmed() == 0 ), - "expected BalanceUpdated with unconfirmed={TX_AMOUNT}, confirmed=0, got {:?}", - balance_events[0] + "expected balance.unconfirmed={TX_AMOUNT}, confirmed=0, got {:?}", + received_events[0] ); } @@ -215,21 +227,30 @@ async fn test_instantsend_tx_emits_balance_updated_spendable() { manager.process_mempool_transaction(&tx, Some(dummy_instant_lock(tx.txid()))).await; let events = drain_events(&mut rx); - let balance_events: Vec<_> = - events.iter().filter(|e| matches!(e, WalletEvent::BalanceUpdated { .. })).collect(); - assert_eq!(balance_events.len(), 1, "expected exactly 1 BalanceUpdated, got {:?}", events); + let received_events: Vec<_> = events + .iter() + .filter(|e| matches!(e, WalletEvent::MempoolTransactionReceived { .. })) + .collect(); + assert_eq!( + received_events.len(), + 1, + "expected exactly 1 MempoolTransactionReceived, got {:?}", + events + ); assert!( matches!( - balance_events[0], - WalletEvent::BalanceUpdated { + received_events[0], + WalletEvent::MempoolTransactionReceived { wallet_id: wid, - confirmed, - unconfirmed, - .. - } if *wid == wallet_id && *confirmed == TX_AMOUNT && *unconfirmed == 0 + record, + balance, + } if *wid == wallet_id + && matches!(record.context, TransactionContext::InstantSend(_)) + && balance.confirmed() == TX_AMOUNT + && balance.unconfirmed() == 0 ), - "expected BalanceUpdated with confirmed={TX_AMOUNT}, unconfirmed=0, got {:?}", - balance_events[0] + "expected IS-context record + balance.confirmed={TX_AMOUNT}, got {:?}", + received_events[0] ); } @@ -245,14 +266,13 @@ async fn test_mempool_to_instantsend_transitions_balance() { assert!( events.iter().any(|e| matches!( e, - WalletEvent::BalanceUpdated { + WalletEvent::MempoolTransactionReceived { wallet_id: wid, - unconfirmed, - confirmed, + balance, .. - } if *wid == wallet_id && *unconfirmed == TX_AMOUNT && *confirmed == 0 + } if *wid == wallet_id && balance.unconfirmed() == TX_AMOUNT && balance.confirmed() == 0 )), - "expected unconfirmed balance after mempool, got {:?}", + "expected MempoolTransactionReceived with unconfirmed={TX_AMOUNT}, got {:?}", events ); @@ -262,14 +282,13 @@ async fn test_mempool_to_instantsend_transitions_balance() { assert!( events.iter().any(|e| matches!( e, - WalletEvent::BalanceUpdated { + WalletEvent::TransactionInstantSendLocked { wallet_id: wid, - confirmed, - unconfirmed, + balance, .. - } if *wid == wallet_id && *confirmed == TX_AMOUNT && *unconfirmed == 0 + } if *wid == wallet_id && balance.confirmed() == TX_AMOUNT && balance.unconfirmed() == 0 )), - "expected confirmed balance after IS lock, got {:?}", + "expected TransactionInstantSendLocked with confirmed={TX_AMOUNT}, got {:?}", events ); } @@ -334,27 +353,30 @@ async fn test_process_instant_send_lock_dedup() { manager.process_mempool_transaction(&tx, None).await; let mut rx = manager.subscribe_events(); - // First IS lock should emit events + // First IS lock should emit one TransactionInstantSendLocked (balance embedded) manager.process_instant_send_lock(dummy_instant_lock(tx.txid())); let events = drain_events(&mut rx); - assert!( - events.iter().any(|e| matches!( - e, - WalletEvent::TransactionStatusChanged { - wallet_id: wid, - status: TransactionContext::InstantSend(_), - .. - } if *wid == wallet_id - )), - "expected TransactionStatusChanged(InstantSend) with correct wallet_id, got {:?}", + let is_events: Vec<_> = events + .iter() + .filter(|e| matches!(e, WalletEvent::TransactionInstantSendLocked { .. })) + .collect(); + assert_eq!( + is_events.len(), + 1, + "expected exactly 1 TransactionInstantSendLocked, got {:?}", events ); assert!( - events.iter().any( - |e| matches!(e, WalletEvent::BalanceUpdated { wallet_id: wid, .. } if *wid == wallet_id) + matches!( + is_events[0], + WalletEvent::TransactionInstantSendLocked { + wallet_id: wid, + balance, + .. + } if *wid == wallet_id && balance.confirmed() == TX_AMOUNT ), - "expected BalanceUpdated for wallet, got {:?}", - events + "expected TransactionInstantSendLocked for wallet with confirmed balance, got {:?}", + is_events[0] ); // Second IS lock should be a no-op @@ -404,13 +426,12 @@ async fn test_mixed_instantsend_paths_no_duplicate_events() { assert!( events.iter().any(|e| matches!( e, - WalletEvent::TransactionStatusChanged { + WalletEvent::TransactionInstantSendLocked { wallet_id: wid, - status: TransactionContext::InstantSend(_), .. } if *wid == wallet_id )), - "expected TransactionStatusChanged(InstantSend) with correct wallet_id, got {:?}", + "expected TransactionInstantSendLocked with correct wallet_id, got {:?}", events ); @@ -447,13 +468,12 @@ async fn test_mixed_instantsend_paths_reverse_no_duplicate_events() { assert!( events.iter().any(|e| matches!( e, - WalletEvent::TransactionStatusChanged { + WalletEvent::TransactionInstantSendLocked { wallet_id: wid, - status: TransactionContext::InstantSend(_), .. } if *wid == wallet_id )), - "expected TransactionStatusChanged(InstantSend) with correct wallet_id, got {:?}", + "expected TransactionInstantSendLocked with correct wallet_id, got {:?}", events ); @@ -488,19 +508,28 @@ async fn test_process_block_emits_events() { assert_eq!(result.new_txids.len(), 1); let events = drain_events(&mut rx); - let event = events + let block_events: Vec<_> = events .iter() - .find(|e| matches!(e, WalletEvent::TransactionReceived { .. })) - .unwrap_or_else(|| { - panic!("expected TransactionReceived from process_block, got {:?}", events) - }); + .filter(|e| matches!(e, WalletEvent::BlockProcessChange { .. })) + .collect(); + assert_eq!( + block_events.len(), + 1, + "expected exactly one BlockProcessChange, got {:?}", + events + ); - match event { - WalletEvent::TransactionReceived { - account_index, - record, - .. + match block_events[0] { + WalletEvent::BlockProcessChange { + wallet_id: wid, + height, + transactions_updated, + balance, } => { + assert_eq!(*wid, wallet_id); + assert_eq!(*height, 1000); + assert_eq!(transactions_updated.len(), 1); + let record = &transactions_updated[0]; assert!( matches!( record.context, @@ -509,21 +538,14 @@ async fn test_process_block_emits_events() { "expected InBlock at height 1000, got {:?}", record.context ); - assert_eq!(*account_index, 0); assert!( !record.input_details.is_empty() || !record.output_details.is_empty(), "expected non-empty details" ); + assert_eq!(balance.confirmed(), TX_AMOUNT); } _ => unreachable!(), } - assert!( - events.iter().any( - |e| matches!(e, WalletEvent::BalanceUpdated { wallet_id: wid, .. } if *wid == wallet_id) - ), - "expected BalanceUpdated from process_block, got {:?}", - events - ); } #[tokio::test] @@ -588,20 +610,20 @@ async fn test_mempool_to_block_to_chainlocked_event_flow() { let mut rx = manager.subscribe_events(); let tx = create_tx_paying_to(&addr, 0xc4); - // Step 1: mempool — emits TransactionReceived + // Step 1: mempool — emits MempoolTransactionReceived manager.check_transaction_in_all_wallets(&tx, TransactionContext::Mempool, true, true).await; let event = assert_single_event(&mut rx); assert!( matches!( &event, - WalletEvent::TransactionReceived { record, .. } + WalletEvent::MempoolTransactionReceived { record, .. } if record.context == TransactionContext::Mempool ), - "expected TransactionReceived(Mempool), got {:?}", + "expected MempoolTransactionReceived(Mempool), got {:?}", event ); - // Step 2: block confirmation — emits TransactionStatusChanged + // Step 2: block confirmation — emits BlockProcessChange let block_ctx = TransactionContext::InBlock(BlockInfo::new( 1700, BlockHash::from_byte_array([0xc4; 32]), @@ -611,13 +633,14 @@ async fn test_mempool_to_block_to_chainlocked_event_flow() { let event = assert_single_event(&mut rx); assert!( matches!( - event, - WalletEvent::TransactionStatusChanged { - status: TransactionContext::InBlock(_), + &event, + WalletEvent::BlockProcessChange { + height, + transactions_updated, .. - } + } if *height == 1700 && transactions_updated.len() == 1 ), - "expected TransactionStatusChanged(InBlock), got {:?}", + "expected BlockProcessChange(height=1700, 1 tx), got {:?}", event ); @@ -648,10 +671,10 @@ async fn test_chainlocked_block_event_flow() { assert!( matches!( &event, - WalletEvent::TransactionReceived { record, .. } - if matches!(record.context, TransactionContext::InChainLockedBlock(info) if info.height() == 2000) + WalletEvent::BlockProcessChange { height, transactions_updated, .. } + if *height == 2000 && transactions_updated.len() == 1 && matches!(transactions_updated[0].context, TransactionContext::InChainLockedBlock(info) if info.height() == 2000) ), - "expected TransactionReceived(InChainLockedBlock at 2000), got {:?}", + "expected BlockProcessChange(InChainLockedBlock at 2000, 1 tx), got {:?}", event ); } diff --git a/key-wallet-manager/src/events.rs b/key-wallet-manager/src/events.rs index 5d0e2b282..9975bf17e 100644 --- a/key-wallet-manager/src/events.rs +++ b/key-wallet-manager/src/events.rs @@ -1,88 +1,133 @@ //! Wallet events for notifying consumers of wallet state changes. //! -//! These events are emitted by the WalletManager when significant wallet -//! operations occur, allowing consumers to receive push-based notifications. +//! Each variant is self-contained: it carries the transaction record(s) that +//! triggered it and the wallet's new balance after the change. Consumers can +//! persist the transaction(s) and balance atomically off a single event. -use dashcore::{Amount, SignedAmount, Txid}; +use dashcore::ephemerealdata::instant_lock::InstantLock; +use dashcore::prelude::CoreBlockHeight; +use dashcore::Txid; use key_wallet::managed_account::transaction_record::TransactionRecord; -use key_wallet::transaction_checking::TransactionContext; +use key_wallet::WalletCoreBalance; use crate::WalletId; /// Events emitted by the wallet manager. /// -/// Each event represents a meaningful wallet state change that consumers -/// may want to react to. +/// Each event represents a meaningful wallet state change and carries the +/// wallet's balance *after* the change so consumers can save transactions +/// and balance atomically. #[derive(Debug, Clone)] pub enum WalletEvent { - /// A transaction relevant to the wallet was received for the first time. - TransactionReceived { + /// A wallet-relevant transaction was first seen in the mempool + /// (optionally with an InstantSend lock — in that case the record's + /// `context` is `InstantSend(..)`). + MempoolTransactionReceived { /// ID of the affected wallet. wallet_id: WalletId, - /// Account index within the wallet. - account_index: u32, - /// The full transaction record with all details. + /// The full transaction record (context may be Mempool or InstantSend). + /// + /// Boxed to keep the enum compact: `TransactionRecord` is ~800 bytes + /// and would otherwise inflate every variant to that size. record: Box, + /// Wallet balance after the transaction was recorded. + balance: WalletCoreBalance, }, - /// The confirmation status of a previously seen transaction has changed. - TransactionStatusChanged { + /// A previously-seen wallet-relevant transaction was InstantSend-locked. + TransactionInstantSendLocked { /// ID of the affected wallet. wallet_id: WalletId, - /// Transaction ID. + /// Transaction ID that was locked. txid: Txid, - /// New transaction context. - status: TransactionContext, + /// The InstantSend lock that locked the transaction. + instant_send_lock: InstantLock, + /// Wallet balance after the lock was applied. + balance: WalletCoreBalance, }, - /// The wallet balance has changed. - BalanceUpdated { + /// A block was processed. Carries the wallet's newly-recorded and + /// state-modified transactions for the block, along with the post-block + /// balance. `transactions_updated` may be empty when only the balance + /// shifted (e.g. a coinbase maturing as synced height advanced). + BlockProcessChange { /// ID of the affected wallet. wallet_id: WalletId, - /// New confirmed balance in duffs (mature, in a block or InstantSend-locked). - confirmed: u64, - /// New unconfirmed balance in duffs (mature, mempool-only). Also spendable. - unconfirmed: u64, - /// New immature balance (coinbase UTXOs not yet mature). - immature: u64, - /// New locked balance (UTXOs reserved for specific purposes like CoinJoin) - locked: u64, + /// Height of the block that was processed. + height: CoreBlockHeight, + /// Transaction records recorded or updated by this block. + transactions_updated: Vec, + /// Wallet balance after the block was processed. + balance: WalletCoreBalance, }, } impl WalletEvent { - /// Get a short description of this event for logging. + /// ID of the wallet this event pertains to. + pub fn wallet_id(&self) -> WalletId { + match self { + WalletEvent::MempoolTransactionReceived { + wallet_id, + .. + } + | WalletEvent::TransactionInstantSendLocked { + wallet_id, + .. + } + | WalletEvent::BlockProcessChange { + wallet_id, + .. + } => *wallet_id, + } + } + + /// Wallet balance carried by this event. + pub fn balance(&self) -> WalletCoreBalance { + match self { + WalletEvent::MempoolTransactionReceived { + balance, + .. + } + | WalletEvent::TransactionInstantSendLocked { + balance, + .. + } + | WalletEvent::BlockProcessChange { + balance, + .. + } => *balance, + } + } + + /// Short description for logging. pub fn description(&self) -> String { match self { - WalletEvent::TransactionReceived { + WalletEvent::MempoolTransactionReceived { record, + balance, .. } => { format!( - "TransactionReceived(txid={}, amount={}, status={})", - record.txid, - SignedAmount::from_sat(record.net_amount), - record.context + "MempoolTransactionReceived(txid={}, context={}, balance={})", + record.txid, record.context, balance ) } - WalletEvent::TransactionStatusChanged { + WalletEvent::TransactionInstantSendLocked { txid, - status, + balance, .. } => { - format!("TransactionStatusChanged(txid={}, status={})", txid, status) + format!("TransactionInstantSendLocked(txid={}, balance={})", txid, balance) } - WalletEvent::BalanceUpdated { - confirmed, - unconfirmed, - immature, - locked, + WalletEvent::BlockProcessChange { + height, + transactions_updated, + balance, .. } => { format!( - "BalanceUpdated(confirmed={}, unconfirmed={}, immature={}, locked={})", - Amount::from_sat(*confirmed), - Amount::from_sat(*unconfirmed), - Amount::from_sat(*immature), - Amount::from_sat(*locked) + "BlockProcessChange(height={}, tx_count={}, balance={})", + height, + transactions_updated.len(), + balance ) } } diff --git a/key-wallet-manager/src/lib.rs b/key-wallet-manager/src/lib.rs index ccb293edd..59080c079 100644 --- a/key-wallet-manager/src/lib.rs +++ b/key-wallet-manager/src/lib.rs @@ -27,6 +27,7 @@ pub use wallet_interface::{BlockProcessingResult, MempoolTransactionResult, Wall use dashcore::blockdata::transaction::Transaction; use dashcore::prelude::CoreBlockHeight; use key_wallet::account::AccountCollection; +use key_wallet::managed_account::transaction_record::TransactionRecord; use key_wallet::transaction_checking::TransactionContext; use key_wallet::wallet::managed_wallet_info::transaction_building::AccountTypePreference; use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface; @@ -80,6 +81,8 @@ pub struct CheckTransactionsResult { pub total_sent: u64, /// Addresses involved across all wallets pub involved_addresses: Vec
, + /// Transaction records recorded or updated by this check, grouped per wallet. + pub records_per_wallet: BTreeMap>, } /// High-level wallet manager that manages multiple wallets @@ -447,14 +450,62 @@ impl WalletManager { Ok(wallet_id) } - /// Check a transaction against all wallets and update their states if relevant. - /// Returns affected wallets and any new addresses generated during gap limit maintenance. + /// Check a transaction against all wallets, updating their states if relevant + /// and emitting the corresponding wallet event per affected wallet based on + /// the transaction context: + /// - `Mempool` with a new record → `MempoolTransactionReceived` + /// - `InstantSend` with a new record → `MempoolTransactionReceived` (record carries the IS context) + /// - `InstantSend` on a previously-seen record → `TransactionInstantSendLocked` + /// - `InBlock` / `InChainLockedBlock` → `BlockProcessChange` for the single transaction + /// + /// Block-processing callers that want a single consolidated + /// `BlockProcessChange` per wallet should use the internal + /// [`check_transaction_in_all_wallets_silent`] entry point. pub async fn check_transaction_in_all_wallets( &mut self, tx: &Transaction, context: TransactionContext, update_state_if_found: bool, update_balance: bool, + ) -> CheckTransactionsResult { + self.check_transaction_in_all_wallets_inner( + tx, + context, + update_state_if_found, + update_balance, + true, + ) + .await + } + + /// Check a transaction against all wallets without emitting wallet events. + /// + /// Used by block processing to batch per-wallet records into a single + /// `BlockProcessChange` event. + pub(crate) async fn check_transaction_in_all_wallets_silent( + &mut self, + tx: &Transaction, + context: TransactionContext, + update_state_if_found: bool, + update_balance: bool, + ) -> CheckTransactionsResult { + self.check_transaction_in_all_wallets_inner( + tx, + context, + update_state_if_found, + update_balance, + false, + ) + .await + } + + async fn check_transaction_in_all_wallets_inner( + &mut self, + tx: &Transaction, + context: TransactionContext, + update_state_if_found: bool, + update_balance: bool, + emit_events: bool, ) -> CheckTransactionsResult { let mut result = CheckTransactionsResult::default(); @@ -496,23 +547,31 @@ impl WalletManager { } } - if check_result.is_new_transaction { - for (account_index, record) in check_result.new_records { - let event = WalletEvent::TransactionReceived { - wallet_id, - account_index, - record: Box::new(record), - }; - let _ = self.event_sender.send(event); - } - } else if check_result.state_modified { - // Known transaction whose state was modified (confirmation or IS-lock). - let event = WalletEvent::TransactionStatusChanged { + // Collect per-wallet records so callers (e.g. block + // processing) can bundle them into a single event. + if !check_result.affected_records.is_empty() { + result + .records_per_wallet + .entry(wallet_id) + .or_default() + .extend(check_result.affected_records.iter().cloned()); + } + + if emit_events && check_result.state_modified { + // Re-read balance after state change + let balance = self + .wallet_infos + .get(&wallet_id) + .map(|info| info.balance()) + .unwrap_or_default(); + self.emit_context_routed_events( wallet_id, - txid: tx.txid(), - status: context.clone(), - }; - let _ = self.event_sender.send(event); + tx, + &context, + check_result.is_new_transaction, + check_result.affected_records, + balance, + ); } } @@ -523,6 +582,72 @@ impl WalletManager { result } + /// Advance `synced_height` and refresh per-wallet balances without + /// emitting wallet events. + pub(crate) fn update_synced_height_silent(&mut self, height: CoreBlockHeight) { + self.synced_height = height; + for (_wallet_id, info) in self.wallet_infos.iter_mut() { + info.update_synced_height(height); + } + } + + /// Emit a `WalletEvent` based on the transaction context after a + /// state-modifying check. One event per record for mempool / first-seen-IS + /// paths; a single consolidated `BlockProcessChange` for in-block paths; + /// a single `TransactionInstantSendLocked` for IS-lock on existing records. + pub(crate) fn emit_context_routed_events( + &self, + wallet_id: WalletId, + tx: &Transaction, + context: &TransactionContext, + is_new_transaction: bool, + records: Vec, + balance: WalletCoreBalance, + ) { + match context { + TransactionContext::InBlock(info) | TransactionContext::InChainLockedBlock(info) => { + let _ = self.event_sender.send(WalletEvent::BlockProcessChange { + wallet_id, + height: info.height(), + transactions_updated: records, + balance, + }); + } + TransactionContext::Mempool => { + // `state_modified` implies `is_new_transaction` for mempool — + // a duplicate mempool arrival is an early-return no-op. + for record in records { + let _ = self.event_sender.send(WalletEvent::MempoolTransactionReceived { + wallet_id, + record: Box::new(record), + balance, + }); + } + } + TransactionContext::InstantSend(lock) => { + if is_new_transaction { + // First seen in mempool *with* an IS lock — treat as a + // mempool arrival; the record's context is already + // `InstantSend(..)`. + for record in records { + let _ = self.event_sender.send(WalletEvent::MempoolTransactionReceived { + wallet_id, + record: Box::new(record), + balance, + }); + } + } else { + let _ = self.event_sender.send(WalletEvent::TransactionInstantSendLocked { + wallet_id, + txid: tx.txid(), + instant_send_lock: lock.clone(), + balance, + }); + } + } + } + } + /// Create an account in a specific wallet /// Note: The index parameter is kept for convenience, even though AccountType contains it pub fn create_account( diff --git a/key-wallet-manager/src/process_block.rs b/key-wallet-manager/src/process_block.rs index 2e5d27cb2..2ddb63219 100644 --- a/key-wallet-manager/src/process_block.rs +++ b/key-wallet-manager/src/process_block.rs @@ -1,12 +1,14 @@ use crate::wallet_interface::{BlockProcessingResult, MempoolTransactionResult, WalletInterface}; -use crate::{WalletEvent, WalletManager}; +use crate::{WalletEvent, WalletId, WalletManager}; use async_trait::async_trait; use core::fmt::Write as _; use dashcore::ephemerealdata::instant_lock::InstantLock; use dashcore::prelude::CoreBlockHeight; use dashcore::{Address, Block, Transaction}; +use key_wallet::managed_account::transaction_record::TransactionRecord; use key_wallet::transaction_checking::{BlockInfo, TransactionContext}; use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface; +use std::collections::BTreeMap; use tokio::sync::broadcast; #[async_trait] @@ -19,12 +21,18 @@ impl WalletInterface for WalletM let mut result = BlockProcessingResult::default(); let info = BlockInfo::new(height, block.block_hash(), block.header.time); - // Process each transaction using the base manager + // Snapshot balances up front so we can detect purely balance-driven + // changes (e.g. coinbase maturity) when `update_synced_height` + // refreshes balances after transaction processing. + let snapshot = self.snapshot_balances(); + + // Process each transaction silently, accumulating records per wallet. + let mut records_per_wallet: BTreeMap> = BTreeMap::new(); for tx in &block.txdata { let context = TransactionContext::InBlock(info); let check_result = - self.check_transaction_in_all_wallets(tx, context, true, false).await; + self.check_transaction_in_all_wallets_silent(tx, context, true, false).await; if !check_result.affected_wallets.is_empty() { if check_result.is_new_transaction { @@ -34,10 +42,34 @@ impl WalletInterface for WalletM } } + for (wallet_id, records) in check_result.records_per_wallet { + records_per_wallet.entry(wallet_id).or_default().extend(records); + } + result.new_addresses.extend(check_result.new_addresses); } - self.update_synced_height(height); + // Advance synced height silently — any coinbase-maturity balance shift + // will surface via the `BlockProcessChange` emission below. + self.update_synced_height_silent(height); + + // Emit one `BlockProcessChange` per wallet that either had recorded + // transactions or whose balance shifted. + for (wallet_id, old_balance) in &snapshot { + let new_balance = match self.wallet_infos.get(wallet_id) { + Some(info) => info.balance(), + None => continue, + }; + let records = records_per_wallet.remove(wallet_id).unwrap_or_default(); + if !records.is_empty() || *old_balance != new_balance { + let _ = self.event_sender().send(WalletEvent::BlockProcessChange { + wallet_id: *wallet_id, + height, + transactions_updated: records, + balance: new_balance, + }); + } + } result } @@ -54,8 +86,12 @@ impl WalletInterface for WalletM } None => TransactionContext::Mempool, }; - let snapshot = self.snapshot_balances(); - let check_result = self.check_transaction_in_all_wallets(tx, context, true, false).await; + + // Refresh cached balances only for affected wallets *before* emitting + // so the event carries the post-update balance. + let check_result = self + .check_transaction_in_all_wallets_silent(tx, context.clone(), true, false) + .await; let is_relevant = !check_result.affected_wallets.is_empty(); let net_amount = if is_relevant { @@ -64,13 +100,33 @@ impl WalletInterface for WalletM 0 }; - // Refresh cached balances only for affected wallets for wallet_id in &check_result.affected_wallets { if let Some(info) = self.wallet_infos.get_mut(wallet_id) { info.update_balance(); } } - self.emit_balance_changes(&snapshot); + + // Emit per affected wallet based on context. + for wallet_id in &check_result.affected_wallets { + let Some(records) = check_result.records_per_wallet.get(wallet_id).cloned() else { + continue; + }; + if records.is_empty() { + continue; + } + let balance = match self.wallet_infos.get(wallet_id) { + Some(info) => info.balance(), + None => continue, + }; + self.emit_context_routed_events( + *wallet_id, + tx, + &context, + check_result.is_new_transaction, + records, + balance, + ); + } MempoolTransactionResult { is_relevant, @@ -102,15 +158,27 @@ impl WalletInterface for WalletM } fn update_synced_height(&mut self, height: CoreBlockHeight) { - self.synced_height = height; - let snapshot = self.snapshot_balances(); - for (_wallet_id, info) in self.wallet_infos.iter_mut() { - info.update_synced_height(height); - } + self.update_synced_height_silent(height); - self.emit_balance_changes(&snapshot); + // Emit an empty-records `BlockProcessChange` for any wallet whose + // balance shifted as a result of the height advance (e.g. coinbase + // maturity). + for (wallet_id, old_balance) in &snapshot { + let Some(info) = self.wallet_infos.get(wallet_id) else { + continue; + }; + let new_balance = info.balance(); + if *old_balance != new_balance { + let _ = self.event_sender().send(WalletEvent::BlockProcessChange { + wallet_id: *wallet_id, + height, + transactions_updated: Vec::new(), + balance: new_balance, + }); + } + } } fn filter_committed_height(&self) -> CoreBlockHeight { @@ -130,7 +198,6 @@ impl WalletInterface for WalletM fn process_instant_send_lock(&mut self, instant_lock: InstantLock) { let txid = instant_lock.txid; - let snapshot = self.snapshot_balances(); let mut affected_wallets = Vec::new(); for (wallet_id, info) in self.wallet_infos.iter_mut() { @@ -144,15 +211,17 @@ impl WalletInterface for WalletM } for wallet_id in &affected_wallets { - let event = WalletEvent::TransactionStatusChanged { + let balance = match self.wallet_infos.get(wallet_id) { + Some(info) => info.balance(), + None => continue, + }; + let _ = self.event_sender().send(WalletEvent::TransactionInstantSendLocked { wallet_id: *wallet_id, txid, - status: TransactionContext::InstantSend(instant_lock.clone()), - }; - let _ = self.event_sender().send(event); + instant_send_lock: instant_lock.clone(), + balance, + }); } - - self.emit_balance_changes(&snapshot); } async fn describe(&self) -> String { @@ -236,23 +305,23 @@ mod tests { let (mut manager, _wallet_id, addr) = setup_manager_with_wallet(); let mut rx = manager.subscribe_events(); - // Relevant tx should emit BalanceUpdated + // Relevant tx should emit MempoolTransactionReceived with balance let tx = create_tx_paying_to(&addr, 0xaa); manager.process_mempool_transaction(&tx, None).await; let mut found = false; while let Ok(event) = rx.try_recv() { - if let WalletEvent::BalanceUpdated { - unconfirmed, + if let WalletEvent::MempoolTransactionReceived { + balance, .. } = event { - assert!(unconfirmed > 0, "unconfirmed balance should increase"); + assert!(balance.unconfirmed() > 0, "unconfirmed balance should increase"); found = true; break; } } - assert!(found, "should emit BalanceUpdated for mempool transaction"); + assert!(found, "should emit MempoolTransactionReceived for mempool transaction"); // Irrelevant tx should not emit any events let unrelated_tx = Transaction { @@ -290,17 +359,17 @@ mod tests { let mut found = false; while let Ok(event) = rx.try_recv() { - if let WalletEvent::BalanceUpdated { - confirmed, + if let WalletEvent::BlockProcessChange { + balance, .. } = event { - assert!(confirmed > 0, "confirmed balance should increase after block"); + assert!(balance.confirmed() > 0, "confirmed balance should increase after block"); found = true; break; } } - assert!(found, "should emit BalanceUpdated for block processing"); + assert!(found, "should emit BlockProcessChange for block processing"); } #[tokio::test] diff --git a/key-wallet-manager/src/test_helpers.rs b/key-wallet-manager/src/test_helpers.rs index f70cef633..42038d873 100644 --- a/key-wallet-manager/src/test_helpers.rs +++ b/key-wallet-manager/src/test_helpers.rs @@ -74,8 +74,12 @@ pub(crate) fn assert_no_events(rx: &mut broadcast::Receiver) { /// Submit a transaction through a sequence of contexts and verify the event flow. /// -/// The first context produces a `TransactionReceived` event; each subsequent -/// context produces a `TransactionStatusChanged` event. +/// - First `Mempool` or `InstantSend` context → `MempoolTransactionReceived` +/// (the record's `context` matches the submitted context). +/// - Subsequent `InstantSend` context on an existing record → +/// `TransactionInstantSendLocked`. +/// - Subsequent `InBlock` / `InChainLockedBlock` context → +/// `BlockProcessChange` with `transactions_updated.len() == 1`. pub(crate) async fn assert_lifecycle_flow(contexts: &[TransactionContext], input_seed: u8) { assert!(!contexts.is_empty(), "at least one context required"); @@ -87,22 +91,42 @@ pub(crate) async fn assert_lifecycle_flow(contexts: &[TransactionContext], input manager.check_transaction_in_all_wallets(&tx, ctx.clone(), true, true).await; let event = assert_single_event(&mut rx); - if i == 0 { - assert!( - matches!(&event, WalletEvent::TransactionReceived { wallet_id: wid, record, .. } if *wid == wallet_id && record.context == *ctx), - "context[{}]: expected TransactionReceived with wallet_id and status {:?}, got {:?}", - i, - ctx, - event - ); - } else { - assert!( - matches!(&event, WalletEvent::TransactionStatusChanged { wallet_id: wid, status, .. } if *wid == wallet_id && status == ctx), - "context[{}]: expected TransactionStatusChanged with wallet_id and status {:?}, got {:?}", - i, - ctx, - event - ); + match ctx { + TransactionContext::Mempool => { + assert!( + matches!(&event, WalletEvent::MempoolTransactionReceived { wallet_id: wid, record, .. } if *wid == wallet_id && record.context == *ctx), + "context[{}]: expected MempoolTransactionReceived(Mempool), got {:?}", + i, + event + ); + } + TransactionContext::InstantSend(_) => { + if i == 0 { + assert!( + matches!(&event, WalletEvent::MempoolTransactionReceived { wallet_id: wid, record, .. } if *wid == wallet_id && record.context == *ctx), + "context[{}]: expected MempoolTransactionReceived(InstantSend), got {:?}", + i, + event + ); + } else { + assert!( + matches!(&event, WalletEvent::TransactionInstantSendLocked { wallet_id: wid, .. } if *wid == wallet_id), + "context[{}]: expected TransactionInstantSendLocked, got {:?}", + i, + event + ); + } + } + TransactionContext::InBlock(info) | TransactionContext::InChainLockedBlock(info) => { + let expected_height = info.height(); + assert!( + matches!(&event, WalletEvent::BlockProcessChange { wallet_id: wid, height, transactions_updated, .. } if *wid == wallet_id && *height == expected_height && transactions_updated.len() == 1), + "context[{}]: expected BlockProcessChange(height={}, 1 tx), got {:?}", + i, + expected_height, + event + ); + } } } } diff --git a/key-wallet/src/transaction_checking/account_checker.rs b/key-wallet/src/transaction_checking/account_checker.rs index a9750f1aa..c62456f67 100644 --- a/key-wallet/src/transaction_checking/account_checker.rs +++ b/key-wallet/src/transaction_checking/account_checker.rs @@ -46,8 +46,13 @@ pub struct TransactionCheckResult { pub total_received_for_credit_conversion: u64, /// New addresses generated during gap limit maintenance pub new_addresses: Vec
, - /// Transaction records created for new transactions, paired with their account index - pub new_records: Vec<(u32, TransactionRecord)>, + /// Transaction records recorded or updated by this check. + /// + /// Contains one entry per affected account. Entries are present when the + /// transaction was newly recorded, confirmed in a block, or InstantSend-locked + /// on top of an existing record. Use `is_new_transaction` to know whether + /// these records are new or updates to existing ones. + pub affected_records: Vec, } /// Enum representing the type of Core account that matched with embedded data @@ -375,7 +380,7 @@ impl ManagedAccountCollection { total_sent: 0, total_received_for_credit_conversion: 0, new_addresses: Vec::new(), - new_records: Vec::new(), + affected_records: Vec::new(), }; for account_type in account_types { diff --git a/key-wallet/src/transaction_checking/wallet_checker.rs b/key-wallet/src/transaction_checking/wallet_checker.rs index 05a8625a1..1ee4ab58e 100644 --- a/key-wallet/src/transaction_checking/wallet_checker.rs +++ b/key-wallet/src/transaction_checking/wallet_checker.rs @@ -103,6 +103,7 @@ impl WalletTransactionChecker for ManagedWalletInfo { account.mark_utxos_instant_send(&txid); if let Some(record) = account.transactions.get_mut(&txid) { record.update_context(context.clone()); + result.affected_records.push(record.clone()); } } } @@ -129,12 +130,13 @@ impl WalletTransactionChecker for ManagedWalletInfo { if is_new { let record = account.record_transaction(tx, &account_match, context.clone(), tx_type); - if let Some(account_index) = account_match.account_type_match.account_index() { - result.new_records.push((account_index, record)); - } + result.affected_records.push(record); result.state_modified = true; } else if account.confirm_transaction(tx, &account_match, context.clone(), tx_type) { result.state_modified = true; + if let Some(record) = account.transactions.get(&txid) { + result.affected_records.push(record.clone()); + } } for address_info in account_match.account_type_match.all_involved_addresses() { From 3c446a3504c94d38cda0191a119729789523cbce Mon Sep 17 00:00:00 2001 From: Quantum Explorer Date: Fri, 24 Apr 2026 03:47:44 +0800 Subject: [PATCH 2/3] fix(ci): cargo fmt + rustdoc intra-doc link - Run cargo fmt across touched files. - Replace `[check_transaction_in_all_wallets_silent]` intra-doc link with a plain backtick reference since the function is `pub(crate)` and can't be resolved from a `pub` doc under `-D rustdoc::broken-intra-doc-links`. Co-Authored-By: Claude Opus 4.7 (1M context) --- dash-spv-ffi/tests/dashd_sync/callbacks.rs | 3 +-- key-wallet-manager/src/event_tests.rs | 13 +++---------- key-wallet-manager/src/lib.rs | 2 +- key-wallet-manager/src/process_block.rs | 5 ++--- 4 files changed, 7 insertions(+), 16 deletions(-) diff --git a/dash-spv-ffi/tests/dashd_sync/callbacks.rs b/dash-spv-ffi/tests/dashd_sync/callbacks.rs index dcd7f6a32..0222a480d 100644 --- a/dash-spv-ffi/tests/dashd_sync/callbacks.rs +++ b/dash-spv-ffi/tests/dashd_sync/callbacks.rs @@ -410,8 +410,7 @@ extern "C" fn on_block_process_change( return; }; if !transactions_updated.is_null() && record_count > 0 { - let records = - unsafe { slice::from_raw_parts(transactions_updated, record_count as usize) }; + let records = unsafe { slice::from_raw_parts(transactions_updated, record_count as usize) }; let mut sink = tracker.received_transactions.lock().unwrap_or_else(|e| e.into_inner()); for r in records { sink.push((r.txid, r.net_amount)); diff --git a/key-wallet-manager/src/event_tests.rs b/key-wallet-manager/src/event_tests.rs index 471992808..9f2c52c4e 100644 --- a/key-wallet-manager/src/event_tests.rs +++ b/key-wallet-manager/src/event_tests.rs @@ -508,16 +508,9 @@ async fn test_process_block_emits_events() { assert_eq!(result.new_txids.len(), 1); let events = drain_events(&mut rx); - let block_events: Vec<_> = events - .iter() - .filter(|e| matches!(e, WalletEvent::BlockProcessChange { .. })) - .collect(); - assert_eq!( - block_events.len(), - 1, - "expected exactly one BlockProcessChange, got {:?}", - events - ); + let block_events: Vec<_> = + events.iter().filter(|e| matches!(e, WalletEvent::BlockProcessChange { .. })).collect(); + assert_eq!(block_events.len(), 1, "expected exactly one BlockProcessChange, got {:?}", events); match block_events[0] { WalletEvent::BlockProcessChange { diff --git a/key-wallet-manager/src/lib.rs b/key-wallet-manager/src/lib.rs index 59080c079..0e6064f12 100644 --- a/key-wallet-manager/src/lib.rs +++ b/key-wallet-manager/src/lib.rs @@ -460,7 +460,7 @@ impl WalletManager { /// /// Block-processing callers that want a single consolidated /// `BlockProcessChange` per wallet should use the internal - /// [`check_transaction_in_all_wallets_silent`] entry point. + /// `check_transaction_in_all_wallets_silent` entry point. pub async fn check_transaction_in_all_wallets( &mut self, tx: &Transaction, diff --git a/key-wallet-manager/src/process_block.rs b/key-wallet-manager/src/process_block.rs index 2ddb63219..ecd708564 100644 --- a/key-wallet-manager/src/process_block.rs +++ b/key-wallet-manager/src/process_block.rs @@ -89,9 +89,8 @@ impl WalletInterface for WalletM // Refresh cached balances only for affected wallets *before* emitting // so the event carries the post-update balance. - let check_result = self - .check_transaction_in_all_wallets_silent(tx, context.clone(), true, false) - .await; + let check_result = + self.check_transaction_in_all_wallets_silent(tx, context.clone(), true, false).await; let is_relevant = !check_result.affected_wallets.is_empty(); let net_amount = if is_relevant { From aa99ca4f75f7c4c2b2f2599f8cc3e918dc6b228c Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Thu, 23 Apr 2026 16:32:49 -0700 Subject: [PATCH 3/3] renamed methods, docs, and strings after renaming the events --- dash-spv/ARCHITECTURE.md | 4 ++-- dash-spv/tests/dashd_sync/helpers.rs | 8 ++++---- dash-spv/tests/dashd_sync/tests_mempool.rs | 6 +++--- key-wallet-manager/src/event_tests.rs | 6 +++--- key-wallet-manager/src/process_block.rs | 2 +- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dash-spv/ARCHITECTURE.md b/dash-spv/ARCHITECTURE.md index 9ec363165..2c228455f 100644 --- a/dash-spv/ARCHITECTURE.md +++ b/dash-spv/ARCHITECTURE.md @@ -1163,7 +1163,7 @@ handle_tx() ├─ wallet.process_mempool_transaction(tx, is_locked) │ ├─ Not relevant → discard │ └─ Relevant → store in MempoolState - │ ├─ Wallet emits BalanceUpdated event + │ ├─ Wallet emits MempoolTransactionReceived event (carries record + balance) │ └─ New addresses discovered → flag filter rebuild └─ Return MempoolTransactionResult { is_relevant, net_amount, is_outgoing, addresses, new_addresses } ``` @@ -1229,7 +1229,7 @@ The `WalletInterface` trait provides four methods for mempool support: - `pending_balance`: regular unconfirmed transactions - `pending_instant_balance`: IS-locked transactions (immediately spendable) -The wallet emits `BalanceUpdated` events only when balance actually changes, with four categories: spendable, unconfirmed, immature, locked. +The wallet's post-event balance is carried inline on `MempoolTransactionReceived`, `TransactionInstantSendLocked`, and `BlockProcessChange` events, with four categories: spendable, unconfirmed, immature, locked. **Capacity and limits:** diff --git a/dash-spv/tests/dashd_sync/helpers.rs b/dash-spv/tests/dashd_sync/helpers.rs index 010ec0d4b..70ece27b9 100644 --- a/dash-spv/tests/dashd_sync/helpers.rs +++ b/dash-spv/tests/dashd_sync/helpers.rs @@ -126,7 +126,7 @@ pub(super) async fn wait_for_network_event( } } -/// Wait for a wallet `TransactionReceived` event with mempool status within the given timeout. +/// Wait for a wallet `MempoolTransactionReceived` event with mempool context within the given timeout. /// Returns `Some(txid)` if received, `None` on timeout. pub(super) async fn wait_for_mempool_tx( receiver: &mut broadcast::Receiver, @@ -176,13 +176,13 @@ pub(super) async fn wait_for_mempool_synced( } } -/// Assert that no mempool `TransactionReceived` event arrives within the given duration. +/// Assert that no mempool `MempoolTransactionReceived` event arrives within the given duration. pub(super) async fn assert_no_mempool_tx( receiver: &mut broadcast::Receiver, wait: Duration, ) { if let Some(txid) = wait_for_mempool_tx(receiver, wait).await { - panic!("Unexpected mempool TransactionReceived event with txid: {}", txid); + panic!("Unexpected MempoolTransactionReceived event with txid: {}", txid); } } @@ -319,7 +319,7 @@ pub(super) async fn wait_for_mempool_txs_both( for _ in 0..count { let txid = wait_for_mempool_tx(receiver, timeout) .await - .expect("Expected mempool TransactionReceived event"); + .expect("Expected MempoolTransactionReceived event"); txids.insert(txid); } txids diff --git a/dash-spv/tests/dashd_sync/tests_mempool.rs b/dash-spv/tests/dashd_sync/tests_mempool.rs index 845f31fda..681420a8a 100644 --- a/dash-spv/tests/dashd_sync/tests_mempool.rs +++ b/dash-spv/tests/dashd_sync/tests_mempool.rs @@ -38,7 +38,7 @@ async fn test_mempool_detects_incoming_tx() { let mempool_txid = wait_for_mempool_tx_both(&mut fa, &mut bf, MEMPOOL_TIMEOUT) .await - .expect("Expected mempool TransactionReceived event"); + .expect("Expected MempoolTransactionReceived event"); assert_eq!(mempool_txid, txid, "Mempool event txid should match sent txid"); fa.stop().await; @@ -106,7 +106,7 @@ async fn test_mempool_to_confirmed_lifecycle() { let mempool_txid = wait_for_mempool_tx_both(&mut fa, &mut bf, MEMPOOL_TIMEOUT) .await - .expect("Expected mempool TransactionReceived event"); + .expect("Expected MempoolTransactionReceived event"); assert_eq!(mempool_txid, txid); // Mine the transaction @@ -552,7 +552,7 @@ async fn test_broadcast_transaction_local_detection() { // The locally dispatched transaction should be picked up by the mempool manager let detected = wait_for_mempool_tx_both(&mut fa, &mut bf, MEMPOOL_TIMEOUT) .await - .expect("Expected mempool TransactionReceived event after broadcast"); + .expect("Expected MempoolTransactionReceived event after broadcast"); assert_eq!(detected, txid, "Detected txid should match broadcast txid"); // Step 4: Mine the broadcast tx and verify it transitions to confirmed diff --git a/key-wallet-manager/src/event_tests.rs b/key-wallet-manager/src/event_tests.rs index 9f2c52c4e..117ea059a 100644 --- a/key-wallet-manager/src/event_tests.rs +++ b/key-wallet-manager/src/event_tests.rs @@ -182,11 +182,11 @@ async fn test_mempool_after_instantsend_is_suppressed() { } // --------------------------------------------------------------------------- -// BalanceUpdated event tests +// Balance-carrying event tests // --------------------------------------------------------------------------- #[tokio::test] -async fn test_mempool_tx_emits_balance_updated() { +async fn test_mempool_tx_event_carries_unconfirmed_balance() { let (mut manager, wallet_id, addr) = setup_manager_with_wallet(); let mut rx = manager.subscribe_events(); let tx = create_tx_paying_to(&addr, 0xf1); @@ -219,7 +219,7 @@ async fn test_mempool_tx_emits_balance_updated() { } #[tokio::test] -async fn test_instantsend_tx_emits_balance_updated_spendable() { +async fn test_instantsend_tx_event_carries_spendable_balance() { let (mut manager, wallet_id, addr) = setup_manager_with_wallet(); let mut rx = manager.subscribe_events(); let tx = create_tx_paying_to(&addr, 0xf2); diff --git a/key-wallet-manager/src/process_block.rs b/key-wallet-manager/src/process_block.rs index ecd708564..905b11a00 100644 --- a/key-wallet-manager/src/process_block.rs +++ b/key-wallet-manager/src/process_block.rs @@ -348,7 +348,7 @@ mod tests { } #[tokio::test] - async fn test_process_block_emits_balance_updated() { + async fn test_process_block_event_carries_balance() { let (mut manager, _wallet_id, addr) = setup_manager_with_wallet(); let tx = create_tx_paying_to(&addr, 0xcc); let block = make_block(vec![tx]);