diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 8a90dc93e97..8948c461557 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,8 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; -use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; -use lightning::chain::transaction::OutPoint; +use lightning::chain::channelmonitor::ChannelMonitor; use lightning::chain::{ chainmonitor, channelmonitor, BlockLocator, ChannelMonitorUpdateStatus, Confirm, Watch, }; @@ -87,7 +86,6 @@ use lightning::util::wallet_utils::{WalletSourceSync, WalletSync}; use lightning_invoice::RawBolt11Invoice; use crate::utils::test_logger::{self, Output}; -use crate::utils::test_persister::TestPersister; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; @@ -293,10 +291,20 @@ impl Writer for VecWriter { } } +fn serialize_monitor(monitor: &ChannelMonitor) -> Vec { + let mut ser = VecWriter(Vec::new()); + monitor.write(&mut ser).unwrap(); + ser.0 +} + /// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]` /// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass /// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by -/// storing both old `ChannelMonitor`s and ones that are "being persisted" here. +/// storing restart candidates here. +/// +/// Separately, we track every `InProgress` persistence operation that still needs a +/// `channel_monitor_updated` call. A newer persisted monitor can make an older monitor invalid for +/// restart while the older update still needs to be completed to unblock the live `ChainMonitor`. /// /// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will /// simply be replayed on startup. @@ -309,128 +317,206 @@ struct LatestMonitorState { persisted_monitor_id: u64, /// The latest serialized `ChannelMonitor` that we told LDK we persisted. persisted_monitor: Vec, - /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting", - /// from LDK's perspective. + /// A set of (monitor id, serialized `ChannelMonitor`)s which remain safe to use as stale + /// monitors on reload. pending_monitors: Vec<(u64, Vec)>, + /// A set of (monitor id, serialized `ChannelMonitor`)s which still need a + /// `channel_monitor_updated` callback. + pending_monitor_completions: Vec<(u64, Vec)>, +} +impl LatestMonitorState { + fn assert_pending_sorted(pending: &[(u64, Vec)]) { + assert!( + pending.windows(2).all(|pair| pair[0].0 < pair[1].0), + "updates should be sorted by id" + ); + } + + fn upsert_pending_entry( + pending: &mut Vec<(u64, Vec)>, monitor_id: u64, serialized_monitor: Vec, + ) { + match pending.binary_search_by_key(&monitor_id, |(id, _)| *id) { + Ok(idx) => pending[idx].1 = serialized_monitor, + Err(idx) => pending.insert(idx, (monitor_id, serialized_monitor)), + } + } + + fn mark_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec) { + self.pending_monitors.retain(|(id, _)| *id > monitor_id); + if monitor_id >= self.persisted_monitor_id { + self.persisted_monitor_id = monitor_id; + self.persisted_monitor = serialized_monitor; + } + } + + fn upsert_pending(&mut self, monitor_id: u64, serialized_monitor: Vec) { + Self::upsert_pending_entry( + &mut self.pending_monitors, + monitor_id, + serialized_monitor.clone(), + ); + Self::upsert_pending_entry( + &mut self.pending_monitor_completions, + monitor_id, + serialized_monitor, + ); + } + + fn mark_completion_finished(&mut self, monitor_id: u64, serialized_monitor: Vec) { + self.pending_monitor_completions.retain(|(id, _)| *id != monitor_id); + self.mark_persisted(monitor_id, serialized_monitor); + } + + fn drain_pending_completions(&mut self) -> Vec<(u64, Vec)> { + Self::assert_pending_sorted(&self.pending_monitor_completions); + self.pending_monitor_completions.drain(..).collect() + } + + fn take_pending_completion( + &mut self, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + Self::assert_pending_sorted(&self.pending_monitor_completions); + match selector { + MonitorUpdateSelector::First => { + if self.pending_monitor_completions.is_empty() { + None + } else { + Some(self.pending_monitor_completions.remove(0)) + } + }, + MonitorUpdateSelector::Second => { + if self.pending_monitor_completions.len() > 1 { + Some(self.pending_monitor_completions.remove(1)) + } else { + None + } + }, + MonitorUpdateSelector::Last => self.pending_monitor_completions.pop(), + } + } } -struct TestChainMonitor { - pub logger: Arc, - pub keys: Arc, - pub persister: Arc, - pub chain_monitor: Arc< - chainmonitor::ChainMonitor< - TestChannelSigner, - Arc, - Arc, - Arc, - Arc, - Arc, - Arc, - >, - >, +struct HarnessPersister { + pub update_ret: Mutex, pub latest_monitors: Mutex>, } -impl TestChainMonitor { - pub fn new( - broadcaster: Arc, logger: Arc, feeest: Arc, - persister: Arc, keys: Arc, - ) -> Self { - Self { - chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( - None, - broadcaster, - logger.clone(), - feeest, - Arc::clone(&persister), - Arc::clone(&keys), - keys.get_peer_storage_key(), - false, - )), - logger, - keys, - persister, - latest_monitors: Mutex::new(new_hash_map()), +impl HarnessPersister { + fn track_monitor_update( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + status: chain::ChannelMonitorUpdateStatus, + ) { + let mut latest_monitors = self.latest_monitors.lock().unwrap(); + if let Some(state) = latest_monitors.get_mut(&channel_id) { + match status { + chain::ChannelMonitorUpdateStatus::Completed => { + // Completing update N makes any older in-flight monitor blobs unusable on + // restart. A newer ChannelManager serialization will no longer advertise those + // earlier updates as blocked, so reloading them would violate the Watch API. + state.mark_persisted(monitor_id, serialized_monitor); + }, + chain::ChannelMonitorUpdateStatus::InProgress => { + state.upsert_pending(monitor_id, serialized_monitor); + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => {}, + } + } else { + let state = match status { + chain::ChannelMonitorUpdateStatus::Completed => LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: serialized_monitor, + pending_monitors: Vec::new(), + pending_monitor_completions: Vec::new(), + }, + chain::ChannelMonitorUpdateStatus::InProgress => LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: Vec::new(), + pending_monitors: vec![(monitor_id, serialized_monitor.clone())], + pending_monitor_completions: vec![(monitor_id, serialized_monitor)], + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => return, + }; + assert!( + latest_monitors.insert(channel_id, state).is_none(), + "Already had monitor state pre-persist" + ); } } -} -impl chain::Watch for TestChainMonitor { - fn watch_channel( - &self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor, - ) -> Result { - let mut ser = VecWriter(Vec::new()); - monitor.write(&mut ser).unwrap(); - let monitor_id = monitor.get_latest_update_id(); - let res = self.chain_monitor.watch_channel(channel_id, monitor); - let state = match res { - Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: ser.0, - pending_monitors: Vec::new(), - }, - Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: Vec::new(), - pending_monitors: vec![(monitor_id, ser.0)], - }, - Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(), - Err(()) => panic!(), - }; - if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() { - panic!("Already had monitor pre-watch_channel"); + + fn mark_update_completed( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + ) { + if let Some(state) = self.latest_monitors.lock().unwrap().get_mut(&channel_id) { + // Once LDK acknowledges update N as completed, any older pending monitor blob is fully + // superseded and must not be offered back on restart. + state.mark_completion_finished(monitor_id, serialized_monitor); } - res } - fn update_channel( - &self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate, - ) -> chain::ChannelMonitorUpdateStatus { - let mut map_lock = self.latest_monitors.lock().unwrap(); - let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call"); - let latest_monitor_data = map_entry - .pending_monitors - .last() - .as_ref() - .map(|(_, data)| data) - .unwrap_or(&map_entry.persisted_monitor); - let deserialized_monitor = - <(BlockLocator, channelmonitor::ChannelMonitor)>::read( - &mut &latest_monitor_data[..], - (&*self.keys, &*self.keys), - ) + fn drain_pending_updates(&self, channel_id: &ChannelId) -> Vec<(u64, Vec)> { + self.latest_monitors + .lock() .unwrap() - .1; - deserialized_monitor - .update_monitor( - update, - &&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }, - &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, - &self.logger, - ) - .unwrap(); - let mut ser = VecWriter(Vec::new()); - deserialized_monitor.write(&mut ser).unwrap(); - let res = self.chain_monitor.update_channel(channel_id, update); - match res { - chain::ChannelMonitorUpdateStatus::Completed => { - map_entry.persisted_monitor_id = update.update_id; - map_entry.persisted_monitor = ser.0; - }, - chain::ChannelMonitorUpdateStatus::InProgress => { - map_entry.pending_monitors.push((update.update_id, ser.0)); - }, - chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(), + .get_mut(channel_id) + .map_or_else(Vec::new, |state| state.drain_pending_completions()) + } + + fn drain_all_pending_updates(&self) -> Vec<(ChannelId, u64, Vec)> { + let mut completed_updates = Vec::new(); + for (channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() { + for (monitor_id, data) in state.drain_pending_completions() { + completed_updates.push((*channel_id, monitor_id, data)); + } } - res + completed_updates + } + + fn take_pending_update( + &self, channel_id: &ChannelId, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + self.latest_monitors + .lock() + .unwrap() + .get_mut(channel_id) + .and_then(|state| state.take_pending_completion(selector)) + } +} +impl chainmonitor::Persist for HarnessPersister { + fn persist_new_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = data.get_latest_update_id(); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update(data.channel_id(), monitor_id, serialized_monitor, status); + status } - fn release_pending_monitor_events( - &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { - return self.chain_monitor.release_pending_monitor_events(); + fn update_persisted_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + update: Option<&channelmonitor::ChannelMonitorUpdate>, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = update.map_or_else(|| data.get_latest_update_id(), |upd| upd.update_id); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update(data.channel_id(), monitor_id, serialized_monitor, status); + status } + + fn archive_persisted_channel(&self, _monitor_name: lightning::util::persist::MonitorName) {} } +type TestChainMonitor = chainmonitor::ChainMonitor< + TestChannelSigner, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + struct KeyProvider { node_secret: SecretKey, rand_bytes_id: atomic::AtomicU32, @@ -654,12 +740,15 @@ struct HarnessNode<'a> { node_id: u8, node: ChanMan<'a>, monitor: Arc, + persister: Arc, + monitor_logger: Arc, keys_manager: Arc, logger: Arc, broadcaster: Arc, fee_estimator: Arc, wallet: TestWalletSource, persistence_style: ChannelMonitorUpdateStatus, + deferred: bool, serialized_manager: Vec, height: u32, last_htlc_clear_fee: u32, @@ -683,24 +772,34 @@ impl<'a> HarnessNode<'a> { (logger_for_monitor, logger) } + fn build_persister(persistence_style: ChannelMonitorUpdateStatus) -> Arc { + Arc::new(HarnessPersister { + update_ret: Mutex::new(persistence_style), + latest_monitors: Mutex::new(new_hash_map()), + }) + } + fn build_chain_monitor( broadcaster: &Arc, fee_estimator: &Arc, keys_manager: &Arc, logger_for_monitor: Arc, - persistence_style: ChannelMonitorUpdateStatus, + persister: &Arc, deferred: bool, ) -> Arc { - Arc::new(TestChainMonitor::new( + Arc::new(chainmonitor::ChainMonitor::new( + None, Arc::clone(broadcaster), logger_for_monitor, Arc::clone(fee_estimator), - Arc::new(TestPersister { update_ret: Mutex::new(persistence_style) }), + Arc::clone(persister), Arc::clone(keys_manager), + keys_manager.get_peer_storage_key(), + deferred, )) } fn new( node_id: u8, wallet: TestWalletSource, fee_estimator: Arc, broadcaster: Arc, persistence_style: ChannelMonitorUpdateStatus, - out: &Out, router: &'a FuzzRouter, chan_type: ChanType, + deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) -> Self { let (logger_for_monitor, logger) = Self::build_loggers(node_id, out); let node_secret = SecretKey::from_slice(&[ @@ -713,12 +812,14 @@ impl<'a> HarnessNode<'a> { rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()), }); + let persister = Self::build_persister(persistence_style); let monitor = Self::build_chain_monitor( &broadcaster, &fee_estimator, &keys_manager, - logger_for_monitor, - persistence_style, + Arc::clone(&logger_for_monitor), + &persister, + deferred, ); let network = Network::Bitcoin; let best_block_timestamp = genesis_block(network).header.time; @@ -741,12 +842,15 @@ impl<'a> HarnessNode<'a> { node_id, node, monitor, + persister, + monitor_logger: logger_for_monitor, keys_manager, logger, broadcaster, fee_estimator, wallet, persistence_style, + deferred, serialized_manager: Vec::new(), height: 0, last_htlc_clear_fee: 253, @@ -757,64 +861,32 @@ impl<'a> HarnessNode<'a> { self.persistence_style = style; } - fn complete_all_monitor_updates(&self, chan_id: &ChannelId) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + fn finish_monitor_update(&self, chan_id: ChannelId, monitor_id: u64, data: Vec) { + self.monitor.channel_monitor_updated(chan_id, monitor_id).unwrap(); + self.persister.mark_update_completed(chan_id, monitor_id, data); + } + + fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool { + assert_eq!(self.monitor.pending_operation_count(), 0); + let completed_updates = self.persister.drain_pending_updates(chan_id); + let completed_any = !completed_updates.is_empty(); + for (monitor_id, data) in completed_updates { + self.finish_monitor_update(*chan_id, monitor_id, data); } + completed_any } fn complete_all_pending_monitor_updates(&self) { - for (channel_id, state) in self.monitor.latest_monitors.lock().unwrap().iter_mut() { - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap(); - if id >= state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + assert_eq!(self.monitor.pending_operation_count(), 0); + for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() { + self.finish_monitor_update(channel_id, monitor_id, data); } } fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - let update = match selector { - MonitorUpdateSelector::First => { - if state.pending_monitors.is_empty() { - None - } else { - Some(state.pending_monitors.remove(0)) - } - }, - MonitorUpdateSelector::Second => { - if state.pending_monitors.len() > 1 { - Some(state.pending_monitors.remove(1)) - } else { - None - } - }, - MonitorUpdateSelector::Last => state.pending_monitors.pop(), - }; - if let Some((id, data)) = update { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + assert_eq!(self.monitor.pending_operation_count(), 0); + if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) { + self.finish_monitor_update(*chan_id, monitor_id, data); } } @@ -836,9 +908,30 @@ impl<'a> HarnessNode<'a> { } } - fn refresh_serialized_manager(&mut self) { + fn checkpoint_manager_persistence(&mut self) -> bool { if self.node.get_and_clear_needs_persistence() { + let pending_monitor_writes = self.monitor.pending_operation_count(); self.serialized_manager = self.node.encode(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, &self.monitor_logger); + } else { + assert_eq!(pending_monitor_writes, 0); + } + true + } else { + assert_eq!(self.monitor.pending_operation_count(), 0); + false + } + } + + fn force_checkpoint_manager_persistence(&mut self) { + let pending_monitor_writes = self.monitor.pending_operation_count(); + self.serialized_manager = self.node.encode(); + self.node.get_and_clear_needs_persistence(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, &self.monitor_logger); + } else { + assert_eq!(pending_monitor_writes, 0); } } @@ -943,18 +1036,20 @@ impl<'a> HarnessNode<'a> { &mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) { let (logger_for_monitor, logger) = Self::build_loggers(self.node_id, out); + let persister = Self::build_persister(ChannelMonitorUpdateStatus::Completed); let chain_monitor = Self::build_chain_monitor( &self.broadcaster, &self.fee_estimator, &self.keys_manager, - logger_for_monitor, - ChannelMonitorUpdateStatus::Completed, + Arc::clone(&logger_for_monitor), + &persister, + self.deferred, ); let mut monitors = new_hash_map(); let mut use_old_mons = use_old_mons; { - let mut old_monitors = self.monitor.latest_monitors.lock().unwrap(); + let mut old_monitors = self.persister.latest_monitors.lock().unwrap(); for (channel_id, mut prev_state) in old_monitors.drain() { let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 { // Reload with the oldest `ChannelMonitor` (the one that we already told @@ -985,7 +1080,8 @@ impl<'a> HarnessNode<'a> { // considering them discarded. LDK should replay these for us as they're stored in // the `ChannelManager`. prev_state.pending_monitors.clear(); - chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state); + prev_state.pending_monitor_completions.clear(); + persister.latest_monitors.lock().unwrap().insert(channel_id, prev_state); } } let mut monitor_refs = new_hash_map(); @@ -1007,17 +1103,28 @@ impl<'a> HarnessNode<'a> { channel_monitors: monitor_refs, }; - let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args) - .expect("Failed to read manager"); + let (_block_locator, manager) = + <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args) + .expect("Failed to read manager"); + let expected_status = if self.deferred { + ChannelMonitorUpdateStatus::InProgress + } else { + ChannelMonitorUpdateStatus::Completed + }; for (channel_id, mon) in monitors.drain() { - assert_eq!( - chain_monitor.chain_monitor.watch_channel(channel_id, mon), - Ok(ChannelMonitorUpdateStatus::Completed) - ); + assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status)); + } + *persister.update_ret.lock().unwrap() = self.persistence_style; + if self.deferred { + let count = chain_monitor.pending_operation_count(); + self.serialized_manager = manager.encode(); + manager.get_and_clear_needs_persistence(); + chain_monitor.flush(count, &logger_for_monitor); } - *chain_monitor.persister.update_ret.lock().unwrap() = self.persistence_style; - self.node = manager.1; + self.node = manager; self.monitor = chain_monitor; + self.persister = persister; + self.monitor_logger = logger_for_monitor; self.logger = logger; } } @@ -1233,11 +1340,13 @@ impl PeerLink { || (self.node_a == node_b && self.node_b == node_a) } - fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) { + fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool { + let mut completed_updates = false; for id in &self.channel_ids { - nodes[self.node_a].complete_all_monitor_updates(id); - nodes[self.node_b].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id); } + completed_updates } fn complete_monitor_updates_for_node( @@ -1807,10 +1916,24 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) { dest.peer_connected(source.get_our_node_id(), &init_src, false).unwrap(); } +fn get_two_nodes_mut<'a, 'b>( + nodes: &'b mut [HarnessNode<'a>; 3], first_idx: usize, second_idx: usize, +) -> (&'b mut HarnessNode<'a>, &'b mut HarnessNode<'a>) { + assert_ne!(first_idx, second_idx); + if first_idx < second_idx { + let (left, right) = nodes.split_at_mut(second_idx); + (&mut left[first_idx], &mut right[0]) + } else { + let (left, right) = nodes.split_at_mut(first_idx); + (&mut right[0], &mut left[second_idx]) + } +} + fn make_channel( - source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool, - trusted_accept: bool, chain_state: &mut ChainState, + nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32, + trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState, ) { + let (source, dest) = get_two_nodes_mut(nodes, source_idx, dest_idx); if trusted_open { source .create_channel_to_trusted_peer_0reserve( @@ -1921,7 +2044,8 @@ fn make_channel( } }; dest.handle_funding_created(source.get_our_node_id(), &funding_created); - // Complete any pending monitor updates for dest after watch_channel. + dest.checkpoint_manager_persistence(); + // Complete any flushed monitor updates for dest after watch_channel. dest.complete_all_pending_monitor_updates(); let (funding_signed, channel_id) = { @@ -1942,7 +2066,8 @@ fn make_channel( } source.handle_funding_signed(dest.get_our_node_id(), &funding_signed); - // Complete any pending monitor updates for source after watch_channel. + source.checkpoint_manager_persistence(); + // Complete any flushed monitor updates for source after watch_channel. source.complete_all_pending_monitor_updates(); let events = source.get_and_clear_pending_events(); @@ -2014,6 +2139,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ChannelMonitorUpdateStatus::Completed }, ]; + let deferred = [ + config_byte & 0b0010_0000 != 0, + config_byte & 0b0100_0000 != 0, + config_byte & 0b1000_0000 != 0, + ]; let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap()); let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap()); @@ -2051,6 +2181,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_a), Arc::clone(&broadcast_a), persistence_styles[0], + deferred[0], &out, router, chan_type, @@ -2061,6 +2192,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_b), Arc::clone(&broadcast_b), persistence_styles[1], + deferred[1], &out, router, chan_type, @@ -2071,6 +2203,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_c), Arc::clone(&broadcast_c), persistence_styles[2], + deferred[2], &out, router, chan_type, @@ -2088,14 +2221,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // channel gets its own txid and funding outpoint. // A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept), // channel 3 A has 0-reserve (trusted accept). - make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state); + make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state); + make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state); + make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state); // B-C: channel 4 B has 0-reserve (via trusted accept), // channel 5 C has 0-reserve (via trusted open). - make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state); + make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state); + make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state); + make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state); // Wipe the transactions-broadcasted set to make sure we don't broadcast // any transactions during normal operation after setup. @@ -2122,7 +2255,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { }; for node in &mut nodes { - node.serialized_manager = node.encode(); + node.force_checkpoint_manager_persistence(); } Self { @@ -2542,7 +2675,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // claim/fail handling per event batch. let mut claim_set = new_hash_map(); let mut events = nodes[node_idx].get_and_clear_pending_events(); - let had_events = !events.is_empty(); + let mut had_events = !events.is_empty(); for event in events.drain(..) { match event { events::Event::PaymentClaimable { payment_hash, .. } => { @@ -2598,6 +2731,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } while nodes[node_idx].needs_pending_htlc_processing() { nodes[node_idx].process_pending_htlc_forwards(); + had_events = true; } had_events } @@ -2617,12 +2751,13 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { for i in 0..std::usize::MAX { if i == 100 { panic!( - "It may take may iterations to settle the state, but it should not take forever" - ); + "It may take may iterations to settle the state, but it should not take forever" + ); } + let mut made_progress = self.checkpoint_manager_persistences(); // Next, make sure no monitor updates are pending. - self.ab_link.complete_all_monitor_updates(&self.nodes); - self.bc_link.complete_all_monitor_updates(&self.nodes); + made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes); + made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes); // Then, make sure any current forwards make their way to their destination. if self.process_msg_events(0, false, ProcessMessages::AllMessages) { last_pass_no_updates = false; @@ -2649,6 +2784,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { last_pass_no_updates = false; continue; } + if made_progress { + last_pass_no_updates = false; + continue; + } if last_pass_no_updates { // In some cases, we may generate a message to send in // `process_msg_events`, but block sending until @@ -2747,19 +2886,22 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.nodes[2].record_last_htlc_clear_fee(); } - fn refresh_serialized_managers(&mut self) { + fn checkpoint_manager_persistences(&mut self) -> bool { + let mut made_progress = false; for node in &mut self.nodes { - node.refresh_serialized_manager(); + made_progress |= node.checkpoint_manager_persistence(); } + made_progress } } #[inline] pub fn do_test(data: &[u8], out: Out) { let router = FuzzRouter {}; - // Read initial monitor styles and channel type from fuzz input byte 0: + // Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0: // bits 0-2: monitor styles (1 bit per node) // bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments) + // bits 5-7: deferred monitor write mode (1 bit per node) let config_byte = if !data.is_empty() { data[0] } else { 0 }; let mut harness = Harness::new(config_byte, out, &router); let mut read_pos = 1; // First byte was consumed for initial config. @@ -3171,7 +3313,7 @@ pub fn do_test(data: &[u8], out: Out) { _ => break 'fuzz_loop, } - harness.refresh_serialized_managers(); + harness.checkpoint_manager_persistences(); } harness.finish(); }