Skip to content

Commit bcb2594

Browse files
authored
swarm/network: rewrite of peer suggestion engine, fix skipped tests (#18404)
* swarm/network: fix skipped tests related to suggestPeer * swarm/network: rename depth to radius * swarm/network: uncomment assertHealth and improve comments * swarm/network: remove commented code * swarm/network: kademlia suggestPeer algo correction * swarm/network: kademlia suggest peer * simplify suggest Peer code * improve peer suggestion algo * add comments * kademlia testing improvements * assertHealth -> checkHealth (test helper) * testSuggestPeer -> checkSuggestPeer (test helper) * remove testSuggestPeerBug and TestKademliaCase * swarm/network: kademlia suggestPeer cleanup, improved comments * swarm/network: minor comment, discovery test default arg
1 parent 34f11e7 commit bcb2594

File tree

3 files changed

+287
-667
lines changed

3 files changed

+287
-667
lines changed

swarm/network/kademlia.go

Lines changed: 133 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -168,85 +168,118 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
168168
return nil
169169
}
170170

171-
// SuggestPeer returns a known peer for the lowest proximity bin for the
172-
// lowest bincount below depth
173-
// naturally if there is an empty row it returns a peer for that
174-
func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) {
171+
// SuggestPeer returns an unconnected peer address as a peer suggestion for connection
172+
func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
175173
k.lock.Lock()
176174
defer k.lock.Unlock()
177-
minsize := k.MinBinSize
178-
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
179-
// if there is a callable neighbour within the current proxBin, connect
180-
// this makes sure nearest neighbour set is fully connected
181-
var ppo int
182-
k.addrs.EachNeighbour(k.base, Pof, func(val pot.Val, po int) bool {
183-
if po < depth {
184-
return false
175+
radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
176+
// collect undersaturated bins in ascending order of number of connected peers
177+
// and from shallow to deep (ascending order of PO)
178+
// insert them in a map of bin arrays, keyed with the number of connected peers
179+
saturation := make(map[int][]int)
180+
var lastPO int // the last non-empty PO bin in the iteration
181+
saturationDepth = -1 // the deepest PO such that all shallower bins have >= k.MinBinSize peers
182+
var pastDepth bool // whether po of iteration >= depth
183+
k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
184+
// process skipped empty bins
185+
for ; lastPO < po; lastPO++ {
186+
// find the lowest unsaturated bin
187+
if saturationDepth == -1 {
188+
saturationDepth = lastPO
189+
}
190+
// if there is an empty bin, depth is surely passed
191+
pastDepth = true
192+
saturation[0] = append(saturation[0], lastPO)
185193
}
186-
e := val.(*entry)
187-
c := k.callable(e)
188-
if c {
189-
a = e.BzzAddr
194+
lastPO = po + 1
195+
// past radius, depth is surely passed
196+
if po >= radius {
197+
pastDepth = true
190198
}
191-
ppo = po
192-
return !c
193-
})
194-
if a != nil {
195-
log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo))
196-
return a, 0, false
197-
}
198-
199-
var bpo []int
200-
prev := -1
201-
k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
202-
prev++
203-
for ; prev < po; prev++ {
204-
bpo = append(bpo, prev)
205-
minsize = 0
199+
// beyond depth the bin is treated as unsaturated even if size >= k.MinBinSize
200+
// in order to achieve full connectivity to all neighbours
201+
if pastDepth && size >= k.MinBinSize {
202+
size = k.MinBinSize - 1
206203
}
207-
if size < minsize {
208-
bpo = append(bpo, po)
209-
minsize = size
204+
// process non-empty unsaturated bins
205+
if size < k.MinBinSize {
206+
// find the lowest unsaturated bin
207+
if saturationDepth == -1 {
208+
saturationDepth = po
209+
}
210+
saturation[size] = append(saturation[size], po)
210211
}
211-
return size > 0 && po < depth
212+
return true
213+
})
214+
// to trigger peer requests for peers closer than closest connection, include
215+
// all bins from nearest connection up to nearest address as unsaturated
216+
var nearestAddrAt int
217+
k.addrs.EachNeighbour(k.base, Pof, func(_ pot.Val, po int) bool {
218+
nearestAddrAt = po
219+
return false
212220
})
213-
// all buckets are full, ie., minsize == k.MinBinSize
214-
if len(bpo) == 0 {
221+
// including bins as size 0 has the effect that requesting connection
222+
// is prioritised over non-empty shallower bins
223+
for ; lastPO <= nearestAddrAt; lastPO++ {
224+
saturation[0] = append(saturation[0], lastPO)
225+
}
226+
// all PO bins are saturated, i.e., bin size >= k.MinBinSize, no peer suggested
227+
if len(saturation) == 0 {
215228
return nil, 0, false
216229
}
217-
// as long as we got candidate peers to connect to
218-
// dont ask for new peers (want = false)
219-
// try to select a candidate peer
220-
// find the first callable peer
221-
nxt := bpo[0]
222-
k.addrs.EachBin(k.base, Pof, nxt, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
223-
// for each bin (up until depth) we find callable candidate peers
224-
if po >= depth {
225-
return false
230+
// find the first callable peer in the address book
231+
// starting from the bins with smallest size proceeding from shallow to deep
232+
// for each bin (up until neighbourhood radius) we find callable candidate peers
233+
for size := 0; size < k.MinBinSize && suggestedPeer == nil; size++ {
234+
bins, ok := saturation[size]
235+
if !ok {
236+
// no bin with this size
237+
continue
226238
}
227-
return f(func(val pot.Val) bool {
228-
e := val.(*entry)
229-
c := k.callable(e)
230-
if c {
231-
a = e.BzzAddr
239+
cur := 0
240+
curPO := bins[0]
241+
k.addrs.EachBin(k.base, Pof, curPO, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
242+
curPO = bins[cur]
243+
// find the next bin that has the current size (number of connected peers)
244+
if curPO == po {
245+
cur++
246+
} else {
247+
// skip bins that have no addresses
248+
for ; cur < len(bins) && curPO < po; cur++ {
249+
curPO = bins[cur]
250+
}
251+
if po < curPO {
252+
cur--
253+
return true
254+
}
255+
// stop if there are no addresses
256+
if curPO < po {
257+
return false
258+
}
232259
}
233-
return !c
260+
// curPO found
261+
// find a callable peer out of the addresses in the unsaturated bin
262+
// stop if found
263+
f(func(val pot.Val) bool {
264+
e := val.(*entry)
265+
if k.callable(e) {
266+
suggestedPeer = e.BzzAddr
267+
return false
268+
}
269+
return true
270+
})
271+
return cur < len(bins) && suggestedPeer == nil
234272
})
235-
})
236-
// found a candidate
237-
if a != nil {
238-
return a, 0, false
239273
}
240-
// no candidate peer found, request for the short bin
241-
var changed bool
242-
if uint8(nxt) < k.depth {
243-
k.depth = uint8(nxt)
244-
changed = true
274+
275+
if uint8(saturationDepth) < k.depth {
276+
k.depth = uint8(saturationDepth)
277+
return suggestedPeer, saturationDepth, true
245278
}
246-
return a, nxt, changed
279+
return suggestedPeer, 0, false
247280
}
248281

249-
// On inserts the peer as a kademlia peer into the live peers
282+
// On inserts the peer as a kademlia peer into the live peers
250283
func (k *Kademlia) On(p *Peer) (uint8, bool) {
251284
k.lock.Lock()
252285
defer k.lock.Unlock()
@@ -398,29 +431,25 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
398431
})
399432
}
400433

434+
// NeighbourhoodDepth returns the depth for the pot, see depthForPot
401435
func (k *Kademlia) NeighbourhoodDepth() (depth int) {
402436
k.lock.RLock()
403437
defer k.lock.RUnlock()
404438
return depthForPot(k.conns, k.NeighbourhoodSize, k.base)
405439
}
406440

407-
// depthForPot returns the proximity order that defines the distance of
408-
// the nearest neighbour set with cardinality >= NeighbourhoodSize
409-
// if there is altogether less than NeighbourhoodSize peers it returns 0
441+
// neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia
442+
// neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize
443+
// i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether
444+
// contain at least neighbourhoodSize connected peers
445+
// if there are altogether fewer than neighbourhoodSize peers connected, it returns 0
410446
// caller must hold the lock
411-
func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
447+
func neighbourhoodRadiusForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
412448
if p.Size() <= neighbourhoodSize {
413449
return 0
414450
}
415-
416451
// total number of peers in iteration
417452
var size int
418-
419-
// determining the depth is a two-step process
420-
// first we find the proximity bin of the shallowest of the NeighbourhoodSize peers
421-
// the numeric value of depth cannot be higher than this
422-
var maxDepth int
423-
424453
f := func(v pot.Val, i int) bool {
425454
// po == 256 means that addr is the pivot address(self)
426455
if i == 256 {
@@ -431,13 +460,30 @@ func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int
431460
// this means we have all nn-peers.
432461
// depth is by default set to the bin of the farthest nn-peer
433462
if size == neighbourhoodSize {
434-
maxDepth = i
463+
depth = i
435464
return false
436465
}
437466

438467
return true
439468
}
440469
p.EachNeighbour(pivotAddr, Pof, f)
470+
return depth
471+
}
472+
473+
// depthForPot returns the depth for the pot
474+
// depth is the radius of the minimal extension of nearest neighbourhood that
475+
// includes all empty PO bins. I.e., depth is the deepest PO such that
476+
// - it is not deeper than neighbourhood radius
477+
// - all bins shallower than depth are not empty
478+
// caller must hold the lock
479+
func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
480+
if p.Size() <= neighbourhoodSize {
481+
return 0
482+
}
483+
// determining the depth is a two-step process
484+
// first we find the proximity bin of the shallowest of the neighbourhoodSize peers
485+
// the numeric value of depth cannot be higher than this
486+
maxDepth := neighbourhoodRadiusForPot(p, neighbourhoodSize, pivotAddr)
441487

442488
// the second step is to test for empty bins in order from shallowest to deepest
443489
// if an empty bin is found, this will be the actual depth
@@ -627,23 +673,20 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot {
627673
return ppmap
628674
}
629675

630-
// saturation iterates through all peers and
631-
// returns the smallest po value in which the node has less than n peers
632-
// if the iterator reaches depth, then value for depth is returned
633-
// TODO move to separate testing tools file
634-
// TODO this function will stop at the first bin with less than MinBinSize peers, even if there are empty bins between that bin and the depth. This may not be correct behavior
676+
// saturation returns the smallest po value in which the node has fewer than MinBinSize peers
677+
// if the iterator reaches neighbourhood radius, then the last bin + 1 is returned
635678
func (k *Kademlia) saturation() int {
636679
prev := -1
637-
k.addrs.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
680+
radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
681+
k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
638682
prev++
683+
if po >= radius {
684+
return false
685+
}
639686
return prev == po && size >= k.MinBinSize
640687
})
641-
// TODO evaluate whether this check cannot just as well be done within the eachbin
642-
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
643-
644-
// if in the iterator above we iterated deeper than the neighbourhood depth - return depth
645-
if depth < prev {
646-
return depth
688+
if prev < 0 {
689+
return 0
647690
}
648691
return prev
649692
}
@@ -745,6 +788,9 @@ type Health struct {
745788
func (k *Kademlia) Healthy(pp *PeerPot) *Health {
746789
k.lock.RLock()
747790
defer k.lock.RUnlock()
791+
if len(pp.NNSet) < k.NeighbourhoodSize {
792+
log.Warn("peerpot NNSet < NeighbourhoodSize")
793+
}
748794
gotnn, countgotnn, culpritsgotnn := k.connectedNeighbours(pp.NNSet)
749795
knownn, countknownn, culpritsknownn := k.knowNeighbours(pp.NNSet)
750796
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)

0 commit comments

Comments
 (0)