LoaderBloc<Content extends Object> constructor

LoaderBloc<Content extends Object>({
  1. required Stream<Content> loaderFunction(),
  2. Stream<Content> refresherFunction()?,
  3. Content? initialContent,
  4. void logger(
    1. String
    )?,
  5. FlattenStrategy loaderFlattenStrategy = FlattenStrategy.latest,
  6. FlattenStrategy refreshFlattenStrategy = FlattenStrategy.first,
})

Constructs a LoaderBloc. The loaderFunction is a function that returns a stream of Contents (must not be null). It is called when fetch is called.

The refresherFunction is a function that returns a stream of Contents (can be null). It is called when refresh is called. When it is null, it defaults to a function that returns an empty stream.

The initialContent is used to create initial view state (can be null)

Implementation

/// Creates a [LoaderBloc] that turns fetch and refresh requests into a
/// stream of [LoaderState]s and a stream of one-off [LoaderMessage]s.
///
/// * [loaderFunction] produces the content stream for a fetch request. It is
///   invoked lazily through `Rx.defer`, and concurrent requests are combined
///   according to [loaderFlattenStrategy].
/// * [refresherFunction] produces the content stream for a refresh request.
///   When omitted, a function returning an empty stream is used. Concurrent
///   requests are combined according to [refreshFlattenStrategy].
/// * [initialContent] seeds the initial state (may be `null`).
/// * [logger], when provided, receives a line for every partial state
///   change, every state and every message.
///
/// Errors from either function are not propagated to the state stream as
/// stream errors: a fetch error becomes a `fetchFailure` change and message,
/// while a refresh error only produces a `refreshFailure` message.
factory LoaderBloc({
  required Stream<Content> Function() loaderFunction,
  Stream<Content> Function()? refresherFunction,
  Content? initialContent,
  void Function(String)? logger,
  FlattenStrategy loaderFlattenStrategy =
      FlattenStrategy.latest, // default is `switchMap`
  FlattenStrategy refreshFlattenStrategy =
      FlattenStrategy.first, // default is `exhaustMap`
}) {
  // Bind the fallback to a non-nullable local. A reassigned parameter cannot
  // be promoted inside closures, which previously forced a `!` assertion.
  final refresher = refresherFunction ?? (() => Stream<Content>.empty());

  /// Controllers
  final fetchS = StreamController<void>();
  final refreshS = StreamController<Completer<void>>();
  final messageS = PublishSubject<LoaderMessage<Content>>();
  final controllers = <StreamController<dynamic>>[fetchS, refreshS, messageS];

  /// Input actions to state
  // Fetch: emit `fetchLoading` first, then one `fetchSuccess` per content.
  // An error is reported as a message and replaced by a `fetchFailure` change.
  final fetchChanges = fetchS.stream.flatMapWithStrategy(
    loaderFlattenStrategy,
    (_) => Rx.defer(loaderFunction)
        .doOnData(
            (content) => messageS.add(LoaderMessage.fetchSuccess(content)))
        .map<LoaderPartialStateChange<Content>>(
            (content) => LoaderPartialStateChange.fetchSuccess(content))
        .startWith(const LoaderPartialStateChange.fetchLoading())
        .doOnError((e, s) => messageS.add(LoaderMessage.fetchFailure(e, s)))
        .onErrorReturnWith(
            (e, _) => LoaderPartialStateChange.fetchFailure(e)),
  );
  // Refresh: one `refreshSuccess` per content. An error is reported as a
  // message and then swallowed (resumes with an empty stream), so it yields
  // no state change. The caller's completer is completed from `doOnCancel`.
  // NOTE(review): a request whose mapper is never invoked (e.g. dropped by
  // the flatten strategy) would leave its completer pending - confirm against
  // `flatMapWithStrategy`.
  final refreshChanges = refreshS.stream.flatMapWithStrategy(
    refreshFlattenStrategy,
    (completer) => Rx.defer(refresher)
        .doOnData(
            (content) => messageS.add(LoaderMessage.refreshSuccess(content)))
        .map<LoaderPartialStateChange<Content>>(
            (content) => LoaderPartialStateChange.refreshSuccess(content))
        .doOnError((e, s) => messageS.add(LoaderMessage.refreshFailure(e, s)))
        .onErrorResumeNext(const Stream.empty())
        .doOnCancel(() => completer.complete()),
  );

  // Fold all partial changes into the state, starting from the initial state.
  // Changes are logged before reduction when a logger is supplied.
  final initialState = LoaderState.initial(content: initialContent);
  final state$ = Rx.merge([fetchChanges, refreshChanges])
      .let((stream) =>
          logger?.let(
              (l) => stream.doOnData((data) => l('$_tag change: $data'))) ??
          stream)
      .scan(reduce, initialState)
      .publishState(initialState);

  // Connect the state stream, plus the optional logging subscriptions; all of
  // them are handed to the dispose bag below.
  final subscriptions = [
    state$.connect(),
    ...?logger?.let(
      (l) => [
        state$.listen((state) => l('$_tag state: $state')),
        messageS.listen((message) => l('$_tag message: $message')),
      ],
    ),
  ];

  // `late` because the dispose closure refers to the bloc being constructed
  // (its runtime type and hash code are used to tag the dispose bag).
  late LoaderBloc<Content> bloc;
  bloc = LoaderBloc._(
    dispose: () => DisposeBag(
      [...subscriptions, ...controllers],
      '${bloc.runtimeType}#${bloc.hashCode.toUnsigned(20).toRadixString(16).padLeft(5, '0')}',
    ).dispose(),
    state$: state$,
    fetch: () => fetchS.add(null),
    refresh: () {
      // The returned future completes when this request's completer is
      // completed by the refresh pipeline above.
      final completer = Completer<void>.sync();
      refreshS.add(completer);
      return completer.future;
    },
    message$: messageS,
  );
  return bloc;
}