queryStream<T> method

Stream<T?> queryStream<T>(
  1. String sql, {
  2. List<Object>? arguments,
  3. required String queryableName,
  4. required bool isView,
  5. required T mapper(
    1. Map<String, Object?>
    ),
})

Executes a SQLite query that returns a stream of single query results or null.

Implementation

Stream<T?> queryStream<T>(
  final String sql, {
  final List<Object>? arguments,
  required final String queryableName,
  required final bool isView,
  required final T Function(Map<String, Object?>) mapper,
}) {
  // ignore: close_sinks
  final changeListener = ArgumentError.checkNotNull(_changeListener);
  final controller = StreamController<T?>.broadcast();

  Future<void> executeQueryAndNotifyController() async {
    final result = await query(sql, arguments: arguments, mapper: mapper);
    controller.add(result);
  }

  controller.onListen = () async => executeQueryAndNotifyController();

  // listen on all updates if the stream is on a view, only listen to the
  // name of the table if the stream is on a entity.
  final subscription = changeListener.stream
      .where((updatedTable) =>
          isView || updatedTable.equals(queryableName, ignoreCase: true))
      .listen(
        (_) async => executeQueryAndNotifyController(),
        onDone: () => controller.close(),
      );

  controller.onCancel = () => subscription.cancel();

  return controller.stream;
}