observeResources method

Future<Stream<List<ResourceResultDoc>>> observeResources({
  required String operator,
  required String expression,
  required String collection,
  Map<String, Value> variables = const {},
  Int64? startingFrom,
})

Implementation

/// Opens the `observeResources` query stream on the service client for
/// [operator] and returns it with every event converted to a list of
/// [ResourceResultDoc]s.
///
/// The request carries [collection] and an [Exp] built from [expression]
/// and [variables]. When [startingFrom] is non-null it is wrapped in a
/// [TxId] and set as the request's `startingFrom`; otherwise that field is
/// left unset.
// NOTE(review): [startingFrom] presumably marks the transaction to resume
// observing from — confirm against the service definition.
///
/// The returned future completes once the request envelope has been built
/// and the stream has been obtained from the client. Each stream event's
/// `transactions` are mapped through [ResourceResultDoc.fromModel] and
/// materialized into a list.
Future<Stream<List<ResourceResultDoc>>> observeResources({
  required String operator,
  required String expression,
  required String collection,
  Map<String, Value> variables = const {},
  Int64? startingFrom,
}) async {
  // Entries are added as-is; no per-entry conversion is required, so the
  // map is passed straight to `addAll` rather than copied first.
  final exp = Exp()
    ..exp = expression
    ..vars.addAll(variables);

  final request = ObserveResourcesRequest()
    ..collection = collection
    ..expression = exp;

  // Only set the starting transaction when the caller supplied one.
  if (startingFrom != null) {
    request.startingFrom = TxId()..txId = startingFrom;
  }

  final envelope = await requestEnvelope(request: request);
  final responses =
      getServiceClient(operator).query.observeResources(envelope);

  return responses.map(
    (response) =>
        response.transactions.map(ResourceResultDoc.fromModel).toList(),
  );
}