Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions swarm/pss/forwarding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package pss

import (
"fmt"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/pot"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)

var testResMap map[pot.Address]int

// this function substitutes the real send function, since
// we only want to test the peer selection functionality
func dummySendMsg(_ *Pss, sp *network.Peer, _ *PssMsg) bool {
a := pot.NewAddressFromBytes(sp.Address())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just Hex the Address

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't understand: what should be done here? and what's wrong with existing implementation?

testResMap[a]++
return true
}

// setDummySendMsg replaces sendMessage function for testing purposes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you dont need these functions, just use this pattern:

in the code:

// default value for production
var sendFunc = send

// the send function used as arg to forward in production via assignment to `sendFunc`
func send(...) ...

in the test:

testSendFunc := func(.....
...
}
// this defer does closure with the current value of sendFunc so it will reset to its orig value when test returns
defer func(t) { sendFunc = t }(sendFunc)
sendFunc = testSendFunc

func setDummySendMsg() {
sendMessage = dummySendMsg
}

// resetSendMsgProduction resets sendMessage function to production version
func resetSendMsgProduction() {
sendMessage = sendMessageProd
}

// the purpose of this test is to see that pss.forward() function correctly
// selects the peers for message forwarding, depending on the message address
// and kademlia constellation.
func TestForwardBasic(t *testing.T) {
setDummySendMsg()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my comment above for a simpler pattern to use here

defer resetSendMsgProduction()

base := newBaseAddress() // 0xFFFFFF.......
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't just using zeros be easier?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we compare distances (xor), it's easier for me to identify 1s then 0s.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, yeah I sympathize. But for people reading the code later, though, I think the more browsing back and forth in the code we force them to do, the harder it is to get the overview.

var peerAddresses []pot.Address
var dst pot.Address
const depth = 9
for i := 0; i <= depth; i++ {
// add two peers for each proximity order (same as in live system)
a := pot.RandomAddressAt(base, i)
peerAddresses = append(peerAddresses, a)
a = pot.RandomAddressAt(base, i)
peerAddresses = append(peerAddresses, a)
}

// skip one level, add one peer at one level below
a := pot.RandomAddressAt(base, depth+2)
peerAddresses = append(peerAddresses, a)

kad := network.NewKademlia(base[:], network.NewKadParams())
ps := createPss(t, kad)
addPeers(kad, peerAddresses)

const firstNearest = depth * 2 // first peer in the nearest neighbours' bin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be easier to understand if we just use literal numbers?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then it would be very difficult to change the test. remembering all the magic numbers is impossible. now you only need to change one constant (depth).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Should we maybe add a comment saying "add some peers far away from the others". depth * 2 is a bit obscure, no? :)

nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2}

for i := 0; i < len(peerAddresses); i++ {
// send msg directly to the known peers (recipient address == peer address)
testForwardMsg(100+i, t, ps, peerAddresses[i][:], peerAddresses, []int{i})
}

for i := 0; i < firstNearest; i++ {
// send random messages with different proximity orders
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not just different, but every bin in order even.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i added two peers for no other reason than to simulate the system as close to production as possible. concerning the firstNearest = depth * 2, this formula helps to change the depth var, without need to go through the code and fix all other variables as well.

po := i / 2
dst := pot.RandomAddressAt(base, po)
testForwardMsg(200+i, t, ps, dst[:], peerAddresses, []int{po * 2, po*2 + 1})
}

for i := firstNearest; i < len(peerAddresses); i++ {
// recipient address falls into the nearest neighbours' bin
dst := pot.RandomAddressAt(base, i)
testForwardMsg(300+i, t, ps, dst[:], peerAddresses, nearestNeighbours)
}

// send msg with proximity order higher than the last nearest neighbour
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ambiguous. Is "last" deeper or shallower?

dst = pot.RandomAddressAt(base, 29)
testForwardMsg(400, t, ps, dst[:], peerAddresses, nearestNeighbours)

// test with partial addresses
const part = 12

for i := 0; i < firstNearest; i++ {
// send messages with partial address falling into different proximity orders
po := i / 2
if po%8 != 0 {
testForwardMsg(500+i, t, ps, peerAddresses[i][:po], peerAddresses, []int{po * 2, po*2 + 1})
}
testForwardMsg(550+i, t, ps, peerAddresses[i][:part], peerAddresses, []int{po * 2, po*2 + 1})
}

for i := firstNearest; i < len(peerAddresses); i++ {
// partial address falls into the nearest neighbours' bin
testForwardMsg(600+i, t, ps, peerAddresses[i][:part], peerAddresses, nearestNeighbours)
}

// partial address with proximity order higher than the last nearest neighbour
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too; higher

dst = pot.RandomAddressAt(base, part)
testForwardMsg(700, t, ps, dst[:part], peerAddresses, nearestNeighbours)

// special cases where partial address matches a large group of peers
all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
testForwardMsg(800, t, ps, []byte{}, peerAddresses, all)
testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:])
}

// this function tests the forwarding of a single message. the recipient address is passed as param,
// along with addreses of all peers, and indexes of those peers which are expected to receive the message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addresses, indexes -> indices

func testForwardMsg(testID int, t *testing.T, ps *Pss, recipientAddr []byte, peers []pot.Address, expected []int) {
testResMap = make(map[pot.Address]int)
msg := newTestMsg(recipientAddr)
ps.forward(msg)

// check test results
var fail bool
s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", testID, recipientAddr[:len(recipientAddr)%4], 8*len(recipientAddr))

// false negatives (expected message didn't reach peer)
for _, i := range expected {
a := peers[i]
received := testResMap[a]
if received != 1 {
s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", i, a[:4], received)
fail = true
}
testResMap[a] = 0
}

// false positives (unexpected message reached peer)
for k, v := range testResMap {
if v != 0 {
// find the index of the false positive peer
var j int
for j = 0; j < len(peers); j++ {
if peers[j] == k {
break
}
}
s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", j, k[:4], v)
fail = true
}
}

if fail {
t.Fatal(s)
}
}

