diff --git a/aggsender/aggsender.go b/aggsender/aggsender.go index b5156832d..38e22452f 100644 --- a/aggsender/aggsender.go +++ b/aggsender/aggsender.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" jRPC "github.com/0xPolygon/cdk-rpc/rpc" @@ -19,6 +20,7 @@ import ( aggsenderrpc "github.com/agglayer/aggkit/aggsender/rpc" "github.com/agglayer/aggkit/aggsender/statuschecker" "github.com/agglayer/aggkit/aggsender/types" + "github.com/agglayer/aggkit/aggsender/validator" aggkitcommon "github.com/agglayer/aggkit/common" "github.com/agglayer/aggkit/db/compatibility" "github.com/agglayer/aggkit/log" @@ -43,6 +45,11 @@ type AggSender struct { certStatusChecker types.CertificateStatusChecker certQuerier types.CertificateQuerier rollupDataQuerier types.RollupDataQuerier + committeeQuerier types.MultisigQuerier + + l1Client aggkittypes.BaseEthereumClienter + l1InfoTreeSyncer types.L1InfoTreeSyncer + localValidator types.CertificateValidateAndSigner cfg config.Config @@ -51,8 +58,6 @@ type AggSender struct { flow types.AggsenderFlow l2OriginNetwork uint32 - - validator types.CertificateValidateAndSigner } // New returns a new AggSender instance @@ -67,6 +72,7 @@ func New( l1Client aggkittypes.BaseEthereumClienter, l2Client aggkittypes.BaseEthereumClienter, rollupDataQuerier types.RollupDataQuerier, + committeeQuerier types.MultisigQuerier, ) (*AggSender, error) { storageConfig := db.AggSenderSQLStorageConfig{ DBPath: cfg.StoragePath, @@ -130,33 +136,14 @@ func New( l2OriginNetwork: l2OriginNetwork, certQuerier: certQuerier, rollupDataQuerier: rollupDataQuerier, + committeeQuerier: committeeQuerier, certStatusChecker: statuschecker.NewCertStatusChecker( logger, storage, aggLayerClient, certQuerier, l2OriginNetwork), + l1Client: l1Client, + l1InfoTreeSyncer: l1InfoTreeSyncer, }, nil } -func (a *AggSender) AttachValidator(validator types.CertificateValidateAndSigner) { - if validator == nil { - a.log.Infof("AggSender validator attached: nil") - return - } - - a.validator = validator - a.log.Infof("AggSender validator attached: %s", a.validator.String()) -} - -func (a *AggSender) GetStorage() db.AggSenderStorage { - return a.storage -} - -func (a *AggSender) GetFlow() types.AggsenderFlow { - return a.flow -} - -func (a *AggSender) GetCertQuerier() types.CertificateQuerier { - return a.certQuerier -} - func (a *AggSender) GetLERQuerier() types.LERQuerier { return query.NewLERDataQuerier(a.cfg.RollupCreationBlockL1, a.rollupDataQuerier) } @@ -177,7 +164,7 @@ func (a *AggSender) GetRPCServices() []jRPC.Service { return []jRPC.Service{} } - logger := log.WithFields("aggsender-rpc", aggkitcommon.BRIDGE) + logger := log.WithFields("module", "aggsender-rpc") return []jRPC.Service{ { Name: "aggsender", @@ -197,16 +184,23 @@ func (a *AggSender) Start(ctx context.Context) { if err := a.flow.CheckInitialStatus(ctx); err != nil { a.log.Panicf("error checking flow Initial Status: %v", err) } - if a.validator != nil { - status, err := a.validator.HealthCheck(ctx) - if err != nil { - a.log.Warnf("error checking validator health: %v", err) - } - if !status.IsHealthy() { - a.log.Warnf("validator status is not healthy: %s", status.String()) - } - a.log.Infof("validator health check: %s", status.String()) + + // TODO: The local validator implementation is there solely for testing purposes + // and it should be removed after integration testing is done. + if !a.cfg.RequireValidatorCall { + a.localValidator = validator.NewLocalValidator( + a.log, + a.storage, + validator.NewAggsenderValidator( + a.log, + a.flow, + query.NewL1InfoTreeDataQuerier(a.l1Client, a.l1InfoTreeSyncer), + a.certQuerier, + a.GetLERQuerier(), + ), + ) } + a.sendCertificates(ctx, 0) } @@ -342,7 +336,24 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi return nil, fmt.Errorf("error building certificate: %w", err) } - validatorSignature, err := a.callValidator(ctx, certificate, certificateParams.ToBlock) + var ( + validators []types.CertificateValidateAndSigner + signaturesThreshold uint32 + ) + + if a.localValidator != nil { + validators = append(validators, a.localValidator) + signaturesThreshold = 1 + } else { + validators, signaturesThreshold, err = a.getValidators(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get validators: %w", err) + } + } + + multisig, err := a.pollValidators(ctx, + validators, signaturesThreshold, + certificate, certificateParams.ToBlock) if err != nil { // TODO - agglayer has not yet implemented the endpoints needed to validate a certificate // so lets just log the error and continue. This will be changed when the agglayer is ready @@ -365,7 +376,12 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi time.Sleep(*rateLimitSleepTime) } - certificateHash, err := a.aggLayerClient.SendCertificate(ctx, certificate, validatorSignature) + // TODO: Update once agglayer endpoint changes to accept the multisig + var signature []byte + if len(multisig) > 0 { + signature = multisig[0] + } + certificateHash, err := a.aggLayerClient.SendCertificate(ctx, certificate, signature) if err != nil { a.saveNonAcceptedCert(ctx, certificate, certificateParams.CreatedAt, err) @@ -416,24 +432,122 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi return certificate, nil } -// callValidator calls the validator to validate the certificate -func (a *AggSender) callValidator( +// pollValidators committee signers to validate the certificate and gather signatures +func (a *AggSender) pollValidators( ctx context.Context, + validators []types.CertificateValidateAndSigner, + signaturesThreshold uint32, certificate *agglayertypes.Certificate, - lastL2BlockInCert uint64) ([]byte, error) { - if a.validator == nil { + lastL2BlockInCert uint64, +) ([][]byte, error) { + if len(validators) == 0 { + a.log.Warnf("skipping certificate validation, because there are no validators configured") return nil, nil } + a.log.Infof("delegating certificate validation: %s", certificate.Brief()) - validatorSignature, err := a.validator.ValidateAndSignCertificate(ctx, certificate, lastL2BlockInCert) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + type signResult struct { + signature []byte + err error + } + + resultsCh := make(chan signResult, len(validators)) + var wg sync.WaitGroup + + for _, v := range validators { + wg.Add(1) + go func(v types.CertificateValidateAndSigner) { + defer wg.Done() + + select { + case <-ctx.Done(): + return + default: + } + + status, err := v.HealthCheck(ctx) + if err != nil { + a.log.Warnf("error checking validator health (URL=%s): %v", v.URL(), err) + resultsCh <- signResult{err: err} + return + } + + if !status.IsHealthy() { + a.log.Warnf("validator (URL=%s) is not healthy: %s, skipping it...", v.URL(), status.String()) + return // skip unhealthy validator + } + + a.log.Infof("validator health check (URL=%s): %s", v.URL(), status.String()) + + sig, err := v.ValidateAndSignCertificate(ctx, certificate, lastL2BlockInCert) + if err != nil { + a.log.Errorf("validator %v failed to validate the certificate: %v", v, err) + resultsCh <- signResult{err: err} + return + } + + resultsCh <- signResult{signature: sig} + }(v) + } + + go func() { + wg.Wait() + close(resultsCh) + }() + + signatures := make([][]byte, 0, len(validators)) + var errs []error + + for res := range resultsCh { + if res.err != nil { + errs = append(errs, res.err) + continue + } + + signatures = append(signatures, res.signature) + if uint32(len(signatures)) >= signaturesThreshold { + cancel() // signal other goroutines to stop early + } + } + + if uint32(len(signatures)) < signaturesThreshold { + if len(errs) > 0 { + return signatures, errors.Join(errs...) + } + return signatures, fmt.Errorf("threshold not reached: %d/%d", len(signatures), signaturesThreshold) + } + + a.log.Infof("certificate validation passed with %d/%d signatures: %s", + len(signatures), signaturesThreshold, certificate.Brief()) + + return signatures, nil +} + +// getValidators retrieves the actual multisig committee and creates a set of the validators +// that are going to validate the provided certificate +func (a *AggSender) getValidators(ctx context.Context) ([]types.CertificateValidateAndSigner, uint32, error) { + committee, err := a.committeeQuerier.GetMultisigCommittee(ctx, aggkittypes.LatestBlockNum) if err != nil { - a.log.Errorf("certificate validation failed: %w. Cert: %s", err, certificate.Brief()) - return nil, fmt.Errorf("certificate validation failed: %w", err) + return nil, 0, fmt.Errorf("failed to retrieve the latest multisig committee: %w", err) } - a.log.Infof("certificate validation passed: %s", certificate.Brief()) + validators := make([]types.CertificateValidateAndSigner, 0, committee.Size()) + for _, signer := range committee.Signers() { + clientCfg := a.cfg.ValidatorClient.WithURL(signer.URL) + validator, err := validator.NewRemoteValidator(&clientCfg, a.storage) + if err != nil { + return nil, 0, fmt.Errorf("failed to create a remote validator for committee signer (Address=%s, URL=%s): %w", + signer.Address, signer.URL, err) + } + + validators = append(validators, validator) + } - return validatorSignature, nil + return validators, committee.Threshold(), nil } // saveCertificateToStorage saves the certificate to the storage diff --git a/aggsender/aggsender_test.go b/aggsender/aggsender_test.go index f349100f3..3313c40a5 100644 --- a/aggsender/aggsender_test.go +++ b/aggsender/aggsender_test.go @@ -21,6 +21,7 @@ import ( "github.com/agglayer/aggkit/aggsender/flows" "github.com/agglayer/aggkit/aggsender/mocks" aggsendertypes "github.com/agglayer/aggkit/aggsender/types" + "github.com/agglayer/aggkit/aggsender/validator" "github.com/agglayer/aggkit/bridgesync" aggkitcommon "github.com/agglayer/aggkit/common" "github.com/agglayer/aggkit/config/types" @@ -104,6 +105,7 @@ func TestAggSenderStart(t *testing.T) { nil, // l1 client nil, // l2 client rollupQuerierMock, + nil, // committee querier ) require.NoError(t, err) require.NotNil(t, aggSender) @@ -121,7 +123,7 @@ func TestExploratoryGenerateCert(t *testing.T) { key, err := crypto.GenerateKey() require.NoError(t, err) - signature, err := crypto.Sign(common.HexToHash("0x1").Bytes(), key) + signature, err := crypto.Sign(common.Hex2Bytes("0x1"), key) require.NoError(t, err) certificate := &agglayertypes.Certificate{ @@ -210,6 +212,12 @@ func TestSendCertificate_NoClaims(t *testing.T) { mockLERQuerier := mocks.NewLERQuerier(t) logger := log.WithFields("aggsender-test", "no claims test") signer := signer.NewLocalSignFromPrivateKey("ut", log.WithFields("aggsender", 1), privateKey, 0) + mockValidator := mocks.NewCertificateValidateAndSigner(t) + mockValidator.EXPECT().HealthCheck(mock.Anything).Return(&aggsendertypes.HealthCheckResponse{Status: aggsendertypes.HealthCheckStatusOK}, nil) + mockValidator.EXPECT().URL().Return("http://localhost") + mockValidator.EXPECT(). + ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything). + Return(common.Hex2Bytes("0x5ca1e"), nil).Once() aggSender := &AggSender{ log: logger, storage: mockStorage, @@ -217,6 +225,7 @@ func TestSendCertificate_NoClaims(t *testing.T) { aggLayerClient: mockAggLayerClient, epochNotifier: mockEpochNotifier, cfg: config.Config{}, + localValidator: mockValidator, flow: flows.NewPPFlow(logger, flows.NewBaseFlow(logger, mockL2BridgeQuerier, mockStorage, mockL1Querier, mockLERQuerier, flows.NewBaseFlowConfigDefault()), @@ -320,6 +329,16 @@ func TestSendCertificate(t *testing.T) { mockAgglayerClient.EXPECT().SendCertificate(mock.Anything, mock.Anything, mock.Anything).Return(common.Hash{}, errors.New("some error")).Once() mockStorage.EXPECT().SaveNonAcceptedCertificate(mock.Anything, mock.Anything).Return(nil).Once() }, + mockValidatorFn: func() *mocks.CertificateValidateAndSigner { + mockValidator := mocks.NewCertificateValidateAndSigner(t) + mockValidator.EXPECT().HealthCheck(mock.Anything).Return(&aggsendertypes. + HealthCheckResponse{Status: aggsendertypes.HealthCheckStatusOK}, nil) + mockValidator.EXPECT().URL().Return("http://localhost") + mockValidator.EXPECT(). + ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything). + Return(common.Hex2Bytes("0x5ca1e"), nil).Once() + return mockValidator + }, expectedError: "error sending certificate", }, { @@ -339,6 +358,16 @@ func TestSendCertificate(t *testing.T) { mockAgglayerClient.EXPECT().SendCertificate(mock.Anything, mock.Anything, mock.Anything).Return(common.HexToHash("0x22"), nil).Once() mockStorage.EXPECT().SaveLastSentCertificate(mock.Anything, mock.Anything).Return(errors.New("some error")).Once() }, + mockValidatorFn: func() *mocks.CertificateValidateAndSigner { + mockValidator := mocks.NewCertificateValidateAndSigner(t) + mockValidator.EXPECT().HealthCheck(mock.Anything).Return(&aggsendertypes. + HealthCheckResponse{Status: aggsendertypes.HealthCheckStatusOK}, nil) + mockValidator.EXPECT().URL().Return("http://localhost") + mockValidator.EXPECT(). + ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything). + Return(common.Hex2Bytes("0x5ca1e"), nil).Once() + return mockValidator + }, expectedError: "error saving last sent certificate", }, { @@ -361,7 +390,12 @@ func TestSendCertificate(t *testing.T) { }, mockValidatorFn: func() *mocks.CertificateValidateAndSigner { mockValidator := mocks.NewCertificateValidateAndSigner(t) - mockValidator.EXPECT().ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error")).Once() + mockValidator.EXPECT().HealthCheck(mock.Anything).Return(&aggsendertypes.HealthCheckResponse{Status: aggsendertypes.HealthCheckStatusOK}, nil) + mockValidator.EXPECT().URL().Return("http://localhost") + mockValidator.EXPECT().String().Return("local validator") + mockValidator.EXPECT(). + ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything). + Return(nil, errors.New("some error")).Once() return mockValidator }, // expectedError: "certificate validation failed: some error", // TODO - this will be fixed when the agglayer is ready @@ -380,12 +414,17 @@ func TestSendCertificate(t *testing.T) { NewLocalExitRoot: common.HexToHash("0x11"), BridgeExits: []*agglayertypes.BridgeExit{{}}, }, nil).Once() - mockAgglayerClient.EXPECT().SendCertificate(mock.Anything, mock.Anything, []byte{1, 2, 3}).Return(common.HexToHash("0x22"), nil).Once() + mockAgglayerClient.EXPECT().SendCertificate(mock.Anything, mock.Anything, common.Hex2Bytes("0x123456")). + Return(common.HexToHash("0x22"), nil).Once() mockStorage.EXPECT().SaveLastSentCertificate(mock.Anything, mock.Anything).Return(nil).Once() }, mockValidatorFn: func() *mocks.CertificateValidateAndSigner { mockValidator := mocks.NewCertificateValidateAndSigner(t) - mockValidator.EXPECT().ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything).Return([]byte{1, 2, 3}, nil).Once() + mockValidator.EXPECT().HealthCheck(mock.Anything).Return(&aggsendertypes. + HealthCheckResponse{Status: aggsendertypes.HealthCheckStatusOK}, nil) + mockValidator.EXPECT().URL().Return("http://localhost") + mockValidator.EXPECT().ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything). + Return(common.Hex2Bytes("0x123456"), nil).Once() return mockValidator }, }, @@ -406,6 +445,16 @@ func TestSendCertificate(t *testing.T) { mockAgglayerClient.EXPECT().SendCertificate(mock.Anything, mock.Anything, mock.Anything).Return(common.HexToHash("0x22"), nil).Once() mockStorage.EXPECT().SaveLastSentCertificate(mock.Anything, mock.Anything).Return(nil).Once() }, + mockValidatorFn: func() *mocks.CertificateValidateAndSigner { + mockValidator := mocks.NewCertificateValidateAndSigner(t) + mockValidator.EXPECT().HealthCheck(mock.Anything).Return(&aggsendertypes. + HealthCheckResponse{Status: aggsendertypes.HealthCheckStatusOK}, nil) + mockValidator.EXPECT().URL().Return("http://localhost") + mockValidator.EXPECT(). + ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything). + Return(common.Hex2Bytes("0x12345"), nil).Once() + return mockValidator + }, }, } @@ -434,7 +483,7 @@ func TestSendCertificate(t *testing.T) { } if tt.mockValidatorFn != nil { - aggsender.validator = tt.mockValidatorFn() + aggsender.localValidator = tt.mockValidatorFn() } mockEpochNotifier.EXPECT().GetEpochStatus().Return(aggsendertypes.EpochStatus{}) @@ -452,6 +501,82 @@ func TestSendCertificate(t *testing.T) { } } +func TestGetValidators(t *testing.T) { + allSigners := []*aggsendertypes.SignerInfo{ + aggsendertypes.NewSignerInfo("http://localhost:8001", common.HexToAddress("0x1")), + aggsendertypes.NewSignerInfo("http://localhost:8002", common.HexToAddress("0x2")), + aggsendertypes.NewSignerInfo("http://localhost:8003", common.HexToAddress("0x3")), + aggsendertypes.NewSignerInfo("http://localhost:8004", common.HexToAddress("0x4")), + aggsendertypes.NewSignerInfo("http://localhost:8005", common.HexToAddress("0x5")), + aggsendertypes.NewSignerInfo("http://localhost:8006", common.HexToAddress("0x6")), + } + + testCases := []struct { + name string + signers []*aggsendertypes.SignerInfo + expectedValidatorsFn func(*testing.T, []*aggsendertypes.SignerInfo) []aggsendertypes.CertificateValidateAndSigner + expectedThreshold uint32 + expectedError string + }{ + { + name: "successful return of committee validators", + signers: allSigners[:len(allSigners)/2], + expectedThreshold: uint32(len(allSigners) / 2), + expectedValidatorsFn: func(t *testing.T, + signers []*aggsendertypes.SignerInfo) []aggsendertypes.CertificateValidateAndSigner { + t.Helper() + + validators := make([]aggsendertypes.CertificateValidateAndSigner, 0, len(signers)) + for _, signer := range signers { + validator, err := validator.NewRemoteValidator(&grpc.ClientConfig{URL: signer.URL}, nil) + require.NoError(t, err) + validators = append(validators, validator) + } + return validators + }, + }, + { + name: "failed to query the committee", + expectedError: "invalid parameters", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + committeeQuerierMock := mocks.NewMultisigQuerier(t) + + if tc.expectedError == "" { + committee, err := aggsendertypes.NewMultisigCommittee(tc.signers, uint32(len(tc.signers))) + require.NoError(t, err) + committeeQuerierMock.EXPECT(). + GetMultisigCommittee(mock.Anything, mock.Anything). + Return(committee, nil) + } else { + committeeQuerierMock.EXPECT(). + GetMultisigCommittee(mock.Anything, mock.Anything). + Return(nil, errors.New(tc.expectedError)) + } + + aggsender := &AggSender{ + cfg: config.Config{ValidatorClient: &grpc.ClientConfig{}}, + committeeQuerier: committeeQuerierMock, + } + + validators, threshold, err := aggsender.getValidators(t.Context()) + if tc.expectedError != "" { + require.ErrorContains(t, err, tc.expectedError) + } else { + expectedValidators := tc.expectedValidatorsFn(t, tc.signers) + require.Len(t, validators, len(tc.signers)) + for i, v := range expectedValidators { + require.Equal(t, v.URL(), validators[i].URL()) + } + require.Equal(t, tc.expectedThreshold, threshold) + } + }) + } +} + func TestNewAggSender(t *testing.T) { mockBridgeSyncer := mocks.NewL2BridgeSyncer(t) mockBridgeSyncer.EXPECT().OriginNetwork().Return(uint32(1)).Times(2) @@ -460,7 +585,13 @@ func TestNewAggSender(t *testing.T) { Method: signertypes.MethodNone, }, Mode: "PessimisticProof", - }, nil, nil, mockBridgeSyncer, nil, nil, nil, nil) + }, nil, nil, mockBridgeSyncer, + nil, // epoch notifier + nil, // l1 client + nil, // l2 client + nil, // rollup data querier + nil, // committee querier + ) require.NoError(t, err) require.NotNil(t, sut) require.Contains(t, sut.rateLimiter.String(), "Unlimited") @@ -492,6 +623,7 @@ func TestAggSenderStartFailsCompatibilityChecker(t *testing.T) { testData.sut.Start(testData.ctx) }, "Expected panic when starting AggSender") } + func TestSendCertificatesRetry(t *testing.T) { mockCertStatusChecker := mocks.NewCertificateStatusChecker(t) mockEpochNotifier := mocks.NewEpochNotifier(t) @@ -542,6 +674,7 @@ func TestSendCertificatesRetry(t *testing.T) { }() aggSender.sendCertificates(ctx, 2) } + func TestSendCertificates(t *testing.T) { t.Parallel() diff --git a/aggsender/config/config.go b/aggsender/config/config.go index fceedfca1..941d1d920 100644 --- a/aggsender/config/config.go +++ b/aggsender/config/config.go @@ -13,8 +13,6 @@ import ( ethCommon "github.com/ethereum/go-ethereum/common" ) -var errValidatorClientURLNotSet = fmt.Errorf("ValidatorClient URL must be set when RequireValidatorCall is true") - // Config is the configuration for the AggSender type Config struct { // StoragePath is the path of the sqlite db on which the AggSender will store the data @@ -125,10 +123,6 @@ func (c Config) Validate() error { "RequireValidatorCall can only be true in PessimisticProof or AggchainProof mode, got %s", c.Mode) } - - if c.ValidatorClient == nil || c.ValidatorClient.URL == "" { - return errValidatorClientURLNotSet - } } if err := c.AgglayerClient.Validate(); err != nil { diff --git a/aggsender/config/config_test.go b/aggsender/config/config_test.go index 2c995314a..ec4db24a6 100644 --- a/aggsender/config/config_test.go +++ b/aggsender/config/config_test.go @@ -46,22 +46,6 @@ func TestValidate(t *testing.T) { }, }, }, - { - name: "RequireValidatorCall is true with ValidatorClient URL not set", - config: Config{ - Mode: aggsendertypes.PessimisticProofMode.String(), - RequireValidatorCall: true, - ValidatorClient: &grpc.ClientConfig{ - URL: "", - }, - AgglayerClient: agglayer.ClientConfig{GRPC: &grpc.ClientConfig{ - URL: "http://localhost:9090", - MinConnectTimeout: types.NewDuration(5 * time.Second), - }, - }, - }, - expectedErr: "ValidatorClient URL must be set when RequireValidatorCall is true", - }, { name: "Invalid AgglayerClient configuration", config: Config{ diff --git a/aggsender/mocks/mock_certificate_validate_and_signer.go b/aggsender/mocks/mock_certificate_validate_and_signer.go index 74a749d70..d9e4a09da 100644 --- a/aggsender/mocks/mock_certificate_validate_and_signer.go +++ b/aggsender/mocks/mock_certificate_validate_and_signer.go @@ -128,6 +128,51 @@ func (_c *CertificateValidateAndSigner_String_Call) RunAndReturn(run func() stri return _c } +// URL provides a mock function with no fields +func (_m *CertificateValidateAndSigner) URL() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for URL") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// CertificateValidateAndSigner_URL_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'URL' +type CertificateValidateAndSigner_URL_Call struct { + *mock.Call +} + +// URL is a helper method to define mock.On call +func (_e *CertificateValidateAndSigner_Expecter) URL() *CertificateValidateAndSigner_URL_Call { + return &CertificateValidateAndSigner_URL_Call{Call: _e.mock.On("URL")} +} + +func (_c *CertificateValidateAndSigner_URL_Call) Run(run func()) *CertificateValidateAndSigner_URL_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *CertificateValidateAndSigner_URL_Call) Return(_a0 string) *CertificateValidateAndSigner_URL_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *CertificateValidateAndSigner_URL_Call) RunAndReturn(run func() string) *CertificateValidateAndSigner_URL_Call { + _c.Call.Return(run) + return _c +} + // ValidateAndSignCertificate provides a mock function with given fields: ctx, certificate, lastL2BlockInCert func (_m *CertificateValidateAndSigner) ValidateAndSignCertificate(ctx context.Context, certificate *agglayertypes.Certificate, lastL2BlockInCert uint64) ([]byte, error) { ret := _m.Called(ctx, certificate, lastL2BlockInCert) diff --git a/aggsender/types/interfaces.go b/aggsender/types/interfaces.go index fb6a207ba..0b54d2c83 100644 --- a/aggsender/types/interfaces.go +++ b/aggsender/types/interfaces.go @@ -228,6 +228,7 @@ type CertificateValidateAndSigner interface { certificate *agglayertypes.Certificate, lastL2BlockInCert uint64, ) ([]byte, error) + URL() string String() string } diff --git a/aggsender/types/multisig_committee.go b/aggsender/types/multisig_committee.go index 0f98cffd2..31c5b3dbe 100644 --- a/aggsender/types/multisig_committee.go +++ b/aggsender/types/multisig_committee.go @@ -76,29 +76,6 @@ func (m *MultisigCommittee) AddSigner(info *SignerInfo) error { return nil } -// IsThresholdReached checks if the provided signer addresses constitute a valid quorum -// (namely signers length should be at least as big as the threshold value). -// - Returns an error if any signer is not part of the committee. -// - Duplicate addresses are ignored in counting. -func (m *MultisigCommittee) IsThresholdReached(signerAddrs []common.Address) (bool, error) { - seen := make(map[common.Address]struct{}, len(signerAddrs)) - count := uint32(0) - - for _, signerAddr := range signerAddrs { - if _, exists := m.signersSet[signerAddr]; !exists { - return false, fmt.Errorf("signer %s is not in the committee", signerAddr) - } - - // Count each signer only once - if _, alreadySeen := seen[signerAddr]; !alreadySeen { - seen[signerAddr] = struct{}{} - count++ - } - } - - return count >= m.threshold, nil -} - // Threshold returns the signature threshold required for quorum. func (m *MultisigCommittee) Threshold() uint32 { return m.threshold @@ -106,8 +83,17 @@ func (m *MultisigCommittee) Threshold() uint32 { // Signers returns a shallow copy of the committee's signers slice // to prevent external modification of the internal slice. -func (m *MultisigCommittee) Signers() []*SignerInfo { - cpy := make([]*SignerInfo, len(m.signers)) - copy(cpy, m.signers) +func (m *MultisigCommittee) Signers() []SignerInfo { + cpy := make([]SignerInfo, len(m.signers)) + for i, s := range m.signers { + if s != nil { + cpy[i] = *s + } + } return cpy } + +// Size returns the committee size +func (m *MultisigCommittee) Size() int { + return len(m.signers) +} diff --git a/aggsender/types/multisig_committee_test.go b/aggsender/types/multisig_committee_test.go index 4cc1dbf55..39eca9f63 100644 --- a/aggsender/types/multisig_committee_test.go +++ b/aggsender/types/multisig_committee_test.go @@ -64,6 +64,7 @@ func TestMultisigCommittee_NewMultisigCommittee(t *testing.T) { } else { require.NoError(t, err) require.NotNil(t, mc) + require.Equal(t, len(tc.members), mc.Size()) } }) } @@ -106,61 +107,25 @@ func TestMultisigCommittee_AddSigner(t *testing.T) { } } -func TestMultisigCommittee_IsThresholdReached(t *testing.T) { - s1 := NewSignerInfo("http://localhost:8001", common.HexToAddress("0x1")) - s2 := NewSignerInfo("http://localhost:8002", common.HexToAddress("0x2")) - s3 := NewSignerInfo("http://localhost:8003", common.HexToAddress("0x3")) +func TestMultisigCommittee_Signers(t *testing.T) { + signers := []SignerInfo{ + {Address: common.HexToAddress("0x1"), URL: "http://localhost:8001"}, + {Address: common.HexToAddress("0x2"), URL: "http://localhost:8002"}, + {Address: common.HexToAddress("0x3"), URL: "http://localhost:8003"}, + } - tests := []struct { - name string - initial []*SignerInfo - threshold uint32 - signers []common.Address - wantQuorum bool - errContains string - }{ - { - name: "has quorum", - initial: []*SignerInfo{s1, s2, s3}, - threshold: 2, - signers: []common.Address{s1.Address, s2.Address}, - wantQuorum: true, - }, - { - name: "insufficient quorum", - initial: []*SignerInfo{s1, s2, s3}, - threshold: 3, - signers: []common.Address{s1.Address, s2.Address}, - wantQuorum: false, - }, - { - name: "unknown signer", - initial: []*SignerInfo{s1, s2}, - threshold: 1, - signers: []common.Address{common.HexToAddress("0x99")}, - errContains: "not in the committee", - }, - { - name: "duplicate signers ignored", - initial: []*SignerInfo{s1, s2}, - threshold: 2, - signers: []common.Address{s1.Address, s1.Address, s2.Address}, - wantQuorum: true, - }, + ptrs := make([]*SignerInfo, len(signers)) + for i := range signers { + ptrs[i] = &signers[i] } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - mc, err := NewMultisigCommittee(tc.initial, tc.threshold) - require.NoError(t, err) + mc, err := NewMultisigCommittee(ptrs, uint32(len(signers)-1)) + require.NoError(t, err) - ok, err := mc.IsThresholdReached(tc.signers) - if tc.errContains != "" { - require.ErrorContains(t, err, tc.errContains) - } else { - require.NoError(t, err) - require.Equal(t, tc.wantQuorum, ok) - } - }) - } + cpySigners := mc.Signers() + require.Equal(t, signers, cpySigners) + + // Update single signer's address + cpySigners[0].Address = common.HexToAddress("0x4") + require.NotEqual(t, signers, cpySigners) } diff --git a/aggsender/validator/local_validator.go b/aggsender/validator/local_validator.go index 8604388f7..be038c24a 100644 --- a/aggsender/validator/local_validator.go +++ b/aggsender/validator/local_validator.go @@ -41,6 +41,11 @@ func (a *LocalValidator) String() string { return "LocalValidator" } +// URL returns an URL for the LocalValidator +func (a *LocalValidator) URL() string { + return "N/A" +} + func (a *LocalValidator) HealthCheck(ctx context.Context) (*types.HealthCheckResponse, error) { return &types.HealthCheckResponse{ Status: "OK", diff --git a/aggsender/validator/remote_validator.go b/aggsender/validator/remote_validator.go index fafbc0d3c..57683408f 100644 --- a/aggsender/validator/remote_validator.go +++ b/aggsender/validator/remote_validator.go @@ -16,6 +16,7 @@ var _ types.CertificateValidateAndSigner = (*RemoteValidator)(nil) // RemoteValidator encapsulates the gRPC client and configuration // required to interact with the AggsenderValidator service. type RemoteValidator struct { + url string client types.ValidatorClient storage db.AggSenderStorage } @@ -29,6 +30,7 @@ func NewRemoteValidator(cfg *grpc.ClientConfig, storage db.AggSenderStorage) (*R } return &RemoteValidator{ + url: cfg.URL, client: client, storage: storage, }, nil @@ -36,7 +38,12 @@ func NewRemoteValidator(cfg *grpc.ClientConfig, storage db.AggSenderStorage) (*R // String returns a string representation of the RemoteValidator. func (v *RemoteValidator) String() string { - return "RemoteValidator" + return fmt.Sprintf("RemoteValidator (URL=%s)", v.url) +} + +// URL returns an URL for the remote validator +func (v *RemoteValidator) URL() string { + return v.url } // HealthCheck performs a health check on the AggsenderValidator service. diff --git a/bridgeservice/bridge.go b/bridgeservice/bridge.go index 20343e9c3..b53e500df 100644 --- a/bridgeservice/bridge.go +++ b/bridgeservice/bridge.go @@ -49,7 +49,6 @@ const ( depositCountParam = "deposit_count" fromAddressParam = "from_address" leafIndexParam = "leaf_index" - globalIndexParam = "global_index" includeAllFields = "include_all_fields" binarySearchDivider = 2 diff --git a/bridgesync/e2e_test.go b/bridgesync/e2e_test.go index 7270c8e7c..f0c4e6cfd 100644 --- a/bridgesync/e2e_test.go +++ b/bridgesync/e2e_test.go @@ -115,9 +115,7 @@ func TestBridgeEventE2E(t *testing.T) { func getFinalizedBlockNumber(t *testing.T, ctx context.Context, client simulated.Client) uint64 { t.Helper() - lastBlockFinalityType, err := aggkittypes.FinalizedBlock.ToBlockNum() - require.NoError(t, err) - lastBlockHeader, err := client.HeaderByNumber(ctx, lastBlockFinalityType) + lastBlockHeader, err := client.HeaderByNumber(ctx, aggkittypes.FinalizedBlockNum) require.NoError(t, err) return lastBlockHeader.Number.Uint64() } diff --git a/cmd/run.go b/cmd/run.go index 05d3a35ad..9fbf241ae 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -84,7 +84,7 @@ func start(cliCtx *cli.Context) error { rollupDataQuerier, err := createRollupDataQuerier(cliCtx.Context, cfg.L1NetworkConfig, components) if err != nil { - return fmt.Errorf("failed to create etherman client: %w", err) + return fmt.Errorf("failed to create rollup data querier: %w", err) } l1InfoTreeSync := runL1InfoTreeSyncerIfNeeded(cliCtx.Context, components, *cfg, l1Client) @@ -94,6 +94,7 @@ func start(cliCtx *cli.Context) error { l2GERSync := runL2GERSyncIfNeeded( cliCtx.Context, components, cfg.L2GERSync, reorgDetectorL2, l2Client, l1InfoTreeSync, ) + var rpcServices []jRPC.Service for _, component := range components { switch component { @@ -113,6 +114,13 @@ func start(cliCtx *cli.Context) error { go b.Start(cliCtx.Context) case aggkitcommon.AGGSENDER: + // TODO: Check if the RollupAddr is the right SC address, or we should use some other. + committeeQuerier, err := query.NewECDSAMultisigCommitteeQuery(cfg.L1NetworkConfig.RollupAddr, l1Client) + if err != nil { + return fmt.Errorf("failed to create ECDSA multisig committee querier (SC address: %s): %w", + cfg.L1NetworkConfig.RollupAddr, err) + } + aggsender, err := createAggSender( cliCtx.Context, cfg.AggSender, @@ -121,6 +129,7 @@ func start(cliCtx *cli.Context) error { l2BridgeSync, l2Client, rollupDataQuerier, + committeeQuerier, ) if err != nil { log.Fatal(err) @@ -321,10 +330,11 @@ func createAggSender( ctx context.Context, cfg aggsendercfg.Config, l1EthClient aggkittypes.BaseEthereumClienter, - l1InfoTreeSync *l1infotreesync.L1InfoTreeSync, - l2Syncer *bridgesync.BridgeSync, + l1InfoTreeSync aggsendertypes.L1InfoTreeSyncer, + l2Syncer aggsendertypes.L2BridgeSyncer, l2Client aggkittypes.BaseEthereumClienter, - rollupDataQuerier *etherman.RollupDataQuerier) (*aggsender.AggSender, error) { + rollupDataQuerier aggsendertypes.RollupDataQuerier, + committeeQuerier aggsendertypes.MultisigQuerier) (*aggsender.AggSender, error) { logger := log.WithFields("module", aggkitcommon.AGGSENDER) if err := cfg.Validate(); err != nil { @@ -362,35 +372,11 @@ func createAggSender( go epochNotifier.Start(ctx) aggsender, err := aggsender.New(ctx, logger, cfg, agglayerClient, - l1InfoTreeSync, l2Syncer, epochNotifier, l1EthClient, l2Client, rollupDataQuerier) + l1InfoTreeSync, l2Syncer, epochNotifier, l1EthClient, l2Client, rollupDataQuerier, committeeQuerier) if err != nil { return nil, fmt.Errorf("failed to create AggSender: %w", err) } - // validatorObj is only supported in PessimisticProof mode - var validatorObj aggsendertypes.CertificateValidateAndSigner - if cfg.RequireValidatorCall { - validatorObj, err = validator.NewRemoteValidator(cfg.ValidatorClient, aggsender.GetStorage()) - if err != nil { - return nil, fmt.Errorf("failed to create RemoteValidatorClient: %w", err) - } - } else { - // this is only temporary, until we test it in local, then we will only use the remote validator - validatorObj = validator.NewLocalValidator( - logger, - aggsender.GetStorage(), - validator.NewAggsenderValidator( - logger, - aggsender.GetFlow(), - query.NewL1InfoTreeDataQuerier(l1EthClient, l1InfoTreeSync), - aggsender.GetCertQuerier(), - aggsender.GetLERQuerier(), - ), - ) - } - - aggsender.AttachValidator(validatorObj) - return aggsender, nil } diff --git a/grpc/client.go b/grpc/client.go index 30b39ad18..7035ecb08 100644 --- a/grpc/client.go +++ b/grpc/client.go @@ -73,6 +73,14 @@ type ClientConfig struct { Retry *RetryConfig `mapstructure:"Retry"` } +// WithURL returns a copy of the current ClientConfig with the URL field set to the given value. +// This method does not modify the original ClientConfig. +func (c ClientConfig) WithURL(url string) ClientConfig { + newCfg := c + newCfg.URL = url + return newCfg +} + // DefaultConfig returns a default configuration for the gRPC client func DefaultConfig() *ClientConfig { return &ClientConfig{ diff --git a/types/block_finality.go b/types/block_finality.go index 1e0961757..1631374cb 100644 --- a/types/block_finality.go +++ b/types/block_finality.go @@ -24,6 +24,14 @@ var ( EarliestBlock = BlockNumberFinality{"EarliestBlock"} ) +var ( + SafeBlockNum = big.NewInt(int64(Safe)) + FinalizedBlockNum = big.NewInt(int64(Finalized)) + LatestBlockNum = big.NewInt(int64(Latest)) + PendingBlockNum = big.NewInt(int64(Pending)) + EarliestBlockNum = big.NewInt(int64(Earliest)) +) + func (b *BlockNumberFinality) ToBlockNum() (*big.Int, error) { switch strings.ToUpper(b.String()) { case strings.ToUpper(FinalizedBlock.String()):