Skip to content
31 changes: 29 additions & 2 deletions dash-spv-ffi/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,10 @@ impl FFIWalletEventCallbacks {
let wallet_id_hex = hex::encode(wallet_id);
let c_wallet_id = CString::new(wallet_id_hex).unwrap_or_default();

let ffi_ctx = FFITransactionContext::from(record.context.clone());
let islock_data = ffi_ctx.islock_data;
let islock_len = ffi_ctx.islock_len;

let tx_bytes =
dashcore::consensus::serialize(&record.transaction).into_boxed_slice();

Expand Down Expand Up @@ -736,7 +740,7 @@ impl FFIWalletEventCallbacks {
let ffi_record = FFITransactionRecord {
txid: record.txid.to_byte_array(),
net_amount: record.net_amount,
context: FFITransactionContext::from(record.context),
context: ffi_ctx,
transaction_type: FFITransactionType::from(record.transaction_type),
direction: FFITransactionDirection::from(record.direction),
fee: record.fee.unwrap_or(0),
Expand Down Expand Up @@ -778,6 +782,16 @@ impl FFIWalletEventCallbacks {
}
}
}
// SAFETY: Free the heap-allocated IS lock bytes produced by
// `From<TransactionContext>` after the callback returns.
if !islock_data.is_null() && islock_len > 0 {
unsafe {
drop(Box::from_raw(std::ptr::slice_from_raw_parts_mut(
islock_data as *mut u8,
islock_len,
)));
}
}
Comment thread
xdustinface marked this conversation as resolved.
}
}
WalletEvent::TransactionStatusChanged {
Expand All @@ -789,12 +803,25 @@ impl FFIWalletEventCallbacks {
let wallet_id_hex = hex::encode(wallet_id);
let c_wallet_id = CString::new(wallet_id_hex).unwrap_or_default();
let txid_bytes = txid.as_byte_array();
let ffi_ctx = FFITransactionContext::from(status.clone());
let islock_data = ffi_ctx.islock_data;
let islock_len = ffi_ctx.islock_len;
cb(
c_wallet_id.as_ptr(),
txid_bytes as *const [u8; 32],
FFITransactionContext::from(*status),
ffi_ctx,
self.user_data,
);
// SAFETY: Free the heap-allocated IS lock bytes produced by
// `From<TransactionContext>` after the callback returns.
if !islock_data.is_null() && islock_len > 0 {
unsafe {
Comment thread
xdustinface marked this conversation as resolved.
drop(Box::from_raw(std::ptr::slice_from_raw_parts_mut(
islock_data as *mut u8,
islock_len,
)));
}
}
}
}
WalletEvent::BalanceUpdated {
Expand Down
100 changes: 70 additions & 30 deletions dash-spv/src/sync/mempool/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use dashcore::ephemerealdata::instant_lock::InstantLock;
use dashcore::network::message_blockdata::Inventory;
use dashcore::{Amount, Transaction, Txid};
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -56,8 +57,8 @@ pub(crate) struct MempoolManager<W: WalletInterface> {
pending_requests: HashMap<Txid, Instant>,
/// Connected peers and their activation state.
pub(super) peers: HashMap<SocketAddr, Option<VecDeque<Txid>>>,
/// IS lock txids that arrived before their corresponding transaction, with insertion time.
pending_is_locks: HashMap<Txid, Instant>,
/// IS locks that arrived before their corresponding transaction, with insertion time.
pending_is_locks: HashMap<Txid, (InstantLock, Instant)>,
/// Txids already downloaded, with download timestamp.
/// Prevents duplicate downloads when multiple peers announce the same transactions.
/// Entries expire after `SEEN_TXID_EXPIRY`.
Expand Down Expand Up @@ -302,11 +303,12 @@ impl<W: WalletInterface> MempoolManager<W> {
self.progress.add_received(1);

// Check for a pre-arrived IS lock before wallet processing consumes it
let is_locked = self.pending_is_locks.remove(&txid).is_some();
let pending_lock = self.pending_is_locks.remove(&txid).map(|(lock, _)| lock);
let is_locked = pending_lock.is_some();

let result = {
let mut wallet = self.wallet.write().await;
wallet.process_mempool_transaction(&tx, is_locked).await
wallet.process_mempool_transaction(&tx, pending_lock).await
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if !result.is_relevant {
Expand Down Expand Up @@ -356,30 +358,31 @@ impl<W: WalletInterface> MempoolManager<W> {

/// Mark a mempool transaction as InstantSend-locked and notify the wallet.
///
/// If the transaction hasn't arrived yet, remembers the txid so the lock
/// If the transaction hasn't arrived yet, remembers the lock so it
/// can be applied when the transaction is later received via `handle_tx`.
pub(super) async fn mark_instant_send(&mut self, txid: &Txid) {
pub(super) async fn process_instant_send(&mut self, instant_lock: InstantLock) {
let txid = instant_lock.txid;
let mut state = self.mempool_state.write().await;
let marked = if let Some(tx) = state.transactions.get_mut(txid) {
let instant_lock_opt = if let Some(tx) = state.transactions.get_mut(&txid) {
tx.is_instant_send = true;
tracing::debug!("Marked mempool tx {} as InstantSend-locked", txid);
true
Some(instant_lock)
} else if self.pending_is_locks.len() < MAX_PENDING_IS_LOCKS {
self.pending_is_locks.insert(*txid, Instant::now());
self.pending_is_locks.insert(txid, (instant_lock, Instant::now()));
tracing::debug!("IS lock arrived before tx {}, remembering for later", txid);
false
None
} else {
tracing::warn!(
"Pending IS locks at capacity ({}), dropping IS lock for {}",
MAX_PENDING_IS_LOCKS,
txid
);
false
None
};
drop(state);
if marked {
if let Some(lock) = instant_lock_opt {
let mut wallet = self.wallet.write().await;
wallet.process_instant_send_lock(*txid);
wallet.process_instant_send_lock(lock);
}
}

Expand All @@ -398,7 +401,7 @@ impl<W: WalletInterface> MempoolManager<W> {

// Prune pending IS locks whose transaction never arrived
let before = self.pending_is_locks.len();
self.pending_is_locks.retain(|_, inserted_at| inserted_at.elapsed() < timeout);
self.pending_is_locks.retain(|_, (_, inserted_at)| inserted_at.elapsed() < timeout);
let expired = before - self.pending_is_locks.len();
if expired > 0 {
tracing::debug!("Pruned {} expired pending IS locks", expired);
Expand Down Expand Up @@ -504,6 +507,21 @@ mod tests {
use crate::test_utils::test_socket_address;
use tokio::sync::mpsc;

fn dummy_instant_lock(txid: Txid) -> InstantLock {
InstantLock {
txid,
..InstantLock::default()
}
}

fn rich_instant_lock(txid: Txid) -> InstantLock {
InstantLock {
txid,
cyclehash: BlockHash::from_byte_array([0xab; 32]),
Comment thread
xdustinface marked this conversation as resolved.
..InstantLock::default()
}
}

fn create_test_manager(
) -> (MempoolManager<MockWallet>, RequestSender, mpsc::UnboundedReceiver<NetworkRequest>) {
let wallet = Arc::new(RwLock::new(MockWallet::new()));
Expand Down Expand Up @@ -917,7 +935,7 @@ mod tests {
));
}

manager.mark_instant_send(&txid).await;
manager.process_instant_send(dummy_instant_lock(txid)).await;

// Verify mempool state also reflects IS flag
let state = manager.mempool_state.read().await;
Expand All @@ -929,15 +947,15 @@ mod tests {
let changes = status_changes.lock().await;
assert_eq!(changes.len(), 1);
assert_eq!(changes[0].0, txid);
assert_eq!(changes[0].1, TransactionContext::InstantSend);
assert!(matches!(changes[0].1, TransactionContext::InstantSend(_)));
}

#[tokio::test]
async fn test_mark_instant_send_stores_pending_for_unknown() {
let (mut manager, _requests, _rx) = create_test_manager();

let unknown_txid = Txid::from_byte_array([0xbb; 32]);
manager.mark_instant_send(&unknown_txid).await;
manager.process_instant_send(dummy_instant_lock(unknown_txid)).await;

// No immediate wallet notification
let wallet = manager.wallet.read().await;
Expand Down Expand Up @@ -1055,7 +1073,8 @@ mod tests {
manager
.peers
.insert(test_socket_address(1), Some(VecDeque::from([Txid::from_byte_array([2; 32])])));
manager.pending_is_locks.insert(Txid::from_byte_array([3; 32]), Instant::now());
let txid3 = Txid::from_byte_array([3; 32]);
manager.pending_is_locks.insert(txid3, (dummy_instant_lock(txid3), Instant::now()));

manager.clear_pending();

Expand Down Expand Up @@ -1092,7 +1111,7 @@ mod tests {

#[tokio::test]
async fn test_instant_send_before_transaction() {
let (mut manager, _requests, _wallet) = create_relevant_manager();
let (mut manager, _requests, wallet) = create_relevant_manager();

let tx = Transaction {
version: 1,
Expand All @@ -1103,8 +1122,8 @@ mod tests {
};
let txid = tx.txid();

// IS lock arrives before the transaction
manager.mark_instant_send(&txid).await;
// IS lock arrives before the transaction (with a distinct cyclehash)
manager.process_instant_send(rich_instant_lock(txid)).await;
assert!(manager.pending_is_locks.contains_key(&txid));

// Transaction arrives
Expand All @@ -1116,6 +1135,18 @@ mod tests {
// Transaction stored with IS flag set
let state = manager.mempool_state.read().await;
assert!(state.transactions.get(&txid).unwrap().is_instant_send);
drop(state);

// Wallet received the IS lock payload with the correct cyclehash
let w = wallet.read().await;
let locks = w.processed_instant_locks.lock().await;
let received = locks.iter().find(|(id, lock)| {
*id == txid
&& lock
.as_ref()
.is_some_and(|l| l.cyclehash == BlockHash::from_byte_array([0xab; 32]))
});
assert!(received.is_some(), "wallet should have received rich IS lock with cyclehash 0xab");
}

#[tokio::test]
Expand All @@ -1132,7 +1163,7 @@ mod tests {
let txid = tx.txid();

// IS lock arrives before the transaction
manager.mark_instant_send(&txid).await;
manager.process_instant_send(dummy_instant_lock(txid)).await;
assert!(manager.pending_is_locks.contains_key(&txid));

// Transaction arrives but wallet says it's not relevant
Expand All @@ -1154,13 +1185,14 @@ mod tests {
for i in 0..MAX_PENDING_IS_LOCKS {
let mut bytes = [0u8; 32];
bytes[0..8].copy_from_slice(&(i as u64).to_le_bytes());
manager.pending_is_locks.insert(Txid::from_byte_array(bytes), Instant::now());
let txid = Txid::from_byte_array(bytes);
manager.pending_is_locks.insert(txid, (dummy_instant_lock(txid), Instant::now()));
}
assert_eq!(manager.pending_is_locks.len(), MAX_PENDING_IS_LOCKS);

// Next IS lock should be dropped
let overflow_txid = Txid::from_byte_array([0xff; 32]);
manager.mark_instant_send(&overflow_txid).await;
manager.process_instant_send(dummy_instant_lock(overflow_txid)).await;
assert!(!manager.pending_is_locks.contains_key(&overflow_txid));
assert_eq!(manager.pending_is_locks.len(), MAX_PENDING_IS_LOCKS);
}
Expand Down Expand Up @@ -1191,8 +1223,10 @@ mod tests {

// Also store a pending IS lock for this txid and an unrelated one
let unrelated_txid = Txid::from_byte_array([0xdd; 32]);
manager.pending_is_locks.insert(txid, Instant::now());
manager.pending_is_locks.insert(unrelated_txid, Instant::now());
manager.pending_is_locks.insert(txid, (dummy_instant_lock(txid), Instant::now()));
manager
.pending_is_locks
.insert(unrelated_txid, (dummy_instant_lock(unrelated_txid), Instant::now()));

manager.prune_expired(test_timeout).await;

Expand All @@ -1216,13 +1250,19 @@ mod tests {

// Insert a pending IS lock that is older than the test timeout
let stale_txid = Txid::from_byte_array([0xaa; 32]);
manager
.pending_is_locks
.insert(stale_txid, Instant::now() - test_timeout - Duration::from_secs(1));
manager.pending_is_locks.insert(
stale_txid,
(
dummy_instant_lock(stale_txid),
Instant::now() - test_timeout - Duration::from_secs(1),
),
);

// Insert a fresh pending IS lock
let fresh_txid = Txid::from_byte_array([0xbb; 32]);
manager.pending_is_locks.insert(fresh_txid, Instant::now());
manager
.pending_is_locks
.insert(fresh_txid, (dummy_instant_lock(fresh_txid), Instant::now()));

manager.prune_expired(test_timeout).await;

Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/sync/mempool/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<W: WalletInterface + 'static> SyncManager for MempoolManager<W> {
instant_lock,
..
} => {
self.mark_instant_send(&instant_lock.txid).await;
self.process_instant_send(instant_lock.clone()).await;
Ok(vec![])
}
_ => Ok(vec![]),
Expand Down
4 changes: 2 additions & 2 deletions key-wallet-ffi/FFI_API.md
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ Get the parent wallet ID of a managed account Note: ManagedAccount doesn't stor
#### `managed_wallet_check_transaction`

```c
managed_wallet_check_transaction(managed_wallet: *mut FFIManagedWalletInfo, wallet: *mut FFIWallet, tx_bytes: *const u8, tx_len: usize, context_type: FFITransactionContextType, block_info: FFIBlockInfo, update_state: bool, result_out: *mut FFITransactionCheckResult, error: *mut FFIError,) -> bool
managed_wallet_check_transaction(managed_wallet: *mut FFIManagedWalletInfo, wallet: *mut FFIWallet, tx_bytes: *const u8, tx_len: usize, context_type: FFITransactionContextType, block_info: FFIBlockInfo, islock_data: *const u8, islock_len: usize, update_state: bool, result_out: *mut FFITransactionCheckResult, error: *mut FFIError,) -> bool
```

**Description:**
Expand Down Expand Up @@ -1317,7 +1317,7 @@ Build and sign a transaction using the wallet's managed info This is the recomm
#### `wallet_check_transaction`

```c
wallet_check_transaction(wallet: *mut FFIWallet, tx_bytes: *const u8, tx_len: usize, context_type: FFITransactionContextType, block_info: FFIBlockInfo, update_state: bool, result_out: *mut FFITransactionCheckResult, error: *mut FFIError,) -> bool
wallet_check_transaction(wallet: *mut FFIWallet, tx_bytes: *const u8, tx_len: usize, context_type: FFITransactionContextType, block_info: FFIBlockInfo, islock_data: *const u8, islock_len: usize, update_state: bool, result_out: *mut FFITransactionCheckResult, error: *mut FFIError,) -> bool
```

**Description:**
Expand Down
11 changes: 9 additions & 2 deletions key-wallet-ffi/src/managed_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ pub unsafe extern "C" fn managed_core_account_get_transactions(

ffi_record.txid = record.txid.to_byte_array();
ffi_record.net_amount = record.net_amount;
ffi_record.context = FFITransactionContext::from(record.context);
ffi_record.context = FFITransactionContext::from(record.context.clone());
ffi_record.transaction_type = FFITransactionType::from(record.transaction_type);
ffi_record.direction = FFITransactionDirection::from(record.direction);
ffi_record.fee = record.fee.unwrap_or(0);
Expand Down Expand Up @@ -823,7 +823,10 @@ pub unsafe extern "C" fn managed_core_account_free_transactions(
}

for i in 0..count {
let record = &*transactions.add(i);
let record = &mut *transactions.add(i);

// Free IS lock data
record.context.free_islock_data();

// Free input detail addresses first, then the array
if !record.input_details.is_null() && record.input_details_count > 0 {
Expand Down Expand Up @@ -2012,6 +2015,8 @@ mod tests {
r0.context = FFITransactionContext {
context_type: FFITransactionContextType::Mempool,
block_info: FFIBlockInfo::empty(),
islock_data: std::ptr::null(),
islock_len: 0,
};
r0.transaction_type = FFITransactionType::Standard;
r0.direction = FFITransactionDirection::Incoming;
Expand Down Expand Up @@ -2053,6 +2058,8 @@ mod tests {
r1.context = FFITransactionContext {
context_type: FFITransactionContextType::Mempool,
block_info: FFIBlockInfo::empty(),
islock_data: std::ptr::null(),
islock_len: 0,
};
r1.transaction_type = FFITransactionType::Standard;
r1.direction = FFITransactionDirection::Outgoing;
Expand Down
Loading
Loading