onValueChange method

Stream<ValueUpdate> onValueChange(
  1. String path, {
  2. int cacheLevel = 1,
})

Retrieves a Broadcast Stream which subscribes to path with the specified cacheLevel. The node is only subscribed if there is at least one stream subscription. When the stream subscription count goes to 0, the node is unsubscribed from.

Implementation

Stream<ValueUpdate> onValueChange(String path, {int cacheLevel = 1}) {
  RespSubscribeListener? listener;
  late StreamController<ValueUpdate> controller;
  var subs = 0;
  controller = StreamController<ValueUpdate>.broadcast(
    onListen: () {
      subs++;
      listener ??= this[path]?.subscribe((ValueUpdate update) {
        controller.add(update);
      }, cacheLevel);
    },
    onCancel: () {
      subs--;
      if (subs == 0) {
        listener?.cancel();
        listener = null;
      }
    },
  );
  return controller.stream;
}