Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions aggsender/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/agglayer/aggkit/l1infotreesync"
"github.com/agglayer/aggkit/l2gersync"
treetypes "github.com/agglayer/aggkit/tree/types"
aggkittypes "github.com/agglayer/aggkit/types"
signertypes "github.com/agglayer/go_signer/signer/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -80,7 +79,6 @@ type L2BridgeSyncer interface {
GetBridges(ctx context.Context, fromBlock, toBlock uint64) ([]bridgesync.Bridge, error)
GetClaims(ctx context.Context, fromBlock, toBlock uint64) ([]bridgesync.Claim, error)
OriginNetwork() uint32
BlockFinality() aggkittypes.BlockNumberFinality
GetLastProcessedBlock(ctx context.Context) (uint64, error)
GetExitRootByHash(ctx context.Context, root common.Hash) (*treetypes.Root, error)
GetClaimsByGlobalIndex(ctx context.Context, globalIndex *big.Int) ([]bridgesync.Claim, error)
Expand Down
17 changes: 6 additions & 11 deletions bridgesync/bridgesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type BridgeSync struct {

originNetwork uint32
reorgDetector ReorgDetector
blockFinality aggkittypes.BlockNumberFinality
ethClient aggkittypes.EthClienter
bridgeContractV2 *polygonzkevmbridgev2.Polygonzkevmbridgev2
}
Expand All @@ -73,15 +72,15 @@ type noOpReorgDetectorWrapper struct {
func NewL1(
ctx context.Context,
cfg Config,
blockFinalityType aggkittypes.BlockNumberFinality,
blockFinality aggkittypes.BlockNumberFinality,
ethClient aggkittypes.EthClienter,
originNetwork uint32,
syncFullClaims bool,
) (*BridgeSync, error) {
return newBridgeSync(
ctx,
cfg,
blockFinalityType,
blockFinality,
&noOpReorgDetectorWrapper{*reorgdetector.NewNoOpReorgDetector()},
ethClient,
L1BridgeSyncer,
Expand Down Expand Up @@ -132,7 +131,7 @@ func NewL2(
func newBridgeSync(
ctx context.Context,
cfg Config,
blockFinalityType aggkittypes.BlockNumberFinality,
blockFinality aggkittypes.BlockNumberFinality,
rd ReorgDetector,
ethClient aggkittypes.EthClienter,
syncerID BridgeSyncerType,
Expand Down Expand Up @@ -192,7 +191,7 @@ func newBridgeSync(
syncerID.String(),
ethClient,
cfg.SyncBlockChunkSize,
cfg.BlockFinality,
blockFinality,
cfg.WaitForNewBlocksPeriod.Duration,
appender,
[]common.Address{cfg.BridgeAddr},
Expand Down Expand Up @@ -228,6 +227,7 @@ func newBridgeSync(
"%s created:\n"+
" dbPath: %s\n"+
" initialBlock: %d\n"+
" blockFinality: %s\n"+
" bridgeAddr: %s\n"+
" syncFullClaims: %t\n"+
" maxRetryAttemptsAfterError: %d\n"+
Expand All @@ -238,6 +238,7 @@ func newBridgeSync(
syncerID,
cfg.DBPath,
cfg.InitialBlockNum,
blockFinality.String(),
cfg.BridgeAddr.String(),
syncFullClaims,
cfg.MaxRetryAttemptsAfterError,
Expand All @@ -253,7 +254,6 @@ func newBridgeSync(
downloader: downloader,
originNetwork: networkID,
reorgDetector: rd,
blockFinality: blockFinalityType,
ethClient: ethClient,
bridgeContractV2: bridgeContractV2,
}, nil
Expand Down Expand Up @@ -408,11 +408,6 @@ func (s *BridgeSync) OriginNetwork() uint32 {
return s.originNetwork
}

// BlockFinality returns the block finality type
func (s *BridgeSync) BlockFinality() aggkittypes.BlockNumberFinality {
return s.blockFinality
}

type LastReorg struct {
DetectedAt int64 `json:"detected_at"`
FromBlock uint64 `json:"from_block"`
Expand Down
2 changes: 0 additions & 2 deletions bridgesync/bridgesync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func TestNewLx(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, l1BridgeSync)
require.Equal(t, originNetwork, l1BridgeSync.OriginNetwork())
require.Equal(t, blockFinalityType, l1BridgeSync.BlockFinality())

bridgeSyncL2Cfg := Config{
DBPath: dbPath,
Expand All @@ -107,7 +106,6 @@ func TestNewLx(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, l1BridgeSync)
require.Equal(t, originNetwork, l2BridgdeSync.OriginNetwork())
require.Equal(t, blockFinalityType, l2BridgdeSync.BlockFinality())

// Fails the sanity check of the contract address
mockEthClient = mocksethclient.NewEthClienter(t)
Expand Down
10 changes: 5 additions & 5 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,20 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
if rd.IsDisabled() {
return nil
}
// Get the latest finalized block

// Get the latest finalized block
blockNumber, err := rd.finalizedBlockType.BlockNumber(ctx, rd.client)
if err != nil {
return fmt.Errorf("failed to get the latest finalized block: %w", err)
}
lastFinalisedBlock, err := rd.client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNumber))
lastFinalizedBlock, err := rd.client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil {
return fmt.Errorf("failed to get the header %d. Err: %w", blockNumber, err)
}
var (
headersCacheLock sync.Mutex
headersCache = map[uint64]*types.Header{
lastFinalisedBlock.Number.Uint64(): lastFinalisedBlock,
lastFinalizedBlock.Number.Uint64(): lastFinalizedBlock,
}
errGroup errgroup.Group
)
Expand All @@ -180,7 +180,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
continue
}

rd.log.Debugf("Checking reorgs in tracked blocks up to block %d", lastFinalisedBlock.Number.Uint64())
rd.log.Debugf("Checking reorgs in tracked blocks up to block %d", lastFinalizedBlock.Number.Uint64())

errGroup.Go(func() error {
headers := hdrs.getSorted()
Expand All @@ -202,7 +202,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
if hdr.Hash == currentHeader.Hash() {
// Delete block from the tracked blocks list if it is less than or equal to the last finalized block
// and hashes matches. If higher than finalized block, we assume a reorg still might happen.
if hdr.Num <= lastFinalisedBlock.Number.Uint64() {
if hdr.Num <= lastFinalizedBlock.Number.Uint64() {
hdrs.removeRange(hdr.Num, hdr.Num)

if err := rd.removeTrackedBlockRange(id, hdr.Num, hdr.Num); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion scripts/local_config_pp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ function export_values_of_aggkit_config() {
function export_ports_from_kurtosis() {
common_export_ports_from_kurtosis
export_portnum_from_kurtosis_or_fail op_el_rpc_port op-el-1-op-geth-op-node-001 rpc
export op_el_rpc_url="http://localhost:${op_el_rpc_port}"
export op_el_rpc_url="http://localhost:${op_el_rpc_port}"
export l2_rpc_url=$op_el_rpc_url
export agglayer_grpc_url="http://localhost:${agglayer_grpc_port}"
}
Expand Down
25 changes: 12 additions & 13 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,24 @@ func NewEVMDownloader(
rh *RetryHandler,
finalizedBlockType aggkittypes.BlockNumberFinality,
) (*EVMDownloader, error) {
logger := log.WithFields("syncer", syncerID)

fbtEthermanType := finalizedBlockType
if finality.IsEmpty() {
return nil, fmt.Errorf("block finality must be set")
}

if finalizedBlockType.GreaterThan(&finality) {
fbtEthermanType = finality
// if someone configured the syncer to query blocks by Safe or Finalized block
// finalized block type should be at least the same as the block finality
logger.Warnf("finalized block type %s is greater than block finality %s, setting finalized block type to %s",
finalizedBlockType.String(), finality.String(), fbtEthermanType.String())
logger := log.WithFields("syncer", syncerID)
if finalizedBlockType.LessFinalThan(finality) {
finalizedBlockType = finality
logger.Warnf("finalized block type %s is less final than block finality %s, setting finalized block type to %s",
finalizedBlockType.String(), finality.String(), finalizedBlockType.String())
}

logger.Infof("downloader initialized with block finality: %s, finalized block type: %s. SyncChunkSize: %d",
finality.String(), fbtEthermanType.String(), syncBlockChunkSize)
finality.String(), finalizedBlockType.String(), syncBlockChunkSize)

return &EVMDownloader{
syncBlockChunkSize: syncBlockChunkSize,
log: logger,
finalizedBlockType: &fbtEthermanType,
finalizedBlockType: &finalizedBlockType,
addressesToQuery: addressesToQuery,
EVMDownloaderInterface: NewEVMDownloaderImplementation(
syncerID,
Expand Down Expand Up @@ -202,7 +201,7 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
func (d *EVMDownloader) reportBlocks(downloadedCh chan EVMBlock, blocks EVMBlocks, lastFinalizedBlock uint64) {
for _, block := range blocks {
d.log.Debugf("sending block %d to the driver (with events)", block.Num)
block.IsFinalizedBlock = d.finalizedBlockType.IsFinalized() && block.Num <= lastFinalizedBlock
block.IsFinalizedBlock = block.Num <= lastFinalizedBlock
downloadedCh <- *block
}
}
Expand All @@ -217,7 +216,7 @@ func (d *EVMDownloader) reportEmptyBlock(ctx context.Context, downloadedCh chan
}

downloadedCh <- EVMBlock{
IsFinalizedBlock: d.finalizedBlockType.IsFinalized() && header.Num <= lastFinalizedBlock,
IsFinalizedBlock: header.Num <= lastFinalizedBlock,
EVMBlockHeader: header,
}
}
Expand Down
4 changes: 2 additions & 2 deletions sync/evmdownloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,8 @@ func TestGetLogs(t *testing.T) {
},
}
ctx := context.TODO()
// First call times out (after 40 seconds, which is longer than the 30-second timeout)
mockEthClient.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("network error %w", context.DeadlineExceeded)).After(time.Second * 40).Once()
// First call times out
mockEthClient.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("network error %w", context.DeadlineExceeded)).After(10 * time.Millisecond).Once()
// Second call succeeds after retry
mockEthClient.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return(nil, nil).Once()
logs := sut.GetLogs(ctx, 0, 1)
Expand Down
20 changes: 12 additions & 8 deletions types/block_finality.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,22 @@ func (BlockNumberFinality) JSONSchema() *jsonschema.Schema {
}
}

// IsEmpty returns true if v is empty
// IsEmpty returns true if b is empty
func (b BlockNumberFinality) IsEmpty() bool {
return b.Block == Empty
}

// IsLessThan returns true if v is less than other
// IsFinalized returns true if b is finalized
func (b BlockNumberFinality) IsFinalized() bool {
return b.Block == Finalized
}

// IsSafe returns true if b is safe
func (b BlockNumberFinality) IsSafe() bool {
return b.Block == Safe
}

// IsLatest returns true if b is latest with non-negative offset
func (b BlockNumberFinality) IsLatest() bool {
return b.Block == Latest && b.Offset >= 0
}
Expand All @@ -126,11 +129,12 @@ func (b *BlockNumberFinality) BlockNumber(ctx context.Context, requester ethereu
return b.Block.ApplyOffset(blockHeader.Number.Uint64(), b.Offset), nil
}

// IsGreaterThan returns true if v is greater than other
// earliest ≀ finalized ≀ safe ≀ latest ≀ pending
func (b *BlockNumberFinality) GreaterThan(other *BlockNumberFinality) bool {
if b == nil || other == nil {
return false
// LessFinalThan returns true if b is less strict commitment level than other.
// In case commitment level keywords are the same, it compares the offsets.
// finalized ≀ safe ≀ latest ≀ pending
func (b *BlockNumberFinality) LessFinalThan(other BlockNumberFinality) bool {
if b == nil {
return true
}
if blockOrder[b.Block] > blockOrder[other.Block] {
return true
Expand All @@ -144,7 +148,7 @@ func (b *BlockNumberFinality) GreaterThan(other *BlockNumberFinality) bool {
type BlockNumber int64

var (
blockOrder = map[BlockNumber]int{Finalized: 1, Safe: 2, Latest: 3, Pending: 4, Empty: 0} //nolint:mnd
blockOrder = map[BlockNumber]int{Finalized: 1, Safe: 2, Latest: 3, Pending: 4, Empty: 5} //nolint:mnd
)

const (
Expand Down
Loading
Loading