Skip to content

Commit 1c473ed

Browse files
committed
core/rawdb: batched freezer writes
core/rawdb: improve append benchmark core/rawdb: add freezer batch core/rawdb: minor benchmark nitpicks core/rawdb: add freezer batch core/rawdb: method docs core/rawdb: resolve freezer batch filenum at write-time core/rawdb: use bytes.Buffer in freezer batch core/rawdb: work in progress -- rlp encoding inside freezer batch core/rawdb: implement write batches in freezer core/rawdb: improve memory allocations for snappy compression core/rawdb: polish core/rawdb: simplify + fix copy/paste error core/rawdb: error handling for freezertable batches core/rawdb: lint nitpicks core/rawdb: more tests for freezer batching core/rawdb: freezer batch interface core/rawdb: WIP freezer batch interface core/rawdb: check insert count of table batches core/rawdb: use ancient batch in background freeze core/rawdb: fix some issues in background freezer ethdb: document AncientBatch core/rawdb: remove single-item append core/rawdb: use dumpIndexString in batch tests core/rawdb: fix batch test core/rawdb: implement commit during append core/rawdb: add some missing close calls in freezer tests core/rawdb: simplify snappy buffer core/rawdb: change ancient writer interface core/rawdb: fix offset test core/rawdb: track headBytes only in freezerTable core/rawdb: add writeLock core/rawdb: reuse write batch core/rawdb: re-add metrics reporting core/rawdb: recreate the concurrent truncate test of freezer This changes the test slightly to use the 'freezer' object instead of 'freezerTable'. This is necessary because the concurrency handling has moved to the freezer and concurrent append and truncate is no longer allowed on the table object. core/rawdb: fix item count after ModifyAncients and improve the test core/rawdb: add concurrency test for retrieve core/rawdb: allow overriding max table size in newFreezer This makes the concurrency tests fail with -race. 
core/rawdb: fix race in advanceFile core/rawdb: remove atomic access on freezer.headId It doesn't need to be atomic, all accesses are protected by the lock. core/rawdb: use int64 for writeSize core/rawdb: implement and test all-or-nothing behavior of ModifyAncients core/rawdb: move buffer code to batch file core/rawdb: reimplement batch test on freezer core/rawdb: fix race in AncientSize core, core/rawdb: WIP batch ancient write in sync core/rawdb: delete old batch tests core: fix error handling when inserting side chain receipts This fixes (well, rewrites) the test for the error case of InsertReceiptChain where side chain data is imported and setting the fast block fails because it doesn't match the header chain. This previously relied on the terminateInsert hook. With the new ancient write interface, there is no good place for this hook anymore, so the test now injects an actual error instead of simulating one. core: improve comments core/blockchain, core/rawdb: less iterator usage when deleting leveldb-data core/rawdb: return write error from WriteAncientBlocks It's easier to test this function when it doesn't just exit the process on failure. core/rawdb: add benchmark for WriteAncientBlocks core/rawdb: use os.RemoveAll in test core/rawdb: avoid allocating in benchmark loop core/rawdb: avoid copying difficulty twice The difficulty is already copied by block.Header(), avoid copying it again. core/rawdb: improve WriteAncientBlocks benchmark core: handle block->header validation
1 parent fe2f153 commit 1c473ed

File tree

13 files changed

+1343
-578
lines changed

13 files changed

+1343
-578
lines changed

core/blockchain.go

Lines changed: 82 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,7 @@ type BlockChain struct {
207207
processor Processor // Block transaction processor interface
208208
vmConfig vm.Config
209209

210-
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
211-
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
210+
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
212211
}
213212

214213
// NewBlockChain returns a fully initialised block chain using information
@@ -1085,38 +1084,6 @@ const (
10851084
SideStatTy
10861085
)
10871086

