observe<T extends Object> method
- @nonVirtual
- @override
- Key key,
- Decoder<T?> decoder, [
- Options? options ]
override
Returns a Stream that emits the value read from persistent storage. It automatically emits a new value whenever the value associated with the key changes.
Implementation
/// Returns a [Stream] of the values associated with [key].
///
/// The stream first emits the value read from storage using [decoder] (and
/// the optional [options]); afterwards it emits the value of every non-null
/// entry found for [key] in the events of `_keyValuesSubject`. When
/// `_isLogEnabled` is true, each emitted value and each error is also
/// forwarded to `_publishLog`.
@nonVirtual
@override
Stream<T?> observe<T extends Object>(
  Key key,
  Decoder<T?> decoder, [
  Options? options,
]) {
  assert(_debugAssertNotDisposed());

  // The sentinel injected by `startWith` below means "load from storage";
  // any other event already carries the value to emit.
  FutureOr<T?> resolve(KeyAndValue<Object, Object?> event) {
    if (identical(event, _initialKeyValue)) {
      return _useStorage((s) => s.read<T>(key, decoder, options));
    }
    return event.value as FutureOr<T?>;
  }

  // Keep only the entries that concern `key`.
  final changesForKey = _keyValuesSubject
      .toSingleSubscriptionStream()
      .mapNotNull<KeyAndValue<Object, Object?>>((changes) => changes[key]);

  final values = changesForKey
      .startWith(_initialKeyValue) // Dummy value to trigger initial load.
      .asyncMap<T?>(resolve);

  if (!_isLogEnabled) {
    return values;
  }

  return values
      .doOnData((value) =>
          _publishLog(OnDataStreamEvent(KeyAndValue(key, value, T))))
      .doOnError(
          (e, s) => _publishLog(OnErrorStreamEvent(RxStorageError(e, s))));
}