Skip to content
Merged
2 changes: 1 addition & 1 deletion crates/iota-benchmark/tests/simtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod test {
..Default::default()
})
.with_submit_delay_step_override_millis(3000)
.with_state_accumulator_v2_enabled_callback(Arc::new(|idx| idx % 2 == 0))
.with_state_accumulator_callback(Arc::new(|idx| idx % 2 == 0))
.build()
.await
.into();
Expand Down
5 changes: 0 additions & 5 deletions crates/iota-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,6 @@ pub struct NodeConfig {
#[serde(default)]
pub execution_cache: ExecutionCacheConfig,

// step 1 in removing the old state accumulator
#[serde(skip)]
#[serde(default = "bool_true")]
pub state_accumulator_v2: bool,

#[serde(default = "bool_true")]
pub enable_validator_tx_finalizer: bool,
}
Expand Down
28 changes: 1 addition & 27 deletions crates/iota-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ use super::{
use crate::{
authority::{
AuthorityMetrics, ResolverWrapper,
epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
epoch_start_configuration::EpochStartConfiguration,
shared_object_version_manager::{
AssignedTxAndVersions, ConsensusSharedObjVerAssignment, SharedObjVerManager,
},
Expand Down Expand Up @@ -976,15 +976,6 @@ impl AuthorityPerEpochStore {
self.parent_path.clone()
}

pub fn state_accumulator_v2_enabled(&self) -> bool {
let flag = match self.get_chain_identifier().chain() {
Chain::Unknown | Chain::Testnet => EpochFlag::StateAccumulatorV2EnabledTestnet,
Chain::Mainnet => EpochFlag::StateAccumulatorV2EnabledMainnet,
};

self.epoch_start_configuration.flags().contains(&flag)
}

/// Returns `&Arc<EpochStartConfiguration>`
/// User can treat this `Arc` as `&EpochStartConfiguration`, or clone the
/// Arc to pass as owned object
Expand Down Expand Up @@ -1364,23 +1355,6 @@ impl AuthorityPerEpochStore {
.map_err(Into::into)
}

/// Returns future containing the state digest for the given epoch
/// once available.
/// TODO: remove once StateAccumulatorV1 is removed
pub async fn notify_read_checkpoint_state_digests(
&self,
checkpoints: Vec<CheckpointSequenceNumber>,
) -> IotaResult<Vec<Accumulator>> {
self.checkpoint_state_notify_read
.read(&checkpoints, |checkpoints| -> IotaResult<_> {
Ok(self
.tables()?
.state_hash_by_checkpoint
.multi_get(checkpoints)?)
})
.await
}

pub async fn notify_read_running_root(
&self,
checkpoint: CheckpointSequenceNumber,
Expand Down
23 changes: 3 additions & 20 deletions crates/iota-core/src/authority/epoch_start_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,20 @@ pub trait EpochStartConfigTrait {
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum EpochFlag {
WritebackCacheEnabled = 0,

StateAccumulatorV2EnabledTestnet = 1,
StateAccumulatorV2EnabledMainnet = 2,
}

impl EpochFlag {
pub fn default_flags_for_new_epoch(config: &NodeConfig) -> Vec<Self> {
Self::default_flags_impl(&config.execution_cache, config.state_accumulator_v2)
Self::default_flags_impl(&config.execution_cache)
}

/// For situations in which there is no config available (e.g. setting up a
/// downloaded snapshot).
pub fn default_for_no_config() -> Vec<Self> {
Self::default_flags_impl(&Default::default(), true)
Self::default_flags_impl(&Default::default())
}

fn default_flags_impl(
cache_config: &ExecutionCacheConfig,
enable_state_accumulator_v2: bool,
) -> Vec<Self> {
fn default_flags_impl(cache_config: &ExecutionCacheConfig) -> Vec<Self> {
let mut new_flags = vec![];

if matches!(
Expand All @@ -84,11 +78,6 @@ impl EpochFlag {
new_flags.push(EpochFlag::WritebackCacheEnabled);
}

if enable_state_accumulator_v2 {
new_flags.push(EpochFlag::StateAccumulatorV2EnabledTestnet);
new_flags.push(EpochFlag::StateAccumulatorV2EnabledMainnet);
}

new_flags
}
}
Expand All @@ -99,12 +88,6 @@ impl fmt::Display for EpochFlag {
// is used as metric key
match self {
EpochFlag::WritebackCacheEnabled => write!(f, "WritebackCacheEnabled"),
EpochFlag::StateAccumulatorV2EnabledTestnet => {
write!(f, "StateAccumulatorV2EnabledTestnet")
}
EpochFlag::StateAccumulatorV2EnabledMainnet => {
write!(f, "StateAccumulatorV2EnabledMainnet")
}
}
}
}
Expand Down
122 changes: 4 additions & 118 deletions crates/iota-core/src/state_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,13 @@ impl StateAccumulatorMetrics {

pub enum StateAccumulator {
V1(StateAccumulatorV1),
V2(StateAccumulatorV2),
}

pub struct StateAccumulatorV1 {
store: Arc<dyn AccumulatorStore>,
metrics: Arc<StateAccumulatorMetrics>,
}

pub struct StateAccumulatorV2 {
store: Arc<dyn AccumulatorStore>,
metrics: Arc<StateAccumulatorMetrics>,
}

pub trait AccumulatorStore: ObjectStore + Send + Sync {
fn get_root_state_accumulator_for_epoch(
&self,
Expand Down Expand Up @@ -166,11 +160,7 @@ impl StateAccumulator {
epoch_store: &Arc<AuthorityPerEpochStore>,
metrics: Arc<StateAccumulatorMetrics>,
) -> Self {
if epoch_store.state_accumulator_v2_enabled() {
StateAccumulator::V2(StateAccumulatorV2::new(store, metrics))
} else {
StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
}
StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
}

pub fn new_for_tests(
Expand All @@ -187,14 +177,12 @@ impl StateAccumulator {
pub fn metrics(&self) -> Arc<StateAccumulatorMetrics> {
match self {
StateAccumulator::V1(impl_v1) => impl_v1.metrics.clone(),
StateAccumulator::V2(impl_v2) => impl_v2.metrics.clone(),
}
}

pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
match self {
StateAccumulator::V1(impl_v1) => &impl_v1.metrics,
StateAccumulator::V2(impl_v2) => &impl_v2.metrics,
}
.inconsistent_state
.set(is_inconsistent_state as i64);
Expand Down Expand Up @@ -232,12 +220,8 @@ impl StateAccumulator {
checkpoint_acc: Option<Accumulator>,
) -> IotaResult {
match self {
StateAccumulator::V1(_) => {
// V1 does not have a running root accumulator
Ok(())
}
StateAccumulator::V2(impl_v2) => {
impl_v2
StateAccumulator::V1(impl_v1) => {
impl_v1
.accumulate_running_root(epoch_store, checkpoint_seq_num, checkpoint_acc)
.await
}
Expand All @@ -251,12 +235,7 @@ impl StateAccumulator {
) -> IotaResult<Accumulator> {
match self {
StateAccumulator::V1(impl_v1) => {
impl_v1
.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
.await
}
StateAccumulator::V2(impl_v2) => {
impl_v2.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
impl_v1.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
}
}
}
Expand All @@ -266,9 +245,6 @@ impl StateAccumulator {
StateAccumulator::V1(impl_v1) => Self::accumulate_live_object_set_impl(
impl_v1.store.iter_cached_live_object_set_for_testing(),
),
StateAccumulator::V2(impl_v2) => Self::accumulate_live_object_set_impl(
impl_v2.store.iter_cached_live_object_set_for_testing(),
),
}
}

Expand All @@ -279,9 +255,6 @@ impl StateAccumulator {
StateAccumulator::V1(impl_v1) => {
Self::accumulate_live_object_set_impl(impl_v1.store.iter_live_object_set())
}
StateAccumulator::V2(impl_v2) => {
Self::accumulate_live_object_set_impl(impl_v2.store.iter_live_object_set())
}
}
}

Expand All @@ -290,7 +263,6 @@ impl StateAccumulator {
pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
match self {
StateAccumulator::V1(impl_v1) => impl_v1.accumulate_effects(effects),
StateAccumulator::V2(impl_v2) => impl_v2.accumulate_effects(effects),
}
}

Expand Down Expand Up @@ -339,92 +311,6 @@ impl StateAccumulatorV1 {
Self { store, metrics }
}

/// Unions all checkpoint accumulators at the end of the epoch to generate
/// the root state hash and persists it to db. This function is
/// idempotent. Can be called on non-consecutive epochs, e.g. to
/// accumulate epoch 3 after having last accumulated epoch 1.
pub async fn accumulate_epoch(
&self,
epoch_store: Arc<AuthorityPerEpochStore>,
last_checkpoint_of_epoch: CheckpointSequenceNumber,
) -> IotaResult<Accumulator> {
let _scope = monitored_scope("AccumulateEpochV1");
let epoch = epoch_store.epoch();
if let Some((_checkpoint, acc)) = self.store.get_root_state_accumulator_for_epoch(epoch)? {
return Ok(acc);
}

// Get the next checkpoint to accumulate (first checkpoint of the epoch)
// by adding 1 to the highest checkpoint of the previous epoch
let (_highest_epoch, (next_to_accumulate, mut root_state_accumulator)) = self
.store
.get_root_state_accumulator_for_highest_epoch()?
.map(|(epoch, (checkpoint, acc))| {
(
epoch,
(
checkpoint
.checked_add(1)
.expect("Overflowed u64 for epoch ID"),
acc,
),
)
})
.unwrap_or((0, (0, Accumulator::default())));

debug!(
"Accumulating epoch {} from checkpoint {} to checkpoint {} (inclusive)",
epoch, next_to_accumulate, last_checkpoint_of_epoch
);

let (checkpoints, mut accumulators) = epoch_store
.get_accumulators_in_checkpoint_range(next_to_accumulate, last_checkpoint_of_epoch)?
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();

let remaining_checkpoints: Vec<_> = (next_to_accumulate..=last_checkpoint_of_epoch)
.filter(|seq_num| !checkpoints.contains(seq_num))
.collect();

if !remaining_checkpoints.is_empty() {
debug!(
"Awaiting accumulation of checkpoints {:?} for epoch {} accumulation",
remaining_checkpoints, epoch
);
}

let mut remaining_accumulators = epoch_store
.notify_read_checkpoint_state_digests(remaining_checkpoints)
.await
.expect("Failed to notify read checkpoint state digests");

accumulators.append(&mut remaining_accumulators);

assert!(accumulators.len() == (last_checkpoint_of_epoch - next_to_accumulate + 1) as usize);

for acc in accumulators {
root_state_accumulator.union(&acc);
}

self.store.insert_state_accumulator_for_epoch(
epoch,
&last_checkpoint_of_epoch,
&root_state_accumulator,
)?;

Ok(root_state_accumulator)
}

pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
accumulate_effects(effects)
}
}

impl StateAccumulatorV2 {
pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
Self { store, metrics }
}

pub async fn accumulate_running_root(
&self,
epoch_store: &AuthorityPerEpochStore,
Expand Down
Loading
Loading