@@ -70,9 +70,13 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
70
70
71
71
use lightning_liquidity:: ALiquidityManager ;
72
72
73
+ use core:: future:: Future ;
73
74
use core:: ops:: Deref ;
75
+ use core:: pin:: Pin ;
74
76
use core:: time:: Duration ;
75
77
78
+ use lightning:: util:: async_poll:: { MultiResultFuturePoller , ResultFuture } ;
79
+
76
80
#[ cfg( feature = "std" ) ]
77
81
use core:: sync:: atomic:: { AtomicBool , Ordering } ;
78
82
#[ cfg( feature = "std" ) ]
@@ -627,11 +631,11 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627
631
pub async fn process_events_async <
628
632
' a ,
629
633
UL : ' static + Deref ,
630
- CF : ' static + Deref ,
631
- T : ' static + Deref ,
632
- F : ' static + Deref ,
634
+ CF : ' static + Deref + Sync ,
635
+ T : ' static + Deref + Sync ,
636
+ F : ' static + Deref + Sync ,
633
637
G : ' static + Deref < Target = NetworkGraph < L > > ,
634
- L : ' static + Deref ,
638
+ L : ' static + Deref + Sync ,
635
639
P : ' static + Deref ,
636
640
EventHandlerFuture : core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
637
641
EventHandler : Fn ( Event ) -> EventHandlerFuture ,
@@ -646,10 +650,10 @@ pub async fn process_events_async<
646
650
RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
647
651
PM : ' static + Deref ,
648
652
LM : ' static + Deref ,
649
- D : ' static + Deref ,
650
- O : ' static + Deref ,
651
- K : ' static + Deref ,
652
- OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > ,
653
+ D : ' static + Deref + Sync ,
654
+ O : ' static + Deref + Sync ,
655
+ K : ' static + Deref + Sync ,
656
+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > + Clone + Send ,
653
657
S : ' static + Deref < Target = SC > + Send + Sync ,
654
658
SC : for < ' b > WriteableScore < ' b > ,
655
659
SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
@@ -826,17 +830,24 @@ where
826
830
None => { } ,
827
831
}
828
832
833
+ let mut futures = Vec :: new ( ) ;
834
+
829
835
// Persist channel manager.
830
836
if channel_manager. get_cm ( ) . get_and_clear_needs_persistence ( ) {
831
837
log_trace ! ( logger, "Persisting ChannelManager..." ) ;
832
- kv_store
833
- . write (
834
- CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
835
- CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
836
- CHANNEL_MANAGER_PERSISTENCE_KEY ,
837
- & channel_manager. get_cm ( ) . encode ( ) ,
838
- )
839
- . await ?;
838
+ let res = kv_store. write (
839
+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
840
+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
841
+ CHANNEL_MANAGER_PERSISTENCE_KEY ,
842
+ & channel_manager. get_cm ( ) . encode ( ) ,
843
+ ) ;
844
+
845
+ let fut: Pin <
846
+ Box < dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > > + Send + ' static > ,
847
+ > = Box :: pin ( async move { res. await . map_err ( |e| ( e, true ) ) } ) ;
848
+
849
+ futures. push ( ResultFuture :: Pending ( fut) ) ;
850
+
840
851
log_trace ! ( logger, "Done persisting ChannelManager." ) ;
841
852
}
842
853
@@ -864,17 +875,29 @@ where
864
875
log_warn ! ( logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually." ) ;
865
876
log_trace ! ( logger, "Persisting network graph." ) ;
866
877
}
867
- if let Err ( e) = kv_store
868
- . write (
869
- NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
870
- NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
871
- NETWORK_GRAPH_PERSISTENCE_KEY ,
872
- & network_graph. encode ( ) ,
873
- )
874
- . await
875
- {
876
- log_error ! ( logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e) ;
877
- }
878
+ let res = kv_store. write (
879
+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
880
+ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
881
+ NETWORK_GRAPH_PERSISTENCE_KEY ,
882
+ & network_graph. encode ( ) ,
883
+ ) ;
884
+ let fut: Pin <
885
+ Box <
886
+ dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > >
887
+ + Send
888
+ + ' static ,
889
+ > ,
890
+ > = Box :: pin ( async move {
891
+ res. await . map_err ( |e| {
892
+ ( lightning:: io:: Error :: new (
893
+ lightning:: io:: ErrorKind :: Other ,
894
+ format ! ( "failed to persist network graph, check your disk and permissions {}" , e) ) ,
895
+ false )
896
+ } )
897
+ } ) ;
898
+
899
+ futures. push ( ResultFuture :: Pending ( fut) ) ;
900
+
878
901
have_pruned = true ;
879
902
}
880
903
let prune_timer =
@@ -901,21 +924,28 @@ where
901
924
} else {
902
925
log_trace ! ( logger, "Persisting scorer" ) ;
903
926
}
904
- if let Err ( e) = kv_store
905
- . write (
906
- SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
907
- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
908
- SCORER_PERSISTENCE_KEY ,
909
- & scorer. encode ( ) ,
910
- )
911
- . await
912
- {
913
- log_error ! (
914
- logger,
915
- "Error: Failed to persist scorer, check your disk and permissions {}" ,
916
- e
917
- ) ;
918
- }
927
+ let res = kv_store. write (
928
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
929
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
930
+ SCORER_PERSISTENCE_KEY ,
931
+ & scorer. encode ( ) ,
932
+ ) ;
933
+ let fut: Pin <
934
+ Box <
935
+ dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > >
936
+ + Send
937
+ + ' static ,
938
+ > ,
939
+ > = Box :: pin ( async move {
940
+ res. await . map_err ( |e| {
941
+ ( lightning:: io:: Error :: new (
942
+ lightning:: io:: ErrorKind :: Other ,
943
+ format ! ( "failed to persist scorer, check your disk and permissions {}" , e) ) ,
944
+ false )
945
+ } )
946
+ } ) ;
947
+
948
+ futures. push ( ResultFuture :: Pending ( fut) ) ;
919
949
}
920
950
last_scorer_persist_call = sleeper ( SCORER_PERSIST_TIMER ) ;
921
951
} ,
@@ -928,14 +958,43 @@ where
928
958
Some ( false ) => {
929
959
log_trace ! ( logger, "Regenerating sweeper spends if necessary" ) ;
930
960
if let Some ( ref sweeper) = sweeper {
931
- let _ = sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await ;
961
+ let sweeper = sweeper. clone ( ) ;
962
+ let fut: Pin <
963
+ Box <
964
+ dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > >
965
+ + Send
966
+ + ' static ,
967
+ > ,
968
+ > = Box :: pin ( async move {
969
+ sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await . map_err ( |_| {
970
+ ( lightning:: io:: Error :: new (
971
+ lightning:: io:: ErrorKind :: Other ,
972
+ format ! ( "failed to persist sweeper, check your disk and permissions" ) ) ,
973
+ false )
974
+ } )
975
+ } ) ;
976
+
977
+ futures. push ( ResultFuture :: Pending ( fut) ) ;
932
978
}
933
979
last_sweeper_call = sleeper ( SWEEPER_TIMER ) ;
934
980
} ,
935
981
Some ( true ) => break ,
936
982
None => { } ,
937
983
}
938
984
985
+ // Run persistence tasks in parallel.
986
+ let multi_res = MultiResultFuturePoller :: new ( futures) . await ;
987
+ for res in multi_res {
988
+ if let Err ( ( e, exit) ) = res {
989
+ log_error ! ( logger, "Error: {}" , e) ;
990
+
991
+ if exit {
992
+ log_error ! ( logger, "Exiting background processor" ) ;
993
+ return Err ( e) ;
994
+ }
995
+ }
996
+ }
997
+
939
998
// Onion messenger timer tick.
940
999
match check_sleeper ( & mut last_onion_message_handler_call) {
941
1000
Some ( false ) => {
@@ -1025,9 +1084,9 @@ fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker
1025
1084
/// synchronous background persistence.
1026
1085
pub async fn process_events_async_with_kv_store_sync <
1027
1086
UL : ' static + Deref ,
1028
- CF : ' static + Deref ,
1029
- T : ' static + Deref ,
1030
- F : ' static + Deref ,
1087
+ CF : ' static + Deref + Sync ,
1088
+ T : ' static + Deref + Sync ,
1089
+ F : ' static + Deref + Sync ,
1031
1090
G : ' static + Deref < Target = NetworkGraph < L > > ,
1032
1091
L : ' static + Deref + Send + Sync ,
1033
1092
P : ' static + Deref ,
@@ -1044,10 +1103,13 @@ pub async fn process_events_async_with_kv_store_sync<
1044
1103
RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
1045
1104
PM : ' static + Deref ,
1046
1105
LM : ' static + Deref ,
1047
- D : ' static + Deref ,
1048
- O : ' static + Deref ,
1049
- K : ' static + Deref ,
1050
- OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , KVStoreSyncWrapper < K > , L , O > > ,
1106
+ D : ' static + Deref + Sync ,
1107
+ O : ' static + Deref + Sync ,
1108
+ K : ' static + Deref + Sync ,
1109
+ OS : ' static
1110
+ + Deref < Target = OutputSweeper < T , D , F , CF , KVStoreSyncWrapper < K > , L , O > >
1111
+ + Clone
1112
+ + Send ,
1051
1113
S : ' static + Deref < Target = SC > + Send + Sync ,
1052
1114
SC : for < ' b > WriteableScore < ' b > ,
1053
1115
SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
0 commit comments