diff --git a/pkgs/http/CHANGELOG.md b/pkgs/http/CHANGELOG.md index 4c0c31af84..286823a992 100644 --- a/pkgs/http/CHANGELOG.md +++ b/pkgs/http/CHANGELOG.md @@ -3,6 +3,8 @@ * **Breaking** Change the behavior of `Request.body` so that a charset parameter is only added for text and XML media types. This brings the behavior of `package:http` in line with RFC-8259. +* On the web, fix cancellations for `StreamSubscription`s of response bodies + waiting for the next chunk. ## 1.5.0 diff --git a/pkgs/http/lib/src/browser_client.dart b/pkgs/http/lib/src/browser_client.dart index 0f57d5c38d..e4e30a3886 100644 --- a/pkgs/http/lib/src/browser_client.dart +++ b/pkgs/http/lib/src/browser_client.dart @@ -11,6 +11,7 @@ import 'package:web/web.dart' DOMException, HeadersInit, ReadableStreamDefaultReader, + ReadableStreamReadResult, RequestInfo, RequestInit, Response; @@ -116,7 +117,7 @@ class BrowserClient extends BaseClient { }.toJS); return StreamedResponseV2( - _readBody(request, response), + _bodyToStream(request, response), response.status, headers: headers, request: request, @@ -144,9 +145,9 @@ class BrowserClient extends BaseClient { } } -Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) { - if (e case DOMException(:final name) when name == 'AbortError') { - Error.throwWithStackTrace(RequestAbortedException(request.url), st); +Object _toClientException(Object e, BaseRequest request) { + if (e case DOMException(name: 'AbortError')) { + return RequestAbortedException(request.url); } if (e is! ClientException) { var message = e.toString(); @@ -155,49 +156,103 @@ Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) { } e = ClientException(message, request.url); } - Error.throwWithStackTrace(e, st); + return e; +} + +Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) { + Error.throwWithStackTrace(_toClientException(e, request), st); } -Stream> _readBody(BaseRequest request, Response response) async* { - final bodyStreamReader = - response.body?.getReader() as ReadableStreamDefaultReader?; +Stream> _bodyToStream(BaseRequest request, Response response) => + Stream.multi( + isBroadcast: false, + (listener) => _readStreamBody(request, response, listener), + ); - if (bodyStreamReader == null) { +Future _readStreamBody(BaseRequest request, Response response, + MultiStreamController> controller) async { + final reader = response.body?.getReader() as ReadableStreamDefaultReader?; + if (reader == null) { + // No response? Treat that as an empty stream. + await controller.close(); return; } - var isDone = false, isError = false; - try { - while (true) { - final chunk = await bodyStreamReader.read().toDart; - if (chunk.done) { - isDone = true; - break; + Completer? resumeSignal; + var cancelled = false; + var hadError = false; + controller + ..onResume = () { + if (resumeSignal case final resume?) { + resumeSignal = null; + resume.complete(); } - yield (chunk.value! as JSUint8Array).toDart; } - } catch (e, st) { - isError = true; - _rethrowAsClientException(e, st, request); - } finally { - if (!isDone) { + ..onCancel = () async { try { - // catchError here is a temporary workaround for - // http://dartbug.com/57046: an exception from cancel() will - // clobber an exception which is currently in flight. - await bodyStreamReader - .cancel() - .toDart - .catchError((_) => null, test: (_) => isError); - } catch (e, st) { - // If we have already encountered an error swallow the - // error from cancel and simply let the original error to be - // rethrown. - if (!isError) { - _rethrowAsClientException(e, st, request); + cancelled = true; + // We only cancel the reader when the subscription is cancelled - we + // don't need to do that for normal done events because the stream is in + // a completed state at that point. + await reader.cancel().toDart; + } catch (e, s) { + // It is possible for reader.cancel() to throw. This happens either + // because the stream has already been in an error state (in which case + // we would have called addErrorSync() before and don't need to re- + // report the error here), or because of an issue here (MDN says the + // method can throw if "The source object is not a + // ReadableStreamDefaultReader, or the stream has no owner."). Both of + // these don't look applicable here, but we want to ensure a new error + // in cancel() is surfaced to the caller. + if (!hadError) { + _rethrowAsClientException(e, s, request); } } + }; + + // Async loop reading chunks from `bodyStreamReader` and sending them to + // `controller`. + // Checks for pause/cancel after delivering each event. + // Exits if stream closes or becomes an error, or if cancelled. + while (true) { + final ReadableStreamReadResult chunk; + try { + chunk = await reader.read().toDart; + } catch (e, s) { + // After a stream was cancelled, adding error events would result in + // unhandled async errors. This is most likely an AbortError anyway, so + // not really an exceptional state. We report errors of .cancel() in + // onCancel, that should cover this case. + if (!cancelled) { + hadError = true; + controller.addErrorSync(_toClientException(e, request), s); + await controller.close(); + } + + break; + } + + if (chunk.done) { + // Sync because we're forwarding an async event. + controller.closeSync(); + break; + } else { + // Handle chunk whether paused, cancelled or not. + // If subscription is cancelled, it's a no-op to add events. + // If subscription is paused, events will be buffered until resumed, + // which is what we need. + // We can use addSync here because we're only forwarding this async + // event. + controller.addSync((chunk.value! as JSUint8Array).toDart); + } + + // Check pause/cancel state immediately *after* delivering event, + // listener might have paused or cancelled. + if (controller.isPaused) { + // Will never complete if cancelled before resumed. + await (resumeSignal ??= Completer()).future; } + if (!controller.hasListener) break; // Is cancelled. } } diff --git a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_server.dart b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_server.dart index 5f211d6dec..ef30bda9d2 100644 --- a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_server.dart +++ b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_server.dart @@ -23,15 +23,25 @@ void hybridMain(StreamChannel channel) async { late HttpServer server; server = (await HttpServer.bind('localhost', 0)) ..listen((request) async { + var path = request.uri.pathSegments; + // Slow down lines if requested, e.g. GET /1000 would send a line every + // second. This is used to test cancellations. + var delayBetweenLines = switch (path) { + [var delayMs] => Duration(milliseconds: int.parse(delayMs)), + _ => Duration.zero, + }; + await request.drain(); request.response.headers.set('Access-Control-Allow-Origin', '*'); request.response.headers.set('Content-Type', 'text/plain'); + request.response.bufferOutput = false; + serverWriting = true; for (var i = 0; serverWriting; ++i) { request.response.write('$i\n'); await request.response.flush(); // Let the event loop run. - await Future(() {}); + await Future.delayed(delayBetweenLines); } await request.response.close(); unawaited(server.close()); diff --git a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart index 8ae49d1f80..f8ee0d8187 100644 --- a/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart +++ b/pkgs/http_client_conformance_tests/lib/src/response_body_streamed_test.dart @@ -59,6 +59,119 @@ void testResponseBodyStreamed(Client client, expect(response.statusCode, 200); }); + test('pausing response stream after events', () async { + final response = await client.send(Request('GET', Uri.http(host, ''))); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + + // The server responds with a streamed response of lines containing + // incrementing integers. Verify that pausing the stream after each one + // does not cause any missed lines. + final stream = response.stream + .transform(const Utf8Decoder()) + .transform(const LineSplitter()) + .map(int.parse); + var expectedLine = 0; + final cancelCompleter = Completer(); + late StreamSubscription subscription; + + subscription = stream.listen((line) async { + expect(line, expectedLine); + expectedLine++; + + if (expectedLine == 10) { + subscription.pause(); + Future.delayed( + const Duration(seconds: 1), () => subscription.resume()); + } + + if (expectedLine == 100) { + cancelCompleter.complete(subscription.cancel()); + } + await pumpEventQueue(); + }); + + await cancelCompleter.future; + expect(expectedLine, 100); + }); + + test('pausing response stream asynchronously', () async { + final response = await client.send(Request('GET', Uri.http(host, ''))); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + + final originalSubscription = response.stream + .transform(const Utf8Decoder()) + .transform(const LineSplitter()) + .map(int.parse) + .listen(null); + var expectedLine = 0; + await for (final line in SubscriptionStream(originalSubscription)) { + expect(line, expectedLine); + expectedLine++; + if (expectedLine == 100) { + break; + } + + // Instead of pausing the subscription in response to an event, pause it + // after the event has already been delivered. + Timer.run(() { + originalSubscription.pause(Future(pumpEventQueue)); + }); + } + }); + + test('cancel paused stream', () async { + final response = await client.send(Request('GET', Uri.http(host, ''))); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + + final completer = Completer(); + late StreamSubscription subscription; + subscription = response.stream + .transform(const Utf8Decoder()) + .transform(const LineSplitter()) + .listen((line) async { + subscription.pause(); + + completer.complete(Future(() async { + await pumpEventQueue(); + await subscription.cancel(); + })); + }); + + await completer.future; + }); + + test('cancel paused stream via abortable request', () async { + final abortTrigger = Completer(); + final response = await client.send(AbortableRequest( + 'GET', Uri.http(host, ''), + abortTrigger: abortTrigger.future)); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + + late StreamSubscription subscription; + subscription = response.stream + .transform(const Utf8Decoder()) + .transform(const LineSplitter()) + .listen((line) { + subscription.pause(); + if (!abortTrigger.isCompleted) { + abortTrigger.complete(); + } + }); + + final aborted = expectLater(subscription.asFuture(), + throwsA(isA())); + await abortTrigger.future; + + // We need to resume the subscription after the response has been + // cancelled to record that error event. + subscription.resume(); + await aborted; + }); + test('cancel streamed response', () async { final request = Request('GET', Uri.http(host, '')); final response = await client.send(request); @@ -77,5 +190,63 @@ void testResponseBodyStreamed(Client client, }); await cancelled.future; }); + + test('cancelling stream subscription after chunk', () async { + // Request a 10s delay between subsequent lines. + const delayMillis = 10000; + final request = Request('GET', Uri.http(host, '$delayMillis')); + final response = await client.send(request); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + + final cancelled = Completer(); + var stopwatch = Stopwatch(); + final subscription = response.stream + .transform(const Utf8Decoder()) + .transform(const LineSplitter()) + .listen(null); + subscription.onData((line) { + stopwatch.start(); + cancelled.complete(subscription.cancel()); + expect(line, '0'); + }); + + await cancelled.future; + stopwatch.stop(); + + // Receiving the first line and cancelling the stream should not wait for + // the second line, which is sent much later. + expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis)); + }); + + test('cancelling stream subscription after chunk with delay', () async { + // Request a 10s delay between subsequent lines. + const delayMillis = 10000; + final request = Request('GET', Uri.http(host, '$delayMillis')); + final response = await client.send(request); + expect(response.reasonPhrase, 'OK'); + expect(response.statusCode, 200); + + var stopwatch = Stopwatch()..start(); + final done = Completer(); + late StreamSubscription sub; + sub = response.stream + .transform(utf8.decoder) + .transform(const LineSplitter()) + .listen((line) { + // Don't cancel in direct response to event, we want to test cancelling + // while the client is actively waiting for data. + Timer.run(() { + stopwatch.start(); + done.complete(sub.cancel()); + }); + }); + + await done.future; + stopwatch.stop(); + // Receiving the first line and cancelling the stream should not wait for + // the second line, which is sent much later. + expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis)); + }); }, skip: canStreamResponseBody ? false : 'does not stream response bodies'); }