Skip to content
This repository was archived by the owner on May 25, 2025. It is now read-only.

Commit 64cc859

Browse files
feat: ProgressLogger
Refactor transformLogProgress to use ProgressLogger underneath.
1 parent 792589f commit 64cc859

File tree

6 files changed

+331
-253
lines changed

6 files changed

+331
-253
lines changed

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export * from './stream/readable/readableMap'
4545
export * from './stream/readable/readableMapToArray'
4646
export * from './stream/readable/readableToArray'
4747
export * from './stream/stream.model'
48+
export * from './stream/progressLogger'
4849
export * from './stream/transform/transformBuffer'
4950
export * from './stream/transform/transformFilter'
5051
export * from './stream/transform/transformLimit'

src/stream/progressLogger.ts

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
import { inspect, InspectOptions } from 'node:util'
2+
import {
3+
_mb,
4+
_since,
5+
AnyObject,
6+
CommonLogger,
7+
localTimeNow,
8+
SimpleMovingAverage,
9+
UnixTimestampMillisNumber,
10+
} from '@naturalcycles/js-lib'
11+
import { boldWhite, dimGrey, hasColors, white, yellow } from '../colors/colors'
12+
import { SizeStack } from './sizeStack'
13+
14+
/**
 * Configuration accepted by `ProgressLogger` / `progressLogger()`.
 * All fields are optional.
 */
export interface ProgressLoggerCfg<IN = any> {
15+
/**
16+
* Progress metric
17+
*
18+
* @default `progress`
19+
*/
20+
metric?: string
21+
22+
/**
23+
* Include `heapUsed` in log.
24+
*
25+
* @default false
26+
*/
27+
heapUsed?: boolean
28+
29+
/**
30+
* Include `heapTotal` in log.
31+
*
32+
* @default false
33+
*/
34+
heapTotal?: boolean
35+
36+
/**
37+
* Include `rss` in log.
38+
*
39+
* @default true
40+
*/
41+
rss?: boolean
42+
43+
/**
44+
* Include peak RSS in log.
45+
*
46+
* @default true
47+
*/
48+
peakRSS?: boolean
49+
50+
/**
51+
* Include `external` in log.
52+
*
53+
* @default false
54+
*/
55+
external?: boolean
56+
57+
/**
58+
* Include `arrayBuffers` in log.
59+
*
60+
* @default false
61+
*/
62+
arrayBuffers?: boolean
63+
64+
/**
65+
* Log (rss - heapTotal)
66+
* For convenience of debugging "out-of-heap" memory size.
67+
*
68+
* @default false
69+
*/
70+
rssMinusHeap?: boolean
71+
72+
/**
73+
* Log "rows per second"
74+
*
75+
* @default true
76+
*/
77+
logRPS?: boolean
78+
79+
/**
80+
* Set to false to disable logging progress
81+
*
82+
* @default true
83+
*/
84+
logProgress?: boolean
85+
86+
/**
87+
* Log progress every Nth record that is _processed_ (went through mapper).
88+
* Set to 0 to disable logging.
89+
*
90+
* @default 1000
91+
*/
92+
logEvery?: number
93+
94+
/**
 * Logger that receives all progress output.
 *
 * @default console
 */
logger?: CommonLogger
95+
96+
/**
97+
* Function to return extra properties to the "progress object".
98+
*
99+
* chunk is undefined for "final" stats, otherwise is defined.
 * index is the number of `log()` calls made so far (not multiplied by `batchSize`).
100+
*/
101+
extra?: (chunk: IN | undefined, index: number) => AnyObject
102+
103+
/**
104+
* If specified - will multiply the counter by this number.
105+
* Useful e.g when using `transformBuffer({ batchSize: 500 })`, so
106+
* it'll accurately represent the number of processed entries (not batches).
107+
*
108+
* Defaults to 1.
109+
*/
110+
batchSize?: number
111+
112+
/**
113+
* Experimental logging of item (chunk) sizes, when json-stringified.
114+
*
115+
* Defaults to false.
116+
*
117+
* @experimental
118+
*/
119+
logSizes?: boolean
120+
121+
/**
122+
* How many last item sizes to keep in a buffer, to calculate stats (p50, p90, avg, etc).
123+
* Defaults to 100_000.
124+
* Cannot be Infinity.
125+
*/
126+
logSizesBuffer?: number
127+
128+
/**
129+
* Works in addition to `logSizes`. Adds "zipped sizes".
130+
*
131+
* @experimental
132+
*/
133+
logZippedSizes?: boolean
134+
}
135+
136+
/**
 * Stats object that `ProgressLogger` assembles and prints for each progress line.
 *
 * Besides the optional fields below it carries the counter under a dynamic key
 * (`<metric>` or `<metric>_final`) plus whatever the `extra` callback returns,
 * which is why it extends `AnyObject`.
 */
