LoaderBloc<Content extends Object> constructor
LoaderBloc<Content extends Object> ({
- required Stream<
Content> loaderFunction(), - Stream<
Content> refresherFunction()?, - Content? initialContent,
- void logger()?,
- FlattenStrategy loaderFlattenStrategy = FlattenStrategy.latest,
- FlattenStrategy refreshFlattenStrategy = FlattenStrategy.first,
Construct a LoaderBloc
The loaderFunction
is a function return a stream of Content
s (must be not null).
It's called when fetch is called
The refresherFunction
is a function return a stream of Content
s (can be null).
It's called when refresh is called
When it is null, is will equal to a function that returns a empty stream
The initialContent
is used to create initial view state (can be null)
Implementation
factory LoaderBloc({
required Stream<Content> Function() loaderFunction,
Stream<Content> Function()? refresherFunction,
Content? initialContent,
void Function(String)? logger,
FlattenStrategy loaderFlattenStrategy =
FlattenStrategy.latest, // default is `switchMap`
FlattenStrategy refreshFlattenStrategy =
FlattenStrategy.first, // default is `exhaustMap`
}) {
refresherFunction ??= () => Stream<Content>.empty();
/// Controllers
final fetchS = StreamController<void>();
final refreshS = StreamController<Completer<void>>();
final messageS = PublishSubject<LoaderMessage<Content>>();
final controllers = <StreamController<dynamic>>[fetchS, refreshS, messageS];
/// Input actions to state
final fetchChanges = fetchS.stream.flatMapWithStrategy(
loaderFlattenStrategy,
(_) => Rx.defer(loaderFunction)
.doOnData(
(content) => messageS.add(LoaderMessage.fetchSuccess(content)))
.map<LoaderPartialStateChange<Content>>(
(content) => LoaderPartialStateChange.fetchSuccess(content))
.startWith(const LoaderPartialStateChange.fetchLoading())
.doOnError((e, s) => messageS.add(LoaderMessage.fetchFailure(e, s)))
.onErrorReturnWith(
(e, _) => LoaderPartialStateChange.fetchFailure(e)),
);
final refreshChanges = refreshS.stream.flatMapWithStrategy(
refreshFlattenStrategy,
(completer) => Rx.defer(refresherFunction!)
.doOnData(
(content) => messageS.add(LoaderMessage.refreshSuccess(content)))
.map<LoaderPartialStateChange<Content>>(
(content) => LoaderPartialStateChange.refreshSuccess(content))
.doOnError((e, s) => messageS.add(LoaderMessage.refreshFailure(e, s)))
.onErrorResumeNext(const Stream.empty())
.doOnCancel(() => completer.complete()),
);
final initialState = LoaderState.initial(content: initialContent);
final state$ = Rx.merge([fetchChanges, refreshChanges])
.let((stream) =>
logger?.let(
(l) => stream.doOnData((data) => l('$_tag change: $data'))) ??
stream)
.scan(reduce, initialState)
.publishState(initialState);
final subscriptions = [
state$.connect(),
...?logger?.let(
(l) => [
state$.listen((state) => l('$_tag state: $state')),
messageS.listen((message) => l('$_tag message: $message')),
],
),
];
late LoaderBloc<Content> bloc;
bloc = LoaderBloc._(
dispose: () => DisposeBag(
[...subscriptions, ...controllers],
'${bloc.runtimeType}#${bloc.hashCode.toUnsigned(20).toRadixString(16).padLeft(5, '0')}',
).dispose(),
state$: state$,
fetch: () => fetchS.add(null),
refresh: () {
final completer = Completer<void>.sync();
refreshS.add(completer);
return completer.future;
},
message$: messageS,
);
return bloc;
}