Skip to content

Commit df282cb

Browse files
committed
Parallelize persistence in the async bg processor
1 parent 7d247cf commit df282cb

File tree

3 files changed

+134
-57
lines changed

3 files changed

+134
-57
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 121 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ extern crate core;
2121

2222
#[cfg(not(feature = "std"))]
2323
extern crate alloc;
24+
#[cfg(not(feature = "std"))]
25+
use alloc::format;
26+
#[cfg(not(feature = "std"))]
27+
use alloc::vec::Vec;
2428

2529
#[macro_use]
2630
extern crate lightning;
@@ -70,9 +74,13 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
7074

7175
use lightning_liquidity::ALiquidityManager;
7276

77+
use core::future::Future;
7378
use core::ops::Deref;
79+
use core::pin::Pin;
7480
use core::time::Duration;
7581

82+
use lightning::util::async_poll::{MaybeSync, MultiResultFuturePoller, ResultFuture};
83+
7684
#[cfg(feature = "std")]
7785
use core::sync::atomic::{AtomicBool, Ordering};
7886
#[cfg(feature = "std")]
@@ -617,11 +625,11 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
617625
pub async fn process_events_async<
618626
'a,
619627
UL: 'static + Deref,
620-
CF: 'static + Deref,
621-
T: 'static + Deref,
622-
F: 'static + Deref,
628+
CF: 'static + Deref + Send + Sync,
629+
T: 'static + Deref + Send + Sync,
630+
F: 'static + Deref + Send + Sync,
623631
G: 'static + Deref<Target = NetworkGraph<L>>,
624-
L: 'static + Deref,
632+
L: 'static + Deref + Send + Sync,
625633
P: 'static + Deref,
626634
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
627635
EventHandler: Fn(Event) -> EventHandlerFuture,
@@ -636,10 +644,10 @@ pub async fn process_events_async<
636644
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
637645
PM: 'static + Deref,
638646
LM: 'static + Deref,
639-
D: 'static + Deref,
640-
O: 'static + Deref,
641-
K: 'static + Deref,
642-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
647+
D: 'static + Deref + Send + Sync,
648+
O: 'static + Deref + Send + Sync,
649+
K: 'static + Deref + Send + Sync,
650+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Clone + Send,
643651
S: 'static + Deref<Target = SC> + Send + Sync,
644652
SC: for<'b> WriteableScore<'b>,
645653
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -664,7 +672,7 @@ where
664672
PM::Target: APeerManager,
665673
LM::Target: ALiquidityManager,
666674
O::Target: 'static + OutputSpender,
667-
D::Target: 'static + ChangeDestinationSource,
675+
D::Target: 'static + ChangeDestinationSource + MaybeSync,
668676
K::Target: 'static + KVStore,
669677
{
670678
let async_event_handler = |event| {
@@ -808,17 +816,24 @@ where
808816
None => {},
809817
}
810818

819+
let mut futures = Vec::new();
820+
811821
// Persist channel manager.
812822
if channel_manager.get_cm().get_and_clear_needs_persistence() {
813823
log_trace!(logger, "Persisting ChannelManager...");
814-
kv_store
815-
.write(
816-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
817-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
818-
CHANNEL_MANAGER_PERSISTENCE_KEY,
819-
&channel_manager.get_cm().encode(),
820-
)
821-
.await?;
824+
let res = kv_store.write(
825+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
826+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
827+
CHANNEL_MANAGER_PERSISTENCE_KEY,
828+
&channel_manager.get_cm().encode(),
829+
);
830+
831+
let fut: Pin<
832+
Box<dyn Future<Output = Result<(), (lightning::io::Error, bool)>> + Send + 'static>,
833+
> = Box::pin(async move { res.await.map_err(|e| (e, true)) });
834+
835+
futures.push(ResultFuture::Pending(fut));
836+
822837
log_trace!(logger, "Done persisting ChannelManager.");
823838
}
824839

@@ -846,17 +861,29 @@ where
846861
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
847862
log_trace!(logger, "Persisting network graph.");
848863
}
849-
if let Err(e) = kv_store
850-
.write(
851-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
852-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
853-
NETWORK_GRAPH_PERSISTENCE_KEY,
854-
&network_graph.encode(),
855-
)
856-
.await
857-
{
858-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
859-
}
864+
let res = kv_store.write(
865+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
866+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
867+
NETWORK_GRAPH_PERSISTENCE_KEY,
868+
&network_graph.encode(),
869+
);
870+
let fut: Pin<
871+
Box<
872+
dyn Future<Output = Result<(), (lightning::io::Error, bool)>>
873+
+ Send
874+
+ 'static,
875+
>,
876+
> = Box::pin(async move {
877+
res.await.map_err(|e| {
878+
(lightning::io::Error::new(
879+
lightning::io::ErrorKind::Other,
880+
format!("failed to persist network graph, check your disk and permissions {}", e)),
881+
false)
882+
})
883+
});
884+
885+
futures.push(ResultFuture::Pending(fut));
886+
860887
have_pruned = true;
861888
}
862889
let prune_timer =
@@ -883,21 +910,28 @@ where
883910
} else {
884911
log_trace!(logger, "Persisting scorer");
885912
}
886-
if let Err(e) = kv_store
887-
.write(
888-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
889-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
890-
SCORER_PERSISTENCE_KEY,
891-
&scorer.encode(),
892-
)
893-
.await
894-
{
895-
log_error!(
896-
logger,
897-
"Error: Failed to persist scorer, check your disk and permissions {}",
898-
e
899-
);
900-
}
913+
let res = kv_store.write(
914+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
915+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
916+
SCORER_PERSISTENCE_KEY,
917+
&scorer.encode(),
918+
);
919+
let fut: Pin<
920+
Box<
921+
dyn Future<Output = Result<(), (lightning::io::Error, bool)>>
922+
+ Send
923+
+ 'static,
924+
>,
925+
> = Box::pin(async move {
926+
res.await.map_err(|e| {
927+
(lightning::io::Error::new(
928+
lightning::io::ErrorKind::Other,
929+
format!("failed to persist scorer, check your disk and permissions {}", e)),
930+
false)
931+
})
932+
});
933+
934+
futures.push(ResultFuture::Pending(fut));
901935
}
902936
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
903937
},
@@ -910,14 +944,46 @@ where
910944
Some(false) => {
911945
log_trace!(logger, "Regenerating sweeper spends if necessary");
912946
if let Some(ref sweeper) = sweeper {
913-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
947+
let sweeper = sweeper.clone();
948+
let fut: Pin<
949+
Box<
950+
dyn Future<Output = Result<(), (lightning::io::Error, bool)>>
951+
+ Send
952+
+ 'static,
953+
>,
954+
> = Box::pin(async move {
955+
sweeper.regenerate_and_broadcast_spend_if_necessary().await.map_err(|_| {
956+
(
957+
lightning::io::Error::new(
958+
lightning::io::ErrorKind::Other,
959+
"failed to persist sweeper, check your disk and permissions",
960+
),
961+
false,
962+
)
963+
})
964+
});
965+
966+
futures.push(ResultFuture::Pending(fut));
914967
}
915968
last_sweeper_call = sleeper(SWEEPER_TIMER);
916969
},
917970
Some(true) => break,
918971
None => {},
919972
}
920973

