Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 83 additions & 56 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"fmt"
"io"
"iter"
"math/rand"
"sort"
"time"
Expand Down Expand Up @@ -522,6 +523,8 @@ type GossipSubRouter struct {
heartbeatTicks uint64
}

var _ BatchPublisher = &GossipSubRouter{}

type connectInfo struct {
p peer.ID
spr *record.Envelope
Expand Down Expand Up @@ -1139,81 +1142,105 @@ func (gs *GossipSubRouter) connector() {
}
}

func (gs *GossipSubRouter) PublishBatch(messages []*Message, opts *BatchPublishOptions) {
strategy := opts.Strategy
for _, msg := range messages {
msgID := gs.p.idGen.ID(msg)
for p, rpc := range gs.rpcs(msg) {
strategy.AddRPC(p, msgID, rpc)
}
}

for p, rpc := range strategy.All() {
gs.sendRPC(p, rpc, false)
}
}

func (gs *GossipSubRouter) Publish(msg *Message) {
gs.mcache.Put(msg)
for p, rpc := range gs.rpcs(msg) {
gs.sendRPC(p, rpc, false)
}
}

from := msg.ReceivedFrom
topic := msg.GetTopic()
func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
return func(yield func(peer.ID, *RPC) bool) {
gs.mcache.Put(msg)

tosend := make(map[peer.ID]struct{})
from := msg.ReceivedFrom
topic := msg.GetTopic()

// any peers in the topic?
tmap, ok := gs.p.topics[topic]
if !ok {
return
}
tosend := make(map[peer.ID]struct{})

if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
_, direct := gs.direct[p]
if direct || gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
// any peers in the topic?
tmap, ok := gs.p.topics[topic]
if !ok {
return
}
} else {
// direct peers
for p := range gs.direct {
_, inTopic := tmap[p]
if inTopic {
tosend[p] = struct{}{}

if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
_, direct := gs.direct[p]
if direct || gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
}
} else {
// direct peers
for p := range gs.direct {
_, inTopic := tmap[p]
if inTopic {
tosend[p] = struct{}{}
}
}
}

// floodsub peers
for p := range tmap {
if !gs.feature(GossipSubFeatureMesh, gs.peers[p]) && gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
// floodsub peers
for p := range tmap {
if !gs.feature(GossipSubFeatureMesh, gs.peers[p]) && gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
}
}

// gossipsub peers
gmap, ok := gs.mesh[topic]
if !ok {
// we are not in the mesh for topic, use fanout peers
gmap, ok = gs.fanout[topic]
if !ok || len(gmap) == 0 {
// we don't have any, pick some with score above the publish threshold
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
_, direct := gs.direct[p]
return !direct && gs.score.Score(p) >= gs.publishThreshold
})
// gossipsub peers
gmap, ok := gs.mesh[topic]
if !ok {
// we are not in the mesh for topic, use fanout peers
gmap, ok = gs.fanout[topic]
if !ok || len(gmap) == 0 {
// we don't have any, pick some with score above the publish threshold
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
_, direct := gs.direct[p]
return !direct && gs.score.Score(p) >= gs.publishThreshold
})

if len(peers) > 0 {
gmap = peerListToMap(peers)
gs.fanout[topic] = gmap
}
}
gs.lastpub[topic] = time.Now().UnixNano()
}

if len(peers) > 0 {
gmap = peerListToMap(peers)
gs.fanout[topic] = gmap
csum := computeChecksum(gs.p.idGen.ID(msg))
for p := range gmap {
// Check if it has already received an IDONTWANT for the message.
// If so, don't send it to the peer
if _, ok := gs.unwanted[p][csum]; ok {
continue
}
tosend[p] = struct{}{}
}
gs.lastpub[topic] = time.Now().UnixNano()
}

csum := computeChecksum(gs.p.idGen.ID(msg))
for p := range gmap {
// Check if it has already received an IDONTWANT for the message.
// If so, don't send it to the peer
if _, ok := gs.unwanted[p][csum]; ok {
out := rpcWithMessages(msg.Message)
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}
tosend[p] = struct{}{}
}
}

out := rpcWithMessages(msg.Message)
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
if !yield(pid, out) {
return
}
}

gs.sendRPC(pid, out, false)
}
}

Expand Down
Loading