Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 12 additions & 138 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math/big"
"sync"
"time"

jRPC "github.com/0xPolygon/cdk-rpc/rpc"
Expand Down Expand Up @@ -46,7 +44,7 @@ type AggSender struct {
certStatusChecker types.CertificateStatusChecker
certQuerier types.CertificateQuerier
rollupDataQuerier types.RollupDataQuerier
committeeQuerier types.MultisigQuerier
validatorPoller types.ValidatorPoller

l1Client aggkittypes.BaseEthereumClienter
l1InfoTreeSyncer types.L1InfoTreeSyncer
Expand Down Expand Up @@ -138,7 +136,13 @@ func New(
l2OriginNetwork: l2OriginNetwork,
certQuerier: certQuerier,
rollupDataQuerier: rollupDataQuerier,
committeeQuerier: committeeQuerier,
validatorPoller: NewValidatorPoller(
logger,
storage,
flowManager.Signer(),
committeeQuerier,
cfg.ValidatorClient,
),
certStatusChecker: statuschecker.NewCertStatusChecker(
logger, storage, aggLayerClient, certQuerier, l2OriginNetwork),
l1Client: l1Client,
Expand Down Expand Up @@ -345,13 +349,10 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi
a.log.Warnf("local validation of certificate failed: %v. Cert: %s", err, certificate.Brief())
}
} else {
validators, signaturesThreshold, err := a.getValidators(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get validators: %w", err)
}

multisig, err = a.pollValidatorCommittee(ctx, validators, signaturesThreshold,
certificate, certificateParams.ToBlock)
multisig, err = a.validatorPoller.PollValidators(ctx, &types.ValidationRequest{
Certificate: certificate,
LastL2BlockInCert: certificateParams.ToBlock,
})
if err != nil {
return nil, fmt.Errorf("error polling validator committee: %w", err)
}
Expand Down Expand Up @@ -427,133 +428,6 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi
return certificate, nil
}

// pollValidatorCommittee invokes validator committee members to validate and sign the certificate
func (a *AggSender) pollValidatorCommittee(
ctx context.Context,
validators []types.CertificateValidateAndSigner,
signaturesThreshold uint32,
certificate *agglayertypes.Certificate,
lastL2BlockInCert uint64,
) (*agglayertypes.Multisig, error) {
if len(validators) == 0 {
a.log.Warnf("skipping certificate validation, because there are no validators configured")
return nil, nil
}

a.log.Infof("delegating certificate validation: %s", certificate.Brief())

ctx, cancel := context.WithCancel(ctx)
defer cancel()

type signResult struct {
signature []byte
err error
validator types.CertificateValidateAndSigner
}

resultsCh := make(chan signResult, len(validators))
var wg sync.WaitGroup

start := time.Now()

for _, v := range validators {
wg.Add(1)
go func(v types.CertificateValidateAndSigner) {
defer wg.Done()

select {
case <-ctx.Done():
return
default:
}

sig, err := v.ValidateAndSignCertificate(ctx, certificate, lastL2BlockInCert)
if err != nil {
a.log.Errorf("validator %s failed to validate the certificate: %v", v.String(), err)
resultsCh <- signResult{err: err, validator: v}
return
}

resultsCh <- signResult{signature: sig, validator: v}
}(v)
}

go func() {
wg.Wait()
close(resultsCh)
metrics.ValidateTime(time.Since(start).Seconds())
}()

multisig := &agglayertypes.Multisig{
Signatures: make([]agglayertypes.ECDSAMultisigEntry, 0, len(validators)),
}
errs := make([]error, 0)

for res := range resultsCh {
if res.err != nil {
errs = append(errs, res.err)
metrics.ValidatorError(res.validator.Address())

continue
}

if len(res.signature) != aggkitcommon.SignatureSize {
a.log.Errorf("validator %v returned an invalid signature with length %d",
res.validator.String(), len(res.signature))
metrics.ValidatorInvalidSignature(res.validator.Address())

errs = append(errs, fmt.Errorf("validator %s returned an invalid signature with length %d",
res.validator.String(), len(res.signature)))

continue
}

multisig.Signatures = append(multisig.Signatures, agglayertypes.ECDSAMultisigEntry{
Index: res.validator.Index(),
Signature: res.signature,
})

if uint32(len(multisig.Signatures)) >= signaturesThreshold {
cancel() // signal other goroutines to stop early
}
}

if uint32(len(multisig.Signatures)) < signaturesThreshold {
metrics.MultiSigThresholdNotReached()

return nil, fmt.Errorf("threshold not reached: %d/%d. Errors: %w",
len(multisig.Signatures), signaturesThreshold, errors.Join(errs...))
}

a.log.Infof("certificate validation passed with %d/%d signatures: %s",
len(multisig.Signatures), signaturesThreshold, certificate.Brief())

return multisig, nil
}

