From b8ea4ecd9c839fdcbba687ce358bdfed720c7b9d Mon Sep 17 00:00:00 2001
From: Janos Guljas
Date: Fri, 23 Nov 2018 15:18:24 +0100
Subject: [PATCH] swarm/shed: initial subscription implementation

---
 swarm/shed/index.go      | 238 ++++++++++++++++++++++++++++++++++++++++
 swarm/shed/index_test.go | 214 ++++++++++++++++++++++++++++++++++----
 2 files changed, 432 insertions(+), 20 deletions(-)

diff --git a/swarm/shed/index.go b/swarm/shed/index.go
index ba803e3c23..d27c03314c 100644
--- a/swarm/shed/index.go
+++ b/swarm/shed/index.go
@@ -17,6 +17,12 @@
 package shed
 
 import (
+	"context"
+	"crypto/rand"
+	"encoding/binary"
+	"errors"
+	"sync"
+
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
@@ -78,6 +84,11 @@ type Index struct {
 	decodeKeyFunc   func(key []byte) (e IndexItem, err error)
 	encodeValueFunc func(fields IndexItem) (value []byte, err error)
 	decodeValueFunc func(value []byte) (e IndexItem, err error)
+
+	// triggers are used for signaling
+	// subscriptions to continue iterations.
+	triggers   map[uint64]chan struct{}
+	triggersMu *sync.Mutex
 }
 
 // IndexFuncs structure defines functions for encoding and decoding
@@ -120,6 +131,8 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
 		},
 		encodeValueFunc: funcs.EncodeValue,
 		decodeValueFunc: funcs.DecodeValue,
+		triggers:        make(map[uint64]chan struct{}),
+		triggersMu:      new(sync.Mutex),
 	}, nil
 }
 
@@ -262,3 +275,228 @@ func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) {
 	}
 	return it.Error()
 }
+
+// Subscription provides methods to control
+// and get information about subscription state.
+type Subscription struct {
+	stopChan chan struct{}
+	stopOnce sync.Once
+	doneChan chan struct{}
+	err      error
+	mu       sync.RWMutex
+}
+
+// Err returns an error that the subscription encountered.
+// It should usually be called after the Done channel is closed.
+// It is safe to call this function multiple times.
+func (s *Subscription) Err() (err error) {
+	s.mu.RLock()
+	err = s.err
+	s.mu.RUnlock()
+	return err
+}
+
+// Done returns a read-only channel that will be closed
+// when the subscription is stopped or encounters an error.
+func (s *Subscription) Done() <-chan struct{} {
+	return s.doneChan
+}
+
+// Stop terminates the subscription without any error.
+// It is safe to call this function multiple times.
+func (s *Subscription) Stop() {
+	s.stopOnce.Do(func() {
+		close(s.stopChan)
+	})
+}
+
+// NewSubscription starts a new subscription on the index.
+// Subscribing is similar to iterating over the index keys, but it
+// is performed in the background and continuously over the existing keys
+// as well as over new keys when they are put. It is required to signal
+// all the iterators to check for new keys with the TriggerSubscriptions method.
+// This provides greater control over subscription iterators than
+// triggering subscriptions in the Put method or on batch writes.
+// IndexIterFunc behaves the same as in iterate methods.
+// The provided context allows cancellation of the created goroutine, and
+// Subscription.Err() will return the appropriate error from the context.
+func (f Index) NewSubscription(ctx context.Context, fn IndexIterFunc) (s *Subscription, err error) {
+	return f.newSubscription(ctx, f.prefix, fn)
+}
+
+// NewSubscriptionFrom is the same method as NewSubscription, but it
+// iterates over the keys starting from the provided start item.
+func (f Index) NewSubscriptionFrom(ctx context.Context, start IndexItem, fn IndexIterFunc) (s *Subscription, err error) {
+	startKey, err := f.encodeKeyFunc(start)
+	if err != nil {
+		return nil, err
+	}
+	return f.newSubscription(ctx, startKey, fn)
+}
+
+// newSubscription provides base functionality for NewSubscription
+// and NewSubscriptionFrom methods.
+// It creates a new goroutine which will iterate over existing keys of the index
+// and create new iterators when TriggerSubscriptions is called.
+func (f Index) newSubscription(ctx context.Context, startKey []byte, fn IndexIterFunc) (s *Subscription, err error) {
+	// Create a subscription id to be able to remove the channel from the triggers map.
+	f.triggersMu.Lock()
+	// Generate new ID.
+	id, err := f.newSubscriptionID()
+	if err != nil {
+		f.triggersMu.Unlock()
+		return nil, err
+	}
+	// trigger is a buffered channel with capacity 1 for two reasons:
+	// - to be able to signal the first iteration
+	// - to keep sends in TriggerSubscriptions non-blocking
+	trigger := make(chan struct{}, 1)
+	f.triggers[id] = trigger
+	f.triggersMu.Unlock()
+
+	// send signal for the initial iteration
+	trigger <- struct{}{}
+
+	s = &Subscription{
+		stopChan: make(chan struct{}),
+		doneChan: make(chan struct{}),
+	}
+
+	go func() {
+		// this error will be checked in the deferred function
+		// and set as Subscription.err
+		var err error
+		defer func() {
+			if err != nil {
+				s.mu.Lock()
+				s.err = err
+				s.mu.Unlock()
+			}
+			// signal that the subscription is done
+			close(s.doneChan)
+			f.triggersMu.Lock()
+			// clean up the trigger channel
+			delete(f.triggers, id)
+			f.triggersMu.Unlock()
+		}()
+		// This flag identifies the first iteration, so that
+		// the start item is included in it and excluded from
+		// the next ones, as it has already been sent in a
+		// previous iteration.
+		firstIteration := true
+		for {
+			select {
+			case <-trigger:
+				// This closure provides a clean defer for
+				// releasing the iterator.
+				err = func() error {
+					it := f.db.NewIterator()
+					defer it.Release()
+
+					ok := it.Seek(startKey)
+					if firstIteration {
+						// The first iteration will set the flag to false
+						// to provide information to all subsequent iterations.
+						firstIteration = false
+					} else {
+						// All iterations but the first will start from
+						// startKey+1, as the start key has already been
+						// processed in a previous iteration.
+						ok = it.Next()
+					}
+
+					for ; ok; ok = it.Next() {
+						key := it.Key()
+						if key[0] != f.prefix[0] {
+							break
+						}
+						keyIndexItem, err := f.decodeKeyFunc(key)
+						if err != nil {
+							return err
+						}
+						valueIndexItem, err := f.decodeValueFunc(it.Value())
+						if err != nil {
+							return err
+						}
+						stop, err := fn(keyIndexItem.Merge(valueIndexItem))
+						if err != nil {
+							return err
+						}
+						startKey = key
+						if stop {
+							// Q: should the whole subscription stop or just this iteration?
+							s.Stop()
+							break
+						}
+						select {
+						case <-s.stopChan:
+							return nil
+						case <-ctx.Done():
+							return ctx.Err()
+						default:
+						}
+					}
+					return it.Error()
+				}()
+				if err != nil {
+					return
+				}
+			case <-s.stopChan:
+				return
+			case <-ctx.Done():
+				if err == nil {
+					err = ctx.Err()
+				}
+				return
+			}
+		}
+	}()
+
+	return s, nil
+}
+
+// newSubscriptionID generates a new subscription id as a random number. The caller must hold triggersMu.
+func (f Index) newSubscriptionID() (id uint64, err error) {
+	b := make([]byte, 8)
+	newID := func() (uint64, error) {
+		_, err = rand.Read(b)
+		if err != nil {
+			return 0, err
+		}
+		return binary.BigEndian.Uint64(b), nil
+	}
+	id, err = newID()
+	if err != nil {
+		return 0, err
+	}
+	// retry up to 100 times if this id already exists
+	for i := 0; i < 100; i++ {
+		if _, ok := f.triggers[id]; !ok {
+			// this id is unique, return it
+			return id, nil
+		}
+		id, err = newID()
+		if err != nil {
+			return 0, err
+		}
+	}
+	return 0, errors.New("unable to generate subscription id")
+}
+
+// TriggerSubscriptions signals to all index subscriptions
+// that they should continue iterating over the index keys
+// where they stopped in the last iteration. This method
+// should be called when new data is put to the index.
+// It is not automatically called by the index Put method
+// to allow for optimizations and for signaling of batch
+// writes.
+func (f Index) TriggerSubscriptions() {
+	f.triggersMu.Lock()
+	defer f.triggersMu.Unlock()
+	for _, t := range f.triggers {
+		select {
+		case t <- struct{}{}:
+		default:
+		}
+	}
+}
diff --git a/swarm/shed/index_test.go b/swarm/shed/index_test.go
index f36ab7a383..79b2d76c11 100644
--- a/swarm/shed/index_test.go
+++ b/swarm/shed/index_test.go
@@ -18,36 +18,63 @@ package shed
 
 import (
 	"bytes"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"sort"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/ethereum/go-ethereum/swarm/storage"
 	"github.com/syndtr/goleveldb/leveldb"
 )
 
 // Index functions for the index that is used in tests in this file.
-var retrievalIndexFuncs = IndexFuncs{
-	EncodeKey: func(fields IndexItem) (key []byte, err error) {
-		return fields.Address, nil
-	},
-	DecodeKey: func(key []byte) (e IndexItem, err error) {
-		e.Address = key
-		return e, nil
-	},
-	EncodeValue: func(fields IndexItem) (value []byte, err error) {
-		b := make([]byte, 8)
-		binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
-		value = append(b, fields.Data...)
-		return value, nil
-	},
-	DecodeValue: func(value []byte) (e IndexItem, err error) {
-		e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
-		e.Data = value[8:]
-		return e, nil
-	},
-}
+var (
+	// Address->StoreTimestamp|Data
+	retrievalIndexFuncs = IndexFuncs{
+		EncodeKey: func(fields IndexItem) (key []byte, err error) {
+			return fields.Address, nil
+		},
+		DecodeKey: func(key []byte) (e IndexItem, err error) {
+			e.Address = key
+			return e, nil
+		},
+		EncodeValue: func(fields IndexItem) (value []byte, err error) {
+			b := make([]byte, 8)
+			binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
+			value = append(b, fields.Data...)
+			return value, nil
+		},
+		DecodeValue: func(value []byte) (e IndexItem, err error) {
+			e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
+			e.Data = value[8:]
+			return e, nil
+		},
+	}
+	// StoreTimestamp|Address->Data
+	storedIndexFuncs = IndexFuncs{
+		EncodeKey: func(fields IndexItem) (key []byte, err error) {
+			b := make([]byte, 8, 8+len(fields.Address))
+			binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp))
+			key = append(b, fields.Address...)
+			return key, nil
+		},
+		DecodeKey: func(key []byte) (e IndexItem, err error) {
+			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
+			e.Address = key[8:]
+			return e, nil
+		},
+		EncodeValue: func(fields IndexItem) (value []byte, err error) {
+			return fields.Data, nil
+		},
+		DecodeValue: func(value []byte) (e IndexItem, err error) {
+			e.Data = value
+			return e, nil
+		},
+	}
+)
 
 // TestIndex validates put, get and delete functions of the Index implementation.
 func TestIndex(t *testing.T) {
@@ -373,6 +400,153 @@ func TestIndex_iterate(t *testing.T) {
 	})
 }
 
+// TestIndex_NewSubscription tests one index subscription for iterations
+// over existing keys and over newly put ones.
+func TestIndex_NewSubscription(t *testing.T) {
+	db, cleanupFunc := newTestDB(t)
+	defer cleanupFunc()
+
+	index, err := db.NewIndex("stored", storedIndexFuncs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// put some items before the subscription is created and provide them for validation
+	items := putItems(t, index, 10)
+
+	// cursor counts the number of received items
+	var cursor int
+	// mu protects the cursor and wantItemsCount
+	var mu sync.Mutex
+	// wait signals that it is safe to check whether
+	// all items have been iterated on.
+	wait := make(chan struct{})
+
+	// wantItemsCount is the expected number of items from the subscription
+	wantItemsCount := len(items)
+
+	s, err := index.NewSubscription(context.Background(), func(item IndexItem) (stop bool, err error) {
+		mu.Lock()
+		defer mu.Unlock()
+
+		// validate that the item is the one expected by ordering
+		if !bytes.Equal(items[cursor].Address, item.Address) {
+			return false, fmt.Errorf("item %v: got address %x, want %x", cursor, item.Address, items[cursor].Address)
+		}
+
+		// move the cursor (increase the count of received items)
+		cursor++
+
+		// if all expected items are received, signal for a check
+		if cursor == wantItemsCount {
+			wait <- struct{}{}
+		}
+		return
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if s == nil {
+		t.Fatal("subscription is nil")
+	}
+	defer s.Stop()
+
+	t.Run("initial items", func(t *testing.T) {
+		// wait until it is safe to check for the number of received items
+		select {
+		case <-s.Done():
+			t.Fatalf("subscription should not be done: %s", s.Err())
+		case <-time.After(30 * time.Second):
+			t.Fatalf("index subscription items not received")
+		case <-wait:
+		}
+
+		mu.Lock()
+		// get the current received items count
+		gotItemsCount := cursor
+		mu.Unlock()
+
+		if gotItemsCount != wantItemsCount {
+			t.Fatalf("got items %v, want %v", gotItemsCount, wantItemsCount)
+		}
+	})
+
+	t.Run("put more items", func(t *testing.T) {
+		// add more items after the subscription started
+		items = append(items, putItems(t, index, 12)...)
+
+		mu.Lock()
+		// update the expected total number of items
+		wantItemsCount = len(items)
+		mu.Unlock()
+
+		// wait for a second as no items should be received until
+		// TriggerSubscriptions is called.
+		select {
+		case <-s.Done():
+			t.Fatalf("subscription should not be done: %s", s.Err())
+		case <-time.After(time.Second):
+		case <-wait:
+			t.Fatalf("unexpected index subscription items received")
+		}
+
+		mu.Lock()
+		// get the current cursor
+		gotItemsCount := cursor
+		mu.Unlock()
+
+		if gotItemsCount == wantItemsCount {
+			t.Fatalf("got %v items before triggering subscriptions", gotItemsCount)
+		}
+	})
+
+	t.Run("trigger", func(t *testing.T) {
+		index.TriggerSubscriptions()
+
+		// wait until it is safe to check for the number of received items
+		select {
+		case <-s.Done():
+			t.Fatalf("subscription should not be done: %s", s.Err())
+		case <-time.After(30 * time.Second):
+			t.Fatalf("index subscription items not received")
+		case <-wait:
+		}
+
+		mu.Lock()
+		// get the current cursor
+		gotItemsCount := cursor
+		// update the expected total number of items
+		wantItemsCount = len(items)
+		mu.Unlock()
+
+		if gotItemsCount != wantItemsCount {
+			t.Fatalf("got items %v, want %v", gotItemsCount, wantItemsCount)
+		}
+	})
+}
+
+func putItems(t *testing.T, index Index, n int) []IndexItem {
+	t.Helper()
+
+	items := make([]IndexItem, 0, n)
+	for i := 0; i < n; i++ {
+		c := storage.GenerateRandomChunk(24)
+		items = append(items, IndexItem{
+			Address:        c.Address(),
+			Data:           c.Data(),
+			StoreTimestamp: time.Now().UTC().UnixNano(),
+		})
+	}
+
+	for _, item := range items {
+		err := index.Put(item)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	return items
+}
+
 // checkIndexItem is a test helper function that compares if two Index items are the same.
 func checkIndexItem(t *testing.T, got, want IndexItem) {
 	t.Helper()
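
A minimal usage sketch of the API added by this patch, assuming an already constructed Index; the watchIndex helper, the incoming channel, and the printed output are illustrative only and not part of the patch:

package shed

import (
	"context"
	"fmt"
	"time"
)

// watchIndex is an illustrative helper (not part of this patch). It
// subscribes to an index, prints every item as it arrives, and signals
// the subscription to continue after new items are put.
func watchIndex(ctx context.Context, index Index, incoming <-chan IndexItem) error {
	// Start a background iteration over existing keys; it continues
	// past the last seen key whenever TriggerSubscriptions is called.
	s, err := index.NewSubscription(ctx, func(item IndexItem) (stop bool, err error) {
		fmt.Printf("item %x stored at %v\n", item.Address, item.StoreTimestamp)
		return false, nil
	})
	if err != nil {
		return err
	}
	defer s.Stop()

	for {
		select {
		case item := <-incoming:
			// Persist the new item, then signal all subscriptions to
			// continue iterating; Put does not trigger them by itself.
			item.StoreTimestamp = time.Now().UTC().UnixNano()
			if err := index.Put(item); err != nil {
				return err
			}
			index.TriggerSubscriptions()
		case <-s.Done():
			// The subscription stopped or failed; Err reports the cause.
			return s.Err()
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}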