observe<T extends Object> method

  1. @nonVirtual
  2. @override
Stream<T?> observe<T extends Object>(
  1. Key key,
  2. Decoder<T?> decoder, [
  3. Options? options
])
override

Returns a Stream that emits the value read from persistent storage. It automatically emits a new value whenever the value associated with key changes.

Implementation

/// Returns a [Stream] of the value stored under [key].
///
/// The stream starts with a sentinel entry that causes the current value to
/// be read from storage via [decoder] and [options]. After that, it emits the
/// value carried by each change entry published for [key].
///
/// When logging is enabled, every data and error event of the returned
/// stream is additionally published through `_publishLog`.
@nonVirtual
@override
Stream<T?> observe<T extends Object>(Key key, Decoder<T?> decoder,
    [Options? options]) {
  assert(_debugAssertNotDisposed());

  // The sentinel entry means "load from storage"; any other entry already
  // carries the changed value and only needs to be cast.
  FutureOr<T?> resolve(KeyAndValue<Object, Object?> pair) {
    if (identical(pair, _initialKeyValue)) {
      return _useStorage((storage) => storage.read<T>(key, decoder, options));
    }
    return pair.value as FutureOr<T?>;
  }

  // Keep only the change entries that concern this key.
  final changesForKey = _keyValuesSubject
      .toSingleSubscriptionStream()
      .mapNotNull<KeyAndValue<Object, Object?>>((changes) => changes[key]);

  // Prepend the sentinel so that the first event triggers the initial load.
  final values =
      changesForKey.startWith(_initialKeyValue).asyncMap<T?>(resolve);

  if (!_isLogEnabled) {
    return values;
  }

  return values
      .doOnData(
          (data) => _publishLog(OnDataStreamEvent(KeyAndValue(key, data, T))))
      .doOnError((error, stackTrace) =>
          _publishLog(OnErrorStreamEvent(RxStorageError(error, stackTrace))));
}