@@ -26,27 +26,31 @@ import (
2626 "github.com/ethereum/go-ethereum/swarm/storage"
2727)
2828
29- var searchTimeout = 1 * time .Second
29+ const (
30+ defaultSearchTimeout = 1 * time .Second
31+ // maximum number of forwarded requests (hops), to make sure requests are not
32+ // forwarded forever in peer loops
33+ maxHopCount uint8 = 20
34+ )
3035
3136// Time to consider peer to be skipped.
3237// Also used in stream delivery.
3338var RequestTimeout = 10 * time .Second
3439
35- var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops
36-
3740type RequestFunc func (context.Context , * Request ) (* enode.ID , chan struct {}, error )
3841
3942// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
4043// keeps it alive until all active requests are completed. This can happen:
4144// 1. either because the chunk is delivered
42- // 2. or becuse the requestor cancelled/timed out
45+ // 2. or because the requester cancelled/timed out
4346// Fetcher self destroys itself after it is completed.
4447// TODO: cancel all forward requests after termination
4548type Fetcher struct {
4649 protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk
4750 addr storage.Address // the address of the chunk to be fetched
4851 offerC chan * enode.ID // channel of sources (peer node id strings)
4952 requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
53+ searchTimeout time.Duration
5054 skipCheck bool
5155}
5256
@@ -79,7 +83,7 @@ func (r *Request) SkipPeer(nodeID string) bool {
7983 }
8084 t , ok := val .(time.Time )
8185 if ok && time .Now ().After (t .Add (RequestTimeout )) {
82- // deadine expired
86+ // deadline expired
8387 r .peersToSkip .Delete (nodeID )
8488 return false
8589 }
@@ -100,9 +104,10 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
100104 }
101105}
102106
103- // New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to
104- // deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
105- // this chunk, to make sure we don't request back the chunks from them.
107+ // New constructs a new Fetcher, for the given chunk. All peers in peersToSkip
108+ // are not requested to deliver the given chunk. peersToSkip should always
109+ // contain the peers which are actively requesting this chunk, to make sure we
110+ // don't request back the chunks from them.
106111// The created Fetcher is started and returned.
107112func (f * FetcherFactory ) New (ctx context.Context , source storage.Address , peersToSkip * sync.Map ) storage.NetFetcher {
108113 fetcher := NewFetcher (source , f .request , f .skipCheck )
@@ -117,6 +122,7 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
117122 protoRequestFunc : rf ,
118123 offerC : make (chan * enode.ID ),
119124 requestC : make (chan uint8 ),
125+ searchTimeout : defaultSearchTimeout ,
120126 skipCheck : skipCheck ,
121127 }
122128}
@@ -176,7 +182,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
176182 // loop that keeps the fetching process alive
177183 // after every request a timer is set. If this goes off we request again from another peer
178184 // note that the previous request is still alive and has the chance to deliver, so
179- // rerequesting extends the search. ie.,
185+ // requesting again extends the search. ie.,
180186 // if a peer we requested from is gone we issue a new request, so the number of active
181187 // requests never decreases
182188 for {
@@ -209,13 +215,13 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
209215 // search timeout: too much time passed since the last request,
210216 // extend the search to a new peer if we can find one
211217 case <- waitC :
212- log .Trace ("search timed out: rerequesting " , "request addr" , f .addr )
218+ log .Trace ("search timed out: requesting " , "request addr" , f .addr )
213219 doRequest = requested
214220
215221 // all Fetcher context closed, can quit
216222 case <- ctx .Done ():
217223 log .Trace ("terminate fetcher" , "request addr" , f .addr )
218- // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
224+ // TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
219225 return
220226 }
221227
@@ -231,7 +237,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
231237 // if wait channel is not set, set it to a timer
232238 if requested {
233239 if wait == nil {
234- wait = time .NewTimer (searchTimeout )
240+ wait = time .NewTimer (f . searchTimeout )
235241 defer wait .Stop ()
236242 waitC = wait .C
237243 } else {
@@ -242,8 +248,8 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
242248 default :
243249 }
244250 }
245- // reset the timer to go off after searchTimeout
246- wait .Reset (searchTimeout )
251+ // reset the timer to go off after defaultSearchTimeout
252+ wait .Reset (f . searchTimeout )
247253 }
248254 }
249255 doRequest = false
0 commit comments