Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1525,9 +1525,6 @@ added: v16.3.0
* `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
async iterator, or exiting a `for await...of` iteration using a `break`,
`return`, or `throw` will not destroy the stream. **Default:** `true`.
* `destroyOnError` {boolean} When set to `false`, if the stream emits an
error while it's being iterated, the iterator will not destroy the stream.
**Default:** `true`.
* Returns: {AsyncIterator} to consume the stream.

The iterator created by this method gives users the option to cancel the
Expand Down
30 changes: 21 additions & 9 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,22 @@ Readable.prototype.read = function(n) {
state.needReadable = true;

// Call internal read method
this._read(state.highWaterMark);
try {
const result = this._read(state.highWaterMark);
if (result != null) {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
nop,
function(err) {
errorOrDestroy(this, err);
});
}
}
} catch (err) {
errorOrDestroy(this, err);
}

state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
Expand Down Expand Up @@ -1130,14 +1145,11 @@ async function* createAsyncIterator(stream, options) {
error = aggregateTwoErrors(error, err);
throw error;
} finally {
if (error) {
if (options?.destroyOnError !== false) {
destroyImpl.destroyer(stream, error);
}
} else if (options?.destroyOnReturn !== false) {
if (error === undefined || stream._readableState.autoDestroy) {
destroyImpl.destroyer(stream, null);
}
if (
(error || options?.destroyOnReturn !== false) &&
(error === undefined || stream._readableState.autoDestroy)
) {
destroyImpl.destroyer(stream, null);
}
}
}
Expand Down
60 changes: 0 additions & 60 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -750,22 +750,6 @@ async function tests() {
})());
}

function createErrorReadable() {
const opts = { read() { throw new Error('inner'); } };
return new Readable(opts);
}

// Check default destroys on return
(async function() {
const readable = createReadable();
for await (const chunk of readable.iterator()) {
assert.strictEqual(chunk, 5);
break;
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit destroying on return
(async function() {
const readable = createReadable();
Expand All @@ -777,50 +761,6 @@ async function tests() {
assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check default destroys on error
(async function() {
const readable = createErrorReadable();
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit destroys on error
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: true, destroyOnReturn: false };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit non-destroy with return true
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: false, destroyOnReturn: true };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(!readable.destroyed);
})().then(common.mustCall());

// Check explicit non-destroy with return true
(async function() {
const readable = createReadable();
Expand Down
21 changes: 8 additions & 13 deletions test/parallel/test-stream-readable-with-unimplemented-_read.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
'use strict';
require('../common');

const assert = require('assert');
const common = require('../common');
const { Readable } = require('stream');

const readable = new Readable();

assert.throws(
() => {
readable.read();
},
{
code: 'ERR_METHOD_NOT_IMPLEMENTED',
name: 'Error',
message: 'The _read() method is not implemented'
}
);
readable.read();
readable.on('error', common.expectsError({
code: 'ERR_METHOD_NOT_IMPLEMENTED',
name: 'Error',
message: 'The _read() method is not implemented'
}));
readable.on('close', common.mustCall());