diff --git a/Cargo.toml b/Cargo.toml index 3cf5afd73..7b939bb2c 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,18 +40,18 @@ default = [] #lightning-macros = { version = "0.2.0" } #lightning-dns-resolver = { version = "0.3.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144" } -lightning-dns-resolver = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144" } +lightning = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3", features = ["std"] } +lightning-types = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3" } +lightning-invoice = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3" } +lightning-persister = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3" } +lightning-rapid-gossip-sync = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3" } +lightning-block-sync = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3", features = ["std"] } +lightning-macros = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3" } +lightning-dns-resolver = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -81,13 +81,13 @@ async-trait = { version = "0.1", default-features = false } vss-client = { package = "vss-client-ng", version = "0.5" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/jkczyz/bitcoin-payment-instructions", rev = "679dac50cc0d81ec4d31da94b93d467e5308f16a" } +bitcoin-payment-instructions = { git = "https://github.com/jkczyz/bitcoin-payment-instructions", rev = "d0b3708c7f6f49a15dfc4b2cfbf7dceda8ab010b" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "369a2cf9c8ef810deea0cd2b4cf6ed0691b78144", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/jkczyz/rust-lightning", rev = "4b827703a81f46dc45c99d9aedd7a0b59cc440f3", features = ["std", "_test_utils"] } rand = { version = "0.9.2", default-features = false, features = ["std", "thread_rng", "os_rng"] } proptest = "1.0.0" regex = "1.5.6" diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index f87c7b294..6fa6d8d84 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -123,6 +123,8 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] + void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason); diff --git a/src/builder.rs b/src/builder.rs index 05f3cae76..2dd029b2f 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1496,6 +1496,8 @@ fn build_with_store_internal( Arc::clone(&pending_payment_store), )); + tx_broadcaster.set_wallet(Arc::downgrade(&wallet)); + // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 7ece757ae..43bc9db4f 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -568,16 +568,18 @@ impl BitcoindChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 // features, we should eventually switch to use `submitpackage` via the // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual // transactions. - for tx in &package { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS), - self.api_client.broadcast_transaction(tx), + self.api_client.broadcast_transaction(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index c62cbb526..0e71b4fb1 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -275,7 +275,9 @@ impl ElectrumChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { let electrum_client: Arc = if let Some(client) = self.electrum_runtime_status.read().expect("lock").client().as_ref() { @@ -285,7 +287,7 @@ impl ElectrumChainSource { return; }; - for tx in package { + for tx in txs { electrum_client.broadcast(tx).await; } } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 5825a0984..5f88ad76e 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -352,12 +352,14 @@ impl EsploraChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { - for tx in &package { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), - self.esplora_client.broadcast(tx), + self.esplora_client.broadcast(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/mod.rs b/src/chain/mod.rs index b70620b99..1bbf39d26 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -24,7 +24,7 @@ use crate::config::{ WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; -use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; @@ -453,15 +453,27 @@ impl ChainSource { return; } Some(next_package) = receiver.recv() => { + let package = match self.tx_broadcaster.classify_package(next_package).await { + Ok(p) => p, + Err(e) => { + log_error!( + tx_bcast_logger, + "Skipping broadcast: failed to persist payment records: {:?}", + e, + ); + continue; + }, + }; + let txs = package.into_iter().map(|(tx, _)| tx); match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.process_broadcast_package(next_package).await + esplora_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.process_broadcast_package(next_package).await + electrum_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { - bitcoind_chain_source.process_broadcast_package(next_package).await + bitcoind_chain_source.process_broadcast_package(txs).await }, } } diff --git a/src/event.rs b/src/event.rs index 9932e2c7f..dc3a9bdb3 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1529,6 +1529,18 @@ where ); } + if let Err(e) = + self.wallet.handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid)) + { + log_error!( + self.logger, + "Failed to graduate funding payment on ChannelReady for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) @@ -1558,6 +1570,16 @@ where } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + if let Err(e) = self.wallet.handle_channel_closed(channel_id) { + log_error!( + self.logger, + "Failed to handle ChannelClosed for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/lib.rs b/src/lib.rs index 24e063842..c206d2fa0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1581,6 +1581,14 @@ impl Node { Error::ChannelSplicingFailed })?; + if funding_template.min_rbf_feerate().is_some() { + log_error!( + self.logger, + "Failed to splice channel: pending splice requires RBF, use rbf_channel instead" + ); + return Err(Error::ChannelSplicingFailed); + } + let contribution = self .runtime .block_on(funding_template.splice_in( @@ -1694,20 +1702,88 @@ impl Node { Error::ChannelSplicingFailed })?; + if funding_template.min_rbf_feerate().is_some() { + log_error!( + self.logger, + "Failed to splice channel: pending splice requires RBF, use rbf_channel instead" + ); + return Err(Error::ChannelSplicingFailed); + } + let outputs = vec![bitcoin::TxOut { value: Amount::from_sat(splice_amount_sats), script_pubkey: address.script_pubkey(), }]; + let contribution = + funding_template.splice_out(outputs, min_feerate, max_feerate).map_err(|e| { + log_error!(self.logger, "Failed to splice channel: {}", e); + Error::ChannelSplicingFailed + })?; + + self.channel_manager + .funding_contributed( + &channel_details.channel_id, + &counterparty_node_id, + contribution, + None, + ) + .map_err(|e| { + log_error!(self.logger, "Failed to splice channel: {:?}", e); + Error::ChannelSplicingFailed + }) + } else { + log_error!( + self.logger, + "Channel not found for user_channel_id {} and counterparty {}", + user_channel_id, + counterparty_node_id + ); + Err(Error::ChannelSplicingFailed) + } + } + + /// Replace a pending splice's funding transaction with a higher-feerate version. + /// + /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior + /// contribution is reused when possible; otherwise, coin selection is re-run. + /// + /// # Experimental API + /// + /// This API is experimental and may change in the future. + pub fn rbf_channel( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + ) -> Result<(), Error> { + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + let min_feerate = + self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + let funding_template = self + .channel_manager + .splice_channel(&channel_details.channel_id, &counterparty_node_id) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + })?; + + if funding_template.min_rbf_feerate().is_none() { + log_error!(self.logger, "Failed to RBF channel: no pending splice to replace"); + return Err(Error::ChannelSplicingFailed); + } + let contribution = self .runtime - .block_on(funding_template.splice_out( - outputs, - min_feerate, + .block_on(funding_template.rbf_prior_contribution( + None, max_feerate, Arc::clone(&self.wallet), )) .map_err(|e| { - log_error!(self.logger, "Failed to splice channel: {}", e); + log_error!(self.logger, "Failed to RBF channel: {}", e); Error::ChannelSplicingFailed })?; @@ -1719,7 +1795,7 @@ impl Node { None, ) .map_err(|e| { - log_error!(self.logger, "Failed to splice channel: {:?}", e); + log_error!(self.logger, "Failed to RBF channel: {:?}", e); Error::ChannelSplicingFailed }) } else { diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index eb72f89ec..16837d70c 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. use bitcoin::Txid; +use lightning::chain::chaininterface::FundingCandidate; use lightning::impl_writeable_tlv_based; use lightning::ln::channelmanager::PaymentId; @@ -13,6 +14,19 @@ use crate::data_store::{StorableObject, StorableObjectUpdate}; use crate::payment::store::PaymentDetailsUpdate; use crate::payment::PaymentDetails; +/// Marks an on-chain payment as belonging to an interactive-funding negotiation. The +/// last entry in `candidates` is the currently-broadcast tx; earlier entries are RBF +/// predecessors that may still confirm if reorgs intervene. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FundingDetails { + /// Every negotiated candidate, oldest first. + pub candidates: Vec, +} + +impl_writeable_tlv_based!(FundingDetails, { + (0, candidates, optional_vec), +}); + /// Represents a pending payment #[derive(Clone, Debug, PartialEq, Eq)] pub struct PendingPaymentDetails { @@ -20,11 +34,24 @@ pub struct PendingPaymentDetails { pub details: PaymentDetails, /// Transaction IDs that have replaced or conflict with this payment. pub conflicting_txids: Vec, + /// Set when the payment's transaction is an interactive-funding broadcast (channel + /// open or splice). The record transitions to [`PaymentStatus::Succeeded`] on + /// `ChannelReady` instead of after [`ANTI_REORG_DELAY`] confirmations. + /// + /// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded + /// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY + pub funding_details: Option, } impl PendingPaymentDetails { pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { - Self { details, conflicting_txids } + Self { details, conflicting_txids, funding_details: None } + } + + pub(crate) fn with_funding_details( + details: PaymentDetails, conflicting_txids: Vec, funding_details: FundingDetails, + ) -> Self { + Self { details, conflicting_txids, funding_details: Some(funding_details) } } /// Convert to finalized payment for the main payment store @@ -36,6 +63,7 @@ impl PendingPaymentDetails { impl_writeable_tlv_based!(PendingPaymentDetails, { (0, details, required), (2, conflicting_txids, optional_vec), + (4, funding_details, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -43,6 +71,7 @@ pub(crate) struct PendingPaymentDetailsUpdate { pub id: PaymentId, pub payment_update: Option, pub conflicting_txids: Option>, + pub funding_details: Option>, } impl StorableObject for PendingPaymentDetails { @@ -68,6 +97,13 @@ impl StorableObject for PendingPaymentDetails { } } + if let Some(new_funding_details) = update.funding_details { + if self.funding_details != new_funding_details { + self.funding_details = new_funding_details; + updated = true; + } + } + updated } @@ -89,6 +125,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { } else { Some(value.conflicting_txids.clone()) }; - Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids } + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids, + funding_details: Some(value.funding_details.clone()), + } } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 7084135b0..1e65fb1aa 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -6,21 +6,35 @@ // accordance with one or both of these licenses. use std::ops::Deref; +use std::sync::{Mutex as StdMutex, Weak}; use bitcoin::Transaction; use lightning::chain::chaininterface::{BroadcasterInterface, TransactionType}; use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; +use crate::types::Wallet; +use crate::Error; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; +/// A package of transactions that LDK handed to the broadcaster in one +/// `broadcast_transactions` call, along with each transaction's type. Queued until the +/// background task classifies and broadcasts it. +pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>; + pub(crate) struct TransactionBroadcaster where L::Target: LdkLogger, { - queue_sender: mpsc::Sender>, - queue_receiver: Mutex>>, + queue_sender: mpsc::Sender, + queue_receiver: Mutex>, + /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts + /// (channel opens and splices) into payment records. Remains `None` while the + /// builder is wiring the node up, during which broadcasts are forwarded to the + /// queue but no payment record is written. [`Self::set_wallet`] installs the handle + /// once the [`Wallet`] exists. + wallet: StdMutex>>, logger: L, } @@ -30,14 +44,48 @@ where { pub(crate) fn new(logger: L) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger } + Self { + queue_sender, + queue_receiver: Mutex::new(queue_receiver), + wallet: StdMutex::new(None), + logger, + } + } + + /// Installs the [`Wallet`] handle used to classify funding broadcasts (channel + /// opens and splices) into payment records. Called once the builder has constructed + /// both the broadcaster and the wallet. + pub(crate) fn set_wallet(&self, wallet: Weak) { + *self.wallet.lock().expect("lock") = Some(wallet); } pub(crate) async fn get_broadcast_queue( &self, - ) -> MutexGuard<'_, mpsc::Receiver>> { + ) -> MutexGuard<'_, mpsc::Receiver> { self.queue_receiver.lock().await } + + /// Classifies a queued package into payment records and returns the package ready + /// for the chain client. Returns `Err` if any classification fails; callers must + /// not broadcast the package in that case, since a crash would leave the tx + /// on-chain without a record. + pub(crate) async fn classify_package( + &self, package: BroadcastPackage, + ) -> Result { + let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet_opt { + tokio::task::spawn_blocking(move || { + for (tx, tx_type) in &package { + wallet.classify_broadcast(tx, tx_type)?; + } + Ok::<_, Error>(package) + }) + .await + .map_err(|_| Error::PersistenceFailed)? + } else { + Ok(package) + } + } } impl BroadcasterInterface for TransactionBroadcaster @@ -45,7 +93,8 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { - let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); + let package: BroadcastPackage = + txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); }); diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index daeb7becb..15945a91d 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -5,6 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; use std::future::Future; use std::ops::Deref; use std::str::FromStr; @@ -32,15 +33,16 @@ use bitcoin::{ WitnessProgram, WitnessVersion, }; use lightning::chain::chaininterface::{ - BroadcasterInterface, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, + BroadcasterInterface, TransactionType, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, }; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::{BestBlock as BlockLocator, ClaimId, Listen}; use lightning::ln::channelmanager::PaymentId; -use lightning::ln::funding::FundingTxInput; +use lightning::ln::funding::{FundingContribution, FundingTxInput}; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::msgs::UnsignedGossipMessage; use lightning::ln::script::ShutdownScript; +use lightning::ln::types::ChannelId as LnChannelId; use lightning::sign::{ ChangeDestinationSource, EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, PeerStorageKey, Recipient, SignerProvider, SpendableOutputDescriptor, @@ -55,6 +57,9 @@ use persist::KVStoreWalletPersister; use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use lightning::chain::chaininterface::{ChannelFunding, FundingCandidate, FundingPurpose}; + +use crate::payment::pending_payment_store::FundingDetails; use crate::payment::store::ConfirmationStatus; use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, @@ -251,18 +256,9 @@ impl Wallet { for event in events { match event { WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { - let cur_height = locked_wallet.latest_checkpoint().height(); - let confirmation_height = block_time.block_id.height; - let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 - { - PaymentStatus::Succeeded - } else { - PaymentStatus::Pending - }; - let confirmation_status = ConfirmationStatus::Confirmed { block_hash: block_time.block_id.hash, - height: confirmation_height, + height: block_time.block_id.height, timestamp: block_time.confirmation_time, }; @@ -270,6 +266,23 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + confirmation_status, + )? { + continue; + } + + let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; + let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 + { + PaymentStatus::Succeeded + } else { + PaymentStatus::Pending + }; + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -279,13 +292,12 @@ impl Wallet { confirmation_status, ); - self.payment_store.insert_or_update(payment.clone())?; - if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment)?; + self.persist_pending(pending_payment)?; + } else { + self.payment_store.insert_or_update(payment)?; } }, WalletEvent::ChainTipChanged { new_tip, .. } => { @@ -296,8 +308,11 @@ impl Wallet { "Non-pending payment {:?} found in pending store", p.details.id, ); + // Funding records complete on `ChannelReady`, not after + // `ANTI_REORG_DELAY` confirmations. p.details.status == PaymentStatus::Pending && matches!(p.details.kind, PaymentKind::Onchain { .. }) + && p.funding_details.is_none() }); let mut unconfirmed_outbound_txids: Vec = Vec::new(); @@ -358,6 +373,14 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -366,10 +389,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -405,6 +426,15 @@ impl Wallet { let payment_id = self .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -413,10 +443,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, _ => { continue; @@ -1084,9 +1112,12 @@ impl Wallet { let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); })?; + // Use list_output rather than get_utxo to include outputs spent by unconfirmed + // transactions (e.g., a prior splice being replaced via RBF). + let mut wallet_outputs: HashMap = + locked_wallet.list_output().map(|o| (o.outpoint, o)).collect(); for (i, txin) in psbt.unsigned_tx.input.iter().enumerate() { - if let Some(utxo) = locked_wallet.get_utxo(txin.previous_output) { - debug_assert!(!utxo.is_spent); + if let Some(utxo) = wallet_outputs.remove(&txin.previous_output) { psbt.inputs[i] = locked_wallet.get_psbt_input(utxo, None, true).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT input: {}", e); })?; @@ -1144,6 +1175,41 @@ impl Wallet { Ok(tx) } + /// Computes the amount, fee, and direction of an on-chain payment from the + /// wallet's view of the transaction. Used by [`TransactionBroadcaster`] to + /// describe a single-funded channel-open, for which no [`FundingContribution`] + /// is available. + /// + /// [`TransactionBroadcaster`]: crate::tx_broadcaster::TransactionBroadcaster + /// [`FundingContribution`]: lightning::ln::funding::FundingContribution + pub(crate) fn onchain_payment_fields( + &self, tx: &Transaction, + ) -> (Option, Option, PaymentDirection) { + let locked_wallet = self.inner.lock().expect("lock"); + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + + let (direction, amount_msat) = if sent > received { + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) + } else { + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) + }; + + (amount_msat, Some(fee_sat * 1000), direction) + } + fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, @@ -1200,6 +1266,223 @@ impl Wallet { PendingPaymentDetails::new(payment, conflicting_txids) } + /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their + /// respective stores in a fixed order. Callers that need to keep the two stores in + /// sync should always go through this. + fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + self.payment_store.insert_or_update(pending.details.clone())?; + self.pending_payment_store.insert_or_update(pending)?; + Ok(()) + } + + /// Called on `ChannelReady` to mark a funding payment (channel open or splice) as + /// succeeded. + /// + /// If `funding_txo_txid` matches a candidate other than the currently-active one, + /// that candidate is promoted to active first and the outer [`PaymentDetails`] is + /// updated from its contribution. If no candidate matches (the confirmed funding + /// txid belongs to a broadcast this node didn't contribute to), the pending record + /// is left in place for later handling. + pub(crate) fn handle_channel_ready( + &self, channel_id: LnChannelId, funding_txo_txid: Option, + ) -> Result<(), Error> { + let funding_txo_txid = match funding_txo_txid { + Some(t) => t, + None => return Ok(()), + }; + + let mut pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + let funding_details = match pending.funding_details.clone() { + Some(fd) => fd, + None => return Ok(()), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == funding_txo_txid) + { + Some(c) => c.clone(), + None => { + // Confirmed `funding_txo` wasn't produced by any of our broadcasts. The + // record is left alone; some higher-level flow decides what to do. + log_debug!( + self.logger, + "ChannelReady for channel {}: confirmed funding_txo {} is not one of our candidates", + channel_id, + funding_txo_txid, + ); + return Ok(()); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding record must use PaymentKind::Onchain"); + return Ok(()); + }, + }; + + if old_txid != funding_txo_txid { + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != funding_txo_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + // Preserve the confirmation status already on the record (set by wallet sync if + // it's seen the tx confirm). `ChannelReady` alone doesn't carry block details. + let existing_status = match pending.details.kind { + PaymentKind::Onchain { status, .. } => status, + _ => ConfirmationStatus::Unconfirmed, + }; + pending.details.kind = + PaymentKind::Onchain { txid: funding_txo_txid, status: existing_status }; + + pending.details.status = PaymentStatus::Succeeded; + let payment_id = pending.details.id; + self.payment_store.insert_or_update(pending.details)?; + self.pending_payment_store.remove(&payment_id)?; + + Ok(()) + } + + /// Called on `ChannelClosed`. Removes any funding record (channel open or splice) + /// for `channel_id` whose candidates never reached confirmed — e.g. a funding + /// transaction that never made it on-chain. A record that does reflect a confirmed + /// transaction is left alone and will transition to `Succeeded` normally. + pub(crate) fn handle_channel_closed(&self, channel_id: LnChannelId) -> Result<(), Error> { + let pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + + let is_confirmed = matches!( + pending.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } + ); + if is_confirmed { + return Ok(()); + } + + let payment_id = pending.details.id; + self.pending_payment_store.remove(&payment_id)?; + self.payment_store.remove(&payment_id)?; + Ok(()) + } + + /// Updates a funding record's `kind` in response to a wallet-sync event, swapping + /// the active candidate when `event_txid` differs from the current one. + /// + /// Amount, fee, and direction are not recomputed from the wallet's view: they were + /// set at broadcast time from the `FundingContribution` and must persist until + /// `ChannelReady`. + /// + /// Returns `true` when a funding record was updated (so the caller skips the + /// default Onchain create/update path), `false` otherwise. + fn apply_funding_details_status_update( + &self, payment_id: PaymentId, event_txid: Txid, confirmation_status: ConfirmationStatus, + ) -> Result { + // `ChannelReady` may move the payment to the main store before wallet sync + // sees the tx confirm. In that case, update `kind` directly; recomputing from + // the wallet's view would overwrite the per-node fee set at broadcast time. + if let Some(mut existing) = self.payment_store.get(&payment_id) { + if existing.status == PaymentStatus::Succeeded + && matches!(existing.kind, PaymentKind::Onchain { .. }) + && self.pending_payment_store.get(&payment_id).is_none() + { + let needs_update = match existing.kind { + PaymentKind::Onchain { txid, status } => { + txid != event_txid || status != confirmation_status + }, + _ => false, + }; + if needs_update { + existing.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + self.payment_store.insert_or_update(existing)?; + } + return Ok(true); + } + } + + let mut pending = match self.pending_payment_store.get(&payment_id) { + Some(p) => p, + None => return Ok(false), + }; + let funding_details = match pending.funding_details.as_ref() { + Some(fd) => fd, + None => return Ok(false), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == event_txid) { + Some(c) => c.clone(), + None => { + log_debug!( + self.logger, + "Event txid {} resolved to funding_details payment {} but is not in candidates", + event_txid, + payment_id, + ); + return Ok(false); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding_details record must use PaymentKind::Onchain"); + return Ok(false); + }, + }; + + if old_txid != event_txid { + // A different candidate confirmed. Move the previous active txid onto + // `conflicting_txids` and re-derive amount/fee from the new candidate's + // contributions. + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != event_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + pending.details.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + + self.persist_pending(pending)?; + + Ok(true) + } + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { let direct_payment_id = PaymentId(target_txid.to_byte_array()); if self.pending_payment_store.contains_key(&direct_payment_id) { @@ -1211,12 +1494,28 @@ impl Wallet { .list_filter(|p| { matches!(p.details.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid) || p.conflicting_txids.contains(&target_txid) + || p.funding_details + .as_ref() + .map(|fd| fd.candidates.iter().any(|c| c.txid == target_txid)) + .unwrap_or(false) }) .first() { return Some(replaced_details.details.id); } + // Once moved to the main store, a funding payment is still matched by its + // confirmed txid so late wallet events resolve correctly. + if let Some(p) = self + .payment_store + .list_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid), + ) + .first() + { + return Some(p.id); + } + None } @@ -1415,16 +1714,235 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); - let pending_payment_store = - self.create_pending_payment_from_tx(new_payment.clone(), Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment_store)?; - self.payment_store.insert_or_update(new_payment)?; + let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); + self.persist_pending(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); Ok(new_txid) } + + pub(crate) fn classify_broadcast( + &self, tx: &Transaction, tx_type: &TransactionType, + ) -> Result<(), Error> { + match tx_type { + TransactionType::Funding { channels } => self.classify_funding(tx, channels), + TransactionType::InteractiveFunding(candidates) => { + self.classify_interactive_funding(tx, candidates) + }, + _ => Ok(()), + } + } + + fn classify_funding( + &self, tx: &Transaction, channels: &[(PublicKey, LnChannelId)], + ) -> Result<(), Error> { + // Batch funding (one transaction funding multiple channels) isn't supported; let + // wallet sync record the payment normally so graduation still runs through + // ANTI_REORG_DELAY. + if channels.len() != 1 { + if channels.len() > 1 { + log_trace!( + self.logger, + "Skipping funding classification for batched broadcast ({} channels)", + channels.len() + ); + } + return Ok(()); + } + + let (counterparty_node_id, channel_id) = channels[0]; + let txid = tx.compute_txid(); + let (amount_msat, fee_paid_msat, direction) = self.onchain_payment_fields(tx); + + let candidate = FundingCandidate { + txid, + channels: vec![ChannelFunding { + counterparty_node_id, + channel_id, + purpose: FundingPurpose::Establishment, + contribution: None, + }], + }; + + let details = PaymentDetails::new( + PaymentId(txid.to_byte_array()), + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + let funding_details = FundingDetails { candidates: vec![candidate] }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded channel-funding broadcast {} for channel {}", + txid, + channel_id, + ); + Ok(()) + } + + fn classify_interactive_funding( + &self, tx: &Transaction, candidates: &[FundingCandidate], + ) -> Result<(), Error> { + // `InteractiveFunding` carries the full negotiated history. The currently-broadcast + // candidate is the last entry; earlier entries are RBF predecessors. + let active = match candidates.last() { + Some(c) => c, + None => return Ok(()), + }; + let first = match candidates.first() { + Some(c) => c, + None => return Ok(()), + }; + + let txid = tx.compute_txid(); + debug_assert_eq!(active.txid, txid, "broadcast tx must match the active candidate"); + + // Aggregate amount/fee/direction across this candidate's channels by summing the + // local-stake contributions. If we didn't contribute on this candidate, leave the + // record to wallet sync — there's nothing for us to track here, and any wallet- + // visible activity (e.g. a counterparty's splice-out paid to our address) is + // better surfaced as a plain on-chain receive. + let aggregate = aggregate_local_stakes(active); + let amount_msat = match aggregate.amount_msat { + Some(amt) => Some(amt), + None => { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no local contribution", + txid, + ); + return Ok(()); + }, + }; + let fee_paid_msat = aggregate.fee_paid_msat; + let direction = aggregate.direction; + + // Skip broadcasts that don't move funds in or out of our on-chain wallet — e.g. + // a splice-out we initiated toward an external address. + let (wallet_amount_msat, _wallet_fee_msat, _wallet_direction) = + self.onchain_payment_fields(tx); + if wallet_amount_msat == Some(0) { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no wallet-level activity", + txid, + ); + return Ok(()); + } + + // Anchor the PaymentId to the first negotiated candidate so the record stays + // stable across RBF replacements. + let payment_id = PaymentId(first.txid.to_byte_array()); + let candidate_count = candidates.len(); + let active_channel_count = active.channels.len(); + + let details = PaymentDetails::new( + payment_id, + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + // Funding records carry their own RBF history in `candidates`; lookup by txid + // (find_payment_by_txid) already searches that, so no separate + // `conflicting_txids` Vec is needed. + let funding_details = FundingDetails { candidates: candidates.to_vec() }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", + txid, + candidate_count, + active_channel_count, + ); + Ok(()) + } +} + +/// Returns this node's share of the on-chain fee for a funding transaction (channel +/// open or splice), in millisatoshis. Sourced from the contribution's +/// [`FundingContribution::estimated_fee`], which upstream computes per-contributor. +fn our_actual_fee_msat(contribution: &FundingContribution) -> u64 { + contribution.estimated_fee().to_sat() * 1000 +} + +fn record_includes_channel(details: &FundingDetails, channel_id: LnChannelId) -> bool { + details.candidates.iter().any(|c| c.channels.iter().any(|ch| ch.channel_id == channel_id)) +} + +struct LocalStakeAggregate { + amount_msat: Option, + fee_paid_msat: Option, + direction: PaymentDirection, +} + +/// Aggregates local-stake amount/fee/direction across the channels of a single +/// [`FundingCandidate`]. Each channel's contribution (when present) is treated as +/// local-stake-only, so contributions across channels are summed without +/// double-counting. +fn aggregate_local_stakes(candidate: &FundingCandidate) -> LocalStakeAggregate { + let mut amount_outbound: u64 = 0; + let mut amount_inbound: u64 = 0; + let mut fee: u64 = 0; + let mut have_contribution = false; + for channel in &candidate.channels { + if let Some(c) = channel.contribution.as_ref() { + have_contribution = true; + fee = fee.saturating_add(our_actual_fee_msat(c)); + match contribution_direction(c) { + Some((PaymentDirection::Outbound, amt)) => { + amount_outbound = amount_outbound.saturating_add(amt); + }, + Some((PaymentDirection::Inbound, amt)) => { + amount_inbound = amount_inbound.saturating_add(amt); + }, + None => {}, + } + } + } + if !have_contribution { + return LocalStakeAggregate { + amount_msat: None, + fee_paid_msat: None, + direction: PaymentDirection::Outbound, + }; + } + let (direction, amount_msat) = if amount_outbound >= amount_inbound { + (PaymentDirection::Outbound, amount_outbound.saturating_sub(amount_inbound)) + } else { + (PaymentDirection::Inbound, amount_inbound.saturating_sub(amount_outbound)) + }; + LocalStakeAggregate { amount_msat: Some(amount_msat), fee_paid_msat: Some(fee), direction } +} + +/// Returns this contribution's direction and magnitude in msat, or `None` if it can't +/// be classified as a single inbound or outbound payment. +fn contribution_direction(contribution: &FundingContribution) -> Option<(PaymentDirection, u64)> { + let value_added = contribution.value_added(); + let outputs_total: Amount = contribution.outputs().iter().map(|o| o.value).sum(); + + if value_added > Amount::ZERO && outputs_total == Amount::ZERO { + Some((PaymentDirection::Outbound, value_added.to_sat() * 1000)) + } else if value_added == Amount::ZERO && outputs_total > Amount::ZERO { + Some((PaymentDirection::Inbound, outputs_total.to_sat() * 1000)) + } else { + None + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index d2c057a16..4502ec9a3 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -26,7 +26,7 @@ use common::{ setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, }; -use electrsd::corepc_node::Node as BitcoinD; +use electrsd::corepc_node::{self, Node as BitcoinD}; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -1067,7 +1067,13 @@ async fn splice_channel() { expect_channel_ready_event!(node_a, node_b.node_id()); expect_channel_ready_event!(node_b, node_a.node_id()); - let expected_splice_in_fee_sat = 255; + // Our per-node fee contribution, sourced from `FundingContribution::estimated_fee`. + let expected_splice_in_fee_sat = 251; + // Total wallet outflow on-chain = channel addition + fee. Coin selection may + // allocate a few sats more to the channel than the user requested when the + // surplus over (request + fee + change) is too small to leave as change. + let expected_balance_reduction_sat = 4_000_255; + let expected_channel_addition_sat = 4_000_004; let payments = node_b.list_payments(); let payment = @@ -1076,9 +1082,9 @@ async fn splice_channel() { assert_eq!( node_b.list_balances().total_onchain_balance_sats, - premine_amount_sat - 4_000_000 - expected_splice_in_fee_sat + premine_amount_sat - expected_balance_reduction_sat ); - assert_eq!(node_b.list_balances().total_lightning_balance_sats, 4_000_000); + assert_eq!(node_b.list_balances().total_lightning_balance_sats, expected_channel_addition_sat); let payment_id = node_b.spontaneous_payment().send(amount_msat, node_a.node_id(), None).unwrap(); @@ -1093,7 +1099,10 @@ async fn splice_channel() { node_a.list_balances().total_lightning_balance_sats, 4_000_000 - closing_transaction_fee_sat - anchor_output_sat + amount_msat / 1000 ); - assert_eq!(node_b.list_balances().total_lightning_balance_sats, 4_000_000 - amount_msat / 1000); + assert_eq!( + node_b.list_balances().total_lightning_balance_sats, + expected_channel_addition_sat - amount_msat / 1000 + ); // Splice-out funds for Node A from the payment sent by Node B let address = node_a.onchain_payment().new_address().unwrap(); @@ -1127,6 +1136,171 @@ async fn splice_channel() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn rbf_splice_channel() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // rbf_channel should fail when there's no pending splice + assert_eq!( + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + Err(NodeError::ChannelSplicingFailed), + ); + + // Initiate a splice-in to create a pending splice + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let original_txo = expect_splice_pending_event!(node_a, node_b.node_id()); + expect_splice_pending_event!(node_b, node_a.node_id()); + + // splice_in should fail when there's a pending splice (RBF guard) + assert_eq!( + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // splice_out should fail when there's a pending splice (RBF guard) + let address = node_a.onchain_payment().new_address().unwrap(); + assert_eq!( + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 100_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // rbf_channel should succeed when there's a pending splice + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + + let rbf_txo = expect_splice_pending_event!(node_a, node_b.node_id()); + expect_splice_pending_event!(node_b, node_a.node_id()); + + assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + + // After RBF but before confirmation, node_b (the initiator) should have a single + // on-chain payment covering both candidates: id anchored to the first broadcast, + // `kind.txid` pointing at the latest (RBF) candidate, and the original candidate + // recorded as a replaced one on the pending record. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment exists"); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Unconfirmed, got {:?}", other), + } + assert_eq!(payment.status, PaymentStatus::Pending); + // Only one Onchain Pending payment for this splice attempt (not one per candidate). + let splice_payments = node_b.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. }) + && p.status == PaymentStatus::Pending + }); + assert_eq!( + splice_payments.len(), + 1, + "expected exactly one pending Onchain payment for the splice, got {}: {:#?}", + splice_payments.len(), + splice_payments, + ); + } + + // Wait for the RBF transaction to replace the original in the mempool + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + + // Mine blocks and confirm the RBF splice + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the RBF transaction is the one that locked, not the original + match node_a.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_b.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_a.event_handled().unwrap(); + }, + ref e => panic!("node_a got unexpected event: {:?}", e), + } + match node_b.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_a.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_b.event_handled().unwrap(); + }, + ref e => panic!("node_b got unexpected event: {:?}", e), + } + + // After `ChannelReady` we should have graduated to `Succeeded` — even though + // `ANTI_REORG_DELAY` may not have elapsed yet — and the `kind.txid` should + // reflect the winning RBF candidate, with `fee_paid_msat` matching our + // per-node `FundingContribution::estimated_fee` for that candidate. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment graduated"); + assert_eq!(payment.status, PaymentStatus::Succeeded); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Confirmed { .. } } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Confirmed, got {:?}", other), + } + assert!( + payment.fee_paid_msat.is_some(), + "splice payment should carry a fee from its FundingContribution", + ); + } + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();