repeatLatest method
重复上一个,对源stream会一直保持订阅,永远不会取消,建议前向使用bindCancellable或者bindLifecycle,来自动解除订阅
- repeatTimeout:重复上一个的超时时间。如果 onTimeout 和 onRepeatTimeout 都不设置,表示超时后清除重复数据。
- onTimeout:超时时返回的值,仅支持非 null 的值。
- onRepeatTimeout:超时时返回的值,允许 null 值。
- repeatError:是否重复错误。
- broadcast:是否广播。
Implementation
/// Returns a stream that replays the most recent event of this stream to
/// every new listener and then forwards live events to it.
///
/// The source stream is subscribed to lazily, when the returned stream is
/// first listened to, and that subscription is never cancelled by this
/// method. Callers should bound the source's lifetime themselves.
///
/// * [repeatTimeout]: when non-null and greater than [Duration.zero], the
///   cached event expires that long after it was received. Otherwise the
///   cached event is kept indefinitely.
/// * [onTimeout]: non-null value that replaces the cache when it expires.
/// * [onRepeatTimeout]: factory for the value that replaces the cache when it
///   expires. Takes precedence over [onTimeout].
///   If neither [onTimeout] nor [onRepeatTimeout] is given, the cache is
///   simply cleared on expiry.
/// * [repeatError]: when true, the latest error is cached (discarding any
///   cached value) and replayed to new listeners.
/// * [broadcast]: whether the returned stream is a broadcast stream; defaults
///   to this stream's `isBroadcast`.
///
/// Expiry only changes what later listeners are replayed; listeners that are
/// already attached are not notified when the cache expires.
Stream<T> repeatLatest(
    {Duration? repeatTimeout,
    T? onTimeout,
    T Function()? onRepeatTimeout,
    bool repeatError = false,
    bool? broadcast}) {
  var done = false;
  _RepeatEntry<T>? latest;
  Object? cacheError;
  StackTrace? cacheStackTrace;
  // Declared before the helpers so both the value path and the error path
  // can manage the same expiry timer.
  Timer? timer;

  void cleanCache() {
    latest = null;
    cacheError = null;
    cacheStackTrace = null;
  }

  // (Re)starts the expiry countdown for whatever is currently cached.
  // Stays a no-op unless a positive repeatTimeout is configured below.
  void Function() restartTimer = () {};

  if (repeatTimeout != null && repeatTimeout > Duration.zero) {
    final timeout = repeatTimeout;
    void Function() timeoutCallBack = cleanCache;
    if (onTimeout != null) {
      timeoutCallBack = () {
        cleanCache();
        latest = _RepeatEntry(onTimeout);
      };
    }
    // Checked last so that onRepeatTimeout wins when both are supplied.
    if (onRepeatTimeout != null) {
      timeoutCallBack = () {
        cleanCache();
        latest = _RepeatEntry(onRepeatTimeout());
      };
    }
    restartTimer = () {
      timer?.cancel();
      timer = null;
      if (!done) {
        timer = Timer(timeout, timeoutCallBack);
      }
    };
  }

  void setLatest(T value) {
    cleanCache();
    latest = _RepeatEntry(value);
    restartTimer();
  }

  void handleError(Object error, StackTrace stackTrace) {
    cleanCache();
    cacheError = error;
    cacheStackTrace = stackTrace;
    // Give the cached error its own expiry window. Previously a timer armed
    // for an earlier value kept running and would wipe (or overwrite) the
    // error early, while an error with no pending timer never expired.
    restartTimer();
  }

  final currentListeners = <MultiStreamController<T>>{};

  // Applies [action] to every attached controller that is still open.
  void forEachOpenListener(
      void Function(MultiStreamController<T> listener) action) {
    // Iterate over a snapshot: synchronous delivery may cause a listener to
    // cancel, which removes it from currentListeners during the loop.
    for (final listener in [...currentListeners]) {
      if (!listener.isClosed) {
        action(listener);
      }
    }
  }

  final isBroadcast_ = broadcast ?? isBroadcast;
  StreamSubscription<T>? sub;
  return Stream.multi((controller) {
    // Replay the cached value, or failing that the cached error, first.
    final latestValue = latest;
    final latestError = cacheError;
    if (latestValue != null) {
      if (!controller.isClosed) {
        controller.add(latestValue.value);
      }
    } else if (latestError != null) {
      if (!controller.isClosed) {
        controller.addError(latestError, cacheStackTrace);
      }
    }
    // The source has already completed: nothing more will arrive.
    if (done) {
      if (!controller.isClosed) {
        controller.close();
      }
      return;
    }
    currentListeners.add(controller);
    // A single shared subscription to the source, created on first listen.
    sub ??= listen((event) {
      setLatest(event);
      forEachOpenListener((listener) => listener.addSync(event));
    }, onError: (Object error, StackTrace stack) {
      if (repeatError) {
        handleError(error, stack);
      }
      forEachOpenListener((listener) => listener.addErrorSync(error, stack));
    }, onDone: () {
      // NOTE: a timer armed before completion is left running, so the cache
      // can still expire after the source is done.
      done = true;
      forEachOpenListener((listener) {
        listener.close();
      });
      currentListeners.clear();
    });
    controller.onCancel = () => currentListeners.remove(controller);
  }, isBroadcast: isBroadcast_);
}