From b061470f756108519d43bbfdcbb7cb846ed36342 Mon Sep 17 00:00:00 2001 From: dcorral Date: Wed, 11 Feb 2026 12:59:55 +0100 Subject: [PATCH] Implement RGB KVStore Integration --- lightning/src/chain/channelmonitor.rs | 4 + lightning/src/chain/onchaintx.rs | 2 + lightning/src/ln/chan_utils.rs | 4 +- lightning/src/ln/channel.rs | 66 +++-- lightning/src/ln/channelmanager.rs | 60 ++-- lightning/src/ln/outbound_payment.rs | 71 ++--- lightning/src/rgb_utils/mod.rs | 409 +++++++++++++++----------- lightning/src/sign/mod.rs | 17 +- 8 files changed, 376 insertions(+), 257 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index e64e6e72b..1ed9c477c 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -7107,7 +7107,9 @@ mod tests { SecretKey::from_slice(&[41; 32]).unwrap(), [41; 32], [0; 32], + std::path::PathBuf::new(), [0; 32], + None, ); let counterparty_pubkeys = ChannelPublicKeys { @@ -7370,7 +7372,9 @@ mod tests { SecretKey::from_slice(&[41; 32]).unwrap(), [41; 32], [0; 32], + std::path::PathBuf::new(), [0; 32], + None, ); let counterparty_pubkeys = ChannelPublicKeys { diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index fb65aa0f1..a3e91e3ad 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -1304,7 +1304,9 @@ mod tests { SecretKey::from_slice(&[41; 32]).unwrap(), [41; 32], [0; 32], + std::path::PathBuf::new(), [0; 32], + None, ); let counterparty_pubkeys = ChannelPublicKeys { funding_pubkey: PublicKey::from_secret_key( diff --git a/lightning/src/ln/chan_utils.rs b/lightning/src/ln/chan_utils.rs index 3e337e1aa..a26abb6e3 100644 --- a/lightning/src/ln/chan_utils.rs +++ b/lightning/src/ln/chan_utils.rs @@ -36,6 +36,7 @@ use crate::ln::msgs::DecodeError; use crate::rgb_utils::{color_htlc, is_tx_colored}; use crate::sign::EntropySource; use crate::types::payment::{PaymentHash, PaymentPreimage}; +use crate::util::persist::KVStoreSync; use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, Writeable, Writer}; use crate::util::transaction_utils; @@ -2156,6 +2157,7 @@ impl<'a> TrustedCommitmentTransaction<'a> { pub fn get_htlc_sigs( &self, htlc_base_key: &SecretKey, channel_parameters: &DirectedChannelTransactionParameters, entropy_source: &ES, secp_ctx: &Secp256k1, ldk_data_dir: &PathBuf, + rgb_kv_store: &dyn KVStoreSync, ) -> Result, ()> where ES::Target: EntropySource { let inner = self.inner; let keys = &inner.keys; @@ -2167,7 +2169,7 @@ impl<'a> TrustedCommitmentTransaction<'a> { assert!(this_htlc.transaction_output_index.is_some()); let mut htlc_tx = build_htlc_transaction(&txid, inner.feerate_per_kw, channel_parameters.contest_delay(), &this_htlc, &self.channel_type_features, &keys.broadcaster_delayed_payment_key, &keys.revocation_key); if inner.is_colored() { - if let Err(_e) = color_htlc(&mut htlc_tx, this_htlc, ldk_data_dir) { + if let Err(_e) = color_htlc(&mut htlc_tx, this_htlc, ldk_data_dir, rgb_kv_store) { return Err(()); } } diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 92342fe5a..f23c2023b 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -78,9 +78,8 @@ use crate::ln::types::ChannelId; use crate::ln::LN_MAX_MSG_LEN; use crate::offers::static_invoice::StaticInvoice; use crate::rgb_utils::{ - color_closing, color_commitment, color_htlc, get_rgb_channel_info_path, - get_rgb_channel_info_pending, parse_rgb_channel_info, rename_rgb_files, - update_rgb_channel_amount_pending, + color_closing, color_commitment, color_htlc, get_rgb_channel_info_pending, + read_rgb_channel_info, rename_rgb_files, update_rgb_channel_amount_pending, }; use crate::routing::gossip::NodeId; use crate::sign::ecdsa::EcdsaChannelSigner; @@ -104,6 +103,8 @@ use crate::prelude::*; use crate::sign::type_resolver::ChannelSignerType; #[cfg(any(test, fuzzing, debug_assertions))] use crate::sync::Mutex; +use crate::sync::Arc; +use crate::util::persist::KVStoreSync; use core::ops::Deref; use core::time::Duration; use core::{cmp, fmt, mem}; @@ -3132,6 +3133,9 @@ where pub(super) consignment_endpoint: Option, pub(crate) ldk_data_dir: PathBuf, + + /// Optional KVStore for RGB data persistence + pub(crate) rgb_kv_store: Arc, } /// A channel struct implementing this trait can receive an initial counterparty commitment @@ -3232,7 +3236,7 @@ where let temporary_channel_id = context.channel_id; context.channel_id = channel_id; if context.is_colored() { - rename_rgb_files(&context.channel_id, &temporary_channel_id, &context.ldk_data_dir); + rename_rgb_files(&context.channel_id, &temporary_channel_id, context.rgb_kv_store.as_ref()); } assert!(!context.channel_state.is_monitor_update_in_progress()); // We have not had any monitor(s) yet to fail update! @@ -3401,6 +3405,7 @@ where msg_push_msat: u64, open_channel_fields: msgs::CommonOpenChannelFields, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Result<(FundingScope, ChannelContext), ChannelError> where ES::Target: EntropySource, @@ -3723,6 +3728,7 @@ where consignment_endpoint: open_channel_fields.consignment_endpoint, ldk_data_dir, + rgb_kv_store, }; Ok((funding, channel_context)) @@ -3748,6 +3754,7 @@ where _logger: L, consignment_endpoint: Option, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Result<(FundingScope, ChannelContext), APIError> where ES::Target: EntropySource, @@ -3966,6 +3973,7 @@ where consignment_endpoint, ldk_data_dir, + rgb_kv_store, }; Ok((funding, channel_context)) @@ -4344,13 +4352,8 @@ where /// Get the channel local RGB amount pub fn get_local_rgb_amount(&self) -> u64 { - let info_file_path = get_rgb_channel_info_path( - &self.channel_id.0.as_hex().to_string(), - &self.ldk_data_dir, - false, - ); - if info_file_path.exists() { - let rgb_info = parse_rgb_channel_info(&info_file_path); + let channel_id_str = self.channel_id.0.as_hex().to_string(); + if let Ok(rgb_info) = read_rgb_channel_info(self.rgb_kv_store.as_ref(), &channel_id_str, false) { rgb_info.local_rgb_amount } else { 0 @@ -4359,13 +4362,8 @@ where /// Get the channel remote RGB amount pub fn get_remote_rgb_amount(&self) -> u64 { - let info_file_path = get_rgb_channel_info_path( - &self.channel_id.0.as_hex().to_string(), - &self.ldk_data_dir, - false, - ); - if info_file_path.exists() { - let rgb_info = parse_rgb_channel_info(&info_file_path); + let channel_id_str = self.channel_id.0.as_hex().to_string(); + if let Ok(rgb_info) = read_rgb_channel_info(self.rgb_kv_store.as_ref(), &channel_id_str, false) { rgb_info.remote_rgb_amount } else { 0 @@ -5080,7 +5078,7 @@ where &holder_keys.revocation_key, ); if self.is_colored() { - color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir) + color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir, self.rgb_kv_store.as_ref()) .expect("successful htlc coloring"); } @@ -7324,6 +7322,7 @@ where &self.context.channel_id, &mut closing_transaction, &self.context.ldk_data_dir, + self.context.rgb_kv_store.as_ref(), ) .expect("successful closing TX coloring"); } @@ -8914,7 +8913,7 @@ where &self.context.channel_id, rgb_offered_htlc, rgb_received_htlc, - &self.context.ldk_data_dir, + self.context.rgb_kv_store.as_ref(), ); } @@ -11729,7 +11728,7 @@ where let were_node_one = node_id.as_slice() < counterparty_node_id.as_slice(); let contract_id = if self.context.is_colored() { - let (rgb_info, _) = get_rgb_channel_info_pending(&self.context.channel_id, &self.context.ldk_data_dir); + let rgb_info = get_rgb_channel_info_pending(&self.context.channel_id, self.context.rgb_kv_store.as_ref()); Some(rgb_info.contract_id) } else { None @@ -12883,7 +12882,7 @@ where } } if self.context.is_colored() && rgb_received_htlc > 0 { - update_rgb_channel_amount_pending(&self.context.channel_id, 0, rgb_received_htlc, &self.context.ldk_data_dir); + update_rgb_channel_amount_pending(&self.context.channel_id, 0, rgb_received_htlc, self.context.rgb_kv_store.as_ref()); } if let Some((feerate, update_state)) = self.context.pending_update_fee { if update_state == FeeUpdateState::AwaitingRemoteRevokeToAnnounce { @@ -13550,6 +13549,7 @@ where fee_estimator: &LowerBoundedFeeEstimator, entropy_source: &ES, signer_provider: &SP, counterparty_node_id: PublicKey, their_features: &InitFeatures, channel_value_satoshis: u64, push_msat: u64, user_id: u128, config: &UserConfig, current_chain_height: u32, outbound_scid_alias: u64, temporary_channel_id: Option, logger: L, consignment_endpoint: Option, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Result, APIError> where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -13589,6 +13589,7 @@ where logger, consignment_endpoint, ldk_data_dir, + rgb_kv_store, )?; let unfunded_context = UnfundedChannelContext { unfunded_channel_age_ticks: 0, @@ -13672,7 +13673,7 @@ where let temporary_channel_id = self.context.channel_id; self.context.channel_id = ChannelId::v1_from_funding_outpoint(funding_txo); if self.context.is_colored() { - rename_rgb_files(&self.context.channel_id, &temporary_channel_id, &self.context.ldk_data_dir); + rename_rgb_files(&self.context.channel_id, &temporary_channel_id, self.context.rgb_kv_store.as_ref()); } // If the funding transaction is a coinbase transaction, we need to set the minimum depth to 100. @@ -13925,7 +13926,8 @@ where fee_estimator: &LowerBoundedFeeEstimator, entropy_source: &ES, signer_provider: &SP, counterparty_node_id: PublicKey, our_supported_features: &ChannelTypeFeatures, their_features: &InitFeatures, msg: &msgs::OpenChannel, user_id: u128, config: &UserConfig, - current_chain_height: u32, logger: &L, is_0conf: bool, ldk_data_dir: PathBuf + current_chain_height: u32, logger: &L, is_0conf: bool, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Result, ChannelError> where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -13966,6 +13968,7 @@ where msg.push_msat, msg.common_fields.clone(), ldk_data_dir, + rgb_kv_store, )?; let unfunded_context = UnfundedChannelContext { unfunded_channel_age_ticks: 0, @@ -14166,7 +14169,7 @@ where counterparty_node_id: PublicKey, their_features: &InitFeatures, funding_satoshis: u64, funding_inputs: Vec, user_id: u128, config: &UserConfig, current_chain_height: u32, outbound_scid_alias: u64, funding_confirmation_target: ConfirmationTarget, - logger: L, ldk_data_dir: PathBuf, + logger: L, ldk_data_dir: PathBuf, rgb_kv_store: Arc, ) -> Result where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -14209,6 +14212,7 @@ where // ok to pass consignment_endpoint as None since this method is unused None, ldk_data_dir, + rgb_kv_store, )?; let unfunded_context = UnfundedChannelContext { unfunded_channel_age_ticks: 0, @@ -14319,7 +14323,7 @@ where holder_node_id: PublicKey, counterparty_node_id: PublicKey, our_supported_features: &ChannelTypeFeatures, their_features: &InitFeatures, msg: &msgs::OpenChannelV2, user_id: u128, config: &UserConfig, current_chain_height: u32, logger: &L, - ldk_data_dir: PathBuf, + ldk_data_dir: PathBuf, rgb_kv_store: Arc, ) -> Result where ES::Target: EntropySource, F::Target: FeeEstimator, @@ -14365,6 +14369,7 @@ where 0 /* push_msat not used in dual-funding */, msg.common_fields.clone(), ldk_data_dir, + rgb_kv_store, )?; let channel_id = ChannelId::v2_from_revocation_basepoints( &funding.get_holder_pubkeys().revocation_basepoint, @@ -15054,16 +15059,16 @@ where } } -impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf)> +impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf, Arc)> for FundedChannel where ES::Target: EntropySource, SP::Target: SignerProvider, { fn read( - reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf), + reader: &mut R, args: (&'a ES, &'b SP, &'c ChannelTypeFeatures, PathBuf, Arc), ) -> Result { - let (entropy_source, signer_provider, our_supported_features, ldk_data_dir) = args; + let (entropy_source, signer_provider, our_supported_features, ldk_data_dir, rgb_kv_store) = args; let ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); if ver <= 2 { return Err(DecodeError::UnknownVersion); @@ -15859,6 +15864,7 @@ where consignment_endpoint, ldk_data_dir, + rgb_kv_store, }, holder_commitment_point, pending_splice, @@ -16770,7 +16776,9 @@ mod tests { // These aren't set in the test vectors: [0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff], [0; 32], + std::path::PathBuf::new(), [0; 32], + None, ); let holder_pubkeys = signer.pubkeys(&secp_ctx); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 01e65f815..b97237ea8 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -120,8 +120,8 @@ use crate::onion_message::messenger::{ }; use crate::onion_message::offers::{OffersMessage, OffersMessageHandler}; use crate::rgb_utils::{ - get_rgb_channel_info, get_rgb_payment_info_path, handle_funding, is_channel_rgb, - parse_rgb_payment_info, + get_rgb_channel_info, handle_funding, is_channel_rgb, + read_rgb_payment_info, }; use crate::routing::router::{ BlindedTail, FixedRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, @@ -139,6 +139,7 @@ use crate::types::string::UntrustedString; use crate::util::config::{ChannelConfig, ChannelConfigOverrides, ChannelConfigUpdate, UserConfig}; use crate::util::errors::APIError; use crate::util::logger::{Level, Logger, WithContext}; +use crate::util::persist::KVStoreSync; use crate::util::scid_utils::fake_scid; use crate::util::ser::{ BigSize, FixedLengthReader, LengthReadable, MaybeReadable, Readable, ReadableArgs, VecWriter, @@ -2953,6 +2954,10 @@ pub struct ChannelManager< logger: L, ldk_data_dir: PathBuf, + + /// Optional KVStore for RGB data persistence. + /// When set, RGB data will be stored in the database instead of filesystem. + rgb_kv_store: Arc, } /// Chain-related parameters used to construct a new `ChannelManager`. @@ -3976,7 +3981,8 @@ where pub fn new( fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, message_router: MR, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, - params: ChainParameters, current_timestamp: u32, ldk_data_dir: PathBuf + params: ChainParameters, current_timestamp: u32, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self where L: Clone, @@ -4005,7 +4011,7 @@ where best_block: RwLock::new(params.best_block), outbound_scid_aliases: Mutex::new(new_hash_set()), - pending_outbound_payments: OutboundPayments::new(new_hash_map(), logger.clone(), ldk_data_dir.clone()), + pending_outbound_payments: OutboundPayments::new(new_hash_map(), logger.clone(), rgb_kv_store.clone()), forward_htlcs: Mutex::new(new_hash_map()), decode_update_add_htlcs: Mutex::new(new_hash_map()), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }), @@ -4052,6 +4058,7 @@ where testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), ldk_data_dir, + rgb_kv_store, } } @@ -4170,7 +4177,8 @@ where }; match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key, their_features, channel_value_satoshis, push_msat, user_channel_id, config, - self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger, consignment_endpoint, self.ldk_data_dir.clone()) + self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger, consignment_endpoint, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone()) { Ok(res) => res, Err(e) => { @@ -5259,18 +5267,11 @@ where // The top-level caller should hold the total_consistency_lock read lock. debug_assert!(self.total_consistency_lock.try_write().is_err()); - let rgb_payment_info_hash_path_outbound = - get_rgb_payment_info_path(payment_hash, &self.ldk_data_dir, false); - let needs_rgb_modification = if rgb_payment_info_hash_path_outbound.exists() { - let info = parse_rgb_payment_info(&rgb_payment_info_hash_path_outbound); - if !info.swap_payment { - Some(info) - } else { - None - } - } else { - None - }; + let needs_rgb_modification = + match read_rgb_payment_info(self.rgb_kv_store.as_ref(), payment_hash, false) { + Ok(info) if !info.swap_payment => Some(info), + _ => None, + }; let modified_path; let path = if let Some(rgb_payment_info) = needs_rgb_modification { modified_path = { @@ -7552,14 +7553,14 @@ where .contains(&outgoing_amt_msat); if is_in_range && chan.context.is_usable() { if let Some((cid, outgoing_amount_rgb)) = outgoing_rgb_payment { - if !is_channel_rgb(&chan.context.channel_id, &self.ldk_data_dir) + if !is_channel_rgb(&chan.context.channel_id, self.rgb_kv_store.as_ref()) { return None; } - let (rgb_chan_info, _) = get_rgb_channel_info( + let rgb_chan_info = get_rgb_channel_info( &chan.context.channel_id.0.as_hex().to_string(), - &self.ldk_data_dir, false, + self.rgb_kv_store.as_ref(), ); if rgb_chan_info.contract_id == *cid && rgb_chan_info.local_rgb_amount >= *outgoing_amount_rgb @@ -9994,7 +9995,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ InboundV1Channel::new( &self.fee_estimator, &self.entropy_source, &self.signer_provider, *counterparty_node_id, &self.channel_type_features(), &peer_state.latest_features, &open_channel_msg, - user_channel_id, &config, best_block_height, &self.logger, accept_0conf, self.ldk_data_dir.clone() + user_channel_id, &config, best_block_height, &self.logger, accept_0conf, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|err| MsgHandleErrInternal::from_chan_no_close(err, *temporary_channel_id) ).map(|mut channel| { let logger = WithChannelContext::from(&self.logger, &channel.context, None); @@ -10016,6 +10018,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ user_channel_id, &config, best_block_height, &self.logger, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|e| { let channel_id = open_channel_msg.common_fields.temporary_channel_id; MsgHandleErrInternal::from_chan_no_close(e, channel_id) @@ -10285,7 +10288,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ let mut channel = InboundV1Channel::new( &self.fee_estimator, &self.entropy_source, &self.signer_provider, *counterparty_node_id, &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, - &self.config.read().unwrap(), best_block_height, &self.logger, /*is_0conf=*/false, self.ldk_data_dir.clone() + &self.config.read().unwrap(), best_block_height, &self.logger, /*is_0conf=*/false, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.common_fields.temporary_channel_id))?; let logger = WithChannelContext::from(&self.logger, &channel.context, None); let message_send_event = channel.accept_inbound_channel(&&logger).map(|msg| { @@ -10303,6 +10307,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ &peer_state.latest_features, msg, user_channel_id, &self.config.read().unwrap(), best_block_height, &self.logger, self.ldk_data_dir.clone(), + self.rgb_kv_store.clone(), ).map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.common_fields.temporary_channel_id))?; let message_send_event = MessageSendEvent::SendAcceptChannelV2 { node_id: *counterparty_node_id, @@ -10387,7 +10392,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(Ok(inbound_chan)) => { let logger = WithChannelContext::from(&self.logger, &inbound_chan.context, None); if let Some(consignment_endpoint) = &inbound_chan.context.consignment_endpoint { - handle_funding(&msg.temporary_channel_id, msg.funding_txid.to_string(), &self.ldk_data_dir, consignment_endpoint.clone())?; + handle_funding(&msg.temporary_channel_id, msg.funding_txid.to_string(), &self.ldk_data_dir, consignment_endpoint.clone(), self.rgb_kv_store.as_ref())?; } match inbound_chan.funding_created(msg, best_block, &self.signer_provider, &&logger) { Ok(res) => res, @@ -16741,6 +16746,9 @@ pub struct ChannelManagerReadArgs< /// LDK data directory pub ldk_data_dir: PathBuf, + + /// Optional KVStore for RGB data persistence + pub rgb_kv_store: Arc, } impl< @@ -16775,6 +16783,7 @@ where config: UserConfig, mut channel_monitors: Vec<&'a ChannelMonitor<::EcdsaSigner>>, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { Self { entropy_source, @@ -16791,6 +16800,7 @@ where channel_monitors.drain(..).map(|monitor| (monitor.channel_id(), monitor)), ), ldk_data_dir, + rgb_kv_store, } } } @@ -16894,6 +16904,7 @@ where &args.signer_provider, &provided_channel_type_features(&args.config), args.ldk_data_dir.clone(), + args.rgb_kv_store.clone(), ), )?; let logger = WithChannelContext::from(&args.logger, &channel.context, None); @@ -17324,7 +17335,7 @@ where pending_outbound_payments = Some(outbounds); } let pending_outbounds = - OutboundPayments::new(pending_outbound_payments.unwrap(), args.logger.clone(), args.ldk_data_dir.clone()); + OutboundPayments::new(pending_outbound_payments.unwrap(), args.logger.clone(), args.rgb_kv_store.clone()); for (peer_pubkey, peer_storage) in peer_storage_dir { if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) { @@ -18239,6 +18250,7 @@ where testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), ldk_data_dir: args.ldk_data_dir, + rgb_kv_store: args.rgb_kv_store, }; let mut processed_claims: HashSet> = new_hash_set(); diff --git a/lightning/src/ln/outbound_payment.rs b/lightning/src/ln/outbound_payment.rs index 844193c40..6b71ed85f 100644 --- a/lightning/src/ln/outbound_payment.rs +++ b/lightning/src/ln/outbound_payment.rs @@ -27,7 +27,7 @@ use crate::offers::invoice_request::InvoiceRequest; use crate::offers::nonce::Nonce; use crate::offers::static_invoice::StaticInvoice; use crate::rgb_utils::{ - filter_first_hops, get_rgb_payment_info_path, is_payment_rgb, parse_rgb_payment_info, + filter_first_hops, is_payment_rgb, read_rgb_payment_info, }; use crate::routing::router::{ BlindedTail, InFlightHtlcs, Path, PaymentParameters, Route, RouteParameters, @@ -47,10 +47,9 @@ use core::ops::Deref; use core::sync::atomic::{AtomicBool, Ordering}; use core::time::Duration; -use std::path::PathBuf; - use crate::prelude::*; -use crate::sync::Mutex; +use crate::sync::{Arc, Mutex}; +use crate::util::persist::KVStoreSync; /// The number of ticks of [`ChannelManager::timer_tick_occurred`] until we time-out the idempotency /// of payments by [`PaymentId`]. See [`OutboundPayments::remove_stale_payments`]. @@ -850,7 +849,7 @@ where awaiting_invoice: AtomicBool, retry_lock: Mutex<()>, logger: L, - ldk_data_dir: PathBuf, + rgb_kv_store: Arc, } impl OutboundPayments @@ -859,7 +858,7 @@ where { pub(super) fn new( pending_outbound_payments: HashMap, logger: L, - ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { let has_invoice_requests = pending_outbound_payments.values().any(|payment| { matches!( @@ -875,7 +874,7 @@ where awaiting_invoice: AtomicBool::new(has_invoice_requests), retry_lock: Mutex::new(()), logger, - ldk_data_dir, + rgb_kv_store, } } @@ -1047,9 +1046,14 @@ where } let mut filtered_first_hops = first_hops.into_iter().collect::>(); - let rgb_payment = is_payment_rgb(&self.ldk_data_dir, &payment_hash).then(|| { - filter_first_hops(&self.ldk_data_dir, &payment_hash, &mut filtered_first_hops) - }); + let rgb_payment = if is_payment_rgb(self.rgb_kv_store.as_ref(), &payment_hash) { + filter_first_hops(self.rgb_kv_store.as_ref(), &payment_hash, &mut filtered_first_hops); + let info = read_rgb_payment_info(self.rgb_kv_store.as_ref(), &payment_hash, false) + .expect("payment info must exist"); + Some((info.contract_id, info.amount)) + } else { + None + }; let mut route_params = RouteParameters::from_payment_params_and_value( PaymentParameters::from_bolt12_invoice(&invoice) .with_user_config_ignoring_fee_limit(params_config), invoice.amount_msats(), @@ -1399,14 +1403,11 @@ where .. } = pmt { - let rgb_payment_info_path = - get_rgb_payment_info_path(payment_hash, &self.ldk_data_dir, false); - let rgb_payment = if rgb_payment_info_path.exists() { - let rgb_payment_info = parse_rgb_payment_info(&rgb_payment_info_path); - Some((rgb_payment_info.contract_id, rgb_payment_info.amount)) - } else { - None - }; + let rgb_payment = + match read_rgb_payment_info(self.rgb_kv_store.as_ref(), payment_hash, false) { + Ok(info) => Some((info.contract_id, info.amount)), + Err(_) => None, + }; if pending_amt_msat < total_msat { retry_id_route_params = Some(( *payment_hash, @@ -1556,9 +1557,9 @@ where SP: Fn(SendAlongPathArgs) -> Result<(), APIError>, { let mut filtered_first_hops = first_hops.into_iter().collect::>(); - is_payment_rgb(&self.ldk_data_dir, &payment_hash).then(|| { - filter_first_hops(&self.ldk_data_dir, &payment_hash, &mut filtered_first_hops) - }); + if is_payment_rgb(self.rgb_kv_store.as_ref(), &payment_hash) { + filter_first_hops(self.rgb_kv_store.as_ref(), &payment_hash, &mut filtered_first_hops); + } let route = self.find_initial_route( payment_id, payment_hash, &recipient_onion, keysend_preimage, None, &mut route_params, router, &filtered_first_hops, &inflight_htlcs, node_signer, best_block_height, @@ -1612,9 +1613,9 @@ where } let mut filtered_first_hops = first_hops.into_iter().collect::>(); - is_payment_rgb(&self.ldk_data_dir, &payment_hash).then(|| { - filter_first_hops(&self.ldk_data_dir, &payment_hash, &mut filtered_first_hops) - }); + if is_payment_rgb(self.rgb_kv_store.as_ref(), &payment_hash) { + filter_first_hops(self.rgb_kv_store.as_ref(), &payment_hash, &mut filtered_first_hops); + } let mut route = match router.find_route_with_id( &node_signer.get_node_id(Recipient::Node).unwrap(), &route_params, @@ -2951,7 +2952,7 @@ mod tests { #[rustfmt::skip] fn do_fails_paying_after_expiration(on_retry: bool) { let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &logger)); let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(network_graph, &logger, &scorer); @@ -2996,7 +2997,7 @@ mod tests { #[rustfmt::skip] fn do_find_route_error(on_retry: bool) { let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &logger)); let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(network_graph, &logger, &scorer); @@ -3035,7 +3036,7 @@ mod tests { #[rustfmt::skip] fn initial_send_payment_path_failed_evs() { let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &logger)); let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(network_graph, &logger, &scorer); @@ -3118,7 +3119,7 @@ mod tests { fn removes_stale_awaiting_invoice_using_absolute_timeout() { let pending_events = Mutex::new(VecDeque::new()); let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let payment_id = PaymentId([0; 32]); let absolute_expiry = 100; let tick_interval = 10; @@ -3174,7 +3175,7 @@ mod tests { fn removes_stale_awaiting_invoice_using_timer_ticks() { let pending_events = Mutex::new(VecDeque::new()); let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let payment_id = PaymentId([0; 32]); let timer_ticks = 3; let expiration = StaleExpiration::TimerTicks(timer_ticks); @@ -3229,7 +3230,7 @@ mod tests { fn removes_abandoned_awaiting_invoice() { let pending_events = Mutex::new(VecDeque::new()); let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let payment_id = PaymentId([0; 32]); let expiration = StaleExpiration::AbsoluteTimeout(Duration::from_secs(100)); @@ -3269,7 +3270,7 @@ mod tests { let nonce = Nonce([0; 16]); let pending_events = Mutex::new(VecDeque::new()); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let payment_id = PaymentId([0; 32]); let expiration = StaleExpiration::AbsoluteTimeout(Duration::from_secs(100)); @@ -3322,7 +3323,7 @@ mod tests { let keys_manager = test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet); let pending_events = Mutex::new(VecDeque::new()); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let expanded_key = ExpandedKey::new([42; 32]); let nonce = Nonce([0; 16]); let payment_id = PaymentId([0; 32]); @@ -3385,7 +3386,7 @@ mod tests { let keys_manager = test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet); let pending_events = Mutex::new(VecDeque::new()); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let expanded_key = ExpandedKey::new([42; 32]); let nonce = Nonce([0; 16]); let payment_id = PaymentId([0; 32]); @@ -3494,7 +3495,7 @@ mod tests { fn time_out_unreleased_async_payments() { let pending_events = Mutex::new(VecDeque::new()); let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let payment_id = PaymentId([0; 32]); let absolute_expiry = 60; @@ -3545,7 +3546,7 @@ mod tests { fn abandon_unreleased_async_payment() { let pending_events = Mutex::new(VecDeque::new()); let logger = test_utils::TestLogger::new(); - let outbound_payments = OutboundPayments::new(new_hash_map(), &logger); + let outbound_payments = OutboundPayments::new(new_hash_map(), &logger, None); let payment_id = PaymentId([0; 32]); let absolute_expiry = 60; diff --git a/lightning/src/rgb_utils/mod.rs b/lightning/src/rgb_utils/mod.rs index af38c5678..cb3c9eeb7 100644 --- a/lightning/src/rgb_utils/mod.rs +++ b/lightning/src/rgb_utils/mod.rs @@ -11,6 +11,7 @@ use crate::ln::types::ChannelId; use crate::sign::SignerProvider; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentHash; +use crate::util::persist::KVStoreSync; use bitcoin::blockdata::transaction::Transaction; use bitcoin::hex::DisplayHex; @@ -29,13 +30,15 @@ use rgb_lib::{ use serde::{Deserialize, Serialize}; use tokio::runtime::Handle; +use crate::io; use core::ops::Deref; use std::collections::HashMap; use std::fs; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::Arc; -/// Static blinding costant (will be removed in the future) +/// Static blinding constant (will be removed in the future) pub const STATIC_BLINDING: u64 = 777; /// Name of the file containing the bitcoin network pub const BITCOIN_NETWORK_FNAME: &str = "bitcoin_network"; @@ -49,8 +52,22 @@ pub const WALLET_ACCOUNT_XPUB_VANILLA_FNAME: &str = "wallet_account_xpub_vanilla pub const WALLET_ACCOUNT_XPUB_COLORED_FNAME: &str = "wallet_account_xpub_colored"; /// Name of the file containing the master fingerprint of the wallet pub const WALLET_MASTER_FINGERPRINT_FNAME: &str = "wallet_master_fingerprint"; -const INBOUND_EXT: &str = "inbound"; -const OUTBOUND_EXT: &str = "outbound"; + +// KVStore namespace constants for RGB data persistence +/// Primary namespace for all RGB data +pub const RGB_PRIMARY_NS: &str = "rgb"; +/// Secondary namespace for channel info +pub const RGB_CHANNEL_INFO_NS: &str = "channel_info"; +/// Secondary namespace for pending channel info +pub const RGB_CHANNEL_INFO_PENDING_NS: &str = "channel_info_pending"; +/// Secondary namespace for inbound payment info +pub const RGB_PAYMENT_INFO_INBOUND_NS: &str = "payment_info_inbound"; +/// Secondary namespace for outbound payment info +pub const RGB_PAYMENT_INFO_OUTBOUND_NS: &str = "payment_info_outbound"; +/// Secondary namespace for transfer info +pub const RGB_TRANSFER_INFO_NS: &str = "transfer_info"; +/// Secondary namespace for consignment data +pub const RGB_CONSIGNMENT_NS: &str = "consignment"; /// RGB channel info #[derive(Debug, Clone, Deserialize, Serialize)] @@ -220,18 +237,6 @@ async fn _accept_transfer( .unwrap() } -/// Read TransferInfo file -pub fn read_rgb_transfer_info(path: &Path) -> TransferInfo { - let serialized_info = fs::read_to_string(path).expect("able to read transfer info file"); - serde_json::from_str(&serialized_info).expect("valid transfer info") -} - -/// Write TransferInfo file -pub fn write_rgb_transfer_info(path: &PathBuf, info: &TransferInfo) { - let serialized_info = serde_json::to_string(&info).expect("valid transfer info"); - fs::write(path, serialized_info).expect("able to write transfer info file") -} - fn _counterparty_output_index( outputs: &[TxOut], channel_type_features: &ChannelTypeFeatures, payment_key: &PublicKey, ) -> Option { @@ -264,10 +269,11 @@ where { let channel_id = &channel_context.channel_id; let ldk_data_dir = channel_context.ldk_data_dir.as_path(); + let kv_store = channel_context.rgb_kv_store.as_ref(); let commitment_tx = commitment_transaction.clone().built.transaction; - let (rgb_info, _) = get_rgb_channel_info_pending(channel_id, ldk_data_dir); + let rgb_info = get_rgb_channel_info_pending(channel_id, kv_store); let contract_id = rgb_info.contract_id; let chan_id = channel_id.0.as_hex(); @@ -287,32 +293,32 @@ where let inbound = htlc.offered == counterparty; let htlc_payment_hash = htlc.payment_hash.0.as_hex().to_string(); - let htlc_proxy_id = format!("{chan_id}{htlc_payment_hash}"); - let mut rgb_payment_info_proxy_id_path = ldk_data_dir.join(htlc_proxy_id); - let rgb_payment_info_path = ldk_data_dir.join(htlc_payment_hash); - let mut rgb_payment_info_path = rgb_payment_info_path.clone(); - if inbound { - rgb_payment_info_proxy_id_path.set_extension(INBOUND_EXT); - rgb_payment_info_path.set_extension(INBOUND_EXT); - } else { - rgb_payment_info_proxy_id_path.set_extension(OUTBOUND_EXT); - rgb_payment_info_path.set_extension(OUTBOUND_EXT); - } - let rgb_payment_info_tmp_path = _append_pending_extension(&rgb_payment_info_path); - - if rgb_payment_info_tmp_path.exists() { - let mut rgb_payment_info = parse_rgb_payment_info(&rgb_payment_info_tmp_path); + let htlc_proxy_key = format!("{}_{}", chan_id, htlc_payment_hash); + let pending_key = format!("{}_pending", htlc_payment_hash); + let namespace = + if inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; + + // Check if pending payment info exists + if let Ok(data) = kv_store.read(RGB_PRIMARY_NS, namespace, &pending_key) { + let serialized_info = String::from_utf8(data).expect("valid utf8"); + let mut rgb_payment_info: RgbPaymentInfo = + serde_json::from_str(&serialized_info).expect("valid json"); rgb_payment_info.local_rgb_amount = rgb_info.local_rgb_amount; rgb_payment_info.remote_rgb_amount = rgb_info.remote_rgb_amount; let serialized_info = serde_json::to_string(&rgb_payment_info).expect("valid rgb payment info"); - fs::write(&rgb_payment_info_proxy_id_path, serialized_info) - .expect("able to write rgb payment info file"); - fs::remove_file(rgb_payment_info_tmp_path).expect("able to remove file"); + kv_store + .write(RGB_PRIMARY_NS, namespace, &htlc_proxy_key, serialized_info.into_bytes()) + .expect("able to write rgb payment info"); + let _ = kv_store.remove(RGB_PRIMARY_NS, namespace, &pending_key, false); } - let rgb_payment_info = if rgb_payment_info_proxy_id_path.exists() { - parse_rgb_payment_info(&rgb_payment_info_proxy_id_path) + // Try to read from proxy key, or create new + let rgb_payment_info = if let Ok(data) = + kv_store.read(RGB_PRIMARY_NS, namespace, &htlc_proxy_key) + { + let serialized_info = String::from_utf8(data).expect("valid utf8"); + serde_json::from_str(&serialized_info).expect("valid json") } else { let rgb_payment_info = RgbPaymentInfo { contract_id, @@ -324,10 +330,19 @@ where }; let serialized_info = serde_json::to_string(&rgb_payment_info).expect("valid rgb payment info"); - fs::write(rgb_payment_info_proxy_id_path, serialized_info.clone()) - .expect("able to write rgb payment info file"); - fs::write(rgb_payment_info_path, serialized_info) - .expect("able to write rgb payment info file"); + // Write to proxy key + kv_store + .write( + RGB_PRIMARY_NS, + namespace, + &htlc_proxy_key, + serialized_info.clone().into_bytes(), + ) + .expect("able to write rgb payment info"); + // Write to main key + kv_store + .write(RGB_PRIMARY_NS, namespace, &htlc_payment_hash, serialized_info.into_bytes()) + .expect("able to write rgb payment info"); rgb_payment_info }; @@ -402,15 +417,15 @@ where ) .unwrap(); - // save RGB transfer data to disk + // save RGB transfer data to database let rgb_amount = if counterparty { vout_p2wpkh_amt + rgb_offered_htlc } else { vout_p2wsh_amt + rgb_received_htlc }; let transfer_info = TransferInfo { contract_id, rgb_amount }; - let transfer_info_path = ldk_data_dir.join(format!("{txid}_transfer_info")); - write_rgb_transfer_info(&transfer_info_path, &transfer_info); + let txid_str = txid.to_string(); + write_rgb_transfer_info(kv_store, &txid_str, &transfer_info).expect("KVStore write failed"); Ok(()) } @@ -418,6 +433,7 @@ where /// Color HTLC transaction pub(crate) fn color_htlc( htlc_tx: &mut Transaction, htlc: &HTLCOutputInCommitment, ldk_data_dir: &Path, + kv_store: &dyn KVStoreSync, ) -> Result<(), ChannelError> { if htlc.rgb_payment.is_none_or(|(_, a)| a == 0) { return Ok(()); @@ -427,8 +443,8 @@ pub(crate) fn color_htlc( let consignment_htlc_outpoint = htlc_tx.input.first().unwrap().previous_output; let commitment_txid = consignment_htlc_outpoint.txid.to_string(); - let transfer_info_path = ldk_data_dir.join(format!("{commitment_txid}_transfer_info")); - let transfer_info = read_rgb_transfer_info(&transfer_info_path); + let transfer_info = read_rgb_transfer_info(kv_store, &commitment_txid) + .expect("transfer info must exist in KVStore"); let contract_id = transfer_info.contract_id; let asset_coloring_info = AssetColoringInfo { @@ -462,10 +478,10 @@ pub(crate) fn color_htlc( ) .unwrap(); - // save RGB transfer data to disk + // save RGB transfer data let transfer_info = TransferInfo { contract_id, rgb_amount: htlc_amount_rgb }; - let transfer_info_path = ldk_data_dir.join(format!("{txid}_transfer_info")); - write_rgb_transfer_info(&transfer_info_path, &transfer_info); + let txid_str = txid.to_string(); + write_rgb_transfer_info(kv_store, &txid_str, &transfer_info).expect("KVStore write failed"); Ok(()) } @@ -473,10 +489,11 @@ pub(crate) fn color_htlc( /// Color closing transaction pub(crate) fn color_closing( channel_id: &ChannelId, closing_transaction: &mut ClosingTransaction, ldk_data_dir: &Path, + kv_store: &dyn KVStoreSync, ) -> Result<(), ChannelError> { let closing_tx = closing_transaction.clone().built; - let (rgb_info, _) = get_rgb_channel_info_pending(channel_id, ldk_data_dir); + let rgb_info = get_rgb_channel_info_pending(channel_id, kv_store); let contract_id = rgb_info.contract_id; let holder_vout_amount = rgb_info.local_rgb_amount; @@ -533,85 +550,38 @@ pub(crate) fn color_closing( ) .unwrap(); - // save RGB transfer data to disk + // save RGB transfer data let transfer_info = TransferInfo { contract_id, rgb_amount: holder_vout_amount }; - let transfer_info_path = ldk_data_dir.join(format!("{txid}_transfer_info")); - write_rgb_transfer_info(&transfer_info_path, &transfer_info); + let txid_str = txid.to_string(); + write_rgb_transfer_info(kv_store, &txid_str, &transfer_info).expect("KVStore write failed"); Ok(()) } -/// Get RgbPaymentInfo file path -pub fn get_rgb_payment_info_path( - payment_hash: &PaymentHash, ldk_data_dir: &Path, inbound: bool, -) -> PathBuf { - let mut path = ldk_data_dir.join(payment_hash.0.as_hex().to_string()); - path.set_extension(if inbound { INBOUND_EXT } else { OUTBOUND_EXT }); - path -} - -/// Parse RgbPaymentInfo -pub fn parse_rgb_payment_info(rgb_payment_info_path: &PathBuf) -> RgbPaymentInfo { - let serialized_info = - fs::read_to_string(rgb_payment_info_path).expect("valid rgb payment info"); - serde_json::from_str(&serialized_info).expect("valid rgb info file") -} - -/// Get RgbInfo file path -pub fn get_rgb_channel_info_path(channel_id: &str, ldk_data_dir: &Path, pending: bool) -> PathBuf { - let mut info_file_path = ldk_data_dir.join(channel_id); - if pending { - info_file_path.set_extension("pending"); - } - info_file_path -} - -/// Get RgbInfo file +/// Get RgbInfo from KVStore pub(crate) fn get_rgb_channel_info( - channel_id: &str, ldk_data_dir: &Path, pending: bool, -) -> (RgbInfo, PathBuf) { - let info_file_path = get_rgb_channel_info_path(channel_id, ldk_data_dir, pending); - let info = parse_rgb_channel_info(&info_file_path); - (info, info_file_path) + channel_id: &str, pending: bool, kv_store: &dyn KVStoreSync, +) -> RgbInfo { + read_rgb_channel_info(kv_store, channel_id, pending) + .expect("channel info must exist in KVStore") } -/// Get pending RgbInfo file -pub fn get_rgb_channel_info_pending( - channel_id: &ChannelId, ldk_data_dir: &Path, -) -> (RgbInfo, PathBuf) { - get_rgb_channel_info(&channel_id.0.as_hex().to_string(), ldk_data_dir, true) +/// Get pending RgbInfo from KVStore +pub fn get_rgb_channel_info_pending(channel_id: &ChannelId, kv_store: &dyn KVStoreSync) -> RgbInfo { + get_rgb_channel_info(&channel_id.0.as_hex().to_string(), true, kv_store) } -/// Parse RgbInfo -pub fn parse_rgb_channel_info(rgb_channel_info_path: &PathBuf) -> RgbInfo { - let serialized_info = fs::read_to_string(rgb_channel_info_path).expect("valid rgb info file"); - serde_json::from_str(&serialized_info).expect("valid rgb info file") +/// Whether the channel has RGB data in KVStore +pub fn is_channel_rgb(channel_id: &ChannelId, kv_store: &dyn KVStoreSync) -> bool { + let channel_id_str = channel_id.0.as_hex().to_string(); + read_rgb_channel_info(kv_store, &channel_id_str, false).is_ok() } -/// Whether the channel data for a channel exist -pub fn is_channel_rgb(channel_id: &ChannelId, ldk_data_dir: &Path) -> bool { - get_rgb_channel_info_path(&channel_id.0.as_hex().to_string(), ldk_data_dir, false).exists() -} - -/// Write RgbInfo file -pub fn write_rgb_channel_info(path: &PathBuf, rgb_info: &RgbInfo) { - let serialized_info = serde_json::to_string(&rgb_info).expect("valid rgb info"); - fs::write(path, serialized_info).expect("able to write") -} - -fn _append_pending_extension(path: &Path) -> PathBuf { - let mut new_path = path.to_path_buf(); - new_path.set_extension(format!("{}_pending", new_path.extension().unwrap().to_string_lossy())); - new_path -} - -/// Write RGB payment info to file +/// Write RGB payment info to database pub fn write_rgb_payment_info_file( - ldk_data_dir: &Path, payment_hash: &PaymentHash, contract_id: ContractId, amount_rgb: u64, - swap_payment: bool, inbound: bool, + payment_hash: &PaymentHash, contract_id: ContractId, amount_rgb: u64, swap_payment: bool, + inbound: bool, kv_store: &Arc, ) { - let rgb_payment_info_path = get_rgb_payment_info_path(payment_hash, ldk_data_dir, inbound); - let rgb_payment_info_tmp_path = _append_pending_extension(&rgb_payment_info_path); let rgb_payment_info = RgbPaymentInfo { contract_id, amount: amount_rgb, @@ -620,42 +590,51 @@ pub fn write_rgb_payment_info_file( swap_payment, inbound, }; + // Write to main key + write_rgb_payment_info(kv_store.as_ref(), payment_hash, &rgb_payment_info) + .expect("able to write rgb payment info"); + // Write to pending key (will be processed by color_commitment) + let pending_key = format!("{}_pending", payment_hash.0.as_hex()); + let namespace = + if inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; let serialized_info = serde_json::to_string(&rgb_payment_info).expect("valid rgb payment info"); - std::fs::write(rgb_payment_info_path, serialized_info.clone()) - .expect("able to write rgb payment info file"); - std::fs::write(rgb_payment_info_tmp_path, serialized_info) - .expect("able to write rgb payment info tmp file"); + kv_store + .write(RGB_PRIMARY_NS, namespace, &pending_key, serialized_info.into_bytes()) + .expect("able to write rgb payment info pending"); } -/// Rename RGB files from temporary to final channel ID +/// Rename RGB channel info from temporary to final channel ID in KVStore +/// Rename RGB channel info from temporary to final channel ID in KVStore pub(crate) fn rename_rgb_files( - channel_id: &ChannelId, temporary_channel_id: &ChannelId, ldk_data_dir: &Path, + channel_id: &ChannelId, temporary_channel_id: &ChannelId, kv_store: &dyn KVStoreSync, ) { let temp_chan_id = temporary_channel_id.0.as_hex().to_string(); let chan_id = channel_id.0.as_hex().to_string(); - fs::rename( - get_rgb_channel_info_path(&temp_chan_id, ldk_data_dir, false), - get_rgb_channel_info_path(&chan_id, ldk_data_dir, false), - ) - .expect("rename ok"); - fs::rename( - get_rgb_channel_info_path(&temp_chan_id, ldk_data_dir, true), - get_rgb_channel_info_path(&chan_id, ldk_data_dir, true), - ) - .expect("rename ok"); + // Skip if the channel IDs are the same (nothing to rename) + if temp_chan_id == chan_id { + return; + } - let funding_consignment_tmp = ldk_data_dir.join(format!("consignment_{}", temp_chan_id)); - if funding_consignment_tmp.exists() { - let funding_consignment = ldk_data_dir.join(format!("consignment_{}", chan_id)); - fs::rename(funding_consignment_tmp, funding_consignment).expect("rename ok"); + if let Ok(rgb_info) = read_rgb_channel_info(kv_store, &temp_chan_id, false) { + let _ = write_rgb_channel_info(kv_store, &chan_id, &rgb_info, false); + let _ = kv_store.remove(RGB_PRIMARY_NS, RGB_CHANNEL_INFO_NS, &temp_chan_id, false); + } + if let Ok(rgb_info) = read_rgb_channel_info(kv_store, &temp_chan_id, true) { + let _ = write_rgb_channel_info(kv_store, &chan_id, &rgb_info, true); + let _ = kv_store.remove(RGB_PRIMARY_NS, RGB_CHANNEL_INFO_PENDING_NS, &temp_chan_id, false); + } + + if let Ok(consignment_data) = read_rgb_consignment(kv_store, &temp_chan_id) { + let _ = write_rgb_consignment(kv_store, &chan_id, consignment_data); + let _ = remove_rgb_consignment(kv_store, &temp_chan_id); } } /// Handle funding on the receiver side pub(crate) fn handle_funding( temporary_channel_id: &ChannelId, funding_txid: String, ldk_data_dir: &Path, - consignment_endpoint: RgbTransport, + consignment_endpoint: RgbTransport, kv_store: &dyn KVStoreSync, ) -> Result<(), MsgHandleErrInternal> { let handle = Handle::current(); let _ = handle.enter(); @@ -722,24 +701,22 @@ pub(crate) fn handle_funding( remote_rgb_amount, }; let temporary_channel_id_str = temporary_channel_id.0.as_hex().to_string(); - write_rgb_channel_info( - &get_rgb_channel_info_path(&temporary_channel_id_str, ldk_data_dir, true), - &rgb_info, - ); - write_rgb_channel_info( - &get_rgb_channel_info_path(&temporary_channel_id_str, ldk_data_dir, false), - &rgb_info, - ); + + // Write to KVStore + write_rgb_channel_info(kv_store, &temporary_channel_id_str, &rgb_info, true) + .expect("KVStore write failed"); + write_rgb_channel_info(kv_store, &temporary_channel_id_str, &rgb_info, false) + .expect("KVStore write failed"); Ok(()) } -/// Update RGB channel amount +/// Update RGB channel amount in KVStore pub fn update_rgb_channel_amount( - channel_id: &str, rgb_offered_htlc: u64, rgb_received_htlc: u64, ldk_data_dir: &Path, - pending: bool, + channel_id: &str, rgb_offered_htlc: u64, rgb_received_htlc: u64, pending: bool, + kv_store: &dyn KVStoreSync, ) { - let (mut rgb_info, info_file_path) = get_rgb_channel_info(channel_id, ldk_data_dir, pending); + let mut rgb_info = get_rgb_channel_info(channel_id, pending, kv_store); if rgb_offered_htlc > rgb_received_htlc { let spent = rgb_offered_htlc - rgb_received_htlc; @@ -751,45 +728,143 @@ pub fn update_rgb_channel_amount( rgb_info.remote_rgb_amount -= received; } - write_rgb_channel_info(&info_file_path, &rgb_info) + write_rgb_channel_info(kv_store, channel_id, &rgb_info, pending).expect("KVStore write failed"); } /// Update pending RGB channel amount pub(crate) fn update_rgb_channel_amount_pending( - channel_id: &ChannelId, rgb_offered_htlc: u64, rgb_received_htlc: u64, ldk_data_dir: &Path, + channel_id: &ChannelId, rgb_offered_htlc: u64, rgb_received_htlc: u64, + kv_store: &dyn KVStoreSync, ) { update_rgb_channel_amount( &channel_id.0.as_hex().to_string(), rgb_offered_htlc, rgb_received_htlc, - ldk_data_dir, true, + kv_store, ) } +/// Read TransferInfo from KVStore +pub fn read_rgb_transfer_info( + kv_store: &K, txid: &str, +) -> Result { + let data = kv_store.read(RGB_PRIMARY_NS, RGB_TRANSFER_INFO_NS, txid)?; + let serialized_info = + String::from_utf8(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + serde_json::from_str(&serialized_info) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} + +/// Write TransferInfo to KVStore +pub fn write_rgb_transfer_info( + kv_store: &K, txid: &str, info: &TransferInfo, +) -> Result<(), io::Error> { + let serialized_info = + serde_json::to_string(info).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + kv_store.write(RGB_PRIMARY_NS, RGB_TRANSFER_INFO_NS, txid, serialized_info.into_bytes()) +} + +/// Read RgbInfo (channel info) from KVStore +pub fn read_rgb_channel_info( + kv_store: &K, channel_id: &str, pending: bool, +) -> Result { + let namespace = if pending { RGB_CHANNEL_INFO_PENDING_NS } else { RGB_CHANNEL_INFO_NS }; + let data = kv_store.read(RGB_PRIMARY_NS, namespace, channel_id)?; + let serialized_info = + String::from_utf8(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + serde_json::from_str(&serialized_info) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} + +/// Write RgbInfo (channel info) to KVStore +pub fn write_rgb_channel_info( + kv_store: &K, channel_id: &str, rgb_info: &RgbInfo, pending: bool, +) -> Result<(), io::Error> { + let namespace = if pending { RGB_CHANNEL_INFO_PENDING_NS } else { RGB_CHANNEL_INFO_NS }; + let serialized_info = serde_json::to_string(rgb_info) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + kv_store.write(RGB_PRIMARY_NS, namespace, channel_id, serialized_info.into_bytes()) +} + +/// Read RgbPaymentInfo from KVStore +pub fn read_rgb_payment_info( + kv_store: &K, payment_hash: &PaymentHash, inbound: bool, +) -> Result { + let namespace = + if inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; + let key = payment_hash.0.as_hex().to_string(); + let data = kv_store.read(RGB_PRIMARY_NS, namespace, &key)?; + let serialized_info = + String::from_utf8(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + serde_json::from_str(&serialized_info) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} + +/// Write RgbPaymentInfo to KVStore +pub fn write_rgb_payment_info( + kv_store: &K, payment_hash: &PaymentHash, info: &RgbPaymentInfo, +) -> Result<(), io::Error> { + let namespace = + if info.inbound { RGB_PAYMENT_INFO_INBOUND_NS } else { RGB_PAYMENT_INFO_OUTBOUND_NS }; + let key = payment_hash.0.as_hex().to_string(); + let serialized_info = + serde_json::to_string(info).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + kv_store.write(RGB_PRIMARY_NS, namespace, &key, serialized_info.into_bytes()) +} + +/// Read consignment from KVStore +pub fn read_rgb_consignment( + kv_store: &K, id: &str, +) -> Result, io::Error> { + kv_store.read(RGB_PRIMARY_NS, RGB_CONSIGNMENT_NS, id) +} + +/// Write consignment to KVStore +pub fn write_rgb_consignment( + kv_store: &K, id: &str, data: Vec, +) -> Result<(), io::Error> { + kv_store.write(RGB_PRIMARY_NS, RGB_CONSIGNMENT_NS, id, data) +} + +/// Remove RGB channel info from KVStore (used when renaming channels) +pub fn remove_rgb_channel_info( + kv_store: &K, channel_id: &str, pending: bool, +) -> Result<(), io::Error> { + let namespace = if pending { RGB_CHANNEL_INFO_PENDING_NS } else { RGB_CHANNEL_INFO_NS }; + kv_store.remove(RGB_PRIMARY_NS, namespace, channel_id, false) +} + +/// Remove consignment from KVStore +pub fn remove_rgb_consignment( + kv_store: &K, id: &str, +) -> Result<(), io::Error> { + kv_store.remove(RGB_PRIMARY_NS, RGB_CONSIGNMENT_NS, id, false) +} + /// Whether the payment is colored -pub(crate) fn is_payment_rgb(ldk_data_dir: &Path, payment_hash: &PaymentHash) -> bool { - get_rgb_payment_info_path(payment_hash, ldk_data_dir, false).exists() - || get_rgb_payment_info_path(payment_hash, ldk_data_dir, true).exists() +pub fn is_payment_rgb(kv_store: &K, payment_hash: &PaymentHash) -> bool { + read_rgb_payment_info(kv_store, payment_hash, false).is_ok() + || read_rgb_payment_info(kv_store, payment_hash, true).is_ok() } -/// Detect the contract ID of the payment and then filter hops based on contract ID and amount -pub(crate) fn filter_first_hops( - ldk_data_dir: &Path, payment_hash: &PaymentHash, first_hops: &mut Vec, -) -> (ContractId, u64) { - let rgb_payment_info_path = get_rgb_payment_info_path(payment_hash, ldk_data_dir, false); - let rgb_payment_info = parse_rgb_payment_info(&rgb_payment_info_path); +/// Filter first hops to only include channels with sufficient RGB assets for the payment +pub fn filter_first_hops( + kv_store: &K, payment_hash: &PaymentHash, first_hops: &mut Vec, +) { + let rgb_payment_info = match read_rgb_payment_info(kv_store, payment_hash, false) { + Ok(info) => info, + Err(_) => return, + }; let contract_id = rgb_payment_info.contract_id; let rgb_amount = rgb_payment_info.amount; first_hops.retain(|h| { - let info_file_path = ldk_data_dir.join(h.channel_id.0.as_hex().to_string()); - if !info_file_path.exists() { - return false; + let channel_id_str = h.channel_id.0.as_hex().to_string(); + match read_rgb_channel_info(kv_store, &channel_id_str, false) { + Ok(rgb_info) => { + rgb_info.contract_id == contract_id && rgb_info.local_rgb_amount >= rgb_amount + }, + Err(_) => false, } - let serialized_info = fs::read_to_string(info_file_path).expect("valid rgb info file"); - let rgb_info: RgbInfo = - serde_json::from_str(&serialized_info).expect("valid rgb info file"); - rgb_info.contract_id == contract_id && rgb_info.local_rgb_amount >= rgb_amount }); - (contract_id, rgb_amount) } diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 58eaeccac..81de392e0 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -38,6 +38,7 @@ use bitcoin::{secp256k1, Psbt, Sequence, Txid, WPubkeyHash, Witness}; use lightning_invoice::RawBolt11Invoice; use std::path::PathBuf; +use std::sync::Arc; use crate::chain::transaction::OutPoint; use crate::crypto::utils::{hkdf_extract_expand_twice, sign, sign_with_aux_rand}; @@ -62,6 +63,7 @@ use crate::rgb_utils::color_htlc; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentPreimage; use crate::util::async_poll::AsyncResult; +use crate::util::persist::KVStoreSync; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::transaction_utils; @@ -1218,6 +1220,8 @@ pub struct InMemorySigner { entropy_source: RandomBytes, /// The LDK data directory ldk_data_dir: PathBuf, + /// RGB KVStore for database access + rgb_kv_store: Arc, } impl PartialEq for InMemorySigner { @@ -1248,6 +1252,7 @@ impl Clone for InMemorySigner { channel_keys_id: self.channel_keys_id, entropy_source: RandomBytes::new(self.get_secure_random_bytes()), ldk_data_dir: self.ldk_data_dir.clone(), + rgb_kv_store: self.rgb_kv_store.clone(), } } } @@ -1259,6 +1264,7 @@ impl InMemorySigner { payment_key_v2: SecretKey, v2_remote_key_derivation: bool, delayed_payment_base_key: SecretKey, htlc_base_key: SecretKey, commitment_seed: [u8; 32], channel_keys_id: [u8; 32], ldk_data_dir: PathBuf, rand_bytes_unique_start: [u8; 32], + rgb_kv_store: Arc, ) -> InMemorySigner { InMemorySigner { funding_key: sealed::MaybeTweakedSecretKey::from(funding_key), @@ -1272,6 +1278,7 @@ impl InMemorySigner { channel_keys_id, entropy_source: RandomBytes::new(rand_bytes_unique_start), ldk_data_dir, + rgb_kv_store, } } @@ -1281,6 +1288,7 @@ impl InMemorySigner { payment_key_v2: SecretKey, v2_remote_key_derivation: bool, delayed_payment_base_key: SecretKey, htlc_base_key: SecretKey, commitment_seed: [u8; 32], channel_keys_id: [u8; 32], ldk_data_dir: PathBuf, rand_bytes_unique_start: [u8; 32], + rgb_kv_store: Arc, ) -> InMemorySigner { InMemorySigner { funding_key: sealed::MaybeTweakedSecretKey::from(funding_key), @@ -1294,6 +1302,7 @@ impl InMemorySigner { channel_keys_id, entropy_source: RandomBytes::new(rand_bytes_unique_start), ldk_data_dir, + rgb_kv_store, } } @@ -1568,7 +1577,7 @@ impl EcdsaChannelSigner for InMemorySigner { &keys.revocation_key, ); if commitment_tx.is_colored() { - if let Err(_e) = color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir) { + if let Err(_e) = color_htlc(&mut htlc_tx, htlc, &self.ldk_data_dir, self.rgb_kv_store.as_ref()) { return Err(()); } } @@ -1993,6 +2002,7 @@ pub struct KeysManager { starting_time_secs: u64, starting_time_nanos: u32, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, } impl KeysManager { @@ -2021,6 +2031,7 @@ impl KeysManager { pub fn new( seed: &[u8; 32], starting_time_secs: u64, starting_time_nanos: u32, v2_remote_key_derivation: bool, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { // Constants for key derivation path indices used in this function. const NODE_SECRET_INDEX: ChildNumber = ChildNumber::Hardened { index: 0 }; @@ -2115,6 +2126,7 @@ impl KeysManager { starting_time_secs, starting_time_nanos, ldk_data_dir, + rgb_kv_store, }; let secp_seed = res.get_secure_random_bytes(); res.secp_ctx.seeded_randomize(&secp_seed); @@ -2234,6 +2246,7 @@ impl KeysManager { params.clone(), self.ldk_data_dir.clone(), prng_seed, + self.rgb_kv_store.clone(), ) } @@ -2651,6 +2664,7 @@ impl PhantomKeysManager { pub fn new( seed: &[u8; 32], starting_time_secs: u64, starting_time_nanos: u32, cross_node_seed: &[u8; 32], v2_remote_key_derivation: bool, ldk_data_dir: PathBuf, + rgb_kv_store: Arc, ) -> Self { let inner = KeysManager::new( seed, @@ -2658,6 +2672,7 @@ impl PhantomKeysManager { starting_time_nanos, v2_remote_key_derivation, ldk_data_dir, + rgb_kv_store, ); let (inbound_key, phantom_key) = hkdf_extract_expand_twice( b"LDK Inbound and Phantom Payment Key Expansion",