Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,12 +943,11 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
},
};

let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
let (stop_sender, _) = tokio::sync::watch::channel(());

Ok(Node {
runtime,
stop_sender,
stop_receiver,
config,
wallet,
tx_sync,
Expand Down
62 changes: 27 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ uniffi::include_scaffolding!("ldk_node");
pub struct Node<K: KVStore + Sync + Send + 'static> {
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
stop_sender: tokio::sync::watch::Sender<()>,
stop_receiver: tokio::sync::watch::Receiver<()>,
config: Arc<Config>,
wallet: Arc<Wallet>,
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
Expand Down Expand Up @@ -247,7 +246,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
// Setup wallet sync
let wallet = Arc::clone(&self.wallet);
let sync_logger = Arc::clone(&self.logger);
let mut stop_sync = self.stop_receiver.clone();
let mut stop_sync = self.stop_sender.subscribe();
let onchain_wallet_sync_interval_secs = self
.config
.onchain_wallet_sync_interval_secs
Expand Down Expand Up @@ -288,7 +287,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
);
});

let mut stop_fee_updates = self.stop_receiver.clone();
let mut stop_fee_updates = self.stop_sender.subscribe();
let fee_update_logger = Arc::clone(&self.logger);
let fee_estimator = Arc::clone(&self.fee_estimator);
let fee_rate_cache_update_interval_secs =
Expand Down Expand Up @@ -331,7 +330,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let mut stop_sync = self.stop_receiver.clone();
let mut stop_sync = self.stop_sender.subscribe();
let wallet_sync_interval_secs =
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
runtime.spawn(async move {
Expand Down Expand Up @@ -369,7 +368,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let gossip_source = Arc::clone(&self.gossip_source);
let gossip_sync_store = Arc::clone(&self.kv_store);
let gossip_sync_logger = Arc::clone(&self.logger);
let mut stop_gossip_sync = self.stop_receiver.clone();
let mut stop_gossip_sync = self.stop_sender.subscribe();
runtime.spawn(async move {
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
loop {
Expand Down Expand Up @@ -412,7 +411,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
if let Some(listening_addresses) = &self.config.listening_addresses {
// Setup networking
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
let mut stop_listen = self.stop_receiver.clone();
let mut stop_listen = self.stop_sender.subscribe();
let listening_logger = Arc::clone(&self.logger);

let mut bind_addrs = Vec::with_capacity(listening_addresses.len());
Expand Down Expand Up @@ -462,12 +461,11 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
});
}

// Regularly reconnect to channel peers.
let connect_cm = Arc::clone(&self.channel_manager);
// Regularly reconnect to persisted peers.
let connect_pm = Arc::clone(&self.peer_manager);
let connect_logger = Arc::clone(&self.logger);
let connect_peer_store = Arc::clone(&self.peer_store);
let mut stop_connect = self.stop_receiver.clone();
let mut stop_connect = self.stop_sender.subscribe();
runtime.spawn(async move {
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Expand All @@ -482,29 +480,23 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
.iter()
.map(|(peer, _addr)| *peer)
.collect::<Vec<_>>();
for node_id in connect_cm
.list_channels()
.iter()
.map(|chan| chan.counterparty.node_id)
.filter(|id| !pm_peers.contains(id))
{
if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
let res = do_connect_peer(
peer_info.node_id,
peer_info.address,
Arc::clone(&connect_pm),
Arc::clone(&connect_logger),
).await;
match res {
Ok(_) => {
log_info!(connect_logger, "Successfully reconnected to peer {}", node_id);
},
Err(e) => {
log_error!(connect_logger, "Failed to reconnect to peer {}: {}", node_id, e);
}
}
}

for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
let res = do_connect_peer(
peer_info.node_id,
peer_info.address.clone(),
Arc::clone(&connect_pm),
Arc::clone(&connect_logger),
).await;
match res {
Ok(_) => {
log_info!(connect_logger, "Successfully reconnected to peer {}", peer_info.node_id);
},
Err(e) => {
log_error!(connect_logger, "Failed to reconnect to peer {}: {}", peer_info.node_id, e);
}
}
}
}
}
}
Expand All @@ -516,7 +508,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let bcast_config = Arc::clone(&self.config);
let bcast_store = Arc::clone(&self.kv_store);
let bcast_logger = Arc::clone(&self.logger);
let mut stop_bcast = self.stop_receiver.clone();
let mut stop_bcast = self.stop_sender.subscribe();
runtime.spawn(async move {
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
let mut interval = tokio::time::interval(Duration::from_secs(30));
Expand Down Expand Up @@ -572,7 +564,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
}
});

