@@ -453,133 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
 }
 
 func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
-			if err != nil {
-				return nil, nil, err
-			}
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				SkipCheck: skipCheck,
-				Syncing:   SyncingDisabled,
-				Retrieval: RetrievalEnabled,
-			}, nil)
-			bucket.Store(bucketKeyRegistry, r)
+	t.Helper()
+	t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+		sim := simulation.New(map[string]simulation.ServiceFunc{
+			"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+				addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+				if err != nil {
+					return nil, nil, err
+				}
 
-			cleanup = func() {
-				r.Close()
-				clean()
-			}
+				r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+					SkipCheck: skipCheck,
+					Syncing:   SyncingDisabled,
+					Retrieval: RetrievalEnabled,
+				}, nil)
+				bucket.Store(bucketKeyRegistry, r)
 
-			return r, cleanup, nil
-		},
-	})
-	defer sim.Close()
+				cleanup = func() {
+					r.Close()
+					clean()
+				}
 
-	log.Info("Adding nodes to simulation")
-	_, err := sim.AddNodesAndConnectChain(nodes)
-	if err != nil {
-		t.Fatal(err)
-	}
+				return r, cleanup, nil
+			},
+		})
+		defer sim.Close()
 
-	log.Info("Starting simulation")
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
-		nodeIDs := sim.UpNodeIDs()
-		//determine the pivot node to be the first node of the simulation
-		pivot := nodeIDs[0]
-
-		//distribute chunks of a random file into Stores of nodes 1 to nodes
-		//we will do this by creating a file store with an underlying round-robin store:
-		//the file store will create a hash for the uploaded file, but every chunk will be
-		//distributed to different nodes via round-robin scheduling
-		log.Debug("Writing file to round-robin file store")
-		//to do this, we create an array for chunkstores (length minus one, the pivot node)
-		stores := make([]storage.ChunkStore, len(nodeIDs)-1)
-		//we then need to get all stores from the sim....
-		lStores := sim.NodesItems(bucketKeyStore)
-		i := 0
-		//...iterate the buckets...
-		for id, bucketVal := range lStores {
-			//...and remove the one which is the pivot node
-			if id == pivot {
-				continue
-			}
-			//the other ones are added to the array...
-			stores[i] = bucketVal.(storage.ChunkStore)
-			i++
-		}
-		//...which then gets passed to the round-robin file store
-		roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
-		//now we can actually upload a (random) file to the round-robin store
-		size := chunkCount * chunkSize
-		log.Debug("Storing data to file store")
-		fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
-		// wait until all chunks stored
+		log.Info("Adding nodes to simulation")
+		_, err := sim.AddNodesAndConnectChain(nodes)
 		if err != nil {
-			return err
-		}
-		err = wait(ctx)
-		if err != nil {
-			return err
+			t.Fatal(err)
 		}
 
-		log.Debug("Waiting for kademlia")
-		// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
+		log.Info("Starting simulation")
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+			nodeIDs := sim.UpNodeIDs()
+			//determine the pivot node to be the first node of the simulation
+			pivot := nodeIDs[0]
+
+			//distribute chunks of a random file into Stores of nodes 1 to nodes
+			//we will do this by creating a file store with an underlying round-robin store:
+			//the file store will create a hash for the uploaded file, but every chunk will be
+			//distributed to different nodes via round-robin scheduling
+			log.Debug("Writing file to round-robin file store")
+			//to do this, we create an array for chunkstores (length minus one, the pivot node)
+			stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+			//we then need to get all stores from the sim....
+			lStores := sim.NodesItems(bucketKeyStore)
+			i := 0
+			//...iterate the buckets...
+			for id, bucketVal := range lStores {
+				//...and remove the one which is the pivot node
+				if id == pivot {
+					continue
+				}
+				//the other ones are added to the array...
+				stores[i] = bucketVal.(storage.ChunkStore)
+				i++
+			}
+			//...which then gets passed to the round-robin file store
+			roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+			//now we can actually upload a (random) file to the round-robin store
+			size := chunkCount * chunkSize
+			log.Debug("Storing data to file store")
+			fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+			// wait until all chunks stored
+			if err != nil {
+				return err
+			}
+			err = wait(ctx)
+			if err != nil {
+				return err
+			}
 
-		//get the pivot node's filestore
-		item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
-		if !ok {
-			return fmt.Errorf("No filestore")
-		}
-		pivotFileStore := item.(*storage.FileStore)
-		log.Debug("Starting retrieval routine")
-		retErrC := make(chan error)
-		go func() {
-			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
-			// we must wait for the peer connections to have started before requesting
-			n, err := readAll(pivotFileStore, fileHash)
-			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
-			retErrC <- err
-		}()
+			log.Debug("Waiting for kademlia")
+			// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+			if _, err := sim.WaitTillHealthy(ctx); err != nil {
+				return err
+			}
 
-		disconnected := watchDisconnections(ctx, sim)
-		defer func() {
-			if err != nil && disconnected.bool() {
-				err = errors.New("disconnect events received")
+			//get the pivot node's filestore
+			item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+			if !ok {
+				return fmt.Errorf("No filestore")
 			}
-		}()
+			pivotFileStore := item.(*storage.FileStore)
+			log.Debug("Starting retrieval routine")
+			retErrC := make(chan error)
+			go func() {
+				// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+				// we must wait for the peer connections to have started before requesting
+				n, err := readAll(pivotFileStore, fileHash)
+				log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+				retErrC <- err
+			}()
+
+			disconnected := watchDisconnections(ctx, sim)
+			defer func() {
+				if err != nil && disconnected.bool() {
+					err = errors.New("disconnect events received")
+				}
+			}()
 
-		//finally check that the pivot node gets all chunks via the root hash
-		log.Debug("Check retrieval")
-		success := true
-		var total int64
-		total, err = readAll(pivotFileStore, fileHash)
-		if err != nil {
-			return err
-		}
-		log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
-		if err != nil || total != int64(size) {
-			success = false
-		}
+			//finally check that the pivot node gets all chunks via the root hash
+			log.Debug("Check retrieval")
+			success := true
+			var total int64
+			total, err = readAll(pivotFileStore, fileHash)
+			if err != nil {
+				return err
+			}
+			log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+			if err != nil || total != int64(size) {
+				success = false
+			}
 
-		if !success {
-			return fmt.Errorf("Test failed, chunks not available on all nodes")
-		}
-		if err := <-retErrC; err != nil {
-			return fmt.Errorf("requesting chunks: %v", err)
+			if !success {
+				return fmt.Errorf("Test failed, chunks not available on all nodes")
+			}
+			if err := <-retErrC; err != nil {
+				return fmt.Errorf("requesting chunks: %v", err)
+			}
+			log.Debug("Test terminated successfully")
+			return nil
+		})
+		if result.Error != nil {
+			t.Fatal(result.Error)
 		}
-		log.Debug("Test terminated successfully")
-		return nil
 	})
-	if result.Error != nil {
-		t.Fatal(result.Error)
-	}
 }
 
 func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
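The refactor above is Go's parameterized-subtest pattern: the shared test body is wrapped in t.Run with a name derived from its parameters, and t.Helper() makes fatal errors point at the calling line rather than at the helper. A minimal, self-contained sketch of that pattern follows; the names deliveryTest and TestDeliverySubtests are illustrative, not part of this commit.

package stream

import (
	"fmt"
	"testing"
)

// deliveryTest mirrors the structure introduced in the diff above:
// t.Helper() attributes failures to the caller, and t.Run registers one
// named subtest per parameter combination.
func deliveryTest(t *testing.T, nodes, chunkCount int, skipCheck bool) {
	t.Helper()
	t.Run(fmt.Sprintf("deliveryTest_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
		if nodes < 2 {
			t.Skip("delivery needs at least two nodes")
		}
		// simulation setup and assertions would go here
	})
}

func TestDeliverySubtests(t *testing.T) {
	deliveryTest(t, 2, 1, true)
	deliveryTest(t, 4, 8, false)
}

A side effect of naming each combination is that a single case can be run in isolation, e.g. go test -run 'TestDeliverySubtests/deliveryTest_4_8_skipCheck_false'.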