
Commit 2bd713b

StageSenders: --sync.loop.block.limit support (#9982)
We reverted support for this flag in `updateForkChoice` because the implementation was too complex and fragile: #9900. But it is good enough if StageSenders respects the flag: the next stages (exec, etc.) then follow automatically, because each stage limits itself to the previous stage's progress. Good enough, because users mainly want to save partial progress after restoring a node from a backup (long downtime), and this still enforces the "all stages progress together" invariant.
1 parent 9af7278 commit 2bd713b
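For context, the clamp added in SpawnRecoverSendersStage (see the eth/stagedsync/stage_senders.go hunk below) boils down to the logic sketched here. This is an illustration only, not Erigon API: maybeClampTarget and the example numbers are hypothetical, and the note about downstream stages paraphrases the commit message.

package main

import "fmt"

// maybeClampTarget caps a stage's target block so that one sync-loop
// iteration processes at most `limit` blocks. Checking `to > startFrom`
// first protects the `to - startFrom` subtraction from uint64 underflow.
func maybeClampTarget(startFrom, to uint64, limit uint) uint64 {
	if to > startFrom && limit > 0 && to-startFrom > uint64(limit) {
		return startFrom + uint64(limit)
	}
	return to
}

func main() {
	// e.g. a node restored from a backup ~1M blocks behind the tip,
	// running with --sync.loop.block.limit=100000 (hypothetical numbers):
	fmt.Println(maybeClampTarget(18_000_001, 19_000_000, 100_000)) // 18_100_001
	// Exec and later stages take the Senders progress as their upper bound,
	// so the limit propagates without each stage re-implementing it.
}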

File tree (6 files changed, +18 −8 lines):

cmd/integration/commands/stages.go
eth/stagedsync/stage_senders.go
eth/stagedsync/stage_senders_test.go
eth/stagedsync/sync.go
turbo/stages/mock/mock_sentry.go
turbo/stages/stageloop.go

cmd/integration/commands/stages.go

Lines changed: 1 addition & 1 deletion
@@ -930,7 +930,7 @@ func stageSenders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
 		return err
 	}
 
-	cfg := stagedsync.StageSendersCfg(db, chainConfig, false, tmpdir, pm, br, nil, nil)
+	cfg := stagedsync.StageSendersCfg(db, chainConfig, sync.Cfg(), false, tmpdir, pm, br, nil, nil)
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
 		if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {

eth/stagedsync/stage_senders.go

Lines changed: 8 additions & 1 deletion
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/ledgerwatch/erigon-lib/kv/dbutils"
+	"github.com/ledgerwatch/erigon/eth/ethconfig"
 
 	"github.com/ledgerwatch/erigon-lib/chain"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
@@ -45,9 +46,10 @@ type SendersCfg struct {
 	hd             *headerdownload.HeaderDownload
 	blockReader    services.FullBlockReader
 	loopBreakCheck func(int) bool
+	syncCfg        ethconfig.Sync
 }
 
-func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, loopBreakCheck func(int) bool) SendersCfg {
+func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, syncCfg ethconfig.Sync, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, loopBreakCheck func(int) bool) SendersCfg {
 	const sendersBatchSize = 10000
 	const sendersBlockSize = 4096
 
@@ -65,6 +67,7 @@ func StageSendersCfg(db kv.RwDB, chainCfg *chain.Config, badBlockHalt bool, tmpd
 		hd:             hd,
 		blockReader:    blockReader,
 		loopBreakCheck: loopBreakCheck,
+		syncCfg:        syncCfg,
 	}
 }
 
@@ -106,6 +109,10 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
 
 	startFrom := s.BlockNumber + 1
 
+	if to > startFrom && cfg.syncCfg.LoopBlockLimit > 0 && to-startFrom > uint64(cfg.syncCfg.LoopBlockLimit) { // uint underflow protection. preserve global jump limit.
+		to = startFrom + uint64(cfg.syncCfg.LoopBlockLimit)
+	}
+
 	jobs := make(chan *senderRecoveryJob, cfg.batchSize)
 	out := make(chan *senderRecoveryJob, cfg.batchSize)
 	wg := new(sync.WaitGroup)

eth/stagedsync/stage_senders_test.go

Lines changed: 2 additions & 1 deletion
@@ -5,6 +5,7 @@ import (
 
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/eth/ethconfig"
 	"github.com/ledgerwatch/erigon/eth/stagedsync"
 	"github.com/ledgerwatch/erigon/turbo/stages/mock"
 	"github.com/stretchr/testify/assert"
@@ -128,7 +129,7 @@ func TestSenders(t *testing.T) {
 
 	require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
 
-	cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, false, "", prune.Mode{}, br, nil, nil)
+	cfg := stagedsync.StageSendersCfg(db, params.TestChainConfig, ethconfig.Defaults.Sync, false, "", prune.Mode{}, br, nil, nil)
 	err = stagedsync.SpawnRecoverSendersStage(cfg, &stagedsync.StageState{ID: stages.Senders}, nil, tx, 3, m.Ctx, log.New())
 	require.NoError(err)
 

eth/stagedsync/sync.go

Lines changed: 2 additions & 0 deletions
@@ -45,6 +45,8 @@ func (s *Sync) Len() int {
 	return len(s.stages)
 }
 
+func (s *Sync) Cfg() ethconfig.Sync { return s.cfg }
+
 func (s *Sync) UnwindPoint() uint64 {
 	return *s.unwindPoint
 }

turbo/stages/mock/mock_sentry.go

Lines changed: 1 addition & 1 deletion
@@ -442,7 +442,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 		stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil /* heimdallClient */, mock.BlockReader, nil, nil, nil, recents, signatures),
 		stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter),
 		stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, cfg.HistoryV3, blockWriter, nil),
-		stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd, nil),
+		stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd, nil),
 		stagedsync.StageExecuteBlocksCfg(
 			mock.DB,
 			prune,

turbo/stages/stageloop.go

Lines changed: 4 additions & 4 deletions
@@ -502,7 +502,7 @@ func NewDefaultStages(ctx context.Context,
 		stagedsync.StageBorHeimdallCfg(db, snapDb, stagedsync.MiningState{}, *controlServer.ChainConfig, heimdallClient, blockReader, controlServer.Hd, controlServer.Penalize, loopBreakCheck, recents, signatures),
 		stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
 		stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, loopBreakCheck),
-		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
+		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
 		stagedsync.StageExecuteBlocksCfg(
 			db,
 			cfg.Prune,
@@ -581,7 +581,7 @@ func NewPipelineStages(ctx context.Context,
 	return stagedsync.PipelineStages(ctx,
 		stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.HistoryV3, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm),
 		stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
-		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
+		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
 		stagedsync.StageExecuteBlocksCfg(
 			db,
 			cfg.Prune,
@@ -616,7 +616,7 @@ func NewPipelineStages(ctx context.Context,
 		stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.HistoryV3, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm),
 		stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications, loopBreakCheck),
 		stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
-		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
+		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, loopBreakCheck),
 		stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, loopBreakCheck),
 		stagedsync.StageExecuteBlocksCfg(
 			db,
@@ -658,7 +658,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
 		stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil, nil),
 		stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, cfg.HistoryV3, blockWriter, nil),
 		stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
-		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, nil),
+		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd, nil),
 		stagedsync.StageExecuteBlocksCfg(
 			db,
 			cfg.Prune,
