6 changes: 0 additions & 6 deletions swarm/chunk/chunk.go
@@ -172,12 +172,6 @@ type Store interface {
     Close() (err error)
 }

-// FetchStore is a Store which supports syncing
-type FetchStore interface {
-    Store
-    FetchFunc(ctx context.Context, addr Address) func(context.Context) error
-}
-
 // Validator validates a chunk.
 type Validator interface {
     Validate(ch Chunk) bool
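Note on the deletion above: FetchStore bundled the plain Store interface with FetchFunc, which returns nil when a chunk is already stored locally and otherwise a wait function that blocks until the fetch completes. The sketch below is a hypothetical stand-in (none of these names are the real swarm packages) showing why the interface becomes redundant once callers hold the concrete store directly, which is the direction the rest of this diff takes.

package main

import (
    "context"
    "fmt"
)

// Address and Chunk stand in for the swarm/chunk types; nothing here imports
// the real packages.
type Address []byte

type Chunk struct {
    Addr Address
    Data []byte
}

// Store is a trimmed stand-in for the chunk.Store interface that FetchStore embedded.
type Store interface {
    Close() (err error)
}

// FetchStore mirrors the interface deleted above: a Store whose FetchFunc
// returns nil when a chunk is already local, or a wait function otherwise.
type FetchStore interface {
    Store
    FetchFunc(ctx context.Context, addr Address) func(context.Context) error
}

// NetStore is a hypothetical concrete store. After this PR the stream package
// accepts a value like this directly instead of the FetchStore abstraction.
type NetStore struct {
    local map[string]Chunk
}

func (n *NetStore) Close() error { return nil }

func (n *NetStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error {
    if _, ok := n.local[string(addr)]; ok {
        return nil // already stored, nothing to wait for
    }
    return func(context.Context) error { return nil } // pretend the fetch succeeds instantly
}

// Compile-time check: the concrete type already provides everything FetchStore
// promised, which is presumably why the PR drops the interface in favour of
// passing the concrete *storage.NetStore around.
var _ FetchStore = (*NetStore)(nil)

func main() {
    ns := &NetStore{local: map[string]Chunk{"a": {Addr: Address("a")}}}
    fmt.Println(ns.FetchFunc(context.Background(), Address("a")) == nil) // true: no fetch needed
}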
20 changes: 10 additions & 10 deletions swarm/network/stream/delivery.go
@@ -46,17 +46,17 @@
 )

 type Delivery struct {
-    chunkStore chunk.FetchStore
-    kad        *network.Kademlia
-    getPeer    func(enode.ID) *Peer
-    quit       chan struct{}
+    netStore *storage.NetStore
+    kad      *network.Kademlia
+    getPeer  func(enode.ID) *Peer
+    quit     chan struct{}
 }

-func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery {
+func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
     return &Delivery{
-        chunkStore: chunkStore,
-        kad:        kad,
-        quit:       make(chan struct{}),
+        netStore: netStore,
+        kad:      kad,
+        quit:     make(chan struct{}),
     }
 }

@@ -94,7 +94,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *

     go func() {
         defer osp.Finish()
-        ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
+        ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
         if err != nil {
             retrieveChunkFail.Inc(1)
             log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
@@ -171,7 +171,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int

     msg.peer = sp
     log.Trace("handle.chunk.delivery", "put", msg.Addr)
-    _, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
+    _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
     if err != nil {
         if err == storage.ErrChunkInvalid {
             // we removed this log because it spams the logs
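A compressed, self-contained sketch of the two handlers touched above, with stand-in types only (the real handlers also deal with tracing spans, metrics and peer accounting): retrieve requests read through the netStore field with a request-mode Get, and delivered chunks are written back with Put.

package main

import (
    "context"
    "errors"
    "fmt"
)

type Address string

// Chunk is a stand-in for storage.Chunk / chunk.Chunk.
type Chunk struct {
    Addr Address
    Data []byte
}

// Mode is a stand-in for the chunk.ModeGet*/ModePut* constants.
type Mode int

const (
    ModeGetRequest Mode = iota
    ModePutSync
)

// NetStore is a hypothetical in-memory stand-in for *storage.NetStore.
type NetStore struct {
    chunks map[Address][]byte
}

func (n *NetStore) Get(_ context.Context, _ Mode, addr Address) (Chunk, error) {
    data, ok := n.chunks[addr]
    if !ok {
        return Chunk{}, errors.New("chunk not found")
    }
    return Chunk{Addr: addr, Data: data}, nil
}

func (n *NetStore) Put(_ context.Context, _ Mode, ch Chunk) (exists bool, err error) {
    _, exists = n.chunks[ch.Addr]
    n.chunks[ch.Addr] = ch.Data
    return exists, nil
}

// Delivery mirrors the struct after this diff: it holds the concrete store.
type Delivery struct {
    netStore *NetStore
}

// handleChunkDelivery mirrors handleChunkDeliveryMsg: store what a peer sent us.
func (d *Delivery) handleChunkDelivery(ctx context.Context, ch Chunk) error {
    _, err := d.netStore.Put(ctx, ModePutSync, ch)
    return err
}

// handleRetrieveRequest mirrors handleRetrieveRequestMsg: look the chunk up locally.
func (d *Delivery) handleRetrieveRequest(ctx context.Context, addr Address) (Chunk, error) {
    return d.netStore.Get(ctx, ModeGetRequest, addr)
}

func main() {
    d := &Delivery{netStore: &NetStore{chunks: make(map[Address][]byte)}}
    ctx := context.Background()
    if err := d.handleChunkDelivery(ctx, Chunk{Addr: "abc", Data: []byte("payload")}); err != nil {
        panic(err)
    }
    ch, err := d.handleRetrieveRequest(ctx, "abc")
    if err != nil {
        panic(err)
    }
    fmt.Printf("retrieved %q: %s\n", ch.Addr, ch.Data)
}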
9 changes: 4 additions & 5 deletions swarm/network/stream/intervals_test.go
@@ -29,7 +29,6 @@ import (
     "github.com/ethereum/go-ethereum/node"
     "github.com/ethereum/go-ethereum/p2p/enode"
     "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
-    "github.com/ethereum/go-ethereum/swarm/chunk"
     "github.com/ethereum/go-ethereum/swarm/network/simulation"
     "github.com/ethereum/go-ethereum/swarm/state"
     "github.com/ethereum/go-ethereum/swarm/storage"
@@ -287,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error {

 type testExternalClient struct {
     hashes               chan []byte
-    store                chunk.FetchStore
+    netStore             *storage.NetStore
     enableNotificationsC chan struct{}
 }

-func newTestExternalClient(store chunk.FetchStore) *testExternalClient {
+func newTestExternalClient(netStore *storage.NetStore) *testExternalClient {
     return &testExternalClient{
         hashes:               make(chan []byte),
-        store:                store,
+        netStore:             netStore,
         enableNotificationsC: make(chan struct{}),
     }
 }

 func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
-    wait := c.store.FetchFunc(ctx, storage.Address(hash))
+    wait := c.netStore.FetchFunc(ctx, storage.Address(hash))
     if wait == nil {
         return nil
     }
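The NeedData change above keeps the established contract: FetchFunc returns nil when the chunk is already local, otherwise a wait function. The hypothetical sketch below shows how a stream client can wrap that wait function to record the hash it had to fetch; the real testExternalClient does something similar with its hashes channel, but the types and behaviour here are stand-ins.

package main

import (
    "context"
    "errors"
    "fmt"
)

// netStore is a hypothetical stand-in for *storage.NetStore: FetchFunc returns
// nil when the chunk is already stored locally, otherwise a wait function.
type netStore struct {
    local map[string]bool
}

func (n *netStore) FetchFunc(ctx context.Context, hash []byte) func(context.Context) error {
    if n.local[string(hash)] {
        return nil // nothing to fetch
    }
    return func(ctx context.Context) error {
        // A real fetch would wait for delivery; this stand-in just fails.
        return errors.New("fetch not implemented in sketch")
    }
}

// client mirrors the shape of testExternalClient after the change: it holds
// the concrete store and records which hashes it actually had to fetch.
type client struct {
    netStore *netStore
    hashes   chan []byte
}

// NeedData returns nil when no fetch is needed; otherwise it wraps the wait
// function so the requested hash is also recorded on the hashes channel.
func (c *client) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
    wait := c.netStore.FetchFunc(ctx, hash)
    if wait == nil {
        return nil
    }
    return func(ctx context.Context) error {
        select {
        case c.hashes <- hash:
        case <-ctx.Done():
            return ctx.Err()
        }
        return wait(ctx)
    }
}

func main() {
    c := &client{
        netStore: &netStore{local: map[string]bool{"known": true}},
        hashes:   make(chan []byte, 1),
    }
    if c.NeedData(context.Background(), []byte("known")) == nil {
        fmt.Println("chunk already local: no wait function")
    }
    if wait := c.NeedData(context.Background(), []byte("missing")); wait != nil {
        fmt.Println("fetch needed:", wait(context.Background()))
    }
}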
82 changes: 23 additions & 59 deletions swarm/network/stream/syncer.go
@@ -34,27 +34,27 @@ const (
 // * live request delivery with or without checkback
 // * (live/non-live historical) chunk syncing per proximity bin
 type SwarmSyncerServer struct {
-    po    uint8
-    store chunk.FetchStore
-    quit  chan struct{}
+    po       uint8
+    netStore *storage.NetStore
+    quit     chan struct{}
 }

 // NewSwarmSyncerServer is constructor for SwarmSyncerServer
-func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) {
+func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) {
     return &SwarmSyncerServer{
-        po:    po,
-        store: syncChunkStore,
-        quit:  make(chan struct{}),
+        po:       po,
+        netStore: netStore,
+        quit:     make(chan struct{}),
     }, nil
 }

-func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) {
+func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
     streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
         po, err := ParseSyncBinKey(t)
         if err != nil {
             return nil, err
         }
-        return NewSwarmSyncerServer(po, syncChunkStore)
+        return NewSwarmSyncerServer(po, netStore)
     })
     // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
     //     return NewOutgoingProvableSwarmSyncer(po, db)
@@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() {

 // GetData retrieves the actual chunk from netstore
 func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
-    ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
+    ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
     if err != nil {
         return nil, err
     }
@@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er

 // SessionIndex returns current storage bin (po) index.
 func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
-    return s.store.LastPullSubscriptionBinID(s.po)
+    return s.netStore.LastPullSubscriptionBinID(s.po)
 }

 // SetNextBatch retrieves the next batch of hashes from the localstore.
@@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
 // are added in batchTimeout period, the batch will be returned. This function
 // will block until new chunks are received from localstore pull subscription.
 func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
-    descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to)
+    descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
     defer stop()

     const batchTimeout = 2 * time.Second
@@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
             // This is the most naive approach to label the chunk as synced
             // allowing it to be garbage collected. A proper way requires
             // validating that the chunk is successfully stored by the peer.
-            err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address)
+            err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
             if err != nil {
                 return nil, 0, 0, nil, err
             }
@@ -158,67 +158,31 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6

 // SwarmSyncerClient
 type SwarmSyncerClient struct {
-    store  chunk.FetchStore
-    peer   *Peer
-    stream Stream
+    netStore *storage.NetStore
+    peer     *Peer
+    stream   Stream
 }

 // NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
     return &SwarmSyncerClient{
-        store:  store,
-        peer:   p,
-        stream: stream,
+        netStore: netStore,
+        peer:     p,
+        stream:   stream,
     }, nil
 }

-// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
-// retrieveC := make(storage.Chunk, chunksCap)
-// RunChunkRequestor(p, retrieveC)
-// storeC := make(storage.Chunk, chunksCap)
-// RunChunkStorer(store, storeC)
-// s := &SwarmSyncerClient{
-// po: po,
-// priority: priority,
-// sessionAt: sessionAt,
-// start: index,
-// end: index,
-// nextC: make(chan struct{}, 1),
-// intervals: intervals,
-// sessionRoot: sessionRoot,
-// sessionReader: chunker.Join(sessionRoot, retrieveC),
-// retrieveC: retrieveC,
-// storeC: storeC,
-// }
-// return s
-// }
-
-// // StartSyncing is called on the Peer to start the syncing process
-// // the idea is that it is called only after kademlia is close to healthy
-// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) {
-// lastPO := po
-// if nn {
-// lastPO = maxPO
-// }
-//
-// for i := po; i <= lastPO; i++ {
-// s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true)
-// s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false)
-// }
-// }
-
 // RegisterSwarmSyncerClient registers the client constructor function for
 // to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) {
+func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
     streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
-        return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
+        return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live))
     })
 }

 // NeedData
 func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
-    return s.store.FetchFunc(ctx, key)
+    return s.netStore.FetchFunc(ctx, key)
 }

 // BatchDone
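The SetNextBatch comments above describe the batching behaviour: descriptors arriving from the pull subscription are accumulated until the batch is full or batchTimeout elapses, and only then is the batch handed over. A self-contained sketch of that pattern, with illustrative names and sizes rather than the registry's real tuning:

package main

import (
    "context"
    "fmt"
    "time"
)

// Descriptor is a stand-in for the chunk descriptors delivered by
// SubscribePull: an address plus its bin ID.
type Descriptor struct {
    Address string
    BinID   uint64
}

// collectBatch mirrors the shape of SetNextBatch above: it drains descriptors
// from a pull-subscription channel into one batch and returns either when the
// batch is full or when batchTimeout elapses after the first descriptor.
func collectBatch(ctx context.Context, descriptors <-chan Descriptor, maxBatch int, batchTimeout time.Duration) ([]string, uint64, uint64, error) {
    var (
        batch      []string
        batchStart uint64
        batchEnd   uint64
        timer      *time.Timer
        timerC     <-chan time.Time
    )
    for {
        select {
        case d, ok := <-descriptors:
            if !ok {
                return batch, batchStart, batchEnd, nil
            }
            if batch == nil {
                batchStart = d.BinID
            }
            batch = append(batch, d.Address)
            batchEnd = d.BinID
            if len(batch) >= maxBatch {
                return batch, batchStart, batchEnd, nil
            }
            // Start the timeout when the first descriptor of the batch arrives.
            if timer == nil {
                timer = time.NewTimer(batchTimeout)
                defer timer.Stop()
                timerC = timer.C
            }
        case <-timerC:
            // Partial batch: ship what we have rather than waiting forever.
            return batch, batchStart, batchEnd, nil
        case <-ctx.Done():
            return nil, 0, 0, ctx.Err()
        }
    }
}

func main() {
    descriptors := make(chan Descriptor)
    go func() {
        for i := uint64(1); i <= 3; i++ {
            descriptors <- Descriptor{Address: fmt.Sprintf("chunk-%d", i), BinID: i}
        }
        // No close: the timeout ends the batch, as with a live subscription.
    }()
    batch, from, to, err := collectBatch(context.Background(), descriptors, 128, 500*time.Millisecond)
    if err != nil {
        panic(err)
    }
    fmt.Printf("batch of %d chunks, bins %d-%d\n", len(batch), from, to)
}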