pushToStream method

Resolvable<Option<Object>> pushToStream(
  1. Result<TData> data, {
  2. bool eagerError = false,
})

Implementation

Resolvable<Option> pushToStream(
  Result<TData> data, {
  bool eagerError = false,
}) {
  UNSAFE:
  {
    final seq = TaskSequencer();
    return seq.then((prev1) {
      assert(!state.didDispose());
      if (state.didDispose()) {
        return Sync.result(prev1);
      }
      seq.then((_) {
        return Resolvable(() {
          if (_streamController.isSome()) {
            _streamController.unwrap().add(data);
          }
          return _initDataCompleter.map(
            (e) => e.resolve(Sync.result(data)).value,
          );
        });
      }).end();
      provideOnPushToStreamListeners().map((listener) {
        seq.then((prev2) {
          if (prev2.isErr()) {
            assert(prev2.isErr(), prev2.err().unwrap());
            if (eagerError) {
              return Sync.result(prev2);
            }
          }
          return listener(data).then((e) => prev2).flatten2();
        }).end();
      });
      return Sync.result(prev1);
    });
  }
}