Remove background processor macro and parallelize persistence #3968
Conversation
👋 Thanks for assigning @TheBlueMatt as a reviewer!
		None,
	));

	log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
If desired, I can split this commit into multiple smaller clean up commits.
Force-pushed ddd5249 to 64c502c
Codecov Report

❌ Patch coverage is

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3968      +/-   ##
==========================================
- Coverage   88.93%   88.89%   -0.05%
==========================================
  Files         174      174
  Lines      123880   124232     +352
  Branches   123880   124232     +352
==========================================
+ Hits       110176   110433     +257
- Misses      11251    11319      +68
- Partials     2453     2480      +27
Force-pushed 1364139 to b68a228
@@ -674,7 +682,7 @@ where
 	PM::Target: APeerManager,
 	LM::Target: ALiquidityManager,
 	O::Target: 'static + OutputSpender,
-	D::Target: 'static + ChangeDestinationSource,
+	D::Target: 'static + ChangeDestinationSource + MaybeSync,
Now all of a sudden needed for rust 1.63.0
Bleh, in order to use MultiResultFuturePoller we end up having to Box everything (because the futures need to be the same type, forcing dyn indirection), and then we have to expand our type bounds (because the compiler can't see through the Future passed to MultiResultFuturePoller to tell if it's Send/Sync or not). I'm not convinced it's worth it; we can drop all of that with a simple poller, see https://git.bitcoin.ninja/?p=rust-lightning;a=shortlog;h=refs/heads/2025-07-3968-poller-demo
I indeed noticed too that the futures are always of different types and need to go through dyn. The code you show there is indeed better; I'll use that.
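For illustration, here is a minimal, self-contained sketch of the "simple poller" idea discussed above — it is not the code from the linked branch. Two futures with different concrete types are polled directly with a dummy waker, so no `Box<dyn Future>` and no widened Send/Sync bounds are needed. Note the `pin!` macro used here requires Rust 1.68+, so this only shows the shape of the approach, not MSRV-clean code:

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing: we simply re-poll in a loop ourselves.
fn dummy_waker() -> Waker {
	fn clone(_: *const ()) -> RawWaker { RawWaker::new(std::ptr::null(), &VTABLE) }
	fn no_op(_: *const ()) {}
	static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
	unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Drives two futures of *different* concrete types to completion by polling
// them directly, without erasing them into a common `Pin<Box<dyn Future>>`.
fn drive_both<A: Future<Output = ()>, B: Future<Output = ()>>(a: A, b: B) {
	let mut a = pin!(a);
	let mut b = pin!(b);
	let (mut a_done, mut b_done) = (false, false);
	let waker = dummy_waker();
	let mut cx = Context::from_waker(&waker);
	while !(a_done && b_done) {
		// Never poll a future again after it returned Ready.
		if !a_done && a.as_mut().poll(&mut cx).is_ready() { a_done = true; }
		if !b_done && b.as_mut().poll(&mut cx).is_ready() { b_done = true; }
		// Busy-polling is for illustration only; a real poller would sleep or
		// use a real waker between iterations.
		std::thread::yield_now();
	}
}

fn main() {
	// The two async blocks have distinct anonymous types; no `dyn` is involved.
	drive_both(async { println!("first done") }, async { println!("second done") });
}
```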
Only expansion here; cleanups come in the next commit to facilitate review.
Force-pushed b68a228 to df282cb
	(false, false) => FASTEST_TIMER,
};
// Channel manager timer tick.
match check_sleeper(&mut last_freshness_call) {
Since we're splitting between async and sync anyway, it would be nice to do the new-sleeper init in check_sleeper - if check_sleeper polls Ready, we aren't allowed to poll again (per the Future API contract), so we need to be careful to ensure we always create a new sleeper. It would be easier to just do it in check_sleeper rather than being diligent in review here.
I tried it with

fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
	fut: &mut SleepFuture, new_sleeper: impl Fn() -> SleepFuture,
) -> Option<bool> {
	let mut waker = dummy_waker();
	let mut ctx = task::Context::from_waker(&mut waker);
	match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
		task::Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		task::Poll::Pending => None,
	}
}
But that ran into a complication with the network graph. There the new interval is based on whether a prune has already happened. Can probably be refactored to make it work, but perhaps it is also ok to leave this pre-existing weakness for now?
Still seems worth fixing. We can just pass NETWORK_PRUNE_TIMER for the network graph prune, and in the case where we need to prune but prunable_network_graph returns None we can override it with FIRST_NETWORK_PRUNE_TIMER. That's mostly for the "RGS sync but RGS hasn't finished yet" case anyway.
Will do this in a direct follow up to minimize the rebase pain of the PR with macro expansion and code move.
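A rough, self-contained sketch of the interval selection suggested above; the constant values and the prunable_network_graph stand-in below are placeholders for illustration, not the actual follow-up patch:

```rust
use std::time::Duration;

// Placeholder values; the real constants are defined in the background processor.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;

// Stand-in for `gossip_sync.prunable_network_graph()`: returns Some(_) once the
// graph can actually be pruned (e.g. RGS has completed its first sync).
fn prunable_network_graph(rgs_synced: bool) -> Option<()> {
	if rgs_synced { Some(()) } else { None }
}

// Interval for the next prune sleeper: normally NETWORK_PRUNE_TIMER, but fall
// back to the shorter FIRST_NETWORK_PRUNE_TIMER while pruning isn't possible yet.
fn next_prune_interval(rgs_synced: bool) -> Duration {
	match prunable_network_graph(rgs_synced) {
		Some(_) => Duration::from_secs(NETWORK_PRUNE_TIMER),
		None => Duration::from_secs(FIRST_NETWORK_PRUNE_TIMER),
	}
}

fn main() {
	assert_eq!(next_prune_interval(false), Duration::from_secs(FIRST_NETWORK_PRUNE_TIMER));
	assert_eq!(next_prune_interval(true), Duration::from_secs(NETWORK_PRUNE_TIMER));
	println!("interval selection behaves as described above");
}
```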
👋 The first review has been submitted! Do you think this PR is ready for a second reviewer? If so, click here to assign a second reviewer.
Force-pushed 6c18934 to 80f68ce
@@ -15,27 +15,22 @@ use core::marker::Unpin;
 use core::pin::Pin;
 use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

-pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
+pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
No longer using MultiResultFuturePoller, but left the commit in as a general cleanup.
Force-pushed 80f68ce to b55f977
Macros can only be expanded recursively, so the log macros needed to be brought back. Also removed some unnecessary parentheses, curly braces and unused arguments/code.
Prepare for parallelization.
Co-authored-by: Matt Corallo <[email protected]>
Force-pushed b55f977 to d9a6641
	log_trace!(logger, "Done persisting ChannelManager.");
}

// Note that we want to run a graph prune once not long after startup before
This comment is now in the wrong spot. It was on the assignment of what the timer's time is, and now it's on the sleep itself.
Hm yes, moved. For the async macro invocation, it was always at the wrong location, because async sets the timer further down. It just wasn't so easy to spot.
And moved back after the check_sleeper extension...
let fastest_timeout = batch_delay.min(Duration::from_millis(100));
sleeper.wait_timeout(fastest_timeout);
Somehow all the await_slow logic got dropped from the sync BP and now only appears in the async one?
Oh, TIL we only do that on the async BP...weird...
Yes, that surfaced indeed... Not sure if it was intentional originally, or should be mirrored for sync?
Discussed it offline and we can swap the way the check_sleeper method works in a followup to avoid having to rebase this. Gonna go ahead and land.
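For readers unfamiliar with the await_slow logic mentioned above, the general idea is detecting that a sleep/await overran its requested duration by a large margin (e.g. because the process was suspended). A minimal illustration of that pattern follows; the one-second threshold and the reaction are made up and are not LDK's actual implementation:

```rust
use std::time::{Duration, Instant};

// Sleep for `requested` and report whether the sleep overran by a large margin.
fn sleep_was_slow(requested: Duration) -> bool {
	let start = Instant::now();
	std::thread::sleep(requested);
	start.elapsed() > requested + Duration::from_secs(1)
}

fn main() {
	if sleep_was_slow(Duration::from_millis(100)) {
		// Timers may be far behind schedule; act as if several ticks were missed.
		println!("sleep overran significantly");
	} else {
		println!("sleep completed on time");
	}
}
```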
This is straightforward code translation. There's some risk in the reordering, but it's relatively low so just gonna land with one review. I might do a followup to do an initial poll of the CM persist future before we do the graph prune/scorer time step so that it has a chance to get going before we do some CPU-intensive tasks.
Follow-up: #3978
Post-merge ACK. Also confirmed that this/current main works with LDK Node.
@@ -761,7 +761,6 @@ where
 	last_forwards_processing_call = sleeper(cur_batch_delay);
 }
 if should_break {
-	log_trace!(logger, "Terminating background processor.");
nit: I think I liked the previous behavior (logging this when initiating the shutdown process) a bit better.
I don't think it changed. We are still logging before we do the final persist round. The only difference is logging before or after the break.
Another post-merge ACK. I had started looking at it a bit before merging, but I'm very much in favor of this 👍 and of fewer macros where feasible.
if last_freshness_call.elapsed() > FRESHNESS_TIMER {
	log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
	channel_manager.get_cm().timer_tick_occurred();
	last_freshness_call = Instant::now();
}
if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
	if let Some(om) = &onion_messenger {
		log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
		om.get_om().timer_tick_occurred();
	}
	last_onion_message_handler_call = Instant::now();
}
was this reordering in the sync one needed or done to match the async one?
Done to match async (#3968 (comment))
log_error!(logger,
	"Error: Failed to persist scorer, check your disk and permissions {}",
	e,
);
this seems to be a bit oddly formatted
Hmm indeed. Rustfmt is unpredictable inside macros.
Added fix to #3978
The define_run_body! macro appears to have reached a complexity threshold where further extensions would add more complexity than benefit. At this point, it may be more practical to accept some duplication between the sync and async variants.

This PR expands the macro, cleans up the code and implements an initial step towards parallelization by running scorer, graph, sweeper and ChannelManager persistence simultaneously.
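As a generic illustration of the parallelization goal — not necessarily how this PR implements it, and with made-up persistence functions standing in for the real persister calls — running the four writes concurrently bounds the total time by the slowest write rather than by their sum:

```rust
use std::thread;
use std::time::Duration;

// Stand-ins for the four persistence steps named above; in the real code these
// go through the configured persister.
fn persist_scorer() { thread::sleep(Duration::from_millis(50)); }
fn persist_graph() { thread::sleep(Duration::from_millis(80)); }
fn persist_sweeper() { thread::sleep(Duration::from_millis(20)); }
fn persist_channel_manager() { thread::sleep(Duration::from_millis(100)); }

fn main() {
	// Run all four at the same time; total wall time is ~100ms instead of ~250ms.
	// thread::scope joins all spawned threads before returning (Rust 1.63+).
	thread::scope(|s| {
		s.spawn(persist_scorer);
		s.spawn(persist_graph);
		s.spawn(persist_sweeper);
		s.spawn(persist_channel_manager);
	});
	println!("all state persisted");
}
```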