listen method
Adds a subscription to this MapStream's stream.
An optional keys parameter
can be passed so that only updates to the specified
keys are sent downstream.
The returned StreamSubscription can be used to cancel the subscription.
Implementation
/// Subscribes [onUpdate] to this map's change stream.
///
/// When [keys] is null, every update is delivered. Otherwise an update is
/// delivered only if its `updatedMap` is empty or contains at least one of
/// the given [keys].
///
/// Returns the [StreamSubscription] for the listener. Cancelling it also
/// cancels the underlying subscription to the change stream.
///
/// Throws a [StateError] if [isClosed] is true.
StreamSubscription<MapUpdate<K, V?>> listen(
  void Function(MapUpdate mapUpdate) onUpdate, [
  List? keys,
]) {
  if (isClosed) {
    throw StateError('Cannot listen to state stream because it is closed');
  }

  // Each listener gets its own controller so that key filtering is applied
  // per subscription.
  final subscribedStreamController = StreamController<MapUpdate<K, V?>>();
  final changeStreamSubscription = _changeStream.listen((mapUpdate) {
    final updatedMap = mapUpdate.updatedMap;
    if (
        // If [keys] is null then updates are always sent.
        keys == null ||
            // The only time an empty update is sent is during resendAll.
            updatedMap.isEmpty ||
            // Otherwise only sends update if it contains a listened for key.
            updatedMap.keys.any(keys.contains)) {
      subscribedStreamController.add(mapUpdate);
    }
  });

  // Close the per-listener controller once the change stream is done.
  changeStreamSubscription.onDone(subscribedStreamController.close);

  // Assign the tear-off (rather than a closure that drops the result) so the
  // Future returned by the upstream cancel is handed back to the controller;
  // awaiting cancel() on the returned subscription then also waits for the
  // upstream subscription to finish cancelling.
  subscribedStreamController.onCancel = changeStreamSubscription.cancel;

  return subscribedStreamController.stream.listen(onUpdate);
}