Skip to content
This repository was archived by the owner on May 25, 2025. It is now read-only.

Commit 3574cd3

Browse files
feat: refactor stream/Readable things
ReadableTyped is now better typed to support iterator helpers
1 parent 64cc859 commit 3574cd3

14 files changed

+133
-121
lines changed

src/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ export * from './stream/pipeline/pipeline'
4141
export * from './stream/readable/readableCreate'
4242
export * from './stream/readable/readableForEach'
4343
export * from './stream/readable/readableFromArray'
44-
export * from './stream/readable/readableMap'
45-
export * from './stream/readable/readableMapToArray'
4644
export * from './stream/readable/readableToArray'
4745
export * from './stream/stream.model'
4846
export * from './stream/progressLogger'

src/stream/progressLogger.ts

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import {
1010
} from '@naturalcycles/js-lib'
1111
import { boldWhite, dimGrey, hasColors, white, yellow } from '../colors/colors'
1212
import { SizeStack } from './sizeStack'
13+
import { ReadableMapper } from './stream.model'
1314

14-
export interface ProgressLoggerCfg<IN = any> {
15+
export interface ProgressLoggerCfg<T = any> {
1516
/**
1617
* Progress metric
1718
*
@@ -98,7 +99,7 @@ export interface ProgressLoggerCfg<IN = any> {
9899
*
99100
* chunk is undefined for "final" stats, otherwise is defined.
100101
*/
101-
extra?: (chunk: IN | undefined, index: number) => AnyObject
102+
extra?: (chunk: T | undefined, index: number) => AnyObject
102103

103104
/**
104105
* If specified - will multiply the counter by this number.
@@ -150,8 +151,8 @@ const inspectOpt: InspectOptions = {
150151
breakLength: 300,
151152
}
152153

153-
export class ProgressLogger<IN> implements Disposable {
154-
constructor(cfg: ProgressLoggerCfg<IN> = {}) {
154+
export class ProgressLogger<T> implements Disposable {
155+
constructor(cfg: ProgressLoggerCfg<T> = {}) {
155156
this.cfg = {
156157
metric: 'progress',
157158
rss: true,
@@ -170,7 +171,7 @@ export class ProgressLogger<IN> implements Disposable {
170171
this.logStats() // initial
171172
}
172173

173-
cfg!: ProgressLoggerCfg<IN> & {
174+
cfg!: ProgressLoggerCfg<T> & {
174175
logEvery: number
175176
logSizesBuffer: number
176177
batchSize: number
@@ -201,7 +202,7 @@ export class ProgressLogger<IN> implements Disposable {
201202
: undefined
202203
}
203204

204-
log(chunk?: IN): void {
205+
log(chunk?: T): void {
205206
this.progress++
206207
this.processedLastSecond++
207208

@@ -223,7 +224,7 @@ export class ProgressLogger<IN> implements Disposable {
223224
this.done()
224225
}
225226

226-
private logStats(chunk?: IN, final = false, tenx = false): void {
227+
private logStats(chunk?: T, final = false, tenx = false): void {
227228
if (!this.cfg.logProgress) return
228229

229230
const {
@@ -304,6 +305,20 @@ export class ProgressLogger<IN> implements Disposable {
304305
/**
305306
* Create new ProgressLogger.
306307
*/
307-
export function progressLogger<IN>(cfg: ProgressLoggerCfg<IN> = {}): ProgressLogger<IN> {
308+
export function progressLogger<T>(cfg: ProgressLoggerCfg<T> = {}): ProgressLogger<T> {
308309
return new ProgressLogger(cfg)
309310
}
311+
312+
/**
313+
* Limitation: I don't know how to catch the `final` callback to log final stats.
314+
*
315+
* @experimental
316+
*/
317+
export function progressReadableMapper<T>(cfg: ProgressLoggerCfg<T> = {}): ReadableMapper<T, T> {
318+
const progress = new ProgressLogger(cfg)
319+
320+
return chunk => {
321+
progress.log(chunk)
322+
return chunk
323+
}
324+
}

src/stream/readable/readableCreate.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ export function readableCreate<T>(
3434
* Convenience type-safe wrapper around Readable.from() that infers the Type of input.
3535
*/
3636
export function readableFrom<T>(
37-
items: Iterable<T> | AsyncIterable<T>,
37+
iterable: Iterable<T> | AsyncIterable<T>,
3838
opt?: ReadableOptions,
3939
): ReadableTyped<T> {
40-
return Readable.from(items, opt)
40+
return Readable.from(iterable, opt)
4141
}
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
import { _range, pDelay } from '@naturalcycles/js-lib'
2-
import { _pipelineToArray, readableFromArray } from '../../index'
2+
import { readableFromArray } from '../../index'
33

44
test('readableFromArray', async () => {
55
const items = _range(1, 11)
66

77
const readable = readableFromArray(items, async item => await pDelay(10, item))
88

9-
const r = await _pipelineToArray([readable])
10-
11-
// jestLog('pipeline done')
9+
const r = await readable.toArray()
1210

1311
expect(r).toEqual(items)
1412
})

src/stream/readable/readableMap.test.ts

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/stream/readable/readableMap.ts

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/stream/readable/readableMapToArray.test.ts

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/stream/readable/readableMapToArray.ts

Lines changed: 0 additions & 24 deletions
This file was deleted.

src/stream/readable/readableToArray.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@ import { ReadableTyped } from '../stream.model'
33
/**
44
* Convenience function to read the whole Readable stream into Array (in-memory)
55
* and return that array.
6+
*
7+
* Native `await readable.toArray()` can be used instead.
8+
* This helper is kept for type-safery support.
69
*/
710
export async function readableToArray<T>(readable: ReadableTyped<T>): Promise<T[]> {
8-
const a: T[] = []
9-
10-
for await (const item of readable) {
11-
a.push(item)
12-
}
13-
14-
return a
11+
return await readable.toArray()
12+
// const a: T[] = []
13+
//
14+
// for await (const item of readable) {
15+
// a.push(item)
16+
// }
17+
//
18+
// return a
1519
}

src/stream/stream.model.ts

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,55 @@
1-
import { Readable, Transform, Writable } from 'node:stream'
1+
import type { Readable, Transform, Writable } from 'node:stream'
2+
import type { Promisable } from '@naturalcycles/js-lib'
23

3-
// eslint-disable-next-line unused-imports/no-unused-vars
4-
export interface ReadableTyped<T> extends Readable {}
4+
export interface ReadableSignalOptions {
5+
/** allows destroying the stream if the signal is aborted. */
6+
signal?: AbortSignal
7+
}
8+
9+
export interface ReadableArrayOptions {
10+
/** the maximum concurrent invocations of `fn` to call on the stream at once. **Default: 1**. */
11+
concurrency?: number
12+
/** allows destroying the stream if the signal is aborted. */
13+
signal?: AbortSignal
14+
}
15+
16+
export type ReadableMapper<IN, OUT> = (data: IN, opt?: ReadableSignalOptions) => Promisable<OUT>
17+
18+
export type ReadableFlatMapper<IN, OUT> = (
19+
data: IN,
20+
opt?: ReadableSignalOptions,
21+
) => Promisable<OUT[]>
22+
23+
export type ReadableVoidMapper<IN> = (data: IN, opt?: ReadableSignalOptions) => void | Promise<void>
24+
25+
export type ReadablePredicate<IN> = (
26+
data: IN,
27+
opt?: ReadableSignalOptions,
28+
) => boolean | Promise<boolean>
29+
30+
export interface ReadableTyped<T> extends Readable {
31+
toArray: (opt?: ReadableSignalOptions) => Promise<T[]>
32+
33+
map: <OUT>(mapper: ReadableMapper<T, OUT>, opt?: ReadableArrayOptions) => ReadableTyped<OUT>
34+
35+
flatMap: <OUT>(
36+
mapper: ReadableFlatMapper<T, OUT>,
37+
opt?: ReadableArrayOptions,
38+
) => ReadableTyped<OUT>
39+
40+
filter: (predicate: ReadablePredicate<T>, opt?: ReadableArrayOptions) => ReadableTyped<T>
41+
42+
forEach: (mapper: ReadableVoidMapper<T>, opt?: ReadableArrayOptions) => Promise<void>
43+
44+
take: (limit: number, opt?: ReadableSignalOptions) => ReadableTyped<T>
45+
drop: (limit: number, opt?: ReadableSignalOptions) => ReadableTyped<T>
46+
}
547

648
// eslint-disable-next-line unused-imports/no-unused-vars
749
export interface WritableTyped<T> extends Writable {}
850

951
// eslint-disable-next-line unused-imports/no-unused-vars
10-
export interface TransformTyped<IN, OUT = IN> extends Transform {}
52+
export interface TransformTyped<IN, OUT> extends Transform {}
1153

1254
export interface TransformOptions {
1355
/**

0 commit comments

Comments
 (0)