Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit 910b3a1

Browse files
authored
swarm/chunk: add tags data type (#1341)
* swarm/chunk: add tags backend to chunk package
1 parent 240e4b0 commit 910b3a1

File tree

5 files changed

+604
-0
lines changed

5 files changed

+604
-0
lines changed

swarm/chunk/chunk.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
// Copyright 2019 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
117
package chunk
218

319
import (

swarm/chunk/tag.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// Copyright 2019 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package chunk
18+
19+
import (
20+
"encoding/binary"
21+
"errors"
22+
"sync/atomic"
23+
"time"
24+
)
25+
26+
var (
27+
errExists = errors.New("already exists")
28+
errNA = errors.New("not available yet")
29+
errNoETA = errors.New("unable to calculate ETA")
30+
errTagNotFound = errors.New("tag not found")
31+
)
32+
33+
// State is the enum type for chunk states
34+
type State = uint32
35+
36+
const (
37+
SPLIT State = iota // chunk has been processed by filehasher/swarm safe call
38+
STORED // chunk stored locally
39+
SEEN // chunk previously seen
40+
SENT // chunk sent to neighbourhood
41+
SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere
42+
)
43+
44+
// Tag represents info on the status of new chunks
45+
type Tag struct {
46+
Uid uint32 // a unique identifier for this tag
47+
Name string // a name tag for this tag
48+
Address Address // the associated swarm hash for this tag
49+
total uint32 // total chunks belonging to a tag
50+
split uint32 // number of chunks already processed by splitter for hashing
51+
seen uint32 // number of chunks already seen
52+
stored uint32 // number of chunks already stored locally
53+
sent uint32 // number of chunks sent for push syncing
54+
synced uint32 // number of chunks synced with proof
55+
startedAt time.Time // tag started to calculate ETA
56+
}
57+
58+
// New creates a new tag, stores it by the name and returns it
59+
// it returns an error if the tag with this name already exists
60+
func NewTag(uid uint32, s string, total uint32) *Tag {
61+
t := &Tag{
62+
Uid: uid,
63+
Name: s,
64+
startedAt: time.Now(),
65+
total: total,
66+
}
67+
return t
68+
}
69+
70+
// Inc increments the count for a state
71+
func (t *Tag) Inc(state State) {
72+
var v *uint32
73+
switch state {
74+
case SPLIT:
75+
v = &t.split
76+
case STORED:
77+
v = &t.stored
78+
case SEEN:
79+
v = &t.seen
80+
case SENT:
81+
v = &t.sent
82+
case SYNCED:
83+
v = &t.synced
84+
}
85+
atomic.AddUint32(v, 1)
86+
}
87+
88+
// Get returns the count for a state on a tag
89+
func (t *Tag) Get(state State) int {
90+
var v *uint32
91+
switch state {
92+
case SPLIT:
93+
v = &t.split
94+
case STORED:
95+
v = &t.stored
96+
case SEEN:
97+
v = &t.seen
98+
case SENT:
99+
v = &t.sent
100+
case SYNCED:
101+
v = &t.synced
102+
}
103+
return int(atomic.LoadUint32(v))
104+
}
105+
106+
// GetTotal returns the total count
107+
func (t *Tag) Total() int {
108+
return int(atomic.LoadUint32(&t.total))
109+
}
110+
111+
// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
112+
// is meant to be called when splitter finishes for input streams of unknown size
113+
func (t *Tag) DoneSplit(address Address) int {
114+
total := atomic.LoadUint32(&t.split)
115+
atomic.StoreUint32(&t.total, total)
116+
t.Address = address
117+
return int(total)
118+
}
119+
120+
// Status returns the value of state and the total count
121+
func (t *Tag) Status(state State) (int, int, error) {
122+
count, seen, total := t.Get(state), int(atomic.LoadUint32(&t.seen)), int(atomic.LoadUint32(&t.total))
123+
if total == 0 {
124+
return count, total, errNA
125+
}
126+
switch state {
127+
case SPLIT, STORED, SEEN:
128+
return count, total, nil
129+
case SENT, SYNCED:
130+
stored := int(atomic.LoadUint32(&t.stored))
131+
if stored < total {
132+
return count, total - seen, errNA
133+
}
134+
return count, total - seen, nil
135+
}
136+
return count, total, errNA
137+
}
138+
139+
// ETA returns the time of completion estimated based on time passed and rate of completion
140+
func (t *Tag) ETA(state State) (time.Time, error) {
141+
cnt, total, err := t.Status(state)
142+
if err != nil {
143+
return time.Time{}, err
144+
}
145+
if cnt == 0 || total == 0 {
146+
return time.Time{}, errNoETA
147+
}
148+
diff := time.Since(t.startedAt)
149+
dur := time.Duration(total) * diff / time.Duration(cnt)
150+
return t.startedAt.Add(dur), nil
151+
}
152+
153+
// MarshalBinary marshals the tag into a byte slice
154+
func (tag *Tag) MarshalBinary() (data []byte, err error) {
155+
buffer := make([]byte, 0)
156+
encodeUint32Append(&buffer, tag.Uid)
157+
encodeUint32Append(&buffer, tag.total)
158+
encodeUint32Append(&buffer, tag.split)
159+
encodeUint32Append(&buffer, tag.seen)
160+
encodeUint32Append(&buffer, tag.stored)
161+
encodeUint32Append(&buffer, tag.sent)
162+
encodeUint32Append(&buffer, tag.synced)
163+
164+
intBuffer := make([]byte, 8)
165+
166+
n := binary.PutVarint(intBuffer, tag.startedAt.Unix())
167+
buffer = append(buffer, intBuffer[:n]...)
168+
169+
n = binary.PutVarint(intBuffer, int64(len(tag.Address)))
170+
buffer = append(buffer, intBuffer[:n]...)
171+
172+
buffer = append(buffer, tag.Address[:]...)
173+
174+
buffer = append(buffer, []byte(tag.Name)...)
175+
176+
return buffer, nil
177+
}
178+
179+
// UnmarshalBinary unmarshals a byte slice into a tag
180+
func (tag *Tag) UnmarshalBinary(buffer []byte) error {
181+
if len(buffer) < 13 {
182+
return errors.New("buffer too short")
183+
}
184+
185+
tag.Uid = decodeUint32Splice(&buffer)
186+
tag.total = decodeUint32Splice(&buffer)
187+
tag.split = decodeUint32Splice(&buffer)
188+
tag.seen = decodeUint32Splice(&buffer)
189+
tag.stored = decodeUint32Splice(&buffer)
190+
tag.sent = decodeUint32Splice(&buffer)
191+
tag.synced = decodeUint32Splice(&buffer)
192+
193+
t, n := binary.Varint(buffer)
194+
tag.startedAt = time.Unix(t, 0)
195+
buffer = buffer[n:]
196+
197+
t, n = binary.Varint(buffer)
198+
buffer = buffer[n:]
199+
if t > 0 {
200+
tag.Address = buffer[:t]
201+
}
202+
tag.Name = string(buffer[t:])
203+
204+
return nil
205+
206+
}
207+
208+
func encodeUint32Append(buffer *[]byte, val uint32) {
209+
intBuffer := make([]byte, 4)
210+
binary.BigEndian.PutUint32(intBuffer, val)
211+
*buffer = append(*buffer, intBuffer...)
212+
}
213+
214+
func decodeUint32Splice(buffer *[]byte) uint32 {
215+
val := binary.BigEndian.Uint32((*buffer)[:4])
216+
*buffer = (*buffer)[4:]
217+
return val
218+
}

0 commit comments

Comments
 (0)