Skip to content

Commit ec132df

Browse files
authored
[Ledger] Replace LRU cache with a FIFO queue (circular buffer) (#2893)
1 parent f774617 commit ec132df

File tree

8 files changed

+408
-102
lines changed

8 files changed

+408
-102
lines changed

ledger/complete/checkpoint_benchmark_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ func BenchmarkLoadCheckpointAndWALs(b *testing.B) {
145145
return err
146146
},
147147
func(rootHash ledger.RootHash) error {
148-
forest.RemoveTrie(rootHash)
149148
return nil
150149
},
151150
)

ledger/complete/ledger.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const defaultTrieUpdateChanSize = 500
3232
// Ledger is fork-aware which means any update can be applied at any previous state which forms a tree of tries (forest).
3333
// The forest is in memory but all changes (e.g. register updates) are captured inside write-ahead-logs for crash recovery reasons.
3434
// In order to limit memory usage and maintain performance, storage only keeps a limited number of
35-
// tries and purge the old ones (LRU-based); in other words, Ledger is not designed to be used
35+
// tries and purges the old ones (FIFO-based); in other words, Ledger is not designed to be used
3636
// for archival usage but makes it possible for other software components to reconstruct very old tries using write-ahead logs.
3737
type Ledger struct {
3838
forest *mtrie.Forest
@@ -53,12 +53,7 @@ func NewLedger(
5353

5454
logger := log.With().Str("ledger", "complete").Logger()
5555

56-
forest, err := mtrie.NewForest(capacity, metrics, func(evictedTrie *trie.MTrie) {
57-
err := wal.RecordDelete(evictedTrie.RootHash())
58-
if err != nil {
59-
logger.Error().Err(err).Msg("failed to save delete record in wal")
60-
}
61-
})
56+
forest, err := mtrie.NewForest(capacity, metrics, nil)
6257
if err != nil {
6358
return nil, fmt.Errorf("cannot create forest: %w", err)
6459
}
@@ -358,7 +353,7 @@ func (l *Ledger) ExportCheckpointAt(
358353
Str("hash", rh.String()).
359354
Msgf("Most recently touched root hash.")
360355
return ledger.State(hash.DummyHash),
361-
fmt.Errorf("cannot get try at the given state commitment: %w", err)
356+
fmt.Errorf("cannot get trie at the given state commitment: %w", err)
362357
}
363358

364359
// clean up tries to release memory
@@ -505,20 +500,7 @@ func (l *Ledger) keepOnlyOneTrie(state ledger.State) error {
505500
// don't write things to WALs
506501
l.wal.PauseRecord()
507502
defer l.wal.UnpauseRecord()
508-
509-
allTries, err := l.forest.GetTries()
510-
if err != nil {
511-
return err
512-
}
513-
514-
targetRootHash := ledger.RootHash(state)
515-
for _, trie := range allTries {
516-
trieRootHash := trie.RootHash()
517-
if trieRootHash != targetRootHash {
518-
l.forest.RemoveTrie(trieRootHash)
519-
}
520-
}
521-
return nil
503+
return l.forest.PurgeCacheExcept(ledger.RootHash(state))
522504
}
523505

524506
func runReport(r ledger.Reporter, p []ledger.Payload, commit ledger.State, l zerolog.Logger) error {

ledger/complete/ledger_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,6 @@ func Test_WAL(t *testing.T) {
532532
assert.NoError(t, err)
533533
state, _, err = led.Set(update)
534534
require.NoError(t, err)
535-
fmt.Printf("Updated with %x\n", state)
536535

537536
data := make(map[string]ledger.Value, len(keys))
538537
for j, key := range keys {
@@ -581,10 +580,6 @@ func Test_WAL(t *testing.T) {
581580
}
582581
}
583582

584-
// test deletion
585-
s := led2.ForestSize()
586-
assert.Equal(t, s, size)
587-
588583
<-led2.Done()
589584
<-compactor2.Done()
590585
})

ledger/complete/mtrie/forest.go

Lines changed: 27 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package mtrie
22

33
import (
4-
"errors"
54
"fmt"
65

7-
lru "github.com/hashicorp/golang-lru"
8-
96
"github.com/onflow/flow-go/ledger"
107
"github.com/onflow/flow-go/ledger/common/hash"
118
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
@@ -27,7 +24,7 @@ type Forest struct {
2724
// tries stores all MTries in the forest. It is NOT a CACHE in the conventional sense:
2825
// there is no mechanism to load a trie from disk in case of a cache miss. Missing a
2926
// needed trie in the forest might cause a fatal application logic error.
30-
tries *lru.Cache
27+
tries *TrieCache
3128
forestCapacity int
3229
onTreeEvicted func(tree *trie.MTrie)
3330
metrics module.LedgerMetrics
@@ -36,38 +33,19 @@ type Forest struct {
3633
// NewForest returns a new instance of memory forest.
3734
//
3835
// CAUTION on forestCapacity: the specified capacity MUST be SUFFICIENT to store all needed MTries in the forest.
39-
// If more tries are added than the capacity, the Least Recently Used trie is removed (evicted) from the Forest.
40-
// THIS IS A ROUGH HEURISTIC as it might evict tries that are still needed.
36+
// If more tries are added than the capacity, the Least Recently Added trie is removed (evicted) from the Forest (FIFO queue).
4137
// Make sure you choose a sufficiently large forestCapacity, such that, when reaching the capacity, the
42-
// Least Recently Used trie will never be needed again.
38+
// Least Recently Added trie will never be needed again.
4339
func NewForest(forestCapacity int, metrics module.LedgerMetrics, onTreeEvicted func(tree *trie.MTrie)) (*Forest, error) {
44-
// init LRU cache as a SHORTCUT for a usage-related storage eviction policy
45-
var cache *lru.Cache
46-
var err error
47-
if onTreeEvicted != nil {
48-
cache, err = lru.NewWithEvict(forestCapacity, func(key interface{}, value interface{}) {
49-
trie, ok := value.(*trie.MTrie)
50-
if !ok {
51-
panic(fmt.Sprintf("cache contains item of type %T", value))
52-
}
53-
onTreeEvicted(trie)
54-
})
55-
} else {
56-
cache, err = lru.New(forestCapacity)
57-
}
58-
if err != nil {
59-
return nil, fmt.Errorf("cannot create forest cache: %w", err)
60-
}
61-
62-
forest := &Forest{tries: cache,
40+
forest := &Forest{tries: NewTrieCache(uint(forestCapacity), onTreeEvicted),
6341
forestCapacity: forestCapacity,
6442
onTreeEvicted: onTreeEvicted,
6543
metrics: metrics,
6644
}
6745

6846
// add trie with no allocated registers
6947
emptyTrie := trie.NewEmptyMTrie()
70-
err = forest.AddTrie(emptyTrie)
48+
err := forest.AddTrie(emptyTrie)
7149
if err != nil {
7250
return nil, fmt.Errorf("adding empty trie to forest failed: %w", err)
7351
}
@@ -333,33 +311,15 @@ func (f *Forest) HasTrie(rootHash ledger.RootHash) bool {
333311
// warning, use this function for read-only operation
334312
func (f *Forest) GetTrie(rootHash ledger.RootHash) (*trie.MTrie, error) {
335313
// if in memory
336-
if ent, found := f.tries.Get(rootHash); found {
337-
trie, ok := ent.(*trie.MTrie)
338-
if !ok {
339-
return nil, fmt.Errorf("forest contains an element of a wrong type")
340-
}
314+
if trie, found := f.tries.Get(rootHash); found {
341315
return trie, nil
342316
}
343317
return nil, fmt.Errorf("trie with the given rootHash %s not found", rootHash)
344318
}
345319

346320
// GetTries returns the list of tries currently cached in the forest
347321
func (f *Forest) GetTries() ([]*trie.MTrie, error) {
348-
// ToDo needs concurrency safety
349-
keys := f.tries.Keys()
350-
tries := make([]*trie.MTrie, len(keys))
351-
for i, key := range keys {
352-
t, ok := f.tries.Get(key)
353-
if !ok {
354-
return nil, errors.New("concurrent Forest modification")
355-
}
356-
trie, ok := t.(*trie.MTrie)
357-
if !ok {
358-
return nil, errors.New("forest contains an element of a wrong type")
359-
}
360-
tries[i] = trie
361-
}
362-
return tries, nil
322+
return f.tries.Tries(), nil
363323
}
364324

365325
// AddTries adds a trie to the forest
@@ -381,44 +341,42 @@ func (f *Forest) AddTrie(newTrie *trie.MTrie) error {
381341

382342
// TODO: check Thread safety
383343
rootHash := newTrie.RootHash()
384-
if storedTrie, found := f.tries.Get(rootHash); found {
385-
trie, ok := storedTrie.(*trie.MTrie)
386-
if !ok {
387-
return fmt.Errorf("forest contains an element of a wrong type")
388-
}
389-
if trie.Equals(newTrie) {
390-
return nil
391-
}
392-
return fmt.Errorf("forest already contains a tree with same root hash but other properties")
344+
if _, found := f.tries.Get(rootHash); found {
345+
// a trie with this root hash is already in the forest; nothing to do
346+
return nil
393347
}
394-
f.tries.Add(rootHash, newTrie)
395-
f.metrics.ForestNumberOfTrees(uint64(f.tries.Len()))
348+
f.tries.Push(newTrie)
349+
f.metrics.ForestNumberOfTrees(uint64(f.tries.Count()))
396350

397351
return nil
398352
}
399353

400-
// RemoveTrie removes a trie to the forest
401-
func (f *Forest) RemoveTrie(rootHash ledger.RootHash) {
402-
// TODO remove from the file as well
403-
f.tries.Remove(rootHash)
404-
f.metrics.ForestNumberOfTrees(uint64(f.tries.Len()))
405-
}
406-
407354
// GetEmptyRootHash returns the rootHash of empty Trie
408355
func (f *Forest) GetEmptyRootHash() ledger.RootHash {
409356
return trie.EmptyTrieRootHash()
410357
}
411358

412359
// MostRecentTouchedRootHash returns the rootHash of the most recently added trie
413360
func (f *Forest) MostRecentTouchedRootHash() (ledger.RootHash, error) {
414-
keys := f.tries.Keys()
415-
if len(keys) > 0 {
416-
return keys[len(keys)-1].(ledger.RootHash), nil
361+
trie := f.tries.LastAddedTrie()
362+
if trie != nil {
363+
return trie.RootHash(), nil
417364
}
418365
return ledger.RootHash(hash.DummyHash), fmt.Errorf("no trie is stored in the forest")
419366
}
420367

368+
// PurgeCacheExcept removes all tries in the memory except the one with the given root hash
369+
func (f *Forest) PurgeCacheExcept(rootHash ledger.RootHash) error {
370+
trie, found := f.tries.Get(rootHash)
371+
if !found {
372+
return fmt.Errorf("trie with the given root hash not found")
373+
}
374+
f.tries.Purge()
375+
f.tries.Push(trie)
376+
return nil
377+
}
378+
421379
// Size returns the number of active tries in this store
422380
func (f *Forest) Size() int {
423-
return f.tries.Len()
381+
return f.tries.Count()
424382
}

ledger/complete/mtrie/forest_test.go

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ func TestTrieOperations(t *testing.T) {
4141
require.NoError(t, err)
4242
require.Equal(t, retnt.RootHash(), updatedTrie.RootHash())
4343
require.Equal(t, 2, forest.Size())
44-
45-
// Remove trie
46-
forest.RemoveTrie(updatedTrie.RootHash())
47-
require.Equal(t, 1, forest.Size())
4844
}
4945

5046
// TestTrieUpdate updates the empty trie with some values and verifies that the
@@ -1072,3 +1068,51 @@ func TestNow(t *testing.T) {
10721068
require.Equal(t, updatedRoot, updatedRoot2)
10731069
require.Equal(t, 2, size)
10741070
}
1071+
1072+
func TestPurgeCacheExcept(t *testing.T) {
1073+
forest, err := NewForest(5, &metrics.NoopCollector{}, nil)
1074+
require.NoError(t, err)
1075+
1076+
nt := trie.NewEmptyMTrie()
1077+
p1 := pathByUint8s([]uint8{uint8(53), uint8(74)})
1078+
v1 := payloadBySlices([]byte{'A'}, []byte{'A'})
1079+
1080+
updatedTrie1, _, err := trie.NewTrieWithUpdatedRegisters(nt, []ledger.Path{p1}, []ledger.Payload{*v1}, true)
1081+
require.NoError(t, err)
1082+
1083+
err = forest.AddTrie(updatedTrie1)
1084+
require.NoError(t, err)
1085+
1086+
p2 := pathByUint8s([]uint8{uint8(12), uint8(34)})
1087+
v2 := payloadBySlices([]byte{'B'}, []byte{'B'})
1088+
1089+
updatedTrie2, _, err := trie.NewTrieWithUpdatedRegisters(nt, []ledger.Path{p2}, []ledger.Payload{*v2}, true)
1090+
require.NoError(t, err)
1091+
1092+
err = forest.AddTrie(updatedTrie2)
1093+
require.NoError(t, err)
1094+
1095+
require.Equal(t, 3, forest.tries.Count())
1096+
forest.PurgeCacheExcept(updatedTrie2.RootHash())
1097+
1098+
require.Equal(t, 1, forest.tries.Count())
1099+
ret, err := forest.GetTrie(updatedTrie2.RootHash())
1100+
require.NoError(t, err)
1101+
require.Equal(t, ret, updatedTrie2)
1102+
1103+
_, err = forest.GetTrie(updatedTrie1.RootHash())
1104+
require.Error(t, err)
1105+
1106+
// purge with a root hash that is no longer in the forest (updatedTrie1 was purged above); the forest must be left unchanged
1107+
forest.PurgeCacheExcept(updatedTrie1.RootHash())
1108+
ret, err = forest.GetTrie(updatedTrie2.RootHash())
1109+
require.NoError(t, err)
1110+
require.Equal(t, ret, updatedTrie2)
1111+
1112+
_, err = forest.GetTrie(updatedTrie1.RootHash())
1113+
require.Error(t, err)
1114+
1115+
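// purge again when the target trie is the only one in the forest; NOTE(review): the return value is discarded, so the assertion below re-checks the err from the GetTrie call above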
1116+
forest.PurgeCacheExcept(updatedTrie2.RootHash())
1117+
require.Error(t, err)
1118+
}

0 commit comments

Comments
 (0)