Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
return err
}

cfg := stagedsync.StageSendersCfg(db, chainConfig, sync.Cfg(), false, tmpdir, pm, br, nil, nil)
cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, br, nil, nil)
if unwind > 0 {
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
Expand Down
9 changes: 1 addition & 8 deletions eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/eth/ethconfig"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -46,10 +45,9 @@ type SendersCfg struct {
hd *headerdownload.HeaderDownload
blockReader services.FullBlockReader
loopBreakCheck func(int) bool
syncCfg ethconfig.Sync
}

func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, syncCfg ethconfig.Sync, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, loopBreakCheck func(int) bool) SendersCfg {
func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, loopBreakCheck func(int) bool) SendersCfg {
const sendersBatchSize = 10000
const sendersBlockSize = 4096

Expand All @@ -67,7 +65,6 @@ func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, syncCfg ethconfig.Sync,
hd: hd,
blockReader: blockReader,
loopBreakCheck: loopBreakCheck,
syncCfg: syncCfg,
}
}

Expand Down Expand Up @@ -109,10 +106,6 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R

startFrom := s.BlockNumber + 1

if to > startFrom && cfg.syncCfg.LoopBlockLimit > 0 && to-startFrom > uint64(cfg.syncCfg.LoopBlockLimit) { // uint underflow protection. preserve global jump limit.
to = startFrom + uint64(cfg.syncCfg.LoopBlockLimit)
}

jobs := make(chan *senderRecoveryJob, cfg.batchSize)
out := make(chan *senderRecoveryJob, cfg.batchSize)
wg := new(sync.WaitGroup)
Expand Down
3 changes: 1 addition & 2 deletions eth/stagedsync/stage_senders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/turbo/stages/mock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -129,7 +128,7 @@ func TestSenders(t *testing.T) {

require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))

cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, ethconfig.Defaults.Sync, false, "", prune.Mode{}, br, nil, nil)
cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, false, "", prune.Mode{}, br, nil, nil)
err = stagedsync.SpawnRecoverSendersStage(cfg, &stagedsync.StageState{ID: stages.Senders}, nil, tx, 3, m.Ctx, log.New())
require.NoError(err)

Expand Down
2 changes: 0 additions & 2 deletions eth/stagedsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func (s *Sync) Len() int {
return len(s.stages)
}

func (s *Sync) Cfg() ethconfig.Sync { return s.cfg }

func (s *Sync) UnwindPoint() uint64 {
return *s.unwindPoint
}
Expand Down
2 changes: 1 addition & 1 deletion turbo/stages/mock/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil /* heimdallClient */, mock.BlockReader, nil, nil, nil, recents, signatures),
stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, cfg.HistoryV3, blockWriter, nil),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd, nil),
stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd, nil),
stagedsync.StageExecuteBlocksCfg(
mock.DB,
prune,
Expand Down
8 changes: 4 additions & 4 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func NewDefaultStages(ctx context.Context,
stagedsync.StageBorHeimdallCfg(db, snapDb, stagedsync.MiningState{}, *controlServer.ChainConfig, heimdallClient, blockReader, controlServer.Hd, controlServer.Penalize, loopBreakCheck, recents, signatures),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
Expand Down Expand Up @@ -581,7 +581,7 @@ func NewPipelineStages(ctx context.Context,
return stagedsync.PipelineStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.HistoryV3, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
Expand Down Expand Up @@ -616,7 +616,7 @@ func NewPipelineStages(ctx context.Context,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.HistoryV3, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm),
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications, loopBreakCheck),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, loopBreakCheck),
stagedsync.StageExecuteBlocksCfg(
db,
Expand Down Expand Up @@ -658,7 +658,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil, nil),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, nil),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, nil),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, nil),
stagedsync.StageExecuteBlocksCfg(
db,
cfg.Prune,
Expand Down