fetch method

Future<Map<String, FetchPoolResult>> fetch({
  dynamic progressCallback(double)?,
})

Starts fetching the list of URLs.

Returns a Future that completes with a map of results, keyed by URL. This method may only be called once on a FetchPool instance; calling it a second time throws a StateError. To repeat a fetch, create a fresh instance.

A progressCallback function can optionally be passed in. It is called repeatedly with the overall estimated progress, expressed as a percentage (0 to 100). The progress is not derived from the combined download size of all URLs, since that would require a roundtrip to the server for each URL before the download could even begin. Instead, it is based on the number of completed downloads plus the download percentage of each active download, relative to the total number of unique URLs.

Implementation

/// Downloads every unique URL in [urls] into [destinationDirectory] and
/// returns the results keyed by URL.
///
/// Downloads are scheduled through a [Pool] created with [maxConcurrent].
/// A URL whose destination file already exists is skipped when
/// [fileOverwritingStrategy] is [FetchPoolFileOverwritingStrategy.skip].
/// A response with a status other than [HttpStatus.ok], or an error thrown
/// while processing a URL, is recorded as a [FetchPoolResult] carrying an
/// `error` rather than failing the whole fetch.
///
/// If [progressCallback] is given, it is invoked with the overall estimated
/// progress as a percentage whenever that value changes. The estimate is the
/// number of finished URLs plus the fractional progress of the active
/// downloads, divided by the number of unique URLs.
///
/// Throws a [StateError] if called more than once on the same instance.
Future<Map<String, FetchPoolResult>> fetch(
    {Function(double)? progressCallback}) async {
  if (_hasFetchBeenRun) {
    throw StateError(
        'It is illegal to run fetch more than once on the same instance.');
  }

  _hasFetchBeenRun = true;

  final pool = Pool(maxConcurrent);
  final uniqueUrls = urls.toSet().toList();
  // In-flight downloads mapped to their individual progress (0-100).
  final activeJobs = <String, double>{};
  var completedJobCount = 0;
  double lastTotalProgress = 0;

  // Recomputes the overall percentage and reports it if it has changed.
  void notifyProgressCallback() {
    if (progressCallback == null) return;

    // Each active job contributes its percentage as a fraction of one job.
    final activeJobFraction =
        activeJobs.values.fold<double>(0, (sum, percent) => sum + percent) /
            100;
    final totalProgress =
        ((completedJobCount + activeJobFraction) / uniqueUrls.length) * 100;

    if (totalProgress != lastTotalProgress) {
      progressCallback(totalProgress);
      lastTotalProgress = totalProgress;
    }
  }

  final poolStream =
      pool.forEach<String, FetchPoolResult>(uniqueUrls, (urlString) async {
    final url = Uri.parse(urlString);
    final String filename = filenameFromUrl(urlString, fileNamingStrategy);
    final String destinationPath = p.join(destinationDirectory, filename);
    final File destinationFile = File(destinationPath);
    final bool destinationFileExists = await destinationFile.exists();
    FetchPoolResult result;

    // Percentage of a single download; 0 when the length is unknown.
    double calculateProgress(int downloadedByteCount, int? contentLength) {
      if (contentLength != null && contentLength > 0) {
        // Clamp so a body larger than the advertised length (presumably
        // possible with transparently decompressed responses) never pushes
        // the job past 100%.
        return ((downloadedByteCount / contentLength) * 100)
            .clamp(0, 100)
            .toDouble();
      }

      return 0;
    }

    activeJobs[urlString] = 0;

    if (fileOverwritingStrategy == FetchPoolFileOverwritingStrategy.skip &&
        destinationFileExists) {
      result = FetchPoolResult(
          url: urlString,
          localPath: destinationPath,
          persistenceResult: FetchPoolFilePersistenceResult.skipped);
    } else {
      final request = http.Request('GET', url);
      final http.StreamedResponse response = await client.send(request);

      if (response.statusCode == HttpStatus.ok) {
        await destinationFile.create(recursive: true);

        final IOSink destinationFileWriteStream = destinationFile.openWrite();
        var downloadedByteCount = 0;

        // NOTE(review): if the download fails midway, the partially written
        // file stays on disk; with the `skip` strategy a later fetch would
        // treat it as complete. Consider removing it on failure.
        try {
          await for (final chunk in response.stream) {
            destinationFileWriteStream.add(chunk);
            downloadedByteCount += chunk.length;

            // Report progress including the chunk that was just received.
            activeJobs[urlString] =
                calculateProgress(downloadedByteCount, response.contentLength);
            notifyProgressCallback();
          }
        } finally {
          // Always release the file handle, even when the response stream or
          // the progress callback throws.
          await destinationFileWriteStream.close();
        }

        result = FetchPoolResult(
            url: urlString,
            localPath: destinationPath,
            persistenceResult: destinationFileExists
                ? FetchPoolFilePersistenceResult.overwritten
                : FetchPoolFilePersistenceResult.saved);
      } else {
        result = FetchPoolResult(
            url: urlString, error: 'Status ${response.statusCode}');

        // The body of a failed response is not needed; cancel it so the
        // underlying connection is not left waiting for a listener.
        await response.stream.listen(null).cancel();
      }
    }

    activeJobs.remove(urlString);
    completedJobCount += 1;

    notifyProgressCallback();

    return result;
  }, onError: (urlString, error, stackTrace) {
    resultsByUrl[urlString] = FetchPoolResult(url: urlString, error: error);

    // A failed job still counts as finished for progress purposes.
    activeJobs.remove(urlString);
    completedJobCount += 1;

    notifyProgressCallback();

    // Do not forward the error to the output stream.
    return false;
  });

  await for (final result in poolStream) {
    resultsByUrl[result.url] = result;
  }

  return resultsByUrl;
}