Skip to content

Commit b55f977

Browse files
Parallelize persistence in the async bg processor
Co-authored-by: Matt Corallo <[email protected]>
1 parent fe2562e commit b55f977

File tree

1 file changed

+175
-32
lines changed
  • lightning-background-processor/src

1 file changed

+175
-32
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 175 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,117 @@ pub(crate) mod futures_util {
443443
pub(crate) fn dummy_waker() -> Waker {
444444
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
445445
}
446+
447+
enum JoinerResult<E, F: Future<Output = Result<(), E>> + Unpin> {
448+
Pending(Option<F>),
449+
Ready(Result<(), E>),
450+
}
451+
452+
pub(crate) struct Joiner<
453+
E,
454+
A: Future<Output = Result<(), E>> + Unpin,
455+
B: Future<Output = Result<(), E>> + Unpin,
456+
C: Future<Output = Result<(), E>> + Unpin,
457+
D: Future<Output = Result<(), E>> + Unpin,
458+
> {
459+
a: JoinerResult<E, A>,
460+
b: JoinerResult<E, B>,
461+
c: JoinerResult<E, C>,
462+
d: JoinerResult<E, D>,
463+
}
464+
465+
impl<
466+
E,
467+
A: Future<Output = Result<(), E>> + Unpin,
468+
B: Future<Output = Result<(), E>> + Unpin,
469+
C: Future<Output = Result<(), E>> + Unpin,
470+
D: Future<Output = Result<(), E>> + Unpin,
471+
> Joiner<E, A, B, C, D>
472+
{
473+
pub(crate) fn new() -> Self {
474+
Self {
475+
a: JoinerResult::Pending(None),
476+
b: JoinerResult::Pending(None),
477+
c: JoinerResult::Pending(None),
478+
d: JoinerResult::Pending(None),
479+
}
480+
}
481+
482+
pub(crate) fn set_a(&mut self, fut: A) {
483+
self.a = JoinerResult::Pending(Some(fut));
484+
}
485+
pub(crate) fn set_b(&mut self, fut: B) {
486+
self.b = JoinerResult::Pending(Some(fut));
487+
}
488+
pub(crate) fn set_c(&mut self, fut: C) {
489+
self.c = JoinerResult::Pending(Some(fut));
490+
}
491+
pub(crate) fn set_d(&mut self, fut: D) {
492+
self.d = JoinerResult::Pending(Some(fut));
493+
}
494+
}
495+
496+
impl<
497+
E,
498+
A: Future<Output = Result<(), E>> + Unpin,
499+
B: Future<Output = Result<(), E>> + Unpin,
500+
C: Future<Output = Result<(), E>> + Unpin,
501+
D: Future<Output = Result<(), E>> + Unpin,
502+
> Future for Joiner<E, A, B, C, D>
503+
where
504+
Joiner<E, A, B, C, D>: Unpin,
505+
{
506+
type Output = [Result<(), E>; 4];
507+
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
508+
let mut all_complete = true;
509+
macro_rules! handle {
510+
($val: ident) => {
511+
match &mut (self.$val) {
512+
JoinerResult::Pending(None) => {
513+
self.$val = JoinerResult::Ready(Ok(()));
514+
},
515+
JoinerResult::<E, _>::Pending(Some(ref mut val)) => {
516+
match Pin::new(val).poll(ctx) {
517+
Poll::Ready(res) => {
518+
self.$val = JoinerResult::Ready(res);
519+
},
520+
Poll::Pending => {
521+
all_complete = false;
522+
},
523+
}
524+
},
525+
JoinerResult::Ready(_) => {},
526+
}
527+
};
528+
}
529+
handle!(a);
530+
handle!(b);
531+
handle!(c);
532+
handle!(d);
533+
534+
if all_complete {
535+
let mut res = [Ok(()), Ok(()), Ok(()), Ok(())];
536+
if let JoinerResult::Ready(ref mut val) = &mut self.a {
537+
core::mem::swap(&mut res[0], val);
538+
}
539+
if let JoinerResult::Ready(ref mut val) = &mut self.b {
540+
core::mem::swap(&mut res[1], val);
541+
}
542+
if let JoinerResult::Ready(ref mut val) = &mut self.c {
543+
core::mem::swap(&mut res[2], val);
544+
}
545+
if let JoinerResult::Ready(ref mut val) = &mut self.d {
546+
core::mem::swap(&mut res[3], val);
547+
}
548+
Poll::Ready(res)
549+
} else {
550+
Poll::Pending
551+
}
552+
}
553+
}
446554
}
447555
use core::task;
448-
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
556+
use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
449557

