Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/erigontech/erigon/db/state/statecfg"
"github.com/erigontech/erigon/db/state/stats"
"github.com/erigontech/erigon/db/wrap"
"github.com/erigontech/erigon/eth"
"github.com/erigontech/erigon/eth/ethconfig"
"github.com/erigontech/erigon/eth/ethconfig/features"
"github.com/erigontech/erigon/eth/ethconsensusconfig"
Expand All @@ -79,8 +80,10 @@ import (
"github.com/erigontech/erigon/p2p/sentry"
"github.com/erigontech/erigon/p2p/sentry/sentry_multi_client"
"github.com/erigontech/erigon/polygon/bor"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/bridge"
"github.com/erigontech/erigon/polygon/heimdall"
"github.com/erigontech/erigon/polygon/heimdall/poshttp"
"github.com/erigontech/erigon/turbo/app"
"github.com/erigontech/erigon/turbo/debug"
"github.com/erigontech/erigon/turbo/logging"
Expand Down Expand Up @@ -1355,14 +1358,15 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *buildercfg.M
cfg.Miner = *miningConfig
}
cfg.Dirs = dirs
allSn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
dbReadConcurrency := runtime.GOMAXPROCS(-1) * 16
Copy link

Copilot AI Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 16 should be defined as a named constant to explain its purpose and make it easier to maintain.

Copilot uses AI. Check for mistakes.
blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance))
blockReader, blockWriter, allSn, borSn, bridgeStore, heimdallStore, _, err := eth.SetUpBlockReader(ctx, db, dirs, &cfg, chainConfig, dbReadConcurrency, logger, blockSnapBuildSema)
if err != nil {
panic(err) // we do already panic above on genesis error
panic(err)
}
cfg.Snapshot = allSn.Cfg()

blockReader, blockWriter := blocksIO(db, logger)
engine := initConsensusEngine(ctx, chainConfig, cfg.Dirs.DataDir, db, blockReader, logger)
borSn.DownloadComplete() // mark as ready
engine := initConsensusEngine(ctx, chainConfig, cfg.Dirs.DataDir, db, blockReader, bridgeStore, heimdallStore, logger)

statusDataProvider := sentry.NewStatusDataProvider(
db,
Expand Down Expand Up @@ -1393,22 +1397,13 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *buildercfg.M
panic(err)
}

blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance))
agg.SetSnapshotBuildSema(blockSnapBuildSema)

notifications := shards.NewNotifications(nil)

var (
signatures *lru.ARCCache[common.Hash, common.Address]
bridgeStore bridge.Store
heimdallStore heimdall.Store
)
var signatures *lru.ARCCache[common.Hash, common.Address]

if bor, ok := engine.(*bor.Bor); ok {
signatures = bor.Signatures
bridgeStore = bridge.NewSnapshotStore(bridge.NewDbStore(db), borSn, chainConfig.Bor)
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewDbStore(db), borSn)
}
borSn.DownloadComplete() // mark as ready
blockRetire := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, heimdallStore, bridgeStore, chainConfig, &cfg, notifications.Events, blockSnapBuildSema, logger)

stageList := stages2.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil,
Expand Down Expand Up @@ -1473,9 +1468,12 @@ func stage(st *stagedsync.Sync, tx kv.Tx, db kv.RoDB, stage stages.SyncStage) *s
return res
}

