observe<T extends Model> method

  1. @override
Stream<SubscriptionEvent<T>> observe<T extends Model>(
  1. ModelType<T> modelType, {
  2. QueryPredicate<Model>? where,
})
override

Implementation

@override
Stream<SubscriptionEvent<T>> observe<T extends Model>(ModelType<T> modelType,
    {QueryPredicate? where}) async* {
  await _setUpObserveIfNeeded();

  // Step #1. Open the event channel if it's not already open. Note
  // that there is only one event channel for all observe calls for all models
  const _eventChannel =
      EventChannel('com.amazonaws.amplify/datastore_observe_events');
  _allModelsStreamFromMethodChannel = _allModelsStreamFromMethodChannel ??
      _eventChannel.receiveBroadcastStream(0);

  // Step #2. Apply client side filtering on the stream.
  // Currently only modelType filtering is supported.
  Stream<dynamic> filteredStream =
      _allModelsStreamFromMethodChannel.where((event) {
    //TODO: errors are not model specific. Should we pass all errors to users
    return _getModelNameFromEvent(event) == modelType.modelName();
  });

  // Step #3. Deserialize events and return new broadcast stream
  yield* filteredStream
      .map((event) => SubscriptionEvent.fromMap(event, modelType))
      .where((event) => where == null || where.evaluate(event.item))
      .asBroadcastStream()
      .cast<SubscriptionEvent<T>>();
}