Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
//! task.

use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
QueuedAttestationBatch, QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock,
ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
Expand Down Expand Up @@ -491,6 +492,16 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
work: Work::ColumnReconstruction(process_fn),
}
}
ReadyWork::DelayedAttestationBatch(QueuedAttestationBatch {
process_batch,
attestations,
}) => Self {
drop_during_sync: true,
work: Work::DelayedAttestationBatch {
attestations,
process_batch,
},
},
}
}
}
Expand Down Expand Up @@ -560,6 +571,10 @@ pub enum Work<E: EthSpec> {
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
},
DelayedAttestationBatch {
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
},
GossipAggregate {
aggregate: Box<GossipAggregatePackage<E>>,
process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
Expand Down Expand Up @@ -634,6 +649,7 @@ pub enum WorkType {
GossipAttestationToConvert,
UnknownBlockAttestation,
GossipAttestationBatch,
DelayedAttestationBatch,
GossipAggregate,
UnknownBlockAggregate,
UnknownLightClientOptimisticUpdate,
Expand Down Expand Up @@ -683,6 +699,7 @@ impl<E: EthSpec> Work<E> {
match self {
Work::GossipAttestation { .. } => WorkType::GossipAttestation,
Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
Work::DelayedAttestationBatch { .. } => WorkType::DelayedAttestationBatch,
Work::GossipAggregate { .. } => WorkType::GossipAggregate,
Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
Work::GossipBlock(_) => WorkType::GossipBlock,
Expand Down Expand Up @@ -1305,6 +1322,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
_ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),

// Attestation batches are formed internally within the
// `BeaconProcessor`, they are not sent from external services.
Work::GossipAttestationBatch { .. } => crit!(
Expand All @@ -1320,6 +1338,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
"Unsupported inbound event"
)
}
Work::DelayedAttestationBatch { .. } => {}
Work::GossipBlock { .. } => gossip_block_queue.push(work, work_id),
Work::GossipBlobSidecar { .. } => gossip_blob_queue.push(work, work_id),
Work::GossipDataColumnSidecar { .. } => {
Expand Down Expand Up @@ -1407,6 +1426,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::DelayedAttestationBatch => 0,
WorkType::GossipAggregate => aggregate_queue.len(),
WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
WorkType::UnknownLightClientOptimisticUpdate => {
Expand Down Expand Up @@ -1571,6 +1591,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
} => task_spawner.spawn_blocking(move || {
process_batch(aggregates);
}),
Work::DelayedAttestationBatch {
attestations,
process_batch,
} => task_spawner.spawn_blocking(move || {
process_batch(attestations);
}),
Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move {
process_fn.await;
}),
Expand Down
150 changes: 150 additions & 0 deletions beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! block will be re-queued until their block is imported, or until they expire.
use crate::metrics;
use crate::{AsyncFn, BlockingFn, Work, WorkEvent};
use crate::{GossipAttestationBatch, GossipAttestationPackage, SingleAttestation};
use fnv::FnvHashMap;
use futures::task::Poll;
use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -48,6 +49,9 @@ pub const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
/// For how long to queue aggregated and unaggregated attestations for re-processing.
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);

/// Batched attestation delay.
pub const QUEUED_BATCH_ATTESTATION_DELAY: Duration = Duration::from_millis(50);

/// For how long to queue light client updates for re-processing.
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);

Expand All @@ -60,6 +64,9 @@ pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue delayed column reconstruction.
pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);

/// Maximum attestation for batches during batch processing.
pub const MAXIMUM_BATCHED_ATTESTATIONS: usize = 1_024;

/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
Expand Down Expand Up @@ -115,6 +122,8 @@ pub enum ReprocessQueueMessage {
BackfillSync(QueuedBackfillBatch),
/// A delayed column reconstruction that needs checking
DelayColumnReconstruction(QueuedColumnReconstruction),
/// A delayed attestation which will be batched for optimization.
BatchedAttestation(QueuedBatchedAttestation),
}

/// Events sent by the scheduler once they are ready for re-processing.
Expand All @@ -127,6 +136,7 @@ pub enum ReadyWork {
LightClientUpdate(QueuedLightClientUpdate),
BackfillSync(QueuedBackfillBatch),
ColumnReconstruction(QueuedColumnReconstruction),
DelayedAttestationBatch(QueuedAttestationBatch),
}

/// An Attestation for which the corresponding block was not seen while processing, queued for
Expand Down Expand Up @@ -173,6 +183,18 @@ pub struct IgnoredRpcBlock {
pub process_fn: BlockingFn,
}

pub struct QueuedBatchedAttestation {
pub attestation: Box<GossipAttestationPackage<SingleAttestation>>,
pub process_individual:
Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
pub process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
}

pub struct QueuedAttestationBatch {
pub attestations: GossipAttestationBatch,
pub process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
}

/// A backfill batch work that has been queued for processing later.
pub struct QueuedBackfillBatch(pub BlockingFn);

Expand Down Expand Up @@ -220,6 +242,8 @@ enum InboundEvent {
ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction),
/// An attestation batched is now ready for processing.
ReadyBatchedAttestation(QueuedAttestationId),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
Expand All @@ -242,6 +266,8 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
/// Queue for batched attestation with a delay
batched_attestation_queue: DelayQueue<QueuedAttestationId>,

