Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 09ad319

Browse files
committed
swarm/pushsync, swarm/pss: push sync implementation
1 parent 595b4ff commit 09ad319

File tree

6 files changed

+1094
-0
lines changed

6 files changed

+1094
-0
lines changed

swarm/pss/pubsub.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package pss
2+
3+
import "github.com/ethereum/go-ethereum/p2p"
4+
5+
// PubSub implements the pusher.PubSub interface using pss
6+
type PubSub struct {
7+
pss *Pss // pss
8+
prox bool // determines if pss send should use neighbourhood addressing
9+
}
10+
11+
// NewPubSub creates a new PubSub
12+
func NewPubSub(p *Pss, prox bool) *PubSub {
13+
return &PubSub{
14+
pss: p,
15+
prox: prox,
16+
}
17+
}
18+
19+
// BaseAddr returns Kademlia base address
20+
func (p *PubSub) BaseAddr() []byte {
21+
return p.pss.Kademlia.BaseAddr()
22+
}
23+
24+
// Register registers a handler
25+
func (p *PubSub) Register(topic string, handler func(msg []byte, p *p2p.Peer) error) func() {
26+
f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error {
27+
return handler(msg, peer)
28+
}
29+
h := NewHandler(f).WithRaw()
30+
if p.prox {
31+
h = h.WithProxBin()
32+
}
33+
pt := BytesToTopic([]byte(topic))
34+
return p.pss.Register(&pt, h)
35+
}
36+
37+
// Send sends a message using pss SendRaw
38+
func (p *PubSub) Send(to []byte, topic string, msg []byte) error {
39+
pt := BytesToTopic([]byte(topic))
40+
return p.pss.SendRaw(PssAddress(to), pt, msg)
41+
}

swarm/storage/pushsync/protocol.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package pushsync
2+
3+
import (
4+
"crypto/rand"
5+
6+
"github.com/ethereum/go-ethereum/common/hexutil"
7+
"github.com/ethereum/go-ethereum/p2p"
8+
"github.com/ethereum/go-ethereum/rlp"
9+
)
10+
11+
const (
12+
pssChunkTopic = "PUSHSYNC_CHUNKS" // pss topic for chunks
13+
pssReceiptTopic = "PUSHSYNC_RECEIPTS" // pss topic for statement of custody receipts
14+
)
15+
16+
// PubSub is a Postal Service interface needed to send/receive chunks and receipts for push syncing
17+
type PubSub interface {
18+
Register(topic string, handler func(msg []byte, p *p2p.Peer) error) func()
19+
Send(to []byte, topic string, msg []byte) error
20+
BaseAddr() []byte
21+
}
22+
23+
// chunkMsg is the message construct to send chunks to their local neighbourhood
24+
type chunkMsg struct {
25+
Addr []byte // chunk address
26+
Data []byte // chunk data
27+
Origin []byte // originator
28+
Nonce []byte // nonce to make multiple instances of send immune to deduplication cache
29+
}
30+
31+
// receiptMsg is a statement of custody response to receiving a push-synced chunk
32+
// it is currently a notification only (contains no proof) sent to the originator
33+
// Nonce is there to make multiple responses immune to deduplication cache
34+
type receiptMsg struct {
35+
Addr []byte
36+
Nonce []byte
37+
}
38+
39+
func decodeChunkMsg(msg []byte) (*chunkMsg, error) {
40+
var chmsg chunkMsg
41+
err := rlp.DecodeBytes(msg, &chmsg)
42+
if err != nil {
43+
return nil, err
44+
}
45+
return &chmsg, nil
46+
}
47+
48+
func decodeReceiptMsg(msg []byte) (*receiptMsg, error) {
49+
var rmsg receiptMsg
50+
err := rlp.DecodeBytes(msg, &rmsg)
51+
if err != nil {
52+
return nil, err
53+
}
54+
return &rmsg, nil
55+
}
56+
57+
// newNonce creates a random nonce;
58+
// even without POC it is important otherwise resending a chunk is deduplicated by pss
59+
func newNonce() []byte {
60+
buf := make([]byte, 32)
61+
t := 0
62+
for t < len(buf) {
63+
n, _ := rand.Read(buf[t:])
64+
t += n
65+
}
66+
return buf
67+
}
68+
69+
func label(b []byte) string {
70+
return hexutil.Encode(b[:8])
71+
}

