subscribe<TParsed> method
Implementation
/// Subscribes to the GraphQL subscription described by [options], emitting a
/// [QueryResult] for every event received on the underlying [link].
///
/// Before any network results, an eager result may be emitted:
/// * if `options.optimisticResult` is set, it is yielded immediately as an
///   optimistic result (the cache is skipped for now — see TODO below);
/// * otherwise, if the fetch policy allows eagerly responding from the cache
///   and a cached value exists for the request, that value is yielded first.
///
/// Network responses are mapped to [QueryResult]s and written to the cache
/// according to `options.policies`; when the write indicates it, the data is
/// re-read from the cache to normalize the result. Failures — both per-event
/// mapping/cache failures and stream-level errors — are converted into
/// exception-carrying [QueryResult]s instead of being thrown to the listener.
///
/// A `FetchPolicy.cacheOnly` policy is rejected (debug-mode assert only),
/// since a subscription that never reaches the network is meaningless.
Stream<QueryResult<TParsed>> subscribe<TParsed>(
    SubscriptionOptions<TParsed> options) async* {
  assert(
    options.fetchPolicy != FetchPolicy.cacheOnly,
    "Cannot subscribe with FetchPolicy.cacheOnly: $options",
  );
  final request = options.asRequest;

  // Add optimistic or cache-based result to the stream if any.
  if (options.optimisticResult != null) {
    // TODO optimisticResults for streams just skip the cache for now
    yield QueryResult.optimistic(
      data: options.optimisticResult as Map<String, dynamic>?,
      options: options,
    );
  } else if (shouldRespondEagerlyFromCache(options.fetchPolicy)) {
    final cacheResult = cache.readQuery(
      request,
      optimistic: options.policies.mergeOptimisticData,
    );
    // Only yield when the cache actually has data; a miss falls through to
    // the network subscription below without emitting anything.
    if (cacheResult != null) {
      yield QueryResult(
        options: options,
        source: QueryResultSource.cache,
        data: cacheResult,
      );
    }
  }

  try {
    yield* link
        .request(request)
        .map((response) {
          QueryResult<TParsed>? queryResult;
          bool rereadFromCache = false;
          try {
            queryResult = mapFetchResultToQueryResult(
              response,
              options,
              source: QueryResultSource.network,
            );
            // The cache-write helper reports whether the policies call for
            // re-reading the (possibly normalized) data back out of the
            // cache after the write — applied below.
            rereadFromCache = attemptCacheWriteFromResponse(
              options.policies,
              request,
              response,
              queryResult,
            );
          } catch (failure, trace) {
            // Mapping or cache-write failed: surface the failure as an
            // exception on a network-sourced result (the source indicates
            // where the failure originated) instead of throwing, reusing any
            // partially-built result so prior data/errors are preserved.
            queryResult ??= QueryResult(
              options: options,
              source: QueryResultSource.network,
            );
            queryResult.exception = coalesceErrors(
              exception: queryResult.exception,
              linkException: translateFailure(failure, trace),
            );
          }
          if (rereadFromCache) {
            // normalize results if previously written
            // NOTE(review): helper name has a typo ("attemp") — defined
            // elsewhere, so it cannot be corrected from this block alone.
            attempCacheRereadIntoResult(request, queryResult);
          }
          return queryResult;
        })
        .transform<QueryResult<TParsed>>(StreamTransformer.fromHandlers(
          // Stream-level errors (e.g. transport failures) are likewise
          // converted into exception-carrying results rather than being
          // propagated as stream errors to the subscriber.
          handleError: (err, trace, sink) => sink.add(_wrapFailure(
            options,
            err,
            trace,
          )),
        ))
        .map((QueryResult<TParsed> queryResult) {
          // After every event, give other watchers a chance to rebroadcast,
          // since the cache write above may have changed data they depend on.
          maybeRebroadcastQueries();
          return queryResult;
        });
  } catch (ex, trace) {
    // Synchronous failure while setting up the link request: emit a single
    // failed result instead of letting the generator throw.
    yield* Stream.fromIterable([
      _wrapFailure(
        options,
        ex,
        trace,
      )
    ]);
  }
}