let mut stop_tx_bcast = self.stop_receiver.clone();
let mut stop_tx_bcast = self.stop_sender.subscribe();
let tx_bcaster = Arc::clone(&self.tx_broadcaster);
runtime.spawn(async move {
// Every second we try to clear our broadcasting queue.
Expand Down Expand Up @@ -613,7 +605,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let background_logger = Arc::clone(&self.logger);
let background_error_logger = Arc::clone(&self.logger);
let background_scorer = Arc::clone(&self.scorer);
let stop_bp = self.stop_receiver.clone();
let stop_bp = self.stop_sender.subscribe();
let sleeper = move |d| {
let mut stop = stop_bp.clone();
Box::pin(async move {
Expand Down Expand Up @@ -650,7 +642,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
});

if let Some(liquidity_source) = self.liquidity_source.as_ref() {
let mut stop_liquidity_handler = self.stop_receiver.clone();
let mut stop_liquidity_handler = self.stop_sender.subscribe();
let liquidity_handler = Arc::clone(&liquidity_source);
runtime.spawn(async move {
loop {
Expand Down
1 change: 1 addition & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ pub(crate) fn do_channel_full_cycle<K: KVStore + Sync + Send, E: ElectrumApi>(
.unwrap();

assert_eq!(node_a.list_peers().first().unwrap().node_id, node_b.node_id());
assert!(node_a.list_peers().first().unwrap().is_persisted);
let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id());
let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id());
assert_eq!(funding_txo_a, funding_txo_b);
Expand Down
52 changes: 52 additions & 0 deletions tests/integration_tests_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,55 @@ fn sign_verify_msg() {
let pkey = node.node_id();
assert!(node.verify_signature(msg, sig.as_str(), &pkey));
}

#[test]
fn connection_restart_behavior() {
do_connection_restart_behavior(true);
do_connection_restart_behavior(false);
}

fn do_connection_restart_behavior(persist: bool) {
let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
let (node_a, node_b) = setup_two_nodes(&electrsd, false);

let node_id_a = node_a.node_id();
let node_id_b = node_b.node_id();

let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();
std::thread::sleep(std::time::Duration::from_secs(1));
node_a.connect(node_id_b, node_addr_b, persist).unwrap();

let peer_details_a = node_a.list_peers().first().unwrap().clone();
assert_eq!(peer_details_a.node_id, node_id_b);
assert_eq!(peer_details_a.is_persisted, persist);
assert!(peer_details_a.is_connected);

let peer_details_b = node_b.list_peers().first().unwrap().clone();
assert_eq!(peer_details_b.node_id, node_id_a);
assert_eq!(peer_details_b.is_persisted, false);
assert!(peer_details_a.is_connected);

// Restart nodes.
node_a.stop().unwrap();
node_b.stop().unwrap();
node_b.start().unwrap();
node_a.start().unwrap();

// Sleep a bit to allow for the reconnect to happen.
std::thread::sleep(std::time::Duration::from_secs(5));

if persist {
let peer_details_a = node_a.list_peers().first().unwrap().clone();
assert_eq!(peer_details_a.node_id, node_id_b);
assert_eq!(peer_details_a.is_persisted, persist);
assert!(peer_details_a.is_connected);

let peer_details_b = node_b.list_peers().first().unwrap().clone();
assert_eq!(peer_details_b.node_id, node_id_a);
assert_eq!(peer_details_b.is_persisted, false);
assert!(peer_details_a.is_connected);
} else {
assert!(node_a.list_peers().is_empty());
assert!(node_b.list_peers().is_empty());
}
}