wdReadWithStream method

Future<void> wdReadWithStream(
  Client self,
  String path,
  String savePath, {
  void onProgress(int count, int total)?,
  CancelToken? cancelToken,
})

Reads the remote file at path as a stream and writes it to the local file at savePath.

Implementation

/// Downloads the remote resource at [path] as a byte stream and writes it to
/// the local file at [savePath].
///
/// A preliminary `wdOptions` request is issued first (the original author
/// notes this works around an auth error). Any status other than 200 from
/// that request, or from the subsequent `GET`, is thrown via
/// `newResponseError`.
///
/// [savePath], including missing parent directories, is created before any
/// data is written. [onProgress] is invoked after each chunk has been written
/// with the number of bytes received so far and the expected total; the total
/// is `-1` when the response is compressed or when `Content-Length` is absent
/// or not a valid integer.
///
/// Cancelling [cancelToken], a stream error, a failed write or an elapsed
/// `receiveTimeout` aborts the transfer, closes the file and deletes the
/// partially written file.
///
/// Throws a [DioError] when the request fails, when the stream or a write
/// reports an error, or when the configured `receiveTimeout` elapses before
/// the download completes.
Future<void> wdReadWithStream(
  Client self,
  String path,
  String savePath, {
  void Function(int count, int total)? onProgress,
  CancelToken? cancelToken,
}) async {
  // fix auth error
  var pResp = await this.wdOptions(self, path, cancelToken: cancelToken);
  if (pResp.statusCode != 200) {
    throw newResponseError(pResp);
  }

  Response<ResponseBody> resp;

  // Modelled on Dio's download: request the body as a raw stream.
  try {
    resp = await this.req(
      self,
      'GET',
      path,
      optionsHandler: (options) => options.responseType = ResponseType.stream,
      // onReceiveProgress: onProgress,
      cancelToken: cancelToken,
    );
  } on DioError catch (e) {
    if (e.type == DioErrorType.badResponse) {
      if (e.response!.requestOptions.receiveDataWhenStatusError == true) {
        // The error body arrived as a stream; decode it as JSON so the caller
        // can inspect it on the rethrown error.
        var res = await transformer.transformResponse(
          e.response!.requestOptions..responseType = ResponseType.json,
          e.response!.data as ResponseBody,
        );
        e.response!.data = res;
      } else {
        e.response!.data = null;
      }
    }
    rethrow;
  }
  if (resp.statusCode != 200) {
    throw newResponseError(resp);
  }

  resp.headers = Headers.fromMap(resp.data!.headers);

  // Work out the expected size before touching the file system. Content
  // codings are case-insensitive, and a compressed body makes Content-Length
  // useless for progress reporting.
  final contentEncoding = resp.headers.value(Headers.contentEncodingHeader);
  final compressed = contentEncoding != null &&
      const ['gzip', 'deflate', 'compress']
          .contains(contentEncoding.toLowerCase());
  // tryParse: a missing or malformed Content-Length must not abort the
  // download, it just means the total is unknown (-1).
  final total = compressed
      ? -1
      : int.tryParse(resp.headers.value(Headers.contentLengthHeader) ?? '') ??
          -1;

  // If the directory (or file) doesn't exist yet, opening it would fail.
  File file = File(savePath);
  file.createSync(recursive: true);

  var raf = file.openSync(mode: FileMode.write);

  // Completer used to report the success/error state of the whole transfer.
  var completer = Completer<Response>();
  var future = completer.future;
  var received = 0;

  // Stream<Uint8List>
  var stream = resp.data!.stream;

  late StreamSubscription subscription;
  Future? asyncWrite;
  var closed = false;

  // Waits for the in-flight write, then closes the handle and removes the
  // partial file. Runs at most once.
  Future closeAndDelete() async {
    if (!closed) {
      closed = true;
      await asyncWrite;
      await raf.close();
      await file.delete();
    }
  }

  subscription = stream.listen(
    (data) {
      // Apply back-pressure: no further chunks until this one is on disk.
      subscription.pause();
      // Write file asynchronously
      asyncWrite = raf.writeFrom(data).then((updatedRaf) {
        // Notify progress
        received += data.length;

        onProgress?.call(received, total);

        raf = updatedRaf;
        if (cancelToken == null || !cancelToken.isCancelled) {
          subscription.resume();
        }
      }).catchError((err) async {
        try {
          await subscription.cancel();
          // Release the handle and drop the partial file. closeAndDelete()
          // cannot be used here: it awaits asyncWrite, which is this very
          // chain, and would never complete.
          if (!closed) {
            closed = true;
            try {
              await raf.close();
            } on FileSystemException {
              // Best effort: the write failure below is what gets reported.
            }
            try {
              await file.delete();
            } on FileSystemException {
              // Best effort: the write failure below is what gets reported.
            }
          }
        } finally {
          completer.completeError(DioError(
            requestOptions: resp.requestOptions,
            error: err,
          ));
        }
      });
    },
    onDone: () async {
      try {
        await asyncWrite;
        closed = true;
        await raf.close();
        completer.complete(resp);
      } catch (err) {
        completer.completeError(DioError(
          requestOptions: resp.requestOptions,
          error: err,
        ));
      }
    },
    onError: (e) async {
      try {
        await closeAndDelete();
      } finally {
        completer.completeError(DioError(
          requestOptions: resp.requestOptions,
          error: e,
        ));
      }
    },
    cancelOnError: true,
  );

  // ignore: unawaited_futures
  cancelToken?.whenCancel.then((_) async {
    await subscription.cancel();
    await closeAndDelete();
  });

  final receiveTimeout = resp.requestOptions.receiveTimeout;
  if (receiveTimeout != null && receiveTimeout > Duration.zero) {
    // The timeout covers the whole transfer. Whatever error surfaces here,
    // stop listening and clean up before rethrowing it.
    future = future.timeout(receiveTimeout).catchError((Object err) async {
      await subscription.cancel();
      await closeAndDelete();
      if (err is TimeoutException) {
        throw DioError(
          requestOptions: resp.requestOptions,
          error:
              'Receiving data timeout[${resp.requestOptions.receiveTimeout}ms]',
          type: DioErrorType.receiveTimeout,
        );
      } else {
        throw err;
      }
    });
  }
  await DioMixin.listenCancelForAsyncTask(cancelToken, future);
}