func addPeers(kad *network.Kademlia, addresses []pot.Address) {
for _, a := range addresses {
p := newTestDiscoveryPeer(a, kad)
kad.On(p)
}
}

func createPss(t *testing.T, kad *network.Kademlia) *Pss {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use pss_test.go:newTestPss()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because i wanted to use specific sttings (e.g. base address)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, but now we have at least three different functions for setting up pss :'(

How about expanding the existing one with a chained method? This is a convention we've agreed to use in Swarm.

privKey, err := crypto.GenerateKey()
pssp := NewPssParams().WithPrivateKey(privKey)
ps, err := NewPss(kad, pssp)
if err != nil {
t.Fatal(err.Error())
}
return ps
}

func newBaseAddress() pot.Address {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you want to keep this, then?

base := make([]byte, 32)
for i := 0; i < len(base); i++ {
base[i] = 0xFF
}
return pot.NewAddressFromBytes(base)
}

func newTestDiscoveryPeer(addr pot.Address, kad *network.Kademlia) *network.Peer {
rw := &p2p.MsgPipeRW{}
p := p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{})
pp := protocols.NewPeer(p, rw, &protocols.Spec{})
bp := &network.BzzPeer{
Peer: pp,
BzzAddr: &network.BzzAddr{
OAddr: addr.Bytes(),
UAddr: []byte(fmt.Sprintf("%x", addr[:])),
},
}
return network.NewPeer(bp, kad)
}

func newTestMsg(addr []byte) *PssMsg {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see code already existing in pss_test.go etc already creates messages inline in the test. Having a testmessage "factory" is fine, but then we should endeavour to use it the same way everywhere.

Also, I suggest it should generate random data for the payload data, and take a topic param.

Refactoring all that is of course not within scope of this PR.

Perhaps as a first step towards consolidation is that we put this method (and similar generic methods) in a file called common_test.go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gluk256 You didn't want to do anything with this?

msg := newPssMsg(&msgParams{})
msg.To = addr[:]
msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
msg.Payload = &whisper.Envelope{
Topic: [4]byte{},
Data: []byte("i have nothing to hide"),
}
return msg
}
130 changes: 79 additions & 51 deletions swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,68 +886,96 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
return nil
}

// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
// of the peer address of the equivalent length.
// sendMessage is a helper function that tries to send a message and returns true on success
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call it sendFunc and see the pattern proposed

// It is set in the init function for usage in production, and optionally overridden in tests
// for data validation.
var sendMessage func(p *Pss, sp *network.Peer, msg *PssMsg) bool

func init() {
sendMessage = sendMessageProd
}

// tries to send a message, returns true if successful
func sendMessageProd(p *Pss, sp *network.Peer, msg *PssMsg) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send or sendMsg pls

var isPssEnabled bool
info := sp.Info()
for _, capability := range info.Caps {
if capability == p.capstring {
isPssEnabled = true
break
}
}
if !isPssEnabled {
log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
return false
}

// get the protocol peer from the forwarding peer cache
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()

err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
}

return err == nil
}

// Forwards a pss message to the peer(s) based on recipient address according to the algorithm
// described below. The recipient address can be of any length, and the byte slice will be matched
// to the MSB slice of the peer address of the equivalent length.
//
// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
// partial address, it should be forwarded to all the peers matching the partial address, if there
// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
// forwarding fails, the node should try to forward it to the next best peer, until the message is
// successfully forwarded to at least one peer.
func (p *Pss) forward(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)

sent := 0 // number of successful sends
to := make([]byte, addressLength)
copy(to[:len(msg.To)], msg.To)
neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()

// send with kademlia
// find the closest peer to the recipient and attempt to send
sent := 0
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
info := sp.Info()

// check if the peer is running pss
var ispss bool
for _, cap := range info.Caps {
if cap == p.capstring {
ispss = true
break
}
}
if !ispss {
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
return true
}
// luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
// but the luminosity is less. here luminosity equals the number of bits given in the destination address.
luminosityRadius := len(msg.To) * 8

// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()
// proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
pof := pot.DefaultPof(neighbourhoodDepth)

// attempt to send the message
err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
return true
// soft threshold for msg broadcast
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I promise to think of a more elaborate and enlightening comment here, but let's not let that stop the merging.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
if broadcastThreshold > luminosityRadius {
broadcastThreshold = luminosityRadius
}

// if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
// call below), then peers that fall in the same proximity bin as recipient address will appear
// [at least] one bit closer, but only if these additional bits are given in the recipient address.
if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
broadcastThreshold++
}

p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
if po < broadcastThreshold && sent > 0 {
return false // stop iterating
}
sent++
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))

// continue forwarding if:
// - if the peer is end recipient but the full address has not been disclosed
// - if the peer address matches the partial address fully
// - if the peer is in proxbin
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
return true
} else if isproxbin {
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
return true
if sendMessage(p, sp, msg) {
sent++
if po == addressLength*8 {
// stop iterating if successfully sent to the exact recipient (perfect match of full address)
return false
}
}
// at this point we stop forwarding, and the state is as follows:
// - the peer is end recipient and we have full address
// - we are not in proxbin (directed routing)
// - partial addresses don't fully match
return false
return true
})

// if we failed to send to anyone, re-insert message in the send-queue
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {
Expand Down