Skip to content

Commit 80f68ce

Browse files
Parallelize persistence in the async bg processor
Co-authored-by: Matt Corallo <[email protected]>
1 parent fe2562e commit 80f68ce

File tree

1 file changed

+179
-32
lines changed
  • lightning-background-processor/src

1 file changed

+179
-32
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 179 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ extern crate core;
2121

2222
#[cfg(not(feature = "std"))]
2323
extern crate alloc;
24+
#[cfg(not(feature = "std"))]
25+
use alloc::format;
26+
#[cfg(not(feature = "std"))]
27+
use alloc::vec::Vec;
2428

2529
#[macro_use]
2630
extern crate lightning;
@@ -443,9 +447,117 @@ pub(crate) mod futures_util {
443447
pub(crate) fn dummy_waker() -> Waker {
444448
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
445449
}
450+
451+
enum JoinerResult<E, F: Future<Output = Result<(), E>> + Unpin> {
452+
Pending(Option<F>),
453+
Ready(Result<(), E>),
454+
}
455+
456+
pub(crate) struct Joiner<
457+
E,
458+
A: Future<Output = Result<(), E>> + Unpin,
459+
B: Future<Output = Result<(), E>> + Unpin,
460+
C: Future<Output = Result<(), E>> + Unpin,
461+
D: Future<Output = Result<(), E>> + Unpin,
462+
> {
463+
a: JoinerResult<E, A>,
464+
b: JoinerResult<E, B>,
465+
c: JoinerResult<E, C>,
466+
d: JoinerResult<E, D>,
467+
}
468+
469+
impl<
470+
E,
471+
A: Future<Output = Result<(), E>> + Unpin,
472+
B: Future<Output = Result<(), E>> + Unpin,
473+
C: Future<Output = Result<(), E>> + Unpin,
474+
D: Future<Output = Result<(), E>> + Unpin,
475+
> Joiner<E, A, B, C, D>
476+
{
477+
pub(crate) fn new() -> Self {
478+
Self {
479+
a: JoinerResult::Pending(None),
480+
b: JoinerResult::Pending(None),
481+
c: JoinerResult::Pending(None),
482+
d: JoinerResult::Pending(None),
483+
}
484+
}
485+
486+
pub(crate) fn set_a(&mut self, fut: A) {
487+
self.a = JoinerResult::Pending(Some(fut));
488+
}
489+
pub(crate) fn set_b(&mut self, fut: B) {
490+
self.b = JoinerResult::Pending(Some(fut));
491+
}
492+
pub(crate) fn set_c(&mut self, fut: C) {
493+
self.c = JoinerResult::Pending(Some(fut));
494+
}
495+
pub(crate) fn set_d(&mut self, fut: D) {
496+
self.d = JoinerResult::Pending(Some(fut));
497+
}
498+
}
499+
500+
impl<
501+
E,
502+
A: Future<Output = Result<(), E>> + Unpin,
503+
B: Future<Output = Result<(), E>> + Unpin,
504+
C: Future<Output = Result<(), E>> + Unpin,
505+
D: Future<Output = Result<(), E>> + Unpin,
506+
> Future for Joiner<E, A, B, C, D>
507+
where
508+
Joiner<E, A, B, C, D>: Unpin,
509+
{
510+
type Output = [Result<(), E>; 4];
511+
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
512+
let mut all_complete = true;
513+
macro_rules! handle {
514+
($val: ident) => {
515+
match &mut (self.$val) {
516+
JoinerResult::Pending(None) => {
517+
self.$val = JoinerResult::Ready(Ok(()));
518+
},
519+
JoinerResult::<E, _>::Pending(Some(ref mut val)) => {
520+
match Pin::new(val).poll(ctx) {
521+
Poll::Ready(res) => {
522+
self.$val = JoinerResult::Ready(res);
523+
},
524+
Poll::Pending => {
525+
all_complete = false;
526+
},
527+
}
528+
},
529+
JoinerResult::Ready(_) => {},
530+
}
531+
};
532+
}
533+
handle!(a);
534+
handle!(b);
535+
handle!(c);
536+
handle!(d);
537+
538+
if all_complete {
539+
let mut res = [Ok(()), Ok(()), Ok(()), Ok(())];
540+
if let JoinerResult::Ready(ref mut val) = &mut self.a {
541+
core::mem::swap(&mut res[0], val);
542+
}
543+
if let JoinerResult::Ready(ref mut val) = &mut self.b {
544+
core::mem::swap(&mut res[1], val);
545+
}
546+
if let JoinerResult::Ready(ref mut val) = &mut self.c {
547+
core::mem::swap(&mut res[2], val);
548+
}
549+
if let JoinerResult::Ready(ref mut val) = &mut self.d {
550+
core::mem::swap(&mut res[3], val);
551+
}
552+
Poll::Ready(res)
553+
} else {
554+
Poll::Pending
555+
}
556+
}
557+
}
446558
}
447559
use core::task;
448-
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
560+
use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
449561

