Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var (
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheGCFlag,
utils.CacheNoPrefetchFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheGCFlag,
utils.CacheNoPrefetchFlag,
},
},
{
Expand Down
14 changes: 10 additions & 4 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ var (
Usage: "Percentage of cache memory allowance to use for trie pruning (default = 25% full mode, 0% archive mode)",
Value: 25,
}
CacheNoPrefetchFlag = cli.BoolFlag{
Name: "cache.noprefetch",
Usage: "Disable heuristic state prefetch during block import (less CPU and disk IO, more time waiting for data)",
}
// Miner settings
MiningEnabledFlag = cli.BoolFlag{
Name: "mine",
Expand Down Expand Up @@ -1336,6 +1340,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
cfg.NoPruning = ctx.GlobalString(GCModeFlag.Name) == "archive"
cfg.NoPrefetch = ctx.GlobalBool(CacheNoPrefetchFlag.Name)

if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
cfg.TrieCleanCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100
Expand Down Expand Up @@ -1595,10 +1600,11 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
cache := &core.CacheConfig{
Disabled: ctx.GlobalString(GCModeFlag.Name) == "archive",
TrieCleanLimit: eth.DefaultConfig.TrieCleanCache,
TrieDirtyLimit: eth.DefaultConfig.TrieDirtyCache,
TrieTimeLimit: eth.DefaultConfig.TrieTimeout,
TrieCleanLimit: eth.DefaultConfig.TrieCleanCache,
TrieCleanNoPrefetch: ctx.GlobalBool(CacheNoPrefetchFlag.Name),
TrieDirtyLimit: eth.DefaultConfig.TrieDirtyCache,
TrieDirtyDisabled: ctx.GlobalString(GCModeFlag.Name) == "archive",
TrieTimeLimit: eth.DefaultConfig.TrieTimeout,
}
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
cache.TrieCleanLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100
Expand Down
111 changes: 60 additions & 51 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ var (
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)

ErrNoGenesis = errors.New("Genesis not found in chain")
)

Expand All @@ -87,10 +90,11 @@ const (
// CacheConfig contains the configuration values for the trie caching/pruning
// that's resident in a blockchain.
type CacheConfig struct {
Disabled bool // Whether to disable trie write caching (archive node)
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
}

// BlockChain represents the canonical chain given a database with a genesis
Expand Down Expand Up @@ -126,7 +130,6 @@ type BlockChain struct {
genesisBlock *types.Block

chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock

checkpoint int // checkpoint counts towards the new checkpoint
currentBlock atomic.Value // Current head of the block chain
Expand All @@ -145,10 +148,11 @@ type BlockChain struct {
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down

engine consensus.Engine
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher // Block state prefetcher interface
processor Processor // Block transaction processor interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
Expand Down Expand Up @@ -189,8 +193,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
vmConfig: vmConfig,
badBlocks: badBlocks,
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
Expand Down Expand Up @@ -381,31 +386,13 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block {
return bc.currentFastBlock.Load().(*types.Block)
}

// SetProcessor sets the processor required for making state modifications.
func (bc *BlockChain) SetProcessor(processor Processor) {
bc.procmu.Lock()
defer bc.procmu.Unlock()
bc.processor = processor
}

// SetValidator sets the validator which is used to validate incoming blocks.
func (bc *BlockChain) SetValidator(validator Validator) {
bc.procmu.Lock()
defer bc.procmu.Unlock()
bc.validator = validator
}

// Validator returns the current validator.
func (bc *BlockChain) Validator() Validator {
bc.procmu.RLock()
defer bc.procmu.RUnlock()
return bc.validator
}

// Processor returns the current processor.
func (bc *BlockChain) Processor() Processor {
bc.procmu.RLock()
defer bc.procmu.RUnlock()
return bc.processor
}

Expand Down Expand Up @@ -722,7 +709,7 @@ func (bc *BlockChain) Stop() {
// - HEAD: So we don't need to reprocess any blocks in the general case
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.Disabled {
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()

for _, offset := range []uint64{0, 1, triesInMemory - 1} {
Expand Down Expand Up @@ -982,7 +969,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb := bc.stateCache.TrieDB()

// If we're running an archive node, always flush
if bc.cacheConfig.Disabled {
if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false); err != nil {
return NonStatTy, err
}
Expand Down Expand Up @@ -1147,7 +1134,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// If the chain is terminating, don't even bother starting u
// If the chain is terminating, don't even bother starting up
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
}
Expand Down Expand Up @@ -1175,7 +1162,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
defer close(abort)

// Peek the error for the first block to decide the directing import logic
it := newInsertIterator(chain, results, bc.Validator())
it := newInsertIterator(chain, results, bc.validator)

block, err := it.next()

Expand Down Expand Up @@ -1238,54 +1225,76 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
state, err := state.New(parent.Root, bc.stateCache)
statedb, err := state.New(parent.Root, bc.stateCache)
if err != nil {
return it.index, events, coalescedLogs, err
}
// Process block using the parent state as reference point.
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32

if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
go func(start time.Time) {
throwaway, _ := state.New(parent.Root, bc.stateCache)
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(&followupInterrupt) == 1 {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now())
}
}
// Process block using the parent state as reference point
substart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
// Update the metrics touched during block processing
accountReadTimer.Update(state.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(state.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(state.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(state.StorageUpdates) // Storage updates are complete, we can mark them
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them

triehash := state.AccountHashes + state.StorageHashes // Save to not double count in validation
trieproc := state.AccountReads + state.AccountUpdates
trieproc += state.StorageReads + state.StorageUpdates
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.StorageReads + statedb.StorageUpdates

blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)

// Validate the state using the default validator
substart = time.Now()
if err := bc.Validator().ValidateState(block, state, receipts, usedGas); err != nil {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
proctime := time.Since(start)

// Update the metrics touched during block validation
accountHashTimer.Update(state.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(state.StorageHashes) // Storage hashes are complete, we can mark them
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them

blockValidationTimer.Update(time.Since(substart) - (state.AccountHashes + state.StorageHashes - triehash))
blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))

// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.writeBlockWithState(block, receipts, state)
status, err := bc.writeBlockWithState(block, receipts, statedb)
if err != nil {
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, events, coalescedLogs, err
}
atomic.StoreUint32(&followupInterrupt, 1)

// Update the metrics touched during block commit
accountCommitTimer.Update(state.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(state.StorageCommits) // Storage commits are complete, we can mark them
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them

blockWriteTimer.Update(time.Since(substart) - state.AccountCommits - state.StorageCommits)
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
blockInsertTimer.UpdateSince(start)

switch status {
Expand Down
43 changes: 37 additions & 6 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor

// insertIterator is a helper to assist during chain import.
type insertIterator struct {
chain types.Blocks
results <-chan error
index int
validator Validator
chain types.Blocks // Chain of blocks being iterated over

results <-chan error // Verification result sink from the consensus engine
errors []error // Header verification errors for the blocks

index int // Current offset of the iterator
validator Validator // Validator to run if verification succeeds
}

// newInsertIterator creates a new iterator based on the given blocks, which are
Expand All @@ -92,6 +95,7 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
return &insertIterator{
chain: chain,
results: results,
errors: make([]error, 0, len(chain)),
index: -1,
validator: validator,
}
Expand All @@ -100,17 +104,44 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
// next returns the next block in the iterator, along with any potential validation
// error for that block. When the end is reached, it will return (nil, nil).
func (it *insertIterator) next() (*types.Block, error) {
// If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
it.index = len(it.chain)
return nil, nil
}
// Advance the iterator and wait for verification result if not yet done
it.index++
if err := <-it.results; err != nil {
return it.chain[it.index], err
if len(it.errors) <= it.index {
it.errors = append(it.errors, <-it.results)
}
if it.errors[it.index] != nil {
return it.chain[it.index], it.errors[it.index]
}
// Block header valid, run body validation and return
return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
}

// peek returns the next block in the iterator, along with any potential validation
// error for that block, but does **not** advance the iterator.
//
// Both header and body validation errors (nil too) is cached into the iterator
// to avoid duplicating work on the following next() call.
func (it *insertIterator) peek() (*types.Block, error) {
// If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
return nil, nil
}
// Wait for verification result if not yet done
if len(it.errors) <= it.index+1 {
it.errors = append(it.errors, <-it.results)
}
if it.errors[it.index+1] != nil {
return it.chain[it.index+1], it.errors[it.index+1]
}
// Block header valid, ignore body validation since we don't have a parent anyway
return it.chain[it.index+1], nil
}

// previous returns the previous header that was being processed, or nil.
func (it *insertIterator) previous() *types.Header {
if it.index < 1 {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
if err != nil {
return err
}
receipts, _, usedGas, err := blockchain.Processor().Process(block, statedb, vm.Config{})
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
Loading