Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 11f1528

Browse files
committed
swarm/pss: Add deliver to self on prox topics
1 parent d533da4 commit 11f1528

File tree

2 files changed

+146
-3
lines changed

2 files changed

+146
-3
lines changed

swarm/pss/pss.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,6 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
499499
depth, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
500500
log.Trace("selfpossible", "minprox", minProx, "depth", depth)
501501

502-
log.Debug("here")
503502
if minProx <= depth {
504503
return true
505504
}
@@ -752,7 +751,14 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
752751
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
753752
pssMsg.Payload = payload
754753
p.addFwdCache(pssMsg)
755-
return p.enqueue(pssMsg)
754+
err := p.enqueue(pssMsg)
755+
if err != nil {
756+
return err
757+
}
758+
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 {
759+
return p.process(pssMsg, true, true)
760+
}
761+
return nil
756762
}
757763

758764
// Send a message using symmetric encryption
@@ -853,7 +859,14 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
853859
pssMsg.To = to
854860
pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
855861
pssMsg.Payload = envelope
856-
return p.enqueue(pssMsg)
862+
err = p.enqueue(pssMsg)
863+
if err != nil {
864+
return err
865+
}
866+
if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 {
867+
return p.process(pssMsg, true, true)
868+
}
869+
return nil
857870
}
858871

859872
// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct

swarm/pss/pss_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,136 @@ func TestAddressMatch(t *testing.T) {
315315

316316
}
317317

318+
func TestProxShortCircuit(t *testing.T) {
319+
320+
// sender node address
321+
localAddr := network.RandomAddr().Over()
322+
localPotAddr := pot.NewAddressFromBytes(localAddr)
323+
324+
// set up kademlia
325+
kadParams := network.NewKadParams()
326+
kad := network.NewKademlia(localAddr, kadParams)
327+
peerCount := kad.MinBinSize + 1
328+
329+
// set up pss
330+
privKey, err := crypto.GenerateKey()
331+
pssp := NewPssParams().WithPrivateKey(privKey)
332+
ps, err := NewPss(kad, pssp)
333+
if err != nil {
334+
t.Fatal(err.Error())
335+
}
336+
337+
// create kademlia peers, so we have peers both inside and outside minproxlimit
338+
var peers []*network.Peer
339+
proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes()
340+
distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes()
341+
342+
for i := 0; i < peerCount; i++ {
343+
rw := &p2p.MsgPipeRW{}
344+
ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{})
345+
protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{})
346+
peerAddr := pot.RandomAddressAt(localPotAddr, i)
347+
bzzPeer := &network.BzzPeer{
348+
Peer: protoPeer,
349+
BzzAddr: &network.BzzAddr{
350+
OAddr: peerAddr.Bytes(),
351+
UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])),
352+
},
353+
}
354+
peer := network.NewPeer(bzzPeer, kad)
355+
kad.On(peer)
356+
peers = append(peers, peer)
357+
}
358+
359+
// register it marking prox capability
360+
delivered := make(chan struct{})
361+
rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
362+
log.Trace("in allowraw handler")
363+
delivered <- struct{}{}
364+
return nil
365+
}
366+
topic := BytesToTopic([]byte{0x2a})
367+
hndlrProxDereg := ps.Register(&topic, &handler{
368+
f: rawHandlerFunc,
369+
caps: handlerCapProx | handlerCapRaw,
370+
})
371+
defer hndlrProxDereg()
372+
373+
errC := make(chan error)
374+
go func() {
375+
err := ps.SendRaw(distantMessageAddress, topic, []byte("foo"))
376+
if err != nil {
377+
errC <- err
378+
}
379+
}()
380+
381+
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
382+
defer cancel()
383+
select {
384+
case <-delivered:
385+
t.Fatal("raw distant message delivered")
386+
case err := <-errC:
387+
t.Fatal(err)
388+
case <-ctx.Done():
389+
}
390+
391+
go func() {
392+
err := ps.SendRaw(proxMessageAddress, topic, []byte("bar"))
393+
if err != nil {
394+
errC <- err
395+
}
396+
}()
397+
398+
ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
399+
defer cancel()
400+
select {
401+
case <-delivered:
402+
case err := <-errC:
403+
t.Fatal(err)
404+
case <-ctx.Done():
405+
t.Fatal("raw timeout")
406+
}
407+
408+
localAddrPss := PssAddress(localAddr)
409+
symKeyId, err := ps.GenerateSymmetricKey(topic, &localAddrPss, true)
410+
go func() {
411+
err := ps.SendSym(symKeyId, topic, []byte("baz"))
412+
if err != nil {
413+
errC <- err
414+
}
415+
}()
416+
ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
417+
defer cancel()
418+
select {
419+
case <-delivered:
420+
case err := <-errC:
421+
t.Fatal(err)
422+
case <-ctx.Done():
423+
t.Fatal("sym timeout")
424+
}
425+
426+
err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &localAddrPss)
427+
if err != nil {
428+
t.Fatal(err)
429+
}
430+
pubKeyId := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey))
431+
go func() {
432+
err := ps.SendAsym(pubKeyId, topic, []byte("xyzzy"))
433+
if err != nil {
434+
errC <- err
435+
}
436+
}()
437+
ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
438+
defer cancel()
439+
select {
440+
case <-delivered:
441+
case err := <-errC:
442+
t.Fatal(err)
443+
case <-ctx.Done():
444+
t.Fatal("asym timeout")
445+
}
446+
}
447+
318448
// verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it
319449
// note that in these tests we use the raw capability on handlers for convenience
320450
func TestAddressMatchProx(t *testing.T) {

0 commit comments

Comments
 (0)