Skip to content

Commit e6293bd

Browse files
authored
Merge pull request #747 from TheBlueMatt/2026-01-parallel-monitors
Parallelize init further
2 parents 4caf1b6 + 78842ad commit e6293bd

File tree

5 files changed

+152
-91
lines changed

5 files changed

+152
-91
lines changed

Cargo.toml

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,17 @@ default = []
3939
#lightning-liquidity = { version = "0.2.0", features = ["std"] }
4040
#lightning-macros = { version = "0.2.0" }
4141

42-
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] }
43-
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
44-
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] }
45-
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
46-
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["tokio"] }
47-
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
48-
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
49-
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["rest-client", "rpc-client", "tokio"] }
50-
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
51-
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] }
52-
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
42+
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] }
43+
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
44+
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] }
45+
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
46+
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["tokio"] }
47+
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
48+
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
49+
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["rest-client", "rpc-client", "tokio"] }
50+
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
51+
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] }
52+
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
5353

5454
bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] }
5555
bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]}
@@ -78,13 +78,13 @@ log = { version = "0.4.22", default-features = false, features = ["std"]}
7878
vss-client = { package = "vss-client-ng", version = "0.4" }
7979
prost = { version = "0.11.6", default-features = false}
8080
#bitcoin-payment-instructions = { version = "0.6" }
81-
bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "fdca6c62f2fe2c53427d3e51e322a49aa7323ee2" }
81+
bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ce9ff5281ae9bb05526981f6f9df8f8d929c7c44" }
8282

8383
[target.'cfg(windows)'.dependencies]
8484
winapi = { version = "0.3", features = ["winbase"] }
8585

8686
[dev-dependencies]
87-
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std", "_test_utils"] }
87+
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std", "_test_utils"] }
8888
proptest = "1.0.0"
8989
regex = "1.5.6"
9090
criterion = { version = "0.7.0", features = ["async_tokio"] }

src/builder.rs

Lines changed: 96 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use lightning::routing::scoring::{
3333
};
3434
use lightning::sign::{EntropySource, NodeSigner};
3535
use lightning::util::persist::{
36-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
36+
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
3737
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
3838
};
3939
use lightning::util::ser::ReadableArgs;
@@ -69,11 +69,12 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
6969
use crate::message_handler::NodeCustomMessageHandler;
7070
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
7171
use crate::peer_store::PeerStore;
72-
use crate::runtime::Runtime;
72+
use crate::runtime::{Runtime, RuntimeSpawner};
7373
use crate::tx_broadcaster::TransactionBroadcaster;
7474
use crate::types::{
75-
ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager,
76-
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore,
75+
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
76+
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister,
77+
SyncAndAsyncKVStore,
7778
};
7879
use crate::wallet::persist::KVStoreWalletPersister;
7980
use crate::wallet::Wallet;
@@ -1051,10 +1052,20 @@ fn build_with_store_internal(
10511052
}
10521053
}
10531054

1055+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
1056+
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
1057+
1058+
let kv_store_ref = Arc::clone(&kv_store);
1059+
let logger_ref = Arc::clone(&logger);
1060+
let (payment_store_res, node_metris_res) = runtime.block_on(async move {
1061+
tokio::join!(
1062+
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1063+
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1064+
)
1065+
});
1066+
10541067
// Initialize the status fields.
1055-
let node_metrics = match runtime
1056-
.block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await })
1057-
{
1068+
let node_metrics = match node_metris_res {
10581069
Ok(metrics) => Arc::new(RwLock::new(metrics)),
10591070
Err(e) => {
10601071
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1065,23 +1076,20 @@ fn build_with_store_internal(
10651076
}
10661077
},
10671078
};
1068-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
1069-
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
10701079

1071-
let payment_store =
1072-
match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) {
1073-
Ok(payments) => Arc::new(PaymentStore::new(
1074-
payments,
1075-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1076-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1077-
Arc::clone(&kv_store),
1078-
Arc::clone(&logger),
1079-
)),
1080-
Err(e) => {
1081-
log_error!(logger, "Failed to read payment data from store: {}", e);
1082-
return Err(BuildError::ReadFailed);
1083-
},
1084-
};
1080+
let payment_store = match payment_store_res {
1081+
Ok(payments) => Arc::new(PaymentStore::new(
1082+
payments,
1083+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1084+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1085+
Arc::clone(&kv_store),
1086+
Arc::clone(&logger),
1087+
)),
1088+
Err(e) => {
1089+
log_error!(logger, "Failed to read payment data from store: {}", e);
1090+
return Err(BuildError::ReadFailed);
1091+
},
1092+
};
10851093

