Skip to content
Merged
204 changes: 159 additions & 45 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

jRPC "github.com/0xPolygon/cdk-rpc/rpc"
Expand All @@ -19,6 +20,7 @@ import (
aggsenderrpc "github.com/agglayer/aggkit/aggsender/rpc"
"github.com/agglayer/aggkit/aggsender/statuschecker"
"github.com/agglayer/aggkit/aggsender/types"
"github.com/agglayer/aggkit/aggsender/validator"
aggkitcommon "github.com/agglayer/aggkit/common"
"github.com/agglayer/aggkit/db/compatibility"
"github.com/agglayer/aggkit/log"
Expand All @@ -43,6 +45,11 @@ type AggSender struct {
certStatusChecker types.CertificateStatusChecker
certQuerier types.CertificateQuerier
rollupDataQuerier types.RollupDataQuerier
committeeQuerier types.MultisigQuerier

l1Client aggkittypes.BaseEthereumClienter
l1InfoTreeSyncer types.L1InfoTreeSyncer
localValidator types.CertificateValidateAndSigner

cfg config.Config

Expand All @@ -51,8 +58,6 @@ type AggSender struct {
flow types.AggsenderFlow

l2OriginNetwork uint32

validator types.CertificateValidateAndSigner
}

// New returns a new AggSender instance
Expand All @@ -67,6 +72,7 @@ func New(
l1Client aggkittypes.BaseEthereumClienter,
l2Client aggkittypes.BaseEthereumClienter,
rollupDataQuerier types.RollupDataQuerier,
committeeQuerier types.MultisigQuerier,
) (*AggSender, error) {
storageConfig := db.AggSenderSQLStorageConfig{
DBPath: cfg.StoragePath,
Expand Down Expand Up @@ -130,33 +136,14 @@ func New(
l2OriginNetwork: l2OriginNetwork,
certQuerier: certQuerier,
rollupDataQuerier: rollupDataQuerier,
committeeQuerier: committeeQuerier,
certStatusChecker: statuschecker.NewCertStatusChecker(
logger, storage, aggLayerClient, certQuerier, l2OriginNetwork),
l1Client: l1Client,
l1InfoTreeSyncer: l1InfoTreeSyncer,
}, nil
}

func (a *AggSender) AttachValidator(validator types.CertificateValidateAndSigner) {
if validator == nil {
a.log.Infof("AggSender validator attached: nil")
return
}

a.validator = validator
a.log.Infof("AggSender validator attached: %s", a.validator.String())
}

func (a *AggSender) GetStorage() db.AggSenderStorage {
return a.storage
}

func (a *AggSender) GetFlow() types.AggsenderFlow {
return a.flow
}

func (a *AggSender) GetCertQuerier() types.CertificateQuerier {
return a.certQuerier
}

func (a *AggSender) GetLERQuerier() types.LERQuerier {
return query.NewLERDataQuerier(a.cfg.RollupCreationBlockL1, a.rollupDataQuerier)
}
Expand All @@ -177,7 +164,7 @@ func (a *AggSender) GetRPCServices() []jRPC.Service {
return []jRPC.Service{}
}

logger := log.WithFields("aggsender-rpc", aggkitcommon.BRIDGE)
logger := log.WithFields("module", "aggsender-rpc")
return []jRPC.Service{
{
Name: "aggsender",
Expand All @@ -197,16 +184,23 @@ func (a *AggSender) Start(ctx context.Context) {
if err := a.flow.CheckInitialStatus(ctx); err != nil {
a.log.Panicf("error checking flow Initial Status: %v", err)
}
if a.validator != nil {
status, err := a.validator.HealthCheck(ctx)
if err != nil {
a.log.Warnf("error checking validator health: %v", err)
}
if !status.IsHealthy() {
a.log.Warnf("validator status is not healthy: %s", status.String())
}
a.log.Infof("validator health check: %s", status.String())

// TODO: The local validator implementation is there solely for testing purposes
// and it should be removed after integration testing is done.
if !a.cfg.RequireValidatorCall {
a.localValidator = validator.NewLocalValidator(
a.log,
a.storage,
validator.NewAggsenderValidator(
a.log,
a.flow,
query.NewL1InfoTreeDataQuerier(a.l1Client, a.l1InfoTreeSyncer),
a.certQuerier,
a.GetLERQuerier(),
),
)
}

a.sendCertificates(ctx, 0)
}

Expand Down Expand Up @@ -342,7 +336,24 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi
return nil, fmt.Errorf("error building certificate: %w", err)
}

validatorSignature, err := a.callValidator(ctx, certificate, certificateParams.ToBlock)
var (
validators []types.CertificateValidateAndSigner
signaturesThreshold uint32
)

if a.localValidator != nil {
validators = append(validators, a.localValidator)
signaturesThreshold = 1
} else {
validators, signaturesThreshold, err = a.getValidators(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get validators: %w", err)
}
}

multisig, err := a.pollValidators(ctx,
validators, signaturesThreshold,
certificate, certificateParams.ToBlock)
if err != nil {
// TODO - agglayer has not yet implemented the endpoints needed to validate a certificate
// so lets just log the error and continue. This will be changed when the agglayer is ready
Expand All @@ -365,7 +376,12 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi
time.Sleep(*rateLimitSleepTime)
}

certificateHash, err := a.aggLayerClient.SendCertificate(ctx, certificate, validatorSignature)
// TODO: Update once agglayer endpoint changes to accept the multisig
var signature []byte
if len(multisig) > 0 {
signature = multisig[0]
}
certificateHash, err := a.aggLayerClient.SendCertificate(ctx, certificate, signature)
if err != nil {
a.saveNonAcceptedCert(ctx, certificate, certificateParams.CreatedAt, err)

Expand Down Expand Up @@ -416,24 +432,122 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayertypes.Certifi
return certificate, nil
}

// callValidator calls the validator to validate the certificate
func (a *AggSender) callValidator(
// pollValidators committee signers to validate the certificate and gather signatures
func (a *AggSender) pollValidators(
ctx context.Context,
validators []types.CertificateValidateAndSigner,
signaturesThreshold uint32,
certificate *agglayertypes.Certificate,
lastL2BlockInCert uint64) ([]byte, error) {
if a.validator == nil {
lastL2BlockInCert uint64,
) ([][]byte, error) {
if len(validators) == 0 {
a.log.Warnf("skipping certificate validation, because there are no validators configured")
return nil, nil
}

a.log.Infof("delegating certificate validation: %s", certificate.Brief())
validatorSignature, err := a.validator.ValidateAndSignCertificate(ctx, certificate, lastL2BlockInCert)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

type signResult struct {
signature []byte
err error
}

resultsCh := make(chan signResult, len(validators))
var wg sync.WaitGroup

for _, v := range validators {
wg.Add(1)
go func(v types.CertificateValidateAndSigner) {
defer wg.Done()

select {
case <-ctx.Done():
return
default:
}

status, err := v.HealthCheck(ctx)
if err != nil {
a.log.Warnf("error checking validator health (URL=%s): %v", v.URL(), err)
resultsCh <- signResult{err: err}
return
}

if !status.IsHealthy() {
a.log.Warnf("validator (URL=%s) is not healthy: %s, skipping it...", v.URL(), status.String())
return // skip unhealthy validator
}

a.log.Infof("validator health check (URL=%s): %s", v.URL(), status.String())

sig, err := v.ValidateAndSignCertificate(ctx, certificate, lastL2BlockInCert)
if err != nil {
a.log.Errorf("validator %v failed to validate the certificate: %v", v, err)
resultsCh <- signResult{err: err}
return
}

resultsCh <- signResult{signature: sig}
}(v)
}

go func() {
wg.Wait()
close(resultsCh)
}()

signatures := make([][]byte, 0, len(validators))
var errs []error

for res := range resultsCh {
if res.err != nil {
errs = append(errs, res.err)
continue
}

signatures = append(signatures, res.signature)
if uint32(len(signatures)) >= signaturesThreshold {
cancel() // signal other goroutines to stop early
}
}

if uint32(len(signatures)) < signaturesThreshold {
if len(errs) > 0 {
return signatures, errors.Join(errs...)
}
return signatures, fmt.Errorf("threshold not reached: %d/%d", len(signatures), signaturesThreshold)
}

a.log.Infof("certificate validation passed with %d/%d signatures: %s",
len(signatures), signaturesThreshold, certificate.Brief())

return signatures, nil
}

// getValidators retrieves the actual multisig committee and creates a set of the validators
// that are going to validate the provided certificate
func (a *AggSender) getValidators(ctx context.Context) ([]types.CertificateValidateAndSigner, uint32, error) {
committee, err := a.committeeQuerier.GetMultisigCommittee(ctx, aggkittypes.LatestBlockNum)
if err != nil {
a.log.Errorf("certificate validation failed: %w. Cert: %s", err, certificate.Brief())
return nil, fmt.Errorf("certificate validation failed: %w", err)
return nil, 0, fmt.Errorf("failed to retrieve the latest multisig committee: %w", err)
}

a.log.Infof("certificate validation passed: %s", certificate.Brief())
validators := make([]types.CertificateValidateAndSigner, 0, committee.Size())
for _, signer := range committee.Signers() {
clientCfg := a.cfg.ValidatorClient.WithURL(signer.URL)
validator, err := validator.NewRemoteValidator(&clientCfg, a.storage)
if err != nil {
return nil, 0, fmt.Errorf("failed to create a remote validator for committee signer (Address=%s, URL=%s): %w",
signer.Address, signer.URL, err)
}

validators = append(validators, validator)
}

return validatorSignature, nil
return validators, committee.Threshold(), nil
}

// saveCertificateToStorage saves the certificate to the storage
Expand Down
Loading
Loading