swarm/storage/pushsync/pusher.go

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package pushsync
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/ethereum/go-ethereum/p2p"
8+
"github.com/ethereum/go-ethereum/rlp"
9+
"github.com/ethereum/go-ethereum/swarm/chunk"
10+
"github.com/ethereum/go-ethereum/swarm/log"
11+
"github.com/ethereum/go-ethereum/swarm/storage"
12+
)
13+
14+
// DB interface
15+
type DB interface {
16+
// subscribe to chunk to be push synced - iterates from earliest to newest
17+
SubscribePush(context.Context) (<-chan storage.Chunk, func())
18+
// called to set a chunk as synced - and allow it to be garbage collected
19+
// TODO this should take ... last argument to delete many in one batch
20+
Set(context.Context, chunk.ModeSet, storage.Address) error
21+
}
22+
23+
// Pusher takes care of the push syncing
24+
type Pusher struct {
25+
store DB // localstore DB
26+
tags *chunk.Tags // tags to update counts
27+
quit chan struct{} // channel to signal quitting on all loops
28+
pushed map[string]*pushedItem // cache of items push-synced
29+
receipts chan chunk.Address // channel to receive receipts
30+
ps PubSub // PubSub interface to send chunks and receive receipts
31+
}
32+
33+
var (
34+
retryInterval = 1 * time.Second // seconds to wait before retry sync
35+
)
36+
37+
// pushedItem captures the info needed for the pusher about a chunk during the
38+
// push-sync--receipt roundtrip
39+
type pushedItem struct {
40+
tag *chunk.Tag // tag for the chunk
41+
sentAt time.Time // time last pushed
42+
synced bool // set when
43+
}
44+
45+
// New contructs a Pusher and starts up the push sync protocol
46+
func New(store DB, ps PubSub, tags *chunk.Tags) *Pusher {
47+
p := &Pusher{
48+
store: store,
49+
tags: tags,
50+
quit: make(chan struct{}),
51+
pushed: make(map[string]*pushedItem),
52+
receipts: make(chan chunk.Address),
53+
ps: ps,
54+
}
55+
go p.sync()
56+
return p
57+
}
58+
59+
// Close closes the pusher
60+
func (p *Pusher) Close() {
61+
close(p.quit)
62+
}
63+
64+
// sync starts a forever loop that pushes chunks to their neighbourhood
65+
// and receives receipts for them
66+
// chunks that are not acknowledged with a receipt are retried
67+
// not earlier than retryInterval after they were last pushed
68+
// the routine also updates counts of states on a tag in order
69+
// to monitor the proportion of saved, sent and synced chunks of
70+
// a file or collection
71+
func (p *Pusher) sync() {
72+
var chunks <-chan chunk.Chunk
73+
var cancel, stop func()
74+
var ctx context.Context
75+
var synced []storage.Address
76+
77+
// timer
78+
timer := time.NewTimer(0)
79+
defer timer.Stop()
80+
81+
// register handler for pssReceiptTopic on pss pubsub
82+
deregister := p.ps.Register(pssReceiptTopic, func(msg []byte, _ *p2p.Peer) error {
83+
return p.handleReceiptMsg(msg)
84+
})
85+
defer deregister()
86+
87+
for {
88+
select {
89+
90+
// handle incoming chunks
91+
case ch, more := <-chunks:
92+
// if no more, set to nil and wait for timer
93+
if !more {
94+
chunks = nil
95+
continue
96+
}
97+
// if no need to sync this chunk then continue
98+
if !p.needToSync(ch) {
99+
continue
100+
}
101+
// send the chunk and ignore the error
102+
if err := p.sendChunkMsg(ch); err != nil {
103+
log.Warn("error sending chunk", "addr", ch.Address(), "err", err)
104+
}
105+
106+
// handle incoming receipts
107+
case addr := <-p.receipts:
108+
log.Warn("synced", "addr", addr)
109+
// ignore if already received receipt
110+
item, found := p.pushed[addr.Hex()]
111+
if !found {
112+
log.Debug("not wanted or already got... ignore", "addr", addr)
113+
continue
114+
}
115+
if item.synced {
116+
log.Debug("just synced... ignore", "addr", addr)
117+
continue
118+
}
119+
// collect synced addresses
120+
synced = append(synced, addr)
121+
// set synced flag
122+
item.synced = true
123+
// increment synced count for the tag if exists
124+
if item.tag != nil {
125+
item.tag.Inc(chunk.StateSynced)
126+
log.Warn("tag", "uid", item.tag.Uid, "synced", item.tag.Get(chunk.StateSynced))
127+
}
128+
log.Warn("synced", "addr", addr)
129+
130+
// retry interval timer triggers starting from new
131+
case <-timer.C:
132+
log.Debug("timer")
133+
// TODO: implement some smart retry strategy relying on sent/synced ratio change
134+
// if subscribe was running, stop it
135+
if stop != nil {
136+
stop()
137+
}
138+
for _, addr := range synced {
139+
// set chunk status to synced, insert to db GC index
140+
if err := p.store.Set(context.Background(), chunk.ModeSetSync, addr); err != nil {
141+
log.Warn("error setting chunk to synced", "addr", addr, "err", err)
142+
continue
143+
}
144+
delete(p.pushed, addr.Hex())
145+
}
146+
synced = nil
147+
148+
// and start from the beginning
149+
ctx, cancel = context.WithCancel(context.Background())
150+
chunks, stop = p.store.SubscribePush(ctx)
151+
// reset timer to go off after retryInterval
152+
timer.Reset(retryInterval)
153+
154+
case <-p.quit:
155+
// if there was a subscription, cancel it
156+
if cancel != nil {
157+
cancel()
158+
}
159+
return
160+
}
161+
}
162+
}
163+
164+
// handleReceiptMsg id a handler for pssReceiptTopic that
165+
// - deserialises receiptMsg and
166+
// - sends the receipted address on a channel
167+
func (p *Pusher) handleReceiptMsg(msg []byte) error {
168+
receipt, err := decodeReceiptMsg(msg)
169+
if err != nil {
170+
return err
171+
}
172+
log.Debug("Handler", "receipt", label(receipt.Addr), "self", label(p.ps.BaseAddr()))
173+
select {
174+
case p.receipts <- receipt.Addr:
175+
case <-p.quit:
176+
}
177+
return nil
178+
}
179+
180+
// sendChunkMsg sends chunks to their destination
181+
// using the PubSub interface Send method (e.g., pss neighbourhood addressing)
182+
func (p *Pusher) sendChunkMsg(ch chunk.Chunk) error {
183+
cmsg := &chunkMsg{
184+
Origin: p.ps.BaseAddr(),
185+
Addr: ch.Address()[:],
186+
Data: ch.Data(),
187+
Nonce: newNonce(),
188+
}
189+
msg, err := rlp.EncodeToBytes(cmsg)
190+
if err != nil {
191+
return err
192+
}
193+
log.Warn("send chunk", "addr", label(ch.Address()), "self", label(p.ps.BaseAddr()))
194+
return p.ps.Send(ch.Address()[:], pssChunkTopic, msg)
195+
}
196+
197+
// needToSync checks if a chunk needs to be push-synced:
198+
// * if not sent yet OR
199+
// * if sent but more then retryInterval ago, so need resend
200+
func (p *Pusher) needToSync(ch chunk.Chunk) bool {
201+
item, found := p.pushed[ch.Address().Hex()]
202+
// has been pushed already
203+
if found {
204+
// has synced already since subscribe called
205+
if item.synced {
206+
return false
207+
}
208+
// too early to retry
209+
if item.sentAt.Add(retryInterval).After(time.Now()) {
210+
return false
211+
}
212+
// first time encountered
213+
} else {
214+
// remember item
215+
tag, _ := p.tags.Get(ch.Tag())
216+
log.Warn("tag", "uid from db", ch.Tag())
217+
item = &pushedItem{
218+
tag: tag,
219+
}
220+
if item.tag != nil {
221+
item.tag.Inc(chunk.StateSent)
222+
}
223+
log.Warn("tag", "uid", tag.Uid, "sent", tag.Get(chunk.StateSent))
224+
p.pushed[ch.Address().Hex()] = item
225+
}
226+
item.sentAt = time.Now()
227+
return true
228+
}

0 commit comments

Comments
 (0)