Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ea45814
using events api for eager start attestation tasks
hopinheimer Aug 18, 2025
6f20083
merge `origin/unstable` into `hopinheimer/eager-send-attestation`
hopinheimer Aug 18, 2025
46fe220
minor nits
hopinheimer Aug 18, 2025
de7226f
clippy chill vibes
hopinheimer Aug 18, 2025
5f2a101
missed something
hopinheimer Aug 18, 2025
ae16705
linty
hopinheimer Aug 18, 2025
c13d0c3
implemented head monitoring service
hopinheimer Sep 19, 2025
8405f01
Merge branch 'unstable' of github.com:sigp/lighthouse into eager-send…
hopinheimer Sep 19, 2025
65e04fb
merge from feature branch
hopinheimer Sep 19, 2025
a1d36e6
fixing some linting issues
hopinheimer Sep 19, 2025
a489d32
comments and removing unwanted code
hopinheimer Sep 20, 2025
2b974db
clippy change
hopinheimer Sep 20, 2025
5600aef
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 15, 2025
b65fc30
same data attestation bug solved
hopinheimer Oct 16, 2025
9024931
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 20, 2025
b25703f
fixing dangling conditions and making head_monitor_service optional
hopinheimer Oct 20, 2025
e68548b
Merge branch 'eager-send-attestation' of github.com:hopinheimer/light…
hopinheimer Oct 20, 2025
c4d851c
fmt
hopinheimer Oct 20, 2025
daf5b2e
update comment
hopinheimer Oct 20, 2025
c49b8eb
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 25, 2025
0e35ee5
massive refact
hopinheimer Oct 28, 2025
29867d2
fixes and linting
hopinheimer Oct 29, 2025
fd43876
remove unused code
hopinheimer Oct 31, 2025
b054a10
changes
hopinheimer Nov 1, 2025
eb057d0
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 2, 2025
91e5980
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 5, 2025
2e0c78c
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 10, 2025
32eed9a
addressing comments
hopinheimer Nov 10, 2025
dac9f00
removing unwanted logs
hopinheimer Nov 11, 2025
bf1471c
fmt
hopinheimer Nov 11, 2025
9794737
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 11, 2025
b20ded5
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 12, 2025
39b9a58
fixing an unwanted service-starting bug
hopinheimer Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions validator_client/beacon_node_fallback/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use beacon_node_health::{
SyncDistanceTier, check_node_health,
};
use clap::ValueEnum;
use eth2::types::SseHead;
use eth2::{BeaconNodeHttpClient, Timeouts};
use futures::future;
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize, Serializer, ser::SerializeStruct};
use slot_clock::SlotClock;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::future::Future;
Expand Down Expand Up @@ -49,6 +51,48 @@ pub struct Config {
pub sync_tolerances: BeaconNodeSyncDistanceTiers,
}

/// Map from a beacon node's candidate index to the most recent `SseHead` it reported.
type CacheHashMap = HashMap<usize, SseHead>;

/// Cache to maintain the latest head received from each of the beacon nodes
/// in the `BeaconNodeFallback`.
#[derive(Debug)]
pub struct BeaconHeadCache {
    // Keyed by candidate index. Interior mutability behind an async `RwLock` so all
    // methods can take `&self`.
    cache: RwLock<CacheHashMap>,
}

impl BeaconHeadCache {
pub fn new() -> Self {
Self {
cache: RwLock::new(HashMap::new()),
}
}

pub async fn get(&self, beacon_node_index: usize) -> Option<SseHead> {
self.cache.read().await.get(&beacon_node_index).cloned()
}

pub async fn insert(&self, beacon_node_index: usize, head: SseHead) {
self.cache.write().await.insert(beacon_node_index, head);
}

pub async fn is_latest(&self, head: &SseHead) -> bool {
let cache = self.cache.read().await;
cache
.values()
.all(|cache_head| head.slot >= cache_head.slot)
}

pub async fn purge_cache(&self) {
self.cache.write().await.clear();
}
}

impl Default for BeaconHeadCache {
fn default() -> Self {
Self::new()
}
}

/// Indicates a measurement of latency between the VC and a BN.
pub struct LatencyMeasurement {
/// An identifier for the beacon node (e.g. the URL).
Expand Down Expand Up @@ -378,6 +422,7 @@ impl CandidateBeaconNode {
#[derive(Clone, Debug)]
pub struct BeaconNodeFallback<T> {
pub candidates: Arc<RwLock<Vec<CandidateBeaconNode>>>,
beacon_head_cache: Option<Arc<BeaconHeadCache>>,
distance_tiers: BeaconNodeSyncDistanceTiers,
slot_clock: Option<T>,
broadcast_topics: Vec<ApiTopic>,
Expand All @@ -387,13 +432,15 @@ pub struct BeaconNodeFallback<T> {
impl<T: SlotClock> BeaconNodeFallback<T> {
pub fn new(
candidates: Vec<CandidateBeaconNode>,
beacon_head_cache: Arc<BeaconHeadCache>,
config: Config,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
) -> Self {
let distance_tiers = config.sync_tolerances;
Self {
candidates: Arc::new(RwLock::new(candidates)),
beacon_head_cache: Some(beacon_head_cache),
distance_tiers,
slot_clock: None,
broadcast_topics,
Expand All @@ -410,6 +457,14 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
self.slot_clock = Some(slot_clock);
}

/// Set the beacon head cache reference.
///
/// This is used to refresh the `beacon_head_cache` in the `HeadMonitorService`, to avoid a
/// dangling reference to BNs when the candidate list is changed in `update_candidates_list`.
pub fn set_beacon_head_cache(&mut self, cache: Arc<BeaconHeadCache>) {
    self.beacon_head_cache = Some(cache);
}

/// The count of candidates, regardless of their state.
pub async fn num_total(&self) -> usize {
self.candidates.read().await.len()
Expand Down Expand Up @@ -493,6 +548,10 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
let mut candidates = self.candidates.write().await;
*candidates = new_candidates;

if let Some(cache) = &self.beacon_head_cache {
cache.purge_cache().await;
}

Ok(new_list)
}

Expand Down Expand Up @@ -646,6 +705,41 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
Err(Errors(errors))
}

/// Try `func` on a specific beacon node by index first, then fall back to the normal order.
/// Returns immediately if the preferred node succeeds, otherwise falls back to first_success.
/// This is an insurance against potential race conditions that may arise.
pub async fn first_success_from_index<F, O, Err, R>(
    &self,
    preferred_index: Option<usize>,
    func: F,
) -> Result<O, Errors<Err>>
where
    F: Fn(BeaconNodeHttpClient) -> R + Clone,
    R: Future<Output = Result<O, Err>>,
    Err: Debug,
{
    // Resolve the preferred candidate's client inside a scope so the read lock is
    // released before any await on the request itself.
    let preferred_node = {
        let candidates = self.candidates.read().await;
        preferred_index.and_then(|idx| {
            candidates
                .iter()
                .find(|candidate| candidate.index == idx)
                .map(|candidate| candidate.beacon_node.clone())
        })
    };

    // Try the preferred beacon node first if it exists; its error (if any) is
    // discarded since `first_success` will collect errors from the full list.
    if let Some(node) = preferred_node {
        if let Ok(val) = Self::run_on_candidate(node, &func).await {
            return Ok(val);
        }
    }

    // Fall back to normal first_success behavior.
    self.first_success(func).await
}

/// Run the future `func` on `candidate` while reporting metrics.
async fn run_on_candidate<F, R, Err, O>(
candidate: BeaconNodeHttpClient,
Expand Down Expand Up @@ -927,8 +1021,17 @@ mod tests {
topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
) -> BeaconNodeFallback<TestingSlotClock> {
let mut beacon_node_fallback =
BeaconNodeFallback::new(candidates, Config::default(), topics, spec);
let beacon_head_cache = Arc::new(BeaconHeadCache {
cache: RwLock::new(HashMap::new()),
});

let mut beacon_node_fallback = BeaconNodeFallback::new(
candidates,
beacon_head_cache,
Config::default(),
topics,
spec,
);

beacon_node_fallback.set_slot_clock(TestingSlotClock::new(
Slot::new(1),
Expand Down
28 changes: 27 additions & 1 deletion validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use metrics::set_gauge;
use monitoring_api::{MonitoringHttpClient, ProcessType};
use sensitive_url::SensitiveUrl;
use slashing_protection::{SLASHING_PROTECTION_FILENAME, SlashingDatabase};
use tokio::sync::Mutex;

use account_utils::validator_definitions::ValidatorDefinitions;
use beacon_node_fallback::{
BeaconNodeFallback, CandidateBeaconNode, start_fallback_updater_service,
BeaconHeadCache, BeaconNodeFallback, CandidateBeaconNode, start_fallback_updater_service,
};
use clap::ArgMatches;
use doppelganger_service::DoppelgangerService;
Expand Down Expand Up @@ -42,6 +43,7 @@ use validator_services::{
attestation_service::{AttestationService, AttestationServiceBuilder},
block_service::{BlockService, BlockServiceBuilder},
duties_service::{self, DutiesService, DutiesServiceBuilder},
head_monitor_service::{HeadMonitorService, HeadMonitorServiceBuilder},
latency_service,
preparation_service::{PreparationService, PreparationServiceBuilder},
sync_committee_service::SyncCommitteeService,
Expand Down Expand Up @@ -79,6 +81,7 @@ pub struct ProductionValidatorClient<E: EthSpec> {
context: RuntimeContext<E>,
duties_service: Arc<DutiesService<ValidatorStore<E>, SystemTimeSlotClock>>,
block_service: BlockService<ValidatorStore<E>, SystemTimeSlotClock>,
head_monitor_service: HeadMonitorService<ValidatorStore<E>, SystemTimeSlotClock>,
attestation_service: AttestationService<ValidatorStore<E>, SystemTimeSlotClock>,
sync_committee_service: SyncCommitteeService<ValidatorStore<E>, SystemTimeSlotClock>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
Expand Down Expand Up @@ -352,15 +355,19 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Initialize the number of connected, available beacon nodes to 0.
set_gauge(&validator_metrics::AVAILABLE_BEACON_NODES_COUNT, 0);

let beacon_head_cache = Arc::new(BeaconHeadCache::new());

let mut beacon_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
candidates,
beacon_head_cache.clone(),
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
);

let mut proposer_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
proposer_candidates,
beacon_head_cache.clone(),
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
Expand Down Expand Up @@ -493,15 +500,26 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
block_service_builder = block_service_builder.proposer_nodes(proposer_nodes.clone());
}

let (head_sender, head_receiver) = mpsc::channel(1_024);

let block_service = block_service_builder.build()?;

let head_monitor_service = HeadMonitorServiceBuilder::new()
.executor(context.executor.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.beacon_head_cache(beacon_head_cache.clone())
.head_monitor_tx(Arc::new(head_sender))
.build()?;

let attestation_service = AttestationServiceBuilder::new()
.duties_service(duties_service.clone())
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.executor(context.executor.clone())
.chain_spec(context.eth2_config.spec.clone())
.head_monitor_rx(Arc::new(Mutex::new(head_receiver)))
.disable(config.disable_attesting)
.build()?;

Expand All @@ -526,6 +544,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
context,
duties_service,
block_service,
head_monitor_service,
attestation_service,
sync_committee_service,
doppelganger_service,
Expand Down Expand Up @@ -604,6 +623,13 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start preparation service: {}", e))?;

if let Err(e) = self.head_monitor_service.clone().start_update_service() {
warn!(
error = %e,
"Unable to start head monitor service, validator client running compromised performance"
);
}

if let Some(doppelganger_service) = self.doppelganger_service.clone() {
DoppelgangerService::start_update_service(
doppelganger_service,
Expand Down
Loading
Loading