onValueChange method
Implementation
Stream<ValueUpdate> onValueChange(String path, {int cacheLevel = 1}) {
RespSubscribeListener? listener;
StreamController<ValueUpdate>? controller;
var subs = 0;
controller = StreamController<ValueUpdate>.broadcast(
onListen: () {
subs++;
listener ??= this[path]!.subscribe((ValueUpdate update) {
controller?.add(update);
}, cacheLevel);
},
onCancel: () {
subs--;
if (subs == 0) {
listener!.cancel();
listener = null;
}
},
);
return controller.stream;
}