Skip to content

Commit baded64

Browse files
authored
swarm/network: measure time of messages in priority queue (#19250)
1 parent c53c5e6 commit baded64

File tree

16 files changed

+87
-60
lines changed

16 files changed

+87
-60
lines changed

cmd/swarm/swarm-smoke/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ var (
4040
allhosts string
4141
hosts []string
4242
filesize int
43+
inputSeed int
4344
syncDelay int
4445
httpPort int
4546
wsPort int
@@ -74,6 +75,12 @@ func main() {
7475
Usage: "ws port",
7576
Destination: &wsPort,
7677
},
78+
cli.IntFlag{
79+
Name: "seed",
80+
Value: 0,
81+
Usage: "input seed in case we need deterministic upload",
82+
Destination: &inputSeed,
83+
},
7784
cli.IntFlag{
7885
Name: "filesize",
7986
Value: 1024,

cmd/swarm/swarm-smoke/upload_and_sync.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ import (
3939
)
4040

4141
func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
42+
// use input seed if it has been set
43+
if inputSeed != 0 {
44+
seed = inputSeed
45+
}
46+
4247
randomBytes := testutil.RandomBytes(seed, filesize*1000)
4348

4449
errc := make(chan error)
@@ -47,37 +52,28 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
4752
errc <- uploadAndSync(ctx, randomBytes, tuid)
4853
}()
4954

55+
var err error
5056
select {
51-
case err := <-errc:
57+
case err = <-errc:
5258
if err != nil {
5359
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
5460
}
55-
return err
5661
case <-time.After(time.Duration(timeout) * time.Second):
5762
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
5863

59-
e := fmt.Errorf("timeout after %v sec", timeout)
60-
// trigger debug functionality on randomBytes
61-
err := trackChunks(randomBytes[:])
62-
if err != nil {
63-
e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err)
64-
}
65-
66-
return e
64+
err = fmt.Errorf("timeout after %v sec", timeout)
6765
}
6866

69-
// trigger debug functionality on randomBytes even on successful runs
70-
err := trackChunks(randomBytes[:])
71-
if err != nil {
72-
log.Error(err.Error())
67+
// trigger debug functionality on randomBytes
68+
e := trackChunks(randomBytes[:])
69+
if e != nil {
70+
log.Error(e.Error())
7371
}
7472

75-
return nil
73+
return err
7674
}
7775

7876
func trackChunks(testData []byte) error {
79-
log.Warn("Test timed out, running chunk debug sequence")
80-
8177
addrs, err := getAllRefs(testData)
8278
if err != nil {
8379
return err
@@ -94,14 +90,14 @@ func trackChunks(testData []byte) error {
9490

9591
rpcClient, err := rpc.Dial(httpHost)
9692
if err != nil {
97-
log.Error("Error dialing host", "err", err)
93+
log.Error("error dialing host", "err", err, "host", httpHost)
9894
continue
9995
}
10096

10197
var hasInfo []api.HasInfo
10298
err = rpcClient.Call(&hasInfo, "bzz_has", addrs)
10399
if err != nil {
104-
log.Error("Error calling host", "err", err)
100+
log.Error("error calling rpc client", "err", err, "host", httpHost)
105101
continue
106102
}
107103

@@ -125,7 +121,6 @@ func trackChunks(testData []byte) error {
125121
}
126122

127123
func getAllRefs(testData []byte) (storage.AddressCollection, error) {
128-
log.Trace("Getting all references for given root hash")
129124
datadir, err := ioutil.TempDir("", "chunk-debug")
130125
if err != nil {
131126
return nil, fmt.Errorf("unable to create temp dir: %v", err)

metrics/influxdb/influxdb.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func (r *reporter) makeClient() (err error) {
9191
URL: r.url,
9292
Username: r.username,
9393
Password: r.password,
94+
Timeout: 10 * time.Second,
9495
})
9596

9697
return

swarm/network/fetcher.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,24 +204,24 @@ func (f *Fetcher) run(peers *sync.Map) {
204204

205205
// incoming request
206206
case hopCount = <-f.requestC:
207-
log.Trace("new request", "request addr", f.addr)
208207
// 2) chunk is requested, set requested flag
209208
// launch a request iff none been launched yet
210209
doRequest = !requested
210+
log.Trace("new request", "request addr", f.addr, "doRequest", doRequest)
211211
requested = true
212212

213213
// peer we requested from is gone. fall back to another
214214
// and remove the peer from the peers map
215215
case id := <-gone:
216-
log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
217216
peers.Delete(id.String())
218217
doRequest = requested
218+
log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest)
219219

220220
// search timeout: too much time passed since the last request,
221221
// extend the search to a new peer if we can find one
222222
case <-waitC:
223-
log.Trace("search timed out: requesting", "request addr", f.addr)
224223
doRequest = requested
224+
log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest)
225225

226226
// all Fetcher context closed, can quit
227227
case <-f.ctx.Done():
@@ -288,6 +288,7 @@ func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources
288288
for i = 0; i < len(sources); i++ {
289289
req.Source = sources[i]
290290
var err error
291+
log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String())
291292
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
292293
if err == nil {
293294
// remove the peer from known sources

swarm/network/priorityqueue/priorityqueue.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ package priorityqueue
2828
import (
2929
"context"
3030
"errors"
31+
"time"
3132

32-
"github.com/ethereum/go-ethereum/log"
33+
"github.com/ethereum/go-ethereum/metrics"
3334
)
3435

3536
var (
@@ -69,21 +70,23 @@ READ:
6970
case <-ctx.Done():
7071
return
7172
case x := <-q:
72-
log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
73-
f(x)
73+
val := x.(struct {
74+
v interface{}
75+
t time.Time
76+
})
77+
f(val.v)
78+
metrics.GetOrRegisterResettingTimer("pq.run", nil).UpdateSince(val.t)
7479
p = top
7580
default:
7681
if p > 0 {
7782
p--
78-
log.Trace("priority.queue p > 0", "p", p)
7983
continue READ
8084
}
8185
p = top
8286
select {
8387
case <-ctx.Done():
8488
return
8589
case <-pq.wakeup:
86-
log.Trace("priority.queue wakeup", "p", p)
8790
}
8891
}
8992
}
@@ -95,9 +98,15 @@ func (pq *PriorityQueue) Push(x interface{}, p int) error {
9598
if p < 0 || p >= len(pq.Queues) {
9699
return errBadPriority
97100
}
98-
log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
101+
val := struct {
102+
v interface{}
103+
t time.Time
104+
}{
105+
x,
106+
time.Now(),
107+
}
99108
select {
100-
case pq.Queues[p] <- x:
109+
case pq.Queues[p] <- val:
101110
default:
102111
return ErrContention
103112
}

swarm/network/stream/delivery.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
185185
if err != nil {
186186
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
187187
}
188+
osp.LogFields(olog.Bool("delivered", true))
188189
return
189190
}
190191
osp.LogFields(olog.Bool("skipCheck", false))
@@ -216,20 +217,29 @@ type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
216217

217218
// chunk delivery msg is response to retrieverequest msg
218219
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
220+
var osp opentracing.Span
221+
ctx, osp = spancontext.StartSpan(
222+
ctx,
223+
"handle.chunk.delivery")
219224

220225
processReceivedChunksCount.Inc(1)
221226

222227
// retrieve the span for the originating retrieverequest
223228
spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
224229
span := tracing.ShiftSpanByKey(spanId)
225230

231+
log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID())
232+
226233
go func() {
234+
defer osp.Finish()
235+
227236
if span != nil {
228237
span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
229238
defer span.Finish()
230239
}
231240

232241
req.peer = sp
242+
log.Trace("handle.chunk.delivery", "put", req.Addr)
233243
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
234244
if err != nil {
235245
if err == storage.ErrChunkInvalid {
@@ -239,6 +249,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
239249
req.peer.Drop(err)
240250
}
241251
}
252+
log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err)
242253
}()
243254
return nil
244255
}
@@ -284,6 +295,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
284295
// this span will finish only when delivery is handled (or times out)
285296
ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
286297
ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
298+
log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr)
287299
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
288300
Addr: req.Addr,
289301
SkipCheck: req.SkipCheck,