1088-
// truncateAncient rewinds the blockchain to the specified header and deletes all
1089-
// data in the ancient store that exceeds the specified header.
1090-
func (bc *BlockChain) truncateAncient(head uint64) error {
1091-
frozen, err := bc.db.Ancients()
1092-
if err != nil {
1093-
return err
1094-
}
1095-
// Short circuit if there is no data to truncate in ancient store.
1096-
if frozen <= head+1 {
1097-
return nil
1098-
}
1099-
// Truncate all the data in the freezer beyond the specified head
1100-
if err := bc.db.TruncateAncients(head + 1); err != nil {
1101-
return err
1102-
}
1103-
// Clear out any stale content from the caches
1104-
bc.hc.headerCache.Purge()
1105-
bc.hc.tdCache.Purge()
1106-
bc.hc.numberCache.Purge()
1107-
1108-
// Clear out any stale content from the caches
1109-
bc.bodyCache.Purge()
1110-
bc.bodyRLPCache.Purge()
1111-
bc.receiptsCache.Purge()
1112-
bc.blockCache.Purge()
1113-
bc.txLookupCache.Purge()
1114-
bc.futureBlocks.Purge()
1115-
1116-
log.Info("Rewind ancient data", "number", head)
1117-
return nil
1118-
}
1119-
11201087
// numberHash is just a container for a number and a hash, to represent a block
11211088
type numberHash struct {
11221089
number uint64
@@ -1155,12 +1122,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
11551122
var (
11561123
stats = struct{ processed, ignored int32 }{}
11571124
start = time.Now()
1158-
size = 0
1125+
size = int64(0)
11591126
)
1127+
11601128
// updateHead updates the head fast sync block if the inserted blocks are better
11611129
// and returns an indicator whether the inserted blocks are canonical.
11621130
updateHead := func(head *types.Block) bool {
11631131
bc.chainmu.Lock()
1132+
defer bc.chainmu.Unlock()
11641133

11651134
// Rewind may have occurred, skip in that case.
11661135
if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
@@ -1169,127 +1138,122 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
11691138
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
11701139
bc.currentFastBlock.Store(head)
11711140
headFastBlockGauge.Update(int64(head.NumberU64()))
1172-
bc.chainmu.Unlock()
11731141
return true
11741142
}
11751143
}
1176-
bc.chainmu.Unlock()
11771144
return false
11781145
}
1146+
11791147
// writeAncient writes blockchain and corresponding receipt chain into ancient store.
11801148
//
11811149
// this function only accepts canonical chain data. All side chain will be reverted
11821150
// eventually.
11831151
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
1184-
var (
1185-
previous = bc.CurrentFastBlock()
1186-
batch = bc.db.NewBatch()
1187-
)
1188-
// If any error occurs before updating the head or we are inserting a side chain,
1189-
// all the data written this time wll be rolled back.
1190-
defer func() {
1191-
if previous != nil {
1192-
if err := bc.truncateAncient(previous.NumberU64()); err != nil {
1193-
log.Crit("Truncate ancient store failed", "err", err)
1194-
}
1195-
}
1196-
}()
1197-
var deleted []*numberHash
1198-
for i, block := range blockChain {
1199-
// Short circuit insertion if shutting down or processing failed
1200-
if bc.insertStopped() {
1201-
return 0, errInsertionInterrupted
1202-
}
1203-
// Short circuit insertion if it is required(used in testing only)
1204-
if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
1205-
return i, errors.New("insertion is terminated for testing purpose")
1206-
}
1207-
// Short circuit if the owner header is unknown
1208-
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
1209-
return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4])
1210-
}
1211-
if block.NumberU64() == 1 {
1212-
// Make sure to write the genesis into the freezer
1213-
if frozen, _ := bc.db.Ancients(); frozen == 0 {
1214-
h := rawdb.ReadCanonicalHash(bc.db, 0)
1215-
b := rawdb.ReadBlock(bc.db, h, 0)
1216-
size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0))
1217-
log.Info("Wrote genesis to ancients")
1152+
first := blockChain[0]
1153+
last := blockChain[len(blockChain)-1]
1154+
1155+
// Ensure genesis is in ancients.
1156+
if first.NumberU64() == 1 {
1157+
if frozen, _ := bc.db.Ancients(); frozen == 0 {
1158+
b := bc.genesisBlock
1159+
td := bc.genesisBlock.Difficulty()
1160+
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td)
1161+
size += writeSize
1162+
if err != nil {
1163+
log.Error("Error writing genesis to ancients", "err", err)
1164+
return 0, err
12181165
}
1166+
log.Info("Wrote genesis to ancients")
12191167
}
1220-
// Flush data into ancient database.
1221-
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))
1222-
1223-
// Write tx indices if any condition is satisfied:
1224-
// * If user requires to reserve all tx indices(txlookuplimit=0)
1225-
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
1226-
// * If block number is large enough to be regarded as a recent block
1227-
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
1228-
//
1229-
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
1230-
// an external ancient database, during the setup, blockchain will start
1231-
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
1232-
// range. In this case, all tx indices of newly imported blocks should be
1233-
// generated.
1168+
}
1169+
// Before writing the blocks to the ancients, we need to ensure that
1170+
// they correspond to what the headerchain 'expects'.
1171+
// We only check the last block/header, since it's a contiguous chain.
1172+
if !bc.HasHeader(last.Hash(), last.NumberU64()) {
1173+
return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4])
1174+
}
1175+
1176+
// Write all chain data to ancients.
1177+
td := bc.GetTd(first.Hash(), first.NumberU64())
1178+
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
1179+
size += writeSize
1180+
if err != nil {
1181+
log.Error("Error importing chain data to ancients", "err", err)
1182+
return 0, err
1183+
}
1184+
1185+
// Write tx indices if any condition is satisfied:
1186+
// * If user requires to reserve all tx indices(txlookuplimit=0)
1187+
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
1188+
// * If block number is large enough to be regarded as a recent block
1189+
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
1190+
//
1191+
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
1192+
// an external ancient database, during the setup, blockchain will start
1193+
// a background routine to re-index all indices in [ancients - txlookupLimit, ancients)
1194+
// range. In this case, all tx indices of newly imported blocks should be
1195+
// generated.
1196+
var batch = bc.db.NewBatch()
1197+
for _, block := range blockChain {
12341198
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
12351199
rawdb.WriteTxLookupEntriesByBlock(batch, block)
12361200
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
12371201
rawdb.WriteTxLookupEntriesByBlock(batch, block)
12381202
}
12391203
stats.processed++
12401204
}
1205+
12411206
// Flush all tx-lookup index data.
1242-
size += batch.ValueSize()
1207+
size += int64(batch.ValueSize())
12431208
if err := batch.Write(); err != nil {
1209+
// The tx index data could not be written.
1210+
// Roll back the ancient store update.
1211+
fastBlock := bc.CurrentFastBlock().NumberU64()
1212+
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
1213+
log.Error("Can't truncate ancient store after failed insert", "err", err)
1214+
}
12441215
return 0, err
12451216
}
1246-
batch.Reset()
12471217

