Skip to content

Commit aa2bdd6

Browse files
committed
Introduce check_sleeper helper
1 parent e9cd8cd commit aa2bdd6

File tree

1 file changed

+140
-162
lines changed
  • lightning-background-processor/src

1 file changed

+140
-162
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 140 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,6 @@ where
677677
D::Target: 'static + ChangeDestinationSource,
678678
K::Target: 'static + KVStore,
679679
{
680-
let mut should_break = false;
681680
let async_event_handler = |event| {
682681
let network_graph = gossip_sync.network_graph();
683682
let event_handler = &event_handler;
@@ -733,38 +732,36 @@ where
733732
let mut cur_batch_delay = batch_delay.get();
734733
let mut last_forwards_processing_call = sleeper(cur_batch_delay);
735734
loop {
735+
// Handle channel manager events.
736736
channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
737+
738+
// Handle chain monitor events.
737739
chain_monitor.process_pending_events_async(async_event_handler).await;
740+
741+
// Handle onion messenger events.
738742
if let Some(om) = &onion_messenger {
739743
om.get_om().process_pending_events_async(async_event_handler).await
740744
}
745+
746+
// Handle peer manager events.
741747
peer_manager.as_ref().process_events();
742-
if should_break {
743-
break;
744-
}
745-
if (|fut: &mut SleepFuture| {
746-
let mut waker = dummy_waker();
747-
let mut ctx = task::Context::from_waker(&mut waker);
748-
match core::pin::Pin::new(fut).poll(&mut ctx) {
749-
task::Poll::Ready(exit) => {
750-
should_break = exit;
751-
true
752-
},
753-
task::Poll::Pending => false,
754-
}
755-
})(&mut last_forwards_processing_call)
756-
{
757-
channel_manager.get_cm().process_pending_htlc_forwards();
758-
cur_batch_delay = batch_delay.next();
759-
last_forwards_processing_call = sleeper(cur_batch_delay);
760-
}
761-
if should_break {
762-
break;
748+
match check_sleeper(&mut last_forwards_processing_call) {
749+
Some(false) => {
750+
channel_manager.get_cm().process_pending_htlc_forwards();
751+
cur_batch_delay = batch_delay.next();
752+
last_forwards_processing_call = sleeper(cur_batch_delay);
753+
},
754+
Some(true) => break,
755+
None => {},
763756
}
757+
758+
// Start mobile interruptable platform check timer.
764759
let mut await_start = None;
765760
if mobile_interruptable_platform {
766761
await_start = Some(sleeper(Duration::from_secs(1)));
767762
}
763+
764+
// Wait for events or timeouts.
768765
let om_fut = if let Some(om) = onion_messenger.as_ref() {
769766
let fut = om.get_om().get_update_future();
770767
OptionalSelector { optional_future: Some(fut) }
@@ -796,30 +793,29 @@ where
796793
match fut.await {
797794
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
798795
SelectorOutput::E(exit) => {
799-
should_break = exit;
796+
if exit {
797+
break;
798+
}
800799
},
801800
SelectorOutput::F(exit) => {
802-
should_break = exit;
801+
if exit {
802+
break;
803+
}
803804
},
804805
}
806+
807+
// Check to see if we were interrupted on a mobile platform.
805808
let await_slow = if mobile_interruptable_platform {
806-
(|fut: &mut SleepFuture| {
807-
let mut waker = dummy_waker();
808-
let mut ctx = task::Context::from_waker(&mut waker);
809-
match core::pin::Pin::new(fut).poll(&mut ctx) {
810-
task::Poll::Ready(exit) => {
811-
should_break = exit;
812-
true
813-
},
814-
task::Poll::Pending => false,
815-
}
816-
})(&mut await_start.unwrap())
809+
match check_sleeper(&mut await_start.unwrap()) {
810+
Some(true) => break,
811+
Some(false) => true,
812+
None => false,
813+
}
817814
} else {
818815
false
819816
};
820-
if should_break {
821-
break;
822-
}
817+
818+
// Persist channel manager.
823819
if channel_manager.get_cm().get_and_clear_needs_persistence() {
824820
log_trace!(logger, "Persisting ChannelManager...");
825821
kv_store
@@ -832,71 +828,57 @@ where
832828
.await?;
833829
log_trace!(logger, "Done persisting ChannelManager.");
834830
}
835-
if (|fut: &mut SleepFuture| {
836-
let mut waker = dummy_waker();
837-
let mut ctx = task::Context::from_waker(&mut waker);
838-
match core::pin::Pin::new(fut).poll(&mut ctx) {
839-
task::Poll::Ready(exit) => {
840-
should_break = exit;
841-
true
842-
},
843-
task::Poll::Pending => false,
844-
}
845-
})(&mut last_freshness_call)
846-
{
847-
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
848-
channel_manager.get_cm().timer_tick_occurred();
849-
last_freshness_call = sleeper(FRESHNESS_TIMER);
850-
}
851-
if (|fut: &mut SleepFuture| {
852-
let mut waker = dummy_waker();
853-
let mut ctx = task::Context::from_waker(&mut waker);
854-
match core::pin::Pin::new(fut).poll(&mut ctx) {
855-
task::Poll::Ready(exit) => {
856-
should_break = exit;
857-
true
858-
},
859-
task::Poll::Pending => false,
860-
}
861-
})(&mut last_onion_message_handler_call)
862-
{
863-
if let Some(om) = &onion_messenger {
864-
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
865-
om.get_om().timer_tick_occurred();
866-
}
867-
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
831+
832+
// Channel manager timer tick.
833+
match check_sleeper(&mut last_freshness_call) {
834+
Some(false) => {
835+
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
836+
channel_manager.get_cm().timer_tick_occurred();
837+
last_freshness_call = sleeper(FRESHNESS_TIMER);
838+
},
839+
Some(true) => break,
840+
None => {},
841+
}
842+
843+
// Onion messenger timer tick.
844+
match check_sleeper(&mut last_onion_message_handler_call) {
845+
Some(false) => {
846+
if let Some(om) = &onion_messenger {
847+
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
848+
om.get_om().timer_tick_occurred();
849+
}
850+
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
851+
},
852+
Some(true) => break,
853+
None => {},
868854
}
855+
856+
// Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers.
869857
if await_slow {
870858
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
871859
peer_manager.as_ref().disconnect_all_peers();
872860
last_ping_call = sleeper(PING_TIMER);
873-
} else if (|fut: &mut SleepFuture| {
874-
let mut waker = dummy_waker();
875-
let mut ctx = task::Context::from_waker(&mut waker);
876-
match core::pin::Pin::new(fut).poll(&mut ctx) {
877-
task::Poll::Ready(exit) => {
878-
should_break = exit;
879-
true
861+
} else {
862+
match check_sleeper(&mut last_ping_call) {
863+
Some(false) => {
864+
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
865+
peer_manager.as_ref().timer_tick_occurred();
866+
last_ping_call = sleeper(PING_TIMER);
880867
},
881-
task::Poll::Pending => false,
868+
Some(true) => break,
869+
_ => {},
882870
}
883-
})(&mut last_ping_call)
884-
{
885-
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
886-
peer_manager.as_ref().timer_tick_occurred();
887-
last_ping_call = sleeper(PING_TIMER);
888871
}
889-
let prune_timer_elapsed = (|fut: &mut SleepFuture| {
890-
let mut waker = dummy_waker();
891-
let mut ctx = task::Context::from_waker(&mut waker);
892-
match core::pin::Pin::new(fut).poll(&mut ctx) {
893-
task::Poll::Ready(exit) => {
894-
should_break = exit;
895-
true
896-
},
897-
task::Poll::Pending => false,
872+
873+
// Prune and persist the network graph if necessary.
874+
let prune_timer_elapsed = {
875+
match check_sleeper(&mut last_prune_call) {
876+
Some(false) => true,
877+
Some(true) => break,
878+
None => false,
898879
}
899-
})(&mut last_prune_call);
880+
};
881+
900882
let should_prune = match gossip_sync {
901883
GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
902884
_ => prune_timer_elapsed,
@@ -929,6 +911,8 @@ where
929911
if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
930912
last_prune_call = sleeper(prune_timer);
931913
}
914+
915+
// Decay and persist the scorer if necessary.
932916
if !have_decayed_scorer {
933917
if let Some(ref scorer) = scorer {
934918
if let Some(duration_since_epoch) = fetch_time() {
@@ -938,76 +922,59 @@ where
938922
}
939923
have_decayed_scorer = true;
940924
}
941-
if (|fut: &mut SleepFuture| {
942-
let mut waker = dummy_waker();
943-
let mut ctx = task::Context::from_waker(&mut waker);
944-
match core::pin::Pin::new(fut).poll(&mut ctx) {
945-
task::Poll::Ready(exit) => {
946-
should_break = exit;
947-
true
948-
},
949-
task::Poll::Pending => false,
950-
}
951-
})(&mut last_scorer_persist_call)
952-
{
953-
if let Some(ref scorer) = scorer {
954-
if let Some(duration_since_epoch) = fetch_time() {
955-
log_trace!(logger, "Calling time_passed and persisting scorer");
956-
scorer.write_lock().time_passed(duration_since_epoch);
957-
} else {
958-
log_trace!(logger, "Persisting scorer");
925+
match check_sleeper(&mut last_scorer_persist_call) {
926+
Some(false) => {
927+
if let Some(ref scorer) = scorer {
928+
if let Some(duration_since_epoch) = fetch_time() {
929+
log_trace!(logger, "Calling time_passed and persisting scorer");
930+
scorer.write_lock().time_passed(duration_since_epoch);
931+
} else {
932+
log_trace!(logger, "Persisting scorer");
933+
}
934+
if let Err(e) = kv_store
935+
.write(
936+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
937+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
938+
SCORER_PERSISTENCE_KEY,
939+
&scorer.encode(),
940+
)
941+
.await
942+
{
943+
log_error!(
944+
logger,
945+
"Error: Failed to persist scorer, check your disk and permissions {}",
946+
e
947+
);
948+
}
959949
}
960-
if let Err(e) = kv_store
961-
.write(
962-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
963-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
964-
SCORER_PERSISTENCE_KEY,
965-
&scorer.encode(),
966-
)
967-
.await
968-
{
969-
log_error!(
970-
logger,
971-
"Error: Failed to persist scorer, check your disk and permissions {}",
972-
e
973-
);
950+
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
951+
},
952+
Some(true) => break,
953+
None => {},
954+
}
955+
956+
// Rebroadcast pending claims.
957+
match check_sleeper(&mut last_rebroadcast_call) {
958+
Some(false) => {
959+
log_trace!(logger, "Rebroadcasting monitor's pending claims");
960+
chain_monitor.rebroadcast_pending_claims();
961+
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
962+
},
963+
Some(true) => break,
964+
None => {},
965+
}
966+
967+
// Sweeper regeneration and broadcast.
968+
match check_sleeper(&mut last_sweeper_call) {
969+
Some(false) => {
970+
log_trace!(logger, "Regenerating sweeper spends if necessary");
971+
if let Some(ref sweeper) = sweeper {
972+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
974973
}
975-
}
976-
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
977-
}
978-
if (|fut: &mut SleepFuture| {
979-
let mut waker = dummy_waker();
980-
let mut ctx = task::Context::from_waker(&mut waker);
981-
match core::pin::Pin::new(fut).poll(&mut ctx) {
982-
task::Poll::Ready(exit) => {
983-
should_break = exit;
984-
true
985-
},
986-
task::Poll::Pending => false,
987-
}
988-
})(&mut last_rebroadcast_call)
989-
{
990-
log_trace!(logger, "Rebroadcasting monitor's pending claims");
991-
chain_monitor.rebroadcast_pending_claims();
992-
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
993-
}
994-
if (|fut: &mut SleepFuture| {
995-
let mut waker = dummy_waker();
996-
let mut ctx = task::Context::from_waker(&mut waker);
997-
match core::pin::Pin::new(fut).poll(&mut ctx) {
998-
task::Poll::Ready(exit) => {
999-
should_break = exit;
1000-
true
1001-
},
1002-
task::Poll::Pending => false,
1003-
}
1004-
})(&mut last_sweeper_call)
1005-
{
1006-
log_trace!(logger, "Regenerating sweeper spends if necessary");
1007-
if let Some(ref sweeper) = sweeper {
1008-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1009-
}
1010-
last_sweeper_call = sleeper(SWEEPER_TIMER);
974+
last_sweeper_call = sleeper(SWEEPER_TIMER);
975+
},
976+
Some(true) => break,
977+
None => {},
1011978
}
1012979
}
1013980
log_trace!(logger, "Terminating background processor.");
@@ -1043,6 +1010,17 @@ where
10431010
Ok(())
10441011
}
10451012

1013+
fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
1014+
fut: &mut SleepFuture,
1015+
) -> Option<bool> {
1016+
let mut waker = dummy_waker();
1017+
let mut ctx = task::Context::from_waker(&mut waker);
1018+
match core::pin::Pin::new(fut).poll(&mut ctx) {
1019+
task::Poll::Ready(exit) => Some(exit),
1020+
task::Poll::Pending => None,
1021+
}
1022+
}
1023+
10461024
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
10471025
/// synchronous background persistence.
10481026
pub async fn process_events_async_with_kv_store_sync<

0 commit comments

Comments
 (0)