func initConsensusEngine(ctx context.Context, cc *chain2.Config, dir string, db kv.RwDB, blockReader services.FullBlockReader, logger log.Logger) (engine consensus.Engine) {
func initConsensusEngine(ctx context.Context, cc *chain2.Config, dir string, db kv.RwDB, blockReader services.FullBlockReader, bridgeStore bridge.Store, heimdallStore heimdall.Store, logger log.Logger) consensus.Engine {
config := ethconfig.Defaults

var polygonBridge *bridge.Service
var heimdallService *heimdall.Service
var heimdallClient heimdall.Client
var bridgeClient bridge.Client
var consensusConfig interface{}
if cc.Clique != nil {
consensusConfig = chainspec.CliqueSnapshot
Expand All @@ -1484,11 +1482,42 @@ func initConsensusEngine(ctx context.Context, cc *chain2.Config, dir string, db
} else if cc.Bor != nil {
consensusConfig = cc.Bor
config.HeimdallURL = HeimdallURL
if !config.WithoutHeimdall {
heimdallClient = heimdall.NewHttpClient(config.HeimdallURL, logger, poshttp.WithApiVersioner(ctx))
bridgeClient = bridge.NewHttpClient(config.HeimdallURL, logger, poshttp.WithApiVersioner(ctx))
} else {
heimdallClient = heimdall.NewIdleClient(config.Miner)
bridgeClient = bridge.NewIdleClient()
}
Comment on lines +1485 to +1491
Copy link

Copilot AI Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This client initialization logic is duplicated from the main backend. Consider extracting this into a shared function to avoid code duplication.

Copilot uses AI. Check for mistakes.
borConfig := consensusConfig.(*borcfg.BorConfig)

polygonBridge = bridge.NewService(bridge.ServiceConfig{
Store: bridgeStore,
Logger: logger,
BorConfig: borConfig,
EventFetcher: bridgeClient,
})

if err := heimdallStore.Prepare(ctx); err != nil {
panic(err)
}

if err := bridgeStore.Prepare(ctx); err != nil {
panic(err)
}

heimdallService = heimdall.NewService(heimdall.ServiceConfig{
Store: heimdallStore,
BorConfig: borConfig,
Client: heimdallClient,
Logger: logger,
})

} else {
consensusConfig = &config.Ethash
}
return ethconsensusconfig.CreateConsensusEngine(ctx, &nodecfg.Config{Dirs: datadir.New(dir)}, cc, consensusConfig, config.Miner.Notify, config.Miner.Noverify,
config.WithoutHeimdall, blockReader, db.ReadOnly(), logger, nil, nil)
config.WithoutHeimdall, blockReader, db.ReadOnly(), logger, polygonBridge, heimdallService)
}

func readGenesis(chain string) *types.Genesis {
Expand Down
8 changes: 4 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, err := setUpBlockReader(ctx, rawChainDB, config.Dirs, config, chainConfig, stack.Config(), logger, segmentsBuildLimiter)
blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, err := SetUpBlockReader(ctx, rawChainDB, config.Dirs, config, chainConfig, stack.Config().Http.DBReadConcurrency, logger, segmentsBuildLimiter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1542,7 +1542,7 @@ func (s *Ethereum) setUpSnapDownloader(
return err
}

func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, chainConfig *chain.Config, nodeConfig *nodecfg.Config, logger log.Logger, blockSnapBuildSema *semaphore.Weighted) (*freezeblocks.BlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *heimdall.RoSnapshots, bridge.Store, heimdall.Store, kv.TemporalRwDB, error) {
func SetUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, chainConfig *chain.Config, dbReadConcurrency int, logger log.Logger, blockSnapBuildSema *semaphore.Weighted) (*freezeblocks.BlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *heimdall.RoSnapshots, bridge.Store, heimdall.Store, kv.TemporalRwDB, error) {
allSnapshots := freezeblocks.NewRoSnapshots(snConfig.Snapshot, dirs.Snap, logger)

var allBorSnapshots *heimdall.RoSnapshots
Expand All @@ -1551,8 +1551,8 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf

if chainConfig.Bor != nil {
allBorSnapshots = heimdall.NewRoSnapshots(snConfig.Snapshot, dirs.Snap, logger)
bridgeStore = bridge.NewSnapshotStore(bridge.NewMdbxStore(dirs.DataDir, logger, false, int64(nodeConfig.Http.DBReadConcurrency)), allBorSnapshots, chainConfig.Bor)
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewMdbxStore(logger, dirs.DataDir, false, int64(nodeConfig.Http.DBReadConcurrency)), allBorSnapshots)
bridgeStore = bridge.NewSnapshotStore(bridge.NewMdbxStore(dirs.DataDir, logger, false, int64(dbReadConcurrency)), allBorSnapshots, chainConfig.Bor)
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewMdbxStore(logger, dirs.DataDir, false, int64(dbReadConcurrency)), allBorSnapshots)
}
blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)

Expand Down
Loading