retry method
Retries the stream subscription in case of error.
retryCount specifies the maximum number of retry attempts.
delayFactor determines the base delay for exponential backoff.
shouldRetry, if provided, is called with each error while retries remain; returning false stops retrying and forwards that error to the returned stream.
Returns a new stream that, upon error, will resubscribe to the source.
Implementation
/// Resubscribes to this stream when it fails, using exponential backoff.
///
/// Events from the source are forwarded as they arrive. When the source
/// emits an error, the stream is listened to again after a delay, up to
/// [retryCount] times. The delay before the n-th retry is
/// `delayFactor * 2^(n - 1)`, computed in whole milliseconds; the exponent is
/// capped so the multiplication cannot overflow for very large [retryCount]
/// values.
///
/// If [shouldRetry] is provided it is called with the error, and the retry
/// only happens when it returns `true`. Once retries are exhausted, or
/// [shouldRetry] declines, the original error is rethrown to the listener.
///
/// Because this is an `async*` generator, the argument checks run when the
/// returned stream is first listened to, so a failure is delivered as a
/// stream error rather than thrown from the call itself:
///
/// * [ArgumentError] if [retryCount] is negative.
/// * [ArgumentError] if [delayFactor] is shorter than one millisecond.
/// * [StateError] if listening again fails because this stream has already
///   been listened to (for example a single-subscription stream).
Stream<T> retry({
  int retryCount = 3,
  Duration delayFactor = const Duration(seconds: 1),
  bool Function(Object error)? shouldRetry,
}) async* {
  if (retryCount < 0) {
    throw ArgumentError('retryCount must be non-negative');
  }
  // Compared in whole milliseconds because the backoff below is computed in
  // milliseconds; a sub-millisecond delayFactor is therefore rejected too.
  if (delayFactor.inMilliseconds <= 0) {
    throw ArgumentError('delayFactor must be greater than zero');
  }

  // Upper bound for the backoff exponent. Without it, 2^(attempts - 1) wraps
  // around at 2^63 and yields a negative or zero delay, turning the backoff
  // into back-to-back retries. Results for up to 31 retries are unaffected.
  const maxBackoffExponent = 30;

  var attempts = 0;
  while (true) {
    try {
      await for (final value in this) {
        yield value;
      }
      // The source completed without an error: nothing left to retry.
      break;
    } catch (error) {
      // Listening again to a stream that does not allow it surfaces as a
      // StateError; replace it with a message that explains the fix.
      if (error is StateError &&
          error.message.contains('already been listened')) {
        throw StateError(
          'Cannot retry a single-subscription stream that has already been listened to. '
          'Use a broadcast stream or the "retry" extension on a Stream Factory function (Stream<T> Function()) instead.',
        );
      }

      if (attempts < retryCount &&
          (shouldRetry == null || shouldRetry(error))) {
        attempts++;
        // Exponential backoff: delay = delayFactor * (2 ^ (attempts - 1)),
        // with the exponent clamped to maxBackoffExponent.
        final exponent = attempts - 1 < maxBackoffExponent
            ? attempts - 1
            : maxBackoffExponent;
        final delayMillis =
            delayFactor.inMilliseconds * pow(2, exponent).toInt();
        await delayMillis.millisecondsDelay();
        continue;
      }

      // Out of retries, or shouldRetry declined: surface the original error
      // with its stack trace intact.
      rethrow;
    }
  }
}