refactor: 10x faster RPC splitting #615
Changes from 6 commits
@@ -6,6 +6,7 @@ import (
	"fmt"
	"io"
	"iter"
	"math/bits"
	"math/rand"
	"sort"
	"time"
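The new `math/bits` import supports the varint length computation used by `sovRpc` further down in this diff. As a standalone sanity check (not part of the PR; the `|1` zero-guard here is an assumption borrowed from the usual generated-code formula), the bit-length arithmetic agrees with `encoding/binary`:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math/bits"
)

// varintLen computes how many bytes a uvarint occupies: ceil(bitlen/7).
// The |1 handles x == 0, which still takes one byte on the wire.
func varintLen(x uint64) int {
	return (bits.Len64(x|1) + 6) / 7
}

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	for _, x := range []uint64{0, 1, 127, 128, 16383, 16384, 1 << 40} {
		fmt.Printf("x=%-14d formula=%d  encoding/binary=%d\n", x, varintLen(x), binary.PutUvarint(buf, x))
	}
}
```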
@@ -1375,14 +1376,13 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, urgent bool) {
	}

	// Potentially split the RPC into multiple RPCs that are below the max message size
	outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out)
	for _, rpc := range outRPCs {
	for rpc := range out.split(gs.p.maxMessageSize) {
		if rpc.Size() > gs.p.maxMessageSize {
			// This should only happen if a single message/control is above the maxMessageSize.
			gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize))
			continue
		}
		gs.doSendRPC(rpc, p, q, urgent)
		gs.doSendRPC(&rpc, p, q, urgent)
	}
}
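The rewritten loop ranges directly over an `iter.Seq[RPC]`, Go 1.23's range-over-function iterator, instead of first materializing a slice of split RPCs. For readers unfamiliar with the pattern, here is a minimal, self-contained sketch (the `chunks` helper is hypothetical, not the PR's `split`): the generator yields pieces lazily, and breaking out of the consumer's loop makes `yield` return false, stopping the generator early.

```go
package main

import (
	"fmt"
	"iter"
)

// chunks yields fixed-size pieces of s lazily. If the consumer's range loop
// breaks, yield returns false and the generator stops; no intermediate slice
// of chunks is ever allocated.
func chunks(s []int, size int) iter.Seq[[]int] {
	return func(yield func([]int) bool) {
		for len(s) > 0 {
			n := min(size, len(s))
			if !yield(s[:n]) {
				return
			}
			s = s[n:]
		}
	}
}

func main() {
	for c := range chunks([]int{1, 2, 3, 4, 5}, 2) {
		fmt.Println(c) // [1 2], [3 4], [5]
	}
}
```

The same mechanics are what let the new sendRPC loop drop an oversized sub-RPC and keep going, or stop sending entirely, without allocating an intermediate `[]*RPC`.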
@@ -1412,135 +1412,182 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bo
	gs.tracer.SendRPC(rpc, p)
}

// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
// If an RPC is too large and can't be split further (e.g. Message data is
// bigger than the RPC limit), then it will be returned as an oversized RPC.
// The caller should filter out oversized RPCs.
func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
	if len(elems) == 0 {
		return slice
	}

	if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit {
		// Fast path: no merging needed and only one element
		return append(slice, &elems[0])
	}

	out := slice
	if len(out) == 0 {
		out = append(out, &RPC{RPC: pb.RPC{}})
		out[0].from = elems[0].from
	}
// split splits the given RPC. If a sub RPC is too large and can't be split
// further (e.g. Message data is bigger than the RPC limit), then it will be
// returned as an oversized RPC. The caller should filter out oversized RPCs.
func (rpc *RPC) split(limit int) iter.Seq[RPC] {
	return func(yield func(RPC) bool) {
		nextRPC := RPC{from: rpc.from}

		{
			nextRPCSize := 0
			messagesInNextRPC := 0
			messageSlice := rpc.Publish

			// Merge/Append publish messages. This pattern is optimized compared
			// to the patterns for other fields because this is the common cause
			// for splitting a message.
			for _, msg := range rpc.Publish {
				// We know the message field number is <15 so this is safe.
				incrementalSize := pbFieldNumberLT15Size + SizeOfEmbeddedMsg(msg.Size())
				if nextRPCSize+incrementalSize > limit {
					// The message doesn't fit. Let's set the messages that did fit
					// into this RPC, yield it, then make a new one
					nextRPC.Publish = messageSlice[:messagesInNextRPC]
					messageSlice = messageSlice[messagesInNextRPC:]
					if !yield(nextRPC) {
						return
					}

	for _, elem := range elems {
		lastRPC := out[len(out)-1]

					nextRPC = RPC{from: rpc.from}
					nextRPCSize = 0
					messagesInNextRPC = 0
				}
				messagesInNextRPC++
				nextRPCSize += incrementalSize
			}

		// Merge/Append publish messages
		// TODO: Never merge messages. The current behavior is the same as the
		// old behavior. In the future let's not merge messages. Since,
		// it may increase message latency.
		for _, msg := range elem.GetPublish() {
			if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
				lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
				lastRPC.Publish = append(lastRPC.Publish, msg)
				out = append(out, lastRPC)

			if nextRPCSize > 0 {
				// yield the message here for simplicity. We aren't optimally
				// packing this RPC, but we avoid successively calling .Size()
				// on the messages for the next parts.
				nextRPC.Publish = messageSlice[:messagesInNextRPC]
				if !yield(nextRPC) {
					return
				}
				nextRPC = RPC{from: rpc.from}
			}
		}

		// Merge/Append Subscriptions
		for _, sub := range elem.GetSubscriptions() {
			if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit {
				lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1]
				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
				lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub)
				out = append(out, lastRPC)
		for _, sub := range rpc.Subscriptions {
			if nextRPC.Subscriptions = append(nextRPC.Subscriptions, sub); nextRPC.Size() > limit {
				nextRPC.Subscriptions = nextRPC.Subscriptions[:len(nextRPC.Subscriptions)-1]
				if !yield(nextRPC) {
					return
				}
				nextRPC = RPC{from: rpc.from}
				nextRPC.Subscriptions = append(nextRPC.Subscriptions, sub)
			}
		}

		// Merge/Append Control messages
		if ctl := elem.GetControl(); ctl != nil {
			if lastRPC.Control == nil {
				lastRPC.Control = &pb.ControlMessage{}
				if lastRPC.Size() > limit {
					lastRPC.Control = nil
					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
					out = append(out, lastRPC)
		if ctl := rpc.Control; ctl != nil {
			if nextRPC.Control == nil {
				nextRPC.Control = &pb.ControlMessage{}
				if nextRPC.Size() > limit {
					nextRPC.Control = nil
					if !yield(nextRPC) {
						return
					}
					nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
				}
			}

			for _, graft := range ctl.GetGraft() {
				if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit {
					lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1]
					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
					lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft)
					out = append(out, lastRPC)
				if nextRPC.Control.Graft = append(nextRPC.Control.Graft, graft); nextRPC.Size() > limit {
					nextRPC.Control.Graft = nextRPC.Control.Graft[:len(nextRPC.Control.Graft)-1]
					if !yield(nextRPC) {
						return
					}
					nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
					nextRPC.Control.Graft = append(nextRPC.Control.Graft, graft)
				}
			}

			for _, prune := range ctl.GetPrune() {
				if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit {
					lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1]
					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
					lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune)
					out = append(out, lastRPC)
				if nextRPC.Control.Prune = append(nextRPC.Control.Prune, prune); nextRPC.Size() > limit {
					nextRPC.Control.Prune = nextRPC.Control.Prune[:len(nextRPC.Control.Prune)-1]
					if !yield(nextRPC) {
						return
					}
					nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: rpc.from}
					nextRPC.Control.Prune = append(nextRPC.Control.Prune, prune)
				}
			}

			for _, iwant := range ctl.GetIwant() {
				if len(lastRPC.Control.Iwant) == 0 {
				if len(nextRPC.Control.Iwant) == 0 {
					// Initialize with a single IWANT.
					// For IWANTs we don't need more than a single one,
					// since there are no topic IDs here.
					newIWant := &pb.ControlIWant{}
					if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit {
						lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1]
						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
					if nextRPC.Control.Iwant = append(nextRPC.Control.Iwant, newIWant); nextRPC.Size() > limit {
						nextRPC.Control.Iwant = nextRPC.Control.Iwant[:len(nextRPC.Control.Iwant)-1]
						if !yield(nextRPC) {
							return
						}
						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Iwant: []*pb.ControlIWant{newIWant},
						}}, from: elem.from}
						out = append(out, lastRPC)
						}}, from: rpc.from}
					}
				}
				for _, msgID := range iwant.GetMessageIDs() {
					if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit {
						lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1]
						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
					if nextRPC.Control.Iwant[0].MessageIDs = append(nextRPC.Control.Iwant[0].MessageIDs, msgID); nextRPC.Size() > limit {
						nextRPC.Control.Iwant[0].MessageIDs = nextRPC.Control.Iwant[0].MessageIDs[:len(nextRPC.Control.Iwant[0].MessageIDs)-1]
						if !yield(nextRPC) {
							return
						}
						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
						}}, from: elem.from}
						out = append(out, lastRPC)
						}}, from: rpc.from}
					}
				}
			}

			for _, ihave := range ctl.GetIhave() {
				if len(lastRPC.Control.Ihave) == 0 ||
					lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
				if len(nextRPC.Control.Ihave) == 0 ||
					nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1].TopicID != ihave.TopicID {
					// Start a new IHAVE if we are referencing a new topic ID
					newIhave := &pb.ControlIHave{TopicID: ihave.TopicID}
					if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit {
						lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1]
						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
					if nextRPC.Control.Ihave = append(nextRPC.Control.Ihave, newIhave); nextRPC.Size() > limit {
						nextRPC.Control.Ihave = nextRPC.Control.Ihave[:len(nextRPC.Control.Ihave)-1]
						if !yield(nextRPC) {
							return
						}
						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Ihave: []*pb.ControlIHave{newIhave},
						}}, from: elem.from}
						out = append(out, lastRPC)
						}}, from: rpc.from}
					}
				}
				for _, msgID := range ihave.GetMessageIDs() {
					lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1]
					if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit {
					lastIHave := nextRPC.Control.Ihave[len(nextRPC.Control.Ihave)-1]
					if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); nextRPC.Size() > limit {
						lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
						if !yield(nextRPC) {
							return
						}
						nextRPC = RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
							Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}},
						}}, from: elem.from}
						out = append(out, lastRPC)
						}}, from: rpc.from}
					}
				}
			}
		}

		if nextRPC.Size() > 0 {
			if !yield(nextRPC) {
				return
			}
		}
	}
}
// pbFieldNumberLT15Size is the number of bytes required to encode a protobuf
// field number less than or equal to 15.
const pbFieldNumberLT15Size = 1

func sovRpc(x uint64) (n int) {
	return (bits.Len64(x) + 6) / 7
}

	return out
func SizeOfEmbeddedMsg(
	msgSize int,
) int {
	prefixSize := sovRpc(uint64(msgSize))
	return prefixSize + msgSize
}

func (gs *GossipSubRouter) heartbeatTimer() {
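To make the publish-message size estimate concrete: each appended message costs one field-tag byte (pbFieldNumberLT15Size; single-byte tags cover field numbers up to 15), plus a varint length prefix, plus the serialized message itself, which is what `pbFieldNumberLT15Size + SizeOfEmbeddedMsg(msg.Size())` accumulates. A standalone sketch of that arithmetic (the helper names here are illustrative, and the `|1` zero-guard is an assumption, not taken from the PR):

```go
package main

import (
	"fmt"
	"math/bits"
)

// wireCost estimates the bytes a length-delimited protobuf field adds to an
// RPC: 1 tag byte (field number < 16), a varint length prefix, and the payload.
func wireCost(payload int) int {
	prefix := (bits.Len64(uint64(payload)|1) + 6) / 7
	return 1 + prefix + payload
}

func main() {
	for _, sz := range []int{10, 127, 128, 16384, 1 << 20} {
		fmt.Printf("payload=%8d bytes -> estimated wire cost %8d bytes\n", sz, wireCost(sz))
	}
}
```

Keeping this running total is what lets the Publish loop avoid re-measuring the whole RPC after every append; the in-code comment above points to that repeated `.Size()` work as the cost the refactor removes.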
This method should likely be in the pubsub.go file instead. I kept it here to make reviewing easier, but would like to move it after reviewer's approval.