Skip to content
Merged
2 changes: 1 addition & 1 deletion crates/iota-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod test {
..Default::default()
})
.with_submit_delay_step_override_millis(3000)
.with_state_accumulator_v2_enabled_callback(Arc::new(|idx| idx % 2 == 0))
.with_state_accumulator_callback(Arc::new(|idx| idx % 2 == 0))
.build()
.await
.into();
Expand Down
5 changes: 0 additions & 5 deletions crates/iota-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,6 @@ pub struct NodeConfig {
#[serde(default)]
pub execution_cache: ExecutionCacheConfig,

// step 1 in removing the old state accumulator
#[serde(skip)]
#[serde(default = "bool_true")]
pub state_accumulator_v2: bool,

#[serde(default = "bool_true")]
pub enable_validator_tx_finalizer: bool,
}
Expand Down
28 changes: 1 addition & 27 deletions crates/iota-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ use super::{
use crate::{
authority::{
AuthorityMetrics, ResolverWrapper,
epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
epoch_start_configuration::EpochStartConfiguration,
shared_object_version_manager::{
AssignedTxAndVersions, ConsensusSharedObjVerAssignment, SharedObjVerManager,
},
Expand Down Expand Up @@ -976,15 +976,6 @@ impl AuthorityPerEpochStore {
self.parent_path.clone()
}

pub fn state_accumulator_v2_enabled(&self) -> bool {
let flag = match self.get_chain_identifier().chain() {
Chain::Unknown | Chain::Testnet => EpochFlag::StateAccumulatorV2EnabledTestnet,
Chain::Mainnet => EpochFlag::StateAccumulatorV2EnabledMainnet,
};

self.epoch_start_configuration.flags().contains(&flag)
}

/// Returns `&Arc<EpochStartConfiguration>`
/// User can treat this `Arc` as `&EpochStartConfiguration`, or clone the
/// Arc to pass as owned object
Expand Down Expand Up @@ -1364,23 +1355,6 @@ impl AuthorityPerEpochStore {
.map_err(Into::into)
}

/// Returns future containing the state digest for the given epoch
/// once available.
/// TODO: remove once StateAccumulatorV1 is removed
pub async fn notify_read_checkpoint_state_digests(
&self,
checkpoints: Vec<CheckpointSequenceNumber>,
) -> IotaResult<Vec<Accumulator>> {
self.checkpoint_state_notify_read
.read(&checkpoints, |checkpoints| -> IotaResult<_> {
Ok(self
.tables()?
.state_hash_by_checkpoint
.multi_get(checkpoints)?)
})
.await
}

pub async fn notify_read_running_root(
&self,
checkpoint: CheckpointSequenceNumber,
Expand Down
27 changes: 11 additions & 16 deletions crates/iota-core/src/authority/epoch_start_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,22 @@ pub trait EpochStartConfigTrait {
pub enum EpochFlag {
WritebackCacheEnabled = 0,

StateAccumulatorV2EnabledTestnet = 1,
StateAccumulatorV2EnabledMainnet = 2,
StateAccumulatorV1EnabledTestnet = 1,
Copy link
Member

@alexsporn alexsporn Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these flags and make it default behaviour.

Copy link
Contributor Author

@bingyanglin bingyanglin Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Those flags are removed in 494bcfd.
Also it was mad default behaviour like here, with commits 78fe7e6 and 3a1fde3.

StateAccumulatorV1EnabledMainnet = 2,
}

impl EpochFlag {
pub fn default_flags_for_new_epoch(config: &NodeConfig) -> Vec<Self> {
Self::default_flags_impl(&config.execution_cache, config.state_accumulator_v2)
Self::default_flags_impl(&config.execution_cache)
}

/// For situations in which there is no config available (e.g. setting up a
/// downloaded snapshot).
pub fn default_for_no_config() -> Vec<Self> {
Self::default_flags_impl(&Default::default(), true)
Self::default_flags_impl(&Default::default())
}

fn default_flags_impl(
cache_config: &ExecutionCacheConfig,
enable_state_accumulator_v2: bool,
) -> Vec<Self> {
fn default_flags_impl(cache_config: &ExecutionCacheConfig) -> Vec<Self> {
let mut new_flags = vec![];

if matches!(
Expand All @@ -84,10 +81,8 @@ impl EpochFlag {
new_flags.push(EpochFlag::WritebackCacheEnabled);
}

if enable_state_accumulator_v2 {
new_flags.push(EpochFlag::StateAccumulatorV2EnabledTestnet);
new_flags.push(EpochFlag::StateAccumulatorV2EnabledMainnet);
}
new_flags.push(EpochFlag::StateAccumulatorV1EnabledTestnet);
new_flags.push(EpochFlag::StateAccumulatorV1EnabledMainnet);

new_flags
}
Expand All @@ -99,11 +94,11 @@ impl fmt::Display for EpochFlag {
// is used as metric key
match self {
EpochFlag::WritebackCacheEnabled => write!(f, "WritebackCacheEnabled"),
EpochFlag::StateAccumulatorV2EnabledTestnet => {
write!(f, "StateAccumulatorV2EnabledTestnet")
EpochFlag::StateAccumulatorV1EnabledTestnet => {
write!(f, "StateAccumulatorV1EnabledTestnet")
}
EpochFlag::StateAccumulatorV2EnabledMainnet => {
write!(f, "StateAccumulatorV2EnabledMainnet")
EpochFlag::StateAccumulatorV1EnabledMainnet => {
write!(f, "StateAccumulatorV1EnabledMainnet")
}
}
}
Expand Down
122 changes: 4 additions & 118 deletions crates/iota-core/src/state_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,13 @@ impl StateAccumulatorMetrics {

pub enum StateAccumulator {
V1(StateAccumulatorV1),
V2(StateAccumulatorV2),
}

pub struct StateAccumulatorV1 {
store: Arc<dyn AccumulatorStore>,
metrics: Arc<StateAccumulatorMetrics>,
}

pub struct StateAccumulatorV2 {
store: Arc<dyn AccumulatorStore>,
metrics: Arc<StateAccumulatorMetrics>,
}

pub trait AccumulatorStore: ObjectStore + Send + Sync {
fn get_root_state_accumulator_for_epoch(
&self,
Expand Down Expand Up @@ -166,11 +160,7 @@ impl StateAccumulator {
epoch_store: &Arc<AuthorityPerEpochStore>,
metrics: Arc<StateAccumulatorMetrics>,
) -> Self {
if epoch_store.state_accumulator_v2_enabled() {
StateAccumulator::V2(StateAccumulatorV2::new(store, metrics))
} else {
StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
}
StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
}

pub fn new_for_tests(
Expand All @@ -187,14 +177,12 @@ impl StateAccumulator {
pub fn metrics(&self) -> Arc<StateAccumulatorMetrics> {
match self {
StateAccumulator::V1(impl_v1) => impl_v1.metrics.clone(),
StateAccumulator::V2(impl_v2) => impl_v2.metrics.clone(),
}
}

pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
match self {
StateAccumulator::V1(impl_v1) => &impl_v1.metrics,
StateAccumulator::V2(impl_v2) => &impl_v2.metrics,
}
.inconsistent_state
.set(is_inconsistent_state as i64);
Expand Down Expand Up @@ -232,12 +220,8 @@ impl StateAccumulator {
checkpoint_acc: Option<Accumulator>,
) -> IotaResult {
match self {
StateAccumulator::V1(_) => {
// V1 does not have a running root accumulator
Ok(())
}
StateAccumulator::V2(impl_v2) => {
impl_v2
StateAccumulator::V1(impl_v1) => {
impl_v1
.accumulate_running_root(epoch_store, checkpoint_seq_num, checkpoint_acc)
.await
}
Expand All @@ -251,12 +235,7 @@ impl StateAccumulator {
) -> IotaResult<Accumulator> {
match self {
StateAccumulator::V1(impl_v1) => {
impl_v1
.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
.await
}
StateAccumulator::V2(impl_v2) => {
impl_v2.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
impl_v1.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
}
}
}
Expand All @@ -266,9 +245,6 @@ impl StateAccumulator {
StateAccumulator::V1(impl_v1) => Self::accumulate_live_object_set_impl(
impl_v1.store.iter_cached_live_object_set_for_testing(),
),
StateAccumulator::V2(impl_v2) => Self::accumulate_live_object_set_impl(
impl_v2.store.iter_cached_live_object_set_for_testing(),
),
}
}

Expand All @@ -279,9 +255,6 @@ impl StateAccumulator {
StateAccumulator::V1(impl_v1) => {
Self::accumulate_live_object_set_impl(impl_v1.store.iter_live_object_set())
}
StateAccumulator::V2(impl_v2) => {
Self::accumulate_live_object_set_impl(impl_v2.store.iter_live_object_set())
}
}
}

Expand All @@ -290,7 +263,6 @@ impl StateAccumulator {
pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
match self {
StateAccumulator::V1(impl_v1) => impl_v1.accumulate_effects(effects),
StateAccumulator::V2(impl_v2) => impl_v2.accumulate_effects(effects),
}
}

Expand Down Expand Up @@ -339,92 +311,6 @@ impl StateAccumulatorV1 {
Self { store, metrics }
}

/// Unions all checkpoint accumulators at the end of the epoch to generate
/// the root state hash and persists it to db. This function is
/// idempotent. Can be called on non-consecutive epochs, e.g. to
/// accumulate epoch 3 after having last accumulated epoch 1.
pub async fn accumulate_epoch(
&self,
epoch_store: Arc<AuthorityPerEpochStore>,
last_checkpoint_of_epoch: CheckpointSequenceNumber,
) -> IotaResult<Accumulator> {
let _scope = monitored_scope("AccumulateEpochV1");
let epoch = epoch_store.epoch();
if let Some((_checkpoint, acc)) = self.store.get_root_state_accumulator_for_epoch(epoch)? {
return Ok(acc);
}

// Get the next checkpoint to accumulate (first checkpoint of the epoch)
// by adding 1 to the highest checkpoint of the previous epoch
let (_highest_epoch, (next_to_accumulate, mut root_state_accumulator)) = self
.store
.get_root_state_accumulator_for_highest_epoch()?
.map(|(epoch, (checkpoint, acc))| {
(
epoch,
(
checkpoint
.checked_add(1)
.expect("Overflowed u64 for epoch ID"),
acc,
),
)
})
.unwrap_or((0, (0, Accumulator::default())));

debug!(
"Accumulating epoch {} from checkpoint {} to checkpoint {} (inclusive)",
epoch, next_to_accumulate, last_checkpoint_of_epoch
);

let (checkpoints, mut accumulators) = epoch_store
.get_accumulators_in_checkpoint_range(next_to_accumulate, last_checkpoint_of_epoch)?
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();

let remaining_checkpoints: Vec<_> = (next_to_accumulate..=last_checkpoint_of_epoch)
.filter(|seq_num| !checkpoints.contains(seq_num))
.collect();

if !remaining_checkpoints.is_empty() {
debug!(
"Awaiting accumulation of checkpoints {:?} for epoch {} accumulation",
remaining_checkpoints, epoch
);
}

let mut remaining_accumulators = epoch_store
.notify_read_checkpoint_state_digests(remaining_checkpoints)
.await
.expect("Failed to notify read checkpoint state digests");

accumulators.append(&mut remaining_accumulators);

assert!(accumulators.len() == (last_checkpoint_of_epoch - next_to_accumulate + 1) as usize);

for acc in accumulators {
root_state_accumulator.union(&acc);
}

self.store.insert_state_accumulator_for_epoch(
epoch,
&last_checkpoint_of_epoch,
&root_state_accumulator,
)?;

Ok(root_state_accumulator)
}

pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
accumulate_effects(effects)
}
}

impl StateAccumulatorV2 {
pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
Self { store, metrics }
}

pub async fn accumulate_running_root(
&self,
epoch_store: &AuthorityPerEpochStore,
Expand Down
Loading
Loading