Skip to content

Commit d3bd45c

Browse files
committed
Add StreamQueue.fork and ForkableStream.
StramQueue.fork is a very useful operation for creating complex and composable user-defined operations over stream queues. It allows arbitrary lookahead to be performed without modifying the semantics of the original stream, providing for higher-order operations like "check for this sequence of values or, if they don't exist, this other distinct sequence". Review URL: https://codereview.chromium.org//1241723003 .
1 parent 8141bbb commit d3bd45c

File tree

6 files changed

+1083
-4
lines changed

6 files changed

+1083
-4
lines changed

pkgs/async/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
- Added `SubscriptionStream` which creates a single-subscription stream
1111
from an existing stream subscription.
1212

13+
- Added `ForkableStream` which wraps a stream and allows independent forks to be
14+
created that emit the same events as the original.
15+
1316
- Added a `ResultFuture` class for synchronously accessing the result of a
1417
wrapped future.
1518

pkgs/async/lib/async.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export "src/delegate/sink.dart";
1212
export "src/delegate/stream_consumer.dart";
1313
export "src/delegate/stream_sink.dart";
1414
export "src/delegate/stream_subscription.dart";
15+
export "src/forkable_stream.dart";
1516
export "src/future_group.dart";
1617
export "src/result_future.dart";
1718
export "src/stream_completer.dart";
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
library async.forkable_stream;
6+
7+
import 'dart:async';
8+
9+
import 'stream_completer.dart';
10+
11+
/// A single-subscription stream from which other streams may be forked off at
12+
/// the current position.
13+
///
14+
/// This adds an operation, [fork], which produces a new stream that
15+
/// independently emits the same events as this stream. Unlike the branches
16+
/// produced by [StreamSplitter], a fork only emits events that arrive *after*
17+
/// the call to [fork].
18+
///
19+
/// Each fork can be paused or canceled independently of one another and of this
20+
/// stream. The underlying stream will be listened to once any branch is
21+
/// listened to. It will be paused when all branches are paused or not yet
22+
/// listened to. It will be canceled when all branches have been listened to and
23+
/// then canceled.
24+
class ForkableStream<T> extends StreamView<T> {
25+
/// The underlying stream.
26+
final Stream _sourceStream;
27+
28+
/// The subscription to [_sourceStream].
29+
///
30+
/// This will be `null` until this stream or any of its forks are listened to.
31+
StreamSubscription _subscription;
32+
33+
/// Whether this has been cancelled and no more forks may be created.
34+
bool _isCanceled = false;
35+
36+
/// The controllers for any branches that have not yet been canceled.
37+
///
38+
/// This includes a controller for this stream, until that has been cancelled.
39+
final _controllers = new Set<StreamController<T>>();
40+
41+
/// Creates a new forkable stream wrapping [sourceStream].
42+
ForkableStream(Stream sourceStream)
43+
// Use a completer here so that we can provide its stream to the
44+
// superclass constructor while also adding the stream controller to
45+
// [_controllers].
46+
: this._(sourceStream, new StreamCompleter());
47+
48+
ForkableStream._(this._sourceStream, StreamCompleter completer)
49+
: super(completer.stream) {
50+
completer.setSourceStream(_fork(primary: true));
51+
}
52+
53+
/// Creates a new fork of this stream.
54+
///
55+
/// From this point forward, the fork will emit the same events as this
56+
/// stream. It will *not* emit any events that have already been emitted by
57+
/// this stream. The fork is independent of this stream, which means each one
58+
/// may be paused or canceled without affecting the other.
59+
///
60+
/// If this stream is done or its subscription has been canceled, this returns
61+
/// an empty stream.
62+
Stream<T> fork() => _fork(primary: false);
63+
64+
/// Creates a stream forwarding [_sourceStream].
65+
///
66+
/// If [primary] is true, this is the stream underlying this object;
67+
/// otherwise, it's a fork. The only difference is that when the primary
68+
/// stream is canceled, [fork] starts throwing [StateError]s.
69+
Stream<T> _fork({bool primary: false}) {
70+
if (_isCanceled) {
71+
var controller = new StreamController<T>()..close();
72+
return controller.stream;
73+
}
74+
75+
var controller;
76+
controller = new StreamController<T>(
77+
onListen: () => _onListenOrResume(controller),
78+
onCancel: () => _onCancel(controller, primary: primary),
79+
onPause: () => _onPause(controller),
80+
onResume: () => _onListenOrResume(controller),
81+
sync: true);
82+
83+
_controllers.add(controller);
84+
85+
return controller.stream;
86+
}
87+
88+
/// The callback called when `onListen` or `onResume` is called for the branch
89+
/// managed by [controller].
90+
///
91+
/// This ensures that we're subscribed to [_sourceStream] and that the
92+
/// subscription isn't paused.
93+
void _onListenOrResume(StreamController<T> controller) {
94+
if (controller.isClosed) return;
95+
if (_subscription == null) {
96+
_subscription =
97+
_sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
98+
} else {
99+
_subscription.resume();
100+
}
101+
}
102+
103+
/// The callback called when `onCancel` is called for the branch managed by
104+
/// [controller].
105+
///
106+
/// This cancels or pauses the underlying subscription as necessary. If
107+
/// [primary] is true, it also ensures that future calls to [fork] throw
108+
/// [StateError]s.
109+
Future _onCancel(StreamController<T> controller, {bool primary: false}) {
110+
if (primary) _isCanceled = true;
111+
112+
if (controller.isClosed) return null;
113+
_controllers.remove(controller);
114+
115+
if (_controllers.isEmpty) return _subscription.cancel();
116+
117+
_onPause(controller);
118+
return null;
119+
}
120+
121+
/// The callback called when `onPause` is called for the branch managed by
122+
/// [controller].
123+
///
124+
/// This pauses the underlying subscription if necessary.
125+
void _onPause(StreamController<T> controller) {
126+
if (controller.isClosed) return;
127+
if (_subscription.isPaused) return;
128+
if (_controllers.any((controller) =>
129+
controller.hasListener && !controller.isPaused)) {
130+
return;
131+
}
132+
133+
_subscription.pause();
134+
}
135+
136+
/// Forwards data events to all branches.
137+
void _onData(value) {
138+
// Don't iterate directly over the set because [controller.add] might cause
139+
// it to be modified synchronously.
140+
for (var controller in _controllers.toList()) {
141+
controller.add(value);
142+
}
143+
}
144+
145+
/// Forwards error events to all branches.
146+
void _onError(error, StackTrace stackTrace) {
147+
// Don't iterate directly over the set because [controller.addError] might
148+
// cause it to be modified synchronously.
149+
for (var controller in _controllers.toList()) {
150+
controller.addError(error, stackTrace);
151+
}
152+
}
153+
154+
/// Forwards close events to all branches.
155+
void _onDone() {
156+
_isCanceled = true;
157+
158+
// Don't iterate directly over the set because [controller.close] might
159+
// cause it to be modified synchronously.
160+
for (var controller in _controllers.toList()) {
161+
controller.close();
162+
}
163+
_controllers.clear();
164+
}
165+
}
166+

