diff --git a/gossipsub.go b/gossipsub.go
index 3b52efec..68bef4a5 100644
--- a/gossipsub.go
+++ b/gossipsub.go
@@ -1375,14 +1375,13 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, urgent bool) {
 	}
 
 	// Potentially split the RPC into multiple RPCs that are below the max message size
-	outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out)
-	for _, rpc := range outRPCs {
+	for rpc := range out.split(gs.p.maxMessageSize) {
 		if rpc.Size() > gs.p.maxMessageSize {
 			// This should only happen if a single message/control is above the maxMessageSize.
 			gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize))
 			continue
 		}
-		gs.doSendRPC(rpc, p, q, urgent)
+		gs.doSendRPC(&rpc, p, q, urgent)
 	}
 }
 
@@ -1412,137 +1411,6 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bo
 	gs.tracer.SendRPC(rpc, p)
 }
 
-// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
-// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
-// If an RPC is too large and can't be split further (e.g. Message data is
-// bigger than the RPC limit), then it will be returned as an oversized RPC.
-// The caller should filter out oversized RPCs.
-func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
-	if len(elems) == 0 {
-		return slice
-	}
-
-	if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit {
-		// Fast path: no merging needed and only one element
-		return append(slice, &elems[0])
-	}
-
-	out := slice
-	if len(out) == 0 {
-		out = append(out, &RPC{RPC: pb.RPC{}})
-		out[0].from = elems[0].from
-	}
-
-	for _, elem := range elems {
-		lastRPC := out[len(out)-1]
-
-		// Merge/Append publish messages
-		// TODO: Never merge messages. The current behavior is the same as the
-		// old behavior. In the future let's not merge messages. Since,
-		// it may increase message latency.
-		for _, msg := range elem.GetPublish() {
-			if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
-				lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
-				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
-				lastRPC.Publish = append(lastRPC.Publish, msg)
-				out = append(out, lastRPC)
-			}
-		}
-
-		// Merge/Append Subscriptions
-		for _, sub := range elem.GetSubscriptions() {
-			if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit {
-				lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1]
-				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
-				lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub)
-				out = append(out, lastRPC)
-			}
-		}
-
-		// Merge/Append Control messages
-		if ctl := elem.GetControl(); ctl != nil {
-			if lastRPC.Control == nil {
-				lastRPC.Control = &pb.ControlMessage{}
-				if lastRPC.Size() > limit {
-					lastRPC.Control = nil
-					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
-					out = append(out, lastRPC)
-				}
-			}
-
-			for _, graft := range ctl.GetGraft() {
-				if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit {
-					lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1]
-					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
-					lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft)
-					out = append(out, lastRPC)
-				}
-			}
-
-			for _, prune := range ctl.GetPrune() {
-				if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit {
-					lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1]
-					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
-					lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune)
-					out = append(out, lastRPC)
-				}
-			}
-
-			for _, iwant := range ctl.GetIwant() {
-				if len(lastRPC.Control.Iwant) == 0 {
-					// Initialize with a single IWANT.
-					// For IWANTs we don't need more than a single one,
-					// since there are no topic IDs here.
-					newIWant := &pb.ControlIWant{}
-					if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit {
-						lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
-							Iwant: []*pb.ControlIWant{newIWant},
-						}}, from: elem.from}
-						out = append(out, lastRPC)
-					}
-				}
-				for _, msgID := range iwant.GetMessageIDs() {
-					if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit {
-						lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
-							Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
-						}}, from: elem.from}
-						out = append(out, lastRPC)
-					}
-				}
-			}
-
-			for _, ihave := range ctl.GetIhave() {
-				if len(lastRPC.Control.Ihave) == 0 ||
-					lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
-					// Start a new IHAVE if we are referencing a new topic ID
-					newIhave := &pb.ControlIHave{TopicID: ihave.TopicID}
-					if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit {
-						lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
-							Ihave: []*pb.ControlIHave{newIhave},
-						}}, from: elem.from}
-						out = append(out, lastRPC)
-					}
-				}
-				for _, msgID := range ihave.GetMessageIDs() {
-					lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1]
-					if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit {
-						lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
-							Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}},
-						}}, from: elem.from}
-						out = append(out, lastRPC)
-					}
-				}
-			}
-		}
-	}
-
-	return out
-}
-
 func (gs *GossipSubRouter) heartbeatTimer() {
 	time.Sleep(gs.params.HeartbeatInitialDelay)
 	select {
diff --git a/gossipsub_test.go b/gossipsub_test.go
index c37382df..7aa5188d 100644
--- a/gossipsub_test.go
+++ b/gossipsub_test.go
@@ -8,7 +8,10 @@ import (
 	"fmt"
 	"io"
 	mrand "math/rand"
+	mrand2 "math/rand/v2"
+	"slices"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -2341,7 +2344,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
 	}
 }
 
-func validRPCSizes(slice []*RPC, limit int) bool {
+func validRPCSizes(slice []RPC, limit int) bool {
 	for _, rpc := range slice {
 		if rpc.Size() > limit {
 			return false
@@ -2351,8 +2354,8 @@
 }
 
 func TestFragmentRPCFunction(t *testing.T) {
-	fragmentRPC := func(rpc *RPC, limit int) ([]*RPC, error) {
-		rpcs := appendOrMergeRPC(nil, limit, *rpc)
+	fragmentRPC := func(rpc *RPC, limit int) ([]RPC, error) {
+		rpcs := slices.Collect(rpc.split(limit))
 		if allValid := validRPCSizes(rpcs, limit); !allValid {
 			return rpcs, fmt.Errorf("RPC size exceeds limit")
 		}
@@ -2371,7 +2374,7 @@
 		return msg
 	}
 
-	ensureBelowLimit := func(rpcs []*RPC) {
+	ensureBelowLimit := func(rpcs []RPC) {
 		for _, r := range rpcs {
 			if r.Size() > limit {
 				t.Fatalf("expected fragmented RPC to be below %d bytes, was %d", limit, r.Size())
 			}
@@ -2387,7 +2390,7 @@
 		t.Fatal(err)
 	}
 	if len(results) != 1 {
-		t.Fatalf("expected single RPC if input is < limit, got %d", len(results))
+		t.Fatalf("expected single RPC if input is < limit, got %d\n%#v", len(results), results)
 	}
 
 	// if there's a message larger than the limit, we should fail
@@ -2418,8 +2421,8 @@
 	ensureBelowLimit(results)
 	msgsPerRPC := limit / msgSize
 	expectedRPCs := nMessages / msgsPerRPC
-	if len(results) != expectedRPCs {
-		t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
+	if len(results) > expectedRPCs+1 {
+		t.Fatalf("expected around %d RPC messages in output, got %d", expectedRPCs, len(results))
 	}
 	var nMessagesFragmented int
 	var nSubscriptions int
@@ -2514,7 +2517,7 @@
 	// Now we return a the giant ID in a RPC by itself so that it can be
 	// dropped before actually sending the RPC. This lets us log the anamoly.
 	// To keep this test useful, we implement the old behavior here.
-	filtered := make([]*RPC, 0, len(results))
+	filtered := make([]RPC, 0, len(results))
 	for _, r := range results {
 		if r.Size() < limit {
 			filtered = append(filtered, r)
@@ -2541,7 +2544,7 @@
 	}
 }
 
-func FuzzAppendOrMergeRPC(f *testing.F) {
+func FuzzRPCSplit(f *testing.F) {
 	minMaxMsgSize := 100
 	maxMaxMsgSize := 2048
 	f.Fuzz(func(t *testing.T, data []byte) {
@@ -2550,14 +2553,102 @@
 			maxSize = minMaxMsgSize
 		}
 		rpc := generateRPC(data, maxSize)
-		rpcs := appendOrMergeRPC(nil, maxSize, *rpc)
-		if !validRPCSizes(rpcs, maxSize) {
-			t.Fatalf("invalid RPC size")
+		originalControl := compressedRPC{ihave: make(map[string][]string)}
+		originalControl.append(&rpc.RPC)
+		mergedControl := compressedRPC{ihave: make(map[string][]string)}
+
+		for rpc := range rpc.split(maxSize) {
+			if rpc.Size() > maxSize {
+				t.Fatalf("invalid RPC size %v %d (max=%d)", rpc, rpc.Size(), maxSize)
+			}
+			mergedControl.append(&rpc.RPC)
+		}
+
+		if !originalControl.equal(&mergedControl) {
+			t.Fatalf("control mismatch: \n%#v\n%#v\n", originalControl, mergedControl)
 		}
 	})
 }
 
+type compressedRPC struct {
+	msgs      [][]byte
+	iwant     []string
+	ihave     map[string][]string // topic -> message IDs
+	idontwant []string
+	prune     [][]byte
+	graft     []string // list of topics
+}
+
+func (c *compressedRPC) equal(o *compressedRPC) bool {
+	equalBytesSlices := func(a, b [][]byte) bool {
+		return slices.EqualFunc(a, b, func(e1 []byte, e2 []byte) bool {
+			return bytes.Equal(e1, e2)
+		})
+	}
+	if !equalBytesSlices(c.msgs, o.msgs) {
+		return false
+	}
+
+	if !slices.Equal(c.iwant, o.iwant) ||
+		!slices.Equal(c.idontwant, o.idontwant) ||
+		!equalBytesSlices(c.prune, o.prune) ||
+		!slices.Equal(c.graft, o.graft) {
+		return false
+	}
+
+	if len(c.ihave) != len(o.ihave) {
+		return false
+	}
+	for topic, ids := range c.ihave {
+		if !slices.Equal(ids, o.ihave[topic]) {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (c *compressedRPC) append(rpc *pb.RPC) {
+	for _, m := range rpc.Publish {
+		d, err := m.Marshal()
+		if err != nil {
+			panic(err)
+		}
+		c.msgs = append(c.msgs, d)
+	}
+
+	ctrl := rpc.Control
+	if ctrl == nil {
+		return
+	}
+	for _, iwant := range ctrl.Iwant {
+		c.iwant = append(c.iwant, iwant.MessageIDs...)
+		c.iwant = slices.DeleteFunc(c.iwant, func(e string) bool { return len(e) == 0 })
+	}
+	for _, ihave := range ctrl.Ihave {
+		c.ihave[*ihave.TopicID] = append(c.ihave[*ihave.TopicID], ihave.MessageIDs...)
+		c.ihave[*ihave.TopicID] = slices.DeleteFunc(c.ihave[*ihave.TopicID], func(e string) bool { return len(e) == 0 })
+	}
+	for _, idontwant := range ctrl.Idontwant {
+		c.idontwant = append(c.idontwant, idontwant.MessageIDs...)
+		c.idontwant = slices.DeleteFunc(c.idontwant, func(e string) bool { return len(e) == 0 })
+	}
+	for _, prune := range ctrl.Prune {
+		d, err := prune.Marshal()
+		if err != nil {
+			panic(err)
+		}
+		c.prune = append(c.prune, d)
+	}
+	for _, graft := range ctrl.Graft {
+		c.graft = append(c.graft, *graft.TopicID)
+		c.graft = slices.DeleteFunc(c.graft, func(e string) bool { return len(e) == 0 })
+	}
+}
+
 func TestGossipsubManagesAnAddressBook(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -3675,3 +3766,78 @@ func TestPublishDuplicateMessage(t *testing.T) {
 		t.Fatal("Duplicate message should not return an error")
 	}
 }
+
+func genNRpcs(tb testing.TB, n int, maxSize int) []*RPC {
+	r := mrand2.NewChaCha8([32]byte{})
+	rpcs := make([]*RPC, n)
+	for i := range rpcs {
+		var data [64]byte
+		_, err := r.Read(data[:])
+		if err != nil {
+			tb.Fatal(err)
+		}
+		rpcs[i] = generateRPC(data[:], maxSize)
+	}
+	return rpcs
+}
+
+func BenchmarkSplitRPC(b *testing.B) {
+	maxSize := 2048
+	rpcs := genNRpcs(b, 100, maxSize)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		rpc := rpcs[i%len(rpcs)]
+		// Drain the iterator so the benchmark measures the actual split work,
+		// not just construction of the iter.Seq closure.
+		for range rpc.split(maxSize) {
+		}
+	}
+}
+
+func BenchmarkSplitRPCLargeMessages(b *testing.B) {
+	addToRPC := func(rpc *RPC, numMsgs int, msgSize int) {
+		payload := make([]byte, msgSize)
+		for i := 0; i < numMsgs; i++ {
+			rpc.Publish = append(rpc.Publish, &pb.Message{
+				Data: payload,
+				From: []byte(strconv.Itoa(i)),
+			})
+		}
+	}
+
+	b.Run("Many large messages", func(b *testing.B) {
+		r := mrand.New(mrand.NewSource(99))
+		const numRPCs = 30
+		const msgSize = 50 * 1024
+		rpc := &RPC{}
+		for i := 0; i < numRPCs; i++ {
+			addToRPC(rpc, 20, msgSize+r.Intn(100))
+		}
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			for range rpc.split(DefaultMaxMessageSize) {
+			}
+		}
+	})
+
+	b.Run("2 large messages", func(b *testing.B) {
+		const numRPCs = 2
+		const msgSize = DefaultMaxMessageSize - 100
+		rpc := &RPC{}
+		for i := 0; i < numRPCs; i++ {
+			addToRPC(rpc, 1, msgSize)
+		}
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			count := 0
+			for range rpc.split(DefaultMaxMessageSize) {
+				count++
+			}
+			if count != 2 {
+				b.Fatalf("expected 2 RPCs, got %d", count)
+			}
+		}
+	})
+}
diff --git a/pubsub.go b/pubsub.go
index e8e598bb..91017d1b 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -5,6 +5,8 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"iter"
+	"math/bits"
 	"math/rand"
 	"sync"
 	"sync/atomic"
@@ -255,6 +257,202 @@ type RPC struct {
 	from peer.ID
 }
 
+// split splits the given RPC into sub-RPCs that fit within limit. If a
+// sub-RPC is too large and can't be split further (e.g. Message data is
+// bigger than the RPC limit), then it will be returned as an oversized RPC.
+// The caller should filter out oversized RPCs.
+func (rpc *RPC) split(limit int) iter.Seq[RPC] {
+	return func(yield func(RPC) bool) {
+		nextRPC := RPC{from: rpc.from}
+
+		{
+			nextRPCSize := 0
+
+			messagesInNextRPC := 0
+			messageSlice := rpc.Publish
+
+			// Merge/Append publish messages. This pattern is optimized compared to
+			// the patterns for other fields because this is the common cause for
+			// splitting a message.
+			for _, msg := range rpc.Publish {
+				// We know the message field number is <= 15 so this is safe.
+				incrementalSize := pbFieldNumberLT15Size + sizeOfEmbeddedMsg(msg.Size())
+				if nextRPCSize+incrementalSize > limit {
+					// The message doesn't fit. Let's set the messages that did fit
+					// into this RPC, yield it, then make a new one.
+					nextRPC.Publish = messageSlice[:messagesInNextRPC]
+					messageSlice = messageSlice[messagesInNextRPC:]
+					if !yield(nextRPC) {
+						return
+					}
+
+					nextRPC = RPC{from: rpc.from}
+					nextRPCSize = 0
+					messagesInNextRPC = 0
+				}
+				messagesInNextRPC++
+				nextRPCSize += incrementalSize
+			}
+
+			if nextRPCSize > 0 {
+				// Yield the message here for simplicity. We aren't optimally
+				// packing this RPC, but we avoid successively calling .Size()
+				// on the messages for the next parts.
+				nextRPC.Publish = messageSlice[:messagesInNextRPC]
+				if !yield(nextRPC) {
+					return
+				}
+				nextRPC = RPC{from: rpc.from}
+			}
+		}
+
+		// Fast path check. It's possible the original RPC is now small enough
+		// without the messages to publish.
+		nextRPC = *rpc
+		nextRPC.Publish = nil
+		if s := nextRPC.Size(); s < limit {
+			if s != 0 {
+				yield(nextRPC)
+			}
+			return
+		}
+		// We have to split the RPC into multiple parts.
+		nextRPC = RPC{from: rpc.from}
+
+		// Merge/Append Subscriptions
+		for _, sub := range rpc.Subscriptions {
+			if nextRPC.Subscriptions = append(nextRPC.Subscriptions, sub); nextRPC.Size() > limit {
+				nextRPC.Subscriptions = nextRPC.Subscriptions[:len(nextRPC.Subscriptions)-1]
+				if !yield(nextRPC) {
+					return
+				}
+
+				nextRPC = RPC{from: rpc.from}
+				nextRPC.Subscriptions = append(nextRPC.Subscriptions, sub)
+			}
+		}
+
+		// Merge/Append Control messages
+		if ctl := rpc.Control; ctl != nil {
+			if nextRPC.Control == nil {
+				nextRPC.Control = &pb.ControlMessage{}
+				if nextRPC.Size() > limit {
+					nextRPC.Control = nil
+					if !yield(nextRPC) {
+						return
+					}
+					nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
+				}
+			}
+
+			for _, graft := range ctl.GetGraft() {
+				if nextRPC.Control.Graft = append(nextRPC.Control.Graft, graft); nextRPC.Size() > limit {
+					nextRPC.Control.Graft = nextRPC.Control.Graft[:len(nextRPC.Control.Graft)-1]
+					if !yield(nextRPC) {
+						return
+					}
+					nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
+					nextRPC.Control.Graft = append(nextRPC.Control.Graft, graft)
+				}
+			}
+
+			for _, prune := range ctl.GetPrune() {
+				if nextRPC.Control.Prune = append(nextRPC.Control.Prune, prune); nextRPC.Size() > limit {
+					nextRPC.Control.Prune = nextRPC.Control.Prune[:len(nextRPC.Control.Prune)-1]
+					if !yield(nextRPC) {
+						return
+					}
+					nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
+					nextRPC.Control.Prune = append(nextRPC.Control.Prune, prune)
+				}
+			}
+
+			for _, iwant := range ctl.GetIwant() {
+				if len(nextRPC.Control.Iwant) == 0 {
+					// Initialize with a single IWANT.
+					// For IWANTs we don't need more than a single one,
+					// since there are no topic IDs here.
+					newIWant := &pb.ControlIWant{}
+					if nextRPC.Control.Iwant = append(nextRPC.Control.Iwant, newIWant); nextRPC.Size() > limit {
+						nextRPC.Control.Iwant = nextRPC.Control.Iwant[:len(nextRPC.Control.Iwant)-1]
+						if !yield(nextRPC) {
+							return
+						}
+						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Iwant: []*pb.ControlIWant{newIWant},
+						}}, from: rpc.from}
+					}
+				}
+				for _, msgID := range iwant.GetMessageIDs() {
+					if nextRPC.Control.Iwant[0].MessageIDs = append(nextRPC.Control.Iwant[0].MessageIDs, msgID); nextRPC.Size() > limit {
+						nextRPC.Control.Iwant[0].MessageIDs = nextRPC.Control.Iwant[0].MessageIDs[:len(nextRPC.Control.Iwant[0].MessageIDs)-1]
+						if !yield(nextRPC) {
+							return
+						}
+						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
+						}}, from: rpc.from}
+					}
+				}
+			}
+
+			for _, ihave := range ctl.GetIhave() {
+				if len(nextRPC.Control.Ihave) == 0 ||
+					nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
+					// Start a new IHAVE if we are referencing a new topic ID
+					newIhave := &pb.ControlIHave{TopicID: ihave.TopicID}
+					if nextRPC.Control.Ihave = append(nextRPC.Control.Ihave, newIhave); nextRPC.Size() > limit {
+						nextRPC.Control.Ihave = nextRPC.Control.Ihave[:len(nextRPC.Control.Ihave)-1]
+						if !yield(nextRPC) {
+							return
+						}
+						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Ihave: []*pb.ControlIHave{newIhave},
+						}}, from: rpc.from}
+					}
+				}
+				for _, msgID := range ihave.GetMessageIDs() {
+					lastIHave := nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1]
+					if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); nextRPC.Size() > limit {
+						lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
+						if !yield(nextRPC) {
+							return
+						}
+						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+							Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}},
+						}}, from: rpc.from}
+					}
+				}
+			}
+		}
+
+		if nextRPC.Size() > 0 {
+			if !yield(nextRPC) {
+				return
+			}
+		}
+	}
+}
+
+// pbFieldNumberLT15Size is the number of bytes required to encode a protobuf
+// field number less than or equal to 15 along with its wire type. This is 1
+// byte because the protobuf encoding of field numbers is a varint encoding of:
+//	fieldNumber << 3 | wireType
+// Refer to https://protobuf.dev/programming-guides/encoding/#structure
+// for more details on the encoding of messages. You may also reference the
+// concrete implementation of pb.RPC.Size().
+const pbFieldNumberLT15Size = 1
+
+// sovRpc returns the number of bytes needed to varint-encode x. The x|1
+// ensures a zero value still counts as one byte, matching the generated
+// sovRpc helper in the pb package.
+func sovRpc(x uint64) (n int) {
+	return (bits.Len64(x|1) + 6) / 7
+}
+
+// sizeOfEmbeddedMsg returns the encoded size of an embedded message of
+// msgSize bytes: its varint length prefix plus the message bytes themselves.
+func sizeOfEmbeddedMsg(msgSize int) int {
+	prefixSize := sovRpc(uint64(msgSize))
+	return prefixSize + msgSize
+}
+
 type Option func(*PubSub) error
 
 // NewPubSub returns a new PubSub management object.
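A note on the Publish fast path in split: it avoids calling rpc.Size() after every append by tracking the encoded size incrementally, which only works because the per-message delta is exact. Each Publish entry is encoded as a 1-byte tag (field number 2, wire type 2), a varint length prefix, and the message bytes. Below is a minimal sketch of that invariant checked against the generated pb.RPC.Size(); the helper name checkPublishDelta is illustrative and not part of this diff:

```go
// checkPublishDelta verifies that appending one message grows the encoded
// size of an RPC by exactly pbFieldNumberLT15Size + sizeOfEmbeddedMsg(n),
// where n is the message's own encoded size.
func checkPublishDelta(rpc *RPC, msg *pb.Message) bool {
	before := rpc.Size()
	rpc.Publish = append(rpc.Publish, msg)
	delta := rpc.Size() - before
	rpc.Publish = rpc.Publish[:len(rpc.Publish)-1] // undo the append
	return delta == pbFieldNumberLT15Size+sizeOfEmbeddedMsg(msg.Size())
}
```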
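Because split returns an iter.Seq[RPC], sub-RPCs are built lazily and a consumer may stop early; the `if !yield(nextRPC) { return }` checks above are what make a break in the caller abandon the remaining splitting work. A hedged usage sketch follows, where sendPart is a hypothetical stand-in for whatever the caller does with each part:

```go
// sendAll streams the sub-RPCs of rpc, skipping oversized singletons the same
// way sendRPC drops them. sendPart is a placeholder callback, not part of
// this diff.
func sendAll(rpc *RPC, limit int, sendPart func(*RPC) error) {
	for part := range rpc.split(limit) {
		if part.Size() > limit {
			// A single element was bigger than the limit; the caller drops it.
			continue
		}
		if err := sendPart(&part); err != nil {
			// Breaking makes yield return false inside split, so no further
			// sub-RPCs are constructed.
			break
		}
	}
}
```

Collecting eagerly is still one line, slices.Collect(rpc.split(limit)), which is exactly what the updated fragmentRPC test helper does.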