12481218
// Sync the ancient store explicitly to ensure all data has been flushed to disk.
12491219
if err := bc.db.Sync(); err != nil {
12501220
return 0, err
12511221
}
1252-
if !updateHead(blockChain[len(blockChain)-1]) {
1253-
return 0, errors.New("side blocks can't be accepted as the ancient chain data")
1254-
}
1255-
previous = nil // disable rollback explicitly
12561222

1257-
// Wipe out canonical block data.
1258-
for _, nh := range deleted {
1259-
rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
1260-
rawdb.DeleteCanonicalHash(batch, nh.number)
1261-
}
1262-
for _, block := range blockChain {
1263-
// Always keep genesis block in active database.
1264-
if block.NumberU64() != 0 {
1265-
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
1266-
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
1223+
// Update the current fast block because all block data is now present in DB.
1224+
previousFastBlock := bc.CurrentFastBlock().NumberU64()
1225+
if !updateHead(blockChain[len(blockChain)-1]) {
1226+
// We end up here if the header chain has reorg'ed, and the blocks/receipts
1227+
// don't match the canonical chain.
1228+
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
1229+
log.Error("Can't truncate ancient store after failed insert", "err", err)
12671230
}
1231+
return 0, errSideChainReceipts
12681232
}
1269-
if err := batch.Write(); err != nil {
1270-
return 0, err
1271-
}
1272-
batch.Reset()
12731233

1274-
// Wipe out side chain too.
1275-
for _, nh := range deleted {
1276-
for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
1277-
rawdb.DeleteBlock(batch, hash, nh.number)
1234+
// Delete block data from the main database.
1235+
batch.Reset()
1236+
canonHashes := make(map[common.Hash]struct{})
1237+
for _, block := range blockChain {
1238+
canonHashes[block.Hash()] = struct{}{}
1239+
if block.NumberU64() == 0 {
1240+
continue
12781241
}
1242+
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
1243+
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
12791244
}
1280-
for _, block := range blockChain {
1281-
// Always keep genesis block in active database.
1282-
if block.NumberU64() != 0 {
1283-
for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
1284-
rawdb.DeleteBlock(batch, hash, block.NumberU64())
1285-
}
1245+
// Delete side chain hash-to-number mappings.
1246+
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
1247+
if _, canon := canonHashes[nh.Hash]; !canon {
1248+
rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
12861249
}
12871250
}
12881251
if err := batch.Write(); err != nil {
12891252
return 0, err
12901253
}
12911254
return 0, nil
12921255
}
1256+
12931257
// writeLive writes blockchain and corresponding receipt chain into active store.
12941258
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
12951259
skipPresenceCheck := false
@@ -1327,7 +1291,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
13271291
if err := batch.Write(); err != nil {
13281292
return 0, err
13291293
}
1330-
size += batch.ValueSize()
1294+
size += int64(batch.ValueSize())
13311295
batch.Reset()
13321296
}
13331297
stats.processed++
@@ -1336,14 +1300,15 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
13361300
// we can ensure all components of body is completed(body, receipts,
13371301
// tx indexes)
13381302
if batch.ValueSize() > 0 {
1339-
size += batch.ValueSize()
1303+
size += int64(batch.ValueSize())
13401304
if err := batch.Write(); err != nil {
13411305
return 0, err
13421306
}
13431307
}
13441308
updateHead(blockChain[len(blockChain)-1])
13451309
return 0, nil
13461310
}
1311+
13471312
// Write downloaded chain data and corresponding receipt chain data
13481313
if len(ancientBlocks) > 0 {
13491314
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {

0 commit comments

Comments
 (0)