listen method

void Function() listen(
  void onData(T item), {
  void onError(dynamic, StackTrace? stack)?,
  void onDone()?,
})

Continually pulls data and notifies the listener on every value change. Returns a void Function() that, when called, cancels the subscription.

Implementation

/// Continually pulls data and reports every pulled value to [onData].
///
/// Pulling starts in a microtask, so no callback is invoked before this
/// method returns. The loop keeps calling `pull()` until the subscription is
/// cancelled or `drained` becomes true:
///
/// * a `Some` result has its value forwarded to [onData]; any other result
///   is skipped;
/// * an error thrown by `pull()`, delivered by the future it returns, or
///   thrown by [onData] is forwarded to [onError] (and dropped if [onError]
///   is null), after which pulling continues;
/// * when the loop ends without having been cancelled, [onDone] is called.
///
/// Returns a function that cancels the subscription. After it has been
/// called, a pull that was still pending no longer triggers [onData] or
/// [onError], and [onDone] is not called.
void Function() listen(
  void Function(T item) onData, {
  void Function(dynamic, StackTrace? stack)? onError,
  void Function()? onDone,
}) {
  var cancelled = false;

  // Unwraps a pulled option and forwards the value; non-`Some` results are
  // ignored.
  void handleData(Option<T> item) {
    if (item is Some) {
      onData((item as Some).value);
    }
  }

  // Runs synchronously for as long as `pull()` returns plain values. On the
  // first Future it returns that future chained with a re-entry into
  // `doPull`, so asynchronous pulls are processed one at a time without
  // growing the call stack.
  FutureOr<void> doPull() {
    while (!cancelled && !drained) {
      try {
        final futureOr = pull();

        if (futureOr is Future<Option<T>>) {
          return futureOr.then<void>((item) {
            // The subscription may have been cancelled while this pull was
            // pending; a cancelled listener must not receive further events.
            if (!cancelled) {
              try {
                handleData(item);
              } catch (err, stack) {
                // `then`'s onError does not see errors thrown here, so mirror
                // the synchronous path and route them to the error callback.
                onError?.call(err, stack);
              }
            }
            return doPull();
          }, onError: (err, stack) {
            if (!cancelled) onError?.call(err, stack);
            return doPull();
          });
        }

        handleData(futureOr);
      } catch (err, stack) {
        onError?.call(err, stack);
      }
    }

    // Only a loop that ran to exhaustion counts as "done".
    if (!cancelled) onDone?.call();
  }

  Future.microtask(doPull);

  return () => cancelled = true;
}