safeAddStream method
Listens to sourceStream and adds its events to this controller.
Errors from the sourceStream are forwarded via safeAddError.
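Returns a Future that completes with the number of events successfully added
once sourceStream is done. If this controller is already closed when the method
is called, the subscription to sourceStream is cancelled and the Future
completes right away with the count accumulated so far.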
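The following is a minimal usage sketch, not the library's own code. Because the class that declares safeAddStream is not shown here, the example uses a hypothetical extension named SafeControllerSketch as a stand-in; its safeAdd and safeAddError bodies are assumptions about how those members behave (add only while the controller is open), and the real members may differ.

```dart
import 'dart:async';

// Hypothetical stand-in for the documented API so the example runs on its
// own. The real library's members may be implemented differently.
extension SafeControllerSketch<T> on StreamController<T> {
  // Assumed behavior: add the event only while the controller is open.
  bool safeAdd(T event) {
    if (isClosed) return false;
    add(event);
    return true;
  }

  // Assumed behavior: forward the error only while the controller is open.
  bool safeAddError(Object error, [StackTrace? stackTrace]) {
    if (isClosed) return false;
    addError(error, stackTrace);
    return true;
  }

  // Simplified version of the documented method: counts accepted events.
  Future<int> safeAddStream(Stream<T> sourceStream) {
    var addedCount = 0;
    final completer = Completer<int>();
    sourceStream.listen(
      (event) {
        if (safeAdd(event)) addedCount++;
      },
      onError: (Object e, StackTrace s) => safeAddError(e, s),
      onDone: () => completer.complete(addedCount),
      cancelOnError: false,
    );
    return completer.future;
  }
}

Future<void> main() async {
  final controller = StreamController<int>();
  final received = <int>[];
  final errors = <Object>[];
  controller.stream.listen(received.add, onError: errors.add);

  // The source emits an event, an error, and another event, then closes.
  final source = StreamController<int>();
  final countFuture = controller.safeAddStream(source.stream);
  source.add(1);
  source.addError(StateError('boom'));
  source.add(2);
  await source.close();

  final count = await countFuture;
  await controller.close(); // flush everything to the listener

  print(count);         // 2
  print(received);      // [1, 2]
  print(errors.length); // 1
}
```

The error does not count toward the returned total and does not stop the subscription, since the source is listened to with cancelOnError set to false.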
Implementation
Future<int> safeAddStream(Stream<T> sourceStream) async {
  var addedCount = 0;
  final completer = Completer<int>();
  final subscription = sourceStream.listen(
    (event) {
      if (safeAdd(event)) addedCount++;
    },
    onError: _handleDynamicOnError,
    onDone: () {
      if (!completer.isCompleted) {
        completer.complete(addedCount);
      }
    },
    cancelOnError: false,
  );
  // If the controller is already closed, cancel the subscription.
  if (isClosed && !completer.isCompleted) {
    await subscription.cancel();
    completer.complete(addedCount);
  }
  return completer.future;
}
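The already-closed branch above can be observed with a small sketch. As before, the extension (here named ClosedControllerSketch) is a hypothetical stand-in that copies the listing's control flow, and its safeAdd body is an assumption; error forwarding is left out for brevity.

```dart
import 'dart:async';

// Hypothetical stand-in that mirrors the listing's control flow.
extension ClosedControllerSketch<T> on StreamController<T> {
  // Assumed behavior: add the event only while the controller is open.
  bool safeAdd(T event) {
    if (isClosed) return false;
    add(event);
    return true;
  }

  Future<int> safeAddStream(Stream<T> sourceStream) async {
    var addedCount = 0;
    final completer = Completer<int>();
    final subscription = sourceStream.listen(
      (event) {
        if (safeAdd(event)) addedCount++;
      },
      onDone: () {
        if (!completer.isCompleted) completer.complete(addedCount);
      },
    );
    // Same check as in the listing: stop listening if nothing can be added.
    if (isClosed && !completer.isCompleted) {
      await subscription.cancel();
      completer.complete(addedCount);
    }
    return completer.future;
  }
}

Future<void> main() async {
  // Close the target controller before forwarding anything into it.
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  await controller.close();

  var sourceCancelled = false;
  final source = StreamController<int>(
    onCancel: () {
      sourceCancelled = true;
    },
  );
  source.add(1); // buffered until the source stream is listened to

  final count = await controller.safeAddStream(source.stream);
  print(count);           // 0
  print(sourceCancelled); // true
}
```

Note that the source is still listened to briefly before being cancelled, so a single-subscription source cannot be reused afterwards.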