pkgs/async/lib/src/stream_queue.dart

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ library async.stream_events;
77
import 'dart:async';
88
import 'dart:collection';
99

10+
import "forkable_stream.dart";
1011
import "subscription_stream.dart";
1112
import "stream_completer.dart";
1213
import "../result.dart";
@@ -78,7 +79,7 @@ class StreamQueue<T> {
7879
// by the content of the fifth event.
7980

8081
/// Source of events.
81-
final Stream _sourceStream;
82+
final ForkableStream _sourceStream;
8283

8384
/// Subscription on [_sourceStream] while listening for events.
8485
///
@@ -104,7 +105,9 @@ class StreamQueue<T> {
104105

105106
/// Create a `StreamQueue` of the events of [source].
106107
StreamQueue(Stream source)
107-
: _sourceStream = source;
108+
: _sourceStream = source is ForkableStream
109+
? source
110+
: new ForkableStream(source);
108111

109112
/// Asks if the stream has any more events.
110113
///
@@ -216,6 +219,22 @@ class StreamQueue<T> {
216219
throw _failClosed();
217220
}
218221

222+
/// Creates a new stream queue in the same position as this one.
223+
///
224+
/// The fork is subscribed to the same underlying stream as this queue, but
225+
/// it's otherwise wholly independent. If requests are made on one, they don't
226+
/// move the other forward; if one is closed, the other is still open.
227+
///
228+
/// The underlying stream will only be paused when all forks have no
229+
/// outstanding requests, and only canceled when all forks are canceled.
230+
StreamQueue<T> fork() {
231+
if (_isClosed) throw _failClosed();
232+
233+
var request = new _ForkRequest<T>(this);
234+
_addRequest(request);
235+
return request.queue;
236+
}
237+
219238
/// Cancels the underlying stream subscription.
220239
///
221240
/// If [immediate] is `false` (the default), the cancel operation waits until
@@ -236,14 +255,15 @@ class StreamQueue<T> {
236255
if (_isClosed) throw _failClosed();
237256
_isClosed = true;
238257

258+
if (_isDone) return new Future.value();
259+
if (_subscription == null) _subscription = _sourceStream.listen(null);
260+
239261
if (!immediate) {
240262
var request = new _CancelRequest(this);
241263
_addRequest(request);
242264
return request.future;
243265
}
244266

245-
if (_isDone) return new Future.value();
246-
if (_subscription == null) _subscription = _sourceStream.listen(null);
247267
var future = _subscription.cancel();
248268
_onDone();
249269
return future;
@@ -333,6 +353,7 @@ class StreamQueue<T> {
333353
return;
334354
}
335355
}
356+
336357
if (!_isDone) {
337358
_subscription.pause();
338359
}
@@ -628,3 +649,50 @@ class _HasNextRequest<T> implements _EventRequest {
628649
_completer.complete(false);
629650
}
630651
}
652+
653+
/// Request for a [StreamQueue.fork] call.
654+
class _ForkRequest<T> implements _EventRequest {
655+
/// Completer for the stream used by the queue by the `fork` call.
656+
StreamCompleter _completer;
657+
658+
StreamQueue<T> queue;
659+
660+
/// The [StreamQueue] object that has this request queued.
661+
final StreamQueue _streamQueue;
662+
663+
_ForkRequest(this._streamQueue) {
664+
_completer = new StreamCompleter<T>();
665+
queue = new StreamQueue<T>(_completer.stream);
666+
}
667+
668+
bool addEvents(Queue<Result> events) {
669+
_completeStream(events);
670+
return true;
671+
}
672+
673+
void close(Queue<Result> events) {
674+
_completeStream(events);
675+
}
676+
677+
void _completeStream(Queue<Result> events) {
678+
if (events.isEmpty) {
679+
if (_streamQueue._isDone) {
680+
_completer.setEmpty();
681+
} else {
682+
_completer.setSourceStream(_streamQueue._sourceStream.fork());
683+
}
684+
} else {
685+
// There are prefetched events which need to be added before the
686+
// remaining stream.
687+
var controller = new StreamController<T>();
688+
for (var event in events) {
689+
event.addTo(controller);
690+
}
691+
692+
var fork = _streamQueue._sourceStream.fork();
693+
controller.addStream(fork, cancelOnError: false)
694+
.whenComplete(controller.close);
695+
_completer.setSourceStream(controller.stream);
696+
}
697+
}
698+
}

0 commit comments

Comments
 (0)