This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Changes from 2 commits
190 changes: 190 additions & 0 deletions swarm/shed/rushed/db.go
@@ -0,0 +1,190 @@
package rushed

import (
"context"
"errors"
"sync"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/syndtr/goleveldb/leveldb"
)

type Mode = int

var (
errDBClosed = errors.New("DB closed")
errCancelled = errors.New("iteration cancelled")
)

// Batch wraps a leveldb batch, extending it with a wait group and a done channel
type Batch struct {
Contributor comment: We don't expose newBatch(). Do we need to expose Batch?
(Or vice versa: if we expose Batch, maybe we should expose newBatch too.)

*leveldb.Batch
wg sync.WaitGroup // to signal and wait for parallel writes to batch
Done chan struct{} // to signal when batch is written
Err error // error resulting from write
}

// newBatch constructs a new batch
func newBatch() *Batch {
return &Batch{
Batch: new(leveldb.Batch),
Done: make(chan struct{}),
}
}

// DB extends shed.DB with batch execution, garbage collection, and subscription-based iteration support
type DB struct {
*shed.DB // underlying shed.DB
update func(*Batch, Mode, *shed.IndexItem) error // mode-dependent update method; writes the item to the current batch according to the mode
Contributor comment: On update/access I would be happy to see more comments :) At this point, just getting familiar with the code, it is not at all clear what their purpose is.

access func(Mode, *shed.IndexItem) error // mode-dependent access method; reads the stored item for the given mode
batch chan *Batch // channel to obtain current batch
quit chan struct{} // channel to be closed when DB quits
}

// New constructs a new DB
func New(sdb *shed.DB, update func(*Batch, Mode, *shed.IndexItem) error, access func(Mode, *shed.IndexItem) error) *DB {
Contributor comment: Maybe extract update and access into named (private) types => it would keep the method signature clearer, and their definitions are already duplicated (see the DB struct). A sketch follows after this function.

db := &DB{
DB: sdb,
update: update,
access: access,
batch: make(chan *Batch),
quit: make(chan struct{}),
}
go db.listen()
return db
}
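A minimal sketch of the contributor's suggestion above, assuming hypothetical type names UpdateFunc and AccessFunc (not part of this change):

type UpdateFunc func(*Batch, Mode, *shed.IndexItem) error // mode-dependent update applied to an item within a batch
type AccessFunc func(Mode, *shed.IndexItem) error         // mode-dependent lookup of an item

// The DB fields and the constructor signature would then read:
//    update UpdateFunc
//    access AccessFunc
//    func New(sdb *shed.DB, update UpdateFunc, access AccessFunc) *DB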

// Close terminates loops by closing the quit channel
func (db *DB) Close() {
// signal quit to listen loop
close(db.quit)
// wait till batch channel is closed and last batch is written
for b := range db.batch {
b.wg.Done()
<-b.Done
}
// close shed db
db.DB.Close()
}

// Accessor is a wrapper around DB where Put/Get are overridden to apply the
// update/access method for the mode.
// Using Mode(mode), the DB implements the ChunkStore interface.
type Accessor struct {
mode Mode
*DB
}

// Mode returns an Accessor that implements the ChunkStore interface for the given mode of update on a multi-mode update DB
func (db *DB) Mode(mode Mode) *Accessor {
return &Accessor{
mode: mode,
DB: db,
}
}
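For context, a minimal usage sketch of the Accessor wrapper, assuming a *DB value db, a context ctx, a chunk ch and mode 0 (TestPutGet below exercises the same flow):

store := db.Mode(0) // Accessor acting as a ChunkStore for mode 0
if err := store.Put(ctx, ch); err != nil { // runs the update method for mode 0 and waits for the batch write
    return err
}
got, err := store.Get(ctx, ch.Address()) // runs the access method for mode 0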

// Put overrides the underlying DB Put method for the specific mode of update
func (u *Accessor) Put(ctx context.Context, ch storage.Chunk) error {
return u.Update(ctx, u.mode, newItemFromChunk(ch))
}

// Get overrides the underlying DB Get method for the specific mode of access
func (u *Accessor) Get(_ context.Context, addr storage.Address) (storage.Chunk, error) {
item := newItemFromAddress(addr)
if err := u.access(u.mode, item); err != nil {
return nil, err
}
return storage.NewChunk(item.Address, item.Data), nil
}

// Update calls the update method for the specific mode with the given item
func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error {
// obtain the current batch
b := <-db.batch
log.Debug("obtained batch")
if b == nil {
return errDBClosed
}
// call the update with the access mode
err := db.update(b, mode, item)
if err != nil {
Contributor comment: Am I the only one who favors the "simple statement" if format?
I think it's clearer that the variable is only needed for a limited scope and does not pollute the enclosing scope.
(For me it's also easier to understand, as I immediately know that the scope I have to consider is just a few lines.)
=> if err := foo(); err != nil { <3
A sketch follows after this function.

return err
}
// signal to listen loop that the update to batch is complete
b.wg.Done()
// wait for batch to be written and return batch error
// this is in order for Put calls to be synchronous
select {
case <-b.Done:
case <-ctx.Done():
return ctx.Err()
}
return b.Err
}
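The form the contributor refers to, applied to the call above, would read:

if err := db.update(b, mode, item); err != nil {
    return err
}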

// listen is a forever loop handing out the current batch to updaters
// and applying the batch when the db is free.
// If the db quits, the last batch is written out and the batch channel is closed.
func (db *DB) listen() {
b := newBatch() // current batch
var done chan struct{} // when non-nil, receives once the previous batch write has completed
wasdone := make(chan struct{})
close(wasdone) // pre-closed so the first filled batch can be written without waiting
Contributor comment: Note: I did not get why the heck we close this channel immediately. My first thought was that we missed a defer. (See the note after this function.)

for {
b.wg.Add(1)
select {
case db.batch <- b:
// an updater has taken the current batch; enable the write case as soon as the previous batch write is done
done = wasdone
case <-done:
b.wg.Done()
// if the batch writer is idle, hand over the batch and create a new one
// if the batch writer loop is busy, keep adding to the same batch
go db.writeBatch(b)
wasdone = b.Done
// disable case until further ops happen
done = nil
b = newBatch()
case <-db.quit:
// make sure batch is saved to disk so as not to lose chunks
if done != nil {
b.wg.Done()
db.writeBatch(b)
<-b.Done
}
close(db.batch)
return
}
}
}
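On the close(wasdone) question above: receiving from a closed channel never blocks, so a pre-closed channel makes a select case immediately ready. Here that appears to stand in for an already-finished "previous write", so the first filled batch is handed to writeBatch without special-casing the initial iteration; later iterations reuse the previous batch's Done channel instead. A standalone sketch of the pattern (not this code):

prevDone := make(chan struct{})
close(prevDone) // pretend the previous write already finished
select {
case <-prevDone:
    // selected immediately, because a closed channel is always ready to receive from
}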

// writeBatch writes out the batch, sets the error and closes the done channel
func (db *DB) writeBatch(b *Batch) {
// wait for all updaters to finish writing to this batch
b.wg.Wait()
// apply the batch
b.Err = db.DB.WriteBatch(b.Batch)
// signal batch write to callers
close(b.Done)
}

// newItemFromChunk creates a shed.IndexItem (fields: Address, Data, AccessTimestamp,
// StoreTimestamp) from a chunk, setting only the Address and Data fields
func newItemFromChunk(ch storage.Chunk) *shed.IndexItem {
return &shed.IndexItem{
Address: ch.Address(),
Data: ch.Data(),
}
}

// newItemFromAddress creates a shed.IndexItem with only the Address set, for lookups
func newItemFromAddress(addr storage.Address) *shed.IndexItem {
return &shed.IndexItem{
Address: addr,
}
}
169 changes: 169 additions & 0 deletions swarm/shed/rushed/db_test.go
@@ -0,0 +1,169 @@
package rushed

import (
"bytes"
"context"
"encoding/binary"
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
colorable "github.com/mattn/go-colorable"
)

var (
loglevel = flag.Int("loglevel", 3, "verbosity of logs")
)

func init() {
flag.Parse()
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}

type tester struct {
index shed.Index
db *DB
}

func (t *tester) access(mode Mode, item *shed.IndexItem) error {
it, err := t.index.Get(*item)
if err != nil {
return err
}
*item = it
return nil
}

// update defines the set/update behaviour for the different modes
func (t *tester) update(b *Batch, mode Mode, item *shed.IndexItem) error {
if mode != 0 {
return errors.New("no such mode")
}
return t.index.PutInBatch(b.Batch, *item)
}

func newTester(path string) (*tester, error) {
tester := new(tester)
sdb, err := shed.NewDB(path)
if err != nil {
return nil, err
}
tester.db = New(sdb, tester.update, tester.access)
tester.index, err = sdb.NewIndex("Hash->StoredTimestamp|AccessTimestamp|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
return fields.Address, nil
},
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
e.Address = key
return e, nil
},
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
b := make([]byte, 16)
binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp))
binary.BigEndian.PutUint64(b[8:16], uint64(fields.AccessTimestamp))
value = append(b, fields.Data...)
return value, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.Data = value[16:]
return e, nil
},
})
if err != nil {
return nil, err
}
return tester, nil
}

func TestPutGet(t *testing.T) {
path, err := ioutil.TempDir("", "rushed-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(path)
tester, err := newTester(path)
if err != nil {
t.Fatal(err)
}
defer tester.db.Close()
s := tester.db.Mode(0)
ch := storage.GenerateRandomChunk(chunk.DefaultSize)
log.Debug("put")
err = s.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
log.Debug("get")
got, err := s.Get(context.Background(), ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Data(), ch.Data()) {
t.Fatal("chunk data mismatch after retrieval")
}
}

type putter interface {
Put(context.Context, storage.Chunk) error
}

func (t *tester) Put(_ context.Context, ch storage.Chunk) error {
return t.index.Put(*(newItemFromChunk(ch)))
}

func BenchmarkPut(b *testing.B) {
n := 128
Contributor comment: Maybe rename n => numberOfChunks?
It's not clear from here why 128 or why n; the two together make it hard to work out the meaning.

for j := 0; j < 5; j++ {
n *= 2
in := time.Nanosecond
Contributor comment: What is the purpose of in?

for i := 0; i < 3; i++ {
for _, name := range []string{"shed", "rushed"} {
path, err := ioutil.TempDir("", "rushed-test")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(path)
tester, err := newTester(path)
if err != nil {
b.Fatal(err)
}
defer tester.db.Close()
var db putter
if name == "shed" {
db = tester
} else {
db = tester.db.Mode(0)
}
b.Run(fmt.Sprintf("N=%v Interval=%v, DB=%v", n, in, name), func(t *testing.B) {
benchmarkPut(t, n, in, db)
})
}
in *= time.Duration(10)
}
}
}

func benchmarkPut(b *testing.B, n int, in time.Duration, db putter) {
Contributor comment: The in parameter is not used.

for i := 0; i < b.N; i++ {
Contributor comment:
  • Before this for loop I would generate the random chunks, so we don't benchmark the generate function too.
  • Actually I would try to start only X goroutines, where X should correlate with the number of CPUs (as we cannot run more in parallel anyway).
  • Each goroutine would get its own slice and would try to iterate over it with Put as fast as possible.
A sketch follows after this function.

var wg sync.WaitGroup
wg.Add(n)
for j := 0; j < n; j++ {
go func() {
defer wg.Done()
db.Put(context.Background(), storage.GenerateRandomChunk(chunk.DefaultSize))
}()
}
wg.Wait()
}
}
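A sketch of the restructuring the contributor suggests above: chunks are pre-generated, and a fixed number of workers (one per CPU) each iterate over their own slice. The function name and the exact work split are illustrative assumptions, and runtime would need to be imported:

func benchmarkPutPregenerated(b *testing.B, numberOfChunks int, db putter) {
    workers := runtime.NumCPU()
    // generate the chunks up front so chunk generation is not part of the measurement
    chunks := make([]storage.Chunk, numberOfChunks)
    for i := range chunks {
        chunks[i] = storage.GenerateRandomChunk(chunk.DefaultSize)
    }
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        var wg sync.WaitGroup
        wg.Add(workers)
        for w := 0; w < workers; w++ {
            // each worker gets its own contiguous slice of the pre-generated chunks
            go func(part []storage.Chunk) {
                defer wg.Done()
                for _, ch := range part {
                    db.Put(context.Background(), ch) // errors ignored here, as in the benchmark above
                }
            }(chunks[w*numberOfChunks/workers : (w+1)*numberOfChunks/workers])
        }
        wg.Wait()
    }
}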