Skip to content

Commit 587c91f

Browse files
authored
Optimize Sender Recovery Process (paradigmxyz#11385)
1 parent 9c8f5d8 commit 587c91f

File tree

1 file changed

+86
-60
lines changed

1 file changed

+86
-60
lines changed

crates/stages/stages/src/stages/sender_recovery.rs

Lines changed: 86 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ const BATCH_SIZE: usize = 100_000;
2929
/// Maximum number of senders to recover per rayon worker job.
3030
const WORKER_CHUNK_SIZE: usize = 100;
3131

32+
/// Type alias for a sender that transmits the result of sender recovery.
33+
type RecoveryResultSender = mpsc::Sender<Result<(u64, Address), Box<SenderRecoveryStageError>>>;
34+
3235
/// The sender recovery stage iterates over existing transactions,
3336
/// recovers the transaction signer and stores them
3437
/// in [`TransactionSenders`][reth_db::tables::TransactionSenders] table.
@@ -100,8 +103,10 @@ where
100103
.map(|start| start..std::cmp::min(start + BATCH_SIZE as u64, tx_range.end))
101104
.collect::<Vec<Range<u64>>>();
102105

106+
let tx_batch_sender = setup_range_recovery(provider);
107+
103108
for range in batch {
104-
recover_range(range, provider, &mut senders_cursor)?;
109+
recover_range(range, provider, tx_batch_sender.clone(), &mut senders_cursor)?;
105110
}
106111

107112
Ok(ExecOutput {
@@ -136,15 +141,16 @@ where
136141
fn recover_range<Provider, CURSOR>(
137142
tx_range: Range<u64>,
138143
provider: &Provider,
144+
tx_batch_sender: mpsc::Sender<Vec<(Range<u64>, RecoveryResultSender)>>,
139145
senders_cursor: &mut CURSOR,
140146
) -> Result<(), StageError>
141147
where
142148
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
143149
CURSOR: DbCursorRW<tables::TransactionSenders>,
144150
{
145-
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders batch");
151+
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing");
146152

147-
// Preallocate channels
153+
// Preallocate channels for each chunk in the batch
148154
let (chunks, receivers): (Vec<_>, Vec<_>) = tx_range
149155
.clone()
150156
.step_by(WORKER_CHUNK_SIZE)
@@ -156,62 +162,9 @@ where
156162
})
157163
.unzip();
158164

159-
let static_file_provider = provider.static_file_provider();
160-
161-
// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
162-
// there will be a timeout grace period in which Tokio does not allow spawning
163-
// additional blocking tasks. This would cause this function to return
164-
// `SenderRecoveryStageError::RecoveredSendersMismatch` at the end.
165-
//
166-
// However, using `std::thread::spawn` allows us to utilize the timeout grace
167-
// period to complete some work without throwing errors during the shutdown.
168-
std::thread::spawn(move || {
169-
for (chunk_range, recovered_senders_tx) in chunks {
170-
// Read the raw value, and let the rayon worker to decompress & decode.
171-
let chunk = match static_file_provider.fetch_range_with_predicate(
172-
StaticFileSegment::Transactions,
173-
chunk_range.clone(),
174-
|cursor, number| {
175-
Ok(cursor
176-
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
177-
number.into(),
178-
)?
179-
.map(|tx| (number, tx)))
180-
},
181-
|_| true,
182-
) {
183-
Ok(chunk) => chunk,
184-
Err(err) => {
185-
// We exit early since we could not process this chunk.
186-
let _ = recovered_senders_tx
187-
.send(Err(Box::new(SenderRecoveryStageError::StageError(err.into()))));
188-
break
189-
}
190-
};
191-
192-
// Spawn the task onto the global rayon pool
193-
// This task will send the results through the channel after it has read the transaction
194-
// and calculated the sender.
195-
rayon::spawn(move || {
196-
let mut rlp_buf = Vec::with_capacity(128);
197-
for (number, tx) in chunk {
198-
let res = tx
199-
.value()
200-
.map_err(|err| Box::new(SenderRecoveryStageError::StageError(err.into())))
201-
.and_then(|tx| recover_sender((number, tx), &mut rlp_buf));
202-
203-
let is_err = res.is_err();
204-
205-
let _ = recovered_senders_tx.send(res);
206-
207-
// Finish early
208-
if is_err {
209-
break
210-
}
211-
}
212-
});
213-
}
214-
});
165+
if let Some(err) = tx_batch_sender.send(chunks).err() {
166+
return Err(StageError::Fatal(err.into()));
167+
}
215168

216169
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Appending recovered senders to the database");
217170

@@ -235,6 +188,7 @@ where
235188
provider.sealed_header(block_number)?.ok_or_else(|| {
236189
ProviderError::HeaderNotFound(block_number.into())
237190
})?;
191+
238192
Err(StageError::Block {
239193
block: Box::new(sealed_header),
240194
error: BlockErrorKind::Validation(
@@ -269,10 +223,82 @@ where
269223
.into(),
270224
));
271225
}
272-
273226
Ok(())
274227
}
275228

