diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 6d0d0f368e1..a18d7925575 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1128,7 +1128,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) miningSync := stagedsync.New( stagedsync.MiningStages(ctx, - stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, tmpdir), + stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpdir), stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir), stagedsync.StageHashStateCfg(db, tmpdir), stagedsync.StageTrieCfg(db, false, true, tmpdir, getBlockReader(chainConfig)), diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 8fbd6986855..b9f80df8ebe 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -308,7 +308,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context. miner.MiningConfig.ExtraData = nextBlock.Extra() miningStages.MockExecFunc(stages.MiningCreateBlock, func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx) error { err = stagedsync.SpawnMiningCreateBlockStage(s, tx, - stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, tmpDir), + stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpDir), quit) if err != nil { return err diff --git a/cmd/rpcdaemon/rpcdaemontest/test_util.go b/cmd/rpcdaemon/rpcdaemontest/test_util.go index aecc0ea2c1b..d04ad650cdd 100644 --- a/cmd/rpcdaemon/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon/rpcdaemontest/test_util.go @@ -221,7 +221,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) listener := bufconn.Listen(1024 * 1024) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2c3d7d5f82b..5b0895ea04d 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -225,6 +225,10 @@ var ( Name: "mine", Usage: "Enable mining", } + ProposingEnabledFlag = cli.BoolFlag{ + Name: "proposer", + Usage: "Enable PoS proposer", + } MinerNotifyFlag = cli.StringFlag{ Name: "miner.notify", Usage: "Comma separated HTTP URL list to notify of new work packages", @@ -1150,9 +1154,9 @@ func setParlia(ctx *cli.Context, cfg *params.ParliaConfig, datadir string) { } func setMiner(ctx *cli.Context, cfg *params.MiningConfig) { - if ctx.GlobalIsSet(MiningEnabledFlag.Name) { - cfg.Enabled = true - } + cfg.Enabled = ctx.GlobalIsSet(MiningEnabledFlag.Name) + cfg.EnabledPOS = ctx.GlobalIsSet(ProposingEnabledFlag.Name) + if cfg.Enabled && len(cfg.Etherbase.Bytes()) == 0 { panic(fmt.Sprintf("Erigon supports only remote miners. Flag --%s or --%s is required", MinerNotifyFlag.Name, MinerSigningKeyFileFlag.Name)) } diff --git a/eth/backend.go b/eth/backend.go index 4a16f34d64c..83c421bae90 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -64,6 +64,7 @@ import ( "github.com/ledgerwatch/erigon/eth/ethutils" "github.com/ledgerwatch/erigon/eth/protocols/eth" "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/node" @@ -390,11 +391,12 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere backend.pendingBlocks = miner.PendingResultCh backend.minedBlocks = miner.MiningResultCh backend.reverseDownloadCh = make(chan privateapi.PayloadMessage) - backend.statusCh = make(chan privateapi.ExecutionStatus) + backend.statusCh = make(chan privateapi.ExecutionStatus, 1) + // proof-of-work mining mining := stagedsync.New( stagedsync.MiningStages(backend.sentryCtx, - stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, tmpdir), + stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, nil, tmpdir), stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), stagedsync.StageHashStateCfg(backend.chainDB, tmpdir), stagedsync.StageTrieCfg(backend.chainDB, false, true, tmpdir, blockReader), @@ -405,10 +407,38 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere if casted, ok := backend.engine.(*ethash.Ethash); ok { ethashApi = casted.APIs(nil)[1].Service.(*ethash.API) } + // proof-of-stake mining + assembleBlockPOS := func(random common.Hash, suggestedFeeRecipient common.Address, timestamp uint64) (*types.Block, error) { + miningStatePos := stagedsync.NewMiningState(&config.Miner) + proposingSync := stagedsync.New( + stagedsync.MiningStages(backend.sentryCtx, + stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, &stagedsync.BlockProposerParametersPOS{ + Random: random, + SuggestedFeeRecipient: suggestedFeeRecipient, + Timestamp: timestamp, + }, tmpdir), + stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), + stagedsync.StageHashStateCfg(backend.chainDB, tmpdir), + stagedsync.StageTrieCfg(backend.chainDB, false, true, tmpdir, blockReader), + stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit), + ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder) + // We start the mining step + if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync); err != nil { + return nil, err + } + block := <-miningStatePos.MiningResultPOSCh + return block, nil + } atomic.StoreUint32(&backend.waitingForBeaconChain, 0) + // Initialize ethbackend ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, - blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain) + blockReader, chainConfig, backend.reverseDownloadCh, backend.statusCh, &backend.waitingForBeaconChain, + backend.sentryControlServer.Hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) + // If we enabled the proposer flag we initiates the block proposing thread + if config.Miner.EnabledPOS { + ethBackendRPC.StartProposer() + } if stack.Config().PrivateApiAddr != "" { var creds credentials.TransportCredentials if stack.Config().TLSConnection { @@ -617,6 +647,12 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy var hasWork bool errc := make(chan error, 1) + tx, err := s.chainDB.BeginRo(ctx) + if err != nil { + log.Warn("mining", "err", err) + return + } + for { mineEvery.Reset(3 * time.Second) select { @@ -636,6 +672,21 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy case <-quitCh: return } + // Check if we transitioned and if we did halt POW mining + headNumber, err := stages.GetStageProgress(tx, stages.Headers) + if err != nil { + log.Warn("mining", "err", err) + return + } + + isTrans, err := rawdb.Transitioned(tx, headNumber, s.chainConfig.TerminalTotalDifficulty) + if err != nil { + log.Warn("mining", "err", err) + return + } + if isTrans { + return + } if !works && hasWork { works = true diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index f97f75afd85..c9065381986 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -137,8 +137,16 @@ func HeadersPOS( ) error { // Waiting for the beacon chain log.Info("Waiting for payloads...") + var payloadMessage privateapi.PayloadMessage atomic.StoreUint32(cfg.waitingPosHeaders, 1) - payloadMessage := <-cfg.reverseDownloadCh + // Decide what kind of action we need to take place + select { + case payloadMessage = <-cfg.reverseDownloadCh: + case <-cfg.hd.SkipCycleHack: + atomic.StoreUint32(cfg.waitingPosHeaders, 0) + return nil + } + atomic.StoreUint32(cfg.waitingPosHeaders, 0) header := payloadMessage.Header diff --git a/eth/stagedsync/stage_mining_create_block.go b/eth/stagedsync/stage_mining_create_block.go index 1c1145464d4..204d62f2364 100644 --- a/eth/stagedsync/stage_mining_create_block.go +++ b/eth/stagedsync/stage_mining_create_block.go @@ -35,40 +35,50 @@ type MiningBlock struct { } type MiningState struct { - MiningConfig *params.MiningConfig - PendingResultCh chan *types.Block - MiningResultCh chan *types.Block - MiningBlock *MiningBlock + MiningConfig *params.MiningConfig + PendingResultCh chan *types.Block + MiningResultCh chan *types.Block + MiningResultPOSCh chan *types.Block + MiningBlock *MiningBlock } func NewMiningState(cfg *params.MiningConfig) MiningState { return MiningState{ - MiningConfig: cfg, - PendingResultCh: make(chan *types.Block, 1), - MiningResultCh: make(chan *types.Block, 1), - MiningBlock: &MiningBlock{}, + MiningConfig: cfg, + PendingResultCh: make(chan *types.Block, 1), + MiningResultCh: make(chan *types.Block, 1), + MiningResultPOSCh: make(chan *types.Block, 1), + MiningBlock: &MiningBlock{}, } } +type BlockProposerParametersPOS struct { + Random common.Hash + SuggestedFeeRecipient common.Address // For now, we apply a suggested recipient only if etherbase is unset + Timestamp uint64 +} + type MiningCreateBlockCfg struct { - db kv.RwDB - miner MiningState - chainConfig params.ChainConfig - engine consensus.Engine - txPool2 *txpool.TxPool - txPool2DB kv.RoDB - tmpdir string + db kv.RwDB + miner MiningState + chainConfig params.ChainConfig + engine consensus.Engine + txPool2 *txpool.TxPool + txPool2DB kv.RoDB + tmpdir string + blockProposerParameters *BlockProposerParametersPOS } -func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig params.ChainConfig, engine consensus.Engine, txPool2 *txpool.TxPool, txPool2DB kv.RoDB, tmpdir string) MiningCreateBlockCfg { +func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig params.ChainConfig, engine consensus.Engine, txPool2 *txpool.TxPool, txPool2DB kv.RoDB, blockProposerParameters *BlockProposerParametersPOS, tmpdir string) MiningCreateBlockCfg { return MiningCreateBlockCfg{ - db: db, - miner: miner, - chainConfig: chainConfig, - engine: engine, - txPool2: txPool2, - txPool2DB: txPool2DB, - tmpdir: tmpdir, + db: db, + miner: miner, + chainConfig: chainConfig, + engine: engine, + txPool2: txPool2, + txPool2DB: txPool2DB, + tmpdir: tmpdir, + blockProposerParameters: blockProposerParameters, } } @@ -85,10 +95,6 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc staleThreshold = 7 ) - if cfg.miner.MiningConfig.Etherbase == (common.Address{}) { - return fmt.Errorf("refusing to mine without etherbase") - } - logPrefix := s.LogPrefix() executionAt, err := s.ExecutionAt(tx) if err != nil { @@ -99,6 +105,19 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc return fmt.Errorf(fmt.Sprintf("[%s] Empty block", logPrefix), "blocknum", executionAt) } + isTrans, err := rawdb.Transitioned(tx, executionAt, cfg.chainConfig.TerminalTotalDifficulty) + if err != nil { + return err + } + + if cfg.miner.MiningConfig.Etherbase == (common.Address{}) { + if !isTrans { + return fmt.Errorf("refusing to mine without etherbase") + } + // If we do not have an etherbase, let's use the suggested one + coinbase = cfg.blockProposerParameters.SuggestedFeeRecipient + } + blockNum := executionAt + 1 var txs []types.Transaction if err = cfg.txPool2DB.View(context.Background(), func(tx kv.Tx) error { @@ -161,10 +180,15 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc } // re-written miner/worker.go:commitNewWork - timestamp := time.Now().Unix() - if parent.Time >= uint64(timestamp) { - timestamp = int64(parent.Time + 1) + var timestamp int64 + // If we are on proof-of-stake timestamp should be already set for us + if !isTrans { + timestamp = time.Now().Unix() + if parent.Time >= uint64(timestamp) { + timestamp = int64(parent.Time + 1) + } } + num := parent.Number header := &types.Header{ ParentHash: parent.Hash(), @@ -202,6 +226,16 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc return err } + if isTrans { + // We apply pre-made fields + header.MixDigest = cfg.blockProposerParameters.Random + header.Time = cfg.blockProposerParameters.Timestamp + + current.Header = header + current.Uncles = nil + return nil + } + // If we are care about TheDAO hard-fork check whether to override the extra-data or not if daoBlock := cfg.chainConfig.DAOForkBlock; daoBlock != nil { // Check whether the block is among the fork extra-override range diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index 14afe5e1c9e..eab6752c864 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -5,6 +5,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/log/v3" @@ -54,6 +55,16 @@ func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit //} //prev = sealHash + // If we are on POS, we will send the result on the POS channel + isTrans, err := rawdb.Transitioned(tx, block.Header().Number.Uint64(), cfg.chainConfig.TerminalTotalDifficulty) + if err != nil { + return err + } + + if isTrans { + cfg.miningState.MiningResultPOSCh <- block + return nil + } // Tests may set pre-calculated nonce if block.NonceU64() != 0 { cfg.miningState.MiningResultCh <- block diff --git a/ethdb/privateapi/all.go b/ethdb/privateapi/all.go index fe7ce6479e2..fb3c1765a18 100644 --- a/ethdb/privateapi/all.go +++ b/ethdb/privateapi/all.go @@ -44,6 +44,7 @@ func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txP if healthCheck { defer healthServer.Shutdown() } + defer ethBackendSrv.StopProposer() if err := grpcServer.Serve(lis); err != nil { log.Error("private RPC server fail", "err", err) } diff --git a/ethdb/privateapi/engine_test.go b/ethdb/privateapi/engine_test.go index d1fb120e73b..6591db69fa2 100644 --- a/ethdb/privateapi/engine_test.go +++ b/ethdb/privateapi/engine_test.go @@ -93,7 +93,7 @@ func TestMockDownloadRequest(t *testing.T) { statusCh := make(chan ExecutionStatus) waitingForHeaders := uint32(1) - backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders) + backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false) var err error var reply *remote.EngineExecutePayloadReply @@ -153,7 +153,7 @@ func TestMockValidExecution(t *testing.T) { statusCh := make(chan ExecutionStatus) waitingForHeaders := uint32(1) - backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders) + backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false) var err error var reply *remote.EngineExecutePayloadReply @@ -189,7 +189,7 @@ func TestMockInvalidExecution(t *testing.T) { statusCh := make(chan ExecutionStatus) waitingForHeaders := uint32(1) - backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders) + backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false) var err error var reply *remote.EngineExecutePayloadReply @@ -225,7 +225,7 @@ func TestNoTTD(t *testing.T) { statusCh := make(chan ExecutionStatus) waitingForHeaders := uint32(1) - backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{}, reverseDownloadCh, statusCh, &waitingForHeaders) + backend := NewEthBackendServer(ctx, nil, db, nil, nil, ¶ms.ChainConfig{}, reverseDownloadCh, statusCh, &waitingForHeaders, nil, nil, false) var err error diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index aa89d3918a7..134c8854abe 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -8,6 +8,7 @@ import ( "sync" "sync/atomic" + "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types" @@ -31,6 +32,8 @@ const ( Invalid PayloadStatus = "INVALID" ) +type assemblePayloadPOSFunc func(random common.Hash, suggestedFeeRecipient common.Address, timestamp uint64) (*types.Block, error) + // EthBackendAPIVersion // 2.0.0 - move all mining-related methods to 'txpool/mining' server // 2.1.0 - add NetPeerCount function @@ -54,11 +57,13 @@ type EthBackendServer struct { reverseDownloadCh chan<- PayloadMessage // Notify whether the current block being processed is Valid or not statusCh <-chan ExecutionStatus - // Last block number sent over via reverseDownloadCh - numberSent uint64 // Determines whether stageloop is processing a block or not - waitingForBeaconChain *uint32 // atomic boolean flag - mu sync.Mutex + waitingForBeaconChain *uint32 // atomic boolean flag + skipCycleHack chan struct{} // with this channel we tell the stagedsync that we want to assemble a block + assemblePayloadPOS assemblePayloadPOSFunc + proposing bool + syncCond *sync.Cond // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time + shutdown bool } type EthBackend interface { @@ -85,10 +90,12 @@ type PayloadMessage struct { func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *Events, blockReader interfaces.BlockReader, config *params.ChainConfig, reverseDownloadCh chan<- PayloadMessage, statusCh <-chan ExecutionStatus, waitingForBeaconChain *uint32, + skipCycleHack chan struct{}, assemblePayloadPOS assemblePayloadPOSFunc, proposing bool, ) *EthBackendServer { return &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config, reverseDownloadCh: reverseDownloadCh, statusCh: statusCh, waitingForBeaconChain: waitingForBeaconChain, - pendingPayloads: make(map[uint64]types2.ExecutionPayload), + pendingPayloads: make(map[uint64]types2.ExecutionPayload), skipCycleHack: skipCycleHack, + assemblePayloadPOS: assemblePayloadPOS, proposing: proposing, syncCond: sync.NewCond(&sync.Mutex{}), } } @@ -200,14 +207,14 @@ func (s *EthBackendServer) Block(ctx context.Context, req *remote.BlockRequest) // EngineExecutePayloadV1, executes payload func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *types2.ExecutionPayload) (*remote.EngineExecutePayloadReply, error) { + s.syncCond.L.Lock() + defer s.syncCond.L.Unlock() if s.config.TerminalTotalDifficulty == nil { return nil, fmt.Errorf("not a proof-of-stake chain") } blockHash := gointerfaces.ConvertH256ToHash(req.BlockHash) - // Discard all previous prepared payloads if another block was proposed - s.pendingPayloads = make(map[uint64]types2.ExecutionPayload) // If another payload is already commissioned then we just reply with syncing if atomic.LoadUint32(s.waitingForBeaconChain) == 0 { // We are still syncing a commissioned payload @@ -247,7 +254,7 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type return nil, fmt.Errorf("invalid hash for payload. got: %s, wanted: %s", common.Bytes2Hex(blockHash[:]), common.Bytes2Hex(header.Hash().Bytes())) } // Send the block over - s.numberSent = req.BlockNumber + s.reverseDownloadCh <- PayloadMessage{ Header: &header, Body: &types.RawBody{ @@ -257,11 +264,13 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type } executedStatus := <-s.statusCh - + // Discard all previous prepared payloads if another block was proposed if executedStatus.Error != nil { return nil, executedStatus.Error } - + // Discard all payload assembled + s.pendingPayloads = make(map[uint64]types2.ExecutionPayload) + // Send reply over reply := remote.EngineExecutePayloadReply{Status: string(executedStatus.Status)} if executedStatus.LatestValidHash != (common.Hash{}) { reply.LatestValidHash = gointerfaces.ConvertHashToH256(executedStatus.LatestValidHash) @@ -271,27 +280,41 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type // EngineGetPayloadV1, retrieves previously assembled payload (Validators only) func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.EngineGetPayloadRequest) (*types2.ExecutionPayload, error) { - s.mu.Lock() - defer s.mu.Unlock() + s.syncCond.L.Lock() + defer s.syncCond.L.Unlock() + + if !s.proposing { + return nil, fmt.Errorf("execution layer not running as a proposer. enable --proposer flag on startup") + } if s.config.TerminalTotalDifficulty == nil { return nil, fmt.Errorf("not a proof-of-stake chain") } - payload, ok := s.pendingPayloads[req.PayloadId] - if ok { - return &payload, nil + for { + payload, ok := s.pendingPayloads[req.PayloadId] + if !ok { + return nil, fmt.Errorf("unknown payload") + } + + if payload.BlockNumber != 0 { + return &payload, nil + } + + // Wait for payloads assembling thread to finish + s.syncCond.Wait() } - return nil, fmt.Errorf("unknown payload") } // EngineForkChoiceUpdatedV1, either states new block head or request the assembling of a new bloc func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) { - s.mu.Lock() - defer s.mu.Unlock() + s.syncCond.L.Lock() + defer s.syncCond.L.Unlock() + if s.config.TerminalTotalDifficulty == nil { return nil, fmt.Errorf("not a proof-of-stake chain") } + // Check if parent equate to the head parent := gointerfaces.ConvertH256ToHash(req.Forkchoice.HeadBlockHash) tx, err := s.db.BeginRo(ctx) @@ -299,35 +322,32 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r return nil, err } - headHeader, err := rawdb.ReadHeaderByHash(tx, parent) - if err != nil { - return nil, err + if parent != rawdb.ReadHeadHeaderHash(tx) { + // TODO(enriavil1): make unwind happen + return &remote.EngineForkChoiceUpdatedReply{ + Status: string(Syncing), + }, nil } - if atomic.LoadUint32(s.waitingForBeaconChain) == 0 || headHeader == nil { + // Same if we are not waiting for the beacon chain + if atomic.LoadUint32(s.waitingForBeaconChain) == 0 { return &remote.EngineForkChoiceUpdatedReply{ Status: string(Syncing), }, nil } - // Hash is incorrect because mining archittecture has yet to be implemented + if !s.proposing { + return nil, fmt.Errorf("execution layer not running as a proposer. enable --proposer flag on startup") + } + s.pendingPayloads[s.payloadId] = types2.ExecutionPayload{ - ParentHash: req.Forkchoice.HeadBlockHash, - Coinbase: req.Prepare.FeeRecipient, - Timestamp: req.Prepare.Timestamp, - Random: req.Prepare.Random, - StateRoot: gointerfaces.ConvertHashToH256(headHeader.Root), - ReceiptRoot: gointerfaces.ConvertHashToH256(types.EmptyRootHash), - LogsBloom: &types2.H2048{}, - GasLimit: headHeader.GasLimit, - GasUsed: 0, - BlockNumber: headHeader.Number.Uint64() + 1, - ExtraData: []byte{}, - BaseFeePerGas: &types2.H256{}, - BlockHash: gointerfaces.ConvertHashToH256(headHeader.Hash()), - Transactions: [][]byte{}, + Random: req.Prepare.Random, + Timestamp: req.Prepare.Timestamp, + Coinbase: req.Prepare.FeeRecipient, } - // successfully assembled the payload and assinged the correct id + // Unpause assemble process + s.syncCond.Broadcast() + // successfully assembled the payload and assigned the correct id defer func() { s.payloadId++ }() return &remote.EngineForkChoiceUpdatedReply{ Status: "SUCCESS", @@ -335,6 +355,89 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r }, nil } +func (s *EthBackendServer) StartProposer() { + + go func() { + s.syncCond.L.Lock() + defer s.syncCond.L.Unlock() + + for { + // Wait until we have to process new payloads + s.syncCond.Wait() + + if s.shutdown { + return + } + + // Go over each payload and re-update them + for id := range s.pendingPayloads { + // If we already assembled this block, let's just skip it + if s.pendingPayloads[id].BlockNumber != 0 { + continue + } + // we do not want to make a copy of the payload in the loop because it contains a lock + random := gointerfaces.ConvertH256ToHash(s.pendingPayloads[id].Random) + coinbase := gointerfaces.ConvertH160toAddress(s.pendingPayloads[id].Coinbase) + timestamp := s.pendingPayloads[id].Timestamp + // Tell the stage headers to leave space for the write transaction for mining stages + s.skipCycleHack <- struct{}{} + + block, err := s.assemblePayloadPOS(random, coinbase, timestamp) + if err != nil { + log.Warn("Error during block assembling", "err", err.Error()) + return + } + var baseFeeReply *types2.H256 + if block.Header().BaseFee != nil { + var baseFee uint256.Int + baseFee.SetFromBig(block.Header().BaseFee) + baseFeeReply = gointerfaces.ConvertUint256IntToH256(&baseFee) + } + var encodedTransactions [][]byte + buf := bytes.NewBuffer(nil) + + for _, tx := range block.Transactions() { + buf.Reset() + + err := rlp.Encode(buf, tx) + if err != nil { + log.Warn("Broken tx rlp", "err", err.Error()) + return + } + encodedTransactions = append(encodedTransactions, common.CopyBytes(buf.Bytes())) + } + // Set parameters accordingly to what the beacon chain told us and from what the mining stage told us + s.pendingPayloads[id] = types2.ExecutionPayload{ + ParentHash: gointerfaces.ConvertHashToH256(block.Header().ParentHash), + Coinbase: gointerfaces.ConvertAddressToH160(block.Header().Coinbase), + Timestamp: s.pendingPayloads[id].Timestamp, + Random: s.pendingPayloads[id].Random, + StateRoot: gointerfaces.ConvertHashToH256(block.Root()), + ReceiptRoot: gointerfaces.ConvertHashToH256(block.ReceiptHash()), + LogsBloom: gointerfaces.ConvertBytesToH2048(block.Bloom().Bytes()), + GasLimit: block.GasLimit(), + GasUsed: block.GasUsed(), + BlockNumber: block.NumberU64(), + ExtraData: block.Extra(), + BaseFeePerGas: baseFeeReply, + BlockHash: gointerfaces.ConvertHashToH256(block.Header().Hash()), + Transactions: encodedTransactions, + } + } + // Broadcast the signal that an entire loop over pending payloads has been executed + s.syncCond.Broadcast() + } + }() +} + +func (s *EthBackendServer) StopProposer() { + s.syncCond.L.Lock() + defer s.syncCond.L.Unlock() + + s.shutdown = true + s.syncCond.Broadcast() +} + func (s *EthBackendServer) NodeInfo(_ context.Context, r *remote.NodesInfoRequest) (*remote.NodesInfoReply, error) { nodesInfo, err := s.eth.NodesInfo(int(r.Limit)) if err != nil { diff --git a/params/mining.go b/params/mining.go index d3878199f89..1e9c9a95661 100644 --- a/params/mining.go +++ b/params/mining.go @@ -11,14 +11,15 @@ import ( // MiningConfig is the configuration parameters of mining. type MiningConfig struct { - Enabled bool - Noverify bool // Disable remote mining solution verification(only useful in ethash). - Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards - SigKey *ecdsa.PrivateKey // ECDSA private key for signing blocks - Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages(only useful in ethash). - ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner - GasFloor uint64 // Target gas floor for mined blocks. - GasCeil uint64 // Target gas ceiling for mined blocks. - GasPrice *big.Int // Minimum gas price for mining a transaction - Recommit time.Duration // The time interval for miner to re-create mining work. + Enabled bool + EnabledPOS bool + Noverify bool // Disable remote mining solution verification(only useful in ethash). + Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards + SigKey *ecdsa.PrivateKey // ECDSA private key for signing blocks + Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages(only useful in ethash). + ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner + GasFloor uint64 // Target gas floor for mined blocks. + GasCeil uint64 // Target gas ceiling for mined blocks. + GasPrice *big.Int // Minimum gas price for mining a transaction + Recommit time.Duration // The time interval for miner to re-create mining work. } diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 3ca258f0b6d..58ec3ca309b 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -77,6 +77,7 @@ var DefaultFlags = []cli.Flag{ utils.CliqueSnapshotInmemorySignaturesFlag, utils.CliqueDataDirFlag, utils.MiningEnabledFlag, + utils.ProposingEnabledFlag, utils.MinerNotifyFlag, utils.MinerGasTargetFlag, utils.MinerGasLimitFlag, diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 0b42f43d262..1566aa76257 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -339,7 +339,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.MinedBlocks = miner.MiningResultCh mock.MiningSync = stagedsync.New( stagedsync.MiningStages(mock.Ctx, - stagedsync.StageMiningCreateBlockCfg(mock.DB, miner, *mock.ChainConfig, mock.Engine, mock.TxPool, nil, mock.tmpdir), + stagedsync.StageMiningCreateBlockCfg(mock.DB, miner, *mock.ChainConfig, mock.Engine, mock.TxPool, nil, nil, mock.tmpdir), stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, mock.tmpdir), stagedsync.StageHashStateCfg(mock.DB, mock.tmpdir), stagedsync.StageTrieCfg(mock.DB, false, true, mock.tmpdir, blockReader),