Skip to content

Commit e9711a9

Browse files
committed
feat: handle huge stream inputs
1 parent 82780cc commit e9711a9

File tree

4 files changed

+1235
-0
lines changed

4 files changed

+1235
-0
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { StreamListDiff } from "@models/stream";
2+
3+
type Listener<T extends unknown[]> = (...args: T) => void;
4+
5+
export enum StreamEvent {
6+
Data = "data",
7+
Finish = "finish",
8+
Error = "error",
9+
}
10+
11+
export type Emitter<T extends Record<string, unknown>> = EventEmitter<{
12+
data: [StreamListDiff<T>[]];
13+
error: [Error];
14+
finish: [];
15+
}>;
16+
17+
export class EventEmitter<Events extends Record<string, unknown[]>> {
18+
private events: Record<string, Listener<unknown[]>[]> = {};
19+
20+
on<E extends keyof Events>(event: E, listener: Listener<Events[E]>): this {
21+
if (!this.events[event as string]) {
22+
this.events[event as string] = [];
23+
}
24+
this.events[event as string].push(listener as Listener<unknown[]>);
25+
return this;
26+
}
27+
28+
emit<E extends keyof Events>(event: E, ...args: Events[E]): void {
29+
if (this.events[event as string]) {
30+
this.events[event as string].forEach((listener) => listener(...args));
31+
}
32+
}
33+
}
34+
35+
export type EmitterEvents<T extends Record<string, unknown>> = {
36+
data: [StreamListDiff<T>[]];
37+
error: [Error];
38+
finish: [];
39+
};
40+
41+
export interface StreamListener<T extends Record<string, unknown>> {
42+
on<E extends keyof EmitterEvents<T>>(
43+
event: E,
44+
listener: Listener<EmitterEvents<T>[E]>,
45+
): this;
46+
}
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
import {
2+
DataBuffer,
3+
DEFAULT_LIST_STREAM_OPTIONS,
4+
ListStreamOptions,
5+
ReferenceProperty,
6+
StreamListDiff,
7+
} from "@models/stream";
8+
import { LIST_STATUS } from "@models/list";
9+
import { isObject } from "@lib/utils";
10+
import {
11+
Emitter,
12+
EmitterEvents,
13+
EventEmitter,
14+
StreamListener,
15+
StreamEvent,
16+
} from "./emitter";
17+
import type { Readable } from "stream";
18+
19+
const isNode =
20+
typeof process !== "undefined" &&
21+
process.release &&
22+
process?.release?.name === "node";
23+
24+
// let ReadableStreamImpl: unknown;
25+
// if (isNode) {
26+
// // eslint-disable-next-line @typescript-eslint/no-require-imports
27+
// ReadableStreamImpl = require("stream").Readable;
28+
// } else {
29+
// ReadableStreamImpl = globalThis.ReadableStream;
30+
// }
31+
32+
// async function streamToList<T>(stream: any): Promise<T[]> {
33+
// const list: T[] = [];
34+
// if (isNode) {
35+
// // Node.js stream handling
36+
// for await (const chunk of stream) {
37+
// list.push(chunk);
38+
// }
39+
// } else {
40+
// // Web Streams API handling (browser)
41+
// const reader = stream.getReader();
42+
// let done: boolean | undefined = false;
43+
// while (!done) {
44+
// const { value, done: streamDone } = await reader.read();
45+
// done = streamDone;
46+
// if (value) list.push(value);
47+
// }
48+
// }
49+
// return list;
50+
// }
51+
52+
function outputDiffChunk<T extends Record<string, unknown>>(
53+
emitter: Emitter<T>,
54+
) {
55+
let chunks: StreamListDiff<T>[] = [];
56+
57+
function handleDiffChunk(
58+
chunk: StreamListDiff<T>,
59+
options: ListStreamOptions,
60+
): void {
61+
const showChunk = options?.showOnly
62+
? options?.showOnly.includes(chunk.status)
63+
: true;
64+
if (!showChunk) {
65+
return;
66+
}
67+
if ((options.chunksSize as number) > 0) {
68+
chunks.push(chunk);
69+
if (chunks.length >= (options.chunksSize as number)) {
70+
const output = chunks;
71+
chunks = [];
72+
return emitter.emit(StreamEvent.Data, output);
73+
} else {
74+
return;
75+
}
76+
}
77+
return emitter.emit(StreamEvent.Data, [chunk]);
78+
}
79+
80+
function releaseLastChunks() {
81+
if (chunks.length > 0) {
82+
const output = chunks;
83+
chunks = [];
84+
return emitter.emit(StreamEvent.Data, output);
85+
}
86+
}
87+
88+
return {
89+
handleDiffChunk,
90+
releaseLastChunks,
91+
};
92+
}
93+
94+
function isValidChunkSize(
95+
chunksSize: ListStreamOptions["chunksSize"],
96+
): boolean {
97+
if (!chunksSize) return true;
98+
const sign = String(Math.sign(chunksSize));
99+
return sign !== "-1" && sign !== "NaN";
100+
}
101+
102+
function isDataValid<T extends Record<string, unknown>>(
103+
data: T,
104+
referenceProperty: ReferenceProperty<T>,
105+
listType: "prevList" | "nextList",
106+
): { isValid: boolean; message?: string } {
107+
if (!isObject(data)) {
108+
return {
109+
isValid: false,
110+
message: `Your ${listType} must only contain valid objects. Found '${data}'`,
111+
};
112+
}
113+
if (!Object.hasOwn(data, referenceProperty)) {
114+
return {
115+
isValid: false,
116+
message: `The reference property '${String(referenceProperty)}' is not available in all the objects of your ${listType}.`,
117+
};
118+
}
119+
return {
120+
isValid: true,
121+
message: "",
122+
};
123+
}
124+
125+
async function getDiffChunks<T extends Record<string, unknown>>(
126+
prevStream: Readable,
127+
nextStream: Readable,
128+
referenceProperty: ReferenceProperty<T>,
129+
emitter: Emitter<T>,
130+
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
131+
): Promise<void> {
132+
if (!isValidChunkSize(options?.chunksSize)) {
133+
return emitter.emit(
134+
StreamEvent.Error,
135+
new Error(
136+
`The chunk size can't be negative. You entered the value '${options.chunksSize}'`,
137+
),
138+
);
139+
}
140+
const { handleDiffChunk, releaseLastChunks } = outputDiffChunk<T>(emitter);
141+
const prevDataBuffer: DataBuffer<T> = new Map();
142+
const nextDataBuffer: DataBuffer<T> = new Map();
143+
let currentPrevIndex = 0;
144+
let currentNextIndex = 0;
145+
146+
async function processPrevStreamChunk(chunk: T) {
147+
const { isValid, message } = isDataValid(
148+
chunk,
149+
referenceProperty,
150+
"prevList",
151+
);
152+
if (!isValid) {
153+
emitter.emit(StreamEvent.Error, new Error(message));
154+
emitter.emit(StreamEvent.Finish);
155+
return;
156+
}
157+
const ref = chunk[referenceProperty] as ReferenceProperty<T>;
158+
const relatedChunk = nextDataBuffer.get(ref);
159+
160+
if (relatedChunk) {
161+
nextDataBuffer.delete(ref);
162+
const isDataEqual =
163+
JSON.stringify(chunk) === JSON.stringify(relatedChunk.data);
164+
const indexDiff = (relatedChunk.index as number) - currentPrevIndex;
165+
if (isDataEqual) {
166+
handleDiffChunk(
167+
{
168+
previousValue: chunk,
169+
currentValue: relatedChunk.nextData,
170+
prevIndex: currentPrevIndex,
171+
newIndex: relatedChunk.nextIndex,
172+
indexDiff,
173+
status:
174+
indexDiff === 0
175+
? LIST_STATUS.EQUAL
176+
: options.considerMoveAsUpdate
177+
? LIST_STATUS.UPDATED
178+
: LIST_STATUS.MOVED,
179+
},
180+
options,
181+
);
182+
} else {
183+
handleDiffChunk(
184+
{
185+
previousValue: chunk,
186+
currentValue: relatedChunk.nextData,
187+
prevIndex: currentPrevIndex,
188+
newIndex: relatedChunk.nextIndex,
189+
indexDiff,
190+
status: LIST_STATUS.UPDATED,
191+
},
192+
options,
193+
);
194+
}
195+
} else {
196+
prevDataBuffer.set(ref, { data: chunk, index: currentPrevIndex });
197+
}
198+
currentPrevIndex++;
199+
}
200+
201+
async function processNextStreamChunk(chunk: T) {
202+
const { isValid, message } = isDataValid(
203+
chunk,
204+
referenceProperty,
205+
"nextList",
206+
);
207+
if (!isValid) {
208+
emitter.emit(StreamEvent.Error, new Error(message));
209+
emitter.emit(StreamEvent.Finish);
210+
return;
211+
}
212+
const ref = chunk[referenceProperty] as ReferenceProperty<T>;
213+
const relatedChunk = prevDataBuffer.get(ref);
214+
215+
if (relatedChunk) {
216+
prevDataBuffer.delete(ref);
217+
const isDataEqual =
218+
JSON.stringify(chunk) === JSON.stringify(relatedChunk.data);
219+
const indexDiff = currentNextIndex - (relatedChunk.index as number);
220+
if (isDataEqual) {
221+
handleDiffChunk(
222+
{
223+
previousValue: relatedChunk.data,
224+
currentValue: chunk,
225+
prevIndex: relatedChunk.index,
226+
newIndex: currentNextIndex,
227+
indexDiff,
228+
status:
229+
indexDiff === 0
230+
? LIST_STATUS.EQUAL
231+
: options.considerMoveAsUpdate
232+
? LIST_STATUS.UPDATED
233+
: LIST_STATUS.MOVED,
234+
},
235+
options,
236+
);
237+
} else {
238+
handleDiffChunk(
239+
{
240+
previousValue: relatedChunk.data,
241+
currentValue: chunk,
242+
prevIndex: relatedChunk.index,
243+
newIndex: currentNextIndex,
244+
indexDiff,
245+
status: LIST_STATUS.UPDATED,
246+
},
247+
options,
248+
);
249+
}
250+
} else {
251+
nextDataBuffer.set(ref, { data: chunk, index: currentNextIndex });
252+
}
253+
currentNextIndex++;
254+
}
255+
256+
const prevStreamReader = async () => {
257+
for await (const chunk of prevStream) {
258+
await processPrevStreamChunk(chunk);
259+
}
260+
};
261+
262+
const nextStreamReader = async () => {
263+
for await (const chunk of nextStream) {
264+
await processNextStreamChunk(chunk);
265+
}
266+
};
267+
await Promise.all([prevStreamReader(), nextStreamReader()]);
268+
269+
for (const [key, chunk] of prevDataBuffer.entries()) {
270+
handleDiffChunk(
271+
{
272+
previousValue: chunk.data,
273+
currentValue: null,
274+
prevIndex: chunk.index,
275+
newIndex: null,
276+
indexDiff: null,
277+
status: LIST_STATUS.DELETED,
278+
},
279+
options,
280+
);
281+
prevDataBuffer.delete(key);
282+
}
283+
for (const [key, chunk] of nextDataBuffer.entries()) {
284+
handleDiffChunk(
285+
{
286+
previousValue: null,
287+
currentValue: chunk.data,
288+
prevIndex: null,
289+
newIndex: chunk.index,
290+
indexDiff: null,
291+
status: LIST_STATUS.ADDED,
292+
},
293+
options,
294+
);
295+
nextDataBuffer.delete(key);
296+
}
297+
releaseLastChunks();
298+
return emitter.emit(StreamEvent.Finish);
299+
}
300+
301+
export function streamListDiff2<T extends Record<string, unknown>>(
302+
prevStream: Readable,
303+
nextStream: Readable,
304+
referenceProperty: ReferenceProperty<T>,
305+
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
306+
): StreamListener<T> {
307+
const emitter = new EventEmitter<EmitterEvents<T>>();
308+
setTimeout(async () => {
309+
try {
310+
await getDiffChunks(
311+
prevStream,
312+
nextStream,
313+
referenceProperty,
314+
emitter,
315+
options,
316+
);
317+
} catch (err) {
318+
return emitter.emit(StreamEvent.Error, err as Error);
319+
}
320+
}, 0);
321+
return emitter as StreamListener<T>;
322+
}

0 commit comments

Comments
 (0)