Skip to content

Commit e3430ac

Browse files
authored
eth: check snap satelliteness, delegate drop to eth (#22235)
* eth: check snap satelliteness, delegate drop to eth * eth: better handle eth/snap satellite relation, merge reg/unreg paths
1 parent 3c728fb commit e3430ac

File tree

13 files changed

+201
-236
lines changed

13 files changed

+201
-236
lines changed

eth/handler.go

Lines changed: 58 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
218218
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
219219

220220
fetchTx := func(peer string, hashes []common.Hash) error {
221-
p := h.peers.ethPeer(peer)
221+
p := h.peers.peer(peer)
222222
if p == nil {
223223
return errors.New("unknown peer")
224224
}
@@ -229,8 +229,17 @@ func newHandler(config *handlerConfig) (*handler, error) {
229229
return h, nil
230230
}
231231

232-
// runEthPeer
232+
// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
233+
// various subsistems and starts handling messages.
233234
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
235+
// If the peer has a `snap` extension, wait for it to connect so we can have
236+
// a uniform initialization/teardown mechanism
237+
snap, err := h.peers.waitSnapExtension(peer)
238+
if err != nil {
239+
peer.Log().Error("Snapshot extension barrier failed", "err", err)
240+
return err
241+
}
242+
// TODO(karalabe): Not sure why this is needed
234243
if !h.chainSync.handlePeerEvent(peer) {
235244
return p2p.DiscQuitting
236245
}
@@ -251,37 +260,46 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
251260
return err
252261
}
253262
reject := false // reserved peer slots
254-
if atomic.LoadUint32(&h.snapSync) == 1 && !peer.SupportsCap("snap", 1) {
255-
// If we are running snap-sync, we want to reserve roughly half the peer
256-
// slots for peers supporting the snap protocol.
257-
// The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
258-
if all, snp := h.peers.Len(), h.peers.SnapLen(); all-snp > snp+5 {
259-
reject = true
263+
if atomic.LoadUint32(&h.snapSync) == 1 {
264+
if snap == nil {
265+
// If we are running snap-sync, we want to reserve roughly half the peer
266+
// slots for peers supporting the snap protocol.
267+
// The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
268+
if all, snp := h.peers.len(), h.peers.snapLen(); all-snp > snp+5 {
269+
reject = true
270+
}
260271
}
261272
}
262273
// Ignore maxPeers if this is a trusted peer
263274
if !peer.Peer.Info().Network.Trusted {
264-
if reject || h.peers.Len() >= h.maxPeers {
275+
if reject || h.peers.len() >= h.maxPeers {
265276
return p2p.DiscTooManyPeers
266277
}
267278
}
268279
peer.Log().Debug("Ethereum peer connected", "name", peer.Name())
269280

270281
// Register the peer locally
271-
if err := h.peers.registerEthPeer(peer); err != nil {
282+
if err := h.peers.registerPeer(peer, snap); err != nil {
272283
peer.Log().Error("Ethereum peer registration failed", "err", err)
273284
return err
274285
}
275286
defer h.removePeer(peer.ID())
276287

277-
p := h.peers.ethPeer(peer.ID())
288+
p := h.peers.peer(peer.ID())
278289
if p == nil {
279290
return errors.New("peer dropped during handling")
280291
}
281292
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
282293
if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil {
294+
peer.Log().Error("Failed to register peer in eth syncer", "err", err)
283295
return err
284296
}
297+
if snap != nil {
298+
if err := h.downloader.SnapSyncer.Register(snap); err != nil {
299+
peer.Log().Error("Failed to register peer in snap syncer", "err", err)
300+
return err
301+
}
302+
}
285303
h.chainSync.handlePeerEvent(peer)
286304

287305
// Propagate existing transactions. new transactions appearing
@@ -317,25 +335,23 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
317335
return handler(peer)
318336
}
319337

320-
// runSnapPeer
321-
func (h *handler) runSnapPeer(peer *snap.Peer, handler snap.Handler) error {
338+
// runSnapExtension registers a `snap` peer into the joint eth/snap peerset and
339+
// starts handling inbound messages. As `snap` is only a satellite protocol to
340+
// `eth`, all subsystem registrations and lifecycle management will be done by
341+
// the main `eth` handler to prevent strange races.
342+
func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error {
322343
h.peerWG.Add(1)
323344
defer h.peerWG.Done()
324345

325-
// Register the peer locally
326-
if err := h.peers.registerSnapPeer(peer); err != nil {
327-
peer.Log().Error("Snapshot peer registration failed", "err", err)
328-
return err
329-
}
330-
defer h.removePeer(peer.ID())
331-
332-
if err := h.downloader.SnapSyncer.Register(peer); err != nil {
346+
if err := h.peers.registerSnapExtension(peer); err != nil {
347+
peer.Log().Error("Snapshot extension registration failed", "err", err)
333348
return err
334349
}
335-
// Handle incoming messages until the connection is torn down
336350
return handler(peer)
337351
}
338352

353+
// removePeer unregisters a peer from the downloader and fetchers, removes it from
354+
// the set of tracked peers and closes the network connection to it.
339355
func (h *handler) removePeer(id string) {
340356
// Create a custom logger to avoid printing the entire id
341357
var logger log.Logger
@@ -345,33 +361,27 @@ func (h *handler) removePeer(id string) {
345361
} else {
346362
logger = log.New("peer", id[:8])
347363
}
348-
// Remove the eth peer if it exists
349-
eth := h.peers.ethPeer(id)
350-
if eth != nil {
351-
logger.Debug("Removing Ethereum peer")
352-
h.downloader.UnregisterPeer(id)
353-
h.txFetcher.Drop(id)
354-
355-
if err := h.peers.unregisterEthPeer(id); err != nil {
356-
logger.Error("Ethereum peer removal failed", "err", err)
357-
}
364+
// Abort if the peer does not exist
365+
peer := h.peers.peer(id)
366+
if peer == nil {
367+
logger.Error("Ethereum peer removal failed", "err", errPeerNotRegistered)
368+
return
358369
}
359-
// Remove the snap peer if it exists
360-
snap := h.peers.snapPeer(id)
361-
if snap != nil {
362-
logger.Debug("Removing Snapshot peer")
370+
// Remove the `eth` peer if it exists
371+
logger.Debug("Removing Ethereum peer", "snap", peer.snapExt != nil)
372+
373+
// Remove the `snap` extension if it exists
374+
if peer.snapExt != nil {
363375
h.downloader.SnapSyncer.Unregister(id)
364-
if err := h.peers.unregisterSnapPeer(id); err != nil {
365-
logger.Error("Snapshot peer removel failed", "err", err)
366-
}
367-
}
368-
// Hard disconnect at the networking layer
369-
if eth != nil {
370-
eth.Peer.Disconnect(p2p.DiscUselessPeer)
371376
}
372-
if snap != nil {
373-
snap.Peer.Disconnect(p2p.DiscUselessPeer)
377+
h.downloader.UnregisterPeer(id)
378+
h.txFetcher.Drop(id)
379+
380+
if err := h.peers.unregisterPeer(id); err != nil {
381+
logger.Error("Ethereum peer removal failed", "err", err)
374382
}
383+
// Hard disconnect at the networking layer
384+
peer.Peer.Disconnect(p2p.DiscUselessPeer)
375385
}
376386

377387
func (h *handler) Start(maxPeers int) {
@@ -417,7 +427,7 @@ func (h *handler) Stop() {
417427
// will only announce its availability (depending what's requested).
418428
func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
419429
hash := block.Hash()
420-
peers := h.peers.ethPeersWithoutBlock(hash)
430+
peers := h.peers.peersWithoutBlock(hash)
421431

422432
// If propagation is requested, send to a subset of the peer
423433
if propagate {
@@ -456,7 +466,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool)
456466
// Broadcast transactions to a batch of peers not knowing about it
457467
if propagate {
458468
for _, tx := range txs {
459-
peers := h.peers.ethPeersWithoutTransaction(tx.Hash())
469+
peers := h.peers.peersWithoutTransaction(tx.Hash())
460470

461471
// Send the block to a subset of our peers
462472
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
@@ -472,7 +482,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool)
472482
}
473483
// Otherwise only broadcast the announcement to peers
474484
for _, tx := range txs {
475-
peers := h.peers.ethPeersWithoutTransaction(tx.Hash())
485+
peers := h.peers.peersWithoutTransaction(tx.Hash())
476486
for _, peer := range peers {
477487
annos[peer] = append(annos[peer], tx.Hash())
478488
}

eth/handler_eth.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {
4747

4848
// PeerInfo retrieves all known `eth` information about a peer.
4949
func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
50-
if p := h.peers.ethPeer(id.String()); p != nil {
50+
if p := h.peers.peer(id.String()); p != nil {
5151
return p.info()
5252
}
5353
return nil
@@ -107,7 +107,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
107107
// handleHeaders is invoked from a peer's message handler when it transmits a batch
108108
// of headers for the local node to process.
109109
func (h *ethHandler) handleHeaders(peer *eth.Peer, headers []*types.Header) error {
110-
p := h.peers.ethPeer(peer.ID())
110+
p := h.peers.peer(peer.ID())
111111
if p == nil {
112112
return errors.New("unregistered during callback")
113113
}

eth/handler_eth_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,11 +574,11 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
574574

575575
// Verify that the remote peer is maintained or dropped
576576
if drop {
577-
if peers := handler.handler.peers.Len(); peers != 0 {
577+
if peers := handler.handler.peers.len(); peers != 0 {
578578
t.Fatalf("peer count mismatch: have %d, want %d", peers, 0)
579579
}
580580
} else {
581-
if peers := handler.handler.peers.Len(); peers != 1 {
581+
if peers := handler.handler.peers.len(); peers != 1 {
582582
t.Fatalf("peer count mismatch: have %d, want %d", peers, 1)
583583
}
584584
}

eth/handler_snap.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,15 @@ func (h *snapHandler) Chain() *core.BlockChain { return h.chain }
3030

3131
// RunPeer is invoked when a peer joins on the `snap` protocol.
3232
func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error {
33-
return (*handler)(h).runSnapPeer(peer, hand)
33+
return (*handler)(h).runSnapExtension(peer, hand)
3434
}
3535

3636
// PeerInfo retrieves all known `snap` information about a peer.
3737
func (h *snapHandler) PeerInfo(id enode.ID) interface{} {
38-
if p := h.peers.snapPeer(id.String()); p != nil {
39-
return p.info()
38+
if p := h.peers.peer(id.String()); p != nil {
39+
if p.snapExt != nil {
40+
return p.snapExt.info()
41+
}
4042
}
4143
return nil
4244
}

eth/peer.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ type ethPeerInfo struct {
3636
// ethPeer is a wrapper around eth.Peer to maintain a few extra metadata.
3737
type ethPeer struct {
3838
*eth.Peer
39+
snapExt *snapPeer // Satellite `snap` connection
3940

40-
syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
41-
lock sync.RWMutex // Mutex protecting the internal fields
41+
syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
42+
snapWait chan struct{} // Notification channel for snap connections
43+
lock sync.RWMutex // Mutex protecting the internal fields
4244
}
4345

4446
// info gathers and returns some `eth` protocol metadata known about a peer.
@@ -61,9 +63,6 @@ type snapPeerInfo struct {
6163
// snapPeer is a wrapper around snap.Peer to maintain a few extra metadata.
6264
type snapPeer struct {
6365
*snap.Peer
64-
65-
ethDrop *time.Timer // Connection dropper if `eth` doesn't connect in time
66-
lock sync.RWMutex // Mutex protecting the internal fields
6766
}
6867

6968
// info gathers and returns some `snap` protocol metadata known about a peer.

0 commit comments

Comments
 (0)