// getValidators retrieves the actual multisig committee and creates a set of the validators
// that are going to validate the provided certificate
func (a *AggSender) getValidators(ctx context.Context) ([]types.CertificateValidateAndSigner, uint32, error) {
committee, err := a.committeeQuerier.GetMultisigCommittee(ctx, big.NewInt(int64(aggkittypes.Latest)))
if err != nil {
return nil, 0, fmt.Errorf("failed to retrieve the latest multisig committee: %w", err)
}

validators := make([]types.CertificateValidateAndSigner, 0, committee.Size())
for i, signer := range committee.Signers() {
clientCfg := a.cfg.ValidatorClient.WithURL(signer.URL)
validator, err := validator.NewRemoteValidator(&clientCfg, a.storage, signer.Address, uint32(i))
if err != nil {
return nil, 0, fmt.Errorf("failed to create a remote validator for committee signer (Address=%s, URL=%s): %w",
signer.Address, signer.URL, err)
}

validators = append(validators, validator)
}

return validators, committee.Threshold(), nil
}

// saveCertificateToStorage saves the certificate to the storage
// it retries if it fails. if param retries == 0 it retries indefinitely
func (a *AggSender) saveCertificateToStorage(ctx context.Context, cert types.Certificate, maxRetries int) error {
Expand Down
188 changes: 0 additions & 188 deletions aggsender/aggsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/agglayer/aggkit/aggsender/flows"
"github.com/agglayer/aggkit/aggsender/mocks"
aggsendertypes "github.com/agglayer/aggkit/aggsender/types"
"github.com/agglayer/aggkit/aggsender/validator"
"github.com/agglayer/aggkit/bridgesync"
aggkitcommon "github.com/agglayer/aggkit/common"
"github.com/agglayer/aggkit/config/types"
Expand Down Expand Up @@ -495,82 +494,6 @@ func TestSendCertificate(t *testing.T) {
}
}

func TestGetValidators(t *testing.T) {
allSigners := []*aggsendertypes.SignerInfo{
aggsendertypes.NewSignerInfo("http://localhost:8001", common.HexToAddress("0x1")),
aggsendertypes.NewSignerInfo("http://localhost:8002", common.HexToAddress("0x2")),
aggsendertypes.NewSignerInfo("http://localhost:8003", common.HexToAddress("0x3")),
aggsendertypes.NewSignerInfo("http://localhost:8004", common.HexToAddress("0x4")),
aggsendertypes.NewSignerInfo("http://localhost:8005", common.HexToAddress("0x5")),
aggsendertypes.NewSignerInfo("http://localhost:8006", common.HexToAddress("0x6")),
}

testCases := []struct {
name string
signers []*aggsendertypes.SignerInfo
expectedValidatorsFn func(*testing.T, []*aggsendertypes.SignerInfo) []aggsendertypes.CertificateValidateAndSigner
expectedThreshold uint32
expectedError string
}{
{
name: "successful return of committee validators",
signers: allSigners[:len(allSigners)/2],
expectedThreshold: uint32(len(allSigners) / 2),
expectedValidatorsFn: func(t *testing.T,
signers []*aggsendertypes.SignerInfo) []aggsendertypes.CertificateValidateAndSigner {
t.Helper()

validators := make([]aggsendertypes.CertificateValidateAndSigner, 0, len(signers))
for i, signer := range signers {
validator, err := validator.NewRemoteValidator(&grpc.ClientConfig{URL: signer.URL}, nil, signer.Address, uint32(i))
require.NoError(t, err)
validators = append(validators, validator)
}
return validators
},
},
{
name: "failed to query the committee",
expectedError: "invalid parameters",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
committeeQuerierMock := mocks.NewMultisigQuerier(t)

if tc.expectedError == "" {
committee, err := aggsendertypes.NewMultisigCommittee(tc.signers, uint32(len(tc.signers)))
require.NoError(t, err)
committeeQuerierMock.EXPECT().
GetMultisigCommittee(mock.Anything, mock.Anything).
Return(committee, nil)
} else {
committeeQuerierMock.EXPECT().
GetMultisigCommittee(mock.Anything, mock.Anything).
Return(nil, errors.New(tc.expectedError))
}

aggsender := &AggSender{
cfg: config.Config{ValidatorClient: &grpc.ClientConfig{}},
committeeQuerier: committeeQuerierMock,
}

validators, threshold, err := aggsender.getValidators(t.Context())
if tc.expectedError != "" {
require.ErrorContains(t, err, tc.expectedError)
} else {
expectedValidators := tc.expectedValidatorsFn(t, tc.signers)
require.Len(t, validators, len(tc.signers))
for i, v := range expectedValidators {
require.Equal(t, v.URL(), validators[i].URL())
}
require.Equal(t, tc.expectedThreshold, threshold)
}
})
}
}

