fromFutureWithError<T> static method

ReactiveSubject<T> fromFutureWithError<T>(
  1. Future<T> future, {
  2. dynamic onError(
    1. Object error
    )?,
  3. dynamic onFinally()?,
})

Creates a ReactiveSubject from a Future, with error handling and completion callback.

future is the Future to convert into a ReactiveSubject. onError is an optional callback to handle errors. onFinally is an optional callback that runs when the Future completes, regardless of success or failure.

Usage:

final subject = ReactiveSubject.fromFutureWithError(
  Future.delayed(Duration(seconds: 1), () => 'Result'),
  onError: (error) => print('Error occurred: $error'),
  onFinally: () => print('Operation completed'),
);

subject.stream.listen(
  (value) => print('Received: $value'),
  onError: (error) => print('Stream error: $error'),
  onDone: () => print('Stream closed'),
);

// Output:
// Received: Result
// Operation completed
// Stream closed

Error handling example:

final subject = ReactiveSubject.fromFutureWithError(
  Future.delayed(Duration(seconds: 1), () => throw Exception('Test error')),
  onError: (error) => print('Error occurred: $error'),
  onFinally: () => print('Operation completed'),
);

subject.stream.listen(
  (value) => print('Received: $value'),
  onError: (error) => print('Stream error: $error'),
  onDone: () => print('Stream closed'),
);

// Output:
// Error occurred: Exception: Test error
// Stream error: Exception: Test error
// Operation completed
// Stream closed

Implementation

static ReactiveSubject<T> fromFutureWithError<T>(
  Future<T> future, {
  Function(Object error)? onError,
  Function()? onFinally,
}) {
  final subject = ReactiveSubject<T>();
  future.then((value) {
    subject.add(value);
  }).catchError((error) {
    if (onError != null) {
      onError(error);
    }
    subject.addError(error);
  }).whenComplete(() {
    if (onFinally != null) {
      onFinally();
    }
    subject.dispose();
  });
  return subject;
}