addStream method
Consumes the elements of stream.
Listens on stream and does something for each event.
Returns a future which is completed when the stream is done being added, and the consumer is ready to accept a new stream. No further calls to addStream or close should happen before the returned future has completed.
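The ordering rule above can be illustrated with a minimal sketch. `CollectingConsumer` is a hypothetical class written only for this example, not part of the documented library; it implements `StreamConsumer` from `dart:async` and shows each `addStream` call being awaited before the next call or `close`.

```dart
import 'dart:async';

/// Hypothetical consumer (illustration only) that collects every event.
class CollectingConsumer<T> implements StreamConsumer<T> {
  final List<T> items = [];

  @override
  Future<void> addStream(Stream<T> stream) {
    // forEach returns a future that completes once the stream is done.
    return stream.forEach(items.add);
  }

  @override
  Future<void> close() async {}
}

Future<void> main() async {
  final consumer = CollectingConsumer<int>();
  // Await each addStream before calling addStream again or close.
  await consumer.addStream(Stream.fromIterable([1, 2]));
  await consumer.addStream(Stream.fromIterable([3]));
  await consumer.close();
  print(consumer.items); // [1, 2, 3]
}
```

Awaiting the returned future is what guarantees the consumer is ready for the next stream; firing both calls without awaiting would violate the contract described above.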
The consumer may stop listening to the stream after an error, it may consume all the errors and only stop at a done event, or it may be canceled early if the receiver doesn't want any further events.
If the consumer stops listening because of some error preventing it from continuing, it may report this error in the returned future; otherwise it will just complete the future with null.
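One way a consumer might report a fatal error through the returned future is sketched below. `FailFastConsumer` is a hypothetical class invented for this illustration; it stops at the first error by using `cancelOnError` and forwards that error to the caller via a `Completer`.

```dart
import 'dart:async';

/// Hypothetical consumer (illustration only) that stops at the first error.
class FailFastConsumer<T> implements StreamConsumer<T> {
  final List<T> items = [];

  @override
  Future<void> addStream(Stream<T> stream) {
    final done = Completer<void>();
    stream.listen(
      items.add,
      // Report the error in the future returned from addStream.
      onError: (Object error, StackTrace stack) {
        done.completeError(error, stack);
      },
      // Without an error, complete normally (with null) at the done event.
      onDone: done.complete,
      // Stop listening after the first error, so onDone will not fire.
      cancelOnError: true,
    );
    return done.future;
  }

  @override
  Future<void> close() async {}
}

Future<void> main() async {
  final consumer = FailFastConsumer<int>();
  try {
    await consumer.addStream(Stream<int>.error(StateError('broken source')));
  } on StateError catch (e) {
    print('addStream failed: ${e.message}');
  }
}
```

A consumer that instead swallows errors and only stops at the done event would simply omit `cancelOnError` and complete the future in `onDone`.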
Implementation
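@override
Future addStream(Stream<List<T>> stream) {
  // Remember the previously attached stream, or this one if none was attached.
  var lastStream = _currentStream ?? stream;
  // Cancel any existing subscription before listening to the new stream.
  _sub?.cancel();
  _currentStream = stream;
  final streamDone = Completer<void>();
  _sub = stream.listen((items) {
    // Append the incoming items and update the running count.
    _chunks.addAll(items);
    _counter += items.length;
    // Pause the subscription once the count reaches the limit.
    if (limited && _counter >= limit) {
      _sub!.pause();
    }
    // Complete waiting readers, in order, while enough items are available.
    while (_readers.isNotEmpty && _readers.first.size <= _counter) {
      var waiting = _readers.removeAt(0);
      waiting.completer.complete(_consume(waiting.size));
    }
  }, onDone: () {
    // User is piping in a new stream
    if (stream == lastStream && _throwOnError) {
      _closed(UnderflowError());
    }
    streamDone.complete();
  }, onError: (e, stack) {
    // Forward stream errors to _closed.
    _closed(e, stack);
  });
  return streamDone.future;
}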
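The implementation pauses its subscription once the buffered count reaches the limit. The standalone sketch below illustrates that pause-based backpressure pattern in isolation. The names `limit`, `buffer`, and `pauses` are hypothetical, and the timer that drains the buffer and resumes the subscription is an assumption standing in for whatever reader logic resumes the real class; the listing above does not show where the resume happens.

```dart
import 'dart:async';

Future<void> main() async {
  const limit = 4; // hypothetical buffer limit
  final buffer = <int>[];
  var pauses = 0;
  final done = Completer<void>();
  late final StreamSubscription<List<int>> sub;

  sub = Stream.fromIterable([
    [1, 2],
    [3, 4],
    [5, 6],
  ]).listen((chunk) {
    buffer.addAll(chunk);
    if (buffer.length >= limit) {
      // Stop receiving chunks until the buffer has been drained.
      sub.pause();
      pauses++;
      // Assumption: a reader drains the buffer a little later and resumes.
      Timer(const Duration(milliseconds: 10), () {
        buffer.clear();
        sub.resume();
      });
    }
  }, onDone: done.complete);

  await done.future;
  print('paused $pauses time(s), ${buffer.length} item(s) left in buffer');
}
```

Pausing the subscription, rather than dropping data, lets the source stream hold back further chunks until the consumer has room for them.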