Merged
Changes from 22 commits
Commits (44)
6755a40 add SpansIndex tables (antonis19, Aug 11, 2025)
f309f8b use correct mdbx db (antonis19, Aug 12, 2025)
a5d18f6 use spanIndex in MdbxStore (antonis19, Aug 12, 2025)
3d34e1a use new SpanIdAt in snapshots (antonis19, Aug 12, 2025)
29fa088 implement spanRangeIndex (antonis19, Aug 13, 2025)
13b9f1f fixing bor mining benchmark (antonis19, Aug 13, 2025)
3a9ba35 implement Last() for indexer (antonis19, Aug 13, 2025)
9c19d78 fix test by populating bor span snapshots with valid data (antonis19, Aug 13, 2025)
e771371 fix snapshot_store tests (antonis19, Aug 13, 2025)
531d484 remove another SpanIdAt (antonis19, Aug 14, 2025)
3ddbea3 remove SpanIdAt() from Entity() (antonis19, Aug 14, 2025)
910d9d6 add tests for Entity() (antonis19, Aug 14, 2025)
fc02238 add tests for span rotations (antonis19, Aug 14, 2025)
38f47c3 use LastFrozenId() instead (antonis19, Aug 14, 2025)
76b4d7d remove hardcoded table (antonis19, Aug 14, 2025)
e0ff53e pass context to NotifySync() (antonis19, Aug 14, 2025)
c29648e change signature of LastFrozenEntityId() (antonis19, Aug 15, 2025)
a10a3d3 Merge branch 'main' of github.com:erigontech/erigon into spans-index (antonis19, Aug 15, 2025)
7d880ba fix more merge conflicts (antonis19, Aug 15, 2025)
193d794 add unit tests for spanRangeIndex (antonis19, Aug 15, 2025)
60fc408 deprecate legacy SpanIdAt() (antonis19, Aug 15, 2025)
f63b41d Merge branch 'main' of github.com:erigontech/erigon into spans-index (antonis19, Aug 15, 2025)
e44c189 Merge branch 'main' of github.com:erigontech/erigon into spans-index (antonis19, Aug 18, 2025)
634c8ae fix lint (antonis19, Aug 18, 2025)
291e8b6 do not expose db (antonis19, Aug 18, 2025)
e3b5da9 remove -v flag (antonis19, Aug 18, 2025)
d91b497 remove DB() method (antonis19, Aug 18, 2025)
b747191 use correct signature for NoopEntityStore.LastFrozenEntityId() (antonis19, Aug 18, 2025)
466e4d5 remove context from RegisterObserver (antonis19, Aug 18, 2025)
069698a add context cancellation in test (antonis19, Aug 18, 2025)
80298f6 check if .seg file is empty (antonis19, Aug 18, 2025)
20b502e remove context from NotifySync (antonis19, Aug 18, 2025)
280b858 fix grammar (antonis19, Aug 18, 2025)
424b929 wrap error (antonis19, Aug 18, 2025)
28f2f82 optimize error handling (antonis19, Aug 18, 2025)
59abff6 remove context from handlers (antonis19, Aug 18, 2025)
362542e inline heimdallStore creation (antonis19, Aug 18, 2025)
717e3a3 extract buildSpanIndexFromSnapshots function (antonis19, Aug 18, 2025)
cd91534 simplify LastEntityId() (antonis19, Aug 18, 2025)
cf5fe9d reinstate LastFrozenEntityId() into EntityStore (antonis19, Aug 18, 2025)
ace4fe0 add t.Cleanup(heimdallStore.Close) (antonis19, Aug 18, 2025)
3964b21 use t.Cleanup() instead of defer (antonis19, Aug 18, 2025)
6353797 more t.Cleanup (antonis19, Aug 18, 2025)
9bf82c5 close .idx file when helper returns (antonis19, Aug 18, 2025)
2 changes: 1 addition & 1 deletion Makefile
@@ -195,7 +195,7 @@ test-short: test-erigon-lib-short
## test-all: run all tests with a 1h timeout
test-all: test-erigon-lib-all
@{ \
$(GOTEST) --timeout 60m -coverprofile=coverage-test-all.out > run.log 2>&1; \
$(GOTEST) --timeout 60m -v -coverprofile=coverage-test-all.out > run.log 2>&1; \
STATUS=$$?; \
grep -v -e ' CONT ' -e 'RUN' -e 'PAUSE' -e 'PASS' run.log; \
exit $$STATUS; \
8 changes: 5 additions & 3 deletions cmd/rpcdaemon/cli/config.go
@@ -81,6 +81,7 @@ import (
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/bridge"
"github.com/erigontech/erigon/polygon/heimdall"
"github.com/erigontech/erigon/polygon/polygoncommon"
"github.com/erigontech/erigon/rpc"
"github.com/erigontech/erigon/rpc/rpccfg"
"github.com/erigontech/erigon/rpc/rpchelper"
@@ -367,6 +368,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

var cc *chain.Config

var rawDB kv.RwDB

var bridgeStore bridge.Store
var heimdallStore heimdall.Store

@@ -393,7 +396,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

logger.Warn("Opening chain db", "path", cfg.Dirs.Chaindata)
limiter := semaphore.NewWeighted(roTxLimit)
rawDB, err := kv2.New(kv.ChainDB, logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede(true).Open(ctx)
rawDB, err = kv2.New(kv.ChainDB, logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede(true).Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
}
@@ -567,6 +570,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger

heimdallConfig := heimdall.ReaderConfig{
Store: heimdallStore,
Db: polygoncommon.AsDatabase(rawDB),
BorConfig: cc.Bor.(*borcfg.BorConfig),
Logger: logger,
}
@@ -575,8 +579,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
}

// NOTE: bor_* RPCs are not fully supported when using polygon.sync (https://github.com/erigontech/erigon/issues/11171)
// Skip the compatibility check, until we have a schema in erigon-lib
engine = bor.NewRo(cc, blockReader, logger)
} else if cc != nil && cc.Aura != nil {
consensusDB, err := kv2.New(kv.ConsensusDB, logger).Path(filepath.Join(cfg.DataDir, "aura")).Accede(true).Open(ctx)
58 changes: 32 additions & 26 deletions db/kv/tables.go
@@ -132,19 +132,21 @@ const (
PendingEpoch = "DevPendingEpoch" // block_num_u64+block_hash->transition_proof

// BOR
BorTxLookup = "BlockBorTransactionLookup" // transaction_hash -> block_num_u64
BorEvents = "BorEvents" // event_id -> event_payload
BorEventNums = "BorEventNums" // block_num -> event_id (last event_id in that block)
BorEventProcessedBlocks = "BorEventProcessedBlocks" // block_num -> block_time, tracks processed blocks in the bridge, used for unwinds and restarts, gets pruned
BorEventTimes = "BorEventTimes" // timestamp -> event_id
BorSpans = "BorSpans" // span_id -> span (in JSON encoding)
BorMilestones = "BorMilestones" // milestone_id -> milestone (in JSON encoding)
BorMilestoneEnds = "BorMilestoneEnds" // start block_num -> milestone_id (first block of milestone)
BorCheckpoints = "BorCheckpoints" // checkpoint_id -> checkpoint (in JSON encoding)
BorCheckpointEnds = "BorCheckpointEnds" // start block_num -> checkpoint_id (first block of checkpoint)
BorProducerSelections = "BorProducerSelections" // span_id -> span selection with accumulated proposer priorities (in JSON encoding)
BorWitnesses = "BorWitnesses" // block_num_u64 + block_hash -> witness
BorWitnessSizes = "BorWitnessSizes" // block_num_u64 + block_hash -> witness size (uint64)
BorTxLookup = "BlockBorTransactionLookup" // transaction_hash -> block_num_u64
BorEvents = "BorEvents" // event_id -> event_payload
BorEventNums = "BorEventNums" // block_num -> event_id (last event_id in that block)
BorEventProcessedBlocks = "BorEventProcessedBlocks" // block_num -> block_time, tracks processed blocks in the bridge, used for unwinds and restarts, gets pruned
BorEventTimes = "BorEventTimes" // timestamp -> event_id
BorSpans = "BorSpans" // span_id -> span (in JSON encoding)
BorSpansIndex = "BorSpansIndex" // span.StartBlockNumber -> span.Id
BorMilestones = "BorMilestones" // milestone_id -> milestone (in JSON encoding)
BorMilestoneEnds = "BorMilestoneEnds" // start block_num -> milestone_id (first block of milestone)
BorCheckpoints = "BorCheckpoints" // checkpoint_id -> checkpoint (in JSON encoding)
BorCheckpointEnds = "BorCheckpointEnds" // start block_num -> checkpoint_id (first block of checkpoint)
BorProducerSelections = "BorProducerSelections" // span_id -> span selection with accumulated proposer priorities (in JSON encoding)
BorProducerSelectionsIndex = "BorProducerSelectionsIndex" // span.StartBlockNumber -> span.Id
BorWitnesses = "BorWitnesses" // block_num_u64 + block_hash -> witness
BorWitnessSizes = "BorWitnessSizes" // block_num_u64 + block_hash -> witness size (uint64)

// Downloader
BittorrentCompletion = "BittorrentCompletion"
@@ -349,11 +351,13 @@ var ChaindataTables = []string{
BorEventProcessedBlocks,
BorEventTimes,
BorSpans,
BorSpansIndex,
BorMilestones,
BorMilestoneEnds,
BorCheckpoints,
BorCheckpointEnds,
BorProducerSelections,
BorProducerSelectionsIndex,
BorWitnesses,
BorWitnessSizes,
TblAccountVals,
@@ -586,19 +590,21 @@ var AuRaTablesCfg = TableCfg{
}

var BorTablesCfg = TableCfg{
BorTxLookup: {Flags: DupSort},
BorEvents: {Flags: DupSort},
BorEventNums: {Flags: DupSort},
BorEventProcessedBlocks: {Flags: DupSort},
BorEventTimes: {Flags: DupSort},
BorSpans: {Flags: DupSort},
BorCheckpoints: {Flags: DupSort},
BorCheckpointEnds: {Flags: DupSort},
BorMilestones: {Flags: DupSort},
BorMilestoneEnds: {Flags: DupSort},
BorProducerSelections: {Flags: DupSort},
BorWitnesses: {Flags: DupSort},
BorWitnessSizes: {Flags: DupSort},
BorTxLookup: {Flags: DupSort},
BorEvents: {Flags: DupSort},
BorEventNums: {Flags: DupSort},
BorEventProcessedBlocks: {Flags: DupSort},
BorEventTimes: {Flags: DupSort},
BorSpans: {Flags: DupSort},
BorSpansIndex: {Flags: DupSort},
BorProducerSelectionsIndex: {Flags: DupSort},
BorCheckpoints: {Flags: DupSort},
BorCheckpointEnds: {Flags: DupSort},
BorMilestones: {Flags: DupSort},
BorMilestoneEnds: {Flags: DupSort},
BorProducerSelections: {Flags: DupSort},
BorWitnesses: {Flags: DupSort},
BorWitnessSizes: {Flags: DupSort},
}

var TxpoolTablesCfg = TableCfg{}
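
The two new tables invert the existing span tables: BorSpans and BorProducerSelections are keyed by span id, while BorSpansIndex and BorProducerSelectionsIndex map a span's start block number back to its id, so a reader can answer "which span covers block N" with a single cursor seek instead of the legacy SpanIdAt() arithmetic. A minimal sketch of such a lookup, assuming big-endian uint64 keys and values; the real encoding and error handling live in the heimdall range-index code, not in this snippet, and the kv import path is inferred from db/kv/tables.go:

package heimdall

import (
	"encoding/binary"

	"github.com/erigontech/erigon/db/kv" // import path assumed
)

// spanIDForBlock is an illustrative helper, not part of this PR: it resolves
// the id of the span whose start block is the greatest one <= blockNum.
func spanIDForBlock(tx kv.Tx, blockNum uint64) (uint64, bool, error) {
	c, err := tx.Cursor(kv.BorSpansIndex)
	if err != nil {
		return 0, false, err
	}
	defer c.Close()

	var seek [8]byte
	binary.BigEndian.PutUint64(seek[:], blockNum)

	// Seek lands on the first start block >= blockNum; if that overshoots
	// blockNum (or runs past the last entry), step back one record.
	k, v, err := c.Seek(seek[:])
	if err != nil {
		return 0, false, err
	}
	if k == nil || binary.BigEndian.Uint64(k) > blockNum {
		k, v, err = c.Prev()
		if err != nil {
			return 0, false, err
		}
	}
	if k == nil {
		return 0, false, nil // no span starts at or below blockNum
	}
	return binary.BigEndian.Uint64(v), true, nil
}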
11 changes: 6 additions & 5 deletions erigon-lib/event/observers.go
@@ -17,10 +17,11 @@
package event

import (
"context"
"sync"
)

type Observer[TEvent any] func(event TEvent)
type Observer[TEvent any] func(ctx context.Context, event TEvent)
type UnregisterFunc func()

type Observers[TEvent any] struct {
@@ -68,17 +69,17 @@ func (o *Observers[TEvent]) Close() {
}

// Notify all observers in parallel without waiting for them to process the event.
func (o *Observers[TEvent]) Notify(event TEvent) {
func (o *Observers[TEvent]) Notify(ctx context.Context, event TEvent) {
o.observersMu.Lock()
defer o.observersMu.Unlock()

for _, observer := range o.observers {
go observer(event)
go observer(ctx, event)
}
}

// NotifySync all observers in parallel and wait until all of them process the event.
func (o *Observers[TEvent]) NotifySync(event TEvent) {
func (o *Observers[TEvent]) NotifySync(ctx context.Context, event TEvent) {
o.observersMu.Lock()
defer o.observersMu.Unlock()

@@ -87,7 +88,7 @@ func (o *Observers[TEvent]) NotifySync(event TEvent) {
wg.Add(1)
go func(observer Observer[TEvent]) {
defer wg.Done()
observer(event)
observer(ctx, event)
}(observer)
}

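
Since Observer now receives a context, callbacks can notice cancellation instead of running fully detached, and NotifySync callers pass the context they are already holding. A small usage sketch; the NewObservers constructor name and module import path are assumptions, while Register is the method the backend already uses further down:

package main

import (
	"context"
	"fmt"

	"github.com/erigontech/erigon-lib/event" // import path assumed
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	observers := event.NewObservers[string]()
	unregister := observers.Register(func(ctx context.Context, msg string) {
		select {
		case <-ctx.Done():
			// The notifier's context was cancelled; drop the event.
			return
		default:
			fmt.Println("observed:", msg)
		}
	})
	defer unregister()

	// NotifySync fans the event out to all observers and waits for them.
	observers.NotifySync(ctx, "new span indexed")
}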
23 changes: 13 additions & 10 deletions eth/backend.go
@@ -411,7 +411,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, err := setUpBlockReader(ctx, rawChainDB, config.Dirs, config, chainConfig, stack.Config(), logger, segmentsBuildLimiter)
blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, heimdallMdbxStore, err := setUpBlockReader(ctx, rawChainDB, config.Dirs, config, chainConfig, stack.Config(), logger, segmentsBuildLimiter)
if err != nil {
return nil, err
}
@@ -638,6 +638,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

heimdallService = heimdall.NewService(heimdall.ServiceConfig{
Store: heimdallStore,
Db: heimdallMdbxStore.DB(),
BorConfig: borConfig,
Client: heimdallClient,
Logger: logger,
@@ -967,7 +968,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.sentriesClient.Bd.AddToPrefetch(b.Header(), b.RawBody())
}

backend.minedBlockObservers.Notify(b)
backend.minedBlockObservers.Notify(ctx, b)

//p2p
//backend.sentriesClient.BroadcastNewBlock(context.Background(), b, b.Difficulty())
@@ -1418,7 +1419,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient

func (s *Ethereum) IsMining() bool { return s.config.Miner.Enabled }

func (s *Ethereum) RegisterMinedBlockObserver(callback func(msg *types.Block)) event.UnregisterFunc {
func (s *Ethereum) RegisterMinedBlockObserver(callback func(ctx context.Context, msg *types.Block)) event.UnregisterFunc {
return s.minedBlockObservers.Register(callback)
}

@@ -1519,7 +1520,7 @@ func (s *Ethereum) setUpSnapDownloader(ctx context.Context, nodeCfg *nodecfg.Con
return err
}

func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, chainConfig *chain.Config, nodeConfig *nodecfg.Config, logger log.Logger, blockSnapBuildSema *semaphore.Weighted) (*freezeblocks.BlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *heimdall.RoSnapshots, bridge.Store, heimdall.Store, kv.TemporalRwDB, error) {
func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, chainConfig *chain.Config, nodeConfig *nodecfg.Config, logger log.Logger, blockSnapBuildSema *semaphore.Weighted) (*freezeblocks.BlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *heimdall.RoSnapshots, bridge.Store, heimdall.Store, kv.TemporalRwDB, *heimdall.MdbxStore, error) {
var minFrozenBlock uint64

if frozenLimit := snConfig.Sync.FrozenBlockLimit; frozenLimit != 0 {
@@ -1533,30 +1534,32 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf
var allBorSnapshots *heimdall.RoSnapshots
var bridgeStore bridge.Store
var heimdallStore heimdall.Store
var heimdallMdbxStore *heimdall.MdbxStore

if chainConfig.Bor != nil {
heimdallMdbxStore = heimdall.NewMdbxStore(logger, dirs.DataDir, false, int64(nodeConfig.Http.DBReadConcurrency))
allBorSnapshots = heimdall.NewRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger)
bridgeStore = bridge.NewSnapshotStore(bridge.NewMdbxStore(dirs.DataDir, logger, false, int64(nodeConfig.Http.DBReadConcurrency)), allBorSnapshots, chainConfig.Bor)
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewMdbxStore(logger, dirs.DataDir, false, int64(nodeConfig.Http.DBReadConcurrency)), allBorSnapshots)
heimdallStore = heimdall.NewSnapshotStore(heimdallMdbxStore, allBorSnapshots)
}
blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)

_, knownSnapCfg := snapcfg.KnownCfg(chainConfig.ChainName)
createNewSaltFileIfNeeded := snConfig.Snapshot.NoDownloader || snConfig.Snapshot.DisableDownloadE3 || !knownSnapCfg
salt, err := state.GetStateIndicesSalt(dirs, createNewSaltFileIfNeeded, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, err
}
agg, err := state.NewAggregator2(ctx, dirs, config3.DefaultStepSize, salt, db, logger)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, err
}
agg.SetSnapshotBuildSema(blockSnapBuildSema)
agg.SetProduceMod(snConfig.Snapshot.ProduceE3)

allSegmentsDownloadComplete, err := core.AllSegmentsDownloadCompleteFromDB(db)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, err
}
if allSegmentsDownloadComplete {
allSnapshots.OptimisticalyOpenFolder()
@@ -1570,12 +1573,12 @@

temporalDb, err := temporal.New(db, agg)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, nil, nil, err
}

blockWriter := blockio.NewBlockWriter()

return blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, nil
return blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, heimdallMdbxStore, nil
}

func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) {
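
RegisterMinedBlockObserver now hands each callback the notification context, so subscribers can bail out cleanly during shutdown. A hedged fragment showing the new callback shape; it assumes the eth package's existing imports (context, types, log, event), and the helper name is invented for illustration:

// subscribeMinedBlocks is hypothetical; it only illustrates the new
// context-aware observer signature accepted by RegisterMinedBlockObserver.
func subscribeMinedBlocks(backend *Ethereum, logger log.Logger) event.UnregisterFunc {
	return backend.RegisterMinedBlockObserver(func(ctx context.Context, b *types.Block) {
		if ctx.Err() != nil {
			return // node is shutting down; skip any follow-up work
		}
		logger.Info("mined block", "number", b.NumberU64(), "hash", b.Hash())
	})
}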
6 changes: 5 additions & 1 deletion polygon/heimdall/client_idle.go
@@ -34,6 +34,8 @@ func NewIdleClient(cfg buildercfg.MiningConfig) Client {

func (c *IdleClient) FetchLatestSpan(ctx context.Context) (*Span, error) {
return &Span{
StartBlock: 0,
EndBlock: 255,
ValidatorSet: ValidatorSet{
Validators: []*Validator{
{
@@ -55,7 +57,9 @@

func (c *IdleClient) FetchSpan(ctx context.Context, spanID uint64) (*Span, error) {
return &Span{
Id: SpanId(spanID),
Id: SpanId(spanID),
StartBlock: 0,
EndBlock: 255,
ValidatorSet: ValidatorSet{
Validators: []*Validator{
{
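
The idle client now fills in explicit block bounds, 0 to 255, matching Bor's convention that span 0 covers the first 256 blocks while every later span is 6400 blocks long. That is the same arithmetic the deprecated SpanIdAt() helper encodes; the sketch below is written from that convention rather than copied from this PR, and the new code resolves ids from the span index instead, so rotated or irregular spans are handled correctly:

const (
	zerothSpanEnd     = 255  // span 0 covers blocks [0, 255]
	defaultSpanLength = 6400 // each later span covers 6400 blocks
)

// legacySpanIDAt mirrors the block-number arithmetic behind the deprecated
// SpanIdAt() helper, assuming spans are laid out at fixed 6400-block strides.
func legacySpanIDAt(blockNum uint64) uint64 {
	if blockNum <= zerothSpanEnd {
		return 0
	}
	return 1 + (blockNum-zerothSpanEnd-1)/defaultSpanLength
}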
25 changes: 16 additions & 9 deletions polygon/heimdall/entity_store.go
@@ -32,12 +32,14 @@ import (
)

var databaseTablesCfg = kv.TableCfg{
kv.BorCheckpoints: {},
kv.BorCheckpointEnds: {},
kv.BorMilestones: {},
kv.BorMilestoneEnds: {},
kv.BorSpans: {},
kv.BorProducerSelections: {},
kv.BorCheckpoints: {},
kv.BorCheckpointEnds: {},
kv.BorMilestones: {},
kv.BorMilestoneEnds: {},
kv.BorSpans: {},
kv.BorSpansIndex: {},
kv.BorProducerSelections: {},
kv.BorProducerSelectionsIndex: {},
}

//go:generate mockgen -typed=true -source=./entity_store.go -destination=./entity_store_mock.go -package=heimdall
@@ -46,7 +48,6 @@ type EntityStore[TEntity Entity] interface {
Close()

LastEntityId(ctx context.Context) (uint64, bool, error)
LastFrozenEntityId() uint64
LastEntity(ctx context.Context) (TEntity, bool, error)
Entity(ctx context.Context, id uint64) (TEntity, bool, error)
PutEntity(ctx context.Context, id uint64, entity TEntity) error
@@ -56,6 +57,8 @@ type EntityStore[TEntity Entity] interface {
DeleteToBlockNum(ctx context.Context, unwindPoint uint64, limit int) (int, error)
DeleteFromBlockNum(ctx context.Context, unwindPoint uint64) (int, error)

RangeIndex() RangeIndex

SnapType() snaptype.Type
}

@@ -72,7 +75,7 @@ func (NoopEntityStore[TEntity]) Close() {}
func (NoopEntityStore[TEntity]) LastEntityId(ctx context.Context) (uint64, bool, error) {
return 0, false, errors.New("noop")
}
func (NoopEntityStore[TEntity]) LastFrozenEntityId() uint64 { return 0 }
func (NoopEntityStore[TEntity]) LastFrozenEntityId() (uint64, error) { return 0, nil }
func (NoopEntityStore[TEntity]) LastEntity(ctx context.Context) (TEntity, bool, error) {
var res TEntity
return res, false, errors.New("noop")
@@ -142,6 +145,10 @@ func (s *mdbxEntityStore[TEntity]) WithTx(tx kv.Tx) EntityStore[TEntity] {
return txEntityStore[TEntity]{s, tx}
}

func (s *mdbxEntityStore[TEntity]) RangeIndex() RangeIndex {
return s.blockNumToIdIndex
}

func (s *mdbxEntityStore[TEntity]) Close() {
}

@@ -222,7 +229,7 @@ func (s *mdbxEntityStore[TEntity]) PutEntity(ctx context.Context, id uint64, ent
defer tx.Rollback()

if err = (txEntityStore[TEntity]{s, tx}).PutEntity(ctx, id, entity); err != nil {
return nil
return err
}

return tx.Commit()
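
Exposing RangeIndex() from the store lets callers resolve "which entity covers block N" through the new index tables rather than through hardcoded span arithmetic. A hypothetical caller, with the Lookup name and signature assumed for illustration:

// spanAt is illustrative only: it maps a block number to the span covering it
// via the store's range index, then loads the span itself.
func spanAt(ctx context.Context, store heimdall.EntityStore[*heimdall.Span], blockNum uint64) (*heimdall.Span, bool, error) {
	id, ok, err := store.RangeIndex().Lookup(ctx, blockNum) // signature assumed
	if err != nil || !ok {
		return nil, ok, err
	}
	return store.Entity(ctx, id)
}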