listen method

StreamSubscription<MapUpdate<K, V?>> listen(
  1. void onUpdate(
    1. MapUpdate mapUpdate
    ), [
  2. List? keys
])

Adds a subscription to this MapStream's stream.

Optional parameter keys can be passed so only updates to specified keys are sent down stream.

Return StreamSubscription can be used to close the stream.

Implementation

StreamSubscription<MapUpdate<K, V?>> listen(
  void Function(MapUpdate mapUpdate) onUpdate, [
  List? keys,
]) {
  if (isClosed) {
    throw StateError('Cannot listen to state stream because it is closed');
  }

  final subscribedStreamController = StreamController<MapUpdate<K, V?>>();

  final changeStreamSubscription = _changeStream.listen((mapUpdate) {
    final updatedMap = mapUpdate.updatedMap;

    if (
        // If [keys] is null then updates are always sent.
        keys == null ||
            // The only time an empty update is sent is during resendAll.
            updatedMap.isEmpty ||
            // Otherwise only sends update if it contains a listened for key.
            updatedMap.keys.any((k) => keys.contains(k))) {
      subscribedStreamController.add(mapUpdate);
    }
  });

  changeStreamSubscription.onDone(subscribedStreamController.close);

  subscribedStreamController.onCancel = () {
    changeStreamSubscription.cancel();
  };

  return subscribedStreamController.stream.listen(onUpdate);
}