subscribe<TModel extends RepositoryModel> method

Stream<List<TModel>> subscribe<TModel extends RepositoryModel>({
  OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
  Query? query,
})

Listen for streaming changes when the sqliteProvider is upserted. This method utilizes remoteProvider's get.

get is invoked on the memoryCacheProvider and sqliteProvider following an upsert invocation. For more, see notifySubscriptionsWithLocalData.

policy is only applied to the initial population of the stream. Subsequent events, which are emitted through notifySubscriptionsWithLocalData, supply only local data.

It is strongly recommended that the returned stream be .listened to immediately, that the resulting subscription be assigned to a variable, and that the subscription be .cancel()'d as soon as the data is no longer needed. The stream will not close naturally.
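The listen-then-cancel pattern recommended above can be sketched as follows. This is a minimal, self-contained illustration that depends only on dart:async. FakeRepository, upsert, hasActiveSubscription, and runExample are hypothetical stand-ins invented for this example and are not part of the library; only the StreamController and StreamSubscription calls are real Dart APIs.

```dart
import 'dart:async';

// Hypothetical stand-in for a repository with a `subscribe` method.
// It only imitates a stream that stays open until it is cancelled.
class FakeRepository {
  StreamController<List<String>>? _controller;

  // True while a listener is still attached to the stream.
  bool get hasActiveSubscription => _controller != null;

  Stream<List<String>> subscribe() {
    final controller = StreamController<List<String>>(
      // Runs when the listener cancels; nothing else ends this stream.
      onCancel: () {
        _controller = null;
      },
    );
    _controller = controller;
    // Initial population of the stream (buffered until `listen` is called).
    controller.add(['initial']);
    return controller.stream;
  }

  // Imitates a local upsert that notifies the active subscription.
  void upsert(String value) => _controller?.add([value]);
}

Future<List<List<String>>> runExample() async {
  final repository = FakeRepository();
  final received = <List<String>>[];

  // Listen immediately and keep the StreamSubscription in a variable.
  final subscription = repository.subscribe().listen(received.add);

  repository.upsert('updated');
  // Give queued stream events a chance to be delivered.
  await Future<void>.delayed(Duration.zero);

  // The stream never closes on its own, so cancel explicitly when done.
  await subscription.cancel();
  assert(!repository.hasActiveSubscription);
  return received;
}

Future<void> main() async {
  print(await runExample());
}
```

In a widget or service, the same idea usually means storing the subscription in a field and calling cancel() from the object's dispose or teardown hook.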

Implementation

Stream<List<TModel>> subscribe<TModel extends RepositoryModel>({
  OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
  Query? query,
}) {
  query ??= Query();
  if (subscriptions[TModel]?[query] != null) {
    return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
  }

  final controller = StreamController<List<TModel>>(
    onCancel: () async {
      await subscriptions[TModel]?[query]?.close();
      subscriptions[TModel]?.remove(query);
      if (subscriptions[TModel]?.isEmpty ?? false) {
        subscriptions.remove(TModel);
      }
    },
  );

  subscriptions[TModel] ??= {};
  subscriptions[TModel]?[query] = controller;

  // ignore: discarded_futures
  get<TModel>(query: query, policy: policy).then((results) {
    if (!controller.isClosed) controller.add(results);
  });

  return controller.stream;
}