swarm/pss: forwarding function refactoring #1043
Changes from 11 commits
swarm/pss/forwarding_test.go (new file, `@@ -0,0 +1,201 @@`):

```go
package pss

import (
	"fmt"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/p2p/protocols"
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/pot"
	whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)

var testResMap map[pot.Address]int

// this function substitutes the real send function, since
// we only want to test the peer selection functionality
func dummySendMsg(_ *Pss, sp *network.Peer, _ *PssMsg) bool {
	a := pot.NewAddressFromBytes(sp.Address())
	testResMap[a]++
	return true
}

// setDummySendMsg replaces the sendMessage function for testing purposes
func setDummySendMsg() {
	sendMessage = dummySendMsg
}

// resetSendMsgProduction resets the sendMessage function to the production version
func resetSendMsgProduction() {
	sendMessage = sendMessageProd
}

// the purpose of this test is to see that the pss.forward() function correctly
// selects the peers for message forwarding, depending on the message address
// and the kademlia constellation.
func TestForwardBasic(t *testing.T) {
	setDummySendMsg()
	defer resetSendMsgProduction()

	baseAddrBytes := make([]byte, 32)
	for i := 0; i < len(baseAddrBytes); i++ {
		baseAddrBytes[i] = 0xFF
	}
	base := pot.NewAddressFromBytes(baseAddrBytes)
	var peerAddresses []pot.Address
	var a pot.Address
	const depth = 10
	for i := 0; i <= depth; i++ {
		// add one peer for each proximity order
		a = pot.RandomAddressAt(base, i)
		peerAddresses = append(peerAddresses, a)
	}

	// add a second peer at the "depth" level, then skip one level and add one more
	// peer at depth+2. as a result, we have an edge case of three peers in the
	// nearest neighbours' bin.
	peerAddresses = append(peerAddresses, pot.RandomAddressAt(base, depth))
	peerAddresses = append(peerAddresses, pot.RandomAddressAt(base, depth+2))

	kad := network.NewKademlia(base[:], network.NewKadParams())
	ps := createPss(t, kad)
	addPeers(kad, peerAddresses)

	const firstNearest = depth // shallowest peer in the nearest neighbours' bin
	nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2}

	for i := 0; i < len(peerAddresses); i++ {
		// send msg directly to the known peers (recipient address == peer address)
		testForwardMsg(100+i, t, ps, peerAddresses[i][:], peerAddresses, []int{i})
	}

	for i := 0; i < firstNearest; i++ {
		// send random messages with proximity orders corresponding to the PO of each bin
		a = pot.RandomAddressAt(base, i)
		testForwardMsg(200+i, t, ps, a[:], peerAddresses, []int{i})
	}

	for i := firstNearest; i < len(peerAddresses); i++ {
		// recipient address falls into the nearest neighbours' bin
		a = pot.RandomAddressAt(base, i)
		testForwardMsg(300+i, t, ps, a[:], peerAddresses, nearestNeighbours)
	}

	// send msg with proximity order much deeper than the deepest nearest neighbour
	a = pot.RandomAddressAt(base, 77)
	testForwardMsg(400, t, ps, a[:], peerAddresses, nearestNeighbours)

	// test with partial addresses
	const part = 12

	for i := 0; i < firstNearest; i++ {
		// send messages with partial addresses falling into different proximity orders
		if i%8 != 0 {
			testForwardMsg(500+i, t, ps, peerAddresses[i][:i], peerAddresses, []int{i})
		}
		testForwardMsg(550+i, t, ps, peerAddresses[i][:part], peerAddresses, []int{i})
	}

	for i := firstNearest; i < len(peerAddresses); i++ {
		// partial address falls into the nearest neighbours' bin
		testForwardMsg(600+i, t, ps, peerAddresses[i][:part], peerAddresses, nearestNeighbours)
	}

	// partial address with proximity order deeper than any of the nearest neighbours
	a = pot.RandomAddressAt(base, part)
	testForwardMsg(700, t, ps, a[:part], peerAddresses, nearestNeighbours)

	// special cases where the partial address matches a large group of peers
	all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
	testForwardMsg(800, t, ps, []byte{}, peerAddresses, all)

	// luminous radius of one byte (8 bits)
	testForwardMsg(900, t, ps, baseAddrBytes[:1], peerAddresses, all[8:])
}

// this function tests the forwarding of a single message. the recipient address is passed as a
// param, along with the addresses of all peers and the indices of those peers which are expected
// to receive the message.
func testForwardMsg(testID int, t *testing.T, ps *Pss, recipientAddr []byte, peers []pot.Address, expected []int) {
	testResMap = make(map[pot.Address]int)
	msg := newTestMsg(recipientAddr)
	ps.forward(msg)

	// check test results
	var fail bool
	s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", testID, recipientAddr[:len(recipientAddr)%4], 8*len(recipientAddr))

	// false negatives (expected message didn't reach peer)
	for _, i := range expected {
		a := peers[i]
		received := testResMap[a]
		if received != 1 {
			s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", i, a[:4], received)
			fail = true
		}
		testResMap[a] = 0
	}

	// false positives (unexpected message reached peer)
	for k, v := range testResMap {
		if v != 0 {
			// find the index of the false positive peer
			var j int
			for j = 0; j < len(peers); j++ {
				if peers[j] == k {
					break
				}
			}
			s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", j, k[:4], v)
			fail = true
		}
	}

	if fail {
		t.Fatal(s)
	}
}

func addPeers(kad *network.Kademlia, addresses []pot.Address) {
	for _, a := range addresses {
		p := newTestDiscoveryPeer(a, kad)
		kad.On(p)
	}
}

func createPss(t *testing.T, kad *network.Kademlia) *Pss {
```
> **Contributor:** Why not use …
>
> **Author:** Because I wanted to use specific settings (e.g. base address).
>
> **Contributor:** I understand, but now we have at least three different functions for setting up pss :'( How about expanding the existing one with a chained method? This is a convention we've agreed to use in Swarm.
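For reference, the chained-method convention mentioned here is the one already used by `NewPssParams().WithPrivateKey(...)` further down. A test-setup builder in that style might look like the following sketch; `testPssBuilder` and its methods are hypothetical names for illustration, not existing helpers:

```go
// Sketch of the chained-setup convention referred to above.
// All names here are hypothetical, shown only to illustrate the pattern.
type testPssBuilder struct {
	baseAddr []byte
}

func newTestPssBuilder() *testPssBuilder {
	// default: a random base address
	return &testPssBuilder{baseAddr: network.RandomAddr().OAddr}
}

// withBaseAddr overrides the default base address, which is the specific
// setting createPss in this test needs control over
func (b *testPssBuilder) withBaseAddr(addr []byte) *testPssBuilder {
	b.baseAddr = addr
	return b
}

func (b *testPssBuilder) build(t *testing.T) *Pss {
	kad := network.NewKademlia(b.baseAddr, network.NewKadParams())
	return createPss(t, kad)
}
```

A test could then write `ps := newTestPssBuilder().withBaseAddr(baseAddrBytes).build(t)` and keep a single setup path.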
createPss then continues:

```go
	privKey, err := crypto.GenerateKey()
	if err != nil {
		t.Fatal(err.Error())
	}
	pssp := NewPssParams().WithPrivateKey(privKey)
	ps, err := NewPss(kad, pssp)
	if err != nil {
		t.Fatal(err.Error())
	}
	return ps
}

func newTestDiscoveryPeer(addr pot.Address, kad *network.Kademlia) *network.Peer {
	rw := &p2p.MsgPipeRW{}
	p := p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{})
	pp := protocols.NewPeer(p, rw, &protocols.Spec{})
	bp := &network.BzzPeer{
		Peer: pp,
		BzzAddr: &network.BzzAddr{
			OAddr: addr.Bytes(),
			UAddr: []byte(fmt.Sprintf("%x", addr[:])),
		},
	}
	return network.NewPeer(bp, kad)
}

func newTestMsg(addr []byte) *PssMsg {
```
> **Contributor:** I see code already existing in … Also, I suggest it should generate random data for the payload, and take a topic param. Refactoring all that is of course not within the scope of this PR. Perhaps a first step towards consolidation is to put this method (and similar generic methods) in a file called …
>
> **Contributor:** @gluk256 You didn't want to do anything with this?
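A generalized version of this helper along the lines suggested might look like the sketch below; `newTestMsgWithTopic` is a hypothetical name, and it would need `math/rand` added to the imports:

```go
// sketch of the suggested generalization: the topic is passed as a param
// and the payload is filled with random test data (hypothetical helper)
func newTestMsgWithTopic(addr []byte, topic whisper.TopicType, payloadSize int) *PssMsg {
	payload := make([]byte, payloadSize)
	rand.Read(payload) // math/rand is sufficient for test data

	msg := newPssMsg(&msgParams{})
	msg.To = addr
	msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
	msg.Payload = &whisper.Envelope{
		Topic: topic,
		Data:  payload,
	}
	return msg
}
```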
newTestMsg then continues:

```go
	msg := newPssMsg(&msgParams{})
	msg.To = addr[:]
	msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
	msg.Payload = &whisper.Envelope{
		Topic: [4]byte{},
		Data:  []byte("i have nothing to hide"),
	}
	return msg
}
```
swarm/pss/pss.go:

```diff
@@ -886,68 +886,96 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
 	return nil
 }
 
-// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
-// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
-// of the peer address of the equivalent length.
+// sendMessage is a helper function that tries to send a message and returns true on success.
+// It is set in the init function for usage in production, and optionally overridden in tests
+// for data validation.
+var sendMessage func(p *Pss, sp *network.Peer, msg *PssMsg) bool
+
+func init() {
+	sendMessage = sendMessageProd
+}
+
+// sendMessageProd tries to send a message; it returns true if successful
+func sendMessageProd(p *Pss, sp *network.Peer, msg *PssMsg) bool {
+	var isPssEnabled bool
+	info := sp.Info()
+	for _, capability := range info.Caps {
+		if capability == p.capstring {
+			isPssEnabled = true
+			break
+		}
+	}
+	if !isPssEnabled {
+		log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
+		return false
+	}
+
+	// get the protocol peer from the forwarding peer cache
+	p.fwdPoolMu.RLock()
+	pp := p.fwdPool[sp.Info().ID]
+	p.fwdPoolMu.RUnlock()
+
+	err := pp.Send(context.TODO(), msg)
+	if err != nil {
+		metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
+		log.Error(err.Error())
+	}
+	return err == nil
+}
+
+// Forwards a pss message to the peer(s) based on the recipient address, according to the algorithm
+// described below. The recipient address can be of any length, and the byte slice will be matched
+// to the MSB slice of the peer address of the equivalent length.
+//
+// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
+// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
+// a partial address, it should be forwarded to all the peers matching the partial address, if there
+// are any; otherwise only to the one peer closest to the recipient address. In any case, if the
+// message forwarding fails, the node should try to forward it to the next best peer, until the
+// message is successfully forwarded to at least one peer.
 func (p *Pss) forward(msg *PssMsg) error {
 	metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
+	sent := 0 // number of successful sends
 	to := make([]byte, addressLength)
 	copy(to[:len(msg.To)], msg.To)
+	neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
 
-	// send with kademlia
-	// find the closest peer to the recipient and attempt to send
-	sent := 0
-	p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
-		info := sp.Info()
-
-		// check if the peer is running pss
-		var ispss bool
-		for _, cap := range info.Caps {
-			if cap == p.capstring {
-				ispss = true
-				break
-			}
-		}
-		if !ispss {
-			log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
-			return true
-		}
-
-		// get the protocol peer from the forwarding peer cache
-		sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
-		p.fwdPoolMu.RLock()
-		pp := p.fwdPool[sp.Info().ID]
-		p.fwdPoolMu.RUnlock()
-
-		// attempt to send the message
-		err := pp.Send(context.TODO(), msg)
-		if err != nil {
-			metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
-			log.Error(err.Error())
-			return true
-		}
-		sent++
-		log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))
-
-		// continue forwarding if:
-		// - if the peer is end recipient but the full address has not been disclosed
-		// - if the peer address matches the partial address fully
-		// - if the peer is in proxbin
-		if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
-			log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
-			return true
-		} else if isproxbin {
-			log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
-			return true
-		}
-		// at this point we stop forwarding, and the state is as follows:
-		// - the peer is end recipient and we have full address
-		// - we are not in proxbin (directed routing)
-		// - partial addresses don't fully match
-		return false
-	})
+	// luminosity is the opposite of darkness. the more bytes are removed from the address,
+	// the higher the darkness and the lower the luminosity. here, luminosity equals the
+	// number of bits given in the destination address.
+	luminosityRadius := len(msg.To) * 8
+
+	// proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
+	pof := pot.DefaultPof(neighbourhoodDepth)
+
+	// soft threshold for msg broadcast
```
> **Contributor:** I promise to think of a more elaborate and enlightening comment here, but let's not let that stop the merging.
>
> **Author:** 👍
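Until that more enlightening comment materialises, the threshold logic that follows can be restated in isolation. This is an illustrative helper, not code from the PR; it assumes `po` has already been capped at `depth` by `pot.DefaultPof(depth)`:

```go
// standalone restatement of the broadcast-threshold rule implemented below
// (illustrative only; po is the proximity order of the recipient address
// relative to our base address, already capped at depth)
func broadcastThresholdSketch(po, luminosityRadius, depth int) int {
	threshold := po
	if threshold > luminosityRadius {
		// a partial address cannot require more matching bits than it discloses
		threshold = luminosityRadius
	}
	if threshold < luminosityRadius && threshold < depth {
		// measured from the recipient rather than the base address, peers in the
		// recipient's bin appear at least one bit closer, provided those extra
		// bits were actually disclosed in the recipient address
		threshold++
	}
	return threshold
}
```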
The diff continues:

```diff
+	broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
+	if broadcastThreshold > luminosityRadius {
+		broadcastThreshold = luminosityRadius
+	}
+
+	// if measured from the recipient address, as opposed to the base address (see the
+	// Kademlia.EachConn call below), then peers that fall into the same proximity bin
+	// as the recipient address will appear [at least] one bit closer, but only if these
+	// additional bits are given in the recipient address.
+	if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
+		broadcastThreshold++
+	}
+
+	p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
+		if po < broadcastThreshold && sent > 0 {
+			return false // stop iterating
+		}
+		if sendMessage(p, sp, msg) {
+			sent++
+			if po == addressLength*8 {
+				// stop iterating if successfully sent to the exact recipient
+				// (perfect match of the full address)
+				return false
+			}
+		}
+		return true
+	})
 
 	// if we failed to send to anyone, re-insert message in the send-queue
 	if sent == 0 {
 		log.Debug("unable to forward to any peers")
 		if err := p.enqueue(msg); err != nil {
```
> **Contributor:** Just Hex the Address.
>
> **Author:** I don't understand: what should be done here? And what's wrong with the existing implementation?
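The line under discussion is not shown, but the old code removed above contains `fmt.Sprintf("%x", common.ToHex(sp.Address()))`, which hex-encodes an already hex-encoded string; if that is what the reviewer means (an assumption on my part), the issue looks like this:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	addr := []byte{0xab, 0xcd}
	// common.ToHex already returns a hex string with a 0x prefix
	fmt.Println(common.ToHex(addr)) // 0xabcd
	// applying %x to that string hex-encodes its ASCII bytes a second time
	fmt.Printf("%x\n", common.ToHex(addr)) // 307861626364
}
```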