Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! block will be re-queued until their block is imported, or until they expire.
use crate::metrics;
use crate::{AsyncFn, BlockingFn, Work, WorkEvent};
use crate::{GossipAttestationBatch, GossipAttestationPackage, SingleAttestation};
use fnv::FnvHashMap;
use futures::task::Poll;
use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -48,6 +49,9 @@ pub const ADDITIONAL_QUEUED_BLOCK_DELAY: Duration = Duration::from_millis(5);
/// For how long to queue aggregated and unaggregated attestations for re-processing.
pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);

/// Batched attestation delay.
pub const QUEUED_BATCH_ATTESTATION_DELAY: Duration = Duration::from_millis(50);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be a flag, and if set to 0 disable this mechanism. In case we observe propagation degradation have a way to turn it off. And upper bound the flag value to a reasonable value.


/// For how long to queue light client updates for re-processing.
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);

Expand All @@ -60,6 +64,9 @@ pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12);
/// For how long to queue delayed column reconstruction.
pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150);

/// Maximum attestation for batches during batch processing.
pub const MAXIMUM_BATCHED_ATTESTATIONS: usize = 1_024;

/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
/// it's nice to have extra protection.
Expand Down Expand Up @@ -115,6 +122,8 @@ pub enum ReprocessQueueMessage {
BackfillSync(QueuedBackfillBatch),
/// A delayed column reconstruction that needs checking
DelayColumnReconstruction(QueuedColumnReconstruction),
/// A delayed attestation which will be batched for optimization.
BatchedAttestation(QueuedBatchedAttestation),
}

/// Events sent by the scheduler once they are ready for re-processing.
Expand Down Expand Up @@ -173,6 +182,13 @@ pub struct IgnoredRpcBlock {
pub process_fn: BlockingFn,
}

pub struct QueuedBatchedAttestation {
pub attestation: Box<GossipAttestationPackage<SingleAttestation>>,
pub process_individual:
Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
pub process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
}

/// A backfill batch work that has been queued for processing later.
pub struct QueuedBackfillBatch(pub BlockingFn);

Expand Down Expand Up @@ -220,6 +236,8 @@ enum InboundEvent {
ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction),
/// An attestation batched is now ready for processing.
ReadyBatchedAttestation(QueuedAttestationId),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
Expand All @@ -242,6 +260,8 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
/// Queue for batched attestation with a delay
batched_attestation_queue: DelayQueue<QueuedAttestationId>,

/* Queued items */
/// Queued blocks.
Expand All @@ -250,6 +270,8 @@ struct ReprocessQueue<S> {
queued_aggregates: FnvHashMap<usize, (QueuedAggregate, DelayKey)>,
/// Queued attestations.
queued_unaggregates: FnvHashMap<usize, (QueuedUnaggregate, DelayKey)>,
/// Queued batch attestations.
queued_batch_attestations: FnvHashMap<usize, (QueuedBatchedAttestation, DelayKey)>,
/// Attestations (aggregated and unaggregated) per root.
awaiting_attestations_per_root: HashMap<Hash256, Vec<QueuedAttestationId>>,
/// Queued Light Client Updates.
Expand Down Expand Up @@ -279,6 +301,7 @@ pub type QueuedLightClientUpdateId = usize;
enum QueuedAttestationId {
Aggregate(usize),
Unaggregate(usize),
Batched(usize),
}

impl QueuedAggregate {
Expand Down Expand Up @@ -335,6 +358,17 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}

match self.batched_attestation_queue.poll_expired(cx) {
Poll::Ready(Some(attestation_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyBatchedAttestation(
attestation_id.into_inner(),
)));
}
// `Poll::Ready(None)` means that there are no more entries in the delay queue and we
// will continue to get this result until something else is added into the queue.
Poll::Ready(None) | Poll::Pending => (),
}

