14 changes: 7 additions & 7 deletions core/state/state_object.go
@@ -199,7 +199,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {

// Schedule the resolved storage slots for prefetching if it's enabled.
if s.db.prefetcher != nil && s.data.Root != types.EmptyRootHash {
if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, [][]byte{key[:]}, true); err != nil {
if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, nil, []common.Hash{key}, true); err != nil {
log.Error("Failed to prefetch storage slot", "addr", s.address, "key", key, "err", err)
}
}
@@ -237,7 +237,7 @@ func (s *stateObject) setState(key common.Hash, value common.Hash, origin common
// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise() {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
slotsToPrefetch := make([]common.Hash, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
if origin, exist := s.uncommittedStorage[key]; exist && origin == value {
// The slot is reverted to its original value, delete the entry
@@ -250,7 +250,7 @@ func (s *stateObject) finalise() {
// The slot is different from its original value and hasn't been
// tracked for commit yet.
s.uncommittedStorage[key] = s.GetCommittedState(key)
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
slotsToPrefetch = append(slotsToPrefetch, key) // Copy needed for closure
}
// Aggregate the dirty storage slots into the pending area. It might
// be possible that the value of tracked slot here is same with the
@@ -261,7 +261,7 @@ func (s *stateObject) finalise() {
s.pendingStorage[key] = value
}
if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch, false); err != nil {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, nil, slotsToPrefetch, false); err != nil {
log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
}
}
@@ -323,7 +323,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
// Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved.
var (
deletions []common.Hash
used = make([][]byte, 0, len(s.uncommittedStorage))
used = make([]common.Hash, 0, len(s.uncommittedStorage))
)
for key, origin := range s.uncommittedStorage {
// Skip noop changes, persist actual changes
@@ -346,7 +346,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
deletions = append(deletions, key)
}
// Cache the items for preloading
used = append(used, common.CopyBytes(key[:])) // Copy needed for closure
used = append(used, key) // Copy needed for closure
}
for _, key := range deletions {
if err := tr.DeleteStorage(s.address, key[:]); err != nil {
@@ -356,7 +356,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.StorageDeleted.Add(1)
}
if s.db.prefetcher != nil {
s.db.prefetcher.used(s.addrHash, s.data.Root, used)
s.db.prefetcher.used(s.addrHash, s.data.Root, nil, used)
}
s.uncommittedStorage = make(Storage) // empties the commit markers
return tr, nil
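The storage-slot half of the change replaces the raw `[][]byte` keys with `[]common.Hash`. Because `common.Hash` is a fixed-size `[32]byte` array, appending one to a slice copies the value, so the defensive `common.CopyBytes` calls in the old code are no longer required. A minimal, self-contained sketch of that point (variable names are illustrative, not taken from the PR):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// Minimal sketch (not part of the PR): why the explicit common.CopyBytes in the
// old [][]byte code is unnecessary once slots are typed. common.Hash is a
// [32]byte array, so appending a map key to a []common.Hash copies the value.
func main() {
	dirtyStorage := map[common.Hash]common.Hash{
		common.HexToHash("0x01"): common.HexToHash("0xaa"),
		common.HexToHash("0x02"): common.HexToHash("0xbb"),
	}

	slotsToPrefetch := make([]common.Hash, 0, len(dirtyStorage))
	for key := range dirtyStorage {
		slotsToPrefetch = append(slotsToPrefetch, key) // plain value copy, no CopyBytes
	}
	fmt.Println("scheduled", len(slotsToPrefetch), "slots for prefetching")
}
```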
16 changes: 8 additions & 8 deletions core/state/statedb.go
@@ -214,7 +214,7 @@ func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness)
// the prefetcher is constructed. For more details, see:
// https://github.com/ethereum/go-ethereum/issues/29880
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, witness == nil)
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, nil, false); err != nil {
log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
}
}
@@ -587,7 +587,7 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
}
// Schedule the resolved account for prefetching if it's enabled.
if s.prefetcher != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, [][]byte{addr[:]}, true); err != nil {
if err = s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, []common.Address{addr}, nil, true); err != nil {
log.Error("Failed to prefetch account", "addr", addr, "err", err)
}
}
@@ -720,7 +720,7 @@ func (s *StateDB) GetRefund() uint64 {
// the journal as well as the refunds. Finalise, however, will not push any updates
// into the tries just yet. Only IntermediateRoot or Commit will do that.
func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties))
addressesToPrefetch := make([]common.Address, 0, len(s.journal.dirties))
for addr := range s.journal.dirties {
obj, exist := s.stateObjects[addr]
if !exist {
@@ -753,10 +753,10 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
// At this point, also ship the address off to the precacher. The precacher
// will start loading tries, and when the change is eventually committed,
// the commit-phase will be a lot faster
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
addressesToPrefetch = append(addressesToPrefetch, addr) // Copy needed for closure
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, false); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, nil, false); err != nil {
log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
}
}
@@ -877,7 +877,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// into a shortnode. This requires `B` to be resolved from disk.
// Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved.
var (
usedAddrs [][]byte
usedAddrs []common.Address
deletedAddrs []common.Address
)
for addr, op := range s.mutations {
@@ -892,7 +892,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
s.updateStateObject(s.stateObjects[addr])
s.AccountUpdated += 1
}
usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure
usedAddrs = append(usedAddrs, addr) // Copy needed for closure
}
for _, deletedAddr := range deletedAddrs {
s.deleteStateObject(deletedAddr)
@@ -901,7 +901,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
s.AccountUpdates += time.Since(start)

if s.prefetcher != nil {
s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs, nil)
}
// Track the amount of time wasted on hashing the account trie
defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now())
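On the StateDB side, account prefetches now travel through a dedicated `[]common.Address` parameter, with `nil` passed for storage slots, and `used` reports addresses and slots separately. A hedged sketch of the new call shape follows; the `prefetchFn` type and the example addresses are illustrative stand-ins, not code from the PR:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// prefetchFn is a hypothetical stand-in for triePrefetcher.prefetch, used only
// to illustrate the new call shape: accounts and storage slots now arrive via
// separate, typed parameters instead of a single [][]byte key list.
type prefetchFn func(owner common.Hash, root common.Hash, addr common.Address,
	addrs []common.Address, slots []common.Hash, read bool) error

func main() {
	var prefetch prefetchFn = func(owner common.Hash, root common.Hash, addr common.Address,
		addrs []common.Address, slots []common.Hash, read bool) error {
		fmt.Printf("owner=%x accounts=%d slots=%d read=%v\n", owner[:4], len(addrs), len(slots), read)
		return nil
	}

	// Account-trie scheduling (the StateDB.Finalise case): empty owner hash,
	// addresses in the fourth argument, nil storage slots.
	accounts := []common.Address{common.HexToAddress("0xdead")}
	_ = prefetch(common.Hash{}, common.Hash{}, common.Address{}, accounts, nil, false)

	// Storage-trie scheduling (the stateObject.finalise case): slots only.
	slots := []common.Hash{common.HexToHash("0x01")}
	_ = prefetch(common.HexToHash("0xabc"), common.Hash{}, common.HexToAddress("0xdead"), nil, slots, false)
}
```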
150 changes: 96 additions & 54 deletions core/state/trie_prefetcher.go
@@ -118,31 +118,31 @@ func (p *triePrefetcher) report() {
fetcher.wait() // ensure the fetcher's idle before poking in its internals

if fetcher.root == p.root {
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenReadAddr)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWriteAddr)))

p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))

for _, key := range fetcher.used {
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
for _, key := range fetcher.usedAddr {
delete(fetcher.seenReadAddr, key)
delete(fetcher.seenWriteAddr, key)
}
p.accountWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
p.accountWasteMeter.Mark(int64(len(fetcher.seenReadAddr) + len(fetcher.seenWriteAddr)))
} else {
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenReadSlot)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWriteSlot)))

p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))

for _, key := range fetcher.used {
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
for _, key := range fetcher.usedSlot {
delete(fetcher.seenReadSlot, key)
delete(fetcher.seenWriteSlot, key)
}
p.storageWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
p.storageWasteMeter.Mark(int64(len(fetcher.seenReadSlot) + len(fetcher.seenWriteSlot)))
}
}
}
@@ -158,7 +158,7 @@ func (p *triePrefetcher) report() {
// upon the same contract, the parameters invoking this method may be
// repeated.
// 2. Finalize of the main account trie. This happens only once per block.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte, read bool) error {
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, addrs []common.Address, slots []common.Hash, read bool) error {
// If the state item is only being read, but reads are disabled, return
if read && p.noreads {
return nil
@@ -175,7 +175,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
p.fetchers[id] = fetcher
}
return fetcher.schedule(keys, read)
return fetcher.schedule(addrs, slots, read)
}

// trie returns the trie matching the root hash, blocking until the fetcher of
@@ -195,10 +195,12 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {

// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the fetcher is.
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, usedAddr []common.Address, usedSlot []common.Hash) {
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
fetcher.wait() // ensure the fetcher's idle before poking in its internals
fetcher.used = append(fetcher.used, used...)

fetcher.usedAddr = append(fetcher.usedAddr, usedAddr...)
fetcher.usedSlot = append(fetcher.usedSlot, usedSlot...)
}
}

@@ -235,44 +237,50 @@ type subfetcher struct {
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption

seenRead map[string]struct{} // Tracks the entries already loaded via read operations
seenWrite map[string]struct{} // Tracks the entries already loaded via write operations
seenReadAddr map[common.Address]struct{} // Tracks the accounts already loaded via read operations
seenWriteAddr map[common.Address]struct{} // Tracks the accounts already loaded via write operations
seenReadSlot map[common.Hash]struct{} // Tracks the storage already loaded via read operations
seenWriteSlot map[common.Hash]struct{} // Tracks the storage already loaded via write operations

dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
dupsCross int // Number of duplicate preload tasks via read-write-crosses

used [][]byte // Tracks the entries used in the end
usedAddr []common.Address // Tracks the accounts used in the end
usedSlot []common.Hash // Tracks the storage used in the end
}

// subfetcherTask is a trie path to prefetch, tagged with whether it originates
// from a read or a write request.
type subfetcherTask struct {
read bool
key []byte
addr *common.Address
slot *common.Hash
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
sf := &subfetcher{
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seenRead: make(map[string]struct{}),
seenWrite: make(map[string]struct{}),
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seenReadAddr: make(map[common.Address]struct{}),
seenWriteAddr: make(map[common.Address]struct{}),
seenReadSlot: make(map[common.Hash]struct{}),
seenWriteSlot: make(map[common.Hash]struct{}),
}
go sf.loop()
return sf
}

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
func (sf *subfetcher) schedule(addrs []common.Address, slots []common.Hash, read bool) error {
// Ensure the subfetcher is still alive
select {
case <-sf.term:
@@ -281,8 +289,11 @@ func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
}
// Append the tasks to the current queue
sf.lock.Lock()
for _, key := range keys {
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, key: key})
for _, addr := range addrs {
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, addr: &addr})
}
for _, slot := range slots {
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, slot: &slot})
}
sf.lock.Unlock()

@@ -378,35 +389,66 @@ func (sf *subfetcher) loop() {
sf.lock.Unlock()

for _, task := range tasks {
key := string(task.key)
if task.read {
if _, ok := sf.seenRead[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsCross++
continue
if task.addr != nil {
key := *task.addr
if task.read {
if _, ok := sf.seenReadAddr[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWriteAddr[key]; ok {
sf.dupsCross++
continue
}
} else {
if _, ok := sf.seenReadAddr[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteAddr[key]; ok {
sf.dupsWrite++
continue
}
}
} else {
if _, ok := sf.seenRead[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsWrite++
continue
key := *task.slot
if task.read {
if _, ok := sf.seenReadSlot[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWriteSlot[key]; ok {
sf.dupsCross++
continue
}
} else {
if _, ok := sf.seenReadSlot[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteSlot[key]; ok {
sf.dupsWrite++
continue
}
}
}
if len(task.key) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task.key))
if task.addr != nil {
sf.trie.GetAccount(*task.addr)
} else {
sf.trie.GetStorage(sf.addr, task.key)
sf.trie.GetStorage(sf.addr, (*task.slot)[:])
}
if task.read {
sf.seenRead[key] = struct{}{}
if task.addr != nil {
sf.seenReadAddr[*task.addr] = struct{}{}
} else {
sf.seenReadSlot[*task.slot] = struct{}{}
}
} else {
sf.seenWrite[key] = struct{}{}
if task.addr != nil {
sf.seenWriteAddr[*task.addr] = struct{}{}
} else {
sf.seenWriteSlot[*task.slot] = struct{}{}
}
}
}

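Inside the subfetcher, each task now carries either an account address or a storage slot, and the duplicate-suppression bookkeeping moves from `map[string]struct{}` keyed on raw bytes to four typed maps. A minimal sketch of that read/write dedup logic for the address half (the `dedup` type is illustrative and not part of the PR; the slot half mirrors it with `common.Hash` keys):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// dedup sketches the per-account duplicate tracking the subfetcher performs:
// a read duplicates an earlier read, a write duplicates an earlier write, and
// anything crossing the read/write boundary counts as a cross-duplicate.
type dedup struct {
	seenRead  map[common.Address]struct{}
	seenWrite map[common.Address]struct{}

	dupsRead, dupsWrite, dupsCross int
}

// add reports whether the address still needs to be loaded from the trie.
func (d *dedup) add(addr common.Address, read bool) bool {
	if read {
		if _, ok := d.seenRead[addr]; ok {
			d.dupsRead++
			return false
		}
		if _, ok := d.seenWrite[addr]; ok {
			d.dupsCross++
			return false
		}
		d.seenRead[addr] = struct{}{}
		return true
	}
	if _, ok := d.seenRead[addr]; ok {
		d.dupsCross++
		return false
	}
	if _, ok := d.seenWrite[addr]; ok {
		d.dupsWrite++
		return false
	}
	d.seenWrite[addr] = struct{}{}
	return true
}

func main() {
	d := &dedup{
		seenRead:  make(map[common.Address]struct{}),
		seenWrite: make(map[common.Address]struct{}),
	}
	a := common.HexToAddress("0xdead")
	fmt.Println(d.add(a, true))  // true: first read, triggers a load
	fmt.Println(d.add(a, true))  // false: duplicate read
	fmt.Println(d.add(a, false)) // false: read/write cross-duplicate
	fmt.Println(d.dupsRead, d.dupsWrite, d.dupsCross)
}
```

The typed keys also let the waste meters in report() subtract the used addresses and slots with plain map deletes instead of string conversions.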