Skip to content

Commit 1e06720

Browse files
jpeletiernonsense
authored andcommitted
swarm/feeds: Parallel feed lookups (#19414)
1 parent 0c5f8c0 commit 1e06720

File tree

14 files changed

+952
-342
lines changed

14 files changed

+952
-342
lines changed

swarm/storage/feed/cacheentry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
const (
2828
hasherCount = 8
2929
feedsHashAlgorithm = storage.SHA3Hash
30-
defaultRetrieveTimeout = 100 * time.Millisecond
30+
defaultRetrieveTimeout = 1000 * time.Millisecond
3131
)
3232

3333
// cacheEntry caches the last known update of a specific Swarm feed.

swarm/storage/feed/handler.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"context"
2424
"fmt"
2525
"sync"
26+
"sync/atomic"
2627

2728
"github.com/ethereum/go-ethereum/swarm/chunk"
2829

@@ -178,12 +179,12 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
178179
return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
179180
}
180181

181-
var readCount int
182+
var readCount int32
182183

183184
// Invoke the lookup engine.
184185
// The callback will be called every time the lookup algorithm needs to guess
185186
requestPtr, err := lookup.Lookup(ctx, timeLimit, query.Hint, func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) {
186-
readCount++
187+
atomic.AddInt32(&readCount, 1)
187188
id := ID{
188189
Feed: query.Feed,
189190
Epoch: epoch,
@@ -228,17 +229,17 @@ func (h *Handler) updateCache(request *Request) (*cacheEntry, error) {
228229
updateAddr := request.Addr()
229230
log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
230231

231-
feedUpdate := h.get(&request.Feed)
232-
if feedUpdate == nil {
233-
feedUpdate = &cacheEntry{}
234-
h.set(&request.Feed, feedUpdate)
232+
entry := h.get(&request.Feed)
233+
if entry == nil {
234+
entry = &cacheEntry{}
235+
h.set(&request.Feed, entry)
235236
}
236237

237238
// update our rsrcs entry map
238-
feedUpdate.lastKey = updateAddr
239-
feedUpdate.Update = request.Update
240-
feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
241-
return feedUpdate, nil
239+
entry.lastKey = updateAddr
240+
entry.Update = request.Update
241+
entry.Reader = bytes.NewReader(entry.data)
242+
return entry, nil
242243
}
243244

244245
// Update publishes a feed update

swarm/storage/feed/handler_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ func TestFeedsHandler(t *testing.T) {
177177
if err != nil {
178178
t.Fatal(err)
179179
}
180-
if request.Epoch.Base() != 0 || request.Epoch.Level != 22 {
181-
t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 22, request.Epoch.Level)
180+
if request.Epoch.Base() != 0 || request.Epoch.Level != 28 {
181+
t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 28, request.Epoch.Level)
182182
}
183183
data = []byte(updates[3])
184184
request.SetData(data)
@@ -213,8 +213,8 @@ func TestFeedsHandler(t *testing.T) {
213213
if !bytes.Equal(update2.data, []byte(updates[len(updates)-1])) {
214214
t.Fatalf("feed update data was %v, expected %v", string(update2.data), updates[len(updates)-1])
215215
}
216-
if update2.Level != 22 {
217-
t.Fatalf("feed update epoch level was %d, expected 22", update2.Level)
216+
if update2.Level != 28 {
217+
t.Fatalf("feed update epoch level was %d, expected 28", update2.Level)
218218
}
219219
if update2.Base() != 0 {
220220
t.Fatalf("feed update epoch base time was %d, expected 0", update2.Base())

swarm/storage/feed/id_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ func getTestID() *ID {
1616
func TestIDAddr(t *testing.T) {
1717
id := getTestID()
1818
updateAddr := id.Addr()
19-
compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783")
19+
compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x842d0a81987b9755dfeaa5558f5c134c1c0af48b6545005cac7b533d9411453a")
2020
}
2121

2222
func TestIDSerializer(t *testing.T) {
23-
testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019")
23+
testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f")
2424
}
2525

2626
func TestIDLengthCheck(t *testing.T) {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package lookup
2+
3+
import "context"
4+
5+
// FluzCapacitorAlgorithm works by narrowing the epoch search area if an update is found
6+
// going back and forth in time
7+
// First, it will attempt to find an update where it should be now if the hint was
8+
// really the last update. If that lookup fails, then the last update must be either the hint itself
9+
// or the epochs right below. If however, that lookup succeeds, then the update must be
10+
// that one or within the epochs right below.
11+
// see the guide for a more graphical representation
12+
func FluzCapacitorAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) {
13+
var lastFound interface{}
14+
var epoch Epoch
15+
if hint == NoClue {
16+
hint = worstHint
17+
}
18+
19+
t := now
20+
21+
for {
22+
epoch = GetNextEpoch(hint, t)
23+
value, err = read(ctx, epoch, now)
24+
if err != nil {
25+
return nil, err
26+
}
27+
if value != nil {
28+
lastFound = value
29+
if epoch.Level == LowestLevel || epoch.Equals(hint) {
30+
return value, nil
31+
}
32+
hint = epoch
33+
continue
34+
}
35+
if epoch.Base() == hint.Base() {
36+
if lastFound != nil {
37+
return lastFound, nil
38+
}
39+
// we have reached the hint itself
40+
if hint == worstHint {
41+
return nil, nil
42+
}
43+
// check it out
44+
value, err = read(ctx, hint, now)
45+
if err != nil {
46+
return nil, err
47+
}
48+
if value != nil {
49+
return value, nil
50+
}
51+
// bad hint.
52+
t = hint.Base()
53+
hint = worstHint
54+
continue
55+
}
56+
base := epoch.Base()
57+
if base == 0 {
58+
return nil, nil
59+
}
60+
t = base - 1
61+
}
62+
63+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package lookup
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
"time"
7+
)
8+
9+
type stepFunc func(ctx context.Context, t uint64, hint Epoch) interface{}
10+
11+
// LongEarthLookaheadDelay is the headstart the lookahead gives R before it launches
12+
var LongEarthLookaheadDelay = 250 * time.Millisecond
13+
14+
// LongEarthLookbackDelay is the headstart the lookback gives R before it launches
15+
var LongEarthLookbackDelay = 250 * time.Millisecond
16+
17+
// LongEarthAlgorithm explores possible lookup paths in parallel, pruning paths as soon
18+
// as a more promising lookup path is found. As a result, this lookup algorithm is an order
19+
// of magnitude faster than the FluzCapacitor algorithm, but at the expense of more exploratory reads.
20+
// This algorithm works as follows. On each step, the next epoch is immediately looked up (R)
21+
// and given a head start, while two parallel "steps" are launched a short time after:
22+
// look ahead (A) is the path the algorithm would take if the R lookup returns a value, whereas
23+
// look back (B) is the path the algorithm would take if the R lookup failed.
24+
// as soon as R is actually finished, the A or B paths are pruned depending on the value of R.
25+
// if A returns earlier than R, then R and B read operations can be safely canceled, saving time.
26+
// The maximum number of active read operations is calculated as 2^(timeout/headstart).
27+
// If headstart is infinite, this algorithm behaves as FluzCapacitor.
28+
// timeout is the maximum execution time of the passed `read` function.
29+
// the two head starts can be configured by changing LongEarthLookaheadDelay or LongEarthLookbackDelay
30+
func LongEarthAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (interface{}, error) {
31+
if hint == NoClue {
32+
hint = worstHint
33+
}
34+
35+
var stepCounter int32 // for debugging, stepCounter allows to give an ID to each step instance
36+
37+
errc := make(chan struct{}) // errc will help as an error shortcut signal
38+
var gerr error // in case of error, this variable will be set
39+
40+
var step stepFunc // For efficiency, the algorithm step is defined as a closure
41+
step = func(ctxS context.Context, t uint64, last Epoch) interface{} {
42+
stepID := atomic.AddInt32(&stepCounter, 1) // give an ID to this call instance
43+
trace(stepID, "init: t=%d, last=%s", t, last.String())
44+
var valueA, valueB, valueR interface{}
45+
46+
// initialize the three read contexts
47+
ctxR, cancelR := context.WithCancel(ctxS) // will handle the current read operation
48+
ctxA, cancelA := context.WithCancel(ctxS) // will handle the lookahead path
49+
ctxB, cancelB := context.WithCancel(ctxS) // will handle the lookback path
50+
51+
epoch := GetNextEpoch(last, t) // calculate the epoch to look up in this step instance
52+
53+
// define the lookAhead function, which will follow the path as if R was successful
54+
lookAhead := func() {
55+
valueA = step(ctxA, t, epoch) // launch the next step, recursively.
56+
if valueA != nil { // if this path is successful, we don't need R or B.
57+
cancelB()
58+
cancelR()
59+
}
60+
}
61+
62+
// define the lookBack function, which will follow the path as if R was unsuccessful
63+
lookBack := func() {
64+
if epoch.Base() == last.Base() {
65+
return
66+
}
67+
base := epoch.Base()
68+
if base == 0 {
69+
return
70+
}
71+
valueB = step(ctxB, base-1, last)
72+
}
73+
74+
go func() { //goroutine to read the current epoch (R)
75+
defer cancelR()
76+
var err error
77+
valueR, err = read(ctxR, epoch, now) // read this epoch
78+
if valueR == nil { // if unsuccessful, cancel lookahead, otherwise cancel lookback.
79+
cancelA()
80+
} else {
81+
cancelB()
82+
}
83+
if err != nil && err != context.Canceled {
84+
gerr = err
85+
close(errc)
86+
}
87+
}()
88+
89+
go func() { // goroutine to give a headstart to R and then launch lookahead.
90+
defer cancelA()
91+
92+
// if we are at the lowest level or the epoch to look up equals the last one,
93+
// then we cannot lookahead (can't go lower or repeat the same lookup, this would
94+
// cause an infinite loop)
95+
if epoch.Level == LowestLevel || epoch.Equals(last) {
96+
return
97+
}
98+
99+
// give a head start to R, or launch immediately if R finishes early enough
100+
select {
101+
case <-TimeAfter(LongEarthLookaheadDelay):
102+
lookAhead()
103+
case <-ctxR.Done():
104+
if valueR != nil {
105+
lookAhead() // only look ahead if R was successful
106+
}
107+
case <-ctxA.Done():
108+
}
109+
}()
110+
111+
go func() { // goroutine to give a headstart to R and then launch lookback.
112+
defer cancelB()
113+
114+
// give a head start to R, or launch immediately if R finishes early enough
115+
select {
116+
case <-TimeAfter(LongEarthLookbackDelay):
117+
lookBack()
118+
case <-ctxR.Done():
119+
if valueR == nil {
120+
lookBack() // only look back in case R failed
121+
}
122+
case <-ctxB.Done():
123+
}
124+
}()
125+
126+
<-ctxA.Done()
127+
if valueA != nil {
128+
trace(stepID, "Returning valueA=%v", valueA)
129+
return valueA
130+
}
131+
132+
<-ctxR.Done()
133+
if valueR != nil {
134+
trace(stepID, "Returning valueR=%v", valueR)
135+
return valueR
136+
}
137+
<-ctxB.Done()
138+
trace(stepID, "Returning valueB=%v", valueB)
139+
return valueB
140+
}
141+
142+
var value interface{}
143+
stepCtx, cancel := context.WithCancel(ctx)
144+
145+
go func() { // launch the root step in its own goroutine to allow cancellation
146+
defer cancel()
147+
value = step(stepCtx, now, hint)
148+
}()
149+
150+
// wait for the algorithm to finish, but shortcut in case
151+
// of errors
152+
select {
153+
case <-stepCtx.Done():
154+
case <-errc:
155+
cancel()
156+
return nil, gerr
157+
}
158+
159+
if ctx.Err() != nil {
160+
return nil, ctx.Err()
161+
}
162+
163+
if value != nil || hint == worstHint {
164+
return value, nil
165+
}
166+
167+
// at this point the algorithm did not return a value,
168+
// so we challenge the hint given.
169+
value, err := read(ctx, hint, now)
170+
if err != nil {
171+
return nil, err
172+
}
173+
if value != nil {
174+
return value, nil // hint is valid, return it.
175+
}
176+
177+
// hint is invalid. Invoke the algorithm
178+
// without hint.
179+
now = hint.Base()
180+
if hint.Level == HighestLevel {
181+
now--
182+
}
183+
184+
return LongEarthAlgorithm(ctx, now, NoClue, read)
185+
}

swarm/storage/feed/lookup/epoch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,5 @@ func (e *Epoch) Equals(epoch Epoch) bool {
8787

8888
// String implements the Stringer interface.
8989
func (e *Epoch) String() string {
90-
return fmt.Sprintf("Epoch{Time:%d, Level:%d}", e.Time, e.Level)
90+
return fmt.Sprintf("Epoch{Base: %d, Time:%d, Level:%d}", e.Base(), e.Time, e.Level)
9191
}

0 commit comments

Comments
 (0)