10861094
let (chain_source, chain_tip_opt) = match chain_data_source_config {
10871095
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
@@ -1261,8 +1269,9 @@ fn build_with_store_internal(
12611269
));
12621270

12631271
let peer_storage_key = keys_manager.get_peer_storage_key();
1264-
let persister = Arc::new(Persister::new(
1272+
let monitor_reader = Arc::new(AsyncPersister::new(
12651273
Arc::clone(&kv_store),
1274+
RuntimeSpawner::new(Arc::clone(&runtime)),
12661275
Arc::clone(&logger),
12671276
PERSISTER_MAX_PENDING_UPDATES,
12681277
Arc::clone(&keys_manager),
@@ -1271,8 +1280,18 @@ fn build_with_store_internal(
12711280
Arc::clone(&fee_estimator),
12721281
));
12731282

1283+
// Read ChannelMonitors and the NetworkGraph
1284+
let kv_store_ref = Arc::clone(&kv_store);
1285+
let logger_ref = Arc::clone(&logger);
1286+
let (monitor_read_res, network_graph_res) = runtime.block_on(async move {
1287+
tokio::join!(
1288+
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
1289+
read_network_graph(&*kv_store_ref, logger_ref),
1290+
)
1291+
});
1292+
12741293
// Read ChannelMonitor state from store
1275-
let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
1294+
let channel_monitors = match monitor_read_res {
12761295
Ok(monitors) => monitors,
12771296
Err(e) => {
12781297
if e.kind() == lightning::io::ErrorKind::NotFound {
@@ -1284,6 +1303,16 @@ fn build_with_store_internal(
12841303
},
12851304
};
12861305

1306+
let persister = Arc::new(Persister::new(
1307+
Arc::clone(&kv_store),
1308+
Arc::clone(&logger),
1309+
PERSISTER_MAX_PENDING_UPDATES,
1310+
Arc::clone(&keys_manager),
1311+
Arc::clone(&keys_manager),
1312+
Arc::clone(&tx_broadcaster),
1313+
Arc::clone(&fee_estimator),
1314+
));
1315+
12871316
// Initialize the ChainMonitor
12881317
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
12891318
Some(Arc::clone(&chain_source)),
@@ -1296,9 +1325,7 @@ fn build_with_store_internal(
12961325
));
12971326

12981327
// Initialize the network graph, scorer, and router
1299-
let network_graph = match runtime
1300-
.block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await })
1301-
{
1328+
let network_graph = match network_graph_res {
13021329
Ok(graph) => Arc::new(graph),
13031330
Err(e) => {
13041331
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1310,9 +1337,42 @@ fn build_with_store_internal(
13101337
},
13111338
};
13121339

1313-
let local_scorer = match runtime.block_on(async {
1314-
read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await
1315-
}) {
1340+
// Read various smaller LDK and ldk-node objects from the store
1341+
let kv_store_ref = Arc::clone(&kv_store);
1342+
let logger_ref = Arc::clone(&logger);
1343+
let network_graph_ref = Arc::clone(&network_graph);
1344+
let output_sweeper_future = read_output_sweeper(
1345+
Arc::clone(&tx_broadcaster),
1346+
Arc::clone(&fee_estimator),
1347+
Arc::clone(&chain_source),
1348+
Arc::clone(&keys_manager),
1349+
Arc::clone(&kv_store_ref),
1350+
Arc::clone(&logger_ref),
1351+
);
1352+
let (
1353+
scorer_res,
1354+
external_scores_res,
1355+
channel_manager_bytes_res,
1356+
sweeper_bytes_res,
1357+
event_queue_res,
1358+
peer_info_res,
1359+
) = runtime.block_on(async move {
1360+
tokio::join!(
1361+
read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)),
1362+
read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)),
1363+
KVStore::read(
1364+
&*kv_store_ref,
1365+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1366+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1367+
CHANNEL_MANAGER_PERSISTENCE_KEY,
1368+
),
1369+
output_sweeper_future,
1370+
read_event_queue(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)),
1371+
read_peer_info(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)),
1372+
)
1373+
});
1374+
1375+
let local_scorer = match scorer_res {
13161376
Ok(scorer) => scorer,
13171377
Err(e) => {
13181378
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1328,9 +1388,7 @@ fn build_with_store_internal(
13281388
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
13291389

13301390
// Restore external pathfinding scores from cache if possible.
1331-
match runtime.block_on(async {
1332-
read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await
1333-
}) {
1391+
match external_scores_res {
13341392
Ok(external_scores) => {
13351393
scorer.lock().unwrap().merge(external_scores, cur_time);
13361394
log_trace!(logger, "External scores from cache merged successfully");
@@ -1383,12 +1441,7 @@ fn build_with_store_internal(
13831441

13841442
// Initialize the ChannelManager
13851443
let channel_manager = {
1386-
if let Ok(reader) = KVStoreSync::read(
1387-
&*kv_store,
1388-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1389-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1390-
CHANNEL_MANAGER_PERSISTENCE_KEY,
1391-
) {
1444+
if let Ok(reader) = channel_manager_bytes_res {
13921445
let channel_monitor_references =
13931446
channel_monitors.iter().map(|(_, chanmon)| chanmon).collect();
13941447
let read_args = ChannelManagerReadArgs::new(
@@ -1613,17 +1666,7 @@ fn build_with_store_internal(
16131666
let connection_manager =
16141667
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
16151668

1616-
let output_sweeper = match runtime.block_on(async {
1617-
read_output_sweeper(
1618-
Arc::clone(&tx_broadcaster),
1619-
Arc::clone(&fee_estimator),
1620-
Arc::clone(&chain_source),
1621-
Arc::clone(&keys_manager),
1622-
Arc::clone(&kv_store),
1623-
Arc::clone(&logger),
1624-
)
1625-
.await
1626-
}) {
1669+
let output_sweeper = match sweeper_bytes_res {
16271670
Ok(output_sweeper) => Arc::new(output_sweeper),
16281671
Err(e) => {
16291672
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1644,9 +1687,7 @@ fn build_with_store_internal(
16441687
},
16451688
};
16461689

1647-
let event_queue = match runtime
1648-
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1649-
{
1690+
let event_queue = match event_queue_res {
16501691
Ok(event_queue) => Arc::new(event_queue),
16511692
Err(e) => {
16521693
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1658,9 +1699,7 @@ fn build_with_store_internal(
16581699
},
16591700
};
16601701

1661-
let peer_store = match runtime
1662-
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1663-
{
1702+
let peer_store = match peer_info_res {
16641703
Ok(peer_store) => Arc::new(peer_store),
16651704
Err(e) => {
16661705
if e.kind() == std::io::ErrorKind::NotFound {

src/gossip.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,16 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use std::future::Future;
98
use std::sync::atomic::{AtomicU32, Ordering};
109
use std::sync::Arc;
1110
use std::time::Duration;
1211

13-
use lightning::util::native_async::FutureSpawner;
1412
use lightning_block_sync::gossip::GossipVerifier;
1513

1614
use crate::chain::ChainSource;
1715
use crate::config::RGS_SYNC_TIMEOUT_SECS;
1816
use crate::logger::{log_trace, LdkLogger, Logger};
19-
use crate::runtime::Runtime;
17+
use crate::runtime::{Runtime, RuntimeSpawner};
2018
use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync};
2119
use crate::Error;
2220

@@ -114,19 +112,3 @@ impl GossipSource {
114112
}
115113
}
116114
}
117-
118-
pub(crate) struct RuntimeSpawner {
119-
runtime: Arc<Runtime>,
120-
}
121-
122-
impl RuntimeSpawner {
123-
pub(crate) fn new(runtime: Arc<Runtime>) -> Self {
124-
Self { runtime }
125-
}
126-
}
127-
128-
impl FutureSpawner for RuntimeSpawner {
129-
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
130-
self.runtime.spawn_cancellable_background_task(future);
131-
}
132-
}

src/runtime.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use std::future::Future;
99
use std::sync::{Arc, Mutex};
1010
use std::time::Duration;
1111

12+
use lightning::util::native_async::FutureSpawner;
13+
1214
use tokio::task::{JoinHandle, JoinSet};
1315

1416
use crate::config::{
@@ -219,3 +221,29 @@ enum RuntimeMode {
219221
Owned(tokio::runtime::Runtime),
220222
Handle(tokio::runtime::Handle),
221223
}
224+
225+
pub(crate) struct RuntimeSpawner {
226+
runtime: Arc<Runtime>,
227+
}
228+
229+
impl RuntimeSpawner {
230+
pub(crate) fn new(runtime: Arc<Runtime>) -> Self {
231+
Self { runtime }
232+
}
233+
}
234+
235+
impl FutureSpawner for RuntimeSpawner {
236+
type E = tokio::sync::oneshot::error::RecvError;
237+
type SpawnedFutureResult<O> = tokio::sync::oneshot::Receiver<O>;
238+
fn spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
239+
&self, future: F,
240+
) -> Self::SpawnedFutureResult<O> {
241+
let (result, output) = tokio::sync::oneshot::channel();
242+
self.runtime.spawn_cancellable_background_task(async move {
243+
// We don't care if the send works or not, if the receiver is dropped its not our
244+
// problem.
245+
let _ = result.send(future.await);
246+
});
247+
output
248+
}
249+
}

0 commit comments

Comments
 (0)