streamDocument<T> method

Stream<NimbostratusDocumentSnapshot<T?>> streamDocument<T>(
  DocumentReference<T> ref, {
  StreamFetchPolicy fetchPolicy = StreamFetchPolicy.serverFirst,
  NimbostratusFromFirestore<T>? fromFirestore,
})

Streams changes to a Firestore document from the in-memory cache or server according to the specified StreamFetchPolicy.

Implementation

/// Streams changes to the Firestore document at [ref], reading from the
/// in-memory cache and/or the server according to [fetchPolicy].
///
/// When [fromFirestore] is provided it is forwarded to every underlying
/// fetch and nested stream, including the cache-only legs of the combined
/// policies, so all legs use the same converter.
///
/// Behavior per [StreamFetchPolicy]:
///
/// * `serverFirst`, `cacheFirst`, `cacheOnly`: performs a single
///   [getDocument] call with the equivalent get policy, then switches to the
///   `nonNullStream` of the `_documents` entry for `ref.path`.
/// * `cacheAndServerFirst`: listens to the `serverOnly` stream and, on each
///   server event, switches to a `cacheOnly` stream.
/// * `cacheAndServer`: merges the `cacheOnly` and `serverOnly` streams and
///   drops consecutive duplicate events.
/// * `cacheAndServerOnce`: merges a `cacheOnly` stream, which is cut off once
///   the `serverFirst` stream emits, with that `serverFirst` stream, and
///   drops consecutive duplicate events.
/// * `serverOnly`: passes every snapshot from `ref.serverSnapshots()` through
///   `_updateFromSnap` and emits the result.
Stream<NimbostratusDocumentSnapshot<T?>> streamDocument<T>(
  DocumentReference<T> ref, {
  StreamFetchPolicy fetchPolicy = StreamFetchPolicy.serverFirst,
  NimbostratusFromFirestore<T>? fromFirestore,
}) {
  switch (fetchPolicy) {
    case StreamFetchPolicy.serverFirst:
    case StreamFetchPolicy.cacheFirst:
    case StreamFetchPolicy.cacheOnly:
      // Do one initial fetch with the matching get policy, then hand over to
      // the stream stored for this document path.
      return Stream.fromFuture(
        getDocument(
          ref,
          fetchPolicy: convertStreamFetchPolicyToGetFetchPolicy(fetchPolicy),
          fromFirestore: fromFirestore,
        ),
      ).switchMap((_) {
        // NOTE(review): the `!` assumes getDocument has registered an entry
        // for ref.path by the time it completes — confirm against
        // getDocument for every get policy.
        return _documents[ref.path]!
            .nonNullStream
            .cast<NimbostratusDocumentSnapshot<T?>>();
      });
    case StreamFetchPolicy.cacheAndServerFirst:
      // Wait for server data first; each server event (re)starts the cache
      // stream.
      return streamDocument(
        ref,
        fetchPolicy: StreamFetchPolicy.serverOnly,
        fromFirestore: fromFirestore,
      ).switchMap((_) {
        return streamDocument(
          ref,
          fetchPolicy: StreamFetchPolicy.cacheOnly,
          // Forward the converter so the cache leg matches the server leg.
          fromFirestore: fromFirestore,
        );
      });
    // A server and cache policy will read the data from both the server and
    // cache simultaneously. If a value is present in the cache first, it will
    // deliver that data eagerly. It then listens to subsequent cache and
    // server updates.
    case StreamFetchPolicy.cacheAndServer:
      return MergeStream([
        // While waiting for the server data, keep returning changes to the
        // cached data.
        streamDocument(
          ref,
          fetchPolicy: StreamFetchPolicy.cacheOnly,
          fromFirestore: fromFirestore,
        ),
        streamDocument(
          ref,
          fetchPolicy: StreamFetchPolicy.serverOnly,
          fromFirestore: fromFirestore,
        ),
      ])
          // The cache vs server streams can emit a duplicate event
          // transitioning between them. Distinct the results to remove that
          // duplicate event.
          .distinct();
    case StreamFetchPolicy.cacheAndServerOnce:
      // Broadcast because the server-first stream is listened to twice: as
      // the takeUntil trigger and as a member of the merge.
      final serverStream = streamDocument(
        ref,
        fetchPolicy: StreamFetchPolicy.serverFirst,
        fromFirestore: fromFirestore,
      ).asBroadcastStream();
      return MergeStream([
        // Emit cached data only until the server-first stream produces its
        // first event.
        streamDocument(
          ref,
          fetchPolicy: StreamFetchPolicy.cacheOnly,
          fromFirestore: fromFirestore,
        ).takeUntil(serverStream),
        serverStream,
      ]).distinct();
    case StreamFetchPolicy.serverOnly:
      // Run every server snapshot through _updateFromSnap and emit the
      // resulting snapshot.
      return ref.serverSnapshots().switchMap((snap) {
        return Stream.value(
          _updateFromSnap(snap, fromFirestore: fromFirestore),
        );
      });
  }
}