observe<T extends Model> method
Stream<SubscriptionEvent<T> >
observe<T extends Model>(
- ModelType<
T> modelType, { - QueryPredicate<
Model> ? where,
override
Implementation
@override
Stream<SubscriptionEvent<T>> observe<T extends Model>(ModelType<T> modelType,
{QueryPredicate? where}) async* {
await _setUpObserveIfNeeded();
// Step #1. Open the event channel if it's not already open. Note
// that there is only one event channel for all observe calls for all models
const _eventChannel =
EventChannel('com.amazonaws.amplify/datastore_observe_events');
_allModelsStreamFromMethodChannel = _allModelsStreamFromMethodChannel ??
_eventChannel.receiveBroadcastStream(0);
// Step #2. Apply client side filtering on the stream.
// Currently only modelType filtering is supported.
Stream<dynamic> filteredStream =
_allModelsStreamFromMethodChannel.where((event) {
//TODO: errors are not model specific. Should we pass all errors to users
return _getModelNameFromEvent(event) == modelType.modelName();
});
// Step #3. Deserialize events and return new broadcast stream
yield* filteredStream
.map((event) => SubscriptionEvent.fromMap(event, modelType))
.where((event) => where == null || where.evaluate(event.item))
.asBroadcastStream()
.cast<SubscriptionEvent<T>>();
}