streamDocuments<T> method

Stream<List<NimbostratusDocumentSnapshot<T?>>> streamDocuments<T>(
  Query<T> docQuery, {
  StreamFetchPolicy fetchPolicy = StreamFetchPolicy.serverFirst,
  NimbostratusFromFirestore<T>? fromFirestore,
})

Streams changes to the documents matching the specified Firestore Query, reading from the in-memory cache or the server according to the specified StreamFetchPolicy.

Implementation

/// Streams the documents matching [docQuery] according to [fetchPolicy].
///
/// The optional [fromFirestore] converter is forwarded to every fetch and
/// to every snapshot update performed on behalf of this call, so cache and
/// server emissions are converted the same way.
///
/// Behavior per [StreamFetchPolicy]:
///
/// * `serverFirst`, `cacheOnly`, `cacheFirst`: performs a single
///   [getDocuments] call using the equivalent get-policy, then emits the
///   combined latest values of each returned snapshot's `stream`.
/// * `cacheAndServerFirst`: listens to the `serverOnly` stream and, for
///   each list it emits, switches to the combined per-snapshot streams.
/// * `cacheAndServer`: merges the `cacheOnly` stream (until the server
///   stream emits) with the server snapshots, each expanded into the
///   combined per-snapshot streams.
/// * `cacheAndServerOnce`: merges the `cacheOnly` stream (until the
///   `serverFirst` stream emits) with the `serverFirst` stream.
/// * `serverOnly`: emits one list per server snapshot, with every document
///   passed through `_updateFromSnap`.
Stream<List<NimbostratusDocumentSnapshot<T?>>> streamDocuments<T>(
  Query<T> docQuery, {
  StreamFetchPolicy fetchPolicy = StreamFetchPolicy.serverFirst,
  NimbostratusFromFirestore<T>? fromFirestore,
}) {
  // Combines the individual `stream` of every snapshot into one stream of
  // lists. The empty case is handled explicitly so that an empty result
  // still produces an (empty) list event.
  // NOTE(review): presumably needed because combining zero streams would
  // complete without emitting — confirm against the rxdart docs.
  Stream<List<NimbostratusDocumentSnapshot<T?>>> combineSnapshotStreams(
    List<NimbostratusDocumentSnapshot<T?>> snapshots,
  ) {
    if (snapshots.isEmpty) {
      return Stream.value([]);
    }

    final streams = snapshots.map((snapshot) => snapshot.stream).toList();
    return CombineLatestStream.list(streams);
  }

  switch (fetchPolicy) {
    case StreamFetchPolicy.serverFirst:
    case StreamFetchPolicy.cacheOnly:
    case StreamFetchPolicy.cacheFirst:
      // One-shot fetch, followed by the per-snapshot streams.
      return Stream.fromFuture(
        getDocuments(
          docQuery,
          fetchPolicy: convertStreamFetchPolicyToGetFetchPolicy(fetchPolicy),
          fromFirestore: fromFirestore,
        ),
      ).switchMap(combineSnapshotStreams);
    case StreamFetchPolicy.cacheAndServerFirst:
      return streamDocuments(
        docQuery,
        fetchPolicy: StreamFetchPolicy.serverOnly,
        fromFirestore: fromFirestore,
      ).switchMap(combineSnapshotStreams);
    case StreamFetchPolicy.cacheAndServer:
      // NOTE(review): serverStream is subscribed to twice below (takeUntil
      // and switchMap); this relies on serverSnapshots() returning a stream
      // that supports multiple listeners — confirm.
      final serverStream = docQuery.serverSnapshots();
      return MergeStream([
        streamDocuments(
          docQuery,
          fetchPolicy: StreamFetchPolicy.cacheOnly,
          // Forwarded so cached documents use the same converter as the
          // server documents below.
          fromFirestore: fromFirestore,
        ).takeUntil(serverStream),
        serverStream.switchMap((snap) {
          final snapshots = snap.docs
              .map(
                (docSnap) =>
                    _updateFromSnap(docSnap, fromFirestore: fromFirestore),
              )
              .toList();
          return combineSnapshotStreams(snapshots);
        }),
      ]);
    case StreamFetchPolicy.cacheAndServerOnce:
      // Broadcast so the same stream can both terminate the cache stream
      // (takeUntil) and be merged into the output.
      final serverStream = streamDocuments(
        docQuery,
        fetchPolicy: StreamFetchPolicy.serverFirst,
        fromFirestore: fromFirestore,
      ).asBroadcastStream();
      return MergeStream([
        streamDocuments(
          docQuery,
          fetchPolicy: StreamFetchPolicy.cacheOnly,
          // Forwarded so cached documents use the same converter as the
          // server stream.
          fromFirestore: fromFirestore,
        ).takeUntil(serverStream),
        serverStream,
      ]);
    case StreamFetchPolicy.serverOnly:
      return docQuery.serverSnapshots().switchMap((snap) {
        final docSnaps = snap.docs;

        final snaps = docSnaps.map((docSnap) {
          return _updateFromSnap(docSnap, fromFirestore: fromFirestore);
        }).toList();

        return Stream.value(snaps);
      });
  }
}