func TestNewAggSender(t *testing.T) {
mockBridgeSyncer := mocks.NewL2BridgeSyncer(t)
mockRollupQuerier := mocks.NewRollupDataQuerier(t)
Expand Down Expand Up @@ -779,117 +702,6 @@ func TestSendCertificates(t *testing.T) {
}
}

func TestPollValidators(t *testing.T) {
t.Parallel()

certificate := &agglayertypes.Certificate{
NetworkID: 1,
Height: 1,
}

tests := []struct {
name string
setupMocks func() ([]aggsendertypes.CertificateValidateAndSigner, uint32)
expectedMinSigs int
expectErrSubstring string
}{
{
name: "no validators configured",
expectedMinSigs: 0,
},
{
name: "single healthy validator returns valid signature",
setupMocks: func() ([]aggsendertypes.CertificateValidateAndSigner, uint32) {
mockValidator := mocks.NewCertificateValidateAndSigner(t)
mockValidator.EXPECT().Index().Return(uint32(1))
mockValidator.EXPECT().
ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything).
Return(make([]byte, aggkitcommon.SignatureSize), nil).
Once()
return []aggsendertypes.CertificateValidateAndSigner{mockValidator}, 1
},
expectedMinSigs: 1,
},
{
name: "multiple validators reach threshold",
setupMocks: func() ([]aggsendertypes.CertificateValidateAndSigner, uint32) {
v1 := mocks.NewCertificateValidateAndSigner(t)
v2 := mocks.NewCertificateValidateAndSigner(t)
v3 := mocks.NewCertificateValidateAndSigner(t)

for i, v := range [](*mocks.CertificateValidateAndSigner){v1, v2, v3} {
v.EXPECT().Index().Return(uint32(i))
v.EXPECT().
ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything).
Return(make([]byte, aggkitcommon.SignatureSize), nil).
Times(1)
}

validators := []aggsendertypes.CertificateValidateAndSigner{v1, v2, v3}
return validators, 2
},
expectedMinSigs: 2,
},
{
name: "threshold not reached",
setupMocks: func() ([]aggsendertypes.CertificateValidateAndSigner, uint32) {
v1 := mocks.NewCertificateValidateAndSigner(t)
v2 := mocks.NewCertificateValidateAndSigner(t)
v3 := mocks.NewCertificateValidateAndSigner(t)

for i, v := range [](*mocks.CertificateValidateAndSigner){v1, v2, v3} {
v.EXPECT().String().Return(fmt.Sprintf("validator-%d", i))
v.EXPECT().Address().Return(common.HexToAddress(fmt.Sprintf("0x%d", i+1)))
v.EXPECT().
ValidateAndSignCertificate(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("validation failed")).
Times(1)
}

validators := []aggsendertypes.CertificateValidateAndSigner{v1, v2, v3}
return validators, 2
},
expectErrSubstring: "threshold not reached",
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

var (
validators []aggsendertypes.CertificateValidateAndSigner
threshold uint32
)

if tc.setupMocks != nil {
validators, threshold = tc.setupMocks()
}

agg := &AggSender{
log: log.WithFields("test", "pollValidators"),
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

multiSig, err := agg.pollValidatorCommittee(ctx, validators, threshold, certificate, 0)

if tc.expectErrSubstring != "" {
require.ErrorContains(t, err, tc.expectErrSubstring)
} else if len(validators) == 0 {
require.Nil(t, multiSig)
require.NoError(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, multiSig)
require.GreaterOrEqual(t, len(multiSig.Signatures), tc.expectedMinSigs)
}
})
}
}

type testDataFlags = int

const (
Expand Down
Loading
Loading