export interface ProgressLogItem extends AnyObject {
137+
// Memory fields hold `process.memoryUsage()` values passed through `_mb(...)`
// (presumably megabytes - confirm against js-lib).
heapUsed?: number
138+
heapTotal?: number
139+
rss?: number
140+
// Highest `rss` observed by the logger so far.
peakRSS?: number
141+
// `rss - heapTotal`.
rssMinusHeap?: number
142+
external?: number
143+
arrayBuffers?: number
144+
// Rounded `SimpleMovingAverage(10)` average of the rates measured between consecutive log lines.
rps10?: number
145+
// Rounded rows per second since the logger was started.
rpsTotal?: number
146+
}
147+
148+
// Options passed to `util.inspect` when printing the progress object:
// colors follow the `hasColors` flag from '../colors/colors', and
// `breakLength` is set to 300 (per Node docs, the length at which values are split across lines).
const inspectOpt: InspectOptions = {
149+
colors: hasColors,
150+
breakLength: 300,
151+
}
152+
153+
/**
 * Counts processed items and periodically prints progress stats
 * (counter, memory usage, rows per second) through the configured logger.
 *
 * Call `log(chunk)` once per processed item (or batch), then `done()`
 * (or dispose the instance) to print the final stats.
 */
export class ProgressLogger<IN> implements Disposable {
  /**
   * Resolves the configuration, starts the timers and prints the initial stats line.
   *
   * @param cfg optional settings; fields that are omitted or `undefined` fall back to defaults.
   */
  constructor(cfg: ProgressLoggerCfg<IN> = {}) {
    this.cfg = {
      ...cfg,
      // Defaults are applied with `??` AFTER the spread, so a key that is present
      // but `undefined` (e.g. options forwarded by a caller) cannot erase a default.
      metric: cfg.metric ?? 'progress',
      rss: cfg.rss ?? true,
      peakRSS: cfg.peakRSS ?? true,
      logRPS: cfg.logRPS ?? true,
      logEvery: cfg.logEvery ?? 1000,
      logSizesBuffer: cfg.logSizesBuffer ?? 100_000,
      batchSize: cfg.batchSize ?? 1,
      logger: cfg.logger ?? console,
      // Must not be overridable by the spread: `logEvery: 0` disables logging
      // even when `logProgress: true` is passed, and `logProgress: undefined` means "default on".
      logProgress: cfg.logProgress !== false && cfg.logEvery !== 0,
    }
    this.logEvery10 = this.cfg.logEvery * 10

    this.start()
    this.logStats() // initial
  }

  /** Resolved configuration (defaults already applied). */
  cfg!: ProgressLoggerCfg<IN> & {
    logEvery: number
    logSizesBuffer: number
    batchSize: number
    metric: string
    logger: CommonLogger
  }

  private started!: UnixTimestampMillisNumber
  private lastSecondStarted!: UnixTimestampMillisNumber
  private sma!: SimpleMovingAverage
  private logEvery10!: number
  private processedLastSecond!: number
  private progress!: number
  private peakRSS!: number
  private sizes?: SizeStack
  private sizesZipped?: SizeStack

  /**
   * Rate of `count` items over `elapsedMillis`, expressed per second.
   * Returns 0 instead of Infinity/NaN when no time has elapsed
   * (e.g. two log lines within the same millisecond) or the clock went backwards.
   */
  private static ratePerSecond(count: number, elapsedMillis: number): number {
    if (elapsedMillis <= 0) return 0
    return count / (elapsedMillis / 1000) || 0
  }

  /** Resets all counters and timers; creates size trackers when enabled in cfg. */
  private start(): void {
    this.started = Date.now()
    this.lastSecondStarted = Date.now()
    this.sma = new SimpleMovingAverage(10)
    this.processedLastSecond = 0
    this.progress = 0
    this.peakRSS = 0
    this.sizes = this.cfg.logSizes ? new SizeStack('json', this.cfg.logSizesBuffer) : undefined
    this.sizesZipped = this.cfg.logZippedSizes
      ? new SizeStack('json.gz', this.cfg.logSizesBuffer)
      : undefined
  }

  /**
   * Registers one processed chunk and prints stats on every `logEvery`-th call.
   *
   * @param chunk the processed item; forwarded to the size trackers and to `cfg.extra`.
   */
  log(chunk?: IN): void {
    this.progress++
    this.processedLastSecond++

    if (this.sizes) {
      // Check it, cause gzipping might be delayed here..
      void SizeStack.countItem(chunk, this.cfg.logger, this.sizes, this.sizesZipped)
    }

    if (this.cfg.logProgress && this.progress % this.cfg.logEvery === 0) {
      // Every 10th stats line additionally prints the "so far" summary.
      this.logStats(chunk, false, this.progress % this.logEvery10 === 0)
    }
  }

  /** Prints the final stats and summary line. */
  done(): void {
    this.logStats(undefined, true)
  }

  /** `Disposable` support: disposing the logger is the same as calling `done()`. */
  [Symbol.dispose](): void {
    this.done()
  }

