fromFutureWithError<T> static method
Creates a ReactiveSubject from a Future, with error handling and completion callback.
future is the Future to convert into a ReactiveSubject.
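onError is an optional callback invoked with the error when the Future fails (or times out), just before that error is forwarded to the subject's stream.
onFinally is an optional callback that runs once the Future has completed, whether it succeeded or failed, just before the subject is disposed.
timeout is an optional duration; if the Future has not completed within it, a TimeoutException is passed to onError and added to the stream as an error.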
Usage:
final subject = ReactiveSubject.fromFutureWithError(
Future.delayed(Duration(seconds: 1), () => 'Result'),
onError: (error) => print('Error occurred: $error'),
onFinally: () => print('Operation completed'),
);
subject.stream.listen(
(value) => print('Received: $value'),
onError: (error) => print('Stream error: $error'),
onDone: () => print('Stream closed'),
);
// Output:
// Received: Result
// Operation completed
// Stream closed
Error handling example:
final subject = ReactiveSubject.fromFutureWithError(
Future.delayed(Duration(seconds: 1), () => throw Exception('Test error')),
onError: (error) => print('Error occurred: $error'),
onFinally: () => print('Operation completed'),
);
subject.stream.listen(
(value) => print('Received: $value'),
onError: (error) => print('Stream error: $error'),
onDone: () => print('Stream closed'),
);
// Output:
// Error occurred: Exception: Test error
// Stream error: Exception: Test error
// Operation completed
// Stream closed
Implementation
/// Creates a [ReactiveSubject] that mirrors the outcome of [future].
///
/// When [future] completes with a value, that value is added to the subject.
/// When it completes with an error, [onError] (if given) is called with the
/// error and the error is then added to the subject. In both cases
/// [onFinally] (if given) is called afterwards and the subject is disposed.
///
/// If [timeout] is given and [future] has not completed within it, the
/// failure path is taken with a [TimeoutException].
///
/// An exception thrown by [onError] does not stop the original error from
/// being added to the subject, and an exception thrown by [onFinally] does
/// not stop the subject from being disposed. Such an exception is not
/// swallowed: it still propagates through the internal future chain.
static ReactiveSubject<T> fromFutureWithError<T>(
  Future<T> future, {
  Function(Object error)? onError,
  Function()? onFinally,
  Duration? timeout,
}) {
  final subject = ReactiveSubject<T>();
  Future<T> timeoutFuture = future;
  if (timeout != null) {
    timeoutFuture = future.timeout(
      timeout,
      // NOTE(review): `inSeconds` truncates to whole seconds, so a sub-second
      // timeout is reported as "0 seconds". The message is kept unchanged in
      // case callers match on it.
      onTimeout: () => throw TimeoutException(
          'Operation timed out after ${timeout.inSeconds} seconds'),
    );
  }
  timeoutFuture.then((value) {
    subject.add(value);
  }).catchError((error) {
    // Forward the error to the stream even if the user callback throws;
    // otherwise listeners would only ever see the stream close.
    try {
      onError?.call(error);
    } finally {
      subject.addError(error);
    }
  }).whenComplete(() {
    // Always dispose the subject, even if the user callback throws, so the
    // stream is closed and not leaked.
    try {
      onFinally?.call();
    } finally {
      subject.dispose();
    }
  });
  return subject;
}