@@ -7,7 +7,7 @@ const base32 = require('base32.js')
77const  path  =  require ( 'path' ) 
88const  write  =  require ( 'pull-write' ) 
99const  parallel  =  require ( 'run-parallel' ) 
10- const  through  =  require ( 'pull-through ' ) 
10+ const  defer  =  require ( 'pull-defer/source ' ) 
1111
1212const  PREFIX_LENGTH  =  5 
1313
@@ -52,19 +52,25 @@ exports.setUp = (basePath, BlobStore, locks) => {
5252      } 
5353
5454      const  p  =  multihashToPath ( key ,  extension ) 
55+       const  deferred  =  defer ( ) 
56+ 
57+       lock ( p ,  ( release )  =>  { 
58+         const  ext  =  extension  ===  'data'  ? 'protobuf'  : extension 
59+         pull ( 
60+           store . read ( p ) , 
61+           pull . collect ( release ( ( err ,  data )  =>  { 
62+             if  ( err )  { 
63+               return  deferred . abort ( err ) 
64+             } 
65+ 
66+             deferred . resolve ( pull . values ( [ 
67+               new  Block ( Buffer . concat ( data ) ,  ext ) 
68+             ] ) ) 
69+           } ) ) 
70+         ) 
71+       } ) 
5572
56-       const  ext  =  extension  ===  'data'  ? 'protobuf'  : extension 
57-       let  data  =  [ ] 
58- 
59-       return  pull ( 
60-         store . read ( p ) , 
61-         through ( function  ( values )  { 
62-           data  =  data . concat ( values ) 
63-         } ,  function  ( )  { 
64-           this . queue ( new  Block ( Buffer . concat ( data ) ,  ext ) ) 
65-           this . queue ( null ) 
66-         } ) 
67-       ) 
73+       return  deferred 
6874    } , 
6975
7076    putStream  ( )  { 
@@ -75,7 +81,10 @@ exports.setUp = (basePath, BlobStore, locks) => {
7581      const  sink  =  write ( ( blocks ,  cb )  =>  { 
7682        parallel ( blocks . map ( ( block )  =>  ( cb )  =>  { 
7783          writeBlock ( block ,  ( err ,  meta )  =>  { 
78-             if  ( err )  return  cb ( err ) 
84+             if  ( err )  { 
85+               return  cb ( err ) 
86+             } 
87+ 
7988            if  ( push )  { 
8089              const  read  =  push 
8190              push  =  null 
@@ -94,7 +103,9 @@ exports.setUp = (basePath, BlobStore, locks) => {
94103
95104      const  source  =  ( end ,  cb )  =>  { 
96105        if  ( end )  ended  =  end 
97-         if  ( ended )  return  cb ( ended ) 
106+         if  ( ended )  { 
107+           return  cb ( ended ) 
108+         } 
98109
99110        if  ( written . length )  { 
100111          return  cb ( null ,  written . shift ( ) ) 
0 commit comments