@@ -81,8 +81,7 @@ module Node.Stream.Aff
81
81
, end
82
82
, toStringUTF8
83
83
, fromStringUTF8
84
- )
85
- where
84
+ ) where
86
85
87
86
import Prelude
88
87
@@ -105,7 +104,6 @@ import Node.Stream (Readable, Writable)
105
104
import Node.Stream as Stream
106
105
import Node.Stream.Aff.Internal (onceDrain , onceEnd , onceError , onceReadable , readable )
107
106
108
-
109
107
-- | Wait until there is some data available from the stream, then read it.
110
108
-- |
111
109
-- | This function is useful for streams like __stdin__ which never
@@ -140,44 +138,43 @@ readSome r = liftAff <<< makeAff $ \res -> do
140
138
removeEnd
141
139
res (Right (Tuple [] false ))
142
140
143
-
144
141
ret1 <- liftST $ Array.ST .unsafeFreeze bufs
145
142
readagain <- readable r
146
- removeReadable <- if readagain && Array .length ret1 == 0 then do
147
- -- if still readable and we couldn't read anything right away,
148
- -- then wait for the readable event.
149
- -- “The 'readable' event will also be emitted once the end of the
150
- -- stream data has been reached but before the 'end' event is emitted.”
151
- -- if not readable then this was a zero-length Readable stream.
152
- -- https://nodejs.org/api/stream.html#event-readable
153
- onceReadable r do
154
- catchException (res <<< Left ) do
155
- untilE do
156
- Stream .read r Nothing >>= case _ of
157
- Nothing -> pure true
158
- Just chunk -> do
159
- void $ liftST $ Array.ST .push chunk bufs
160
- pure false
161
- ret2 <- liftST $ Array.ST .unsafeFreeze bufs
162
- removeError
163
- removeEnd
164
- readagain2 <- readable r
165
- res (Right (Tuple ret2 readagain2))
143
+ removeReadable <-
144
+ if readagain && Array .length ret1 == 0 then do
145
+ -- if still readable and we couldn't read anything right away,
146
+ -- then wait for the readable event.
147
+ -- “The 'readable' event will also be emitted once the end of the
148
+ -- stream data has been reached but before the 'end' event is emitted.”
149
+ -- if not readable then this was a zero-length Readable stream.
150
+ -- https://nodejs.org/api/stream.html#event-readable
151
+ onceReadable r do
152
+ catchException (res <<< Left ) do
153
+ untilE do
154
+ Stream .read r Nothing >>= case _ of
155
+ Nothing -> pure true
156
+ Just chunk -> do
157
+ void $ liftST $ Array.ST .push chunk bufs
158
+ pure false
159
+ ret2 <- liftST $ Array.ST .unsafeFreeze bufs
160
+ removeError
161
+ removeEnd
162
+ readagain2 <- readable r
163
+ res (Right (Tuple ret2 readagain2))
166
164
167
165
-- return what we read right away
168
- else do
169
- removeError
170
- removeEnd
171
- res (Right (Tuple ret1 readagain))
172
- pure (pure unit) -- dummy canceller
166
+ else do
167
+ removeError
168
+ removeEnd
169
+ res (Right (Tuple ret1 readagain))
170
+ pure (pure unit) -- dummy canceller
173
171
174
172
-- canceller might by called while waiting for `onceReadable`
175
173
pure $ effectCanceler do
176
174
removeError
177
175
removeEnd
178
176
removeReadable
179
177
180
-
181
178
-- | Read all data until the end of the stream.
182
179
-- |
183
180
-- | Note that __stdin__ will never end.
@@ -243,7 +240,6 @@ readAll r = liftAff <<< makeAff $ \res -> do
243
240
removeEnd
244
241
join $ Ref .read removeReadable
245
242
246
-
247
243
-- | Wait for *N* bytes to become available from the stream.
248
244
-- |
249
245
-- | If more than *N* bytes are available on the stream, then
@@ -288,12 +284,12 @@ readN r n = liftAff <<< makeAff $ \res -> do
288
284
-- “If size bytes are not available to be read, null will be returned
289
285
-- unless the stream has ended, in which case all of the data remaining
290
286
-- in the internal buffer will be returned.”
291
- Stream .read r (Just (n- red)) >>= case _ of
287
+ Stream .read r (Just (n - red)) >>= case _ of
292
288
Nothing -> pure true
293
289
Just chunk -> do
294
290
_ <- liftST $ Array.ST .push chunk bufs
295
291
s <- Buffer .size chunk
296
- red' <- Ref .modify (_+ s) redRef
292
+ red' <- Ref .modify (_ + s) redRef
297
293
if red' >= n then
298
294
pure true
299
295
else
@@ -331,7 +327,6 @@ readN r n = liftAff <<< makeAff $ \res -> do
331
327
removeEnd
332
328
join $ Ref .read removeReadable
333
329
334
-
335
330
-- | Write to a stream.
336
331
-- |
337
332
-- | Will complete after the data is flushed to the stream.
@@ -368,7 +363,7 @@ write w bs = liftAff <<< makeAff $ \res -> do
368
363
Nothing -> do
369
364
pure true
370
365
Just chunk -> do
371
- isLast <- liftST $ (_== 0 ) <$> Array .length <$> Array.ST .unsafeFreeze bufs
366
+ isLast <- liftST $ (_ == 0 ) <$> Array .length <$> Array.ST .unsafeFreeze bufs
372
367
nobackpressure <- Stream .write w chunk (if isLast then callbackLast else callback)
373
368
if nobackpressure then do
374
369
pure false
@@ -403,7 +398,7 @@ end w = liftAff <<< makeAff $ \res -> do
403
398
Just err -> res (Left err)
404
399
pure $ nonCanceler
405
400
406
- -- | Concatenate an `Array` of UTF-8 encoded `Buffer`s into a `String`.
401
+ -- | Concatenate an `Array` of UTF-8 encoded `Buffer`s into a `String`.
407
402
toStringUTF8 :: forall m . MonadEffect m => Array Buffer -> m String
408
403
toStringUTF8 bs = liftEffect $ Buffer .toString Encoding.UTF8 =<< Buffer .concat bs
409
404
0 commit comments