229+
/// Spawns a thread to handle the recovery of transaction senders for
230+
/// specified chunks of a given batch. It processes incoming ranges, fetching and recovering
231+
/// transactions in parallel using global rayon pool
232+
fn setup_range_recovery<Provider>(
233+
provider: &Provider,
234+
) -> mpsc::Sender<Vec<(Range<u64>, RecoveryResultSender)>>
235+
where
236+
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
237+
{
238+
let (tx_sender, tx_receiver) = mpsc::channel::<Vec<(Range<u64>, RecoveryResultSender)>>();
239+
let static_file_provider = provider.static_file_provider();
240+
241+
// We do not use `tokio::task::spawn_blocking` because, during a shutdown,
242+
// there will be a timeout grace period in which Tokio does not allow spawning
243+
// additional blocking tasks. This would cause this function to return
244+
// `SenderRecoveryStageError::RecoveredSendersMismatch` at the end.
245+
//
246+
// However, using `std::thread::spawn` allows us to utilize the timeout grace
247+
// period to complete some work without throwing errors during the shutdown.
248+
std::thread::spawn(move || {
249+
while let Ok(chunks) = tx_receiver.recv() {
250+
for (chunk_range, recovered_senders_tx) in chunks {
251+
// Read the raw value, and let the rayon worker to decompress & decode.
252+
let chunk = match static_file_provider.fetch_range_with_predicate(
253+
StaticFileSegment::Transactions,
254+
chunk_range.clone(),
255+
|cursor, number| {
256+
Ok(cursor
257+
.get_one::<TransactionMask<RawValue<TransactionSignedNoHash>>>(
258+
number.into(),
259+
)?
260+
.map(|tx| (number, tx)))
261+
},
262+
|_| true,
263+
) {
264+
Ok(chunk) => chunk,
265+
Err(err) => {
266+
// We exit early since we could not process this chunk.
267+
let _ = recovered_senders_tx
268+
.send(Err(Box::new(SenderRecoveryStageError::StageError(err.into()))));
269+
break
270+
}
271+
};
272+
273+
// Spawn the task onto the global rayon pool
274+
// This task will send the results through the channel after it has read the
275+
// transaction and calculated the sender.
276+
rayon::spawn(move || {
277+
let mut rlp_buf = Vec::with_capacity(128);
278+
for (number, tx) in chunk {
279+
let res = tx
280+
.value()
281+
.map_err(|err| {
282+
Box::new(SenderRecoveryStageError::StageError(err.into()))
283+
})
284+
.and_then(|tx| recover_sender((number, tx), &mut rlp_buf));
285+
286+
let is_err = res.is_err();
287+
288+
let _ = recovered_senders_tx.send(res);
289+
290+
// Finish early
291+
if is_err {
292+
break
293+
}
294+
}
295+
});
296+
}
297+
}
298+
});
299+
tx_sender
300+
}
301+
276302
#[inline]
277303
fn recover_sender(
278304
(tx_id, tx): (TxNumber, TransactionSignedNoHash),

0 commit comments

Comments
 (0)