
Commit 0187057

swarm/storage/netstore: add fetcher cancellation on shutdown
swarm/network/stream: remove netstore internal wg
swarm/network/stream: run individual tests with t.Run
1 parent 75d292b commit 0187057

3 files changed: +157 -134 lines


swarm/network/stream/delivery_test.go

Lines changed: 129 additions & 127 deletions
@@ -454,149 +454,151 @@ func TestDeliveryFromNodes(t *testing.T) {
 }
 
 func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
-			if err != nil {
-				return nil, nil, err
-			}
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				SkipCheck: skipCheck,
-				Syncing:   SyncingDisabled,
-				Retrieval: RetrievalEnabled,
-			}, nil)
-			bucket.Store(bucketKeyRegistry, r)
+	t.Run(fmt.Sprintf("test_deliveryFromNodes_%d_%d_skipCheck_%v", nodes, chunkCount, skipCheck), func(t *testing.T) {
+		sim := simulation.New(map[string]simulation.ServiceFunc{
+			"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+				addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+				if err != nil {
+					return nil, nil, err
+				}
 
-			cleanup = func() {
-				r.Close()
-				clean()
-			}
+				r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+					SkipCheck: skipCheck,
+					Syncing:   SyncingDisabled,
+					Retrieval: RetrievalEnabled,
+				}, nil)
+				bucket.Store(bucketKeyRegistry, r)
 
-			return r, cleanup, nil
-		},
-	})
-	defer sim.Close()
+				cleanup = func() {
+					r.Close()
+					clean()
+				}
 
-	log.Info("Adding nodes to simulation")
-	_, err := sim.AddNodesAndConnectChain(nodes)
-	if err != nil {
-		t.Fatal(err)
-	}
+				return r, cleanup, nil
+			},
+		})
+		defer sim.Close()
 
-	log.Info("Starting simulation")
-	ctx := context.Background()
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
-		nodeIDs := sim.UpNodeIDs()
-		//determine the pivot node to be the first node of the simulation
-		pivot := nodeIDs[0]
-
-		//distribute chunks of a random file into Stores of nodes 1 to nodes
-		//we will do this by creating a file store with an underlying round-robin store:
-		//the file store will create a hash for the uploaded file, but every chunk will be
-		//distributed to different nodes via round-robin scheduling
-		log.Debug("Writing file to round-robin file store")
-		//to do this, we create an array for chunkstores (length minus one, the pivot node)
-		stores := make([]storage.ChunkStore, len(nodeIDs)-1)
-		//we then need to get all stores from the sim....
-		lStores := sim.NodesItems(bucketKeyStore)
-		i := 0
-		//...iterate the buckets...
-		for id, bucketVal := range lStores {
-			//...and remove the one which is the pivot node
-			if id == pivot {
-				continue
-			}
-			//the other ones are added to the array...
-			stores[i] = bucketVal.(storage.ChunkStore)
-			i++
-		}
-		//...which then gets passed to the round-robin file store
-		roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
-		//now we can actually upload a (random) file to the round-robin store
-		size := chunkCount * chunkSize
-		log.Debug("Storing data to file store")
-		fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
-		// wait until all chunks stored
+		log.Info("Adding nodes to simulation")
+		_, err := sim.AddNodesAndConnectChain(nodes)
 		if err != nil {
-			return err
-		}
-		err = wait(ctx)
-		if err != nil {
-			return err
+			t.Fatal(err)
 		}
 
-		log.Debug("Waiting for kademlia")
-		// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
+		log.Info("Starting simulation")
+		ctx := context.Background()
+		result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+			nodeIDs := sim.UpNodeIDs()
+			//determine the pivot node to be the first node of the simulation
+			pivot := nodeIDs[0]
+
+			//distribute chunks of a random file into Stores of nodes 1 to nodes
+			//we will do this by creating a file store with an underlying round-robin store:
+			//the file store will create a hash for the uploaded file, but every chunk will be
+			//distributed to different nodes via round-robin scheduling
+			log.Debug("Writing file to round-robin file store")
+			//to do this, we create an array for chunkstores (length minus one, the pivot node)
+			stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+			//we then need to get all stores from the sim....
+			lStores := sim.NodesItems(bucketKeyStore)
+			i := 0
+			//...iterate the buckets...
+			for id, bucketVal := range lStores {
+				//...and remove the one which is the pivot node
+				if id == pivot {
+					continue
+				}
+				//the other ones are added to the array...
+				stores[i] = bucketVal.(storage.ChunkStore)
+				i++
+			}
+			//...which then gets passed to the round-robin file store
+			roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+			//now we can actually upload a (random) file to the round-robin store
+			size := chunkCount * chunkSize
+			log.Debug("Storing data to file store")
+			fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+			// wait until all chunks stored
+			if err != nil {
+				return err
+			}
+			err = wait(ctx)
+			if err != nil {
+				return err
+			}
 
-		//get the pivot node's filestore
-		item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
-		if !ok {
-			return fmt.Errorf("No filestore")
-		}
-		pivotFileStore := item.(*storage.FileStore)
-		log.Debug("Starting retrieval routine")
-		retErrC := make(chan error)
-		go func() {
-			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
-			// we must wait for the peer connections to have started before requesting
-			n, err := readAll(pivotFileStore, fileHash)
-			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
-			retErrC <- err
-		}()
+			log.Debug("Waiting for kademlia")
+			// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+			if _, err := sim.WaitTillHealthy(ctx); err != nil {
+				return err
+			}
 
-		log.Debug("Watching for disconnections")
-		disconnections := sim.PeerEvents(
-			context.Background(),
-			sim.NodeIDs(),
-			simulation.NewPeerEventsFilter().Drop(),
-		)
+			//get the pivot node's filestore
+			item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+			if !ok {
+				return fmt.Errorf("No filestore")
+			}
+			pivotFileStore := item.(*storage.FileStore)
+			log.Debug("Starting retrieval routine")
+			retErrC := make(chan error)
+			go func() {
+				// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+				// we must wait for the peer connections to have started before requesting
+				n, err := readAll(pivotFileStore, fileHash)
+				log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+				retErrC <- err
+			}()
+
+			log.Debug("Watching for disconnections")
+			disconnections := sim.PeerEvents(
+				context.Background(),
+				sim.NodeIDs(),
+				simulation.NewPeerEventsFilter().Drop(),
+			)
 
-		var disconnected atomic.Value
-		go func() {
-			for d := range disconnections {
-				if d.Error != nil {
-					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-					disconnected.Store(true)
+			var disconnected atomic.Value
+			go func() {
+				for d := range disconnections {
+					if d.Error != nil {
+						log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
+						disconnected.Store(true)
+					}
 				}
-			}
-		}()
-		defer func() {
-			if err != nil {
-				if yes, ok := disconnected.Load().(bool); ok && yes {
-					err = errors.New("disconnect events received")
+			}()
+			defer func() {
+				if err != nil {
+					if yes, ok := disconnected.Load().(bool); ok && yes {
+						err = errors.New("disconnect events received")
+					}
 				}
-			}
-		}()
+			}()
 
-		//finally check that the pivot node gets all chunks via the root hash
-		log.Debug("Check retrieval")
-		success := true
-		var total int64
-		total, err = readAll(pivotFileStore, fileHash)
-		if err != nil {
-			return err
-		}
-		log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
-		if err != nil || total != int64(size) {
-			success = false
-		}
+			//finally check that the pivot node gets all chunks via the root hash
+			log.Debug("Check retrieval")
+			success := true
+			var total int64
+			total, err = readAll(pivotFileStore, fileHash)
+			if err != nil {
+				return err
+			}
+			log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+			if err != nil || total != int64(size) {
+				success = false
+			}
 
-		if !success {
-			return fmt.Errorf("Test failed, chunks not available on all nodes")
-		}
-		if err := <-retErrC; err != nil {
-			t.Fatalf("requesting chunks: %v", err)
+			if !success {
+				return fmt.Errorf("Test failed, chunks not available on all nodes")
+			}
+			if err := <-retErrC; err != nil {
+				t.Fatalf("requesting chunks: %v", err)
+			}
+			log.Debug("Test terminated successfully")
+			return nil
+		})
+		if result.Error != nil {
+			t.Fatal(result.Error)
 		}
-		log.Debug("Test terminated successfully")
-		return nil
 	})
-	if result.Error != nil {
-		t.Fatal(result.Error)
-	}
 }
 
 func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
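
For context on the t.Run change above: wrapping each parameter combination in a named subtest makes failures report per case and lets a single case be selected with go test -run. Below is a minimal, standalone sketch of the same pattern; runCase and TestExample are illustrative names, not Swarm code.

package example

import (
	"fmt"
	"testing"
)

// runCase stands in for a parameterized test body such as runRetrievalTest.
func runCase(t *testing.T, nodes, chunks int) error {
	t.Helper()
	if nodes <= 0 || chunks <= 0 {
		return fmt.Errorf("invalid parameters: nodes=%d chunks=%d", nodes, chunks)
	}
	return nil
}

func TestExample(t *testing.T) {
	for _, n := range []int{2, 4, 8} {
		for _, c := range []int{4, 32} {
			n, c := n, c // capture loop variables for the subtest closure
			// Each combination becomes a named subtest; a single case can be
			// run with: go test -run "TestExample/nodes_4_chunks_32"
			t.Run(fmt.Sprintf("nodes_%d_chunks_%d", n, c), func(t *testing.T) {
				if err := runCase(t, n, c); err != nil {
					t.Fatal(err)
				}
			})
		}
	}
}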

swarm/network/stream/snapshot_retrieval_test.go

Lines changed: 9 additions & 6 deletions
@@ -74,7 +74,7 @@ func TestRetrieval(t *testing.T) {
 	//if nodes/chunks have been provided via commandline,
 	//run the tests with these values
 	if *nodes != 0 && *chunks != 0 {
-		err := runRetrievalTest(*chunks, *nodes)
+		err := runRetrievalTest(t, *chunks, *nodes)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -93,10 +93,12 @@ func TestRetrieval(t *testing.T) {
 	}
 	for _, n := range nodeCnt {
 		for _, c := range chnkCnt {
-			err := runRetrievalTest(c, n)
-			if err != nil {
-				t.Fatal(err)
-			}
+			t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
+				err := runRetrievalTest(t, c, n)
+				if err != nil {
+					t.Fatal(err)
+				}
+			})
 		}
 	}
 }
@@ -225,7 +227,8 @@ simulation's `action` function.
 
 The snapshot should have 'streamer' in its service list.
 */
-func runRetrievalTest(chunkCount int, nodeCount int) error {
+func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
+	t.Helper()
 	sim := simulation.New(retrievalSimServiceMap)
 	defer sim.Close()
 
swarm/storage/netstore.go

Lines changed: 19 additions & 1 deletion
@@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont
 func (n *NetStore) Close() {
 	close(n.closeC)
 	n.store.Close()
-	// TODO: loop through fetchers to cancel them
+
+	wg := sync.WaitGroup{}
+	for _, key := range n.fetchers.Keys() {
+		if f, ok := n.fetchers.Get(key); ok {
+			if fetch, ok := f.(*fetcher); ok {
+				wg.Add(1)
+				go func(fetch *fetcher) {
+					defer wg.Done()
+					fetch.cancel()
+
+					select {
+					case <-fetch.deliveredC:
+					case <-fetch.cancelledC:
+					}
+				}(fetch)
+			}
+		}
+	}
+	wg.Wait()
 }
 
 // get attempts at retrieving the chunk from LocalStore
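
The hunk above is the core of the commit: on Close, every in-flight fetcher is cancelled and Close blocks until each one signals either delivery or cancellation, replacing the old TODO. A self-contained sketch of the same shutdown pattern follows; the store and fetcher types here are toy stand-ins, not the Swarm implementations.

package example

import "sync"

// fetcher is a toy stand-in for netstore's fetcher: cancel aborts the request,
// and deliveredC or cancelledC is closed once the fetcher has wound down.
type fetcher struct {
	cancel     func()
	deliveredC chan struct{}
	cancelledC chan struct{}
}

// store is a toy stand-in for NetStore, tracking in-flight fetchers by address.
type store struct {
	mu       sync.Mutex
	fetchers map[string]*fetcher
}

// Close cancels every outstanding fetcher and waits for each to acknowledge
// shutdown, so no fetcher goroutine outlives the store.
func (s *store) Close() {
	s.mu.Lock()
	pending := make([]*fetcher, 0, len(s.fetchers))
	for _, f := range s.fetchers {
		pending = append(pending, f)
	}
	s.mu.Unlock()

	var wg sync.WaitGroup
	for _, f := range pending {
		wg.Add(1)
		go func(f *fetcher) {
			defer wg.Done()
			f.cancel()

			// Wait until the fetcher reports either delivery or cancellation.
			select {
			case <-f.deliveredC:
			case <-f.cancelledC:
			}
		}(f)
	}
	wg.Wait()
}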
