add method

  1. @override
Future add (
  1. Stream<T> stream
)
override

Adds stream as a member of this group.

Any events from stream will be emitted through this.stream. If this group has a listener, stream will be listened to immediately; otherwise it will only be listened to once this group gets a listener.

If this is a single-subscription group and its subscription has been canceled, stream will be canceled as soon as its added. If this returns a Future, it will be returned from add. Otherwise, add returns null.

Throws a StateError if this group is closed.

Implementation

@override
Future add(Stream<T> stream) {
  if (_closed) {
    throw StateError("Can't add a Stream to a closed StreamGroup.");
  }

  if (_state == _StreamGroupState.dormant) {
    _subscriptions.putIfAbsent(stream, () => null);
  } else if (_state == _StreamGroupState.canceled) {
    // Listen to the stream and cancel it immediately so that no one else can
    // listen, for consistency. If the stream has an onCancel listener this
    // will also fire that, which may help it clean up resources.
    return stream.listen(null).cancel();
  } else {
    _subscriptions.putIfAbsent(stream, () => _listenToStream(stream));
  }

  return null;
}