diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 7a9cd172958..c264d6e0547 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -575,8 +575,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err } - // NOTE: bor_* RPCs are not fully supported when using polygon.sync (https://github.com/erigontech/erigon/issues/11171) - // Skip the compatibility check, until we have a schema in erigon-lib engine = bor.NewRo(cc, blockReader, logger) } else if cc != nil && cc.Aura != nil { consensusDB, err := kv2.New(kv.ConsensusDB, logger).Path(filepath.Join(cfg.DataDir, "aura")).Accede(true).Open(ctx) diff --git a/db/kv/tables.go b/db/kv/tables.go index 0da31d09185..a3957ef5ef9 100644 --- a/db/kv/tables.go +++ b/db/kv/tables.go @@ -132,19 +132,21 @@ const ( PendingEpoch = "DevPendingEpoch" // block_num_u64+block_hash->transition_proof // BOR - BorTxLookup = "BlockBorTransactionLookup" // transaction_hash -> block_num_u64 - BorEvents = "BorEvents" // event_id -> event_payload - BorEventNums = "BorEventNums" // block_num -> event_id (last event_id in that block) - BorEventProcessedBlocks = "BorEventProcessedBlocks" // block_num -> block_time, tracks processed blocks in the bridge, used for unwinds and restarts, gets pruned - BorEventTimes = "BorEventTimes" // timestamp -> event_id - BorSpans = "BorSpans" // span_id -> span (in JSON encoding) - BorMilestones = "BorMilestones" // milestone_id -> milestone (in JSON encoding) - BorMilestoneEnds = "BorMilestoneEnds" // start block_num -> milestone_id (first block of milestone) - BorCheckpoints = "BorCheckpoints" // checkpoint_id -> checkpoint (in JSON encoding) - BorCheckpointEnds = "BorCheckpointEnds" // start block_num -> checkpoint_id (first block of checkpoint) - BorProducerSelections = "BorProducerSelections" // span_id -> span selection with accumulated proposer priorities (in JSON encoding) - 
BorWitnesses = "BorWitnesses" // block_num_u64 + block_hash -> witness - BorWitnessSizes = "BorWitnessSizes" // block_num_u64 + block_hash -> witness size (uint64) + BorTxLookup = "BlockBorTransactionLookup" // transaction_hash -> block_num_u64 + BorEvents = "BorEvents" // event_id -> event_payload + BorEventNums = "BorEventNums" // block_num -> event_id (last event_id in that block) + BorEventProcessedBlocks = "BorEventProcessedBlocks" // block_num -> block_time, tracks processed blocks in the bridge, used for unwinds and restarts, gets pruned + BorEventTimes = "BorEventTimes" // timestamp -> event_id + BorSpans = "BorSpans" // span_id -> span (in JSON encoding) + BorSpansIndex = "BorSpansIndex" // span.StartBlockNumber -> span.Id + BorMilestones = "BorMilestones" // milestone_id -> milestone (in JSON encoding) + BorMilestoneEnds = "BorMilestoneEnds" // start block_num -> milestone_id (first block of milestone) + BorCheckpoints = "BorCheckpoints" // checkpoint_id -> checkpoint (in JSON encoding) + BorCheckpointEnds = "BorCheckpointEnds" // start block_num -> checkpoint_id (first block of checkpoint) + BorProducerSelections = "BorProducerSelections" // span_id -> span selection with accumulated proposer priorities (in JSON encoding) + BorProducerSelectionsIndex = "BorProducerSelectionsIndex" // span.StartBlockNumber -> span.Id + BorWitnesses = "BorWitnesses" // block_num_u64 + block_hash -> witness + BorWitnessSizes = "BorWitnessSizes" // block_num_u64 + block_hash -> witness size (uint64) // Downloader BittorrentCompletion = "BittorrentCompletion" @@ -349,11 +351,13 @@ var ChaindataTables = []string{ BorEventProcessedBlocks, BorEventTimes, BorSpans, + BorSpansIndex, BorMilestones, BorMilestoneEnds, BorCheckpoints, BorCheckpointEnds, BorProducerSelections, + BorProducerSelectionsIndex, BorWitnesses, BorWitnessSizes, TblAccountVals, @@ -586,19 +590,21 @@ var AuRaTablesCfg = TableCfg{ } var BorTablesCfg = TableCfg{ - BorTxLookup: {Flags: DupSort}, - BorEvents: 
{Flags: DupSort}, - BorEventNums: {Flags: DupSort}, - BorEventProcessedBlocks: {Flags: DupSort}, - BorEventTimes: {Flags: DupSort}, - BorSpans: {Flags: DupSort}, - BorCheckpoints: {Flags: DupSort}, - BorCheckpointEnds: {Flags: DupSort}, - BorMilestones: {Flags: DupSort}, - BorMilestoneEnds: {Flags: DupSort}, - BorProducerSelections: {Flags: DupSort}, - BorWitnesses: {Flags: DupSort}, - BorWitnessSizes: {Flags: DupSort}, + BorTxLookup: {Flags: DupSort}, + BorEvents: {Flags: DupSort}, + BorEventNums: {Flags: DupSort}, + BorEventProcessedBlocks: {Flags: DupSort}, + BorEventTimes: {Flags: DupSort}, + BorSpans: {Flags: DupSort}, + BorSpansIndex: {Flags: DupSort}, + BorProducerSelectionsIndex: {Flags: DupSort}, + BorCheckpoints: {Flags: DupSort}, + BorCheckpointEnds: {Flags: DupSort}, + BorMilestones: {Flags: DupSort}, + BorMilestoneEnds: {Flags: DupSort}, + BorProducerSelections: {Flags: DupSort}, + BorWitnesses: {Flags: DupSort}, + BorWitnessSizes: {Flags: DupSort}, } var TxpoolTablesCfg = TableCfg{} diff --git a/polygon/heimdall/client_idle.go b/polygon/heimdall/client_idle.go index 822fc2b8929..55896afae24 100644 --- a/polygon/heimdall/client_idle.go +++ b/polygon/heimdall/client_idle.go @@ -34,6 +34,8 @@ func NewIdleClient(cfg buildercfg.MiningConfig) Client { func (c *IdleClient) FetchLatestSpan(ctx context.Context) (*Span, error) { return &Span{ + StartBlock: 0, + EndBlock: 255, ValidatorSet: ValidatorSet{ Validators: []*Validator{ { @@ -55,7 +57,9 @@ func (c *IdleClient) FetchLatestSpan(ctx context.Context) (*Span, error) { func (c *IdleClient) FetchSpan(ctx context.Context, spanID uint64) (*Span, error) { return &Span{ - Id: SpanId(spanID), + Id: SpanId(spanID), + StartBlock: 0, + EndBlock: 255, ValidatorSet: ValidatorSet{ Validators: []*Validator{ { diff --git a/polygon/heimdall/entity_store.go b/polygon/heimdall/entity_store.go index 3be0c3ffd42..d6ad7428cea 100644 --- a/polygon/heimdall/entity_store.go +++ b/polygon/heimdall/entity_store.go @@ -32,12 +32,14 
@@ import ( ) var databaseTablesCfg = kv.TableCfg{ - kv.BorCheckpoints: {}, - kv.BorCheckpointEnds: {}, - kv.BorMilestones: {}, - kv.BorMilestoneEnds: {}, - kv.BorSpans: {}, - kv.BorProducerSelections: {}, + kv.BorCheckpoints: {}, + kv.BorCheckpointEnds: {}, + kv.BorMilestones: {}, + kv.BorMilestoneEnds: {}, + kv.BorSpans: {}, + kv.BorSpansIndex: {}, + kv.BorProducerSelections: {}, + kv.BorProducerSelectionsIndex: {}, } //go:generate mockgen -typed=true -source=./entity_store.go -destination=./entity_store_mock.go -package=heimdall @@ -46,7 +48,7 @@ type EntityStore[TEntity Entity] interface { Close() LastEntityId(ctx context.Context) (uint64, bool, error) - LastFrozenEntityId() uint64 + LastFrozenEntityId() (uint64, bool, error) LastEntity(ctx context.Context) (TEntity, bool, error) Entity(ctx context.Context, id uint64) (TEntity, bool, error) PutEntity(ctx context.Context, id uint64, entity TEntity) error @@ -56,6 +58,8 @@ type EntityStore[TEntity Entity] interface { DeleteToBlockNum(ctx context.Context, unwindPoint uint64, limit int) (int, error) DeleteFromBlockNum(ctx context.Context, unwindPoint uint64) (int, error) + RangeIndex() RangeIndex + SnapType() snaptype.Type } @@ -72,7 +76,7 @@ func (NoopEntityStore[TEntity]) Close() {} func (NoopEntityStore[TEntity]) LastEntityId(ctx context.Context) (uint64, bool, error) { return 0, false, errors.New("noop") } -func (NoopEntityStore[TEntity]) LastFrozenEntityId() uint64 { return 0 } +func (NoopEntityStore[TEntity]) LastFrozenEntityId() (uint64, bool, error) { return 0, false, nil } func (NoopEntityStore[TEntity]) LastEntity(ctx context.Context) (TEntity, bool, error) { var res TEntity return res, false, errors.New("noop") @@ -142,6 +146,10 @@ func (s *mdbxEntityStore[TEntity]) WithTx(tx kv.Tx) EntityStore[TEntity] { return txEntityStore[TEntity]{s, tx} } +func (s *mdbxEntityStore[TEntity]) RangeIndex() RangeIndex { + return s.blockNumToIdIndex +} + func (s *mdbxEntityStore[TEntity]) Close() { } @@ -159,8 +167,8 @@ 
func (s *mdbxEntityStore[TEntity]) LastEntityId(ctx context.Context) (uint64, bo return txEntityStore[TEntity]{s, tx}.LastEntityId(ctx) } -func (s *mdbxEntityStore[TEntity]) LastFrozenEntityId() uint64 { - return 0 +func (s *mdbxEntityStore[TEntity]) LastFrozenEntityId() (uint64, bool, error) { + return 0, false, nil } func (s *mdbxEntityStore[TEntity]) LastEntity(ctx context.Context) (TEntity, bool, error) { @@ -222,7 +230,7 @@ func (s *mdbxEntityStore[TEntity]) PutEntity(ctx context.Context, id uint64, ent defer tx.Rollback() if err = (txEntityStore[TEntity]{s, tx}).PutEntity(ctx, id, entity); err != nil { - return nil + return err } return tx.Commit() diff --git a/polygon/heimdall/entity_store_mock.go b/polygon/heimdall/entity_store_mock.go index 8bcbbc1916a..62e3861fe7d 100644 --- a/polygon/heimdall/entity_store_mock.go +++ b/polygon/heimdall/entity_store_mock.go @@ -316,11 +316,13 @@ func (c *MockEntityStoreLastEntityIdCall[TEntity]) DoAndReturn(f func(context.Co } // LastFrozenEntityId mocks base method. -func (m *MockEntityStore[TEntity]) LastFrozenEntityId() uint64 { +func (m *MockEntityStore[TEntity]) LastFrozenEntityId() (uint64, bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LastFrozenEntityId") ret0, _ := ret[0].(uint64) - return ret0 + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // LastFrozenEntityId indicates an expected call of LastFrozenEntityId. 
@@ -336,19 +338,19 @@ type MockEntityStoreLastFrozenEntityIdCall[TEntity Entity] struct { } // Return rewrite *gomock.Call.Return -func (c *MockEntityStoreLastFrozenEntityIdCall[TEntity]) Return(arg0 uint64) *MockEntityStoreLastFrozenEntityIdCall[TEntity] { - c.Call = c.Call.Return(arg0) +func (c *MockEntityStoreLastFrozenEntityIdCall[TEntity]) Return(arg0 uint64, arg1 bool, arg2 error) *MockEntityStoreLastFrozenEntityIdCall[TEntity] { + c.Call = c.Call.Return(arg0, arg1, arg2) return c } // Do rewrite *gomock.Call.Do -func (c *MockEntityStoreLastFrozenEntityIdCall[TEntity]) Do(f func() uint64) *MockEntityStoreLastFrozenEntityIdCall[TEntity] { +func (c *MockEntityStoreLastFrozenEntityIdCall[TEntity]) Do(f func() (uint64, bool, error)) *MockEntityStoreLastFrozenEntityIdCall[TEntity] { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockEntityStoreLastFrozenEntityIdCall[TEntity]) DoAndReturn(f func() uint64) *MockEntityStoreLastFrozenEntityIdCall[TEntity] { +func (c *MockEntityStoreLastFrozenEntityIdCall[TEntity]) DoAndReturn(f func() (uint64, bool, error)) *MockEntityStoreLastFrozenEntityIdCall[TEntity] { c.Call = c.Call.DoAndReturn(f) return c } @@ -468,6 +470,44 @@ func (c *MockEntityStoreRangeFromBlockNumCall[TEntity]) DoAndReturn(f func(conte return c } +// RangeIndex mocks base method. +func (m *MockEntityStore[TEntity]) RangeIndex() RangeIndex { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RangeIndex") + ret0, _ := ret[0].(RangeIndex) + return ret0 +} + +// RangeIndex indicates an expected call of RangeIndex. 
+func (mr *MockEntityStoreMockRecorder[TEntity]) RangeIndex() *MockEntityStoreRangeIndexCall[TEntity] { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeIndex", reflect.TypeOf((*MockEntityStore[TEntity])(nil).RangeIndex)) + return &MockEntityStoreRangeIndexCall[TEntity]{Call: call} +} + +// MockEntityStoreRangeIndexCall wrap *gomock.Call +type MockEntityStoreRangeIndexCall[TEntity Entity] struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockEntityStoreRangeIndexCall[TEntity]) Return(arg0 RangeIndex) *MockEntityStoreRangeIndexCall[TEntity] { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockEntityStoreRangeIndexCall[TEntity]) Do(f func() RangeIndex) *MockEntityStoreRangeIndexCall[TEntity] { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockEntityStoreRangeIndexCall[TEntity]) DoAndReturn(f func() RangeIndex) *MockEntityStoreRangeIndexCall[TEntity] { + c.Call = c.Call.DoAndReturn(f) + return c +} + // SnapType mocks base method. 
func (m *MockEntityStore[TEntity]) SnapType() snaptype.Type { m.ctrl.T.Helper() diff --git a/polygon/heimdall/range_index.go b/polygon/heimdall/range_index.go index 3701b93b0cf..6027b0f6e53 100644 --- a/polygon/heimdall/range_index.go +++ b/polygon/heimdall/range_index.go @@ -27,6 +27,7 @@ import ( type RangeIndex interface { Lookup(ctx context.Context, blockNum uint64) (uint64, bool, error) + Last(ctx context.Context) (uint64, bool, error) } type TransactionalRangeIndexer interface { @@ -128,6 +129,18 @@ func (i *dbRangeIndex) Lookup(ctx context.Context, blockNum uint64) (uint64, boo return id, ok, err } +func (i *dbRangeIndex) Last(ctx context.Context) (uint64, bool, error) { + var lastKey uint64 + var ok bool + + err := i.db.View(ctx, func(tx kv.Tx) error { + var err error + lastKey, ok, err = i.WithTx(tx).Last(ctx) + return err + }) + return lastKey, ok, err +} + func (i *txRangeIndex) Lookup(ctx context.Context, blockNum uint64) (uint64, bool, error) { cursor, err := i.tx.Cursor(i.table) if err != nil { @@ -149,6 +162,26 @@ func (i *txRangeIndex) Lookup(ctx context.Context, blockNum uint64) (uint64, boo return id, true, err } +// last key in the index +func (i *txRangeIndex) Last(ctx context.Context) (uint64, bool, error) { + cursor, err := i.tx.Cursor(i.table) + if err != nil { + return 0, false, err + } + defer cursor.Close() + key, value, err := cursor.Last() + if err != nil { + return 0, false, err + } + + if value == nil || key == nil { + return 0, false, nil + } + + lastKey := rangeIndexKeyParse(key) + return lastKey, true, nil +} + // Lookup ids for the given range [blockFrom, blockTo). 
Return boolean which checks if the result is reliable to use, because // heimdall data can be not published yet for [blockFrom, blockTo), in that case boolean OK will be false func (i *dbRangeIndex) GetIDsBetween(ctx context.Context, blockFrom, blockTo uint64) ([]uint64, bool, error) { diff --git a/polygon/heimdall/service.go b/polygon/heimdall/service.go index 4e7feea616f..299de6750fd 100644 --- a/polygon/heimdall/service.go +++ b/polygon/heimdall/service.go @@ -324,7 +324,7 @@ func (s *Service) Run(ctx context.Context) error { } s.RegisterSpanObserver(func(span *Span) { - s.spanBlockProducersTracker.ObserveSpanAsync(span) + s.spanBlockProducersTracker.ObserveSpanAsync(ctx, span) }) milestoneObserver := s.RegisterMilestoneObserver(func(milestone *Milestone) { diff --git a/polygon/heimdall/service_store.go b/polygon/heimdall/service_store.go index 5200b59d553..bd16454907d 100644 --- a/polygon/heimdall/service_store.go +++ b/polygon/heimdall/service_store.go @@ -41,10 +41,8 @@ func NewMdbxStore(logger log.Logger, dataDir string, accede bool, roTxLimit int6 } func newMdbxStore(db *polygoncommon.Database) *MdbxStore { - spanIndex := RangeIndexFunc( - func(ctx context.Context, blockNum uint64) (uint64, bool, error) { - return uint64(SpanIdAt(blockNum)), true, nil - }) + spanIndex := NewSpanRangeIndex(db, kv.BorSpansIndex) + producerSelectionIndex := NewSpanRangeIndex(db, kv.BorProducerSelectionsIndex) return &MdbxStore{ db: db, @@ -57,7 +55,7 @@ func newMdbxStore(db *polygoncommon.Database) *MdbxStore { spans: newMdbxEntityStore( db, kv.BorSpans, Spans, generics.New[Span], spanIndex), spanBlockProducerSelections: newMdbxEntityStore( - db, kv.BorProducerSelections, nil, generics.New[SpanBlockProducerSelection], spanIndex), + db, kv.BorProducerSelections, nil, generics.New[SpanBlockProducerSelection], producerSelectionIndex), } } diff --git a/polygon/heimdall/service_test.go b/polygon/heimdall/service_test.go index c40a388c05d..3fdd868b63e 100644 --- 
a/polygon/heimdall/service_test.go +++ b/polygon/heimdall/service_test.go @@ -193,7 +193,10 @@ func (suite *ServiceTestSuite) SetupSuite() { }) suite.eg.Go(func() error { - return suite.service.Run(suite.ctx) + defer suite.cancel() + err := suite.service.Run(suite.ctx) + require.ErrorIs(suite.T(), err, context.Canceled) + return err }) lastMilestone, ok, err := suite.service.SynchronizeMilestones(suite.ctx) diff --git a/polygon/heimdall/snapshot_store.go b/polygon/heimdall/snapshot_store.go index 99eda0b3e27..17ee4184e77 100644 --- a/polygon/heimdall/snapshot_store.go +++ b/polygon/heimdall/snapshot_store.go @@ -2,7 +2,6 @@ package heimdall import ( "context" - "encoding/binary" "encoding/json" "errors" "fmt" @@ -62,8 +61,6 @@ func (s *SnapshotStore) Prepare(ctx context.Context) error { return eg.Wait() } -var ErrSpanNotFound = errors.New("span not found") - type SpanSnapshotStore struct { EntityStore[*Span] snapshots *RoSnapshots @@ -78,7 +75,95 @@ func (s *SpanSnapshotStore) Prepare(ctx context.Context) error { return err } - return <-s.snapshots.Ready(ctx) + err := <-s.snapshots.Ready(ctx) + if err != nil { + return err + } + + err = s.buildSpanIndexFromSnapshots(ctx) + if err != nil { + return err + } + return nil +} + +func (s *SpanSnapshotStore) buildSpanIndexFromSnapshots(ctx context.Context) error { + rangeIndex := s.RangeIndex() + rangeIndexer, ok := rangeIndex.(RangeIndexer) + if !ok { + return errors.New("could not cast RangeIndex to RangeIndexer") + } + lastBlockNumInIndex, ok, err := rangeIndexer.Last(ctx) + if err != nil { + return err + } + if !ok { // index table is empty + lastBlockNumInIndex = 0 + } + + lastSpanIdInIndex, ok, err := rangeIndex.Lookup(ctx, lastBlockNumInIndex) + if err != nil { + return err + } + + if !ok { // index table is empty + lastSpanIdInIndex = 0 + } + + updateSpanIndexFunc := func(span Span) (stop bool, err error) { + // this is already written to index + if span.Id <= SpanId(lastSpanIdInIndex) { + return true, nil // we 
can stop because all subsequent span ids will already be in the SpanIndex + err = rangeIndexer.Put(ctx, span.BlockNumRange(), uint64(span.Id)) + if err != nil { + return true, err // we need to stop if we encounter an error, so that the function doesn't get called again + } else { + return false, nil // happy case, we can continue updating + } + } + // fill the index walking backwards from the last span in the snapshots + return s.snapshotsReverseForEach(updateSpanIndexFunc) +} + +// Walk each span in the snapshots from last to first and apply function f as long as no error or stop condition is encountered +func (s *SpanSnapshotStore) snapshotsReverseForEach(f func(span Span) (stop bool, err error)) error { + if s.snapshots == nil { + return nil + } + + tx := s.snapshots.ViewType(s.SnapType()) + defer tx.Close() + segments := tx.Segments + // walk the segment files backwards + for i := len(segments) - 1; i >= 0; i-- { + sn := segments[i] + idx := sn.Src().Index() + if idx == nil || idx.KeyCount() == 0 { + continue + } + keyCount := idx.KeyCount() + // walk the segment file backwards + for j := int(keyCount - 1); j >= 0; j-- { + offset := idx.OrdinalLookup(uint64(j)) + gg := sn.Src().MakeGetter() + gg.Reset(offset) + result, _ := gg.Next(nil) + var span Span + err := json.Unmarshal(result, &span) + if err != nil { + return err + } + stop, err := f(span) + if err != nil { + return err + } + if stop { + return nil + } + } + } + return nil }
 func (s *SpanSnapshotStore) WithTx(tx kv.Tx) EntityStore[*Span] { @@ -93,9 +178,9 @@ func (s *SpanSnapshotStore) RangeExtractor() snaptype.RangeExtractor { }) } -func (s *SpanSnapshotStore) LastFrozenEntityId() uint64 { +func (s *SpanSnapshotStore) LastFrozenEntityId() (uint64, bool, error) { if s.snapshots == nil { - return 0 + return 0, false, nil } tx := s.snapshots.ViewType(s.SnapType()) @@ -103,7 +188,7 @@ segments := tx.Segments if len(segments) == 0 { - return 0 + return 0, false, nil } // find
the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment @@ -117,30 +202,34 @@ func (s *SpanSnapshotStore) LastFrozenEntityId() uint64 { } } if lastSegment == nil { - return 0 + return 0, false, nil } - lastSpanID := SpanIdAt(lastSegment.To()) - if lastSpanID > 0 { - lastSpanID-- + idx := lastSegment.Src().Index() + offset := idx.OrdinalLookup(idx.KeyCount() - 1) // check for the last element in this last seg file + gg := lastSegment.Src().MakeGetter() + gg.Reset(offset) + result, _ := gg.Next(nil) + + var span Span + if err := json.Unmarshal(result, &span); err != nil { + return 0, false, err } - return uint64(lastSpanID) + + return uint64(span.Id), true, nil } func (s *SpanSnapshotStore) Entity(ctx context.Context, id uint64) (*Span, bool, error) { - var endBlock uint64 - if id > 0 { - endBlock = SpanEndBlockNum(SpanId(id)) + + lastSpanIdInSnapshots, found, err := s.LastFrozenEntityId() + if err != nil { + return nil, false, fmt.Errorf("could not load last span id in snapshots: %w", err) } - maxBlockNumInFiles := s.snapshots.VisibleBlocksAvailable(s.SnapType().Enum()) - if maxBlockNumInFiles == 0 || endBlock > maxBlockNumInFiles { + if !found || id > lastSpanIdInSnapshots { // the span with this id is in MDBX and not in snapshots return s.EntityStore.Entity(ctx, id) } - var buf [8]byte - binary.BigEndian.PutUint64(buf[:], id) - tx := s.snapshots.ViewType(s.SnapType()) defer tx.Close() segments := tx.Segments @@ -149,22 +238,24 @@ func (s *SpanSnapshotStore) Entity(ctx context.Context, id uint64) (*Span, bool, sn := segments[i] idx := sn.Src().Index() - if idx == nil { - continue - } - spanFrom := uint64(SpanIdAt(sn.From())) - if id < spanFrom { + if idx == nil || idx.KeyCount() == 0 { continue } - spanTo := uint64(SpanIdAt(sn.To())) - if id >= spanTo { - continue + + gg := sn.Src().MakeGetter() + firstOffset := idx.OrdinalLookup(0) + gg.Reset(firstOffset) + firstSpanRaw, _ := gg.Next(nil) + var firstSpanInSeg Span + if err 
:= json.Unmarshal(firstSpanRaw, &firstSpanInSeg); err != nil { + return nil, false, err } - if idx.KeyCount() == 0 { + // skip : we need to look in an earlier .seg file + if id < uint64(firstSpanInSeg.Id) { continue } + offset := idx.OrdinalLookup(id - idx.BaseDataID()) - gg := sn.Src().MakeGetter() gg.Reset(offset) result, _ := gg.Next(nil) @@ -181,13 +272,16 @@ func (s *SpanSnapshotStore) Entity(ctx context.Context, id uint64) (*Span, bool, func (s *SpanSnapshotStore) LastEntityId(ctx context.Context) (uint64, bool, error) { lastId, ok, err := s.EntityStore.LastEntityId(ctx) + if err != nil { + return lastId, false, err + } - snapshotLastId := s.LastFrozenEntityId() - if snapshotLastId > lastId { - return snapshotLastId, true, nil + if ok { // found in mdbx , return immediately + return lastId, ok, nil } - return lastId, ok, err + // check in snapshots + return s.LastFrozenEntityId() } func (s *SpanSnapshotStore) LastEntity(ctx context.Context) (*Span, bool, error) { @@ -231,9 +325,9 @@ func (s *MilestoneSnapshotStore) RangeExtractor() snaptype.RangeExtractor { }) } -func (s *MilestoneSnapshotStore) LastFrozenEntityId() uint64 { +func (s *MilestoneSnapshotStore) LastFrozenEntityId() (uint64, bool, error) { if s.snapshots == nil { - return 0 + return 0, false, nil } tx := s.snapshots.ViewType(s.SnapType()) @@ -241,7 +335,7 @@ func (s *MilestoneSnapshotStore) LastFrozenEntityId() uint64 { segments := tx.Segments if len(segments) == 0 { - return 0 + return 0, false, nil } // find the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment @@ -255,35 +349,32 @@ func (s *MilestoneSnapshotStore) LastFrozenEntityId() uint64 { } } if lastSegment == nil { - return 0 + return 0, false, nil } index := lastSegment.Src().Index() - return index.BaseDataID() + index.KeyCount() - 1 + return index.BaseDataID() + index.KeyCount() - 1, true, nil } func (s *MilestoneSnapshotStore) LastEntityId(ctx context.Context) (uint64, bool, error) { - lastId, 
ok, err := s.EntityStore.LastEntityId(ctx) - - snapshotLastId := s.LastFrozenEntityId() - if snapshotLastId > lastId { - return snapshotLastId, true, nil + lastId, foundInMdbx, err := s.EntityStore.LastEntityId(ctx) + if err != nil { + return lastId, foundInMdbx, err } - return lastId, ok, err + if foundInMdbx { // found in mdbx return immediately + return lastId, true, nil + } + return s.LastFrozenEntityId() } func (s *MilestoneSnapshotStore) Entity(ctx context.Context, id uint64) (*Milestone, bool, error) { entity, ok, err := s.EntityStore.Entity(ctx, id) - if ok { return entity, ok, err } - var buf [8]byte - binary.BigEndian.PutUint64(buf[:], id) - tx := s.snapshots.ViewType(s.SnapType()) defer tx.Close() segments := tx.Segments @@ -363,15 +454,14 @@ func (s *CheckpointSnapshotStore) WithTx(tx kv.Tx) EntityStore[*Checkpoint] { } func (s *CheckpointSnapshotStore) LastEntityId(ctx context.Context) (uint64, bool, error) { - lastId, ok, err := s.EntityStore.LastEntityId(ctx) - - snapshotLastCheckpointId := s.LastFrozenEntityId() - - if snapshotLastCheckpointId > lastId { - return snapshotLastCheckpointId, true, nil + lastId, foundInMdbx, err := s.EntityStore.LastEntityId(ctx) + if err != nil { + return lastId, foundInMdbx, err } - - return lastId, ok, err + if foundInMdbx { // found in MDBX return immediately + return lastId, foundInMdbx, err + } + return s.LastFrozenEntityId() } func (s *CheckpointSnapshotStore) Entity(ctx context.Context, id uint64) (*Checkpoint, bool, error) { @@ -409,9 +499,9 @@ func (s *CheckpointSnapshotStore) Entity(ctx context.Context, id uint64) (*Check return nil, false, fmt.Errorf("checkpoint %d: %w", id, ErrCheckpointNotFound) } -func (s *CheckpointSnapshotStore) LastFrozenEntityId() uint64 { +func (s *CheckpointSnapshotStore) LastFrozenEntityId() (uint64, bool, error) { if s.snapshots == nil { - return 0 + return 0, false, nil } tx := s.snapshots.ViewType(s.SnapType()) @@ -419,7 +509,7 @@ func (s *CheckpointSnapshotStore) 
LastFrozenEntityId() uint64 { segments := tx.Segments if len(segments) == 0 { - return 0 + return 0, false, nil } // find the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment @@ -434,12 +524,12 @@ func (s *CheckpointSnapshotStore) LastFrozenEntityId() uint64 { } if lastSegment == nil { - return 0 + return 0, false, nil } index := lastSegment.Src().Index() - return index.BaseDataID() + index.KeyCount() - 1 + return index.BaseDataID() + index.KeyCount() - 1, true, nil } func (s *CheckpointSnapshotStore) LastEntity(ctx context.Context) (*Checkpoint, bool, error) { diff --git a/polygon/heimdall/snapshot_store_test.go b/polygon/heimdall/snapshot_store_test.go index 5d49978155d..4e2592b7117 100644 --- a/polygon/heimdall/snapshot_store_test.go +++ b/polygon/heimdall/snapshot_store_test.go @@ -3,6 +3,7 @@ package heimdall import ( "context" "encoding/binary" + "encoding/json" "fmt" "path/filepath" "testing" @@ -29,17 +30,23 @@ func TestHeimdallStoreLastFrozenSpanIdWhenSegmentFilesArePresent(t *testing.T) { logger := testlog.Logger(t, log.LvlInfo) dir := t.TempDir() - createTestBorEventSegmentFile(t, 0, 500_000, 132, dir, logger) - createTestSegmentFile(t, 0, 500_000, Enums.Spans, dir, version.V1_0, logger) - borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet}, dir, 0, logger) - defer borRoSnapshots.Close() + createTestBorEventSegmentFile(t, 0, 5_000, 132, dir, logger) + createTestSegmentFile(t, 0, 5_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet, NoDownloader: true}, dir, 0, logger) + t.Cleanup(borRoSnapshots.Close) err := borRoSnapshots.OpenFolder() require.NoError(t, err) tempDir := t.TempDir() dataDir := fmt.Sprintf("%s/datadir", tempDir) heimdallStore := NewSnapshotStore(NewMdbxStore(logger, dataDir, false, 1), borRoSnapshots) - require.Equal(t, uint64(78), 
heimdallStore.spans.LastFrozenEntityId()) + t.Cleanup(heimdallStore.Close) + err = heimdallStore.Prepare(t.Context()) + require.NoError(t, err) + lastFrozenSpanId, found, err := heimdallStore.spans.LastFrozenEntityId() + require.NoError(t, err) + require.True(t, found) + require.Equal(t, uint64(4), lastFrozenSpanId) } func TestHeimdallStoreLastFrozenSpanIdWhenSegmentFilesAreNotPresent(t *testing.T) { @@ -47,8 +54,8 @@ func TestHeimdallStoreLastFrozenSpanIdWhenSegmentFilesAreNotPresent(t *testing.T logger := testlog.Logger(t, log.LvlInfo) dir := t.TempDir() - borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet}, dir, 0, logger) - defer borRoSnapshots.Close() + borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet, NoDownloader: true}, dir, 0, logger) + t.Cleanup(borRoSnapshots.Close) err := borRoSnapshots.OpenFolder() require.NoError(t, err) @@ -56,92 +63,189 @@ func TestHeimdallStoreLastFrozenSpanIdWhenSegmentFilesAreNotPresent(t *testing.T dataDir := fmt.Sprintf("%s/datadir", tempDir) heimdallStore := NewSnapshotStore(NewMdbxStore(logger, dataDir, false, 1), borRoSnapshots) - require.Equal(t, uint64(0), heimdallStore.spans.LastFrozenEntityId()) + t.Cleanup(heimdallStore.Close) + lastFrozenSpanId, found, err := heimdallStore.spans.LastFrozenEntityId() + require.NoError(t, err) + require.False(t, found) + require.Equal(t, uint64(0), lastFrozenSpanId) } func TestHeimdallStoreLastFrozenSpanIdReturnsLastSegWithIdx(t *testing.T) { t.Parallel() logger := testlog.Logger(t, log.LvlInfo) dir := t.TempDir() - createTestBorEventSegmentFile(t, 0, 500_000, 132, dir, logger) - createTestBorEventSegmentFile(t, 500_000, 1_000_000, 264, dir, logger) - createTestBorEventSegmentFile(t, 1_000_000, 1_500_000, 528, dir, logger) - createTestSegmentFile(t, 0, 500_000, Enums.Spans, dir, version.V1_0, logger) - createTestSegmentFile(t, 500_000, 1_000_000, Enums.Spans, dir, version.V1_0, logger) - 
createTestSegmentFile(t, 1_000_000, 1_500_000, Enums.Spans, dir, version.V1_0, logger) + createTestBorEventSegmentFile(t, 0, 4_000, 132, dir, logger) + createTestBorEventSegmentFile(t, 4_000, 6_000, 264, dir, logger) + createTestBorEventSegmentFile(t, 6_000, 10_000, 528, dir, logger) + createTestSegmentFile(t, 0, 4_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + createTestSegmentFile(t, 4_000, 6_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + createTestSegmentFile(t, 6_000, 10_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) // delete idx file for last bor span segment to simulate segment with missing idx file - idxFileToDelete := filepath.Join(dir, snaptype.IdxFileName(version.V1_0, 1_000_000, 1_500_000, Spans.Name())) + idxFileToDelete := filepath.Join(dir, snaptype.IdxFileName(version.V1_0, 0, 4_000, Spans.Name())) err := dir2.RemoveFile(idxFileToDelete) require.NoError(t, err) - borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet}, dir, 0, logger) - defer borRoSnapshots.Close() + borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet, NoDownloader: true}, dir, 0, logger) + t.Cleanup(borRoSnapshots.Close) err = borRoSnapshots.OpenFolder() require.NoError(t, err) tempDir := t.TempDir() dataDir := fmt.Sprintf("%s/datadir", tempDir) heimdallStore := NewSnapshotStore(NewMdbxStore(logger, dataDir, false, 1), borRoSnapshots) - require.Equal(t, uint64(156), heimdallStore.spans.LastFrozenEntityId()) + t.Cleanup(heimdallStore.Close) + err = heimdallStore.Prepare(t.Context()) + require.NoError(t, err) + lastFrozenSpanid, found, err := heimdallStore.spans.LastFrozenEntityId() + require.NoError(t, err) + require.True(t, found) + require.Equal(t, uint64(9), lastFrozenSpanid) } -func TestBlockReaderLastFrozenSpanIdReturnsZeroWhenAllSegmentsDoNotHaveIdx(t *testing.T) { +func TestHeimdallStoreEntity(t *testing.T) { t.Parallel() logger := 
testlog.Logger(t, log.LvlInfo) dir := t.TempDir() - createTestBorEventSegmentFile(t, 0, 500_000, 132, dir, logger) - createTestBorEventSegmentFile(t, 500_000, 1_000_000, 264, dir, logger) - createTestBorEventSegmentFile(t, 1_000_000, 1_500_000, 528, dir, logger) - createTestSegmentFile(t, 0, 500_000, Enums.Spans, dir, version.V1_0, logger) - createTestSegmentFile(t, 500_000, 1_000_000, Enums.Spans, dir, version.V1_0, logger) - createTestSegmentFile(t, 1_000_000, 1_500_000, Enums.Spans, dir, version.V1_0, logger) - // delete idx file for all bor span segments to simulate segments with missing idx files - idxFileToDelete := filepath.Join(dir, snaptype.IdxFileName(version.V1_0, 1, 500_000, Spans.Name())) - err := dir2.RemoveFile(idxFileToDelete) - require.NoError(t, err) - idxFileToDelete = filepath.Join(dir, snaptype.IdxFileName(version.V1_0, 500_000, 1_000_000, Spans.Name())) - err = dir2.RemoveFile(idxFileToDelete) + createTestSegmentFile(t, 0, 2_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + createTestSegmentFile(t, 2_000, 4_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + createTestSegmentFile(t, 4_000, 6_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + createTestSegmentFile(t, 6_000, 8_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + createTestSegmentFile(t, 8_000, 10_000, Enums.Spans, spanDataForTesting, dir, version.V1_0, logger) + borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet, NoDownloader: true}, dir, 0, logger) + t.Cleanup(borRoSnapshots.Close) + err := borRoSnapshots.OpenFolder() require.NoError(t, err) - idxFileToDelete = filepath.Join(dir, snaptype.IdxFileName(version.V1_0, 1_000_000, 1_500_000, Spans.Name())) - err = dir2.RemoveFile(idxFileToDelete) + + tempDir := t.TempDir() + dataDir := fmt.Sprintf("%s/datadir", tempDir) + heimdallStore := NewSnapshotStore(NewMdbxStore(logger, dataDir, false, 1), borRoSnapshots) + 
t.Cleanup(heimdallStore.Close) + err = heimdallStore.Prepare(t.Context()) require.NoError(t, err) - borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet}, dir, 0, logger) - defer borRoSnapshots.Close() - err = borRoSnapshots.OpenFolder() + for i := 0; i < len(spanDataForTesting); i++ { + expectedSpan := spanDataForTesting[i] + actualSpan, ok, err := heimdallStore.spans.Entity(t.Context(), expectedSpan.RawId()) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, actualSpan.Id, expectedSpan.Id) + require.Equal(t, actualSpan.StartBlock, expectedSpan.StartBlock) + require.Equal(t, actualSpan.EndBlock, expectedSpan.EndBlock) + } +} + +func TestHeimdallStoreLastFrozenIdWithSpanRotations(t *testing.T) { + t.Parallel() + + logger := testlog.Logger(t, log.LvlInfo) + dir := t.TempDir() + createTestSegmentFile(t, 0, 2_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 2_000, 4_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 4_000, 6_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 6_000, 8_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 8_000, 10_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet, NoDownloader: true}, dir, 0, logger) + t.Cleanup(borRoSnapshots.Close) + err := borRoSnapshots.OpenFolder() require.NoError(t, err) tempDir := t.TempDir() dataDir := fmt.Sprintf("%s/datadir", tempDir) + heimdallStore := NewSnapshotStore(NewMdbxStore(logger, dataDir, false, 1), borRoSnapshots) + t.Cleanup(heimdallStore.Close) + err = heimdallStore.Prepare(t.Context()) + require.NoError(t, err) + lastFrozenId, found, err := heimdallStore.spans.LastFrozenEntityId() + require.NoError(t, err) + require.True(t, found) + require.Equal(t, 
lastFrozenId, uint64(9)) +} +func TestHeimdallStoreEntityWithSpanRotations(t *testing.T) { + t.Parallel() + + logger := testlog.Logger(t, log.LvlInfo) + dir := t.TempDir() + createTestSegmentFile(t, 0, 2_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 2_000, 4_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 4_000, 6_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 6_000, 8_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + createTestSegmentFile(t, 8_000, 10_000, Enums.Spans, spanDataWithRotations, dir, version.V1_0, logger) + borRoSnapshots := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnet, NoDownloader: true}, dir, 0, logger) + t.Cleanup(borRoSnapshots.Close) + err := borRoSnapshots.OpenFolder() + require.NoError(t, err) + + tempDir := t.TempDir() + dataDir := fmt.Sprintf("%s/datadir", tempDir) heimdallStore := NewSnapshotStore(NewMdbxStore(logger, dataDir, false, 1), borRoSnapshots) - require.Equal(t, uint64(0), heimdallStore.spans.LastFrozenEntityId()) + t.Cleanup(heimdallStore.Close) + err = heimdallStore.Prepare(t.Context()) + require.NoError(t, err) + for i := 0; i < len(spanDataWithRotations); i++ { + expectedSpan := spanDataWithRotations[i] + actualSpan, ok, err := heimdallStore.spans.Entity(t.Context(), expectedSpan.RawId()) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, actualSpan.Id, expectedSpan.Id) + require.Equal(t, actualSpan.StartBlock, expectedSpan.StartBlock) + require.Equal(t, actualSpan.EndBlock, expectedSpan.EndBlock) + } } -func createTestSegmentFile(t *testing.T, from, to uint64, name snaptype.Enum, dir string, ver version.Version, logger log.Logger) { +func createTestSegmentFile(t *testing.T, from, to uint64, name snaptype.Enum, spans []Span, dir string, ver version.Version, logger log.Logger) { compressCfg := seg.DefaultCfg 
compressCfg.MinPatternScore = 100 - c, err := seg.NewCompressor(context.Background(), "test", filepath.Join(dir, snaptype.SegmentFileName(ver, from, to, name)), dir, compressCfg, log.LvlDebug, logger) + segFileName := filepath.Join(dir, snaptype.SegmentFileName(ver, from, to, name)) + c, err := seg.NewCompressor(context.Background(), "test", segFileName, dir, compressCfg, log.LvlDebug, logger) require.NoError(t, err) defer c.Close() c.DisableFsync() - err = c.AddWord([]byte{1}) - require.NoError(t, err) + // use from and to to determine which spans go inside this .seg file from the spansForTesting + // it is not a requirement, but a handy convention for testing purposes + for i := from / 1000; i < to/1000; i++ { + span := spans[i] + buf, err := json.Marshal(span) + require.NoError(t, err) + err = c.AddWord(buf) + require.NoError(t, err) + } err = c.Compress() require.NoError(t, err) + d, err := seg.NewDecompressor(segFileName) + require.NoError(t, err) + defer d.Close() + indexFileName := filepath.Join(dir, snaptype.IdxFileName(version.V1_0, from, to, name.String())) idx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ - KeyCount: 1, - BucketSize: 10, + KeyCount: c.Count(), + Enums: c.Count() > 0, + BucketSize: recsplit.DefaultBucketSize, TmpDir: dir, - IndexFile: filepath.Join(dir, snaptype.IdxFileName(version.V1_0, from, to, name.String())), - LeafSize: 8, + BaseDataID: from / 1000, + IndexFile: indexFileName, + LeafSize: recsplit.DefaultLeafSize, }, logger) require.NoError(t, err) defer idx.Close() idx.DisableFsync() - err = idx.AddKey([]byte{1}, 0) - require.NoError(t, err) + getter := d.MakeGetter() + // + var i, offset, nextPos uint64 + var key [8]byte + for getter.HasNext() { + nextPos, _ = getter.Skip() + binary.BigEndian.PutUint64(key[:], i) + i++ + err = idx.AddKey(key[:], offset) + require.NoError(t, err) + offset = nextPos + } err = idx.Build(context.Background()) require.NoError(t, err) + index, err := recsplit.OpenIndex(indexFileName) + 
require.NoError(t, err) + defer index.Close() + baseId := index.BaseDataID() + require.Equal(t, baseId, from/1000) if name == snaptype2.Transactions.Enum() { idx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: 1, @@ -198,3 +302,110 @@ func createTestBorEventSegmentFile(t *testing.T, from, to, eventId uint64, dir s err = idx.Build(context.Background()) require.NoError(t, err) } + +var spanDataForTesting = []Span{ + Span{ + Id: 0, + StartBlock: 0, + EndBlock: 999, + }, + Span{ + Id: 1, + StartBlock: 1000, + EndBlock: 1999, + }, + Span{ + Id: 2, + StartBlock: 2000, + EndBlock: 2999, + }, + Span{ + Id: 3, + StartBlock: 3000, + EndBlock: 3999, + }, + Span{ + Id: 4, + StartBlock: 4000, + EndBlock: 4999, + }, + Span{ + Id: 5, + StartBlock: 5000, + EndBlock: 5999, + }, + Span{ + Id: 6, + StartBlock: 6000, + EndBlock: 6999, + }, + Span{ + Id: 7, + StartBlock: 7000, + EndBlock: 7999, + }, + Span{ + Id: 8, + StartBlock: 8000, + EndBlock: 8999, + }, + Span{ + Id: 9, + StartBlock: 9000, + EndBlock: 9999, + }, +} + +// span data that is irregular, containing possible span rotations +var spanDataWithRotations = []Span{ + Span{ + Id: 0, + StartBlock: 0, + EndBlock: 999, + }, + Span{ + Id: 1, + StartBlock: 5, + EndBlock: 1999, + }, + Span{ + Id: 2, + StartBlock: 1988, + EndBlock: 2999, + }, + Span{ + Id: 3, + StartBlock: 3000, + EndBlock: 3999, + }, + Span{ + Id: 4, + StartBlock: 3500, + EndBlock: 4999, + }, + Span{ + Id: 5, + StartBlock: 5000, + EndBlock: 5999, + }, + Span{ + Id: 6, + StartBlock: 5500, + EndBlock: 6999, + }, + Span{ + Id: 7, + StartBlock: 7000, + EndBlock: 7999, + }, + Span{ + Id: 8, + StartBlock: 7001, + EndBlock: 8999, + }, + Span{ + Id: 9, + StartBlock: 7002, + EndBlock: 9999, + }, +} diff --git a/polygon/heimdall/span_block_producers_tracker.go b/polygon/heimdall/span_block_producers_tracker.go index b223f39d917..f83becdcf7a 100644 --- a/polygon/heimdall/span_block_producers_tracker.go +++ b/polygon/heimdall/span_block_producers_tracker.go @@ 
-100,9 +100,14 @@ func (t *spanBlockProducersTracker) Synchronize(ctx context.Context) error { } } -func (t *spanBlockProducersTracker) ObserveSpanAsync(span *Span) { - t.queued.Add(1) - t.newSpans <- span +func (t *spanBlockProducersTracker) ObserveSpanAsync(ctx context.Context, span *Span) { + select { + case <-ctx.Done(): + return + case t.newSpans <- span: + t.queued.Add(1) + return + } } func (t *spanBlockProducersTracker) ObserveSpan(ctx context.Context, newSpan *Span) error { @@ -205,12 +210,18 @@ func (t *spanBlockProducersTracker) producers(ctx context.Context, blockNum uint } // have we previously calculated the producers for the previous sprint num of the same span (chain tip optimisation) - spanId := SpanIdAt(blockNum) + spanId, ok, err := t.store.EntityIdFromBlockNum(ctx, blockNum) + if err != nil { + return nil, 0, err + } + if !ok { + return nil, 0, fmt.Errorf("could not get spanId from blockNum=%d", blockNum) + } var prevSprintNum uint64 if currentSprintNum > 0 { prevSprintNum = currentSprintNum - 1 } - if selection, ok := t.recentSelections.Get(prevSprintNum); ok && spanId == selection.SpanId { + if selection, ok := t.recentSelections.Get(prevSprintNum); ok && SpanId(spanId) == selection.SpanId { producersCopy := selection.Producers.Copy() producersCopy.IncrementProposerPriority(1) selectionCopy := selection @@ -220,7 +231,7 @@ func (t *spanBlockProducersTracker) producers(ctx context.Context, blockNum uint } // no recent selection that we can easily use, re-calculate from DB - producerSelection, ok, err := t.store.Entity(ctx, uint64(spanId)) + producerSelection, ok, err := t.store.Entity(ctx, spanId) if err != nil { return nil, 0, err } diff --git a/polygon/heimdall/span_id.go b/polygon/heimdall/span_id_legacy.go similarity index 80% rename from polygon/heimdall/span_id.go rename to polygon/heimdall/span_id_legacy.go index 717c79edfdf..232ab31b580 100644 --- a/polygon/heimdall/span_id.go +++ b/polygon/heimdall/span_id_legacy.go @@ -17,6 +17,8 @@ 
package heimdall import ( + "errors" + "github.com/erigontech/erigon/polygon/bor/borcfg" ) @@ -27,7 +29,11 @@ const ( zerothSpanEnd = 255 // End block of 0th span ) -// SpanIdAt returns the corresponding span id for the given block number. +var ( + ErrSpanNotFound = errors.New("span not found") +) + +// Deprecated: SpanIdAt returns the corresponding span id for the given block number. func SpanIdAt(blockNum uint64) SpanId { if blockNum > zerothSpanEnd { return SpanId(1 + (blockNum-zerothSpanEnd-1)/spanLength) @@ -35,7 +41,7 @@ func SpanIdAt(blockNum uint64) SpanId { return 0 } -// SpanEndBlockNum returns the number of the last block in the given span. +// Deprecated: SpanEndBlockNum returns the number of the last block in the given span. func SpanEndBlockNum(spanId SpanId) uint64 { if spanId > 0 { return uint64(spanId)*spanLength + zerothSpanEnd @@ -43,7 +49,7 @@ func SpanEndBlockNum(spanId SpanId) uint64 { return zerothSpanEnd } -// IsBlockInLastSprintOfSpan returns true if a block num is within the last sprint of a span and false otherwise. +// Deprecated: IsBlockInLastSprintOfSpan returns true if a block num is within the last sprint of a span and false otherwise. 
func IsBlockInLastSprintOfSpan(blockNum uint64, config *borcfg.BorConfig) bool { spanNum := SpanIdAt(blockNum) endBlockNum := SpanEndBlockNum(spanNum) diff --git a/polygon/heimdall/span_id_test.go b/polygon/heimdall/span_id_legacy_test.go similarity index 100% rename from polygon/heimdall/span_id_test.go rename to polygon/heimdall/span_id_legacy_test.go diff --git a/polygon/heimdall/span_range_index.go b/polygon/heimdall/span_range_index.go new file mode 100644 index 00000000000..2841f197f6b --- /dev/null +++ b/polygon/heimdall/span_range_index.go @@ -0,0 +1,220 @@ +package heimdall + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + + "github.com/erigontech/erigon/db/kv" + "github.com/erigontech/erigon/polygon/polygoncommon" +) + +type spanRangeIndex struct { + db *polygoncommon.Database + table string +} + +func NewSpanRangeIndex(db *polygoncommon.Database, table string) *spanRangeIndex { + return &spanRangeIndex{db, table} +} + +func (i *spanRangeIndex) WithTx(tx kv.Tx) RangeIndexer { + return &txSpanRangeIndex{i, tx} +} + +// Put a mapping from a range to an id. +func (i *spanRangeIndex) Put(ctx context.Context, r ClosedRange, id uint64) error { + tx, err := i.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + + if err := i.WithTx(tx).Put(ctx, r, id); err != nil { + return err + } + + return tx.Commit() +} + +// Lookup an id of a span given by blockNum within that range. 
+func (i *spanRangeIndex) Lookup(ctx context.Context, blockNum uint64) (uint64, bool, error) { + var id uint64 + var ok bool + + err := i.db.View(ctx, func(tx kv.Tx) error { + var err error + id, ok, err = i.WithTx(tx).Lookup(ctx, blockNum) + return err + }) + return id, ok, err +} + +func (i *spanRangeIndex) Last(ctx context.Context) (uint64, bool, error) { + var lastKey uint64 + var ok bool + + err := i.db.View(ctx, func(tx kv.Tx) error { + var err error + lastKey, ok, err = i.WithTx(tx).Last(ctx) + return err + }) + return lastKey, ok, err +} + +// Lookup ids for the given range [blockFrom, blockTo). Return boolean which checks if the result is reliable to use, because +// heimdall data can be not published yet for [blockFrom, blockTo), in that case boolean OK will be false +func (i *spanRangeIndex) GetIDsBetween(ctx context.Context, blockFrom, blockTo uint64) ([]uint64, bool, error) { + var ids []uint64 + var ok bool + + err := i.db.View(ctx, func(tx kv.Tx) error { + var err error + ids, ok, err = i.WithTx(tx).GetIDsBetween(ctx, blockFrom, blockTo) + return err + }) + return ids, ok, err +} + +type txSpanRangeIndex struct { + *spanRangeIndex + tx kv.Tx +} + +func NewTxSpanRangeIndex(db kv.RoDB, table string, tx kv.Tx) *txSpanRangeIndex { + return &txSpanRangeIndex{&spanRangeIndex{db: polygoncommon.AsDatabase(db.(kv.RwDB)), table: table}, tx} +} + +func (i *txSpanRangeIndex) Put(ctx context.Context, r ClosedRange, id uint64) error { + key := rangeIndexKey(r.Start) // use span.StartBlock as key + tx, ok := i.tx.(kv.RwTx) + + if !ok { + return errors.New("tx not writable") + } + valuePair := writeSpanIdEndBlockPair(id, r.End) // write (spanId, EndBlock) pair to buf + return tx.Put(i.table, key[:], valuePair[:]) +} + +func (i *txSpanRangeIndex) Lookup(ctx context.Context, blockNum uint64) (uint64, bool, error) { + cursor, err := i.tx.Cursor(i.table) + if err != nil { + return 0, false, err + } + defer cursor.Close() + + key := rangeIndexKey(blockNum) + 
startBlockRaw, valuePair, err := cursor.Seek(key[:]) + if err != nil { + return 0, false, err + } + // seek not found, we check the last entry + if valuePair == nil { + // get latest then + lastStartBlockRaw, lastValuePair, err := cursor.Last() + if err != nil { + return 0, false, err + } + if lastValuePair == nil { + return 0, false, nil + } + lastStartBlock := rangeIndexKeyParse(lastStartBlockRaw) + lastSpanId, lastEndBlock := rangeIndexValuePairParse(lastValuePair) + // sanity check + isInRange := blockNumInRange(blockNum, lastStartBlock, lastEndBlock) + if !isInRange { + return 0, false, fmt.Errorf("SpanIndexLookup(%d) returns Span{Id:%d, StartBlock:%d, EndBlock:%d } not containing blockNum=%d", blockNum, lastSpanId, lastStartBlock, lastEndBlock, blockNum) + } + // happy case + return lastSpanId, true, nil + + } + + currStartBlock := rangeIndexKeyParse(startBlockRaw) + // If currStartBlock == blockNum, then this span contains blockNum, and no need to do the .Prev() below + if currStartBlock == blockNum { + currSpanId, currEndBlock := rangeIndexValuePairParse(valuePair) + // sanityCheck + isInRange := blockNumInRange(blockNum, currStartBlock, currEndBlock) + if !isInRange { + return 0, false, fmt.Errorf("SpanIndexLookup(%d) returns Span{Id:%d, StartBlock:%d, EndBlock:%d } not containing blockNum=%d", blockNum, currSpanId, currStartBlock, currEndBlock, blockNum) + } + // happy case + return currSpanId, true, nil + } + + // Prev should contain the appropriate span containing blockNum + prevStartBlockRaw, prevValuePair, err := cursor.Prev() + if err != nil { + return 0, false, err + } + prevStartBlock := rangeIndexKeyParse(prevStartBlockRaw) + spanId, endBlock := rangeIndexValuePairParse(prevValuePair) + // sanity check + isInRange := blockNumInRange(blockNum, prevStartBlock, endBlock) + if !isInRange { + return 0, false, fmt.Errorf("SpanIndexLookup(%d) returns Span{Id:%d, StartBlock:%d, EndBlock:%d } not containing blockNum=%d", blockNum, spanId, prevStartBlock, 
endBlock, blockNum) + } + // happy case + return spanId, true, nil +} + +// last key in the index +func (i *txSpanRangeIndex) Last(ctx context.Context) (uint64, bool, error) { + cursor, err := i.tx.Cursor(i.table) + if err != nil { + return 0, false, err + } + defer cursor.Close() + key, value, err := cursor.Last() + if err != nil { + return 0, false, err + } + + if value == nil || key == nil { // table is empty + return 0, false, nil + } + + lastKey := rangeIndexKeyParse(key) + return lastKey, true, nil +} + +func (i *txSpanRangeIndex) GetIDsBetween(ctx context.Context, blockFrom, blockTo uint64) ([]uint64, bool, error) { + startId, ok, err := i.Lookup(ctx, blockFrom) + if err != nil { + return nil, false, err + } + if !ok { + return nil, false, nil + } + + endId, ok, err := i.Lookup(ctx, blockTo) + if err != nil { + return nil, false, err + } + if !ok { + return nil, false, nil + } + + return []uint64{startId, endId}, true, nil +} + +func blockNumInRange(blockNum, startBlock, endBlock uint64) bool { + return startBlock <= blockNum && blockNum <= endBlock +} + +// Write (spanId, endBlock) to buffer +func writeSpanIdEndBlockPair(spanId uint64, spanEndBlock uint64) [16]byte { + result := [16]byte{} + binary.BigEndian.PutUint64(result[:], spanId) + binary.BigEndian.PutUint64(result[8:], spanEndBlock) + return result +} + +// Parse to pair (uint64,uint64) +func rangeIndexValuePairParse(valuePair []byte) (uint64, uint64) { + first := binary.BigEndian.Uint64(valuePair[:8]) + second := binary.BigEndian.Uint64(valuePair[8:]) + return first, second +} diff --git a/polygon/heimdall/span_range_index_test.go b/polygon/heimdall/span_range_index_test.go new file mode 100644 index 00000000000..18612547b6e --- /dev/null +++ b/polygon/heimdall/span_range_index_test.go @@ -0,0 +1,265 @@ +package heimdall + +import ( + "context" + "testing" + + "github.com/c2h5oh/datasize" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/db/kv" + 
"github.com/erigontech/erigon/db/kv/mdbx" + "github.com/erigontech/erigon/polygon/polygoncommon" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type spanRangeIndexTest struct { + index *spanRangeIndex + ctx context.Context + logger log.Logger +} + +func newSpanRangeIndexTest(t *testing.T) spanRangeIndexTest { + tmpDir := t.TempDir() + ctx, cancel := context.WithCancel(t.Context()) + logger := log.New() + + db, err := mdbx.New(kv.HeimdallDB, logger). + InMem(tmpDir). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return kv.TableCfg{kv.BorSpansIndex: {}} }). + MapSize(1 * datasize.GB). + Open(ctx) + + require.NoError(t, err) + + index := NewSpanRangeIndex(polygoncommon.AsDatabase(db), kv.BorSpansIndex) + + t.Cleanup(func() { db.Close(); cancel() }) + + return spanRangeIndexTest{ + index: index, + ctx: ctx, + logger: logger, + } +} + +func TestSpanRangeIndexEmpty(t *testing.T) { + t.Parallel() + test := newSpanRangeIndexTest(t) + _, found, err := test.index.Lookup(test.ctx, 1000) + require.NoError(t, err) + assert.False(t, found) +} + +func TestSpanRangeIndexNonOverlappingSpans(t *testing.T) { + t.Parallel() + test := newSpanRangeIndexTest(t) + ctx := test.ctx + + spans := []Span{ + Span{ + Id: 0, + StartBlock: 0, + EndBlock: 999, + }, + Span{ + Id: 1, + StartBlock: 1000, + EndBlock: 1999, + }, + Span{ + Id: 2, + StartBlock: 2000, + EndBlock: 2999, + }, + Span{ + Id: 3, + StartBlock: 3000, + EndBlock: 3999, + }, + Span{ + Id: 4, + StartBlock: 4000, + EndBlock: 4999, + }, + Span{ + Id: 5, + StartBlock: 5000, + EndBlock: 5999, + }, + Span{ + Id: 6, + StartBlock: 6000, + EndBlock: 6999, + }, + Span{ + Id: 7, + StartBlock: 7000, + EndBlock: 7999, + }, + Span{ + Id: 8, + StartBlock: 8000, + EndBlock: 8999, + }, + Span{ + Id: 9, + StartBlock: 9000, + EndBlock: 9999, + }, + } + + for _, span := range spans { + spanId := span.RawId() + r := ClosedRange{Start: span.StartBlock, End: span.EndBlock} + require.NoError(t, test.index.Put(ctx, 
r, spanId)) + } + + for _, span := range spans { + blockNumsToTest := []uint64{span.StartBlock, (span.StartBlock + span.EndBlock) / 2, span.EndBlock} + for _, blockNum := range blockNumsToTest { + actualId, found, err := test.index.Lookup(ctx, blockNum) + require.NoError(t, err) + require.True(t, found) + assert.Equal(t, actualId, span.RawId()) + } + } +} + +func TestSpanRangeIndexOverlappingSpans(t *testing.T) { + t.Parallel() + test := newSpanRangeIndexTest(t) + ctx := test.ctx + + // span data that is irregular, containing possible span rotations + var spans = []Span{ + Span{ + Id: 0, + StartBlock: 0, + EndBlock: 999, + }, + Span{ + Id: 1, + StartBlock: 5, + EndBlock: 1999, + }, + Span{ + Id: 2, + StartBlock: 1988, + EndBlock: 2999, + }, + Span{ + Id: 3, + StartBlock: 3000, + EndBlock: 3999, + }, + Span{ + Id: 4, + StartBlock: 3500, + EndBlock: 4999, + }, + Span{ + Id: 5, + StartBlock: 5000, + EndBlock: 5999, + }, + Span{ + Id: 6, + StartBlock: 5500, + EndBlock: 6999, + }, + Span{ + Id: 7, + StartBlock: 7000, + EndBlock: 7999, + }, + Span{ + Id: 8, + StartBlock: 7001, + EndBlock: 8999, + }, + Span{ + Id: 9, + StartBlock: 7002, + EndBlock: 9999, + }, + } + + for _, span := range spans { + spanId := span.RawId() + r := ClosedRange{Start: span.StartBlock, End: span.EndBlock} + require.NoError(t, test.index.Put(ctx, r, spanId)) + } + + // expected blockNum -> spanId lookups + expectedLookupVals := map[uint64]uint64{ + 0: 0, + 1: 0, + 4: 0, + 5: 1, + 999: 1, + 100: 1, + 1988: 2, + 1999: 2, + 3200: 3, + 3500: 4, + 3600: 4, + 3988: 4, + 5200: 5, + 5900: 6, + 6501: 6, + 7000: 7, + 7001: 8, + 7002: 9, + 8000: 9, + 8998: 9, + 9000: 9, + 9998: 9, + 9999: 9, + } + + for blockNum, expectedId := range expectedLookupVals { + actualId, found, err := test.index.Lookup(ctx, blockNum) + require.NoError(t, err) + require.True(t, found) + assert.Equal(t, actualId, expectedId) + } + + // additional test cases for out of range lookups + _, _, err := test.index.Lookup(ctx, 12000) + 
require.Error(t, err) + +} + +func TestSpanRangeIndexSingletonLookup(t *testing.T) { + t.Parallel() + test := newSpanRangeIndexTest(t) + ctx := test.ctx + span := &Span{Id: 0, StartBlock: 0, EndBlock: 6400} + spanId := span.RawId() + r := ClosedRange{Start: span.StartBlock, End: span.EndBlock} + require.NoError(t, test.index.Put(ctx, r, spanId)) + + // Lookup at 0 should be successful + id, found, err := test.index.Lookup(ctx, 0) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, id, uint64(0)) + + // Lookup at 1200 should be successful + id, found, err = test.index.Lookup(ctx, 1200) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, id, uint64(0)) + + // Lookup at 6400 should be successful + id, found, err = test.index.Lookup(ctx, 6400) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, id, uint64(0)) + + // Lookup at 6401 should throw an error + _, _, err = test.index.Lookup(ctx, 6401) + require.Error(t, err) + +} diff --git a/polygon/heimdall/types.go b/polygon/heimdall/types.go index 36123a707c8..e07b65bdd2d 100644 --- a/polygon/heimdall/types.go +++ b/polygon/heimdall/types.go @@ -259,8 +259,30 @@ var ( version.V1_1_standart, snaptype.RangeExtractorFunc( func(ctx context.Context, blockFrom, blockTo uint64, firstKeyGetter snaptype.FirstKeyGetter, db kv.RoDB, _ *chain.Config, collect func([]byte) error, workers int, lvl log.Lvl, logger log.Logger, hashResolver snaptype.BlockHashResolver) (uint64, error) { - spanFrom := uint64(SpanIdAt(blockFrom)) - spanTo := uint64(SpanIdAt(blockTo)) + var spanFrom, spanTo uint64 + err := db.View(ctx, func(tx kv.Tx) (err error) { + rangeIndex := NewTxSpanRangeIndex(db, kv.BorSpansIndex, tx) + + spanIds, ok, err := rangeIndex.GetIDsBetween(ctx, blockFrom, blockTo) + if err != nil { + return err + } + + if !ok { + return ErrHeimdallDataIsNotReady + } + + if len(spanIds) > 0 { + spanFrom = spanIds[0] + spanTo = spanIds[len(spanIds)-1] + } + + return nil + }) + + if err != nil 
{ + return 0, err + } logger.Debug("Extracting spans to snapshots", "blockFrom", blockFrom, "blockTo", blockTo, "spanFrom", spanFrom, "spanTo", spanTo) @@ -275,8 +297,18 @@ var ( return err } defer d.Close() - - baseSpanId := uint64(SpanIdAt(sn.From)) + var baseSpanId = uint64(0) + getter := d.MakeGetter() + getter.Reset(0) + if getter.HasNext() { + firstSpanRaw, _ := getter.Next(nil) // first span in this .seg file + var firstSpan Span + err = json.Unmarshal(firstSpanRaw, &firstSpan) + if err != nil { + return err + } + baseSpanId = uint64(firstSpan.Id) + } return buildValueIndex(ctx, sn, salt, d, baseSpanId, tmpDir, p, lvl, logger) }),