Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions db/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ type Aggregator struct {
collateAndBuildWorkers int // minimize amount of background workers by default
mergeWorkers int // usually 1

commitmentValuesTransform bool // enables squeezing commitment values in CommitmentDomain

// To keep DB small - need move data to small files ASAP.
// It means goroutine which creating small files - can't be locked by merge or indexing.
buildingFiles atomic.Bool
Expand Down Expand Up @@ -113,8 +111,6 @@ func newAggregatorOld(ctx context.Context, dirs datadir.Dirs, stepSize uint64, d
collateAndBuildWorkers: 1,
mergeWorkers: 1,

commitmentValuesTransform: statecfg.AggregatorSqueezeCommitmentValues,

produce: true,
}, nil
}
Expand Down Expand Up @@ -1309,7 +1305,8 @@ func (a *Aggregator) recalcVisibleFilesMinimaxTxNum() {

func (at *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *Ranges {
r := &Ranges{invertedIndex: make([]*MergeRange, len(at.a.iis))}
if at.a.commitmentValuesTransform {
commitmentUseReferencedBranches := at.a.d[kv.CommitmentDomain].ReplaceKeysInValues
if commitmentUseReferencedBranches {
lmrAcc := at.d[kv.AccountsDomain].files.LatestMergedRange()
lmrSto := at.d[kv.StorageDomain].files.LatestMergedRange()
lmrCom := at.d[kv.CommitmentDomain].files.LatestMergedRange()
Expand All @@ -1328,7 +1325,7 @@ func (at *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *Ranges {
r.domain[id] = d.findMergeRange(maxEndTxNum, maxSpan)
}

if at.a.commitmentValuesTransform && r.domain[kv.CommitmentDomain].values.needMerge {
if commitmentUseReferencedBranches && r.domain[kv.CommitmentDomain].values.needMerge {
cr := r.domain[kv.CommitmentDomain]

restorePrevRange := false
Expand Down Expand Up @@ -1392,6 +1389,7 @@ func (at *AggregatorRoTx) mergeFiles(ctx context.Context, files *SelectedStaticF
}()

at.a.logger.Info("[snapshots] merge state " + r.String())
commitmentUseReferencedBranches := at.a.d[kv.CommitmentDomain].ReplaceKeysInValues

accStorageMerged := new(sync.WaitGroup)

Expand All @@ -1405,15 +1403,16 @@ func (at *AggregatorRoTx) mergeFiles(ctx context.Context, files *SelectedStaticF

id := id
kid := kv.Domain(id)
if at.a.commitmentValuesTransform && (kid == kv.AccountsDomain || kid == kv.StorageDomain) {
if commitmentUseReferencedBranches && (kid == kv.AccountsDomain || kid == kv.StorageDomain) {
accStorageMerged.Add(1)
}

g.Go(func() (err error) {
var vt valueTransformer
if at.a.commitmentValuesTransform && kid == kv.CommitmentDomain {
if commitmentUseReferencedBranches && kid == kv.CommitmentDomain {
accStorageMerged.Wait()

// prepare transformer callback to correctly dereference previously merged accounts/storage plain keys
vt, err = at.d[kv.CommitmentDomain].commitmentValTransformDomain(r.domain[kid].values, at.d[kv.AccountsDomain], at.d[kv.StorageDomain],
mf.d[kv.AccountsDomain], mf.d[kv.StorageDomain])

Expand All @@ -1423,7 +1422,7 @@ func (at *AggregatorRoTx) mergeFiles(ctx context.Context, files *SelectedStaticF
}

mf.d[id], mf.dIdx[id], mf.dHist[id], err = at.d[id].mergeFiles(ctx, files.d[id], files.dIdx[id], files.dHist[id], r.domain[id], vt, at.a.ps)
if at.a.commitmentValuesTransform {
if commitmentUseReferencedBranches {
if kid == kv.AccountsDomain || kid == kv.StorageDomain {
accStorageMerged.Done()
}
Expand Down
3 changes: 1 addition & 2 deletions db/state/aggregator_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func Fuzz_AggregatorV3_Merge(f *testing.F) {
func Fuzz_AggregatorV3_MergeValTransform(f *testing.F) {
_db, agg := testFuzzDbAndAggregatorv3(f, 10)
db := wrapDbWithCtx(_db, agg)
agg.d[kv.CommitmentDomain].ReplaceKeysInValues = true

rwTx, err := db.BeginTemporalRw(context.Background())
require.NoError(f, err)
Expand All @@ -171,8 +172,6 @@ func Fuzz_AggregatorV3_MergeValTransform(f *testing.F) {

const txs = uint64(1000)

agg.commitmentValuesTransform = true

state := make(map[string][]byte)

// keys are encodings of numbers 1..31
Expand Down
7 changes: 2 additions & 5 deletions db/state/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,6 @@ func TestAggregatorV3_MergeValTransform(t *testing.T) {
if testing.Short() {
t.Skip()
}
if !statecfg.AggregatorSqueezeCommitmentValues {
t.Skip()
}

t.Parallel()
_db, agg := testDbAndAggregatorv3(t, 5)
Expand All @@ -355,15 +352,15 @@ func TestAggregatorV3_MergeValTransform(t *testing.T) {
require.NoError(t, err)
defer rwTx.Rollback()

agg.d[kv.CommitmentDomain].ReplaceKeysInValues = true

domains, err := NewSharedDomains(rwTx, log.New())
require.NoError(t, err)
defer domains.Close()

txs := uint64(100)
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

agg.commitmentValuesTransform = true

state := make(map[string][]byte)

// keys are encodings of numbers 1..31
Expand Down
26 changes: 12 additions & 14 deletions db/state/domain_committed.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,23 @@ func (sd *SharedDomains) ComputeCommitment(ctx context.Context, saveStateAfter b
return
}

// ValuesPlainKeyReferencingThresholdReached checks if the range from..to is large enough to use plain key referencing
// Used for commitment branches - to store references to account and storage keys as shortened keys (file offsets)
func ValuesPlainKeyReferencingThresholdReached(stepSize, from, to uint64) bool {
const minStepsForReferencing = 2

return ((to-from)/stepSize)%minStepsForReferencing == 0
}

// replaceShortenedKeysInBranch expands shortened key references (file offsets) in branch data back to full keys
// by looking them up in the account and storage domain files.
func (at *AggregatorRoTx) replaceShortenedKeysInBranch(prefix []byte, branch commitment.BranchData, fStartTxNum uint64, fEndTxNum uint64) (commitment.BranchData, error) {
logger := log.Root()
aggTx := at

if !aggTx.a.commitmentValuesTransform || bytes.Equal(prefix, keyCommitmentState) {
return branch, nil
}

if !aggTx.d[kv.CommitmentDomain].d.ReplaceKeysInValues && aggTx.a.commitmentValuesTransform {
panic("domain.replaceKeysInValues is disabled, but agg.commitmentValuesTransform is enabled")
}

if !aggTx.a.commitmentValuesTransform ||
len(branch) == 0 ||
aggTx.TxNumsInFiles(kv.StateDomains...) == 0 ||
bytes.Equal(prefix, keyCommitmentState) ||
((fEndTxNum-fStartTxNum)/at.StepSize())%2 != 0 { // this checks if file has even number of steps, singular files does not transform values.
commitmentUseReferencedBranches := at.a.d[kv.CommitmentDomain].ReplaceKeysInValues
if !commitmentUseReferencedBranches || len(branch) == 0 || bytes.Equal(prefix, keyCommitmentState) ||
aggTx.TxNumsInFiles(kv.StateDomains...) == 0 || !ValuesPlainKeyReferencingThresholdReached(at.StepSize(), fStartTxNum, fEndTxNum) {

return branch, nil // do not transform, return as is
}
Expand Down Expand Up @@ -386,7 +384,7 @@ func (dt *DomainRoTx) commitmentValTransformDomain(rng MergeRange, accounts, sto
dt.d.logger.Debug("prepare commitmentValTransformDomain", "merge", rng.String("range", dt.d.stepSize), "Mstorage", hadToLookupStorage, "Maccount", hadToLookupAccount)

vt := func(valBuf []byte, keyFromTxNum, keyEndTxNum uint64) (transValBuf []byte, err error) {
if !dt.d.ReplaceKeysInValues || len(valBuf) == 0 || ((keyEndTxNum-keyFromTxNum)/dt.d.stepSize)%2 != 0 {
if !dt.d.ReplaceKeysInValues || len(valBuf) == 0 || !ValuesPlainKeyReferencingThresholdReached(dt.d.stepSize, keyFromTxNum, keyEndTxNum) {
return valBuf, nil
}
if _, ok := storageFileMap[keyFromTxNum]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion db/state/domain_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestSharedDomain_CommitmentKeyReplacement(t *testing.T) {
require.NoError(t, err)

t.Logf("expected hash: %x", expectedHash)
t.Logf("valueTransform enabled: %t", agg.commitmentValuesTransform)
t.Logf("key referencing enabled: %t", agg.d[kv.CommitmentDomain].ReplaceKeysInValues)
err = agg.BuildFiles(stepSize * 16)
require.NoError(t, err)

Expand Down
14 changes: 5 additions & 9 deletions db/state/squeeze.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/erigontech/erigon/db/state/statecfg"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -107,7 +108,8 @@ func (a *Aggregator) sqeezeDomainFile(ctx context.Context, domain kv.Domain, fro
// SqueezeCommitmentFiles should be called only when NO EXECUTION is running.
// Removes commitment files and suppose following aggregator shutdown and restart (to integrate new files and rebuild indexes)
func SqueezeCommitmentFiles(ctx context.Context, at *AggregatorRoTx, logger log.Logger) error {
if !at.a.commitmentValuesTransform {
commitmentUseReferencedBranches := at.a.d[kv.CommitmentDomain].ReplaceKeysInValues
if !commitmentUseReferencedBranches {
return nil
}

Expand Down Expand Up @@ -321,7 +323,7 @@ func CheckCommitmentForPrint(ctx context.Context, rwDb kv.TemporalRwDB) (string,
return "", err
}
s := fmt.Sprintf("[commitment] Latest: blockNum: %d txNum: %d latestRootHash: %x\n", domains.BlockNum(), domains.TxNum(), rootHash)
s += fmt.Sprintf("[commitment] stepSize %d, commitmentValuesTransform enabled %t\n", a.StepSize(), a.commitmentValuesTransform)
s += fmt.Sprintf("[commitment] stepSize %d, commitmentValuesTransform enabled %t\n", a.StepSize(), a.d[kv.CommitmentDomain].ReplaceKeysInValues)
return s, nil
}

Expand Down Expand Up @@ -369,9 +371,6 @@ func RebuildCommitmentFiles(ctx context.Context, rwDb kv.TemporalRwDB, txNumsRea
logger.Info("[commitment_rebuild] collected shards to build", "count", len(sf.d[kv.AccountsDomain]))
start := time.Now()

originalCommitmentValuesTransform := a.commitmentValuesTransform
a.commitmentValuesTransform = false

var totalKeysCommitted uint64

for i, r := range ranges {
Expand Down Expand Up @@ -531,19 +530,16 @@ func RebuildCommitmentFiles(ctx context.Context, rwDb kv.TemporalRwDB, txNumsRea
break
}
}
a.commitmentValuesTransform = originalCommitmentValuesTransform // disable only while merging, to squeeze later. If enabled in Scheme, must be enabled while computing commitment to correctly dereference keys

}

var m runtime.MemStats
dbg.ReadMemStats(&m)
logger.Info("[rebuild_commitment] done", "duration", time.Since(start), "totalKeysProcessed", common.PrettyCounter(totalKeysCommitted), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))

a.commitmentValuesTransform = originalCommitmentValuesTransform

acRo.Close()

if !squeeze {
if !squeeze && !statecfg.Schema.CommitmentDomain.ReplaceKeysInValues {
return latestRoot, nil
}
logger.Info("[squeeze] starting")
Expand Down
4 changes: 1 addition & 3 deletions db/state/squeeze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func testDbAggregatorWithNoFiles(tb testing.TB, txCount int, cfg *testAggConfig)
_db, agg := testDbAndAggregatorv3(tb, cfg.stepSize)
db := wrapDbWithCtx(_db, agg)

agg.commitmentValuesTransform = !cfg.disableCommitmentBranchTransform
agg.d[kv.CommitmentDomain].ReplaceKeysInValues = agg.commitmentValuesTransform
agg.d[kv.CommitmentDomain].ReplaceKeysInValues = !cfg.disableCommitmentBranchTransform

ctx := context.Background()
agg.logger = log.Root().New()
Expand Down Expand Up @@ -117,7 +116,6 @@ func TestAggregator_SqueezeCommitment(t *testing.T) {
domains.Close()

// now do the squeeze
agg.commitmentValuesTransform = true
agg.d[kv.CommitmentDomain].ReplaceKeysInValues = true
err = SqueezeCommitmentFiles(context.Background(), AggTx(rwTx), log.New())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion db/state/statecfg/state_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ var Schema = SchemaGen{
CompressCfg: DomainCompressCfg, Compression: seg.CompressKeys,

Accessors: AccessorHashMap,
ReplaceKeysInValues: AggregatorSqueezeCommitmentValues,
ReplaceKeysInValues: AggregatorSqueezeCommitmentValues, // when true, keys are replaced in values during merge once file range reaches threshold

Hist: HistCfg{
ValuesTable: kv.TblCommitmentHistoryVals,
Expand Down
Loading