stopStream method

  1. @protected
Resolvable<Unit> stopStream()

Implementation

/// Tears down the active stream, if any, and reports when teardown is done.
///
/// The values held in `_streamSubscription` and `_streamController` are
/// captured and both fields are reset to `None` right away;
/// `_initDataCompleter` is reset to `None` as well. The captured
/// subscription is cancelled and the captured controller is closed (unless
/// it already reports `isClosed`) as steps queued on a local
/// [TaskSequencer], subscription first.
///
/// Each queued step receives the outcome of the step before it. If that
/// outcome is an error, debug builds trip the `assert`; otherwise the error
/// is re-thrown after the step's own cancel/close has been awaited.
///
/// Returns the sequencer's `completion`, converted with `toUnit()`.
@protected
Resolvable<Unit> stopStream() {
  UNSAFE:
  {
    final sequencer = TaskSequencer();

    // Clear the field before queuing the cancellation of what it held.
    final oldSubscription = _streamSubscription;
    _streamSubscription = const None();
    if (oldSubscription.isSome()) {
      final subscription = oldSubscription.unwrap();
      sequencer.then((previous) {
        assert(!previous.isErr(), previous.err().unwrap());
        return Async(() async {
          await subscription.cancel();
          // Surface an earlier failure only once the cancel has finished.
          if (previous.isErr()) {
            throw previous.err().unwrap();
          }
          return const None();
        });
      }).end();
    }

    // Same treatment for the controller; an already-closed one is skipped.
    final oldController = _streamController;
    _streamController = const None();
    if (oldController.isSome()) {
      final controller = oldController.unwrap();
      if (!controller.isClosed) {
        sequencer.then((previous) {
          assert(!previous.isErr(), previous.err().unwrap());
          return Async(() async {
            await controller.close();
            // Surface an earlier failure only once the close has finished.
            if (previous.isErr()) {
              throw previous.err().unwrap();
            }
            return const None();
          });
        }).end();
      }
    }

    _initDataCompleter = const None();
    return sequencer.completion.toUnit();
  }
}