From bef31a6e1bf2a297bc7573db44b977eebc94aef5 Mon Sep 17 00:00:00 2001 From: antonis19 Date: Tue, 2 Sep 2025 14:47:50 +0200 Subject: [PATCH] integration: use polygon services in initConsensusEngine (#16946) Fixes: https://github.com/erigontech/erigon/issues/16026 --------- Co-authored-by: antonis19 --- cmd/integration/commands/stages.go | 67 +++++++++++++++++++++--------- eth/backend.go | 8 ++-- 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index a39421653cf..9b059779d82 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -59,6 +59,7 @@ import ( "github.com/erigontech/erigon/db/state/statecfg" "github.com/erigontech/erigon/db/state/stats" "github.com/erigontech/erigon/db/wrap" + "github.com/erigontech/erigon/eth" "github.com/erigontech/erigon/eth/ethconfig" "github.com/erigontech/erigon/eth/ethconfig/features" "github.com/erigontech/erigon/eth/ethconsensusconfig" @@ -79,8 +80,10 @@ import ( "github.com/erigontech/erigon/p2p/sentry" "github.com/erigontech/erigon/p2p/sentry/sentry_multi_client" "github.com/erigontech/erigon/polygon/bor" + "github.com/erigontech/erigon/polygon/bor/borcfg" "github.com/erigontech/erigon/polygon/bridge" "github.com/erigontech/erigon/polygon/heimdall" + "github.com/erigontech/erigon/polygon/heimdall/poshttp" "github.com/erigontech/erigon/turbo/app" "github.com/erigontech/erigon/turbo/debug" "github.com/erigontech/erigon/turbo/logging" @@ -1355,14 +1358,15 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *buildercfg.M cfg.Miner = *miningConfig } cfg.Dirs = dirs - allSn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger) + dbReadConcurrency := runtime.GOMAXPROCS(-1) * 16 + blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance)) + blockReader, blockWriter, allSn, borSn, bridgeStore, heimdallStore, _, err := eth.SetUpBlockReader(ctx, db, dirs, &cfg, chainConfig, dbReadConcurrency, logger, blockSnapBuildSema) if err != nil { - panic(err) // we do already panic above on genesis error + panic(err) } cfg.Snapshot = allSn.Cfg() - - blockReader, blockWriter := blocksIO(db, logger) - engine := initConsensusEngine(ctx, chainConfig, cfg.Dirs.DataDir, db, blockReader, logger) + borSn.DownloadComplete() // mark as ready + engine := initConsensusEngine(ctx, chainConfig, cfg.Dirs.DataDir, db, blockReader, bridgeStore, heimdallStore, logger) statusDataProvider := sentry.NewStatusDataProvider( db, @@ -1393,22 +1397,13 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *buildercfg.M panic(err) } - blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance)) - agg.SetSnapshotBuildSema(blockSnapBuildSema) - notifications := shards.NewNotifications(nil) - var ( - signatures *lru.ARCCache[common.Hash, common.Address] - bridgeStore bridge.Store - heimdallStore heimdall.Store - ) + var signatures *lru.ARCCache[common.Hash, common.Address] + if bor, ok := engine.(*bor.Bor); ok { signatures = bor.Signatures - bridgeStore = bridge.NewSnapshotStore(bridge.NewDbStore(db), borSn, chainConfig.Bor) - heimdallStore = heimdall.NewSnapshotStore(heimdall.NewDbStore(db), borSn) } - borSn.DownloadComplete() // mark as ready blockRetire := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, heimdallStore, bridgeStore, chainConfig, &cfg, notifications.Events, blockSnapBuildSema, logger) stageList := stages2.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil, @@ -1473,9 +1468,12 @@ func stage(st *stagedsync.Sync, tx kv.Tx, db kv.RoDB, stage stages.SyncStage) *s return res } -func initConsensusEngine(ctx context.Context, cc *chain2.Config, dir string, db kv.RwDB, blockReader services.FullBlockReader, logger log.Logger) (engine consensus.Engine) { +func initConsensusEngine(ctx context.Context, cc *chain2.Config, dir string, db kv.RwDB, blockReader services.FullBlockReader, bridgeStore bridge.Store, heimdallStore heimdall.Store, logger log.Logger) consensus.Engine { config := ethconfig.Defaults - + var polygonBridge *bridge.Service + var heimdallService *heimdall.Service + var heimdallClient heimdall.Client + var bridgeClient bridge.Client var consensusConfig interface{} if cc.Clique != nil { consensusConfig = chainspec.CliqueSnapshot @@ -1484,11 +1482,42 @@ func initConsensusEngine(ctx context.Context, cc *chain2.Config, dir string, db } else if cc.Bor != nil { consensusConfig = cc.Bor config.HeimdallURL = HeimdallURL + if !config.WithoutHeimdall { + heimdallClient = heimdall.NewHttpClient(config.HeimdallURL, logger, poshttp.WithApiVersioner(ctx)) + bridgeClient = bridge.NewHttpClient(config.HeimdallURL, logger, poshttp.WithApiVersioner(ctx)) + } else { + heimdallClient = heimdall.NewIdleClient(config.Miner) + bridgeClient = bridge.NewIdleClient() + } + borConfig := consensusConfig.(*borcfg.BorConfig) + + polygonBridge = bridge.NewService(bridge.ServiceConfig{ + Store: bridgeStore, + Logger: logger, + BorConfig: borConfig, + EventFetcher: bridgeClient, + }) + + if err := heimdallStore.Prepare(ctx); err != nil { + panic(err) + } + + if err := bridgeStore.Prepare(ctx); err != nil { + panic(err) + } + + heimdallService = heimdall.NewService(heimdall.ServiceConfig{ + Store: heimdallStore, + BorConfig: borConfig, + Client: heimdallClient, + Logger: logger, + }) + } else { consensusConfig = &config.Ethash } return ethconsensusconfig.CreateConsensusEngine(ctx, &nodecfg.Config{Dirs: datadir.New(dir)}, cc, consensusConfig, config.Miner.Notify, config.Miner.Noverify, - config.WithoutHeimdall, blockReader, db.ReadOnly(), logger, nil, nil) + config.WithoutHeimdall, blockReader, db.ReadOnly(), logger, polygonBridge, heimdallService) } func readGenesis(chain string) *types.Genesis { diff --git a/eth/backend.go b/eth/backend.go index 9b040be7bbb..d8313541760 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -412,7 +412,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger // Check if we have an already initialized chain and fall back to // that if so. Otherwise we need to generate a new genesis spec. - blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, err := setUpBlockReader(ctx, rawChainDB, config.Dirs, config, chainConfig, stack.Config(), logger, segmentsBuildLimiter) + blockReader, blockWriter, allSnapshots, allBorSnapshots, bridgeStore, heimdallStore, temporalDb, err := SetUpBlockReader(ctx, rawChainDB, config.Dirs, config, chainConfig, stack.Config().Http.DBReadConcurrency, logger, segmentsBuildLimiter) if err != nil { return nil, err } @@ -1542,7 +1542,7 @@ func (s *Ethereum) setUpSnapDownloader( return err } -func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, chainConfig *chain.Config, nodeConfig *nodecfg.Config, logger log.Logger, blockSnapBuildSema *semaphore.Weighted) (*freezeblocks.BlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *heimdall.RoSnapshots, bridge.Store, heimdall.Store, kv.TemporalRwDB, error) { +func SetUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConfig *ethconfig.Config, chainConfig *chain.Config, dbReadConcurrency int, logger log.Logger, blockSnapBuildSema *semaphore.Weighted) (*freezeblocks.BlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *heimdall.RoSnapshots, bridge.Store, heimdall.Store, kv.TemporalRwDB, error) { allSnapshots := freezeblocks.NewRoSnapshots(snConfig.Snapshot, dirs.Snap, logger) var allBorSnapshots *heimdall.RoSnapshots @@ -1551,8 +1551,8 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf if chainConfig.Bor != nil { allBorSnapshots = heimdall.NewRoSnapshots(snConfig.Snapshot, dirs.Snap, logger) - bridgeStore = bridge.NewSnapshotStore(bridge.NewMdbxStore(dirs.DataDir, logger, false, int64(nodeConfig.Http.DBReadConcurrency)), allBorSnapshots, chainConfig.Bor) - heimdallStore = heimdall.NewSnapshotStore(heimdall.NewMdbxStore(logger, dirs.DataDir, false, int64(nodeConfig.Http.DBReadConcurrency)), allBorSnapshots) + bridgeStore = bridge.NewSnapshotStore(bridge.NewMdbxStore(dirs.DataDir, logger, false, int64(dbReadConcurrency)), allBorSnapshots, chainConfig.Bor) + heimdallStore = heimdall.NewSnapshotStore(heimdall.NewMdbxStore(logger, dirs.DataDir, false, int64(dbReadConcurrency)), allBorSnapshots) } blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)