From 9141ccca4ef7778874a77080e1935ee27866f7eb Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 20 Nov 2018 10:01:58 +0100 Subject: [PATCH 1/3] swarm/shed: new package rushed --- swarm/shed/rushed/db.go | 190 +++++++++++++++++++++++++++++++++ swarm/shed/rushed/db_test.go | 169 +++++++++++++++++++++++++++++ swarm/shed/rushed/subscribe.go | 115 ++++++++++++++++++++ 3 files changed, 474 insertions(+) create mode 100644 swarm/shed/rushed/db.go create mode 100644 swarm/shed/rushed/db_test.go create mode 100644 swarm/shed/rushed/subscribe.go diff --git a/swarm/shed/rushed/db.go b/swarm/shed/rushed/db.go new file mode 100644 index 0000000000..6d8bee123e --- /dev/null +++ b/swarm/shed/rushed/db.go @@ -0,0 +1,190 @@ +package rushed + +import ( + "context" + "errors" + "sync" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +type Mode = int + +var ( + errDBClosed = errors.New("DB closed") + errCancelled = errors.New("iteration cancelled") +) + +// batch wraps leveldb batch extending it with a waitgroup and a done channel +type Batch struct { + *leveldb.Batch + wg sync.WaitGroup // to signal and wait for parallel writes to batch + Done chan struct{} // to signal when batch is written + Err error // error resulting from write +} + +// newBatch constructs a new batch +func newBatch() *Batch { + return &Batch{ + Batch: new(leveldb.Batch), + Done: make(chan struct{}), + } +} + +// DB extends shed DB with batch execution, garbage collection and iteration support with subscriptions +type DB struct { + *shed.DB // underlying shed.DB + update func(*Batch, Mode, *shed.IndexItem) error // mode-dependent update method + access func(Mode, *shed.IndexItem) error // mode dependent access method + batch chan *Batch // channel to obtain current batch + quit chan struct{} // channel to be closed when DB quits +} + +// New constructs a new DB +func New(sdb *shed.DB, update func(*Batch, Mode, *shed.IndexItem) error, access func(Mode, *shed.IndexItem) error) *DB { + db := &DB{ + DB: sdb, + update: update, + access: access, + batch: make(chan *Batch), + quit: make(chan struct{}), + } + go db.listen() + return db +} + +// Close terminates loops by closing the quit channel +func (db *DB) Close() { + // signal quit to listen loop + close(db.quit) + // wait till batch channel is closed and last batch is written + for b := range db.batch { + b.wg.Done() + <-b.Done + } + // close shed db + db.DB.Close() +} + +// Accessor is a wrapper around DB where Put/Get is overwritten to apply the +// update/access method for the mode +// using With(mode) the DB implements the ChunkStore interface +type Accessor struct { + mode Mode + *DB +} + +// Mode returns the ChunkStore interface for the mode of update on a multimode update DB +func (db *DB) Mode(mode Mode) *Accessor { + return &Accessor{ + mode: mode, + DB: db, + } +} + +// Put overwrites the underlying DB Put method for the specific mode of update +func (u *Accessor) Put(ctx context.Context, ch storage.Chunk) error { + return u.Update(ctx, u.mode, newItemFromChunk(ch)) +} + +// Get overwrites the underlying DB Get method for the specific mode of access +func (u *Accessor) Get(_ context.Context, addr storage.Address) (storage.Chunk, error) { + item := newItemFromAddress(addr) + if err := u.access(u.mode, item); err != nil { + return nil, err + } + return storage.NewChunk(item.Address, item.Data), nil +} + +// Update calls the update method for the 
specific mode with items +func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error { + // obtain the current batch + b := <-db.batch + log.Debug("obtained batch") + if b == nil { + return errDBClosed + } + // call the update with the access mode + err := db.update(b, mode, item) + if err != nil { + return err + } + // signal to listen loop that the update to batch is complete + b.wg.Done() + // wait for batch to be written and return batch error + // this is in order for Put calls to be synchronous + select { + case <-b.Done: + case <-ctx.Done(): + return ctx.Err() + } + return b.Err +} + +// listen is a forever loop handing out the current batch to updaters +// and apply the batch when the db is free +// if the db is quit, the last batch is written out and batch channel is closed +func (db *DB) listen() { + b := newBatch() // current batch + var done chan struct{} // + wasdone := make(chan struct{}) + close(wasdone) + for { + b.wg.Add(1) + select { + case db.batch <- b: + // allow + done = wasdone + case <-done: + b.wg.Done() + // if batchwriter is idle, hand over the batch and creates a new one + // if batchwriter loop is busy, keep adding to the same batch + go db.writeBatch(b) + wasdone = b.Done + // disable case until further ops happen + done = nil + b = newBatch() + case <-db.quit: + // make sure batch is saved to disk so as not to lose chunks + if done != nil { + b.wg.Done() + db.writeBatch(b) + <-b.Done + } + close(db.batch) + return + } + } +} + +// writeBatch writes out the batch, sets the error and closes the done channel +func (db *DB) writeBatch(b *Batch) { + // wait for all updaters to finish writing to this batch + b.wg.Wait() + // apply the batch + b.Err = db.DB.WriteBatch(b.Batch) + // signal batch write to callers + close(b.Done) +} + +/* + Address []byte + Data []byte + AccessTimestamp int64 + StoreTimestamp int64 +*/ +func newItemFromChunk(ch storage.Chunk) *shed.IndexItem { + return &shed.IndexItem{ + Address: ch.Address(), + Data: ch.Data(), + } +} + +func newItemFromAddress(addr storage.Address) *shed.IndexItem { + return &shed.IndexItem{ + Address: addr, + } +} diff --git a/swarm/shed/rushed/db_test.go b/swarm/shed/rushed/db_test.go new file mode 100644 index 0000000000..dd02f6bcac --- /dev/null +++ b/swarm/shed/rushed/db_test.go @@ -0,0 +1,169 @@ +package rushed + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "flag" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + colorable "github.com/mattn/go-colorable" +) + +var ( + loglevel = flag.Int("loglevel", 3, "verbosity of logs") +) + +func init() { + flag.Parse() + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) +} + +type tester struct { + index shed.Index + db *DB +} + +func (t *tester) access(mode Mode, item *shed.IndexItem) error { + it, err := t.index.Get(*item) + if err != nil { + return err + } + *item = it + return nil +} + +// update defines set accessors for different modes +func (t *tester) update(b *Batch, mode Mode, item *shed.IndexItem) error { + if mode != 0 { + return errors.New("no such mode") + } + return t.index.PutInBatch(b.Batch, *item) +} + +func newTester(path string) (*tester, error) { + tester := new(tester) + sdb, err := shed.NewDB(path) + 
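+	// the rushed DB constructed below is wired with the tester's own update/access
+	// callbacks: Put goes through the shared write batch, Get reads the index directly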
if err != nil { + return nil, err + } + tester.db = New(sdb, tester.update, tester.access) + tester.index, err = sdb.NewIndex("Hash->StoredTimestamp|AccessTimestamp|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.AccessTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.Data = value[16:] + return e, nil + }, + }) + if err != nil { + return nil, err + } + return tester, nil +} + +func TestPutGet(t *testing.T) { + path, err := ioutil.TempDir("", "rushed-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(path) + tester, err := newTester(path) + if err != nil { + t.Fatal(err) + } + defer tester.db.Close() + s := tester.db.Mode(0) + ch := storage.GenerateRandomChunk(chunk.DefaultSize) + log.Debug("put") + err = s.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + log.Debug("get") + got, err := s.Get(context.Background(), ch.Address()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got.Data(), ch.Data()) { + t.Fatal("chunk data mismatch after retrieval") + } +} + +type putter interface { + Put(context.Context, storage.Chunk) error +} + +func (t *tester) Put(_ context.Context, ch storage.Chunk) error { + return t.index.Put(*(newItemFromChunk(ch))) +} +func BenchmarkPut(b *testing.B) { + n := 128 + for j := 0; j < 5; j++ { + n *= 2 + in := time.Nanosecond + for i := 0; i < 3; i++ { + for _, name := range []string{"shed", "rushed"} { + path, err := ioutil.TempDir("", "rushed-test") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(path) + tester, err := newTester(path) + if err != nil { + b.Fatal(err) + } + defer tester.db.Close() + var db putter + if name == "shed" { + db = tester + } else { + db = tester.db.Mode(0) + } + b.Run(fmt.Sprintf("N=%v Interval=%v, DB=%v", n, in, name), func(t *testing.B) { + benchmarkPut(t, n, in, db) + }) + } + in *= time.Duration(10) + } + } +} + +func benchmarkPut(b *testing.B, n int, in time.Duration, db putter) { + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(n) + for j := 0; j < n; j++ { + go func() { + defer wg.Done() + db.Put(context.Background(), storage.GenerateRandomChunk(chunk.DefaultSize)) + }() + } + wg.Wait() + } +} diff --git a/swarm/shed/rushed/subscribe.go b/swarm/shed/rushed/subscribe.go new file mode 100644 index 0000000000..737543b779 --- /dev/null +++ b/swarm/shed/rushed/subscribe.go @@ -0,0 +1,115 @@ +package rushed + +import ( + "github.com/ethereum/go-ethereum/swarm/shed" +) + +const ( + iterBatchSize = 128 +) + +type Subscription struct { + cancel chan struct{} // cancel the subscription + err error +} + +func Subscribe(index *shed.Index, buffer chan *shed.IndexItem, from *shed.IndexItem, trigger chan struct{}) *Subscription { + cancel := make(chan struct{}) + f := func(item *shed.IndexItem) (bool, error) { + select { + case buffer <- item: + return false, nil + case <-cancel: + return false, errCancelled + } + } + s := &Subscription{ + cancel: cancel, + } + wait := func() (bool, 
error) { + select { + case <-trigger: + return false, nil + case <-cancel: + return false, errCancelled + } + } + go func() { + defer close(buffer) + s.err = Iterate(index, from, f, wait) + }() + return s +} + +// iterate is a wrapper to shed.IterateFrom that periodically iterates starting from 'from' +// and remembering the last item on each round and continue the iteration from this on the +// following round +// once the items are retrieved in a fixed slice of max iterBatchSize elements +// it iterates over this slice and applies f to each element +// f returns a bool which when true terminates the iteration +// error returned from f result in terminating the iteration and returning the error +// if the iterator reached the last item in the index it calls the wait function +func Iterate(index *shed.Index, from *shed.IndexItem, f func(*shed.IndexItem) (bool, error), wait func() (bool, error)) error { + items := make([]*shed.IndexItem, iterBatchSize) + pos := 0 + cur := from + size := 0 + // define feed function that populates the items slice + feed := func(item shed.IndexItem) (bool, error) { + // assign the item at pos + items[pos] = &item + pos++ + cur = &item + // if reached the end, stop + if pos == len(items) { + return true, nil + } + return false, nil + } + // read when called triggers an IterateFrom on the index, populates the items slice + read := func() (int, error) { + defer func() { pos = 0 }() + for { + if err := index.IterateFrom(*cur, feed); err != nil { + return size, err + } + if size > 0 { + break + } + // if no items are available it calls wait and returns if stop or error + stop, err := wait() + if err != nil { + return 0, err + } + if stop { + return 0, nil + } + } + return size, nil + } + cnt := 0 + for { + if cnt == size { + // retrieved items are all fed to buffer + // get a new batch + // if c is buffered channel, it can still get items while batch is read from disk + // size items read, last is set if after size element no more needed + var err error + size, err = read() + if err != nil { + return err + } + cnt = 0 + } + // calls f on the item + stop, err := f(items[cnt]) + if err != nil { + return err + } + if stop { + break + } + cnt++ + } + return nil +} From 1189c3dd954f9e71e9b1ac37d6246d8ebfc5f7cd Mon Sep 17 00:00:00 2001 From: zelig Date: Tue, 20 Nov 2018 10:03:20 +0100 Subject: [PATCH 2/3] swarm/storage: new package localstore --- swarm/storage/localstore/db.go | 230 +++++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 swarm/storage/localstore/db.go diff --git a/swarm/storage/localstore/db.go b/swarm/storage/localstore/db.go new file mode 100644 index 0000000000..c4f8ad3e00 --- /dev/null +++ b/swarm/storage/localstore/db.go @@ -0,0 +1,230 @@ +package localstore + +import ( + "context" + "encoding/binary" + "errors" + "time" + + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/syndtr/goleveldb/leveldb" +) + +/* + types of access: + - just get the data + - increment access index + + - when uploaded or pull synced + - when delivered + - when push synced + - when accessed +*/ + +var ( + errInvalidMode = errors.New("invalid mode") +) + +// Modes of access/update +const ( + SYNCING rushed.Mode = iota + UPLOAD + REQUEST + SYNCED + ACCESS + REMOVAL +) + +// DB is a local chunkstore using disk storage +type DB struct { + *rushed.DB + // fields and indexes + schemaName shed.StringField + size shed.Uint64Field + retrieval shed.Index + push shed.Index + pull shed.Index + gc shed.Index +} + +// NewDB constructs a local 
chunks db +func NewDB(path string) (*DB, error) { + db := new(DB) + sdb := shed.NewDB(path) + db.DB = rushed.NewDB(sdb, db.update, db.access) + db.schemaName, err = idb.NewStringField("schema-name") + if err != nil { + return nil, err + } + db.size, err = idb.NewUint64Field("size") + if err != nil { + return nil, err + } + db.retrieval, err = idb.NewIndex("Hash->StoredTimestamp|AccessTimestamp|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Hash, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Hash = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.AccessTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.StoredTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.Data = value[16:] + return e, nil + }, + }) + if err != nil { + return nil, err + } + // pull index allows history and live syncing per po bin + db.pull, err = idb.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + key = make([]byte, 41) + key[0] = byte(uint8(db.po(fields.Hash))) + binary.BigEndian.PutUint64(key[1:9], fields.StoredTimestamp) + copy(key[9:], fields.Hash[:]) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Hash = key[9:] + e.StoredTimestamp = int64(binary.BigEndian.Uint64(key[1:9])) + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + // push index contains as yet unsynced chunks + db.push, err = idb.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + key = make([]byte, 40) + binary.BigEndian.PutUint64(key[:8], fields.StoredTimestamp) + copy(key[8:], fields.Hash[:]) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Hash = key[8:] + e.StoredTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) + return e, nil + }, + }) + if err != nil { + return nil, err + } + // gc index for removable chunk ordered by ascending last access time + db.gcIndex, err = idb.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + b := make([]byte, 16, 16+len(fields.Hash)) + binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + key = append(b, fields.Hash...) 
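+			// key layout: 8-byte big-endian AccessTimestamp | 8-byte big-endian StoreTimestamp | Hash,
+			// so iterating the gc index forward visits the least recently accessed chunks first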
+ return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) + e.Hash = key[16:] + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + return db, nil +} + +// access defines get accessors for different modes +func (db *DB) access(b *leveldb.Batch, mode rushed.Mode, item *shed.IndexItem) error { + err := db.retrieve.Get(item) + switch mode { + case SYNCING: + case TESTSYNCING: + case REQUEST: + return db.Update(context.TODO(), REQUEST, item) + default: + return errInvalidMode + } + return nil +} + +// update defines set accessors for different modes +func (db *DB) update(b *rushed.Batch, mode rushed.Mode, item *shed.IndexItem) error { + switch mode { + case SYNCING: + // put to indexes: retrieve, pull + item.StoredTimestamp = now() + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.pull.PutInBatch(b, item) + db.size.IncInBatch(b) + + case UPLOAD: + // put to indexes: retrieve, push, pull + item.StoredTimestamp = now() + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.pull.PutInBatch(b, item) + db.push.PutInBatch(b, item) + + case REQUEST: + // put to indexes: retrieve, gc + item.StoredTimestamp = now() + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.gc.PutInBatch(b, item) + + case SYNCED: + // delete from push, insert to gc + db.push.DeleteInBatch(b, item) + db.gc.PutInBatch(b, item) + + case ACCESS: + // update accessTimeStamp in retrieve, gc + db.gc.DeleteInBatch(b, item) + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.gc.PutInBatch(b, item) + + case REMOVAL: + // delete from retrieve, pull, gc + db.retrieve.DeleteInBatch(b, item) + db.pull.DeleteInBatch(b, item) + db.gc.DeleteInBatch(b, item) + + default: + return errInvalidMode + } + return nil +} + +func now() uint64 { + return uint64(time.Now().UnixNano()) +} From e696a7b7f4736de035760d6d1e084157dee00b61 Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 21 Nov 2018 15:02:01 +0100 Subject: [PATCH 3/3] change to lock --- swarm/shed/db.go | 13 ++++- swarm/shed/rushed/db.go | 103 ++++++++++++--------------------- swarm/shed/rushed/db_test.go | 66 ++++++++++++--------- swarm/shed/rushed/subscribe.go | 6 ++ 4 files changed, 92 insertions(+), 96 deletions(-) diff --git a/swarm/shed/db.go b/swarm/shed/db.go index 987c89dcff..325aebabd7 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -40,12 +40,19 @@ type DB struct { ldb *leveldb.DB } +var leveldbOptions = &opt.Options{ + Compression: opt.NoCompression, + BlockSize: 1 << 16, + // Default max open file descriptors (ulimit -n) is 256 on OS + // X, and >=1024 on (most?) Linux machines. So set to a low + // number since we have multiple leveldb instances. + OpenFilesCacheCapacity: 10, +} + // NewDB constructs a new DB and validates the schema // if it exists in database on the given path. 
func NewDB(path string) (db *DB, err error) { - ldb, err := leveldb.OpenFile(path, &opt.Options{ - OpenFilesCacheCapacity: openFileLimit, - }) + ldb, err := leveldb.OpenFile(path, leveldbOptions) if err != nil { return nil, err } diff --git a/swarm/shed/rushed/db.go b/swarm/shed/rushed/db.go index 6d8bee123e..69d330a5be 100644 --- a/swarm/shed/rushed/db.go +++ b/swarm/shed/rushed/db.go @@ -11,23 +11,22 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +// Mode is an enum for modes of access/update type Mode = int var ( - errDBClosed = errors.New("DB closed") - errCancelled = errors.New("iteration cancelled") + errDBClosed = errors.New("DB closed") ) -// batch wraps leveldb batch extending it with a waitgroup and a done channel +// Batch wraps leveldb.Batch extending it with a waitgroup and a done channel type Batch struct { *leveldb.Batch - wg sync.WaitGroup // to signal and wait for parallel writes to batch - Done chan struct{} // to signal when batch is written - Err error // error resulting from write + Done chan struct{} // to signal when batch is written + Err error // error resulting from write } -// newBatch constructs a new batch -func newBatch() *Batch { +// NewBatch constructs a new batch +func NewBatch() *Batch { return &Batch{ Batch: new(leveldb.Batch), Done: make(chan struct{}), @@ -38,9 +37,11 @@ func newBatch() *Batch { type DB struct { *shed.DB // underlying shed.DB update func(*Batch, Mode, *shed.IndexItem) error // mode-dependent update method - access func(Mode, *shed.IndexItem) error // mode dependent access method - batch chan *Batch // channel to obtain current batch - quit chan struct{} // channel to be closed when DB quits + access func(Mode, *shed.IndexItem) error // mode-dependent access method + batch *Batch // current batch + mu sync.RWMutex // mutex for accessing current batch + c chan struct{} // channel to signal writes on + closed chan struct{} // closed when writeBatches loop quits } // New constructs a new DB @@ -49,29 +50,26 @@ func New(sdb *shed.DB, update func(*Batch, Mode, *shed.IndexItem) error, access DB: sdb, update: update, access: access, - batch: make(chan *Batch), - quit: make(chan struct{}), + batch: NewBatch(), + c: make(chan struct{}, 1), + closed: make(chan struct{}), } - go db.listen() + go db.writeBatches() return db } // Close terminates loops by closing the quit channel func (db *DB) Close() { - // signal quit to listen loop - close(db.quit) - // wait till batch channel is closed and last batch is written - for b := range db.batch { - b.wg.Done() - <-b.Done - } - // close shed db + // signal quit to writeBatches loop + close(db.c) + // wait for last batch to be written + <-db.closed db.DB.Close() } // Accessor is a wrapper around DB where Put/Get is overwritten to apply the // update/access method for the mode -// using With(mode) the DB implements the ChunkStore interface +// using Mode(mode) the DB implements the ChunkStore interface type Accessor struct { mode Mode *DB @@ -102,7 +100,10 @@ func (u *Accessor) Get(_ context.Context, addr storage.Address) (storage.Chunk, // Update calls the update method for the specific mode with items func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error { // obtain the current batch - b := <-db.batch + // b := <-db.batch + db.mu.RLock() + b := db.batch + db.mu.RUnlock() log.Debug("obtained batch") if b == nil { return errDBClosed @@ -112,11 +113,13 @@ func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error if err != nil { return err } - // signal 
to listen loop that the update to batch is complete - b.wg.Done() // wait for batch to be written and return batch error // this is in order for Put calls to be synchronous select { + case db.c <- struct{}{}: + default: + } + select { case <-b.Done: case <-ctx.Done(): return ctx.Err() @@ -124,58 +127,28 @@ func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error return b.Err } -// listen is a forever loop handing out the current batch to updaters +// writeBatches is a forever loop handing out the current batch to updaters // and apply the batch when the db is free // if the db is quit, the last batch is written out and batch channel is closed -func (db *DB) listen() { - b := newBatch() // current batch - var done chan struct{} // - wasdone := make(chan struct{}) - close(wasdone) - for { - b.wg.Add(1) - select { - case db.batch <- b: - // allow - done = wasdone - case <-done: - b.wg.Done() - // if batchwriter is idle, hand over the batch and creates a new one - // if batchwriter loop is busy, keep adding to the same batch - go db.writeBatch(b) - wasdone = b.Done - // disable case until further ops happen - done = nil - b = newBatch() - case <-db.quit: - // make sure batch is saved to disk so as not to lose chunks - if done != nil { - b.wg.Done() - db.writeBatch(b) - <-b.Done - } - close(db.batch) - return - } +func (db *DB) writeBatches() { + defer close(db.closed) + for range db.c { + db.mu.Lock() + b := db.batch + db.batch = NewBatch() + db.mu.Unlock() + db.writeBatch(b) } } // writeBatch writes out the batch, sets the error and closes the done channel func (db *DB) writeBatch(b *Batch) { - // wait for all updaters to finish writing to this batch - b.wg.Wait() // apply the batch b.Err = db.DB.WriteBatch(b.Batch) // signal batch write to callers close(b.Done) } -/* - Address []byte - Data []byte - AccessTimestamp int64 - StoreTimestamp int64 -*/ func newItemFromChunk(ch storage.Chunk) *shed.IndexItem { return &shed.IndexItem{ Address: ch.Address(), diff --git a/swarm/shed/rushed/db_test.go b/swarm/shed/rushed/db_test.go index dd02f6bcac..5529368301 100644 --- a/swarm/shed/rushed/db_test.go +++ b/swarm/shed/rushed/db_test.go @@ -123,47 +123,57 @@ func (t *tester) Put(_ context.Context, ch storage.Chunk) error { return t.index.Put(*(newItemFromChunk(ch))) } func BenchmarkPut(b *testing.B) { - n := 128 - for j := 0; j < 5; j++ { + n := 128 + for j := 0; j < 2; j++ { n *= 2 - in := time.Nanosecond - for i := 0; i < 3; i++ { + chunks := make([]storage.Chunk, n) + for k := 0; k < n; k++ { + chunks[k] = storage.GenerateRandomChunk(chunk.DefaultSize) + } + in := 1 * time.Nanosecond + for i := 0; i < 4; i++ { for _, name := range []string{"shed", "rushed"} { - path, err := ioutil.TempDir("", "rushed-test") - if err != nil { - b.Fatal(err) - } - defer os.RemoveAll(path) - tester, err := newTester(path) - if err != nil { - b.Fatal(err) - } - defer tester.db.Close() - var db putter - if name == "shed" { - db = tester - } else { - db = tester.db.Mode(0) - } b.Run(fmt.Sprintf("N=%v Interval=%v, DB=%v", n, in, name), func(t *testing.B) { - benchmarkPut(t, n, in, db) + benchmarkPut(t, chunks, in, name) }) } - in *= time.Duration(10) + in *= time.Duration(100) } } } -func benchmarkPut(b *testing.B, n int, in time.Duration, db putter) { +func benchmarkPut(b *testing.B, chunks []storage.Chunk, in time.Duration, name string) { for i := 0; i < b.N; i++ { + b.StopTimer() + path, err := ioutil.TempDir("", "rushed-test") + if err != nil { + b.Fatal(err) + } + tester, err := newTester(path) 
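+		// a fresh temporary DB is set up for every b.N round while the timer is stopped,
+		// so only the concurrent Put calls below contribute to the measured time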
+ if err != nil { + os.RemoveAll(path) + b.Fatal(err) + } + var db putter + if name == "shed" { + db = tester + } else { + db = tester.db.Mode(0) + } var wg sync.WaitGroup - wg.Add(n) - for j := 0; j < n; j++ { - go func() { + wg.Add(len(chunks)) + ctx := context.Background() + b.StartTimer() + for _, ch := range chunks { + time.Sleep(in) + go func(chu storage.Chunk) { defer wg.Done() - db.Put(context.Background(), storage.GenerateRandomChunk(chunk.DefaultSize)) - }() + db.Put(ctx, chu) + }(ch) } wg.Wait() + b.StopTimer() + tester.db.Close() + os.RemoveAll(path) } } diff --git a/swarm/shed/rushed/subscribe.go b/swarm/shed/rushed/subscribe.go index 737543b779..0307936675 100644 --- a/swarm/shed/rushed/subscribe.go +++ b/swarm/shed/rushed/subscribe.go @@ -1,6 +1,8 @@ package rushed import ( + "errors" + "github.com/ethereum/go-ethereum/swarm/shed" ) @@ -8,6 +10,10 @@ const ( iterBatchSize = 128 ) +var ( + errCancelled = errors.New("cancelled") +) + type Subscription struct { cancel chan struct{} // cancel the subscription err error