Skip to content

Fetch client: Cancelling response subscriptions waits for next chunk #1820

@simolus3

Description

@simolus3

Consider an HTTP server that sends a line every 10 seconds:

import 'dart:convert';
import 'dart:io';

void main() async {
  final server = (await HttpServer.bind('localhost', 0))
    ..listen((request) async {
      await request.drain<void>();
      request.response.headers.set('Access-Control-Allow-Origin', '*');
      request.response.headers.set('Content-Type', 'text/plain');

      request.response.bufferOutput = false;
      request.response.add(utf8.encode('0\n'));
      await request.response.addStream(
        Stream.periodic(const Duration(seconds: 10), (i) => '${i + 1}\n')
            .transform(utf8.encoder),
      );
    });

  print('listening on http://localhost:${server.port}/');
}

Now, let's say we have a client that inspects the first line and, after a second of deliberation, decides it's done with the response:

import 'dart:async';
import 'dart:convert';

import 'package:http/http.dart';

void main() async {
  final client = Client();
  final response =
      await client.send(Request('GET', Uri.parse('http://localhost:<port>')));
  print('has response');
  late StreamSubscription<String> sub;
  sub = response.stream
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .listen((line) async {
    print('has line: $line');
    await Future<void>.delayed(const Duration(seconds: 1));
    print('cancelling subscription');
    final sw = Stopwatch()..start();
    await sub.cancel();
    sw.stop();
    print('subscription cancelled: ${sw.elapsed}');
  });
}

With the dart:io implementation, cancelling the stream subscription is fast:

has response
listening
has line: 0
cancelling subscription
subscription cancelled: 0:00:00.003603

With the fetch client however, cancelling the stream after a line waits for the next one:

has response
listening
has line: 0
cancelling subscription
subscription cancelled: 0:00:08.981000

The issue is that, because the client is implemented with async*, it can't react to cancellation requests while in this line:

final chunk = await bodyStreamReader.read().toDart;

I think the only solution is to rewrite that logic with stream controllers, and I'm happy to contribute a fix (unless I'm doing something wrong or there's a better solution). I know that I could use Abortable for that, but I think we should also be able to cancel responses like any other stream.

Metadata

Metadata

Assignees

No one assigned

    Labels

    package:httptype-bugIncorrect behavior (everything from a crash to more subtle misbehavior)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions