Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions bitswap/client/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package session
import (
"context"

"github.com/gammazero/chanqueue"
bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -89,7 +90,7 @@ type sessionWantSender struct {
// The session ID
sessionID uint64
// A channel that collects incoming changes (events)
changes chan change
changes *chanqueue.ChanQueue[change]
// Information about each want indexed by CID
wants map[cid.Cid]*wantInfo
// Keeps track of how many consecutive DONT_HAVEs a peer has sent
Expand Down Expand Up @@ -121,7 +122,7 @@ func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager, ca
shutdown: cancel,
closed: make(chan struct{}),
sessionID: sid,
changes: make(chan change, changesBufferSize),
changes: chanqueue.New(chanqueue.WithBaseCapacity[change](changesBufferSize)),
wants: make(map[cid.Cid]*wantInfo),
peerConsecutiveDontHaves: make(map[peer.ID]int),
peerRspTrkr: newPeerResponseTracker(),
Expand Down Expand Up @@ -177,14 +178,16 @@ func (sws *sessionWantSender) SignalAvailability(p peer.ID, isAvailable bool) {
availability := peerAvailability{p, isAvailable}
// Add the change in a non-blocking manner to avoid the possibility of a
// deadlock
sws.addChangeNonBlocking(change{availability: availability})
sws.addChange(change{availability: availability})
}

// Run is the main loop for processing incoming changes
func (sws *sessionWantSender) Run() {
changes := sws.changes.Out()

for {
select {
case ch := <-sws.changes:
case ch := <-changes:
sws.onChange([]change{ch})
case <-sws.ctx.Done():
// Unregister the session with the PeerManager
Expand All @@ -209,28 +212,17 @@ func (sws *sessionWantSender) Shutdown() {
// addChange adds a new change to the queue
func (sws *sessionWantSender) addChange(c change) {
select {
case sws.changes <- c:
case sws.changes.In() <- c:
case <-sws.ctx.Done():
}
}

// addChangeNonBlocking adds a new change to the queue, using a go-routine
// if the change blocks, so as to avoid potential deadlocks
func (sws *sessionWantSender) addChangeNonBlocking(c change) {
select {
case sws.changes <- c:
default:
// changes channel is full, so add change in a go routine instead
go sws.addChange(c)
}
}

// collectChanges collects all the changes that have occurred since the last
// invocation of onChange
func (sws *sessionWantSender) collectChanges(changes []change) []change {
for len(changes) < changesBufferSize {
for range sws.changes.Len() {
select {
case next := <-sws.changes:
case next := <-sws.changes.Out():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gammazero you wrote the library so you probably know better.

Are there any benefits of doing like above?

changes := sws.changes.Out()
for range sws.changes.Len() {
	select {
	case next := <-changes:
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe slightly? The function call should get inlined anyway, so is it worth the extra line of code? 🤷

changes = append(changes, next)
default:
return changes
Expand Down
Loading