Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions swarm/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ func Proximity(one, other []byte) (ret int) {
// ModeGet enumerates different Getter modes.
type ModeGet int

func (m ModeGet) String() string {
switch m {
case ModeGetRequest:
return "Request"
case ModeGetSync:
return "Sync"
case ModeGetLookup:
return "Lookup"
default:
return "Unknown"
}
}

// Getter modes.
const (
// ModeGetRequest: when accessed for retrieval
Expand All @@ -125,6 +138,19 @@ const (
// ModePut enumerates different Putter modes.
type ModePut int

func (m ModePut) String() string {
switch m {
case ModePutRequest:
return "Request"
case ModePutSync:
return "Sync"
case ModePutUpload:
return "Upload"
default:
return "Unknown"
}
}

// Putter modes.
const (
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
Expand All @@ -138,6 +164,19 @@ const (
// ModeSet enumerates different Setter modes.
type ModeSet int

func (m ModeSet) String() string {
switch m {
case ModeSetAccess:
return "Access"
case ModeSetSync:
return "Sync"
case ModeSetRemove:
return "Remove"
default:
return "Unknown"
}
}

// Setter modes.
const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
Expand Down
71 changes: 31 additions & 40 deletions swarm/shed/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,7 @@ const (
// It provides a schema functionality to store fields and indexes
// information about naming and types.
type DB struct {
ldb *leveldb.DB

compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written

ldb *leveldb.DB
quit chan struct{} // Quit channel to stop the metrics collection before closing the database
}

Expand Down Expand Up @@ -86,13 +77,10 @@ func NewDB(path string, metricsPrefix string) (db *DB, err error) {
}
}

// Configure meters for DB
db.configure(metricsPrefix)

// Create a quit channel for the periodic metrics collector and run it
db.quit = make(chan struct{})

go db.meter(10 * time.Second)
go db.meter(metricsPrefix, 10*time.Second)

return db, nil
}
Expand Down Expand Up @@ -169,19 +157,22 @@ func (db *DB) Close() (err error) {
return db.ldb.Close()
}

// Configure configures the database metrics collectors
func (db *DB) configure(prefix string) {
// Initialize all the metrics collector at the requested prefix
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
}
func (db *DB) meter(prefix string, refresh time.Duration) {
// Meter for measuring the total time spent in database compaction
compTimeMeter := metrics.NewRegisteredMeter(prefix+"compact/time", nil)
// Meter for measuring the data read during compaction
compReadMeter := metrics.NewRegisteredMeter(prefix+"compact/input", nil)
// Meter for measuring the data written during compaction
compWriteMeter := metrics.NewRegisteredMeter(prefix+"compact/output", nil)
// Meter for measuring the write delay number due to database compaction
writeDelayMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
// Meter for measuring the write delay duration due to database compaction
writeDelayNMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
// Meter for measuring the effective amount of data read
diskReadMeter := metrics.NewRegisteredMeter(prefix+"disk/read", nil)
// Meter for measuring the effective amount of data written
diskWriteMeter := metrics.NewRegisteredMeter(prefix+"disk/write", nil)

func (db *DB) meter(refresh time.Duration) {
// Create the counters to store current and previous compaction values
compactions := make([][]float64, 2)
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -234,14 +225,14 @@ func (db *DB) meter(refresh time.Duration) {
}
}
// Update all the requested meters
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
if compTimeMeter != nil {
compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
if compReadMeter != nil {
compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
if compWriteMeter != nil {
compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
}

// Retrieve the write delay statistic
Expand All @@ -265,11 +256,11 @@ func (db *DB) meter(refresh time.Duration) {
log.Error("Failed to parse delay duration", "err", err)
continue
}
if db.writeDelayNMeter != nil {
db.writeDelayNMeter.Mark(delayN - delaystats[0])
if writeDelayNMeter != nil {
writeDelayNMeter.Mark(delayN - delaystats[0])
}
if db.writeDelayMeter != nil {
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
if writeDelayMeter != nil {
writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
}
// If a warning that db is performing compaction has been displayed, any subsequent
// warnings will be withheld for one minute not to overwhelm the user.
Expand Down Expand Up @@ -300,11 +291,11 @@ func (db *DB) meter(refresh time.Duration) {
log.Error("Bad syntax of write entry", "entry", parts[1])
continue
}
if db.diskReadMeter != nil {
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
if diskReadMeter != nil {
diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
}
if db.diskWriteMeter != nil {
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
if diskWriteMeter != nil {
diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
}
iostats[0], iostats[1] = nRead, nWrite

Expand Down
12 changes: 12 additions & 0 deletions swarm/storage/localstore/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package localstore

import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
Expand Down Expand Up @@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() {
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
metricName := "localstore.gc"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
}()

batch := new(leveldb.Batch)
target := db.gcTarget()

Expand Down
10 changes: 10 additions & 0 deletions swarm/storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
Expand Down Expand Up @@ -388,3 +389,12 @@ func init() {
return time.Now().UTC().UnixNano()
}
}

// totalTimeMetric logs a message about time between provided start time
// and the time when the function is called and sends a resetting timer metric
// with provided name appended with ".total-time".
func totalTimeMetric(name string, start time.Time) {
totalTime := time.Since(start)
log.Trace(name+" total time", "time", totalTime)
metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
}
28 changes: 27 additions & 1 deletion swarm/storage/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)

Expand All @@ -30,7 +35,22 @@ import (
// All required indexes will be updated required by the
// Getter Mode. Get is required to implement chunk.Store
// interface.
func (db *DB) Get(_ context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore.Get.%s", mode)

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-get", mode.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

defer func() {
if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf(metricName+".error", mode), nil).Inc(1)
}
}()

out, err := db.get(mode, addr)
if err != nil {
if err == leveldb.ErrNotFound {
Expand Down Expand Up @@ -66,8 +86,14 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
// for a new goroutine
defer func() { <-db.updateGCSem }()
}

metricName := "localstore.updateGC"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

err := db.updateGC(out)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
// if gc update hook is defined, call it
Expand Down
21 changes: 19 additions & 2 deletions swarm/storage/localstore/mode_has.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,28 @@ package localstore

import (
"context"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
)

// Has returns true if the chunk is stored in database.
func (db *DB) Has(_ context.Context, addr chunk.Address) (bool, error) {
return db.retrievalDataIndex.Has(addressToItem(addr))
func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
metricName := "localstore.Has"

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", addr.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

has, err := db.retrievalDataIndex.Has(addressToItem(addr))
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
return has, err
}
22 changes: 20 additions & 2 deletions swarm/storage/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,36 @@ package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)

// Put stores the Chunk to database and depending
// on the Putter mode, it updates required indexes.
// Put is required to implement chunk.Store
// interface.
func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
return db.put(mode, chunkToItem(ch))
func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
metricName := fmt.Sprintf("localstore.Put.%s", mode)

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", ch.Address().String()), olog.String("mode-put", mode.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

exists, err = db.put(mode, chunkToItem(ch))
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
return exists, err
}

// put stores Item to database and updates other
Expand Down
22 changes: 20 additions & 2 deletions swarm/storage/localstore/mode_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,35 @@ package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/spancontext"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
)

// Set updates database indexes for a specific
// chunk represented by the address.
// Set is required to implement chunk.Store
// interface.
func (db *DB) Set(_ context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
return db.set(mode, addr)
func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
metricName := fmt.Sprintf("localstore.Set.%s", mode)

ctx, sp := spancontext.StartSpan(ctx, metricName)
defer sp.Finish()
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-set", mode.String()))

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

err = db.set(mode, addr)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
return err
}

// set updates database indexes for a specific
Expand Down
Loading