450558
/// Processes background events in a future.
451559
///
@@ -808,17 +916,25 @@ where
808916
None => {},
809917
}
810918

919+
let mut futures = Joiner::new();
920+
811921
// Persist channel manager.
812922
if channel_manager.get_cm().get_and_clear_needs_persistence() {
813923
log_trace!(logger, "Persisting ChannelManager...");
814-
kv_store
815-
.write(
816-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
817-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
818-
CHANNEL_MANAGER_PERSISTENCE_KEY,
819-
&channel_manager.get_cm().encode(),
820-
)
821-
.await?;
924+
925+
let fut = async {
926+
kv_store
927+
.write(
928+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
929+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
930+
CHANNEL_MANAGER_PERSISTENCE_KEY,
931+
&channel_manager.get_cm().encode(),
932+
)
933+
.await
934+
};
935+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
936+
futures.set_a(Box::pin(fut));
937+
822938
log_trace!(logger, "Done persisting ChannelManager.");
823939
}
824940

@@ -846,17 +962,25 @@ where
846962
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
847963
log_trace!(logger, "Persisting network graph.");
848964
}
849-
if let Err(e) = kv_store
850-
.write(
851-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
852-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
853-
NETWORK_GRAPH_PERSISTENCE_KEY,
854-
&network_graph.encode(),
855-
)
856-
.await
857-
{
858-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
859-
}
965+
let fut = async {
966+
if let Err(e) = kv_store
967+
.write(
968+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
969+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
970+
NETWORK_GRAPH_PERSISTENCE_KEY,
971+
&network_graph.encode(),
972+
)
973+
.await
974+
{
975+
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
976+
}
977+
978+
Ok(())
979+
};
980+
981+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
982+
futures.set_b(Box::pin(fut));
983+
860984
have_pruned = true;
861985
}
862986
let prune_timer =
@@ -883,21 +1007,28 @@ where
8831007
} else {
8841008
log_trace!(logger, "Persisting scorer");
8851009
}
886-
if let Err(e) = kv_store
887-
.write(
888-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
889-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
890-
SCORER_PERSISTENCE_KEY,
891-
&scorer.encode(),
892-
)
893-
.await
894-
{
895-
log_error!(
1010+
let fut = async {
1011+
if let Err(e) = kv_store
1012+
.write(
1013+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1014+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1015+
SCORER_PERSISTENCE_KEY,
1016+
&scorer.encode(),
1017+
)
1018+
.await
1019+
{
1020+
log_error!(
8961021
logger,
8971022
"Error: Failed to persist scorer, check your disk and permissions {}",
8981023
e
8991024
);
900-
}
1025+
}
1026+
1027+
Ok(())
1028+
};
1029+
1030+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1031+
futures.set_c(Box::pin(fut));
9011032
}
9021033
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
9031034
},
@@ -910,14 +1041,26 @@ where
9101041
Some(false) => {
9111042
log_trace!(logger, "Regenerating sweeper spends if necessary");
9121043
if let Some(ref sweeper) = sweeper {
913-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1044+
let fut = async {
1045+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1046+
1047+
Ok(())
1048+
};
1049+
1050+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1051+
futures.set_d(Box::pin(fut));
9141052
}
9151053
last_sweeper_call = sleeper(SWEEPER_TIMER);
9161054
},
9171055
Some(true) => break,
9181056
None => {},
9191057
}
9201058

1059+
// Run persistence tasks in parallel and exit if any of them returns an error.
1060+
for res in futures.await {
1061+
res?;
1062+
}
1063+
9211064
// Onion messenger timer tick.
9221065
match check_sleeper(&mut last_onion_message_handler_call) {
9231066
Some(false) => {

0 commit comments

Comments
 (0)