Skip to content

Commit 32b0fb1

Browse files
authored
Merge pull request #3912 from realbigsean/sync-error-handling
Sync error handling
2 parents 9fde813 + 5e8d798 commit 32b0fb1

File tree

7 files changed

+358
-95
lines changed

7 files changed

+358
-95
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 78 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -956,23 +956,35 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
956956
pub async fn get_block_and_blobs_checking_early_attester_cache(
957957
&self,
958958
block_root: &Hash256,
959-
) -> Result<
960-
(
961-
Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
962-
Option<Arc<BlobsSidecar<T::EthSpec>>>,
963-
),
964-
Error,
965-
> {
966-
if let (Some(block), Some(blobs)) = (
967-
self.early_attester_cache.get_block(*block_root),
968-
self.early_attester_cache.get_blobs(*block_root),
969-
) {
970-
return Ok((Some(block), Some(blobs)));
959+
) -> Result<Option<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>, Error> {
960+
// If there is no data availability boundary, the Eip4844 fork is disabled.
961+
if let Some(finalized_data_availability_boundary) =
962+
self.finalized_data_availability_boundary()
963+
{
964+
// Only use the attester cache if we can find both the block and blob
965+
if let (Some(block), Some(blobs)) = (
966+
self.early_attester_cache.get_block(*block_root),
967+
self.early_attester_cache.get_blobs(*block_root),
968+
) {
969+
Ok(Some(SignedBeaconBlockAndBlobsSidecar {
970+
beacon_block: block,
971+
blobs_sidecar: blobs,
972+
}))
973+
// Attempt to get the block and blobs from the database
974+
} else if let Some(block) = self.get_block(block_root).await?.map(Arc::new) {
975+
let blobs = self
976+
.get_blobs(block_root, finalized_data_availability_boundary)?
977+
.map(Arc::new);
978+
Ok(blobs.map(|blobs| SignedBeaconBlockAndBlobsSidecar {
979+
beacon_block: block,
980+
blobs_sidecar: blobs,
981+
}))
982+
} else {
983+
Ok(None)
984+
}
985+
} else {
986+
Ok(None)
971987
}
972-
Ok((
973-
self.get_block(block_root).await?.map(Arc::new),
974-
self.get_blobs(block_root).ok().flatten().map(Arc::new),
975-
))
976988
}
977989

978990
/// Returns the block at the given root, if any.
@@ -1044,33 +1056,46 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
10441056

10451057
/// Returns the blobs at the given root, if any.
10461058
///
1047-
/// ## Errors
1059+
/// Returns `Ok(None)` if the blobs and associated block are not found.
10481060
///
1049-
/// May return a database error.
1061+
/// If we can find the corresponding block in our database, we know whether we *should* have
1062+
/// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't,
1063+
/// this will reconstruct an empty `BlobsSidecar`.
1064+
///
1065+
/// ## Errors
1066+
/// - any database read errors
1067+
/// - block and blobs are inconsistent in the database
1068+
/// - this method is called with a pre-eip4844 block root
1069+
/// - this method is called for a blob that is beyond the prune depth
10501070
pub fn get_blobs(
10511071
&self,
10521072
block_root: &Hash256,
1073+
data_availability_boundary: Epoch,
10531074
) -> Result<Option<BlobsSidecar<T::EthSpec>>, Error> {
10541075
match self.store.get_blobs(block_root)? {
10551076
Some(blobs) => Ok(Some(blobs)),
10561077
None => {
1057-
if let Ok(Some(block)) = self.get_blinded_block(block_root) {
1058-
let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?;
1059-
1060-
if expected_kzg_commitments.len() > 0 {
1061-
Err(Error::DBInconsistent(format!(
1062-
"Expected kzg commitments but no blobs stored for block root {}",
1063-
block_root
1064-
)))
1065-
} else {
1066-
Ok(Some(BlobsSidecar::empty_from_parts(
1067-
*block_root,
1068-
block.slot(),
1069-
)))
1070-
}
1071-
} else {
1072-
Ok(None)
1073-
}
1078+
// Check for the corresponding block to understand whether we *should* have blobs.
1079+
self.get_blinded_block(block_root)?
1080+
.map(|block| {
1081+
// If there are no KZG commitments in the block, we know the sidecar should
1082+
// be empty.
1083+
let expected_kzg_commitments =
1084+
match block.message().body().blob_kzg_commitments() {
1085+
Ok(kzg_commitments) => kzg_commitments,
1086+
Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock),
1087+
};
1088+
if expected_kzg_commitments.is_empty() {
1089+
Ok(BlobsSidecar::empty_from_parts(*block_root, block.slot()))
1090+
} else if data_availability_boundary <= block.epoch() {
1091+
// We should have blobs for all blocks younger than the boundary.
1092+
Err(Error::BlobsUnavailable)
1093+
} else {
1094+
// We shouldn't have blobs for blocks older than the boundary.
1095+
Err(Error::BlobsOlderThanDataAvailabilityBoundary(block.epoch()))
1096+
}
1097+
})
1098+
.transpose()
10741099
}
10751100
}
10761101
}
@@ -2486,17 +2511,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
24862511