  /**
   * Builds and prints the progress object, then optional size stats and a summary line.
   *
   * @param chunk item passed to `cfg.extra` (undefined for initial/final stats)
   * @param final when true, the counter key becomes `<metric>_final` and the final summary is printed
   * @param tenx when true, the "so far" summary with a per-hour estimate is printed
   */
  private logStats(chunk?: IN, final = false, tenx = false): void {
    if (!this.cfg.logProgress) return

    const {
      metric,
      extra,
      batchSize,
      heapUsed: logHeapUsed,
      heapTotal: logHeapTotal,
      rss: logRss,
      peakRSS: logPeakRss,
      rssMinusHeap,
      external,
      arrayBuffers,
      logRPS,
      logger,
    } = this.cfg

    const mem = process.memoryUsage()

    const now = Date.now()
    const elapsedTotal = now - this.started
    const batchedProgress = this.progress * batchSize
    const lastRPS = ProgressLogger.ratePerSecond(
      this.processedLastSecond * batchSize,
      now - this.lastSecondStarted,
    )
    const rpsTotal = Math.round(ProgressLogger.ratePerSecond(batchedProgress, elapsedTotal))
    // The "last second" window is really "since the previous stats line".
    this.lastSecondStarted = now
    this.processedLastSecond = 0

    const rps10 = Math.round(this.sma.pushGetAvg(lastRPS))
    if (mem.rss > this.peakRSS) this.peakRSS = mem.rss

    const o: ProgressLogItem = {
      [final ? `${metric}_final` : metric]: batchedProgress,
    }

    if (extra) Object.assign(o, extra(chunk, this.progress))
    if (logHeapUsed) o.heapUsed = _mb(mem.heapUsed)
    if (logHeapTotal) o.heapTotal = _mb(mem.heapTotal)
    if (logRss) o.rss = _mb(mem.rss)
    if (logPeakRss) o.peakRSS = _mb(this.peakRSS)
    if (rssMinusHeap) o.rssMinusHeap = _mb(mem.rss - mem.heapTotal)
    if (external) o.external = _mb(mem.external)
    if (arrayBuffers) o.arrayBuffers = _mb(mem.arrayBuffers || 0)

    if (logRPS) Object.assign(o, { rps10, rpsTotal })

    logger.log(inspect(o, inspectOpt))

    if (this.sizes?.items.length) {
      logger.log(this.sizes.getStats())

      if (this.sizesZipped?.items.length) {
        logger.log(this.sizesZipped.getStats())
      }
    }

    if (tenx) {
      // Guard against zero elapsed time, which would otherwise print "InfinityK".
      let perHour: number | string =
        elapsedTotal > 0 ? Math.round((batchedProgress * 1000 * 60 * 60) / elapsedTotal) || 0 : 0
      if (perHour > 900) {
        perHour = Math.round(perHour / 1000) + 'K'
      }

      logger.log(
        `${dimGrey(localTimeNow().toPretty())} ${white(metric)} took ${yellow(
          _since(this.started),
        )} so far to process ${yellow(batchedProgress)} rows, ~${yellow(perHour)}/hour`,
      )
    } else if (final) {
      logger.log(
        `${boldWhite(metric)} took ${yellow(_since(this.started))} to process ${yellow(
          batchedProgress,
        )} rows with total RPS of ${yellow(rpsTotal)}`,
      )
    }
  }
}
303+
304+
/**
 * Factory shortcut: builds a `ProgressLogger` from the given config.
 *
 * @param cfg optional logger configuration (defaults to an empty object)
 * @returns the newly constructed `ProgressLogger`
 */
export function progressLogger<IN>(cfg: ProgressLoggerCfg<IN> = {}): ProgressLogger<IN> {
  const instance = new ProgressLogger<IN>(cfg)
  return instance
}

src/stream/readable/readableForEach.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { transformMap, TransformMapOptions } from '../transform/transformMap'
66
* Convenience function to do `.forEach` over a Readable.
77
* Typed! (unlike default Readable).
88
*
9+
* Try native readable.forEach() instead!
10+
*
911
* @experimental
1012
*/
1113
export async function readableForEach<T>(

src/stream/readable/readableMapToArray.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { TransformMapOptions } from '../transform/transformMap'
88
* passing each result via `transformMap`.
99
*
1010
* Warning! All results are stored in memory (no backpressure).
11+
*
12+
* Try native readable.toArray instead!
1113
*/
1214
export async function readableMapToArray<IN, OUT = IN>(
1315
stream: ReadableTyped<IN>,

src/stream/transform/transformLimit.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Readable } from 'node:stream'
12
import { _range } from '@naturalcycles/js-lib'
23
import { readableFromArray } from '../..'
34
import { _pipelineToArray } from '../pipeline/pipeline'
@@ -31,3 +32,12 @@ test('transformLimit with readable.destroy', async () => {
3132

3233
expect(arr).toEqual(data.slice(0, 5))
3334
})
35+
36+
// Checks that the native `Readable` helpers can do the limiting on their own:
// `.take(5)` followed by `.toArray()` must yield the first 5 items of the source array.
test('using .take', async () => {
37+
const data = _range(1, 50).map(n => ({ id: String(n) }))
38+
const readable = Readable.from(data)
39+
40+
const arr = await readable.take(5).toArray()
41+
42+
expect(arr).toEqual(data.slice(0, 5))
43+
})

0 commit comments

Comments
 (0)