repeatLatest method

Stream<T> repeatLatest({
  Duration? repeatTimeout,
  T? onTimeout,
  T Function()? onRepeatTimeout,
  bool repeatError = false,
  bool? broadcast,
})

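重复上一个事件：向每个新的监听者重放最近一次的值（当 repeatError 为 true 时也可以是最近一次的错误），然后继续转发后续事件。
(Replays the most recent value — or, when repeatError is true, the most recent error — to each new listener, then forwards subsequent events.)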

Implementation

/// Returns a stream that replays the most recent event of this stream to
/// every new listener and then forwards all subsequent events.
///
/// The source stream is listened to once, lazily, when the first listener
/// subscribes to the returned stream. That single subscription is shared by
/// all listeners and is not cancelled when listeners cancel.
///
/// * [repeatTimeout]: when non-null and greater than [Duration.zero], a timer
///   is (re)started on every data event. When it fires, the cached value is
///   replaced by the result of [onRepeatTimeout] if that callback is given,
///   otherwise by [onTimeout].
/// * [onTimeout]: replacement value used when the timer fires and no
///   [onRepeatTimeout] callback was supplied.
/// * [onRepeatTimeout]: callback producing the replacement value when the
///   timer fires; takes precedence over [onTimeout].
/// * [repeatError]: when true, an error from the source replaces the cached
///   value and is replayed to new listeners instead.
/// * [broadcast]: whether the returned stream is a broadcast stream; defaults
///   to this stream's `isBroadcast`.
///
/// A cached value of `null` is never replayed. Once the source is done, new
/// listeners receive the cached event (if any) and are closed immediately.
Stream<T> repeatLatest(
    {Duration? repeatTimeout,
    T? onTimeout,
    T Function()? onRepeatTimeout,
    bool repeatError = false,
    bool? broadcast}) {
  var done = false;
  T? latest;
  Object? cacheError;
  StackTrace? cacheStackTrace;
  // Declared ahead of the helpers so the error path can cancel it too.
  Timer? timer;

  // Drops whatever event (value or error) is currently cached.
  void cleanCache() {
    latest = null;
    cacheError = null;
    cacheStackTrace = null;
  }

  // Caches an error as the latest event.
  void handleError(Object error, StackTrace stackTrace) {
    // A timer started for an earlier value must not fire after that value has
    // been superseded: its callback would set `latest`, and a non-null
    // `latest` takes priority over the cached error during replay.
    timer?.cancel();
    timer = null;
    cleanCache();
    cacheError = error;
    cacheStackTrace = stackTrace;
  }

  // Caches a value as the latest event (no expiry).
  void Function(T value) setLatest = (value) {
    cleanCache();
    latest = value;
  };

  if (repeatTimeout != null && repeatTimeout > Duration.zero) {
    void Function() timeoutCallBack = () => latest = onTimeout;

    if (onRepeatTimeout != null) {
      timeoutCallBack = () => latest = onRepeatTimeout();
    }

    // Same as above, but every new value restarts the expiry timer.
    setLatest = (value) {
      timer?.cancel();
      cleanCache();
      latest = value;
      // No new timer is started once the source has completed.
      if (!done) {
        timer = Timer(repeatTimeout, timeoutCallBack);
      }
    };
  }

  final currentListeners = <MultiStreamController<T>>{};
  final isBroadcast_ = broadcast ?? isBroadcast;
  StreamSubscription<T>? sub;

  return Stream.multi((controller) {
    // Replay phase: a cached value wins over a cached error.
    final latestValue = latest;
    if (latestValue != null) {
      if (!controller.isClosed) {
        controller.add(latestValue);
      }
    } else if (cacheError != null) {
      if (!controller.isClosed) {
        controller.addError(cacheError!, cacheStackTrace);
      }
    }

    // The source already finished: nothing more will arrive.
    if (done) {
      if (!controller.isClosed) {
        controller.close();
      }
      return;
    }

    currentListeners.add(controller);

    // Subscribe to the source only once; later listeners share it.
    sub ??= listen((event) {
      setLatest(event);
      // Iterate over a copy: a listener may cancel (and thereby mutate the
      // set) while it is being notified synchronously.
      for (final listener in [...currentListeners]) {
        if (!listener.isClosed) {
          listener.addSync(event);
        }
      }
    }, onError: (Object error, StackTrace stack) {
      if (repeatError) {
        handleError(error, stack);
      }
      for (final listener in [...currentListeners]) {
        if (!listener.isClosed) {
          listener.addErrorSync(error, stack);
        }
      }
    }, onDone: () {
      done = true;
      for (final listener in [...currentListeners]) {
        if (!listener.isClosed) {
          listener.close();
        }
      }
      currentListeners.clear();
    });

    controller.onCancel = () => currentListeners.remove(controller);
  }, isBroadcast: isBroadcast_);
}