24872512
while let Some((_root, block)) = filtered_chain_segment.first() {
24882513
// Determine the epoch of the first block in the remaining segment.
2489-
let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
2514+
let start_epoch = block.epoch();
24902515

24912516
// The `last_index` indicates the position of the first block in an epoch greater
24922517
// than the current epoch: partitioning the blocks into a run of blocks in the same
24932518
// epoch and everything else. These same-epoch blocks can all be signature-verified with
24942519
// the same `BeaconState`.
24952520
let last_index = filtered_chain_segment
24962521
.iter()
2497-
.position(|(_root, block)| {
2498-
block.slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch
2499-
})
2522+
.position(|(_root, block)| block.epoch() > start_epoch)
25002523
.unwrap_or(filtered_chain_segment.len());
25012524

25022525
let mut blocks = filtered_chain_segment.split_off(last_index);
@@ -3162,7 +3185,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31623185
// Sync aggregate.
31633186
if let Ok(sync_aggregate) = block.body().sync_aggregate() {
31643187
// `SyncCommittee` for the sync_aggregate should correspond to the duty slot
3165-
let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
3188+
let duty_epoch = block.epoch();
31663189

31673190
match self.sync_committee_at_epoch(duty_epoch) {
31683191
Ok(sync_committee) => {
@@ -3429,7 +3452,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
34293452
parent_block_slot: Slot,
34303453
) {
34313454
// Do not write to eth1 finalization cache for blocks older than 5 epochs.
3432-
if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 5 < current_epoch {
3455+
if block.epoch() + 5 < current_epoch {
34333456
return;
34343457
}
34353458

@@ -5856,6 +5879,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
58565879
.flatten()
58575880
}
58585881

5882+
/// The epoch that is a data availability boundary, or the latest finalized epoch.
5883+
/// `None` if the `Eip4844` fork is disabled.
5884+
pub fn finalized_data_availability_boundary(&self) -> Option<Epoch> {
5885+
self.data_availability_boundary().map(|boundary| {
5886+
std::cmp::max(
5887+
boundary,
5888+
self.canonical_head
5889+
.cached_head()
5890+
.finalized_checkpoint()
5891+
.epoch,
5892+
)
5893+
})
5894+
}
5895+
58595896
/// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if
58605897
/// the `Eip4844` fork is disabled.
58615898
pub fn is_data_availability_check_required(&self) -> Result<bool, Error> {

beacon_node/beacon_chain/src/blob_verification.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use crate::beacon_chain::{BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DI
66
use crate::{kzg_utils, BeaconChainError};
77
use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions;
88
use types::signed_beacon_block::BlobReconstructionError;
9-
use types::ExecPayload;
109
use types::{
1110
BeaconBlockRef, BeaconStateError, BlobsSidecar, EthSpec, Hash256, KzgCommitment,
1211
SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, SignedBeaconBlockHeader, Slot,
1312
Transactions,
1413
};
14+
use types::{Epoch, ExecPayload};
1515

1616
#[derive(Debug)]
1717
pub enum BlobError {
@@ -384,6 +384,7 @@ impl<E: EthSpec> IntoBlockWrapper<E> for AvailableBlock<E> {
384384

385385
pub trait AsBlock<E: EthSpec> {
386386
fn slot(&self) -> Slot;
387+
fn epoch(&self) -> Epoch;
387388
fn parent_root(&self) -> Hash256;
388389
fn state_root(&self) -> Hash256;
389390
fn signed_block_header(&self) -> SignedBeaconBlockHeader;
@@ -399,6 +400,12 @@ impl<E: EthSpec> AsBlock<E> for BlockWrapper<E> {
399400
BlockWrapper::BlockAndBlob(block, _) => block.slot(),
400401
}
401402
}
403+
fn epoch(&self) -> Epoch {
404+
match self {
405+
BlockWrapper::Block(block) => block.epoch(),
406+
BlockWrapper::BlockAndBlob(block, _) => block.epoch(),
407+
}
408+
}
402409
fn parent_root(&self) -> Hash256 {
403410
match self {
404411
BlockWrapper::Block(block) => block.parent_root(),
@@ -444,6 +451,12 @@ impl<E: EthSpec> AsBlock<E> for &BlockWrapper<E> {
444451
BlockWrapper::BlockAndBlob(block, _) => block.slot(),
445452
}
446453
}
454+
fn epoch(&self) -> Epoch {
455+
match self {
456+
BlockWrapper::Block(block) => block.epoch(),
457+
BlockWrapper::BlockAndBlob(block, _) => block.epoch(),
458+
}
459+
}
447460
fn parent_root(&self) -> Hash256 {
448461
match self {
449462
BlockWrapper::Block(block) => block.parent_root(),
@@ -491,6 +504,14 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
491504
}
492505
}
493506
}
507+
fn epoch(&self) -> Epoch {
508+
match &self.0 {
509+
AvailableBlockInner::Block(block) => block.epoch(),
510+
AvailableBlockInner::BlockAndBlob(block_sidecar_pair) => {
511+
block_sidecar_pair.beacon_block.epoch()
512+
}
513+
}
514+
}
494515
fn parent_root(&self) -> Hash256 {
495516
match &self.0 {
496517
AvailableBlockInner::Block(block) => block.parent_root(),

beacon_node/beacon_chain/src/errors.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ pub enum BeaconChainError {
209209
BlsToExecutionChangeBadFork(ForkName),
210210
InconsistentFork(InconsistentFork),
211211
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
212+
BlobsUnavailable,
213+
NoKzgCommitmentsFieldOnBlock,
214+
BlobsOlderThanDataAvailabilityBoundary(Epoch),
212215
}
213216

214217
easy_from_to!(SlotProcessingError, BeaconChainError);

beacon_node/lighthouse_network/src/peer_manager/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
513513
Protocol::MetaData => PeerAction::LowToleranceError,
514514
Protocol::Status => PeerAction::LowToleranceError,
515515
},
516+
RPCResponseErrorCode::BlobsNotFoundForBlock => PeerAction::LowToleranceError,
516517
},
517518
RPCError::SSZDecodeError(_) => PeerAction::Fatal,
518519
RPCError::UnsupportedProtocol => {

beacon_node/lighthouse_network/src/rpc/methods.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ pub struct LightClientBootstrapRequest {
330330
#[strum(serialize_all = "snake_case")]
331331
pub enum RPCResponseErrorCode {
332332
RateLimited,
333+
BlobsNotFoundForBlock,
333334
InvalidRequest,
334335
ServerError,
335336
/// Error spec'd to indicate that a peer does not have blocks on a requested range.
@@ -359,6 +360,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
359360
2 => RPCResponseErrorCode::ServerError,
360361
3 => RPCResponseErrorCode::ResourceUnavailable,
361362
139 => RPCResponseErrorCode::RateLimited,
363+
140 => RPCResponseErrorCode::BlobsNotFoundForBlock,
362364
_ => RPCResponseErrorCode::Unknown,
363365
};
364366
RPCCodedResponse::Error(code, err)
@@ -397,6 +399,7 @@ impl RPCResponseErrorCode {
397399
RPCResponseErrorCode::ResourceUnavailable => 3,
398400
RPCResponseErrorCode::Unknown => 255,
399401
RPCResponseErrorCode::RateLimited => 139,
402+
RPCResponseErrorCode::BlobsNotFoundForBlock => 140,
400403
}
401404
}
402405
}
@@ -425,6 +428,7 @@ impl std::fmt::Display for RPCResponseErrorCode {
425428
RPCResponseErrorCode::ServerError => "Server error occurred",
426429
RPCResponseErrorCode::Unknown => "Unknown error occurred",
427430
RPCResponseErrorCode::RateLimited => "Rate limited",
431+
RPCResponseErrorCode::BlobsNotFoundForBlock => "No blobs for the given root",
428432
};
429433
f.write_str(repr)
430434
}
@@ -507,9 +511,23 @@ impl std::fmt::Display for OldBlocksByRangeRequest {
507511
}
508512
}
509513

514+
impl std::fmt::Display for BlobsByRootRequest {
515+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
516+
write!(
517+
f,
518+
"Request: BlobsByRoot: Number of Requested Roots: {}",
519+
self.block_roots.len()
520+
)
521+
}
522+
}
523+
510524
impl std::fmt::Display for BlobsByRangeRequest {
511525
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512-
write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count)
526+
write!(
527+
f,
528+
"Request: BlobsByRange: Start Slot: {}, Count: {}",
529+
self.start_slot, self.count
530+
)
513531
}
514532
}
515533

0 commit comments

Comments
 (0)