diff --git a/src/builder.rs b/src/builder.rs index cc0c9adeb..a09b2563f 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -943,12 +943,11 @@ fn build_with_store_internal( }, }; - let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); + let (stop_sender, _) = tokio::sync::watch::channel(()); Ok(Node { runtime, stop_sender, - stop_receiver, config, wallet, tx_sync, diff --git a/src/lib.rs b/src/lib.rs index d0b6e9993..24b2123f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -179,7 +179,6 @@ uniffi::include_scaffolding!("ldk_node"); pub struct Node { runtime: Arc>>, stop_sender: tokio::sync::watch::Sender<()>, - stop_receiver: tokio::sync::watch::Receiver<()>, config: Arc, wallet: Arc, tx_sync: Arc>>, @@ -247,7 +246,7 @@ impl Node { // Setup wallet sync let wallet = Arc::clone(&self.wallet); let sync_logger = Arc::clone(&self.logger); - let mut stop_sync = self.stop_receiver.clone(); + let mut stop_sync = self.stop_sender.subscribe(); let onchain_wallet_sync_interval_secs = self .config .onchain_wallet_sync_interval_secs @@ -288,7 +287,7 @@ impl Node { ); }); - let mut stop_fee_updates = self.stop_receiver.clone(); + let mut stop_fee_updates = self.stop_sender.subscribe(); let fee_update_logger = Arc::clone(&self.logger); let fee_estimator = Arc::clone(&self.fee_estimator); let fee_rate_cache_update_interval_secs = @@ -331,7 +330,7 @@ impl Node { let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); let sync_logger = Arc::clone(&self.logger); - let mut stop_sync = self.stop_receiver.clone(); + let mut stop_sync = self.stop_sender.subscribe(); let wallet_sync_interval_secs = self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); runtime.spawn(async move { @@ -369,7 +368,7 @@ impl Node { let gossip_source = Arc::clone(&self.gossip_source); let gossip_sync_store = Arc::clone(&self.kv_store); let gossip_sync_logger = Arc::clone(&self.logger); - let mut stop_gossip_sync = self.stop_receiver.clone(); + let mut stop_gossip_sync = self.stop_sender.subscribe(); runtime.spawn(async move { let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); loop { @@ -412,7 +411,7 @@ impl Node { if let Some(listening_addresses) = &self.config.listening_addresses { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); - let mut stop_listen = self.stop_receiver.clone(); + let mut stop_listen = self.stop_sender.subscribe(); let listening_logger = Arc::clone(&self.logger); let mut bind_addrs = Vec::with_capacity(listening_addresses.len()); @@ -462,12 +461,11 @@ impl Node { }); } - // Regularly reconnect to channel peers. - let connect_cm = Arc::clone(&self.channel_manager); + // Regularly reconnect to persisted peers. let connect_pm = Arc::clone(&self.peer_manager); let connect_logger = Arc::clone(&self.logger); let connect_peer_store = Arc::clone(&self.peer_store); - let mut stop_connect = self.stop_receiver.clone(); + let mut stop_connect = self.stop_sender.subscribe(); runtime.spawn(async move { let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -482,29 +480,23 @@ impl Node { .iter() .map(|(peer, _addr)| *peer) .collect::>(); - for node_id in connect_cm - .list_channels() - .iter() - .map(|chan| chan.counterparty.node_id) - .filter(|id| !pm_peers.contains(id)) - { - if let Some(peer_info) = connect_peer_store.get_peer(&node_id) { - let res = do_connect_peer( - peer_info.node_id, - peer_info.address, - Arc::clone(&connect_pm), - Arc::clone(&connect_logger), - ).await; - match res { - Ok(_) => { - log_info!(connect_logger, "Successfully reconnected to peer {}", node_id); - }, - Err(e) => { - log_error!(connect_logger, "Failed to reconnect to peer {}: {}", node_id, e); - } - } - } + + for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) { + let res = do_connect_peer( + peer_info.node_id, + peer_info.address.clone(), + Arc::clone(&connect_pm), + Arc::clone(&connect_logger), + ).await; + match res { + Ok(_) => { + log_info!(connect_logger, "Successfully reconnected to peer {}", peer_info.node_id); + }, + Err(e) => { + log_error!(connect_logger, "Failed to reconnect to peer {}: {}", peer_info.node_id, e); } + } + } } } } @@ -516,7 +508,7 @@ impl Node { let bcast_config = Arc::clone(&self.config); let bcast_store = Arc::clone(&self.kv_store); let bcast_logger = Arc::clone(&self.logger); - let mut stop_bcast = self.stop_receiver.clone(); + let mut stop_bcast = self.stop_sender.subscribe(); runtime.spawn(async move { // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away. let mut interval = tokio::time::interval(Duration::from_secs(30)); @@ -572,7 +564,7 @@ impl Node { } }); - let mut stop_tx_bcast = self.stop_receiver.clone(); + let mut stop_tx_bcast = self.stop_sender.subscribe(); let tx_bcaster = Arc::clone(&self.tx_broadcaster); runtime.spawn(async move { // Every second we try to clear our broadcasting queue. @@ -613,7 +605,7 @@ impl Node { let background_logger = Arc::clone(&self.logger); let background_error_logger = Arc::clone(&self.logger); let background_scorer = Arc::clone(&self.scorer); - let stop_bp = self.stop_receiver.clone(); + let stop_bp = self.stop_sender.subscribe(); let sleeper = move |d| { let mut stop = stop_bp.clone(); Box::pin(async move { @@ -650,7 +642,7 @@ impl Node { }); if let Some(liquidity_source) = self.liquidity_source.as_ref() { - let mut stop_liquidity_handler = self.stop_receiver.clone(); + let mut stop_liquidity_handler = self.stop_sender.subscribe(); let liquidity_handler = Arc::clone(&liquidity_source); runtime.spawn(async move { loop { diff --git a/tests/common.rs b/tests/common.rs index 815056b82..3696f9b71 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -355,6 +355,7 @@ pub(crate) fn do_channel_full_cycle( .unwrap(); assert_eq!(node_a.list_peers().first().unwrap().node_id, node_b.node_id()); + assert!(node_a.list_peers().first().unwrap().is_persisted); let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 55e3dc553..1c5a67521 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -272,3 +272,55 @@ fn sign_verify_msg() { let pkey = node.node_id(); assert!(node.verify_signature(msg, sig.as_str(), &pkey)); } + +#[test] +fn connection_restart_behavior() { + do_connection_restart_behavior(true); + do_connection_restart_behavior(false); +} + +fn do_connection_restart_behavior(persist: bool) { + let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let (node_a, node_b) = setup_two_nodes(&electrsd, false); + + let node_id_a = node_a.node_id(); + let node_id_b = node_b.node_id(); + + let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone(); + std::thread::sleep(std::time::Duration::from_secs(1)); + node_a.connect(node_id_b, node_addr_b, persist).unwrap(); + + let peer_details_a = node_a.list_peers().first().unwrap().clone(); + assert_eq!(peer_details_a.node_id, node_id_b); + assert_eq!(peer_details_a.is_persisted, persist); + assert!(peer_details_a.is_connected); + + let peer_details_b = node_b.list_peers().first().unwrap().clone(); + assert_eq!(peer_details_b.node_id, node_id_a); + assert_eq!(peer_details_b.is_persisted, false); + assert!(peer_details_a.is_connected); + + // Restart nodes. + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_b.start().unwrap(); + node_a.start().unwrap(); + + // Sleep a bit to allow for the reconnect to happen. + std::thread::sleep(std::time::Duration::from_secs(5)); + + if persist { + let peer_details_a = node_a.list_peers().first().unwrap().clone(); + assert_eq!(peer_details_a.node_id, node_id_b); + assert_eq!(peer_details_a.is_persisted, persist); + assert!(peer_details_a.is_connected); + + let peer_details_b = node_b.list_peers().first().unwrap().clone(); + assert_eq!(peer_details_b.node_id, node_id_a); + assert_eq!(peer_details_b.is_persisted, false); + assert!(peer_details_a.is_connected); + } else { + assert!(node_a.list_peers().is_empty()); + assert!(node_b.list_peers().is_empty()); + } +}