swarm/network/stream/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ func (r *Registry) APIs() []rpc.API {
910910
Namespace: "stream",
911911
Version: "3.0",
912912
Service: r.api,
913-
Public: true,
913+
Public: false,
914914
},
915915
}
916916
}

swarm/storage/chunker.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,6 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
536536
chunkData, err := r.getter.Get(ctx, Reference(childAddress))
537537
if err != nil {
538538
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
539-
log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
540539
select {
541540
case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
542541
case <-quitC:
@@ -561,12 +560,12 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
561560

562561
// Read keeps a cursor so cannot be called simultaneously, see ReadAt
563562
func (r *LazyChunkReader) Read(b []byte) (read int, err error) {
564-
log.Debug("lazychunkreader.read", "key", r.addr)
563+
log.Trace("lazychunkreader.read", "key", r.addr)
565564
metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1)
566565

567566
read, err = r.ReadAt(b, r.off)
568567
if err != nil && err != io.EOF {
569-
log.Debug("lazychunkreader.readat", "read", read, "err", err)
568+
log.Trace("lazychunkreader.readat", "read", read, "err", err)
570569
metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1)
571570
}
572571

swarm/storage/netstore.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error {
8787

8888
// if chunk is now put in the store, check if there was an active fetcher and call deliver on it
8989
// (this delivers the chunk to requestors via the fetcher)
90+
log.Trace("n.getFetcher", "ref", ch.Address())
9091
if f := n.getFetcher(ch.Address()); f != nil {
92+
log.Trace("n.getFetcher deliver", "ref", ch.Address())
9193
f.deliver(ctx, ch)
9294
}
9395
return nil
@@ -341,5 +343,6 @@ func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
341343
f.chunk = ch
342344
// closing the deliveredC channel will terminate ongoing requests
343345
close(f.deliveredC)
346+
log.Trace("n.getFetcher close deliveredC", "ref", ch.Address())
344347
})
345348
}

swarm/swarm.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,8 @@ func (s *Swarm) APIs() []rpc.API {
522522

523523
apis = append(apis, s.bzz.APIs()...)
524524

525+
apis = append(apis, s.streamer.APIs()...)
526+
525527
if s.ps != nil {
526528
apis = append(apis, s.ps.APIs()...)
527529
}

0 commit comments

Comments
 (0)