Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2074,6 +2074,50 @@ for await (const result of concatResult) {
}
```

### `readable.drop(limit[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `limit` {number} the number of chunks to drop from the readable.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream with `limit` chunks dropped.

This method returns a new stream with the first `limit` chunks dropped.

```mjs
import { Readable } from 'stream';

await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
```

### `readable.take(limit[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `limit` {number} the number of chunks to take from the readable.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream with `limit` chunks dropped.

This method returns a new stream with the first `limit` chunks dropped.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This method returns a new stream with the first `limit` chunks dropped.
This method returns a new stream with the last `limit` chunks dropped.

Refs: #41669 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix when landing to avoid waiting for full CI on a docs change.


```mjs
import { Readable } from 'stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
55 changes: 55 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
},
AbortError,
} = require('internal/errors');
Expand All @@ -14,6 +15,8 @@ const { kWeakHandler } = require('internal/event_target');
const {
ArrayPrototypePush,
MathFloor,
Number,
NumberIsNaN,
Promise,
PromiseReject,
PromisePrototypeCatch,
Expand Down Expand Up @@ -232,10 +235,62 @@ async function* flatMap(fn, options) {
}
}

function toIntegerOrInfinity(number) {
// We coerce here to align with the spec
// https://github.com/tc39/proposal-iterator-helpers/issues/169
number = Number(number);
if (NumberIsNaN(number)) {
return 0;
}
if (number < 0) {
throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
}
return number;
}

function drop(number, options) {
number = toIntegerOrInfinity(number);
return async function* drop() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- <= 0) {
yield val;
}
}
}.call(this);
}


function take(number, options) {
number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- > 0) {
yield val;
} else {
return;
}
}
}.call(this);
}

module.exports.streamReturningOperators = {
drop,
filter,
flatMap,
map,
take,
};

module.exports.promiseReturningOperators = {
Expand Down
96 changes: 96 additions & 0 deletions test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const { deepStrictEqual, rejects, throws } = require('assert');

const { from } = Readable;

const fromAsync = (...args) => from(...args).map(async (x) => x);

const naturals = () => from(async function*() {
let i = 1;
while (true) {
yield i++;
}
}());

{
// Synchronous streams
(async () => {
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await from([]).drop(2).toArray(), []);
deepStrictEqual(await from([]).take(1).toArray(), []);
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Asynchronous streams
(async () => {
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Infinite streams
// Asynchronous streams
(async () => {
deepStrictEqual(await naturals().take(1).toArray(), [1]);
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
})().then(common.mustCall());
}

{
// Coercion
(async () => {
// The spec made me do this ^^
deepStrictEqual(await naturals().take('cat').toArray(), []);
deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
deepStrictEqual(await naturals().take(true).toArray(), [1]);
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
rejects(
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
rejects(
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}

{
// Support for AbortSignal, already aborted
const signal = AbortSignal.abort();
rejects(
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
}

{
// Error cases
const invalidArgs = [
-1,
-Infinity,
-40,
];

for (const example of invalidArgs) {
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
}
}