Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion les/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *dist
case <-ctx.Done():
sentReq.stop(ctx.Err())
case <-shutdown:
sentReq.stop(fmt.Errorf("Client is shutting down"))
sentReq.stop(fmt.Errorf("client is shutting down"))
}
return sentReq.getError()
}
Expand Down
84 changes: 42 additions & 42 deletions les/txrelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,51 +54,51 @@ func newLesTxRelay(ps *peerSet, retriever *retrieveManager) *lesTxRelay {
return r
}

func (self *lesTxRelay) Stop() {
close(self.stop)
func (ltrx *lesTxRelay) Stop() {
close(ltrx.stop)
}

func (self *lesTxRelay) registerPeer(p *peer) {
self.lock.Lock()
defer self.lock.Unlock()
func (ltrx *lesTxRelay) registerPeer(p *peer) {
ltrx.lock.Lock()
defer ltrx.lock.Unlock()

self.peerList = self.ps.AllPeers()
ltrx.peerList = ltrx.ps.AllPeers()
}

func (self *lesTxRelay) unregisterPeer(p *peer) {
self.lock.Lock()
defer self.lock.Unlock()
func (ltrx *lesTxRelay) unregisterPeer(p *peer) {
ltrx.lock.Lock()
defer ltrx.lock.Unlock()

self.peerList = self.ps.AllPeers()
ltrx.peerList = ltrx.ps.AllPeers()
}

// send sends a list of transactions to at most a given number of peers at
// once, never resending any particular transaction to the same peer twice
func (self *lesTxRelay) send(txs types.Transactions, count int) {
func (ltrx *lesTxRelay) send(txs types.Transactions, count int) {
sendTo := make(map[*peer]types.Transactions)

self.peerStartPos++ // rotate the starting position of the peer list
if self.peerStartPos >= len(self.peerList) {
self.peerStartPos = 0
ltrx.peerStartPos++ // rotate the starting position of the peer list
if ltrx.peerStartPos >= len(ltrx.peerList) {
ltrx.peerStartPos = 0
}

for _, tx := range txs {
hash := tx.Hash()
ltr, ok := self.txSent[hash]
ltr, ok := ltrx.txSent[hash]
if !ok {
ltr = &ltrInfo{
tx: tx,
sentTo: make(map[*peer]struct{}),
}
self.txSent[hash] = ltr
self.txPending[hash] = struct{}{}
ltrx.txSent[hash] = ltr
ltrx.txPending[hash] = struct{}{}
}

if len(self.peerList) > 0 {
if len(ltrx.peerList) > 0 {
cnt := count
pos := self.peerStartPos
pos := ltrx.peerStartPos
for {
peer := self.peerList[pos]
peer := ltrx.peerList[pos]
if _, ok := ltr.sentTo[peer]; !ok {
sendTo[peer] = append(sendTo[peer], tx)
ltr.sentTo[peer] = struct{}{}
Expand All @@ -108,10 +108,10 @@ func (self *lesTxRelay) send(txs types.Transactions, count int) {
break // sent it to the desired number of peers
}
pos++
if pos == len(self.peerList) {
if pos == len(ltrx.peerList) {
pos = 0
}
if pos == self.peerStartPos {
if pos == ltrx.peerStartPos {
break // tried all available peers
}
}
Expand Down Expand Up @@ -139,46 +139,46 @@ func (self *lesTxRelay) send(txs types.Transactions, count int) {
return func() { peer.SendTxs(reqID, cost, enc) }
},
}
go self.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, self.stop)
go ltrx.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, ltrx.stop)
}
}

func (self *lesTxRelay) Send(txs types.Transactions) {
self.lock.Lock()
defer self.lock.Unlock()
func (ltrx *lesTxRelay) Send(txs types.Transactions) {
ltrx.lock.Lock()
defer ltrx.lock.Unlock()

self.send(txs, 3)
ltrx.send(txs, 3)
}

func (self *lesTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
func (ltrx *lesTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) {
ltrx.lock.Lock()
defer ltrx.lock.Unlock()

for _, hash := range mined {
delete(self.txPending, hash)
delete(ltrx.txPending, hash)
}

for _, hash := range rollback {
self.txPending[hash] = struct{}{}
ltrx.txPending[hash] = struct{}{}
}

if len(self.txPending) > 0 {
txs := make(types.Transactions, len(self.txPending))
if len(ltrx.txPending) > 0 {
txs := make(types.Transactions, len(ltrx.txPending))
i := 0
for hash := range self.txPending {
txs[i] = self.txSent[hash].tx
for hash := range ltrx.txPending {
txs[i] = ltrx.txSent[hash].tx
i++
}
self.send(txs, 1)
ltrx.send(txs, 1)
}
}

func (self *lesTxRelay) Discard(hashes []common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
func (ltrx *lesTxRelay) Discard(hashes []common.Hash) {
ltrx.lock.Lock()
defer ltrx.lock.Unlock()

for _, hash := range hashes {
delete(self.txSent, hash)
delete(self.txPending, hash)
delete(ltrx.txSent, hash)
delete(ltrx.txPending, hash)
}
}