onValueChange method

Stream<ValueUpdate> onValueChange(
  1. String path, {
  2. int cacheLevel = 1,
})

Implementation

Stream<ValueUpdate> onValueChange(String path, {int cacheLevel = 1}) {
  RespSubscribeListener? listener;
  StreamController<ValueUpdate>? controller;
  var subs = 0;
  controller = StreamController<ValueUpdate>.broadcast(
    onListen: () {
      subs++;
      listener ??= this[path]!.subscribe((ValueUpdate update) {
        controller?.add(update);
      }, cacheLevel);
    },
    onCancel: () {
      subs--;
      if (subs == 0) {
        listener!.cancel();
        listener = null;
      }
    },
  );
  return controller.stream;
}