subscribe<TParsed> method

Stream<QueryResult<TParsed>> subscribe<TParsed>(
  1. SubscriptionOptions<TParsed> options
)

Implementation

Stream<QueryResult<TParsed>> subscribe<TParsed>(
    SubscriptionOptions<TParsed> options) async* {
  assert(
    options.fetchPolicy != FetchPolicy.cacheOnly,
    "Cannot subscribe with FetchPolicy.cacheOnly: $options",
  );
  final request = options.asRequest;

  // Add optimistic or cache-based result to the stream if any
  if (options.optimisticResult != null) {
    // TODO optimisticResults for streams just skip the cache for now
    yield QueryResult.optimistic(
      data: options.optimisticResult as Map<String, dynamic>?,
      options: options,
    );
  } else if (shouldRespondEagerlyFromCache(options.fetchPolicy)) {
    final cacheResult = cache.readQuery(
      request,
      optimistic: options.policies.mergeOptimisticData,
    );
    if (cacheResult != null) {
      yield QueryResult(
        options: options,
        source: QueryResultSource.cache,
        data: cacheResult,
      );
    }
  }

  try {
    yield* link
        .request(request)
        .map((response) {
          QueryResult<TParsed>? queryResult;
          bool rereadFromCache = false;
          try {
            queryResult = mapFetchResultToQueryResult(
              response,
              options,
              source: QueryResultSource.network,
            );

            rereadFromCache = attemptCacheWriteFromResponse(
              options.policies,
              request,
              response,
              queryResult,
            );
          } catch (failure, trace) {
            // we set the source to indicate where the source of failure
            queryResult ??= QueryResult(
              options: options,
              source: QueryResultSource.network,
            );

            queryResult.exception = coalesceErrors(
              exception: queryResult.exception,
              linkException: translateFailure(failure, trace),
            );
          }

          if (rereadFromCache) {
            // normalize results if previously written
            attempCacheRereadIntoResult(request, queryResult);
          }

          return queryResult;
        })
        .transform<QueryResult<TParsed>>(StreamTransformer.fromHandlers(
          handleError: (err, trace, sink) => sink.add(_wrapFailure(
            options,
            err,
            trace,
          )),
        ))
        .map((QueryResult<TParsed> queryResult) {
          maybeRebroadcastQueries();
          return queryResult;
        });
  } catch (ex, trace) {
    yield* Stream.fromIterable([
      _wrapFailure(
        options,
        ex,
        trace,
      )
    ]);
  }
}