Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
//! task.

use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
QueuedAttestationBatch, QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock,
ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
Expand Down Expand Up @@ -69,8 +70,8 @@ use types::{
};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
spawn_reprocess_scheduler,
MAXIMUM_BATCHED_ATTESTATIONS, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork, spawn_reprocess_scheduler,
};

mod metrics;
Expand Down Expand Up @@ -121,6 +122,7 @@ pub struct BeaconProcessorQueueLengths {
gossip_block_queue: usize,
gossip_blob_queue: usize,
gossip_data_column_queue: usize,
batched_attestation: usize,
delayed_block_queue: usize,
status_queue: usize,
bbrange_queue: usize,
Expand Down Expand Up @@ -167,14 +169,18 @@ impl BeaconProcessorQueueLengths {
),
// Capacity for a full slot's worth of attestations if subscribed to all subnets
unknown_block_attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
(active_validator_count / slots_per_epoch) / MAXIMUM_BATCHED_ATTESTATIONS,
MIN_QUEUE_LEN,
),
sync_message_queue: 2048,
sync_contribution_queue: 1024,
gossip_voluntary_exit_queue: 4096,
gossip_proposer_slashing_queue: 4096,
gossip_attester_slashing_queue: 4096,
batched_attestation: std::cmp::max(
active_validator_count / slots_per_epoch,
MIN_QUEUE_LEN,
),
unknown_light_client_update_queue: 128,
rpc_block_queue: 1024,
rpc_blob_queue: 1024,
Expand Down Expand Up @@ -491,6 +497,16 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
work: Work::ColumnReconstruction(process_fn),
}
}
ReadyWork::DelayedAttestationBatch(QueuedAttestationBatch {
process_batch,
attestations,
}) => Self {
drop_during_sync: true,
work: Work::DelayedAttestationBatch {
attestations,
process_batch,
},
},
}
}
}
Expand Down Expand Up @@ -560,6 +576,10 @@ pub enum Work<E: EthSpec> {
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
},
DelayedAttestationBatch {
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
},
GossipAggregate {
aggregate: Box<GossipAggregatePackage<E>>,
process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
Expand Down Expand Up @@ -634,6 +654,7 @@ pub enum WorkType {
GossipAttestationToConvert,
UnknownBlockAttestation,
GossipAttestationBatch,
DelayedAttestationBatch,
GossipAggregate,
UnknownBlockAggregate,
UnknownLightClientOptimisticUpdate,
Expand Down Expand Up @@ -683,6 +704,7 @@ impl<E: EthSpec> Work<E> {
match self {
Work::GossipAttestation { .. } => WorkType::GossipAttestation,
Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
Work::DelayedAttestationBatch { .. } => WorkType::DelayedAttestationBatch,
Work::GossipAggregate { .. } => WorkType::GossipAggregate,
Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
Work::GossipBlock(_) => WorkType::GossipBlock,
Expand Down Expand Up @@ -874,6 +896,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);
let mut batched_attestation_queue = FifoQueue::new(queue_lengths.batched_attestation);

let mut status_queue = FifoQueue::new(queue_lengths.status_queue);
let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue);
Expand Down Expand Up @@ -1051,6 +1074,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
Some(item)
} else if let Some(item) = batched_attestation_queue.pop() {
Some(item)
} else if let Some(item) = gossip_blob_queue.pop() {
Some(item)
} else if let Some(item) = gossip_data_column_queue.pop() {
Expand Down Expand Up @@ -1305,6 +1330,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
_ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),

// Attestation batches are formed internally within the
// `BeaconProcessor`, they are not sent from external services.
Work::GossipAttestationBatch { .. } => crit!(
Expand All @@ -1320,6 +1346,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
"Unsupported inbound event"
)
}
Work::DelayedAttestationBatch { .. } => {
batched_attestation_queue.push(work, work_id)
}

Work::GossipBlock { .. } => gossip_block_queue.push(work, work_id),
Work::GossipBlobSidecar { .. } => gossip_blob_queue.push(work, work_id),
Work::GossipDataColumnSidecar { .. } => {
Expand Down Expand Up @@ -1407,6 +1437,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::DelayedAttestationBatch => 0,
WorkType::GossipAggregate => aggregate_queue.len(),
WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
WorkType::UnknownLightClientOptimisticUpdate => {
Expand Down Expand Up @@ -1571,6 +1602,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
} => task_spawner.spawn_blocking(move || {
process_batch(aggregates);
}),
Work::DelayedAttestationBatch {
attestations,
process_batch,
} => task_spawner.spawn_blocking(move || {
process_batch(attestations);
}),
Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move {
process_fn.await;
}),
Expand Down
Loading