From 86c1b57a2411be37ae1d8a68c0dd44267b469cbd Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 25 Apr 2024 20:12:12 +0100 Subject: [PATCH 1/7] added limit for saving data to node memory --- erigon-lib/diagnostics/client.go | 3 +- erigon-lib/diagnostics/entities.go | 4 +- erigon-lib/diagnostics/network.go | 223 ++++++++++++++++++------- erigon-lib/diagnostics/network_test.go | 163 ++++++++++++++++++ 4 files changed, 333 insertions(+), 60 deletions(-) create mode 100644 erigon-lib/diagnostics/network_test.go diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index a16c3389ac9..ef1769f7578 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -16,7 +16,7 @@ type DiagnosticClient struct { mu sync.Mutex headerMutex sync.Mutex hardwareInfo HardwareInfo - peersSyncMap sync.Map + peersStats PeerStats headers Headers bodies BodiesInfo bodiesMutex sync.Mutex @@ -37,6 +37,7 @@ func NewDiagnosticClient(metricsMux *http.ServeMux, dataDirPath string) *Diagnos resourcesUsage: ResourcesUsage{ MemoryUsage: []MemoryStats{}, }, + peersStats: *NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag } } diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index adb304e20be..e7e75c91b12 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -16,7 +16,9 @@ package diagnostics -import "time" +import ( + "time" +) type PeerStatisticsGetter interface { GetPeersStatistics() map[string]*PeerStatistics diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 4621d5bfa34..23ccd35f9e0 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -2,60 +2,99 @@ package diagnostics import ( "context" + "sort" + "sync" + "time" "github.com/ledgerwatch/log/v3" ) -func (d *DiagnosticClient) setupNetworkDiagnostics(rootCtx context.Context) { - d.runCollectPeersStatistics(rootCtx) +type PeerStats struct { + peersInfo sync.Map + recordsCount int + lastUpdateMap map[string]time.Time + limit int } -func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { - go func() { - ctx, ch, closeChannel := Context[PeerStatisticMsgUpdate](rootCtx, 1) - defer closeChannel() +func NewPeerStats(peerLimit int) *PeerStats { + return &PeerStats{ + peersInfo: sync.Map{}, + recordsCount: 0, + lastUpdateMap: make(map[string]time.Time), + limit: peerLimit, + } +} - StartProviders(ctx, TypeOf(PeerStatisticMsgUpdate{}), log.Root()) - for { - select { - case <-rootCtx.Done(): - return - case info := <-ch: - if value, ok := d.peersSyncMap.Load(info.PeerID); ok { - if stats, ok := value.(PeerStatistics); ok { - if info.Inbound { - stats.BytesIn += uint64(info.Bytes) - stats.CapBytesIn[info.MsgCap] += uint64(info.Bytes) - stats.TypeBytesIn[info.MsgType] += uint64(info.Bytes) - } else { - stats.BytesOut += uint64(info.Bytes) - stats.CapBytesOut[info.MsgCap] += uint64(info.Bytes) - stats.TypeBytesOut[info.MsgType] += uint64(info.Bytes) - } - - d.peersSyncMap.Store(info.PeerID, stats) - } else { - log.Debug("Failed to cast value to PeerStatistics struct", value) - } - } else { - d.peersSyncMap.Store(info.PeerID, PeerStatistics{ - PeerType: info.PeerType, - CapBytesIn: make(map[string]uint64), - CapBytesOut: make(map[string]uint64), - TypeBytesIn: make(map[string]uint64), - TypeBytesOut: make(map[string]uint64), - }) - } - } +func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + if value, ok := p.peersInfo.Load(peerID); ok { + p.UpdatePeer(peerID, peerInfo, value) + } else { + p.AddPeer(peerID, peerInfo) + if p.GetPeersCount() > p.limit { + p.RemovePeersWhichExceedLimit(p.limit) } - }() + } } -func (d *DiagnosticClient) Peers() map[string]*PeerStatistics { - stats := make(map[string]*PeerStatistics) +func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + pv := PeerStatisticsFromMsgUpdate(peerInfo, nil) + p.peersInfo.Store(peerID, pv) + p.recordsCount++ + p.lastUpdateMap[peerID] = time.Now() +} + +func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { + pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue) - d.peersSyncMap.Range(func(key, value interface{}) bool { + p.peersInfo.Store(peerID, pv) + p.lastUpdateMap[peerID] = time.Now() +} + +func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) PeerStatistics { + ps := PeerStatistics{ + PeerType: msg.PeerType, + BytesIn: 0, + BytesOut: 0, + CapBytesIn: make(map[string]uint64), + CapBytesOut: make(map[string]uint64), + TypeBytesIn: make(map[string]uint64), + TypeBytesOut: make(map[string]uint64), + } + + if stats, ok := prevValue.(PeerStatistics); ok { + if msg.Inbound { + ps.BytesIn = stats.BytesIn + uint64(msg.Bytes) + ps.CapBytesIn[msg.MsgCap] = stats.CapBytesIn[msg.MsgCap] + uint64(msg.Bytes) + ps.TypeBytesIn[msg.MsgType] = stats.TypeBytesIn[msg.MsgType] + uint64(msg.Bytes) + } else { + ps.BytesOut = stats.BytesOut + uint64(msg.Bytes) + ps.CapBytesOut[msg.MsgCap] = stats.CapBytesOut[msg.MsgCap] + uint64(msg.Bytes) + ps.TypeBytesOut[msg.MsgType] = stats.TypeBytesOut[msg.MsgType] + uint64(msg.Bytes) + } + } else { + if msg.Inbound { + ps.BytesIn += uint64(msg.Bytes) + ps.CapBytesIn[msg.MsgCap] += uint64(msg.Bytes) + ps.TypeBytesIn[msg.MsgType] += uint64(msg.Bytes) + } else { + ps.BytesOut += uint64(msg.Bytes) + ps.CapBytesOut[msg.MsgCap] += uint64(msg.Bytes) + ps.TypeBytesOut[msg.MsgType] += uint64(msg.Bytes) + } + + } + + return ps +} +func (p *PeerStats) GetPeersCount() int { + return p.recordsCount +} + +func (p *PeerStats) GetPeers() map[string]*PeerStatistics { + stats := make(map[string]*PeerStatistics) + + p.peersInfo.Range(func(key, value interface{}) bool { if loadedKey, ok := key.(string); ok { if loadedValue, ok := value.(PeerStatistics); ok { stats[loadedKey] = &loadedValue @@ -69,26 +108,94 @@ func (d *DiagnosticClient) Peers() map[string]*PeerStatistics { return true }) - d.PeerDataResetStatistics() - return stats } -func (d *DiagnosticClient) PeerDataResetStatistics() { - d.peersSyncMap.Range(func(key, value interface{}) bool { - if stats, ok := value.(PeerStatistics); ok { - stats.BytesIn = 0 - stats.BytesOut = 0 - stats.CapBytesIn = make(map[string]uint64) - stats.CapBytesOut = make(map[string]uint64) - stats.TypeBytesIn = make(map[string]uint64) - stats.TypeBytesOut = make(map[string]uint64) - - d.peersSyncMap.Store(key, stats) - } else { - log.Debug("Failed to cast value to PeerStatistics struct", value) +func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { + if value, ok := p.peersInfo.Load(peerID); ok { + if peerStats, ok := value.(PeerStatistics); ok { + return peerStats } + } - return true + return PeerStatistics{} +} + +func (p *PeerStats) GetLastUpdate(peerID string) time.Time { + if lastUpdate, ok := p.lastUpdateMap[peerID]; ok { + return lastUpdate + } + + return time.Time{} +} + +func (p *PeerStats) Reset() { + p.peersInfo = sync.Map{} + p.recordsCount = 0 + p.lastUpdateMap = make(map[string]time.Time) +} + +func (p *PeerStats) RemovePeer(peerID string) { + p.peersInfo.Delete(peerID) + p.recordsCount-- + delete(p.lastUpdateMap, peerID) +} + +type PeerUpdTime struct { + PeerID string + Time time.Time +} + +func (p *PeerStats) GetOldestPeersWithAmountOfPeers(size int) []PeerUpdTime { + var timeArray []PeerUpdTime + for k, v := range p.lastUpdateMap { + timeArray = append(timeArray, PeerUpdTime{k, v}) + } + + sort.Slice(timeArray, func(i, j int) bool { + return timeArray[i].Time.Before(timeArray[j].Time) }) + + if len(timeArray) < size { + return timeArray + } else { + return timeArray[:size] + } +} + +func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { + peersToRemove := p.GetPeersCount() - limit + if peersToRemove > 0 { + peers := p.GetOldestPeersWithAmountOfPeers(peersToRemove) + for _, peer := range peers { + p.RemovePeer(peer.PeerID) + } + } +} + +func (d *DiagnosticClient) setupNetworkDiagnostics(rootCtx context.Context) { + d.runCollectPeersStatistics(rootCtx) +} + +func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { + go func() { + ctx, ch, closeChannel := Context[PeerStatisticMsgUpdate](rootCtx, 1) + defer closeChannel() + + StartProviders(ctx, TypeOf(PeerStatisticMsgUpdate{}), log.Root()) + for { + select { + case <-rootCtx.Done(): + return + case info := <-ch: + d.peersStats.AddOrUpdatePeer(info.PeerID, info) + } + } + }() +} + +func (d *DiagnosticClient) Peers() map[string]*PeerStatistics { + peers := d.peersStats.GetPeers() + d.peersStats.Reset() + return peers } diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go new file mode 100644 index 00000000000..956370daff1 --- /dev/null +++ b/erigon-lib/diagnostics/network_test.go @@ -0,0 +1,163 @@ +package diagnostics_test + +import ( + "strconv" + "testing" + + "github.com/ledgerwatch/erigon-lib/diagnostics" + "github.com/stretchr/testify/require" +) + +var testPeerStats = diagnostics.PeerStatistics{ + PeerType: "Sentinel", + BytesIn: 10, + CapBytesIn: map[string]uint64{"msgCap1": 10}, + TypeBytesIn: map[string]uint64{"msgType1": 10}, + BytesOut: 0, + CapBytesOut: map[string]uint64{}, + TypeBytesOut: map[string]uint64{}, +} + +var testUpdMsg = diagnostics.PeerStatisticMsgUpdate{ + PeerType: "Sentinel", + PeerID: "test1", + Inbound: true, + MsgType: "msgType1", + MsgCap: "msgCap1", + Bytes: 10, +} + +func TestPeerStatisticsFromMsgUpdate(t *testing.T) { + ps := diagnostics.PeerStatisticsFromMsgUpdate(testUpdMsg, nil) + require.Equal(t, testPeerStats, ps) + + ps1 := diagnostics.PeerStatisticsFromMsgUpdate(testUpdMsg, ps) + + require.Equal(t, diagnostics.PeerStatistics{ + PeerType: "Sentinel", + BytesIn: 20, + CapBytesIn: map[string]uint64{"msgCap1": 20}, + TypeBytesIn: map[string]uint64{"msgType1": 20}, + BytesOut: 0, + CapBytesOut: map[string]uint64{}, + TypeBytesOut: map[string]uint64{}, + }, ps1) +} + +func TestAddPeer(t *testing.T) { + var peerStats = diagnostics.NewPeerStats(100) + + peerStats.AddPeer("test1", testUpdMsg) + require.Equal(t, 1, peerStats.GetPeersCount()) + + require.Equal(t, testPeerStats, peerStats.GetPeerStatistics("test1")) + + peerStats.Reset() + require.Equal(t, 0, peerStats.GetPeersCount()) +} + +func TestUpdatePeer(t *testing.T) { + peerStats := diagnostics.NewPeerStats(1000) + + peerStats.AddPeer("test1", testUpdMsg) + peerStats.UpdatePeer("test1", testUpdMsg, testPeerStats) + require.Equal(t, 1, peerStats.GetPeersCount()) + + require.Equal(t, diagnostics.PeerStatistics{ + PeerType: "Sentinel", + BytesIn: 20, + CapBytesIn: map[string]uint64{"msgCap1": 20}, + TypeBytesIn: map[string]uint64{"msgType1": 20}, + BytesOut: 0, + CapBytesOut: map[string]uint64{}, + TypeBytesOut: map[string]uint64{}, + }, peerStats.GetPeerStatistics("test1")) +} + +func TestAddOrUpdatePeer(t *testing.T) { + peerStats := diagnostics.NewPeerStats(100) + + peerStats.AddOrUpdatePeer("test1", testUpdMsg) + require.Equal(t, 1, peerStats.GetPeersCount()) + + require.Equal(t, testPeerStats, peerStats.GetPeerStatistics("test1")) + + peerStats.AddOrUpdatePeer("test1", testUpdMsg) + require.Equal(t, 1, peerStats.GetPeersCount()) + + require.Equal(t, diagnostics.PeerStatistics{ + PeerType: "Sentinel", + BytesIn: 20, + CapBytesIn: map[string]uint64{"msgCap1": 20}, + TypeBytesIn: map[string]uint64{"msgType1": 20}, + BytesOut: 0, + CapBytesOut: map[string]uint64{}, + TypeBytesOut: map[string]uint64{}, + }, peerStats.GetPeerStatistics("test1")) + + peerStats.AddOrUpdatePeer("test2", testUpdMsg) + require.Equal(t, 2, peerStats.GetPeersCount()) +} + +func TestLastUpdated(t *testing.T) { + peerStats := diagnostics.NewPeerStats(1000) + + peerStats.AddOrUpdatePeer("test1", testUpdMsg) + require.NotEmpty(t, peerStats.GetLastUpdate("test1")) + + for i := 1; i < 100; i++ { + pid := "test" + strconv.Itoa(i) + peerStats.AddOrUpdatePeer(pid, testUpdMsg) + } + + require.True(t, peerStats.GetLastUpdate("test2").After(peerStats.GetLastUpdate("test1"))) + + oldestPeers := peerStats.GetOldestPeersWithAmountOfPeers(10) + + // we have 100 peers, but we should get only 10 oldest + require.Equal(t, len(oldestPeers), 10) + // the oldest peer should be test1 + require.Equal(t, "test1", oldestPeers[0].PeerID) + + // update test1 to + peerStats.AddOrUpdatePeer("test1", testUpdMsg) + oldestPeers = peerStats.GetOldestPeersWithAmountOfPeers(10) + + // the oldest peer should not be test1 + require.NotEqual(t, "test1", oldestPeers[0].PeerID) +} + +func TestRemovePeersWhichExceedLimit(t *testing.T) { + limit := 100 + peerStats := diagnostics.NewPeerStats(limit) + + for i := 1; i < 105; i++ { + pid := "test" + strconv.Itoa(i) + peerStats.AddOrUpdatePeer(pid, testUpdMsg) + } + + peerStats.RemovePeersWhichExceedLimit(limit) + + require.Equal(t, limit, peerStats.GetPeersCount()) + + limit = 1000 + peerStats.RemovePeersWhichExceedLimit(limit) + + require.Equal(t, 100, peerStats.GetPeersCount()) +} + +func TestAddingPeersAboveTheLimit(t *testing.T) { + limit := 100 + peerStats := diagnostics.NewPeerStats(limit) + + for i := 1; i < 105; i++ { + pid := "test" + strconv.Itoa(i) + peerStats.AddOrUpdatePeer(pid, testUpdMsg) + } + + require.Equal(t, limit, peerStats.GetPeersCount()) + + peerStats.AddOrUpdatePeer("test105", testUpdMsg) + + require.Equal(t, limit, peerStats.GetPeersCount()) +} From 37a21c956616aa148cde2625c0ae0319daaa7e52 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 25 Apr 2024 20:16:49 +0100 Subject: [PATCH 2/7] renamed file --- erigon-lib/diagnostics/network.go | 4 ++-- erigon-lib/diagnostics/network_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 23ccd35f9e0..5152c4bb5e6 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -146,7 +146,7 @@ type PeerUpdTime struct { Time time.Time } -func (p *PeerStats) GetOldestPeersWithAmountOfPeers(size int) []PeerUpdTime { +func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { var timeArray []PeerUpdTime for k, v := range p.lastUpdateMap { timeArray = append(timeArray, PeerUpdTime{k, v}) @@ -166,7 +166,7 @@ func (p *PeerStats) GetOldestPeersWithAmountOfPeers(size int) []PeerUpdTime { func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { peersToRemove := p.GetPeersCount() - limit if peersToRemove > 0 { - peers := p.GetOldestPeersWithAmountOfPeers(peersToRemove) + peers := p.GetOldestUpdatedPeersWithSize(peersToRemove) for _, peer := range peers { p.RemovePeer(peer.PeerID) } diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index 956370daff1..d5fdf3a0f13 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -112,7 +112,7 @@ func TestLastUpdated(t *testing.T) { require.True(t, peerStats.GetLastUpdate("test2").After(peerStats.GetLastUpdate("test1"))) - oldestPeers := peerStats.GetOldestPeersWithAmountOfPeers(10) + oldestPeers := peerStats.GetOldestUpdatedPeersWithSize(10) // we have 100 peers, but we should get only 10 oldest require.Equal(t, len(oldestPeers), 10) @@ -121,7 +121,7 @@ func TestLastUpdated(t *testing.T) { // update test1 to peerStats.AddOrUpdatePeer("test1", testUpdMsg) - oldestPeers = peerStats.GetOldestPeersWithAmountOfPeers(10) + oldestPeers = peerStats.GetOldestUpdatedPeersWithSize(10) // the oldest peer should not be test1 require.NotEqual(t, "test1", oldestPeers[0].PeerID) From 5ed17bff4fb3a8bcc5e840a87589b4525e7248ff Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 25 Apr 2024 20:57:49 +0100 Subject: [PATCH 3/7] fixed lint error --- erigon-lib/diagnostics/network.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 5152c4bb5e6..0fe4ff95007 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -147,7 +147,7 @@ type PeerUpdTime struct { } func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { - var timeArray []PeerUpdTime + timeArray := make([]PeerUpdTime, 0, p.GetPeersCount()) for k, v := range p.lastUpdateMap { timeArray = append(timeArray, PeerUpdTime{k, v}) } From 258fbb599a354f0fdd2545cce96996ec7ba0daf2 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 25 Apr 2024 23:27:14 +0100 Subject: [PATCH 4/7] check --- erigon-lib/diagnostics/network_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index d5fdf3a0f13..51e5ce82f5e 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -110,7 +110,7 @@ func TestLastUpdated(t *testing.T) { peerStats.AddOrUpdatePeer(pid, testUpdMsg) } - require.True(t, peerStats.GetLastUpdate("test2").After(peerStats.GetLastUpdate("test1"))) + require.True(t, peerStats.GetLastUpdate("test50").After(peerStats.GetLastUpdate("test1"))) oldestPeers := peerStats.GetOldestUpdatedPeersWithSize(10) From 695ec4f1351a7b616e53ee0aa0e23143163d4e4d Mon Sep 17 00:00:00 2001 From: dvovk Date: Fri, 26 Apr 2024 11:17:40 +0100 Subject: [PATCH 5/7] removed reset functionality as it has no sense --- erigon-lib/diagnostics/network.go | 10 +--------- erigon-lib/diagnostics/network_test.go | 3 --- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 0fe4ff95007..e928d2eabad 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -129,12 +129,6 @@ func (p *PeerStats) GetLastUpdate(peerID string) time.Time { return time.Time{} } -func (p *PeerStats) Reset() { - p.peersInfo = sync.Map{} - p.recordsCount = 0 - p.lastUpdateMap = make(map[string]time.Time) -} - func (p *PeerStats) RemovePeer(peerID string) { p.peersInfo.Delete(peerID) p.recordsCount-- @@ -195,7 +189,5 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { } func (d *DiagnosticClient) Peers() map[string]*PeerStatistics { - peers := d.peersStats.GetPeers() - d.peersStats.Reset() - return peers + return d.peersStats.GetPeers() } diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index 51e5ce82f5e..c3943dca494 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -51,9 +51,6 @@ func TestAddPeer(t *testing.T) { require.Equal(t, 1, peerStats.GetPeersCount()) require.Equal(t, testPeerStats, peerStats.GetPeerStatistics("test1")) - - peerStats.Reset() - require.Equal(t, 0, peerStats.GetPeersCount()) } func TestUpdatePeer(t *testing.T) { From 06d53cf51703ad8b2b69b1e14304609bb409df36 Mon Sep 17 00:00:00 2001 From: dvovk Date: Fri, 26 Apr 2024 11:20:16 +0100 Subject: [PATCH 6/7] trying to fix windows test --- erigon-lib/diagnostics/network_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index c3943dca494..735800de574 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -3,6 +3,7 @@ package diagnostics_test import ( "strconv" "testing" + "time" "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/stretchr/testify/require" @@ -102,12 +103,14 @@ func TestLastUpdated(t *testing.T) { peerStats.AddOrUpdatePeer("test1", testUpdMsg) require.NotEmpty(t, peerStats.GetLastUpdate("test1")) - for i := 1; i < 100; i++ { + for i := 1; i < 20; i++ { pid := "test" + strconv.Itoa(i) peerStats.AddOrUpdatePeer(pid, testUpdMsg) + //wait for 1 milisecond to make sure that the last update time is different + time.Sleep(10 * time.Millisecond) } - require.True(t, peerStats.GetLastUpdate("test50").After(peerStats.GetLastUpdate("test1"))) + require.True(t, peerStats.GetLastUpdate("test2").After(peerStats.GetLastUpdate("test1"))) oldestPeers := peerStats.GetOldestUpdatedPeersWithSize(10) From 2a41bb44399d3def4fcb64bf73e2a53c2ed9c392 Mon Sep 17 00:00:00 2001 From: dvovk Date: Sat, 27 Apr 2024 12:40:25 +0100 Subject: [PATCH 7/7] added a few tests, did required changes --- erigon-lib/diagnostics/client.go | 4 +- erigon-lib/diagnostics/network.go | 4 +- erigon-lib/diagnostics/network_test.go | 89 ++++++++++++++++++++------ 3 files changed, 73 insertions(+), 24 deletions(-) diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index ef1769f7578..284e46c1498 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -16,7 +16,7 @@ type DiagnosticClient struct { mu sync.Mutex headerMutex sync.Mutex hardwareInfo HardwareInfo - peersStats PeerStats + peersStats *PeerStats headers Headers bodies BodiesInfo bodiesMutex sync.Mutex @@ -37,7 +37,7 @@ func NewDiagnosticClient(metricsMux *http.ServeMux, dataDirPath string) *Diagnos resourcesUsage: ResourcesUsage{ MemoryUsage: []MemoryStats{}, }, - peersStats: *NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag + peersStats: NewPeerStats(1000), // 1000 is the limit of peers; TODO: make it configurable through a flag } } diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index e928d2eabad..2306aa997bf 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -10,7 +10,7 @@ import ( ) type PeerStats struct { - peersInfo sync.Map + peersInfo *sync.Map recordsCount int lastUpdateMap map[string]time.Time limit int @@ -18,7 +18,7 @@ type PeerStats struct { func NewPeerStats(peerLimit int) *PeerStats { return &PeerStats{ - peersInfo: sync.Map{}, + peersInfo: &sync.Map{}, recordsCount: 0, lastUpdateMap: make(map[string]time.Time), limit: peerLimit, diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index 735800de574..122c2e117e4 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -var testPeerStats = diagnostics.PeerStatistics{ +var mockInboundPeerStats = diagnostics.PeerStatistics{ PeerType: "Sentinel", BytesIn: 10, CapBytesIn: map[string]uint64{"msgCap1": 10}, @@ -19,7 +19,17 @@ var testPeerStats = diagnostics.PeerStatistics{ TypeBytesOut: map[string]uint64{}, } -var testUpdMsg = diagnostics.PeerStatisticMsgUpdate{ +var mockOutboundPeerStats = diagnostics.PeerStatistics{ + PeerType: "Sentinel", + BytesIn: 0, + CapBytesIn: map[string]uint64{}, + TypeBytesIn: map[string]uint64{}, + BytesOut: 10, + CapBytesOut: map[string]uint64{"msgCap1": 10}, + TypeBytesOut: map[string]uint64{"msgType1": 10}, +} + +var mockInboundUpdMsg = diagnostics.PeerStatisticMsgUpdate{ PeerType: "Sentinel", PeerID: "test1", Inbound: true, @@ -28,11 +38,21 @@ var testUpdMsg = diagnostics.PeerStatisticMsgUpdate{ Bytes: 10, } +var mockOutboundUpdMsg = diagnostics.PeerStatisticMsgUpdate{ + PeerType: "Sentinel", + PeerID: "test1", + Inbound: false, + MsgType: "msgType1", + MsgCap: "msgCap1", + Bytes: 10, +} + func TestPeerStatisticsFromMsgUpdate(t *testing.T) { - ps := diagnostics.PeerStatisticsFromMsgUpdate(testUpdMsg, nil) - require.Equal(t, testPeerStats, ps) + //test handing inbound message + inboundPeerStats := diagnostics.PeerStatisticsFromMsgUpdate(mockInboundUpdMsg, nil) + require.Equal(t, mockInboundPeerStats, inboundPeerStats) - ps1 := diagnostics.PeerStatisticsFromMsgUpdate(testUpdMsg, ps) + inboundPeerStats = diagnostics.PeerStatisticsFromMsgUpdate(mockInboundUpdMsg, inboundPeerStats) require.Equal(t, diagnostics.PeerStatistics{ PeerType: "Sentinel", @@ -42,23 +62,40 @@ func TestPeerStatisticsFromMsgUpdate(t *testing.T) { BytesOut: 0, CapBytesOut: map[string]uint64{}, TypeBytesOut: map[string]uint64{}, - }, ps1) + }, inboundPeerStats) + + //test handing outbound message + outboundPeerStats := diagnostics.PeerStatisticsFromMsgUpdate(mockOutboundUpdMsg, nil) + require.Equal(t, mockOutboundPeerStats, outboundPeerStats) + + outboundPeerStats = diagnostics.PeerStatisticsFromMsgUpdate(mockOutboundUpdMsg, outboundPeerStats) + + require.Equal(t, diagnostics.PeerStatistics{ + PeerType: "Sentinel", + BytesIn: 0, + CapBytesIn: map[string]uint64{}, + TypeBytesIn: map[string]uint64{}, + BytesOut: 20, + CapBytesOut: map[string]uint64{"msgCap1": 20}, + TypeBytesOut: map[string]uint64{"msgType1": 20}, + }, outboundPeerStats) + } func TestAddPeer(t *testing.T) { var peerStats = diagnostics.NewPeerStats(100) - peerStats.AddPeer("test1", testUpdMsg) + peerStats.AddPeer("test1", mockInboundUpdMsg) require.Equal(t, 1, peerStats.GetPeersCount()) - require.Equal(t, testPeerStats, peerStats.GetPeerStatistics("test1")) + require.Equal(t, mockInboundPeerStats, peerStats.GetPeerStatistics("test1")) } func TestUpdatePeer(t *testing.T) { peerStats := diagnostics.NewPeerStats(1000) - peerStats.AddPeer("test1", testUpdMsg) - peerStats.UpdatePeer("test1", testUpdMsg, testPeerStats) + peerStats.AddPeer("test1", mockInboundUpdMsg) + peerStats.UpdatePeer("test1", mockInboundUpdMsg, mockInboundPeerStats) require.Equal(t, 1, peerStats.GetPeersCount()) require.Equal(t, diagnostics.PeerStatistics{ @@ -75,12 +112,12 @@ func TestUpdatePeer(t *testing.T) { func TestAddOrUpdatePeer(t *testing.T) { peerStats := diagnostics.NewPeerStats(100) - peerStats.AddOrUpdatePeer("test1", testUpdMsg) + peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg) require.Equal(t, 1, peerStats.GetPeersCount()) - require.Equal(t, testPeerStats, peerStats.GetPeerStatistics("test1")) + require.Equal(t, mockInboundPeerStats, peerStats.GetPeerStatistics("test1")) - peerStats.AddOrUpdatePeer("test1", testUpdMsg) + peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg) require.Equal(t, 1, peerStats.GetPeersCount()) require.Equal(t, diagnostics.PeerStatistics{ @@ -93,19 +130,31 @@ func TestAddOrUpdatePeer(t *testing.T) { TypeBytesOut: map[string]uint64{}, }, peerStats.GetPeerStatistics("test1")) - peerStats.AddOrUpdatePeer("test2", testUpdMsg) + peerStats.AddOrUpdatePeer("test2", mockInboundUpdMsg) require.Equal(t, 2, peerStats.GetPeersCount()) } +func TestGetPeers(t *testing.T) { + peerStats := diagnostics.NewPeerStats(10) + + peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg) + peerStats.AddOrUpdatePeer("test2", mockInboundUpdMsg) + peerStats.AddOrUpdatePeer("test3", mockInboundUpdMsg) + + peers := peerStats.GetPeers() + require.Equal(t, 3, len(peers)) + require.Equal(t, &mockInboundPeerStats, peers["test1"]) +} + func TestLastUpdated(t *testing.T) { peerStats := diagnostics.NewPeerStats(1000) - peerStats.AddOrUpdatePeer("test1", testUpdMsg) + peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg) require.NotEmpty(t, peerStats.GetLastUpdate("test1")) for i := 1; i < 20; i++ { pid := "test" + strconv.Itoa(i) - peerStats.AddOrUpdatePeer(pid, testUpdMsg) + peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) //wait for 1 milisecond to make sure that the last update time is different time.Sleep(10 * time.Millisecond) } @@ -120,7 +169,7 @@ func TestLastUpdated(t *testing.T) { require.Equal(t, "test1", oldestPeers[0].PeerID) // update test1 to - peerStats.AddOrUpdatePeer("test1", testUpdMsg) + peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg) oldestPeers = peerStats.GetOldestUpdatedPeersWithSize(10) // the oldest peer should not be test1 @@ -133,7 +182,7 @@ func TestRemovePeersWhichExceedLimit(t *testing.T) { for i := 1; i < 105; i++ { pid := "test" + strconv.Itoa(i) - peerStats.AddOrUpdatePeer(pid, testUpdMsg) + peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) } peerStats.RemovePeersWhichExceedLimit(limit) @@ -152,12 +201,12 @@ func TestAddingPeersAboveTheLimit(t *testing.T) { for i := 1; i < 105; i++ { pid := "test" + strconv.Itoa(i) - peerStats.AddOrUpdatePeer(pid, testUpdMsg) + peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) } require.Equal(t, limit, peerStats.GetPeersCount()) - peerStats.AddOrUpdatePeer("test105", testUpdMsg) + peerStats.AddOrUpdatePeer("test105", mockInboundUpdMsg) require.Equal(t, limit, peerStats.GetPeersCount()) }