core/state/snapshot: detect and clean up dangling storage snapshot in generation #24811
The change adds a new 223-line file implementing the snapshot generator's shared context and the dangling-storage cleanup helpers:

```go
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package snapshot

import (
	"bytes"
	"encoding/binary"
	"errors"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/math"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)

const (
	snapAccount = "account" // Identifier of account snapshot generation
	snapStorage = "storage" // Identifier of storage snapshot generation
)

// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes.
type generatorStats struct {
	origin   uint64             // Origin prefix where generation started
	start    time.Time          // Timestamp when generation started
	accounts uint64             // Number of accounts indexed (generated or recovered)
	slots    uint64             // Number of storage slots indexed (generated or recovered)
	dangling uint64             // Number of dangling storage slots
	storage  common.StorageSize // Total account and storage slot size (generation or recovery)
}

// Log creates a contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) Log(msg string, root common.Hash, marker []byte) {
	var ctx []interface{}
	if root != (common.Hash{}) {
		ctx = append(ctx, []interface{}{"root", root}...)
	}
	// Figure out whether we're after or within an account
	switch len(marker) {
	case common.HashLength:
		ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...)
	case 2 * common.HashLength:
		ctx = append(ctx, []interface{}{
			"in", common.BytesToHash(marker[:common.HashLength]),
			"at", common.BytesToHash(marker[common.HashLength:]),
		}...)
	}
	// Add the usual measurements
	ctx = append(ctx, []interface{}{
		"accounts", gs.accounts,
		"slots", gs.slots,
		"storage", gs.storage,
		"dangling", gs.dangling,
		"elapsed", common.PrettyDuration(time.Since(gs.start)),
	}...)
	// Calculate the estimated indexing time based on current stats
	if len(marker) > 0 {
		if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 {
			left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8])

			speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1 to avoid division by zero
			ctx = append(ctx, []interface{}{
				"eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond),
			}...)
		}
	}
	log.Info(msg, ctx...)
}

// generatorContext carries a few global values to be shared by all generation functions.
type generatorContext struct {
	stats   *generatorStats     // Generation statistic collection
	db      ethdb.KeyValueStore // Key-value store containing the snapshot data
	account *snapIter           // Iterator of account snapshot data
	storage *snapIter           // Iterator of storage snapshot data
	batch   ethdb.Batch         // Database batch for writing data atomically
	logged  time.Time           // The timestamp when last generation progress was displayed
}

// newGeneratorContext initializes the context for generation.
func newGeneratorContext(stats *generatorStats, db ethdb.KeyValueStore, accMarker []byte, storageMarker []byte) *generatorContext {
	ctx := &generatorContext{
		stats:  stats,
		db:     db,
		batch:  db.NewBatch(),
		logged: time.Now(),
	}
	ctx.openIterator(snapAccount, accMarker)
	ctx.openIterator(snapStorage, storageMarker)
	return ctx
}

// openIterator constructs a global account or storage snapshot iterator
// at the interrupted position. These iterators should be reopened from time
// to time to avoid blocking leveldb compaction for a long time.
func (ctx *generatorContext) openIterator(kind string, start []byte) {
	if kind == snapAccount {
		iter := ctx.db.NewIterator(rawdb.SnapshotAccountPrefix, start)
		ctx.account = newSnapIter(rawdb.NewKeyLengthIterator(iter, 1+common.HashLength))
		return
	}
	iter := ctx.db.NewIterator(rawdb.SnapshotStoragePrefix, start)
	ctx.storage = newSnapIter(rawdb.NewKeyLengthIterator(iter, 1+2*common.HashLength))
}

// reopenIterators releases the two held global database iterators and
// reopens them at the interruption position, so that a long-lived
// iterator doesn't block leveldb compaction.
func (ctx *generatorContext) reopenIterators() {
	for i, iter := range []ethdb.Iterator{ctx.account, ctx.storage} {
		key := iter.Key()
		if len(key) == 0 { // nil or []byte{}
			continue // the iterator may already be exhausted
		}
		kind := snapAccount
		if i == 1 {
			kind = snapStorage
		}
		iter.Release()
		ctx.openIterator(kind, key[1:])
	}
}

// close releases all the held resources.
func (ctx *generatorContext) close() {
	ctx.account.Release()
	ctx.storage.Release()
}

// iterator returns the iterator of the specified kind.
func (ctx *generatorContext) iterator(kind string) *snapIter {
	if kind == snapAccount {
		return ctx.account
	}
	return ctx.storage
}

// removeStorageBefore iterates and deletes all storage snapshots starting
// from the current iterator position until the specified account is reached.
// When the iterator touches storage located at or beyond the given account,
// it stops and steps the iterator back one entry.
func (ctx *generatorContext) removeStorageBefore(account common.Hash) {
	var (
		count uint64
		start = time.Now()
		iter  = ctx.storage
	)
	for iter.Next() {
		key := iter.Key()
		if bytes.Compare(key[1:1+common.HashLength], account.Bytes()) >= 0 {
			iter.Discard()
			break
		}
		ctx.batch.Delete(key)
		count++
	}
	ctx.stats.dangling += count
	snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
}

// removeStorageAt iterates and deletes all storage snapshots which are
// located under the specified account. When the iterator touches storage
// belonging to a larger account, it stops and steps the iterator back one
// entry. An error is returned if the initial position of the iterator is
// before the given account.
func (ctx *generatorContext) removeStorageAt(account common.Hash) error {
	var (
		count int64
		start = time.Now()
		iter  = ctx.storage
	)
	for iter.Next() {
		key := iter.Key()
		cmp := bytes.Compare(key[1:1+common.HashLength], account.Bytes())
		if cmp < 0 {
			return errors.New("invalid iterator position")
		}
		if cmp > 0 {
			iter.Discard()
			break
		}
		ctx.batch.Delete(key)
```
**Member:** This can grow quite large if a big contract was deleted; imho we should check the batch size and flush if it gets large mid-iteration. Also recreate the iterator?

**Member (author):** Yes, we can flush the batch here. And in order to simplify the code, we can flush only the batch, without persisting the generation progress marker. We usually do this in …
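A minimal sketch of what the suggested flush could look like, as a helper on the same context. The name `flushIfFull` and its call site are assumptions; `ethdb.IdealBatchSize` and the batch methods are geth's existing API:

```go
// flushIfFull writes out and resets the deletion batch once it exceeds
// the ideal batch size, so that wiping a huge destructed contract does
// not buffer an unbounded amount of data in memory. It deliberately
// flushes only the batch, without persisting the generation progress
// marker, matching the simplification discussed above.
func (ctx *generatorContext) flushIfFull() error {
	if ctx.batch.ValueSize() <= ethdb.IdealBatchSize {
		return nil
	}
	if err := ctx.batch.Write(); err != nil {
		return err
	}
	ctx.batch.Reset()
	return nil
}
```

It would be invoked right after the `ctx.batch.Delete(key)` above; per the "recreate the iterator" suggestion, the storage iterator could then be reopened at the current key via `ctx.openIterator(snapStorage, key[1:])`.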
The function continues:

```go
		count++
	}
	snapWipedStorageMeter.Mark(count)
	snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
	return nil
}

// removeStorageLeft iterates and deletes all storage snapshots remaining,
// starting from the current iterator position.
func (ctx *generatorContext) removeStorageLeft() {
	var (
		count uint64
		start = time.Now()
		iter  = ctx.storage
	)
	for iter.Next() {
		ctx.batch.Delete(iter.Key())
		count++
	}
	ctx.stats.dangling += count
	snapDanglingStorageMeter.Mark(int64(count))
	snapStorageCleanCounter.Inc(time.Since(start).Nanoseconds())
}
```
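To make the intended composition of the three helpers concrete, here is an illustrative driver, not the PR's actual generator loop, under the simplifying assumption that none of the given accounts own storage; `wipeAllDangling` is a hypothetical name:

```go
// wipeAllDangling walks a sorted list of account hashes present in the
// account snapshot and deletes every storage snapshot entry: slots keyed
// below each account belong to deleted accounts, slots keyed under an
// account are dangling here by assumption, and slots past the last
// account have no owner at all.
func (ctx *generatorContext) wipeAllDangling(sortedAccounts []common.Hash) error {
	for _, account := range sortedAccounts {
		ctx.removeStorageBefore(account)
		if err := ctx.removeStorageAt(account); err != nil {
			return err
		}
	}
	ctx.removeStorageLeft()
	return nil
}
```

In the real generator, accounts that do own storage instead have their slots consumed by storage generation at the `removeStorageAt` step, which advances the iterator past them.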
**Member (author):** @karalabe @holiman I think this is an important change, so I want to highlight it here.

Whenever we restart snapshot generation, the snapshot iterators (account, storage) are opened at the interruption position. Theoretically, the interruption marker guarantees that all snapshot data before it has been correctly generated. Even though the `diffToDisk` function may change snapshot data before the interruption marker (including at the marker itself), it must ensure that all such changes are correct and aligned to the new root.

But let's still list all the possible scenarios to prove this operation is correct:

1. The interruption marker is empty and the snapshot iterators are opened at the beginning: obviously correct.
2. The interruption marker is non-empty, pointing to an account `xyz` and the slot at `abc`, so the account iterator is opened at `xyz` and the storage iterator at `xyz+abc`:
   - 2.1 The account is unchanged: correct.
   - 2.2 The account is destructed in the `diffToDisk` operation: `diffToDisk` is responsible for cleaning up all slots of account `xyz`. In the resumed generation, the storage iterator is seeked to the first storage slot of the next account, so there is nothing left to do for the storage of account `xyz`: correct.
   - 2.3 The storage of account `xyz` is entirely cleaned up in the `diffToDisk` operation: `diffToDisk` is responsible for cleaning up the slots before `abc`. In the resumed generation, the storage iterator is opened at `xyz+abc` and cleans up the remaining slots from `abc` on: correct.
   - 2.4 The storage slots before `abc` are partially updated in the `diffToDisk` operation: `diffToDisk` is responsible for updating the slots before `abc` correctly. In the resumed generation, the storage iterator is opened at `xyz+abc` and storage generation resumes from `abc`: correct.
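The scenarios above hinge on how the interruption marker encodes a position. Following the `len(marker)` convention in `generatorStats.Log`, a sketch of how the marker could map to the two iterator start positions (the helper shown is illustrative, not necessarily the PR's exact code):

```go
// splitMarker splits a generation marker into the account-iterator and
// storage-iterator start positions. An empty marker means a fresh start
// (scenario 1); a 32-byte marker points right after an account "xyz"; a
// 64-byte marker points inside an account at slot "abc" (scenarios
// 2.1-2.4), so the storage iterator is opened at xyz+abc.
func splitMarker(marker []byte) (accMarker []byte, storageMarker []byte) {
	if len(marker) == 0 {
		return nil, nil
	}
	return marker[:common.HashLength], marker
}
```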