@@ -4,12 +4,12 @@ const extend = require('deep-extend')
44const UnixFS = require ( 'ipfs-unixfs' )
55const pull = require ( 'pull-stream/pull' )
66const values = require ( 'pull-stream/sources/values' )
7- const asyncMap = require ( 'pull-stream/throughs/async-map' )
8- const map = require ( 'pull-stream/throughs/map' )
97const collect = require ( 'pull-stream/sinks/collect' )
10- const through = require ( 'pull-through' )
8+ const through = require ( 'pull-stream/throughs/through' )
9+ const pullThrough = require ( 'pull-through' )
1110const parallel = require ( 'async/parallel' )
1211const waterfall = require ( 'async/waterfall' )
12+ const paraMap = require ( 'pull-paramap' )
1313const persist = require ( '../utils/persist' )
1414const reduce = require ( './reduce' )
1515const {
@@ -106,52 +106,55 @@ module.exports = function builder (createChunker, ipld, createReducer, _options)
106106 pull (
107107 file . content ,
108108 chunker ,
109- map ( chunk => {
109+ through ( chunk => {
110110 if ( options . progress && typeof options . progress === 'function' ) {
111111 options . progress ( chunk . byteLength )
112112 }
113- return Buffer . from ( chunk )
114113 } ) ,
115- asyncMap ( ( buffer , callback ) => {
116- if ( options . rawLeaves ) {
117- return callback ( null , {
118- size : buffer . length ,
119- leafSize : buffer . length ,
120- data : buffer
121- } )
122- }
123-
124- const file = new UnixFS ( options . leafType , buffer )
114+ paraMap ( ( buffer , callback ) => {
115+ waterfall ( [
116+ ( cb ) => {
117+ if ( options . rawLeaves ) {
118+ return cb ( null , {
119+ size : buffer . length ,
120+ leafSize : buffer . length ,
121+ data : buffer
122+ } )
123+ }
125124
126- DAGNode . create ( file . marshal ( ) , [ ] , ( err , node ) => {
127- if ( err ) {
128- return callback ( err )
125+ const file = new UnixFS ( options . leafType , buffer )
126+
127+ DAGNode . create ( file . marshal ( ) , [ ] , ( err , node ) => {
128+ if ( err ) {
129+ return cb ( err )
130+ }
131+
132+ cb ( null , {
133+ size : node . size ,
134+ leafSize : file . fileSize ( ) ,
135+ data : node
136+ } )
137+ } )
138+ } ,
139+ ( leaf , cb ) => {
140+ persist ( leaf . data , ipld , options , ( error , results ) => {
141+ if ( error ) {
142+ return cb ( error )
143+ }
144+
145+ cb ( null , {
146+ size : leaf . size ,
147+ leafSize : leaf . leafSize ,
148+ data : results . node ,
149+ multihash : results . cid . buffer ,
150+ path : leaf . path ,
151+ name : ''
152+ } )
153+ } )
129154 }
130-
131- callback ( null , {
132- size : node . size ,
133- leafSize : file . fileSize ( ) ,
134- data : node
135- } )
136- } )
137- } ) ,
138- asyncMap ( ( leaf , callback ) => {
139- persist ( leaf . data , ipld , options , ( error , results ) => {
140- if ( error ) {
141- return callback ( error )
142- }
143-
144- callback ( null , {
145- size : leaf . size ,
146- leafSize : leaf . leafSize ,
147- data : results . node ,
148- multihash : results . cid . buffer ,
149- path : leaf . path ,
150- name : ''
151- } )
152- } )
155+ ] , callback )
153156 } ) ,
154- through ( // mark as single node if only one single node
157+ pullThrough ( // mark as single node if only one single node
155158 function onData ( data ) {
156159 count ++
157160 if ( previous ) {
0 commit comments