/* Queued items */
/// Queued blocks.
Expand All @@ -250,6 +276,8 @@ struct ReprocessQueue<S> {
queued_aggregates: FnvHashMap<usize, (QueuedAggregate, DelayKey)>,
/// Queued attestations.
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate, DelayKey)>,
/// Queued batch attestations.
queued_batch_attestations: FnvHashMap<usize, (Vec<QueuedBatchedAttestation>, DelayKey)>,
/// Attestations (aggregated and unaggregated) per root.
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
/// Queued Light Client Updates.
Expand All @@ -264,6 +292,7 @@ struct ReprocessQueue<S> {
/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize,
current_attestation_batch: usize,
next_lc_update: usize,
early_block_debounce: TimeLatch,
rpc_block_debounce: TimeLatch,
Expand All @@ -279,6 +308,7 @@ pub type QueuedLightClientUpdateId = usize;
enum QueuedAttestationId {
Aggregate(usize),
Unaggregate(usize),
Batched(usize),
}

impl QueuedAggregate {
Expand Down Expand Up @@ -335,6 +365,17 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}

match self.batched_attestation_queue.poll_expired(cx) {
Poll::Ready(Some(attestation_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyBatchedAttestation(
attestation_id.into_inner(),
)));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}

match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(lc_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
Expand Down Expand Up @@ -420,17 +461,20 @@ impl<S: SlotClock> ReprocessQueue<S> {
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
batched_attestation_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
column_reconstructions_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
queued_batch_attestations: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
next_attestation: 0,
current_attestation_batch: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
rpc_block_debounce: TimeLatch::default(),
Expand Down Expand Up @@ -670,6 +714,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
.map(|(unaggregate, delay_key)| {
(ReadyWork::Unaggregate(unaggregate), delay_key)
}),
QueuedAttestationId::Batched(_) => {
error!("this should never occur");
None
}
} {
// Remove the delay.
self.attestations_delay_queue.remove(&delay_key);
Expand Down Expand Up @@ -784,6 +832,68 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
InboundEvent::Msg(BatchedAttestation(queued_batch_attestation)) => {
let batch_processing_delay = QUEUED_BATCH_ATTESTATION_DELAY;

let mut time_to_next_batch = 0;

if let Some(batched_queue) = self
.queued_batch_attestations
.get_mut(&self.current_attestation_batch)
{
if batched_queue.0.len() >= MAXIMUM_BATCHED_ATTESTATIONS {
self.current_attestation_batch += 1;
if let Some(current_slot_time) =
self.slot_clock.millis_from_current_slot_start()
{
let slot_time = current_slot_time.as_millis() as usize;
let total_slot_duration =
self.slot_clock.slot_duration().as_millis() as usize;

time_to_next_batch = (0..=total_slot_duration)
.step_by(batch_processing_delay.as_millis() as usize)
.find(|&t| t > slot_time)
.map_or(0, |t| t - slot_time);
}

let delay_key = self.batched_attestation_queue.insert(
QueuedAttestationId::Batched(self.current_attestation_batch),
Duration::from_millis(time_to_next_batch as u64),
);

self.queued_batch_attestations.insert(
self.current_attestation_batch,
(vec![queued_batch_attestation], delay_key),
);
} else {
batched_queue.0.push(queued_batch_attestation);
}
} else {
self.current_attestation_batch += 1;
if let Some(current_slot_time) =
self.slot_clock.millis_from_current_slot_start()
{
let slot_time = current_slot_time.as_millis() as usize;
let total_slot_duration =
self.slot_clock.slot_duration().as_millis() as usize;

time_to_next_batch = (0..=total_slot_duration)
.step_by(batch_processing_delay.as_millis() as usize)
.find(|&t| t > slot_time)
.map_or(0, |t| t - slot_time);
}

let delay_key = self.batched_attestation_queue.insert(
QueuedAttestationId::Batched(self.current_attestation_batch),
Duration::from_millis(time_to_next_batch as u64),
);

self.queued_batch_attestations.insert(
self.current_attestation_batch,
(vec![queued_batch_attestation], delay_key),
);
}
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => {
let block_root = ready_block.beacon_block_root;
Expand Down Expand Up @@ -827,6 +937,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
ReadyWork::Unaggregate(unaggregate),
)
}),
QueuedAttestationId::Batched(_) => {
error!("batched attestation ID reached ReadyAttestation handler");
None
}
} {
if self.ready_work_tx.try_send(work).is_err() {
error!(
Expand All @@ -852,6 +966,42 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
InboundEvent::ReadyBatchedAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
);

let QueuedAttestationId::Batched(batch_id) = queued_id else {
crit!("Invalid attestation Id batched for attestation");
return;
};

if let Some(batch_attestation) = self.queued_batch_attestations.remove(&batch_id) {
let mut attestations = GossipAttestationBatch::new();
let mut iter = batch_attestation.0.into_iter();

if let Some(first) = iter.next() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it easier to read to do if !is_empty() { let first = batch_attestation.first().expect("not empty") }?

attestations.push(*first.attestation);
let process_batch = first.process_batch;

for unaggregate in iter {
attestations.push(*unaggregate.attestation);
}

if self
.ready_work_tx
.try_send(ReadyWork::DelayedAttestationBatch(QueuedAttestationBatch {
attestations,
process_batch,
}))
.is_err()
{
error!("Failed to send batched attestations");
}
}
}
}

InboundEvent::ReadyLightClientUpdate(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES,
Expand Down
Loading
Loading