450562
/// Processes background events in a future.
451563
///
@@ -808,17 +920,25 @@ where
808920
None => {},
809921
}
810922

923+
let mut futures = Joiner::new();
924+
811925
// Persist channel manager.
812926
if channel_manager.get_cm().get_and_clear_needs_persistence() {
813927
log_trace!(logger, "Persisting ChannelManager...");
814-
kv_store
815-
.write(
816-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
817-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
818-
CHANNEL_MANAGER_PERSISTENCE_KEY,
819-
&channel_manager.get_cm().encode(),
820-
)
821-
.await?;
928+
929+
let fut = async {
930+
kv_store
931+
.write(
932+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
933+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
934+
CHANNEL_MANAGER_PERSISTENCE_KEY,
935+
&channel_manager.get_cm().encode(),
936+
)
937+
.await
938+
};
939+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
940+
futures.set_a(Box::pin(fut));
941+
822942
log_trace!(logger, "Done persisting ChannelManager.");
823943
}
824944

@@ -846,17 +966,25 @@ where
846966
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
847967
log_trace!(logger, "Persisting network graph.");
848968
}
849-
if let Err(e) = kv_store
850-
.write(
851-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
852-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
853-
NETWORK_GRAPH_PERSISTENCE_KEY,
854-
&network_graph.encode(),
855-
)
856-
.await
857-
{
858-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
859-
}
969+
let fut = async {
970+
if let Err(e) = kv_store
971+
.write(
972+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
973+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
974+
NETWORK_GRAPH_PERSISTENCE_KEY,
975+
&network_graph.encode(),
976+
)
977+
.await
978+
{
979+
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
980+
}
981+
982+
Ok(())
983+
};
984+
985+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
986+
futures.set_b(Box::pin(fut));
987+
860988
have_pruned = true;
861989
}
862990
let prune_timer =
@@ -883,21 +1011,28 @@ where
8831011
} else {
8841012
log_trace!(logger, "Persisting scorer");
8851013
}
886-
if let Err(e) = kv_store
887-
.write(
888-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
889-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
890-
SCORER_PERSISTENCE_KEY,
891-
&scorer.encode(),
892-
)
893-
.await
894-
{
895-
log_error!(
1014+
let fut = async {
1015+
if let Err(e) = kv_store
1016+
.write(
1017+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1018+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1019+
SCORER_PERSISTENCE_KEY,
1020+
&scorer.encode(),
1021+
)
1022+
.await
1023+
{
1024+
log_error!(
8961025
logger,
8971026
"Error: Failed to persist scorer, check your disk and permissions {}",
8981027
e
8991028
);
900-
}
1029+
}
1030+
1031+
Ok(())
1032+
};
1033+
1034+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1035+
futures.set_c(Box::pin(fut));
9011036
}
9021037
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
9031038
},
@@ -910,14 +1045,26 @@ where
9101045
Some(false) => {
9111046
log_trace!(logger, "Regenerating sweeper spends if necessary");
9121047
if let Some(ref sweeper) = sweeper {
913-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1048+
let fut = async {
1049+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1050+
1051+
Ok(())
1052+
};
1053+
1054+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1055+
futures.set_d(Box::pin(fut));
9141056
}
9151057
last_sweeper_call = sleeper(SWEEPER_TIMER);
9161058
},
9171059
Some(true) => break,
9181060
None => {},
9191061
}
9201062

1063+
// Run persistence tasks in parallel and exit if any of them returns an error.
1064+
for res in futures.await {
1065+
res?;
1066+
}
1067+
9211068
// Onion messenger timer tick.
9221069
match check_sleeper(&mut last_onion_message_handler_call) {
9231070
Some(false) => {

0 commit comments

Comments
 (0)