974+
// Run persistence tasks in parallel.
975+
let multi_res = MultiResultFuturePoller::new(futures).await;
976+
for res in multi_res {
977+
if let Err((e, exit)) = res {
978+
log_error!(logger, "Error: {}", e);
979+
980+
if exit {
981+
log_error!(logger, "Exiting background processor");
982+
return Err(e);
983+
}
984+
}
985+
}
986+
921987
// Onion messenger timer tick.
922988
match check_sleeper(&mut last_onion_message_handler_call) {
923989
Some(false) => {
@@ -1007,9 +1073,9 @@ fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker
10071073
/// synchronous background persistence.
10081074
pub async fn process_events_async_with_kv_store_sync<
10091075
UL: 'static + Deref,
1010-
CF: 'static + Deref,
1011-
T: 'static + Deref,
1012-
F: 'static + Deref,
1076+
CF: 'static + Deref + Send + Sync,
1077+
T: 'static + Deref + Send + Sync,
1078+
F: 'static + Deref + Send + Sync,
10131079
G: 'static + Deref<Target = NetworkGraph<L>>,
10141080
L: 'static + Deref + Send + Sync,
10151081
P: 'static + Deref,
@@ -1026,10 +1092,13 @@ pub async fn process_events_async_with_kv_store_sync<
10261092
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
10271093
PM: 'static + Deref,
10281094
LM: 'static + Deref,
1029-
D: 'static + Deref,
1030-
O: 'static + Deref,
1031-
K: 'static + Deref,
1032-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
1095+
D: 'static + Deref + Send + Sync,
1096+
O: 'static + Deref + Send + Sync,
1097+
K: 'static + Deref + Send + Sync,
1098+
OS: 'static
1099+
+ Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>
1100+
+ Clone
1101+
+ Send,
10331102
S: 'static + Deref<Target = SC> + Send + Sync,
10341103
SC: for<'b> WriteableScore<'b>,
10351104
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -1054,7 +1123,7 @@ where
10541123
PM::Target: APeerManager,
10551124
LM::Target: ALiquidityManager,
10561125
O::Target: 'static + OutputSpender,
1057-
D::Target: 'static + ChangeDestinationSource,
1126+
D::Target: 'static + ChangeDestinationSource + MaybeSync,
10581127
K::Target: 'static + KVStoreSync,
10591128
{
10601129
let kv_store = KVStoreSyncWrapper(kv_store);

lightning/src/util/async_poll.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,21 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

18-
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
18+
/// A future that can be in a pending or ready state, where the ready state contains a `Result`.
19+
pub enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
20+
/// The future is still pending and needs to be polled again.
1921
Pending(F),
22+
/// The future has completed and contains a result.
2023
Ready(Result<(), E>),
2124
}
2225

23-
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
26+
/// A utility to poll multiple futures that return results, collecting their results into a vector.
27+
pub struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
2428
futures_state: Vec<ResultFuture<F, E>>,
2529
}
2630

2731
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
32+
/// Creates a new `MultiResultFuturePoller` with the given futures.
2833
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
2934
Self { futures_state }
3035
}
@@ -95,21 +100,24 @@ pub(crate) fn dummy_waker() -> Waker {
95100
#[cfg(feature = "std")]
96101
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;
97102
#[cfg(not(feature = "std"))]
103+
/// A type alias for a future that returns a result of type T.
98104
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a>>;
99105

100-
// Marker trait to optionally implement `Sync` under std.
106+
/// Marker trait to optionally implement `Sync` under std.
101107
#[cfg(feature = "std")]
102108
pub use core::marker::Sync as MaybeSync;
103109

104110
#[cfg(not(feature = "std"))]
111+
/// Marker trait to optionally implement `Sync` under std.
105112
pub trait MaybeSync {}
106113
#[cfg(not(feature = "std"))]
107114
impl<T> MaybeSync for T where T: ?Sized {}
108115

109-
// Marker trait to optionally implement `Send` under std.
116+
/// Marker trait to optionally implement `Send` under std.
110117
#[cfg(feature = "std")]
111118
pub use core::marker::Send as MaybeSend;
112119
#[cfg(not(feature = "std"))]
120+
/// Marker trait to optionally implement `Send` under std.
113121
pub trait MaybeSend {}
114122
#[cfg(not(feature = "std"))]
115123
impl<T> MaybeSend for T where T: ?Sized {}

lightning/src/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub mod ser;
3232
pub mod sweep;
3333
pub mod wakers;
3434

35-
pub(crate) mod async_poll;
35+
pub mod async_poll;
3636
pub(crate) mod atomic_counter;
3737
pub(crate) mod byte_utils;
3838
pub mod hash_tables;

0 commit comments

Comments
 (0)