repeatLatest method

Stream<T> repeatLatest({
  Duration? repeatTimeout,
  T? onTimeout,
  T Function()? onRepeatTimeout,
  bool repeatError = false,
  bool? broadcast,
})

重复上一个事件。对源 stream 的订阅会一直保持,永远不会取消;建议在上游(调用本方法之前)使用 bindCancellable 或 bindLifecycle,以便自动解除订阅。

  • repeatTimeout 重复上一个事件的超时时间;如果 onTimeout 和 onRepeatTimeout 都未设置,表示超时后清除已缓存的重复数据
  • onTimeout 超时时返回的值,仅支持非null的值
  • onRepeatTimeout 超时时返回的值,允许null
  • repeatError 是否重复错误
  • broadcast 是否广播

Implementation

/// Returns a stream that replays the most recently cached event to each new
/// listener and then forwards every later event of this stream.
///
/// The source is listened to once, when the first listener arrives. That
/// subscription is never cancelled by this method, even after every listener
/// of the returned stream has cancelled.
///
/// * [repeatTimeout]: when non-null and greater than zero, a timer is
///   (re)started on every data event. When it fires, the cache is cleared, or
///   replaced by [onTimeout] / the result of [onRepeatTimeout] when one of
///   them is given ([onRepeatTimeout] wins if both are). The replacement is
///   only cached for future listeners; it is not pushed to current ones.
/// * [onTimeout]: value cached on timeout; passing `null` means "not set".
/// * [onRepeatTimeout]: callback producing the value cached on timeout; its
///   result may be `null` when `T` is nullable.
/// * [repeatError]: when true, the latest error (replacing any cached value)
///   is cached and replayed to new listeners.
/// * [broadcast]: whether the returned stream reports itself as a broadcast
///   stream; defaults to this stream's `isBroadcast`.
///
/// Once the source is done, the pending repeat timer is cancelled so the
/// final cached event stays available; later listeners receive it (if any)
/// and are then closed immediately.
Stream<T> repeatLatest(
    {Duration? repeatTimeout,
    T? onTimeout,
    T Function()? onRepeatTimeout,
    bool repeatError = false,
    bool? broadcast}) {
  var done = false;
  // At most one of `latest` / `cacheError` is populated at any time.
  _RepeatEntry<T>? latest;
  Object? cacheError;
  StackTrace? cacheStackTrace;

  void cleanCache() {
    latest = null;
    cacheError = null;
    cacheStackTrace = null;
  }

  // NOTE(review): caching an error neither restarts nor cancels the repeat
  // timer, so a timer started by an earlier value can still expire the cached
  // error. Confirm whether that is intended.
  void handleError(Object error, StackTrace stackTrace) {
    cleanCache();
    cacheError = error;
    cacheStackTrace = stackTrace;
  }

  // Default: cache the value with no expiry.
  void Function(T value) setLatest = (value) {
    cleanCache();
    latest = _RepeatEntry(value);
  };

  Timer? timer;
  if (repeatTimeout != null && repeatTimeout > Duration.zero) {
    // What happens when the cached value expires. Later assignments override
    // earlier ones, so onRepeatTimeout takes precedence over onTimeout.
    void Function() timeoutCallBack = cleanCache;
    if (onTimeout != null) {
      timeoutCallBack = () {
        cleanCache();
        latest = _RepeatEntry(onTimeout);
      };
    }

    if (onRepeatTimeout != null) {
      timeoutCallBack = () {
        cleanCache();
        latest = _RepeatEntry(onRepeatTimeout());
      };
    }

    // Expiring variant: every new value restarts the timeout.
    setLatest = (value) {
      timer?.cancel();
      cleanCache();
      latest = _RepeatEntry(value);
      if (!done) {
        timer = Timer(repeatTimeout, timeoutCallBack);
      }
    };
  }

  final currentListeners = <MultiStreamController<T>>{};
  final isBroadcast_ = broadcast ?? isBroadcast;
  StreamSubscription<T>? sub;

  // Runs [action] on every listener that is still open. Iterates over a
  // snapshot because delivery is synchronous and a listener's onCancel
  // removes it from `currentListeners`.
  void forEachOpenListener(
      void Function(MultiStreamController<T> listener) action) {
    for (final listener in [...currentListeners]) {
      if (!listener.isClosed) {
        action(listener);
      }
    }
  }

  return Stream.multi((controller) {
    // Replay whatever is cached (value first, otherwise error) to the newcomer.
    final latestValue = latest;
    if (latestValue != null) {
      if (!controller.isClosed) {
        controller.add(latestValue.value);
      }
    } else if (cacheError != null) {
      if (!controller.isClosed) {
        controller.addError(cacheError!, cacheStackTrace);
      }
    }
    // The source already finished: nothing more will arrive.
    if (done) {
      if (!controller.isClosed) {
        controller.close();
      }
      return;
    }
    currentListeners.add(controller);
    // Subscribe to the source lazily, exactly once, shared by all listeners.
    sub ??= listen((event) {
      setLatest(event);
      forEachOpenListener((listener) => listener.addSync(event));
    }, onError: (Object error, StackTrace stack) {
      if (repeatError) {
        handleError(error, stack);
      }
      forEachOpenListener((listener) => listener.addErrorSync(error, stack));
    }, onDone: () {
      done = true;
      // Fix: stop the pending expiry timer. No new value can arrive after
      // completion, so letting it fire would only wipe or replace the final
      // cached event for late listeners and keep a timer alive needlessly.
      timer?.cancel();
      timer = null;
      forEachOpenListener((listener) => listener.close());
      currentListeners.clear();
    });

    controller.onCancel = () => currentListeners.remove(controller);
  }, isBroadcast: isBroadcast_);
}