match self.lc_updates_delay_queue.poll_expired(cx) {
Poll::Ready(Some(lc_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyLightClientUpdate(
Expand Down Expand Up @@ -420,12 +454,14 @@ impl<S: SlotClock> ReprocessQueue<S> {
gossip_block_delay_queue: DelayQueue::new(),
rpc_block_delay_queue: DelayQueue::new(),
attestations_delay_queue: DelayQueue::new(),
batched_attestation_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
column_reconstructions_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
queued_lc_updates: FnvHashMap::default(),
queued_aggregates: FnvHashMap::default(),
queued_unaggregates: FnvHashMap::default(),
queued_batch_attestations: FnvHashMap::default(),
awaiting_attestations_per_root: HashMap::new(),
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
Expand Down Expand Up @@ -670,6 +706,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
.map(|(unaggregate, delay_key)| {
(ReadyWork::Unaggregate(unaggregate), delay_key)
}),
QueuedAttestationId::Batched(_) => {
error!("this should never occur");
None
}
} {
// Remove the delay.
self.attestations_delay_queue.remove(&delay_key);
Expand Down Expand Up @@ -784,6 +824,23 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
InboundEvent::Msg(BatchedAttestation(queued_batch_attestation)) => {
let mut batch_processing_delay = QUEUED_BATCH_ATTESTATION_DELAY;
if self.batched_attestation_queue.len() >= MAXIMUM_BATCHED_ATTESTATIONS {
batch_processing_delay = Duration::from_secs(0);
}

let att_id = QueuedAttestationId::Batched(self.next_attestation);

let delay_key = self
.batched_attestation_queue
.insert(att_id, batch_processing_delay);

self.queued_batch_attestations
.insert(self.next_attestation, (queued_batch_attestation, delay_key));

self.next_attestation += 1;
}
// A block that was queued for later processing is now ready to be processed.
InboundEvent::ReadyGossipBlock(ready_block) => {
let block_root = ready_block.beacon_block_root;
Expand Down Expand Up @@ -827,6 +884,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
ReadyWork::Unaggregate(unaggregate),
)
}),
QueuedAttestationId::Batched(_) => {
error!("batched attestation ID reached ReadyAttestation handler");
None
}
} {
if self.ready_work_tx.try_send(work).is_err() {
error!(
Expand All @@ -852,6 +913,26 @@ impl<S: SlotClock> ReprocessQueue<S> {
}
}
}
InboundEvent::ReadyBatchedAttestation(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_ATTESTATIONS,
);

if let Some((batch_attestation, _delay_key)) =
self.queued_batch_attestations.remove(&match queued_id {
QueuedAttestationId::Batched(id) => id,
_ => {
error!("Invalid attestation ID for batched attestation");
return;
}
})
{
// Call the process_batch closure with the single attestation as a batch
// since the block was never imported and we're expiring the attestation.
(batch_attestation.process_batch)(vec![*batch_attestation.attestation]);
}
}

InboundEvent::ReadyLightClientUpdate(queued_id) => {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_EXPIRED_OPTIMISTIC_UPDATES,
Expand Down
31 changes: 17 additions & 14 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes,
use beacon_processor::{
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
WorkEvent as BeaconWorkEvent,
work_reprocessing_queue::{QueuedBatchedAttestation, ReprocessQueueMessage},
};
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::methods::{
Expand Down Expand Up @@ -100,20 +101,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let process_batch =
move |attestations| processor.process_gossip_attestation_batch(attestations, true);

self.try_send(BeaconWorkEvent {
drop_during_sync: true,
work: Work::GossipAttestation {
attestation: Box::new(GossipAttestationPackage {
message_id,
peer_id,
attestation: Box::new(attestation),
subnet_id,
should_import,
seen_timestamp,
}),
process_individual: Box::new(process_individual),
process_batch: Box::new(process_batch),
},
self.beacon_processor_send.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::BatchedAttestation(
QueuedBatchedAttestation {
attestation: Box::new(GossipAttestationPackage {
message_id,
peer_id,
attestation: Box::new(attestation),
subnet_id,
should_import,
seen_timestamp,
}),
process_individual: Box::new(process_individual),
process_batch: Box::new(process_batch),
},
)),
})
}

Expand Down
Loading