Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 7ef4555

Browse files
janosacud
authored andcommitted
Swarm rather stable: LocalStore metrics (#1349)
* swarm/shed: remove metrics fields from DB struct * swarm/schunk: add String methods to modes * swarm/storage/localstore: add metrics and traces * swarm/chunk: unknown modes without spaces in String methods * swarm/storage/localstore: remove bin number from pull subscription metrics * swarm/storage/localstore: add resetting time metrics and code improvements
1 parent 183c55c commit 7ef4555

File tree

10 files changed

+239
-47
lines changed

10 files changed

+239
-47
lines changed

swarm/chunk/chunk.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,19 @@ func Proximity(one, other []byte) (ret int) {
112112
// ModeGet enumerates different Getter modes.
113113
type ModeGet int
114114

115+
func (m ModeGet) String() string {
116+
switch m {
117+
case ModeGetRequest:
118+
return "Request"
119+
case ModeGetSync:
120+
return "Sync"
121+
case ModeGetLookup:
122+
return "Lookup"
123+
default:
124+
return "Unknown"
125+
}
126+
}
127+
115128
// Getter modes.
116129
const (
117130
// ModeGetRequest: when accessed for retrieval
@@ -125,6 +138,19 @@ const (
125138
// ModePut enumerates different Putter modes.
126139
type ModePut int
127140

141+
func (m ModePut) String() string {
142+
switch m {
143+
case ModePutRequest:
144+
return "Request"
145+
case ModePutSync:
146+
return "Sync"
147+
case ModePutUpload:
148+
return "Upload"
149+
default:
150+
return "Unknown"
151+
}
152+
}
153+
128154
// Putter modes.
129155
const (
130156
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
@@ -138,6 +164,19 @@ const (
138164
// ModeSet enumerates different Setter modes.
139165
type ModeSet int
140166

167+
func (m ModeSet) String() string {
168+
switch m {
169+
case ModeSetAccess:
170+
return "Access"
171+
case ModeSetSync:
172+
return "Sync"
173+
case ModeSetRemove:
174+
return "Remove"
175+
default:
176+
return "Unknown"
177+
}
178+
}
179+
141180
// Setter modes.
142181
const (
143182
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery

swarm/shed/db.go

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,7 @@ const (
4545
// It provides a schema functionality to store fields and indexes
4646
// information about naming and types.
4747
type DB struct {
48-
ldb *leveldb.DB
49-
50-
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
51-
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
52-
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
53-
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
54-
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
55-
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
56-
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
57-
48+
ldb *leveldb.DB
5849
quit chan struct{} // Quit channel to stop the metrics collection before closing the database
5950
}
6051

@@ -86,13 +77,10 @@ func NewDB(path string, metricsPrefix string) (db *DB, err error) {
8677
}
8778
}
8879

89-
// Configure meters for DB
90-
db.configure(metricsPrefix)
91-
9280
// Create a quit channel for the periodic metrics collector and run it
9381
db.quit = make(chan struct{})
9482

95-
go db.meter(10 * time.Second)
83+
go db.meter(metricsPrefix, 10*time.Second)
9684

9785
return db, nil
9886
}
@@ -169,19 +157,22 @@ func (db *DB) Close() (err error) {
169157
return db.ldb.Close()
170158
}
171159

172-
// Configure configures the database metrics collectors
173-
func (db *DB) configure(prefix string) {
174-
// Initialize all the metrics collector at the requested prefix
175-
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
176-
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
177-
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
178-
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
179-
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
180-
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
181-
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
182-
}
160+
func (db *DB) meter(prefix string, refresh time.Duration) {
161+
// Meter for measuring the total time spent in database compaction
162+
compTimeMeter := metrics.NewRegisteredMeter(prefix+"compact/time", nil)
163+
// Meter for measuring the data read during compaction
164+
compReadMeter := metrics.NewRegisteredMeter(prefix+"compact/input", nil)
165+
// Meter for measuring the data written during compaction
166+
compWriteMeter := metrics.NewRegisteredMeter(prefix+"compact/output", nil)
167+
// Meter for measuring the write delay number due to database compaction
168+
writeDelayMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
169+
// Meter for measuring the write delay duration due to database compaction
170+
writeDelayNMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
171+
// Meter for measuring the effective amount of data read
172+
diskReadMeter := metrics.NewRegisteredMeter(prefix+"disk/read", nil)
173+
// Meter for measuring the effective amount of data written
174+
diskWriteMeter := metrics.NewRegisteredMeter(prefix+"disk/write", nil)
183175

184-
func (db *DB) meter(refresh time.Duration) {
185176
// Create the counters to store current and previous compaction values
186177
compactions := make([][]float64, 2)
187178
for i := 0; i < 2; i++ {
@@ -234,14 +225,14 @@ func (db *DB) meter(refresh time.Duration) {
234225
}
235226
}
236227
// Update all the requested meters
237-
if db.compTimeMeter != nil {
238-
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
228+
if compTimeMeter != nil {
229+
compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
239230
}
240-
if db.compReadMeter != nil {
241-
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
231+
if compReadMeter != nil {
232+
compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
242233
}
243-
if db.compWriteMeter != nil {
244-
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
234+
if compWriteMeter != nil {
235+
compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
245236
}
246237

247238
// Retrieve the write delay statistic
@@ -265,11 +256,11 @@ func (db *DB) meter(refresh time.Duration) {
265256
log.Error("Failed to parse delay duration", "err", err)
266257
continue
267258
}
268-
if db.writeDelayNMeter != nil {
269-
db.writeDelayNMeter.Mark(delayN - delaystats[0])
259+
if writeDelayNMeter != nil {
260+
writeDelayNMeter.Mark(delayN - delaystats[0])
270261
}
271-
if db.writeDelayMeter != nil {
272-
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
262+
if writeDelayMeter != nil {
263+
writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
273264
}
274265
// If a warning that db is performing compaction has been displayed, any subsequent
275266
// warnings will be withheld for one minute not to overwhelm the user.
@@ -300,11 +291,11 @@ func (db *DB) meter(refresh time.Duration) {
300291
log.Error("Bad syntax of write entry", "entry", parts[1])
301292
continue
302293
}
303-
if db.diskReadMeter != nil {
304-
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
294+
if diskReadMeter != nil {
295+
diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
305296
}
306-
if db.diskWriteMeter != nil {
307-
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
297+
if diskWriteMeter != nil {
298+
diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
308299
}
309300
iostats[0], iostats[1] = nRead, nWrite
310301

swarm/storage/localstore/gc.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package localstore
1818

1919
import (
20+
"time"
21+
2022
"github.com/ethereum/go-ethereum/log"
23+
"github.com/ethereum/go-ethereum/metrics"
2124
"github.com/ethereum/go-ethereum/swarm/shed"
2225
"github.com/syndtr/goleveldb/leveldb"
2326
)
@@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() {
7578
// the rest of the garbage as the batch size limit is reached.
7679
// This function is called in collectGarbageWorker.
7780
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
81+
metricName := "localstore.gc"
82+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
83+
defer totalTimeMetric(metricName, time.Now())
84+
defer func() {
85+
if err != nil {
86+
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
87+
}
88+
}()
89+
7890
batch := new(leveldb.Batch)
7991
target := db.gcTarget()
8092

swarm/storage/localstore/localstore.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
"github.com/ethereum/go-ethereum/log"
26+
"github.com/ethereum/go-ethereum/metrics"
2627
"github.com/ethereum/go-ethereum/swarm/chunk"
2728
"github.com/ethereum/go-ethereum/swarm/shed"
2829
"github.com/ethereum/go-ethereum/swarm/storage/mock"
@@ -388,3 +389,12 @@ func init() {
388389
return time.Now().UTC().UnixNano()
389390
}
390391
}
392+
393+
// totalTimeMetric logs a message about time between provided start time
394+
// and the time when the function is called and sends a resetting timer metric
395+
// with provided name appended with ".total-time".
396+
func totalTimeMetric(name string, start time.Time) {
397+
totalTime := time.Since(start)
398+
log.Trace(name+" total time", "time", totalTime)
399+
metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
400+
}

swarm/storage/localstore/mode_get.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@ package localstore
1818

1919
import (
2020
"context"
21+
"fmt"
22+
"time"
2123

2224
"github.com/ethereum/go-ethereum/log"
25+
"github.com/ethereum/go-ethereum/metrics"
2326
"github.com/ethereum/go-ethereum/swarm/chunk"
2427
"github.com/ethereum/go-ethereum/swarm/shed"
28+
"github.com/ethereum/go-ethereum/swarm/spancontext"
29+
olog "github.com/opentracing/opentracing-go/log"
2530
"github.com/syndtr/goleveldb/leveldb"
2631
)
2732

@@ -30,7 +35,22 @@ import (
3035
// All required indexes will be updated required by the
3136
// Getter Mode. Get is required to implement chunk.Store
3237
// interface.
33-
func (db *DB) Get(_ context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
38+
func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
39+
metricName := fmt.Sprintf("localstore.Get.%s", mode)
40+
41+
ctx, sp := spancontext.StartSpan(ctx, metricName)
42+
defer sp.Finish()
43+
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-get", mode.String()))
44+
45+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
46+
defer totalTimeMetric(metricName, time.Now())
47+
48+
defer func() {
49+
if err != nil {
50+
metrics.GetOrRegisterCounter(fmt.Sprintf(metricName+".error", mode), nil).Inc(1)
51+
}
52+
}()
53+
3454
out, err := db.get(mode, addr)
3555
if err != nil {
3656
if err == leveldb.ErrNotFound {
@@ -66,8 +86,14 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
6686
// for a new goroutine
6787
defer func() { <-db.updateGCSem }()
6888
}
89+
90+
metricName := "localstore.updateGC"
91+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
92+
defer totalTimeMetric(metricName, time.Now())
93+
6994
err := db.updateGC(out)
7095
if err != nil {
96+
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
7197
log.Error("localstore update gc", "err", err)
7298
}
7399
// if gc update hook is defined, call it

swarm/storage/localstore/mode_has.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,28 @@ package localstore
1818

1919
import (
2020
"context"
21+
"time"
2122

23+
"github.com/ethereum/go-ethereum/metrics"
2224
"github.com/ethereum/go-ethereum/swarm/chunk"
25+
"github.com/ethereum/go-ethereum/swarm/spancontext"
26+
olog "github.com/opentracing/opentracing-go/log"
2327
)
2428

2529
// Has returns true if the chunk is stored in database.
26-
func (db *DB) Has(_ context.Context, addr chunk.Address) (bool, error) {
27-
return db.retrievalDataIndex.Has(addressToItem(addr))
30+
func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
31+
metricName := "localstore.Has"
32+
33+
ctx, sp := spancontext.StartSpan(ctx, metricName)
34+
defer sp.Finish()
35+
sp.LogFields(olog.String("ref", addr.String()))
36+
37+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
38+
defer totalTimeMetric(metricName, time.Now())
39+
40+
has, err := db.retrievalDataIndex.Has(addressToItem(addr))
41+
if err != nil {
42+
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
43+
}
44+
return has, err
2845
}

swarm/storage/localstore/mode_put.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,36 @@ package localstore
1818

1919
import (
2020
"context"
21+
"fmt"
22+
"time"
2123

24+
"github.com/ethereum/go-ethereum/metrics"
2225
"github.com/ethereum/go-ethereum/swarm/chunk"
2326
"github.com/ethereum/go-ethereum/swarm/shed"
27+
"github.com/ethereum/go-ethereum/swarm/spancontext"
28+
olog "github.com/opentracing/opentracing-go/log"
2429
"github.com/syndtr/goleveldb/leveldb"
2530
)
2631

2732
// Put stores the Chunk to database and depending
2833
// on the Putter mode, it updates required indexes.
2934
// Put is required to implement chunk.Store
3035
// interface.
31-
func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
32-
return db.put(mode, chunkToItem(ch))
36+
func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
37+
metricName := fmt.Sprintf("localstore.Put.%s", mode)
38+
39+
ctx, sp := spancontext.StartSpan(ctx, metricName)
40+
defer sp.Finish()
41+
sp.LogFields(olog.String("ref", ch.Address().String()), olog.String("mode-put", mode.String()))
42+
43+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
44+
defer totalTimeMetric(metricName, time.Now())
45+
46+
exists, err = db.put(mode, chunkToItem(ch))
47+
if err != nil {
48+
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
49+
}
50+
return exists, err
3351
}
3452

3553
// put stores Item to database and updates other

swarm/storage/localstore/mode_set.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,35 @@ package localstore
1818

1919
import (
2020
"context"
21+
"fmt"
22+
"time"
2123

24+
"github.com/ethereum/go-ethereum/metrics"
2225
"github.com/ethereum/go-ethereum/swarm/chunk"
26+
"github.com/ethereum/go-ethereum/swarm/spancontext"
27+
olog "github.com/opentracing/opentracing-go/log"
2328
"github.com/syndtr/goleveldb/leveldb"
2429
)
2530

2631
// Set updates database indexes for a specific
2732
// chunk represented by the address.
2833
// Set is required to implement chunk.Store
2934
// interface.
30-
func (db *DB) Set(_ context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
31-
return db.set(mode, addr)
35+
func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
36+
metricName := fmt.Sprintf("localstore.Set.%s", mode)
37+
38+
ctx, sp := spancontext.StartSpan(ctx, metricName)
39+
defer sp.Finish()
40+
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-set", mode.String()))
41+
42+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
43+
defer totalTimeMetric(metricName, time.Now())
44+
45+
err = db.set(mode, addr)
46+
if err != nil {
47+
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
48+
}
49+
return err
3250
}
3351

3452
// set updates database indexes for a specific

0 commit comments

Comments
 (0)