retryWhen<T> static method

Stream<T> retryWhen<T>(
  Stream<T> streamFactory(),
  Stream<void> retryWhenFactory(
    Object error,
    StackTrace stackTrace
  )
)

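Creates a Stream that recreates and re-listens to the source Stream (obtained from streamFactory) each time the notifier Stream (returned by retryWhenFactory) emits a value. When the source Stream emits an error, that error and its stack trace are passed to retryWhenFactory to obtain the notifier. If the source Stream completes, the resulting Stream completes as well.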

If retryWhenFactory throws an error, or returns a Stream that emits an error, the original error is emitted first. The error from retryWhenFactory is then emitted as well, unless it is identical to the original error.
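The error rule above can be illustrated with a small sketch in which the notifier fails with an error that differs from the original one. The names failingSource and collectEvents are hypothetical helpers introduced only for this illustration, and the snippet assumes the rxdart package is available. Going by the rule as stated, the listener should see the value 1, then 'source failed', then 'notifier failed'.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Hypothetical source: emits 1, then fails with the "original" error.
Stream<int> failingSource() async* {
  yield 1;
  throw 'source failed';
}

/// Hypothetical helper: records every event of the retried stream as text
/// and completes once the stream is done.
Future<List<String>> collectEvents() {
  final log = <String>[];
  final done = Completer<List<String>>();

  Rx.retryWhen<int>(
    failingSource,
    // The notifier emits a different error instead of requesting a retry.
    (Object error, StackTrace stackTrace) =>
        Stream<void>.error('notifier failed'),
  ).listen(
    (int value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () => done.complete(log),
  );

  return done.future;
}

Future<void> main() async {
  (await collectEvents()).forEach(print);
}
```

If the notifier instead rethrew the same error object, as the Complex Example does with Stream.error(e, s), the rule implies that only one error would reach the listener.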

Basic Example

    Rx.retryWhen<int>(
      () => Stream<int>.fromIterable(<int>[1]),
      (Object error, StackTrace s) => throw error,
    ).listen(print); // Prints 1

Periodic Example

    Rx.retryWhen<int>(
      () => Stream<int>.periodic(const Duration(seconds: 1), (int i) => i)
          .map((int i) => i == 2 ? throw 'exception' : i),
      (Object e, StackTrace s) =>
          Rx.timer<void>(null, const Duration(milliseconds: 200)),
    ).take(4).listen(print); // Prints 0, 1, 0, 1

Complex Example

    var errorHappened = false;
    Rx.retryWhen<int>(
      () => Stream.periodic(const Duration(seconds: 1), (i) => i).map((i) {
        if (i == 3 && !errorHappened) {
          throw 'We can take this. Please restart.';
        } else if (i == 4) {
          throw 'It\'s enough.';
        } else {
          return i;
        }
      }),
      (e, s) {
        errorHappened = true;
        if (e == 'We can take this. Please restart.') {
          return Stream.value('Ok. Here you go!');
        } else {
          return Stream.error(e, s);
        }
      },
    ).listen(print, onError: print); // Prints 0, 1, 2, 0, 1, 2, 3, It's enough.

Implementation

static Stream<T> retryWhen<T>(
  Stream<T> Function() streamFactory,
  Stream<void> Function(Object error, StackTrace stackTrace) retryWhenFactory,
) =>
    RetryWhenStream<T>(streamFactory, retryWhenFactory);