listenAsFuture method

CancelableOperation<void> listenAsFuture(
  void onData(T event)?, {
  Function? onError,
  void onDone()?,
  bool cancelOnError = false,
})

Adds a subscription to this stream.

listenAsFuture calls listen and registers extra handlers for the error and done events, so that the returned CancelableOperation completes once this stream is done, or completes with an error when this stream emits an error and cancelOnError is true.
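The pattern described above can be sketched with dart:async alone. The helper below, doneFuture, is a hypothetical stand-in and not this package's API: it returns a plain Future instead of a CancelableOperation, and it leaves out cancellation and the user-supplied onError and onDone callbacks.

```dart
import 'dart:async';

/// Hypothetical helper (not part of the documented API): listens to [stream]
/// and returns a Future that completes when the stream is done, or with the
/// first error if [cancelOnError] is true.
Future<void> doneFuture<T>(
  Stream<T> stream,
  void Function(T event)? onData, {
  bool cancelOnError = false,
}) {
  final completer = Completer<void>();
  stream.listen(
    onData,
    onError: (Object error, StackTrace stackTrace) {
      // Errors only reach the Future when the subscription stops on error.
      if (cancelOnError) completer.completeError(error, stackTrace);
    },
    onDone: completer.complete,
    cancelOnError: cancelOnError,
  );
  return completer.future;
}

Future<void> main() async {
  final received = <int>[];
  await doneFuture(Stream.fromIterable([1, 2, 3]), received.add);
  print(received); // [1, 2, 3]
}
```

Awaiting the returned Future lets a caller treat "the stream has finished" as a single asynchronous result while still handling each data event as it arrives.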

On each data event from this stream, the onData handler is called. If onData is null, nothing happens.

On errors from this stream, the onError handler is called with the error object and possibly a stack trace.

The onError callback must be of type void Function(Object error) or void Function(Object error, StackTrace stackTrace); any other function type causes an ArgumentError to be thrown. The function type determines how onError is invoked. If it accepts two arguments, it is called with the error object and a stack trace, which may be StackTrace.empty if this stream received an error without a stack trace. Otherwise it is called with just the error object.
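The dispatch on the callback's function type can be illustrated with plain Dart type tests. The helper name invokeErrorHandler and its return strings are hypothetical and exist only for this sketch; they are not part of the documented API.

```dart
/// Hypothetical helper: calls [onError] with the arguments its function type
/// accepts and reports which form was used.
String invokeErrorHandler(
  Function onError,
  Object error,
  StackTrace stackTrace,
) {
  if (onError is void Function(Object, StackTrace)) {
    // Two-argument form: pass the error and the stack trace.
    onError(error, stackTrace);
    return 'error and stack trace';
  } else if (onError is void Function(Object)) {
    // One-argument form: pass only the error.
    onError(error);
    return 'error only';
  }
  // Any other function type is rejected.
  throw ArgumentError.value(onError, 'onError', 'unsupported callback type');
}

void main() {
  final st = StackTrace.empty;
  print(invokeErrorHandler((Object e) {}, 'boom', st));
  // error only
  print(invokeErrorHandler((Object e, StackTrace s) {}, 'boom', st));
  // error and stack trace
}
```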

If cancelOnError is true, the returned CancelableOperation completes with the first error event. Otherwise, all error events are passed to onError but do not affect how the returned operation completes.

If cancelOnError is false, or this stream emitted no error event, then onDone is called once this stream closes and sends the done event. If onDone is null, nothing happens. onDone is not called if the returned operation is canceled.
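The effect of cancelOnError on the sequence of events can be observed with a plain Stream.listen call. This is a sketch using only dart:async; the names source and collect are hypothetical and chosen for illustration.

```dart
import 'dart:async';

/// Emits 1, then an error, then 2, then closes.
Stream<int> source() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(StateError('boom'))
    ..add(2)
    ..close();
  return controller.stream;
}

/// Records the events seen by a listener for the given [cancelOnError].
Future<List<String>> collect({required bool cancelOnError}) {
  final log = <String>[];
  final completer = Completer<List<String>>();
  source().listen(
    (value) => log.add('data $value'),
    onError: (Object error) {
      log.add('error');
      // With cancelOnError the subscription ends here, so no done event
      // follows and the result has to be delivered from this handler.
      if (cancelOnError) completer.complete(log);
    },
    onDone: () {
      log.add('done');
      completer.complete(log);
    },
    cancelOnError: cancelOnError,
  );
  return completer.future;
}

Future<void> main() async {
  print(await collect(cancelOnError: true));
  // [data 1, error]
  print(await collect(cancelOnError: false));
  // [data 1, error, data 2, done]
}
```

With cancelOnError set to true the listener never sees the done event, which is why a done callback is only expected when no error ended the subscription.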

Implementation

CancelableOperation<void> listenAsFuture(
  void Function(T event)? onData, {
  Function? onError,
  void Function()? onDone,
  bool cancelOnError = false,
}) {
  // Validate onError callback.
  if (null != onError &&
      onError is! _FullErrorHandler &&
      onError is! _HalfErrorHandler) {
    throw ArgumentError(
      "onError callback must take either an Object "
      "(the error), or both an Object (the error) and a StackTrace.",
    );
  }

  final comp = Completer<void>();
  var hadError = false;

  void handleError(Object err, StackTrace st) {
    hadError = true;

    // Call user provided error handler. [onError] is either of type
    // [_FullErrorHandler], [_HalfErrorHandler] or null because the parameter
    // is checked at the start of the [listenAsFuture] function.
    if (onError is _FullErrorHandler) {
      onError(err, st);
    } else if (onError is _HalfErrorHandler) {
      onError(err);
    }

    if (cancelOnError) {
      comp.completeError(err, st);
    }
  }

  final sub = listen(
    onData,
    onError: handleError,
    onDone: comp.complete,
    cancelOnError: cancelOnError,
  );

  var isCanceled = false;
  return CancelableOperation<void>.fromFuture(
    () async {
      try {
        // Await here so that [listenAsFuture] appears in the stack trace
        // if an error occurs.
        await comp.future;
      } finally {
        if (!isCanceled && (!cancelOnError || !hadError)) {
          onDone?.call();
        }
        await sub.cancel();
      }
    }(),
    onCancel: () async {
      isCanceled = true;
      await sub.cancel();
    },
  );
}