Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 58 additions & 48 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.ethPeer(peer)
p := h.peers.peer(peer)
if p == nil {
return errors.New("unknown peer")
}
Expand All @@ -229,8 +229,17 @@ func newHandler(config *handlerConfig) (*handler, error) {
return h, nil
}

// runEthPeer
// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
// various subsistems and starts handling messages.
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If the peer has a `snap` extension, wait for it to connect so we can have
// a uniform initialization/teardown mechanism
snap, err := h.peers.waitSnapExtension(peer)
if err != nil {
peer.Log().Error("Snapshot extension barrier failed", "err", err)
return err
}
// TODO(karalabe): Not sure why this is needed
if !h.chainSync.handlePeerEvent(peer) {
return p2p.DiscQuitting
}
Expand All @@ -251,37 +260,46 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
return err
}
reject := false // reserved peer slots
if atomic.LoadUint32(&h.snapSync) == 1 && !peer.SupportsCap("snap", 1) {
// If we are running snap-sync, we want to reserve roughly half the peer
// slots for peers supporting the snap protocol.
// The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
if all, snp := h.peers.Len(), h.peers.SnapLen(); all-snp > snp+5 {
reject = true
if atomic.LoadUint32(&h.snapSync) == 1 {
if snap == nil {
// If we are running snap-sync, we want to reserve roughly half the peer
// slots for peers supporting the snap protocol.
// The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
if all, snp := h.peers.len(), h.peers.snapLen(); all-snp > snp+5 {
reject = true
}
}
}
// Ignore maxPeers if this is a trusted peer
if !peer.Peer.Info().Network.Trusted {
if reject || h.peers.Len() >= h.maxPeers {
if reject || h.peers.len() >= h.maxPeers {
return p2p.DiscTooManyPeers
}
}
peer.Log().Debug("Ethereum peer connected", "name", peer.Name())

// Register the peer locally
if err := h.peers.registerEthPeer(peer); err != nil {
if err := h.peers.registerPeer(peer, snap); err != nil {
peer.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer h.removePeer(peer.ID())

p := h.peers.ethPeer(peer.ID())
p := h.peers.peer(peer.ID())
if p == nil {
return errors.New("peer dropped during handling")
}
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil {
peer.Log().Error("Failed to register peer in eth syncer", "err", err)
return err
}
if snap != nil {
if err := h.downloader.SnapSyncer.Register(snap); err != nil {
peer.Log().Error("Failed to register peer in snap syncer", "err", err)
return err
}
}
h.chainSync.handlePeerEvent(peer)

// Propagate existing transactions. new transactions appearing
Expand Down Expand Up @@ -317,25 +335,23 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
return handler(peer)
}

// runSnapPeer
func (h *handler) runSnapPeer(peer *snap.Peer, handler snap.Handler) error {
// runSnapExtension registers a `snap` peer into the joint eth/snap peerset and
// starts handling inbound messages. As `snap` is only a satellite protocol to
// `eth`, all subsystem registrations and lifecycle management will be done by
// the main `eth` handler to prevent strange races.
func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error {
h.peerWG.Add(1)
defer h.peerWG.Done()

// Register the peer locally
if err := h.peers.registerSnapPeer(peer); err != nil {
peer.Log().Error("Snapshot peer registration failed", "err", err)
return err
}
defer h.removePeer(peer.ID())

if err := h.downloader.SnapSyncer.Register(peer); err != nil {
if err := h.peers.registerSnapExtension(peer); err != nil {
peer.Log().Error("Snapshot extension registration failed", "err", err)
return err
}
// Handle incoming messages until the connection is torn down
return handler(peer)
}

// removePeer unregisters a peer from the downloader and fetchers, removes it from
// the set of tracked peers and closes the network connection to it.
func (h *handler) removePeer(id string) {
// Create a custom logger to avoid printing the entire id
var logger log.Logger
Expand All @@ -345,33 +361,27 @@ func (h *handler) removePeer(id string) {
} else {
logger = log.New("peer", id[:8])
}
// Remove the eth peer if it exists
eth := h.peers.ethPeer(id)
if eth != nil {
logger.Debug("Removing Ethereum peer")
h.downloader.UnregisterPeer(id)
h.txFetcher.Drop(id)

if err := h.peers.unregisterEthPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}
// Abort if the peer does not exist
peer := h.peers.peer(id)
if peer == nil {
logger.Error("Ethereum peer removal failed", "err", errPeerNotRegistered)
return
}
// Remove the snap peer if it exists
snap := h.peers.snapPeer(id)
if snap != nil {
logger.Debug("Removing Snapshot peer")
// Remove the `eth` peer if it exists
logger.Debug("Removing Ethereum peer", "snap", peer.snapExt != nil)

// Remove the `snap` extension if it exists
if peer.snapExt != nil {
h.downloader.SnapSyncer.Unregister(id)
if err := h.peers.unregisterSnapPeer(id); err != nil {
logger.Error("Snapshot peer removel failed", "err", err)
}
}
// Hard disconnect at the networking layer
if eth != nil {
eth.Peer.Disconnect(p2p.DiscUselessPeer)
}
if snap != nil {
snap.Peer.Disconnect(p2p.DiscUselessPeer)
h.downloader.UnregisterPeer(id)
h.txFetcher.Drop(id)

if err := h.peers.unregisterPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}
// Hard disconnect at the networking layer
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}

func (h *handler) Start(maxPeers int) {
Expand Down Expand Up @@ -417,7 +427,7 @@ func (h *handler) Stop() {
// will only announce its availability (depending what's requested).
func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := h.peers.ethPeersWithoutBlock(hash)
peers := h.peers.peersWithoutBlock(hash)

// If propagation is requested, send to a subset of the peer
if propagate {
Expand Down Expand Up @@ -456,7 +466,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool)
// Broadcast transactions to a batch of peers not knowing about it
if propagate {
for _, tx := range txs {
peers := h.peers.ethPeersWithoutTransaction(tx.Hash())
peers := h.peers.peersWithoutTransaction(tx.Hash())

// Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
Expand All @@ -472,7 +482,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool)
}
// Otherwise only broadcast the announcement to peers
for _, tx := range txs {
peers := h.peers.ethPeersWithoutTransaction(tx.Hash())
peers := h.peers.peersWithoutTransaction(tx.Hash())
for _, peer := range peers {
annos[peer] = append(annos[peer], tx.Hash())
}
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {

// PeerInfo retrieves all known `eth` information about a peer.
func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.ethPeer(id.String()); p != nil {
if p := h.peers.peer(id.String()); p != nil {
return p.info()
}
return nil
Expand Down Expand Up @@ -107,7 +107,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// handleHeaders is invoked from a peer's message handler when it transmits a batch
// of headers for the local node to process.
func (h *ethHandler) handleHeaders(peer *eth.Peer, headers []*types.Header) error {
p := h.peers.ethPeer(peer.ID())
p := h.peers.peer(peer.ID())
if p == nil {
return errors.New("unregistered during callback")
}
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,11 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo

// Verify that the remote peer is maintained or dropped
if drop {
if peers := handler.handler.peers.Len(); peers != 0 {
if peers := handler.handler.peers.len(); peers != 0 {
t.Fatalf("peer count mismatch: have %d, want %d", peers, 0)
}
} else {
if peers := handler.handler.peers.Len(); peers != 1 {
if peers := handler.handler.peers.len(); peers != 1 {
t.Fatalf("peer count mismatch: have %d, want %d", peers, 1)
}
}
Expand Down
8 changes: 5 additions & 3 deletions eth/handler_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ func (h *snapHandler) Chain() *core.BlockChain { return h.chain }

// RunPeer is invoked when a peer joins on the `snap` protocol.
func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error {
return (*handler)(h).runSnapPeer(peer, hand)
return (*handler)(h).runSnapExtension(peer, hand)
}

// PeerInfo retrieves all known `snap` information about a peer.
func (h *snapHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.snapPeer(id.String()); p != nil {
return p.info()
if p := h.peers.peer(id.String()); p != nil {
if p.snapExt != nil {
return p.snapExt.info()
}
}
return nil
}
Expand Down
9 changes: 4 additions & 5 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ type ethPeerInfo struct {
// ethPeer is a wrapper around eth.Peer to maintain a few extra metadata.
type ethPeer struct {
*eth.Peer
snapExt *snapPeer // Satellite `snap` connection

syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
lock sync.RWMutex // Mutex protecting the internal fields
syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
snapWait chan struct{} // Notification channel for snap connections
lock sync.RWMutex // Mutex protecting the internal fields
}

// info gathers and returns some `eth` protocol metadata known about a peer.
Expand All @@ -61,9 +63,6 @@ type snapPeerInfo struct {
// snapPeer is a wrapper around snap.Peer to maintain a few extra metadata.
type snapPeer struct {
*snap.Peer

ethDrop *time.Timer // Connection dropper if `eth` doesn't connect in time
lock sync.RWMutex // Mutex protecting the internal fields
}

// info gathers and returns some `snap` protocol metadata known about a peer.
Expand Down
Loading