Skip to content
2 changes: 2 additions & 0 deletions pkgs/http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* **Breaking** Change the behavior of `Request.body` so that a charset
parameter is only added for text and XML media types. This brings the
behavior of `package:http` in line with RFC-8259.
* On the web, fix cancellations for `StreamSubscription`s of response bodies
waiting for the next chunk.

## 1.5.0

Expand Down
123 changes: 89 additions & 34 deletions pkgs/http/lib/src/browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ class BrowserClient extends BaseClient {
}
}

Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
Object _toClientException(Object e, BaseRequest request) {
if (e case DOMException(:final name) when name == 'AbortError') {
Error.throwWithStackTrace(RequestAbortedException(request.url), st);
return RequestAbortedException(request.url);
}
if (e is! ClientException) {
var message = e.toString();
Expand All @@ -155,50 +155,105 @@ Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
}
e = ClientException(message, request.url);
}
Error.throwWithStackTrace(e, st);
return e;
}

Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
Error.throwWithStackTrace(_toClientException(e, request), st);
}

Stream<List<int>> _readBody(BaseRequest request, Response response) async* {
Stream<List<int>> _readBody(BaseRequest request, Response response) {
final bodyStreamReader =
response.body?.getReader() as ReadableStreamDefaultReader?;

if (bodyStreamReader == null) {
return;
return const Stream.empty();
}

var isDone = false, isError = false;
try {
while (true) {
final chunk = await bodyStreamReader.read().toDart;
final controller = StreamController<List<int>>();
final cancelCompleter = Completer<Null>();
Completer<void>? waitingForResume;
var readerEmittedDone = false;

Future<void> readUntilDoneOrCancelled() async {
while (!cancelCompleter.isCompleted) {
assert(waitingForResume == null);

final chunk = await Future.any(
[bodyStreamReader.read().toDart, cancelCompleter.future]);
if (chunk == null) {
// Stream subscription was cancelled. We'll cancel the reader later.
assert(cancelCompleter.isCompleted);
return;
}

if (chunk.done) {
isDone = true;
break;
readerEmittedDone = true;
return;
}
yield (chunk.value! as JSUint8Array).toDart;
}
} catch (e, st) {
isError = true;
_rethrowAsClientException(e, st, request);
} finally {
if (!isDone) {
try {
// catchError here is a temporary workaround for
// http://dartbug.com/57046: an exception from cancel() will
// clobber an exception which is currently in flight.
await bodyStreamReader
.cancel()
.toDart
.catchError((_) => null, test: (_) => isError);
} catch (e, st) {
// If we have already encountered an error swallow the
// error from cancel and simply let the original error to be
// rethrown.
if (!isError) {
_rethrowAsClientException(e, st, request);
}

controller.add((chunk.value! as JSUint8Array).toDart);
if (controller.isPaused) {
final resume = waitingForResume = Completer();
await resume.future;
}
}
}

void markResumed() {
if (waitingForResume case final waiter?) {
waitingForResume = null;
waiter.complete();
}
}

// Depending on whether the stream has been cancelled, reports an error on the
// stream or on the subscription's `cancel()` future.
void reportError(Object e, StackTrace st) {
if (cancelCompleter.isCompleted) {
_rethrowAsClientException(e, st, request);
} else {
controller.addError(_toClientException(e, request), st);
}
}

late Future<void> pipeIntoController;

controller
..onListen = () {
pipeIntoController = Future(() async {
var hadError = false;

try {
await readUntilDoneOrCancelled();
} catch (e, st) {
hadError = true;
reportError(e, st);
} finally {
if (!readerEmittedDone) {
try {
await bodyStreamReader.cancel().toDart;
} catch (e, st) {
// If we have already encountered an error swallow the error from
// cancel and simply let the original error to be rethrown.
if (!hadError) {
reportError(e, st);
}
}
}

unawaited(controller.close());
}
});
}
..onResume = markResumed
..onCancel = () async {
// Ensure the read loop isn't blocked due to a paused listener.
markResumed();
cancelCompleter.complete(null);
await pipeIntoController;
};

return controller.stream;
}

/// Workaround for `Headers` not providing a way to iterate the headers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,26 @@ void hybridMain(StreamChannel<Object?> channel) async {
late HttpServer server;
server = (await HttpServer.bind('localhost', 0))
..listen((request) async {
var path = request.uri.pathSegments;
// Slow down lines if requested, e.g. GET /1000 would send a line every
// second. This is used to test cancellations.
var delayBetweenLines = switch (path) {
[var delayMs] => Duration(milliseconds: int.parse(delayMs)),
_ => Duration.zero,
};

await request.drain<void>();
request.response.headers.set('Access-Control-Allow-Origin', '*');
request.response.headers.set('Content-Type', 'text/plain');
if (delayBetweenLines > Duration.zero) {
request.response.bufferOutput = false;
}
serverWriting = true;
for (var i = 0; serverWriting; ++i) {
request.response.write('$i\n');
await request.response.flush();
// Let the event loop run.
await Future(() {});
await Future<void>.delayed(delayBetweenLines);
}
await request.response.close();
unawaited(server.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,55 @@ void testResponseBodyStreamed(Client client,
});
await cancelled.future;
});

test('cancelling stream subscription after chunk', () async {
// Request a 10s delay between subsequent lines.
const delayMillis = 10000;
final request = Request('GET', Uri.http(host, '$delayMillis'));
final response = await client.send(request);
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

var stopwatch = Stopwatch()..start();
var line = await const LineSplitter()
.bind(const Utf8Decoder().bind(response.stream))
// Cancel the stream after the first line
.first;

// Receiving the first line and cancelling the stream should not wait for
// the second line, which is sent much later.
stopwatch.stop();
expect(line, '0');
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
});

test('cancelling stream subscription after chunk with delay', () async {
// Request a 10s delay between subsequent lines.
const delayMillis = 10000;
final request = Request('GET', Uri.http(host, '$delayMillis'));
final response = await client.send(request);
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

var stopwatch = Stopwatch()..start();
final done = Completer<void>();
late StreamSubscription<String> sub;
sub = response.stream
.transform(utf8.decoder)
.transform(const LineSplitter())
.listen((line) async {
// Don't cancel in direct response to event, we want to test cancelling
// while the client is actively waiting for data.
await pumpEventQueue();
await sub.cancel();
stopwatch.stop();
done.complete();
});

await done.future;
// Receiving the first line and cancelling the stream should not wait for
// the second line, which is sent much later.
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
});
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
}