safeAddStream method

Future<int> safeAddStream(Stream<T> sourceStream)

Listens to sourceStream and adds its events to this controller.

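Errors from sourceStream are forwarded via safeAddError and do not cancel the subscription. Returns a Future that completes, once sourceStream is done, with the number of events for which safeAdd returned true. If this controller is already closed when the method is called, the subscription is cancelled and the Future completes without waiting for sourceStream to finish.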
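
The behaviour described above can be sketched with a small usage example. The class that declares safeAddStream is not shown on this page, so SafeController below is a hypothetical stand-in, and the contracts of safeAdd (returns true only when the event was added) and safeAddError (drops the error once closed) are assumptions made for illustration. The stand-in also omits the already-closed check from the real implementation.

```dart
import 'dart:async';

// Hypothetical stand-in for the real controller class, which this page does
// not show. Only the members named in the docs are sketched here.
class SafeController<T> {
  final StreamController<T> _inner = StreamController<T>();

  Stream<T> get stream => _inner.stream;
  bool get isClosed => _inner.isClosed;

  // Assumed contract: returns true only if the event was actually added.
  bool safeAdd(T event) {
    if (_inner.isClosed) return false;
    _inner.add(event);
    return true;
  }

  // Assumed contract: silently drops the error once the controller is closed.
  void safeAddError(Object error, [StackTrace? stackTrace]) {
    if (!_inner.isClosed) _inner.addError(error, stackTrace);
  }

  // Simplified version of the documented method: counts accepted events and
  // forwards errors without cancelling the subscription.
  Future<int> safeAddStream(Stream<T> sourceStream) {
    var addedCount = 0;
    final completer = Completer<int>();
    sourceStream.listen(
      (event) {
        if (safeAdd(event)) addedCount++;
      },
      onError: (Object e, StackTrace s) => safeAddError(e, s),
      onDone: () => completer.complete(addedCount),
      cancelOnError: false,
    );
    return completer.future;
  }

  Future<void> close() => _inner.close();
}

// Returns [events added, events received, errors received].
Future<List<int>> demo() async {
  final controller = SafeController<int>();
  final received = <int>[];
  final errors = <Object>[];
  controller.stream.listen(received.add, onError: errors.add);

  // A source that emits 1, then an error, then 2, then closes. Events are
  // buffered by the source controller until safeAddStream listens to it.
  final source = StreamController<int>()
    ..add(1)
    ..addError(StateError('boom'))
    ..add(2)
    ..close();

  final added = await controller.safeAddStream(source.stream);
  await controller.close();
  return [added, received.length, errors.length];
}

Future<void> main() async {
  print(await demo()); // prints [2, 2, 1]
}
```

The error in the middle of the source does not end the transfer: both data events are counted, and the error reaches the controller's listener separately.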

Implementation

Future<int> safeAddStream(Stream<T> sourceStream) async {
  var addedCount = 0;
  final completer = Completer<int>();

  final subscription = sourceStream.listen(
    (event) {
      if (safeAdd(event)) addedCount++;
    },
    onError: _handleDynamicOnError,
    onDone: () {
      if (!completer.isCompleted) {
        completer.complete(addedCount);
      }
    },
    cancelOnError: false,
  );

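  // If this controller was already closed when the method was called, no
  // event can be accepted: cancel the subscription and complete right away
  // instead of waiting for sourceStream to finish.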
  if (isClosed && !completer.isCompleted) {
    await subscription.cancel();
    completer.complete(addedCount);
  }
  return completer.future;
}