watch method

  1. @override
Stream<ResultSet> watch(
  1. String sql, {
  2. List<Object?> parameters = const [],
  3. Duration throttle = const Duration(milliseconds: 30),
  4. Iterable<String>? triggerOnTables,
})
override

Execute a read query every time the source tables are modified.

Use throttle to specify the minimum interval between queries.

Source tables are automatically detected using EXPLAIN QUERY PLAN.

Implementation

@override
Stream<sqlite.ResultSet> watch(String sql,
    {List<Object?> parameters = const [],
    Duration throttle = const Duration(milliseconds: 30),
    Iterable<String>? triggerOnTables}) async* {
  assert(updates != null,
      'updates stream must be provided to allow query watching');
  final tables =
      triggerOnTables ?? await getSourceTables(this, sql, parameters);
  final filteredStream =
      updates!.transform(UpdateNotification.filterTablesTransformer(tables));
  final throttledStream = UpdateNotification.throttleStream(
      filteredStream, throttle,
      addOne: UpdateNotification.empty());

  // FIXME:
  // When the subscription is cancelled, this performs a final query on the next
  // update.
  // The loop only stops once the "yield" is reached.
  // Using asyncMap instead of a generator would solve it, but then the body
  // here can't be async for getSourceTables().
  await for (var _ in throttledStream) {
    yield await getAll(sql, parameters);
  }
}