@@ -20,127 +20,52 @@ package p2p
2020
2121import (
2222 "net"
23- "sync"
24- "sync/atomic"
25- "time"
2623
27- "github.com/ethereum/go-ethereum/event"
28- "github.com/ethereum/go-ethereum/log"
2924 "github.com/ethereum/go-ethereum/metrics"
3025)
3126
3227const (
33- MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
34- MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
35- MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter
36- MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter
37-
38- MeteredPeerLimit = 1024 // This amount of peers are individually metered
28+ ingressMeterName = "p2p/ingress"
29+ egressMeterName = "p2p/egress"
3930)
4031
4132var (
42- ingressConnectMeter = metrics .NewRegisteredMeter (MetricsInboundConnects , nil ) // Meter counting the ingress connections
43- ingressTrafficMeter = metrics .NewRegisteredMeter (MetricsInboundTraffic , nil ) // Meter metering the cumulative ingress traffic
44- egressConnectMeter = metrics .NewRegisteredMeter (MetricsOutboundConnects , nil ) // Meter counting the egress connections
45- egressTrafficMeter = metrics .NewRegisteredMeter (MetricsOutboundTraffic , nil ) // Meter metering the cumulative egress traffic
46- activePeerGauge = metrics .NewRegisteredGauge ("p2p/peers" , nil ) // Gauge tracking the current peer count
47-
48- PeerIngressRegistry = metrics .NewPrefixedChildRegistry (metrics .EphemeralRegistry , MetricsInboundTraffic + "/" ) // Registry containing the peer ingress
49- PeerEgressRegistry = metrics .NewPrefixedChildRegistry (metrics .EphemeralRegistry , MetricsOutboundTraffic + "/" ) // Registry containing the peer egress
50-
51- meteredPeerFeed event.Feed // Event feed for peer metrics
52- meteredPeerCount int32 // Actually stored peer connection count
53- )
54-
55- // MeteredPeerEventType is the type of peer events emitted by a metered connection.
56- type MeteredPeerEventType int
57-
58- const (
59- // PeerHandshakeSucceeded is the type of event
60- // emitted when a peer successfully makes the handshake.
61- PeerHandshakeSucceeded MeteredPeerEventType = iota
62-
63- // PeerHandshakeFailed is the type of event emitted when a peer fails to
64- // make the handshake or disconnects before it.
65- PeerHandshakeFailed
66-
67- // PeerDisconnected is the type of event emitted when a peer disconnects.
68- PeerDisconnected
33+ ingressConnectMeter = metrics .NewRegisteredMeter ("p2p/serves" , nil )
34+ ingressTrafficMeter = metrics .NewRegisteredMeter (ingressMeterName , nil )
35+ egressConnectMeter = metrics .NewRegisteredMeter ("p2p/dials" , nil )
36+ egressTrafficMeter = metrics .NewRegisteredMeter (egressMeterName , nil )
37+ activePeerGauge = metrics .NewRegisteredGauge ("p2p/peers" , nil )
6938)
7039
71- // MeteredPeerEvent is an event emitted when peers connect or disconnect.
72- type MeteredPeerEvent struct {
73- Type MeteredPeerEventType // Type of peer event
74- Addr string // TCP address of the peer
75- Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection
76- Peer * Peer // Connected remote node instance
77- Ingress uint64 // Ingress count at the moment of the event
78- Egress uint64 // Egress count at the moment of the event
79- }
80-
81- // SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
82- // if metrics collection is enabled.
83- func SubscribeMeteredPeerEvent (ch chan <- MeteredPeerEvent ) event.Subscription {
84- return meteredPeerFeed .Subscribe (ch )
85- }
86-
8740// meteredConn is a wrapper around a net.Conn that meters both the
8841// inbound and outbound network traffic.
8942type meteredConn struct {
90- net.Conn // Network connection to wrap with metering
91-
92- connected time.Time // Connection time of the peer
93- addr * net.TCPAddr // TCP address of the peer
94- peer * Peer // Peer instance
95-
96- // trafficMetered denotes if the peer is registered in the traffic registries.
97- // Its value is true if the metered peer count doesn't reach the limit in the
98- // moment of the peer's connection.
99- trafficMetered bool
100- ingressMeter metrics.Meter // Meter for the read bytes of the peer
101- egressMeter metrics.Meter // Meter for the written bytes of the peer
102-
103- lock sync.RWMutex // Lock protecting the metered connection's internals
43+ net.Conn
10444}
10545
10646// newMeteredConn creates a new metered connection, bumps the ingress or egress
10747// connection meter and also increases the metered peer count. If the metrics
108- // system is disabled or the IP address is unspecified, this function returns
109- // the original object.
48+ // system is disabled, function returns the original connection.
11049func newMeteredConn (conn net.Conn , ingress bool , addr * net.TCPAddr ) net.Conn {
11150 // Short circuit if metrics are disabled
11251 if ! metrics .Enabled {
11352 return conn
11453 }
115- if addr == nil || addr .IP .IsUnspecified () {
116- log .Warn ("Peer address is unspecified" )
117- return conn
118- }
11954 // Bump the connection counters and wrap the connection
12055 if ingress {
12156 ingressConnectMeter .Mark (1 )
12257 } else {
12358 egressConnectMeter .Mark (1 )
12459 }
12560 activePeerGauge .Inc (1 )
126-
127- return & meteredConn {
128- Conn : conn ,
129- addr : addr ,
130- connected : time .Now (),
131- }
61+ return & meteredConn {Conn : conn }
13262}
13363
13464// Read delegates a network read to the underlying connection, bumping the common
13565// and the peer ingress traffic meters along the way.
13666func (c * meteredConn ) Read (b []byte ) (n int , err error ) {
13767 n , err = c .Conn .Read (b )
13868 ingressTrafficMeter .Mark (int64 (n ))
139- c .lock .RLock ()
140- if c .trafficMetered {
141- c .ingressMeter .Mark (int64 (n ))
142- }
143- c .lock .RUnlock ()
14469 return n , err
14570}
14671
@@ -149,84 +74,15 @@ func (c *meteredConn) Read(b []byte) (n int, err error) {
14974func (c * meteredConn ) Write (b []byte ) (n int , err error ) {
15075 n , err = c .Conn .Write (b )
15176 egressTrafficMeter .Mark (int64 (n ))
152- c .lock .RLock ()
153- if c .trafficMetered {
154- c .egressMeter .Mark (int64 (n ))
155- }
156- c .lock .RUnlock ()
15777 return n , err
15878}
15979
160- // handshakeDone is called after the connection passes the handshake.
161- func (c * meteredConn ) handshakeDone (peer * Peer ) {
162- if atomic .AddInt32 (& meteredPeerCount , 1 ) >= MeteredPeerLimit {
163- // Don't register the peer in the traffic registries.
164- atomic .AddInt32 (& meteredPeerCount , - 1 )
165- c .lock .Lock ()
166- c .peer , c .trafficMetered = peer , false
167- c .lock .Unlock ()
168- log .Warn ("Metered peer count reached the limit" )
169- } else {
170- enode := peer .Node ().String ()
171- c .lock .Lock ()
172- c .peer , c .trafficMetered = peer , true
173- c .ingressMeter = metrics .NewRegisteredMeter (enode , PeerIngressRegistry )
174- c .egressMeter = metrics .NewRegisteredMeter (enode , PeerEgressRegistry )
175- c .lock .Unlock ()
176- }
177- meteredPeerFeed .Send (MeteredPeerEvent {
178- Type : PeerHandshakeSucceeded ,
179- Addr : c .addr .String (),
180- Peer : peer ,
181- Elapsed : time .Since (c .connected ),
182- })
183- }
184-
18580// Close delegates a close operation to the underlying connection, unregisters
18681// the peer from the traffic registries and emits close event.
18782func (c * meteredConn ) Close () error {
18883 err := c .Conn .Close ()
189- c .lock .RLock ()
190- if c .peer == nil {
191- // If the peer disconnects before/during the handshake.
192- c .lock .RUnlock ()
193- meteredPeerFeed .Send (MeteredPeerEvent {
194- Type : PeerHandshakeFailed ,
195- Addr : c .addr .String (),
196- Elapsed : time .Since (c .connected ),
197- })
198- activePeerGauge .Dec (1 )
199- return err
200- }
201- peer := c .peer
202- if ! c .trafficMetered {
203- // If the peer isn't registered in the traffic registries.
204- c .lock .RUnlock ()
205- meteredPeerFeed .Send (MeteredPeerEvent {
206- Type : PeerDisconnected ,
207- Addr : c .addr .String (),
208- Peer : peer ,
209- })
84+ if err == nil {
21085 activePeerGauge .Dec (1 )
211- return err
21286 }
213- ingress , egress , enode := uint64 (c .ingressMeter .Count ()), uint64 (c .egressMeter .Count ()), c .peer .Node ().String ()
214- c .lock .RUnlock ()
215-
216- // Decrement the metered peer count
217- atomic .AddInt32 (& meteredPeerCount , - 1 )
218-
219- // Unregister the peer from the traffic registries
220- PeerIngressRegistry .Unregister (enode )
221- PeerEgressRegistry .Unregister (enode )
222-
223- meteredPeerFeed .Send (MeteredPeerEvent {
224- Type : PeerDisconnected ,
225- Addr : c .addr .String (),
226- Peer : peer ,
227- Ingress : ingress ,
228- Egress : egress ,
229- })
230- activePeerGauge .Dec (1 )
23187 return err
23288}
0 commit comments