-
Notifications
You must be signed in to change notification settings - Fork 110
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| package rushed | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "sync" | ||
|
|
||
| "github.com/ethereum/go-ethereum/log" | ||
| "github.com/ethereum/go-ethereum/swarm/shed" | ||
| "github.com/ethereum/go-ethereum/swarm/storage" | ||
| "github.com/syndtr/goleveldb/leveldb" | ||
| ) | ||
|
|
||
| // Mode is an enum for modes of access/update | ||
| type Mode = int | ||
|
|
||
| var ( | ||
| errDBClosed = errors.New("DB closed") | ||
| ) | ||
|
|
||
| // Batch wraps leveldb.Batch extending it with a waitgroup and a done channel | ||
| type Batch struct { | ||
| *leveldb.Batch | ||
| Done chan struct{} // to signal when batch is written | ||
| Err error // error resulting from write | ||
| } | ||
|
|
||
| // NewBatch constructs a new batch | ||
| func NewBatch() *Batch { | ||
| return &Batch{ | ||
| Batch: new(leveldb.Batch), | ||
| Done: make(chan struct{}), | ||
| } | ||
| } | ||
|
|
||
| // DB extends shed DB with batch execution, garbage collection and iteration support with subscriptions | ||
| type DB struct { | ||
| *shed.DB // underlying shed.DB | ||
| update func(*Batch, Mode, *shed.IndexItem) error // mode-dependent update method | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. on |
||
| access func(Mode, *shed.IndexItem) error // mode-dependent access method | ||
| batch *Batch // current batch | ||
| mu sync.RWMutex // mutex for accessing current batch | ||
| c chan struct{} // channel to signal writes on | ||
| closed chan struct{} // closed when writeBatches loop quits | ||
| } | ||
|
|
||
| // New constructs a new DB | ||
| func New(sdb *shed.DB, update func(*Batch, Mode, *shed.IndexItem) error, access func(Mode, *shed.IndexItem) error) *DB { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe I would extract update and access to a private Type => it would keep the method signature clearer and their definition is already duplicated (see DB struct) |
||
| db := &DB{ | ||
| DB: sdb, | ||
| update: update, | ||
| access: access, | ||
| batch: NewBatch(), | ||
| c: make(chan struct{}, 1), | ||
| closed: make(chan struct{}), | ||
| } | ||
| go db.writeBatches() | ||
| return db | ||
| } | ||
|
|
||
| // Close terminates loops by closing the quit channel | ||
| func (db *DB) Close() { | ||
| // signal quit to writeBatches loop | ||
| close(db.c) | ||
| // wait for last batch to be written | ||
| <-db.closed | ||
| db.DB.Close() | ||
| } | ||
|
|
||
| // Accessor is a wrapper around DB where Put/Get is overwritten to apply the | ||
| // update/access method for the mode | ||
| // using Mode(mode) the DB implements the ChunkStore interface | ||
| type Accessor struct { | ||
| mode Mode | ||
| *DB | ||
| } | ||
|
|
||
| // Mode returns the ChunkStore interface for the mode of update on a multimode update DB | ||
| func (db *DB) Mode(mode Mode) *Accessor { | ||
| return &Accessor{ | ||
| mode: mode, | ||
| DB: db, | ||
| } | ||
| } | ||
|
|
||
| // Put overwrites the underlying DB Put method for the specific mode of update | ||
| func (u *Accessor) Put(ctx context.Context, ch storage.Chunk) error { | ||
| return u.Update(ctx, u.mode, newItemFromChunk(ch)) | ||
| } | ||
|
|
||
| // Get overwrites the underlying DB Get method for the specific mode of access | ||
| func (u *Accessor) Get(_ context.Context, addr storage.Address) (storage.Chunk, error) { | ||
| item := newItemFromAddress(addr) | ||
| if err := u.access(u.mode, item); err != nil { | ||
| return nil, err | ||
| } | ||
| return storage.NewChunk(item.Address, item.Data), nil | ||
| } | ||
|
|
||
| // Update calls the update method for the specific mode with items | ||
| func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error { | ||
| // obtain the current batch | ||
| // b := <-db.batch | ||
| db.mu.RLock() | ||
| b := db.batch | ||
| db.mu.RUnlock() | ||
| log.Debug("obtained batch") | ||
| if b == nil { | ||
| return errDBClosed | ||
| } | ||
| // call the update with the access mode | ||
| err := db.update(b, mode, item) | ||
| if err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Am I the only one who favors the "simple statement" format |
||
| return err | ||
| } | ||
| // wait for batch to be written and return batch error | ||
| // this is in order for Put calls to be synchronous | ||
| select { | ||
| case db.c <- struct{}{}: | ||
| default: | ||
| } | ||
| select { | ||
| case <-b.Done: | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
| return b.Err | ||
| } | ||
|
|
||
| // writeBatches is a forever loop handing out the current batch to updaters | ||
| // and apply the batch when the db is free | ||
| // if the db is quit, the last batch is written out and batch channel is closed | ||
| func (db *DB) writeBatches() { | ||
| defer close(db.closed) | ||
| for range db.c { | ||
| db.mu.Lock() | ||
| b := db.batch | ||
| db.batch = NewBatch() | ||
| db.mu.Unlock() | ||
| db.writeBatch(b) | ||
| } | ||
| } | ||
|
|
||
| // writeBatch writes out the batch, sets the error and closes the done channel | ||
| func (db *DB) writeBatch(b *Batch) { | ||
| // apply the batch | ||
| b.Err = db.DB.WriteBatch(b.Batch) | ||
| // signal batch write to callers | ||
| close(b.Done) | ||
| } | ||
|
|
||
| func newItemFromChunk(ch storage.Chunk) *shed.IndexItem { | ||
| return &shed.IndexItem{ | ||
| Address: ch.Address(), | ||
| Data: ch.Data(), | ||
| } | ||
| } | ||
|
|
||
| func newItemFromAddress(addr storage.Address) *shed.IndexItem { | ||
| return &shed.IndexItem{ | ||
| Address: addr, | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| package rushed | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/binary" | ||
| "errors" | ||
| "flag" | ||
| "fmt" | ||
| "io/ioutil" | ||
| "os" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/ethereum/go-ethereum/log" | ||
| "github.com/ethereum/go-ethereum/swarm/chunk" | ||
| "github.com/ethereum/go-ethereum/swarm/shed" | ||
| "github.com/ethereum/go-ethereum/swarm/storage" | ||
| colorable "github.com/mattn/go-colorable" | ||
| ) | ||
|
|
||
| var ( | ||
| loglevel = flag.Int("loglevel", 3, "verbosity of logs") | ||
| ) | ||
|
|
||
| func init() { | ||
| flag.Parse() | ||
| log.PrintOrigins(true) | ||
| log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) | ||
| } | ||
|
|
||
| type tester struct { | ||
| index shed.Index | ||
| db *DB | ||
| } | ||
|
|
||
| func (t *tester) access(mode Mode, item *shed.IndexItem) error { | ||
| it, err := t.index.Get(*item) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| *item = it | ||
| return nil | ||
| } | ||
|
|
||
| // update defines set accessors for different modes | ||
| func (t *tester) update(b *Batch, mode Mode, item *shed.IndexItem) error { | ||
| if mode != 0 { | ||
| return errors.New("no such mode") | ||
| } | ||
| return t.index.PutInBatch(b.Batch, *item) | ||
| } | ||
|
|
||
| func newTester(path string) (*tester, error) { | ||
| tester := new(tester) | ||
| sdb, err := shed.NewDB(path) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| tester.db = New(sdb, tester.update, tester.access) | ||
| tester.index, err = sdb.NewIndex("Hash->StoredTimestamp|AccessTimestamp|Data", shed.IndexFuncs{ | ||
| EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { | ||
| return fields.Address, nil | ||
| }, | ||
| DecodeKey: func(key []byte) (e shed.IndexItem, err error) { | ||
| e.Address = key | ||
| return e, nil | ||
| }, | ||
| EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { | ||
| b := make([]byte, 16) | ||
| binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp)) | ||
| binary.BigEndian.PutUint64(b[8:16], uint64(fields.AccessTimestamp)) | ||
| value = append(b, fields.Data...) | ||
| return value, nil | ||
| }, | ||
| DecodeValue: func(value []byte) (e shed.IndexItem, err error) { | ||
| e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) | ||
| e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) | ||
| e.Data = value[16:] | ||
| return e, nil | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return tester, nil | ||
| } | ||
|
|
||
| func TestPutGet(t *testing.T) { | ||
| path, err := ioutil.TempDir("", "rushed-test") | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| defer os.RemoveAll(path) | ||
| tester, err := newTester(path) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| defer tester.db.Close() | ||
| s := tester.db.Mode(0) | ||
| ch := storage.GenerateRandomChunk(chunk.DefaultSize) | ||
| log.Debug("put") | ||
| err = s.Put(context.Background(), ch) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| log.Debug("get") | ||
| got, err := s.Get(context.Background(), ch.Address()) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if !bytes.Equal(got.Data(), ch.Data()) { | ||
| t.Fatal("chunk data mismatch after retrieval") | ||
| } | ||
| } | ||
|
|
||
| type putter interface { | ||
| Put(context.Context, storage.Chunk) error | ||
| } | ||
|
|
||
| func (t *tester) Put(_ context.Context, ch storage.Chunk) error { | ||
| return t.index.Put(*(newItemFromChunk(ch))) | ||
| } | ||
| func BenchmarkPut(b *testing.B) { | ||
| n := 128 | ||
| for j := 0; j < 2; j++ { | ||
| n *= 2 | ||
| chunks := make([]storage.Chunk, n) | ||
| for k := 0; k < n; k++ { | ||
| chunks[k] = storage.GenerateRandomChunk(chunk.DefaultSize) | ||
| } | ||
| in := 1 * time.Nanosecond | ||
| for i := 0; i < 4; i++ { | ||
| for _, name := range []string{"shed", "rushed"} { | ||
| b.Run(fmt.Sprintf("N=%v Interval=%v, DB=%v", n, in, name), func(t *testing.B) { | ||
| benchmarkPut(t, chunks, in, name) | ||
| }) | ||
| } | ||
| in *= time.Duration(100) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func benchmarkPut(b *testing.B, chunks []storage.Chunk, in time.Duration, name string) { | ||
| for i := 0; i < b.N; i++ { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| b.StopTimer() | ||
| path, err := ioutil.TempDir("", "rushed-test") | ||
| if err != nil { | ||
| b.Fatal(err) | ||
| } | ||
| tester, err := newTester(path) | ||
| if err != nil { | ||
| os.RemoveAll(path) | ||
| b.Fatal(err) | ||
| } | ||
| var db putter | ||
| if name == "shed" { | ||
| db = tester | ||
| } else { | ||
| db = tester.db.Mode(0) | ||
| } | ||
| var wg sync.WaitGroup | ||
| wg.Add(len(chunks)) | ||
| ctx := context.Background() | ||
| b.StartTimer() | ||
| for _, ch := range chunks { | ||
| time.Sleep(in) | ||
| go func(chu storage.Chunk) { | ||
| defer wg.Done() | ||
| db.Put(ctx, chu) | ||
| }(ch) | ||
| } | ||
| wg.Wait() | ||
| b.StopTimer() | ||
| tester.db.Close() | ||
| os.RemoveAll(path) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't expose
newBatch(). Do we need to expose Batch?(or vice-versa, if we expose
Batchmaybe we should exposenewBatch, too)