
Commit f107234

swarm/network: remove FetchStore and SyncChunkStore in favor of NetStore (#1342)

1 parent: 9a5bfef
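The change is mechanical: the chunk.FetchStore interface (a Store that also exposes FetchFunc for syncing) is deleted, and every call site takes the concrete *storage.NetStore instead, since NetStore already provides everything the diff touches (Get, Put, Set, FetchFunc, SubscribePull, LastPullSubscriptionBinID). A minimal sketch of the call-site migration, using a hypothetical consumer type (exampleConsumer is not from this commit):

package example // hypothetical package illustrating the migration

import "github.com/ethereum/go-ethereum/swarm/storage"

// exampleConsumer mirrors the pattern in this commit: before, it would have
// held the chunk.FetchStore interface; after, it holds *storage.NetStore.
type exampleConsumer struct {
	netStore *storage.NetStore
}

// newExampleConsumer follows the constructor changes below (NewDelivery,
// NewSwarmSyncerServer, NewSwarmSyncerClient): the interface parameter is
// replaced by the concrete *storage.NetStore.
func newExampleConsumer(netStore *storage.NetStore) *exampleConsumer {
	return &exampleConsumer{netStore: netStore}
}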

4 files changed, +37 −80 lines changed

swarm/chunk/chunk.go
Lines changed: 0 additions & 6 deletions

@@ -172,12 +172,6 @@ type Store interface {
 	Close() (err error)
 }
 
-// FetchStore is a Store which supports syncing
-type FetchStore interface {
-	Store
-	FetchFunc(ctx context.Context, addr Address) func(context.Context) error
-}
-
 // Validator validates a chunk.
 type Validator interface {
 	Validate(ch Chunk) bool
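The removed FetchFunc signature is the key to how syncing waits for chunks: it returns a wait function, or nil when no fetch is needed (the nil check appears in intervals_test.go below). A minimal sketch of that wait pattern against NetStore; reading nil as "chunk already available locally" is an assumption here:

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

// ensureChunk sketches the FetchFunc wait pattern used by the NeedData
// implementations later in this diff.
func ensureChunk(ctx context.Context, netStore *storage.NetStore, addr storage.Address) error {
	wait := netStore.FetchFunc(ctx, addr)
	if wait == nil {
		// No wait function: assumed to mean the chunk is already stored locally.
		return nil
	}
	// Block until the chunk arrives or the context is cancelled.
	return wait(ctx)
}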

swarm/network/stream/delivery.go
Lines changed: 10 additions & 10 deletions

@@ -46,17 +46,17 @@ var (
 )
 
 type Delivery struct {
-	chunkStore chunk.FetchStore
-	kad        *network.Kademlia
-	getPeer    func(enode.ID) *Peer
-	quit       chan struct{}
+	netStore *storage.NetStore
+	kad      *network.Kademlia
+	getPeer  func(enode.ID) *Peer
+	quit     chan struct{}
 }
 
-func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery {
+func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
 	return &Delivery{
-		chunkStore: chunkStore,
-		kad:        kad,
-		quit:       make(chan struct{}),
+		netStore: netStore,
+		kad:      kad,
+		quit:     make(chan struct{}),
 	}
 }
 
@@ -94,7 +94,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 
 	go func() {
 		defer osp.Finish()
-		ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
+		ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
 		if err != nil {
 			retrieveChunkFail.Inc(1)
 			log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
@@ -171,7 +171,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
 
 	msg.peer = sp
 	log.Trace("handle.chunk.delivery", "put", msg.Addr)
-	_, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
+	_, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
 	if err != nil {
 		if err == storage.ErrChunkInvalid {
 			// we removed this log because it spams the logs
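With the interface gone, Delivery is wired directly against the NetStore and uses it in both directions: Get with chunk.ModeGetRequest to serve retrieve requests, and Put to store delivered chunks. A sketch of the post-commit construction; the kad and netStore values are assumed to come from the surrounding node setup:

package example

import (
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/network/stream"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// newDelivery shows the new constructor shape: the second argument is the
// concrete *storage.NetStore rather than the removed chunk.FetchStore.
func newDelivery(kad *network.Kademlia, netStore *storage.NetStore) *stream.Delivery {
	return stream.NewDelivery(kad, netStore)
}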

swarm/network/stream/intervals_test.go
Lines changed: 4 additions & 5 deletions

@@ -29,7 +29,6 @@ import (
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
-	"github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 	"github.com/ethereum/go-ethereum/swarm/state"
 	"github.com/ethereum/go-ethereum/swarm/storage"
@@ -287,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error {
 
 type testExternalClient struct {
 	hashes               chan []byte
-	store                chunk.FetchStore
+	netStore             *storage.NetStore
 	enableNotificationsC chan struct{}
 }
 
-func newTestExternalClient(store chunk.FetchStore) *testExternalClient {
+func newTestExternalClient(netStore *storage.NetStore) *testExternalClient {
	return &testExternalClient{
 		hashes:               make(chan []byte),
-		store:                store,
+		netStore:             netStore,
 		enableNotificationsC: make(chan struct{}),
 	}
 }
 
 func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
-	wait := c.store.FetchFunc(ctx, storage.Address(hash))
+	wait := c.netStore.FetchFunc(ctx, storage.Address(hash))
 	if wait == nil {
 		return nil
 	}
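The same pattern from a client's point of view: NeedData simply forwards to NetStore.FetchFunc, as the test client above and SwarmSyncerClient below both do. A sketch with a hypothetical client type (exampleClient is not part of the commit):

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

// exampleClient holds the concrete NetStore, like testExternalClient above.
type exampleClient struct {
	netStore *storage.NetStore
}

// NeedData returns NetStore's fetch function as the wait callback; a nil
// return tells the caller there is nothing to wait for.
func (c *exampleClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
	return c.netStore.FetchFunc(ctx, storage.Address(hash))
}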

swarm/network/stream/syncer.go
Lines changed: 23 additions & 59 deletions

@@ -34,27 +34,27 @@ const (
 // * live request delivery with or without checkback
 // * (live/non-live historical) chunk syncing per proximity bin
 type SwarmSyncerServer struct {
-	po    uint8
-	store chunk.FetchStore
-	quit  chan struct{}
+	po       uint8
+	netStore *storage.NetStore
+	quit     chan struct{}
 }
 
 // NewSwarmSyncerServer is constructor for SwarmSyncerServer
-func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) {
+func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) {
 	return &SwarmSyncerServer{
-		po:    po,
-		store: syncChunkStore,
-		quit:  make(chan struct{}),
+		po:       po,
+		netStore: netStore,
+		quit:     make(chan struct{}),
 	}, nil
 }
 
-func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) {
+func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
 	streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
 		po, err := ParseSyncBinKey(t)
 		if err != nil {
 			return nil, err
 		}
-		return NewSwarmSyncerServer(po, syncChunkStore)
+		return NewSwarmSyncerServer(po, netStore)
 	})
 	// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
 	// 	return NewOutgoingProvableSwarmSyncer(po, db)
@@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() {
 
 // GetData retrieves the actual chunk from netstore
 func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
-	ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
+	ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
 	if err != nil {
 		return nil, err
 	}
@@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er
 
 // SessionIndex returns current storage bin (po) index.
 func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
-	return s.store.LastPullSubscriptionBinID(s.po)
+	return s.netStore.LastPullSubscriptionBinID(s.po)
 }
 
 // SetNextBatch retrieves the next batch of hashes from the localstore.
@@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
 // are added in batchTimeout period, the batch will be returned. This function
 // will block until new chunks are received from localstore pull subscription.
 func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
-	descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to)
+	descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
 	defer stop()
 
 	const batchTimeout = 2 * time.Second
@@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
 	// This is the most naive approach to label the chunk as synced
 	// allowing it to be garbage collected. A proper way requires
 	// validating that the chunk is successfully stored by the peer.
-	err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address)
+	err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
 	if err != nil {
 		return nil, 0, 0, nil, err
 	}
@@ -158,67 +158,31 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
 
 // SwarmSyncerClient
 type SwarmSyncerClient struct {
-	store  chunk.FetchStore
-	peer   *Peer
-	stream Stream
+	netStore *storage.NetStore
+	peer     *Peer
+	stream   Stream
 }
 
 // NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
 	return &SwarmSyncerClient{
-		store:  store,
-		peer:   p,
-		stream: stream,
+		netStore: netStore,
+		peer:     p,
+		stream:   stream,
 	}, nil
 }
 
-// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
-// 	retrieveC := make(storage.Chunk, chunksCap)
-// 	RunChunkRequestor(p, retrieveC)
-// 	storeC := make(storage.Chunk, chunksCap)
-// 	RunChunkStorer(store, storeC)
-// 	s := &SwarmSyncerClient{
-// 		po:            po,
-// 		priority:      priority,
-// 		sessionAt:     sessionAt,
-// 		start:         index,
-// 		end:           index,
-// 		nextC:         make(chan struct{}, 1),
-// 		intervals:     intervals,
-// 		sessionRoot:   sessionRoot,
-// 		sessionReader: chunker.Join(sessionRoot, retrieveC),
-// 		retrieveC:     retrieveC,
-// 		storeC:        storeC,
-// 	}
-// 	return s
-// }
-
-// // StartSyncing is called on the Peer to start the syncing process
-// // the idea is that it is called only after kademlia is close to healthy
-// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) {
-// 	lastPO := po
-// 	if nn {
-// 		lastPO = maxPO
-// 	}
-//
-// 	for i := po; i <= lastPO; i++ {
-// 		s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
-// 		s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
-// 	}
-// }
-
 // RegisterSwarmSyncerClient registers the client constructor function for
 // to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) {
+func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
 	streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
-		return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
+		return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live))
 	})
}
 
 // NeedData
 func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
-	return s.store.FetchFunc(ctx, key)
+	return s.netStore.FetchFunc(ctx, key)
 }
 
 // BatchDone
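After this commit, both ends of the SYNC stream take the same NetStore. A sketch of the resulting registration, assuming a *stream.Registry from the surrounding stream setup:

package example

import (
	"github.com/ethereum/go-ethereum/swarm/network/stream"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// registerSyncing wires both the SYNC server and client constructors to the
// same *storage.NetStore, matching the signatures changed in this commit.
func registerSyncing(streamer *stream.Registry, netStore *storage.NetStore) {
	stream.RegisterSwarmSyncerServer(streamer, netStore)
	stream.RegisterSwarmSyncerClient(streamer, netStore)
}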
