This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Changes from 2 commits
Commits
46 commits
df7d887
swarm/storage/localstore: add hasser
janos Feb 27, 2019
c0a4b1b
swarm: merge branch 'master' into localstore-storage-integration
janos Mar 4, 2019
8381e38
swarm: merge branch 'master' into localstore-storage-integration
janos Mar 4, 2019
5ac0f79
cmd/swarm, swarm: integrate localstore partially
janos Mar 4, 2019
a96aeeb
swarm: merge branch 'master' into localstore-storage-integration
janos Mar 5, 2019
44633ce
swarm/network/stream: start of localstore integration
janos Mar 5, 2019
3e6e03a
swarm/storage/localstore: add TestExportImport and fix Export function
janos Mar 5, 2019
6ce3481
cmd/swarm: re-enable TestCLISwarmExportImport
janos Mar 5, 2019
5e69e3d
swarm: extend chunk.Store
janos Mar 5, 2019
5d7f169
swarm/storage/feed: fix TestValidatorInStore
janos Mar 5, 2019
f0a8d3a
swarm: rename ModeGetFeedLookup to ModeGetLookup
janos Mar 5, 2019
5474d30
swarm: minor updates to chunk, storage and localstore packages
janos Mar 6, 2019
9ca71f4
swarm: merge branch 'master' into localstore-storage-integration
janos Mar 11, 2019
83b64ce
swarm/shed: add vector uint64 field
janos Mar 12, 2019
547f30b
cmd/swarm: fix merge issue
janos Mar 12, 2019
970b305
swarm: use BinID for pull syncing index
janos Mar 12, 2019
5a669c6
swarm/network, swarm/storage/localstore: SetNextBatch pull subscription
janos Mar 12, 2019
4594130
swarm/network/stream: fix TestSyncerSimulation
janos Mar 13, 2019
4af1d7a
swarm: change localstore SubscribePull function signature
janos Mar 13, 2019
4df51ee
swarm/storage/localstore: fix SubscribePull comment
janos Mar 14, 2019
804301e
swarm/network/stream: return errors in roundRobinStore
janos Mar 14, 2019
6aa0d0e
swarm/storage/localstore: SubscribePull with both range ends inclusive
janos Mar 14, 2019
c16dcd1
swarm/network/stream: close intervals store in newStreamerTester
janos Mar 15, 2019
14758a4
swarm/storage/localstore: improve TestDB_SubscribePull_since
janos Mar 15, 2019
be1a360
swarm/network/stream: implement new SetNextBatch
janos Mar 15, 2019
36a0d96
swarm/network/stream: update handleChunkDeliveryMsg function
janos Mar 15, 2019
3b3d2b0
swarm/storage: remove unused code
janos Mar 15, 2019
4b8a5c8
swarm: merge branch 'master' into localstore-storage-integration
janos Mar 15, 2019
277ecf9
swarm: minor code cleanups
janos Mar 15, 2019
682eebd
swarm: merge branch 'master' into localstore-storage-integration
janos Mar 15, 2019
5ee9143
Merge branch 'master' into localstore-storage-integration
nonsense Mar 21, 2019
c3a76d1
lint
nonsense Mar 21, 2019
2d20bf9
swarm/storage/localstore: add tags to push index
acud Mar 25, 2019
b828a34
swarm/storage, swarm/shed: add support for persisting push tags
acud Mar 26, 2019
d8c648b
swarm/storage/localstore: TestDB_pushIndex_Tags
janos Mar 26, 2019
e295dc8
swarm: address pr review comments
janos Mar 26, 2019
c9d679d
swarm: merge branch 'master' into localstore-storage-integration
janos Mar 26, 2019
2fe747e
swarm/storage/localstore: implement backward compatible Import function
janos Mar 26, 2019
3e82234
swamr/network/stream: fix handleChunkDeliveryMsg
janos Mar 27, 2019
4c14a09
swarm/storage/localstore: remove getters, setters and putters
acud Mar 27, 2019
31f4b0d
swarm/network/stream: update handleChunkDeliveryMsg
janos Mar 27, 2019
2e24e99
swarm/storage/localstore: only one item in pull and push indexes for …
janos Mar 29, 2019
9c79e26
Revert "swarm/storage/localstore: TestDB_pushIndex_Tags"
acud Apr 8, 2019
ca7dcc2
Revert "swarm/storage, swarm/shed: add support for persisting push tags"
acud Apr 8, 2019
e4d723c
swarm/chunk, swarm/storage: adjust Store.Put interface to return bool…
acud Apr 9, 2019
904df7a
cmd/swarm: new localstore database migration (#1297)
acud Apr 10, 2019
16 changes: 12 additions & 4 deletions swarm/network/stream/delivery.go
@@ -230,13 +230,21 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
switch r := req.(type) {
case *ChunkDeliveryMsgRetrieval:
msg = (*ChunkDeliveryMsg)(r)
// do not sync if peer that is sending us a chunk is closer to the chunk than we are
peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr)
po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr)
if peerPO > po {
mode = chunk.ModePutRequest
} else {
depth := d.kad.NeighbourhoodDepth()
// is the chunk within our area of responsibility?
if po < depth {

Member
```go
if po >= depth || peerPO < po {
    mode = chunk.ModePutSync
} else {
    mode = chunk.ModePutRequest
}
```

Member Author
Yes, this is a more compact representation. I thought the current one was easier to reason about, but this is fine with a comment as well.

Member
That is good, but you got the comparison wrong; note that po >= depth -> sync.
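
For readers following the thread, here is a minimal, self-contained sketch of the rule as corrected here, following the compact form suggested above (sync whenever po >= depth, otherwise only when the sending peer is not closer to the chunk than we are). The Mode type, its constants, and the deliveryPutMode helper are illustrative stand-ins, not code from this PR:

```go
package main

import "fmt"

// Mode stands in for the put modes used in handleChunkDeliveryMsg
// (chunk.ModePutSync and chunk.ModePutRequest in the real code).
type Mode int

const (
    ModePutRequest Mode = iota
    ModePutSync
)

// deliveryPutMode isolates the decision under discussion: po is the chunk's
// proximity order to our own base address, peerPO its proximity order to the
// sending peer, and depth is the kademlia neighbourhood depth.
func deliveryPutMode(po, peerPO, depth int) Mode {
    // Chunks within our area of responsibility (po >= depth) always sync;
    // outside of it, sync only if the sender is not closer to the chunk
    // than we are.
    if po >= depth || peerPO < po {
        return ModePutSync
    }
    // Outside the area of responsibility and the sender is closer:
    // store the chunk only to serve the pending request.
    return ModePutRequest
}

func main() {
    fmt.Println(deliveryPutMode(5, 7, 4) == ModePutSync)    // in area of responsibility: sync
    fmt.Println(deliveryPutMode(2, 6, 4) == ModePutRequest) // outside, sender closer: request
}
```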

Member Author
Thanks Viktor.

// chunks within the area of responsibility should always sync
// https://github.com/ethersphere/go-ethereum/pull/1282#discussion_r269406125
mode = chunk.ModePutSync

Contributor
I'm not sure I understand the rationale here: a retrieval request automatically transforms into a syncing message on the fly, depending on the peer PO?

Member Author
This is a delivery request. If we got the chunk from a node that is closer to the chunk than we are, we do not need to put it in storage for syncing. I am not sure what a syncing message is; what we are deciding here is whether to put this chunk in the syncing index or not.

Member
Sorry, I know I said this, but it should only be the case for chunks outside our depth.
Chunks within the area of responsibility should always sync.

Member Author
@zelig thanks for the explanation, I may have misunderstood. Could you check whether the current code handles the various cases correctly?

Contributor
@zelig I was discussing this with @janos today and I have a few questions on this matter:

  1. The chunk comes from somewhere, and that somewhere should already be the so-called "area of responsibility" (let's assume we are talking about push syncing right now). Aren't we intervening in the syncing process by which a chunk is supposed to arrive at the correct NN? And aren't we generating redundancy beyond the premised redundancy of degree 𝑛 in the network? This is all the more risky since the area of responsibility is not symmetric between peers.

  2. I find it very hard to define a test case for this, so I would assume this is a premature optimisation. Right now, with everything related to routing and syncing, we should stick to correctness over performance; moreover, our code should stay faithful to the spec we are writing. We shouldn't optimise anything at this point (IMO).

Member
@justelad

  1. No, it's not generating more redundancy; the rest I don't understand.
  2. It is easy to construct test cases. This is about correctness.

} else {
if peerPO > po {
// do not sync if peer that is sending us a chunk is closer to the chunk than we are
mode = chunk.ModePutRequest
} else {
mode = chunk.ModePutSync
}
}
case *ChunkDeliveryMsgSyncing:
msg = (*ChunkDeliveryMsg)(r)
7 changes: 3 additions & 4 deletions swarm/storage/localstore/export_test.go
@@ -18,6 +18,7 @@ package localstore

import (
"bytes"
"context"
"testing"

"github.com/ethereum/go-ethereum/swarm/chunk"
@@ -36,7 +37,7 @@ func TestExportImport(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

err := db1.NewPutter(chunk.ModePutUpload).Put(ch)
err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
@@ -65,11 +66,9 @@ func TestExportImport(t *testing.T) {
t.Errorf("got import count %v, want %v", c, wantChunksCount)
}

getter := db2.NewGetter(chunk.ModeGetRequest)

for a, want := range chunks {
addr := chunk.Address([]byte(a))
ch, err := getter.Get(addr)
ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
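
The export_test.go hunk above shows the API migration this PR applies across the localstore tests: the NewPutter/NewGetter/NewSetter accessors are replaced by direct Put/Get/Set calls that take a context and a mode. Below is a condensed sketch of that pattern, reusing the package's existing test helpers (newTestDB, generateTestRandomChunk); the test function itself is illustrative and not part of the PR, and note that a later commit in this PR adjusts Store.Put to also return a bool:

```go
package localstore

import (
    "context"
    "testing"

    "github.com/ethereum/go-ethereum/swarm/chunk"
)

// TestPutSetGetPattern condenses the call pattern used by the updated tests:
// upload a chunk, mark it as synced, then retrieve it as a request.
func TestPutSetGetPattern(t *testing.T) {
    db, cleanupFunc := newTestDB(t, nil)
    defer cleanupFunc()

    ch := generateTestRandomChunk()
    ctx := context.Background()

    // was: db.NewPutter(chunk.ModePutUpload).Put(ch)
    if err := db.Put(ctx, chunk.ModePutUpload, ch); err != nil {
        t.Fatal(err)
    }
    // was: db.NewSetter(chunk.ModeSetSync).Set(ch.Address())
    if err := db.Set(ctx, chunk.ModeSetSync, ch.Address()); err != nil {
        t.Fatal(err)
    }
    // was: db.NewGetter(chunk.ModeGetRequest).Get(ch.Address())
    if _, err := db.Get(ctx, chunk.ModeGetRequest, ch.Address()); err != nil {
        t.Fatal(err)
    }
}
```
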
48 changes: 20 additions & 28 deletions swarm/storage/localstore/gc_test.go
@@ -17,6 +17,7 @@
package localstore

import (
"context"
"io/ioutil"
"math/rand"
"os"
@@ -63,26 +64,23 @@ func testDB_collectGarbageWorker(t *testing.T) {
})()
defer cleanupFunc()

uploader := db.NewPutter(chunk.ModePutUpload)
syncer := db.NewSetter(chunk.ModeSetSync)

addrs := make([]chunk.Address, 0)

// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()

err := uploader.Put(chunk)
err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}

err = syncer.Set(chunk.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}

addrs = append(addrs, chunk.Address())
addrs = append(addrs, ch.Address())
}

gcTarget := db.gcTarget()
@@ -110,15 +108,15 @@ func testDB_collectGarbageWorker(t *testing.T) {

// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
_, err := db.NewGetter(chunk.ModeGetRequest).Get(addrs[0])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
})

// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.NewGetter(chunk.ModeGetRequest).Get(addrs[len(addrs)-1])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
@@ -134,9 +132,6 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
})
defer cleanupFunc()

uploader := db.NewPutter(chunk.ModePutUpload)
syncer := db.NewSetter(chunk.ModeSetSync)

testHookCollectGarbageChan := make(chan uint64)
defer setTestHookCollectGarbage(func(collectedCount uint64) {
testHookCollectGarbageChan <- collectedCount
@@ -146,19 +141,19 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {

// upload random chunks just up to the capacity
for i := 0; i < int(db.capacity)-1; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()

err := uploader.Put(chunk)
err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}

err = syncer.Set(chunk.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}

addrs = append(addrs, chunk.Address())
addrs = append(addrs, ch.Address())
}

// set update gc test hook to signal when
@@ -172,7 +167,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// request the latest synced chunk
// to prioritize it in the gc index
// not to be collected
_, err := db.NewGetter(chunk.ModeGetRequest).Get(addrs[0])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
@@ -191,11 +186,11 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload and sync another chunk to trigger
// garbage collection
ch := generateTestRandomChunk()
err = uploader.Put(ch)
err = db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -235,23 +230,23 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {

// requested chunk should not be removed
t.Run("get requested chunk", func(t *testing.T) {
_, err := db.NewGetter(chunk.ModeGetRequest).Get(addrs[0])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
})

// the second synced chunk should be removed
t.Run("get gc-ed chunk", func(t *testing.T) {
_, err := db.NewGetter(chunk.ModeGetRequest).Get(addrs[1])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[1])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
})

// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.NewGetter(chunk.ModeGetRequest).Get(addrs[len(addrs)-1])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
@@ -275,20 +270,17 @@ func TestDB_gcSize(t *testing.T) {
t.Fatal(err)
}

uploader := db.NewPutter(chunk.ModePutUpload)
syncer := db.NewSetter(chunk.ModeSetSync)

count := 100

for i := 0; i < count; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()

err := uploader.Put(chunk)
err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}

err = syncer.Set(chunk.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
34 changes: 14 additions & 20 deletions swarm/storage/localstore/index_test.go
@@ -18,6 +18,7 @@ package localstore

import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
@@ -39,23 +40,21 @@ func TestDB_pullIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()

uploader := db.NewPutter(chunk.ModePutUpload)

chunkCount := 50

chunks := make([]testIndexChunk, chunkCount)

// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()

err := uploader.Put(chunk)
err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}

chunks[i] = testIndexChunk{
Chunk: chunk,
Chunk: ch,
binID: uint64(i),
}
}
@@ -86,23 +85,21 @@ func TestDB_gcIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()

uploader := db.NewPutter(chunk.ModePutUpload)

chunkCount := 50

chunks := make([]testIndexChunk, chunkCount)

// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()

err := uploader.Put(chunk)
err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}

chunks[i] = testIndexChunk{
Chunk: chunk,
Chunk: ch,
}
}

@@ -124,7 +121,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("request unsynced", func(t *testing.T) {
ch := chunks[1]

_, err := db.NewGetter(chunk.ModeGetRequest).Get(ch.Address())
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -141,7 +138,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("sync one chunk", func(t *testing.T) {
ch := chunks[0]

err := db.NewSetter(chunk.ModeSetSync).Set(ch.Address())
err := db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -153,10 +150,8 @@ func TestDB_gcIndex(t *testing.T) {
})

t.Run("sync all chunks", func(t *testing.T) {
setter := db.NewSetter(chunk.ModeSetSync)

for i := range chunks {
err := setter.Set(chunks[i].Address())
err := db.Set(context.Background(), chunk.ModeSetSync, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
@@ -170,7 +165,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("request one chunk", func(t *testing.T) {
i := 6

_, err := db.NewGetter(chunk.ModeGetRequest).Get(chunks[i].Address())
_, err := db.Get(context.Background(), chunk.ModeGetRequest, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
@@ -188,14 +183,13 @@ func TestDB_gcIndex(t *testing.T) {
})

t.Run("random chunk request", func(t *testing.T) {
requester := db.NewGetter(chunk.ModeGetRequest)

rand.Shuffle(len(chunks), func(i, j int) {
chunks[i], chunks[j] = chunks[j], chunks[i]
})

for _, chunk := range chunks {
_, err := requester.Get(chunk.Address())
for _, ch := range chunks {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
@@ -211,7 +205,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("remove one chunk", func(t *testing.T) {
i := 3

err := db.NewSetter(chunk.ModeSetRemove).Set(chunks[i].Address())
err := db.Set(context.Background(), chunk.ModeSetRemove, chunks[i].Address())
if err != nil {
t.Fatal(err)
}