wdReadWithStream method
read a file with stream
Implementation
Future<void> wdReadWithStream(
Client self,
String path,
String savePath, {
void Function(int count, int total)? onProgress,
CancelToken? cancelToken,
}) async {
// fix auth error
var pResp = await this.wdOptions(self, path, cancelToken: cancelToken);
if (pResp.statusCode != 200) {
throw newResponseError(pResp);
}
Response<ResponseBody> resp;
// Reference Dio download
// request
try {
resp = await this.req(
self,
'GET',
path,
optionsHandler: (options) => options.responseType = ResponseType.stream,
// onReceiveProgress: onProgress,
cancelToken: cancelToken,
);
} on DioError catch (e) {
if (e.type == DioErrorType.badResponse) {
if (e.response!.requestOptions.receiveDataWhenStatusError == true) {
var res = await transformer.transformResponse(
e.response!.requestOptions..responseType = ResponseType.json,
e.response!.data as ResponseBody,
);
e.response!.data = res;
} else {
e.response!.data = null;
}
}
rethrow;
}
if (resp.statusCode != 200) {
throw newResponseError(resp);
}
resp.headers = Headers.fromMap(resp.data!.headers);
//If directory (or file) doesn't exist yet, the entire method fails
File file = File(savePath);
file.createSync(recursive: true);
var raf = file.openSync(mode: FileMode.write);
//Create a Completer to notify the success/error state.
var completer = Completer<Response>();
var future = completer.future;
var received = 0;
// Stream<Uint8List>
var stream = resp.data!.stream;
var compressed = false;
var total = 0;
var contentEncoding = resp.headers.value(Headers.contentEncodingHeader);
if (contentEncoding != null) {
compressed = ['gzip', 'deflate', 'compress'].contains(contentEncoding);
}
if (compressed) {
total = -1;
} else {
total =
int.parse(resp.headers.value(Headers.contentLengthHeader) ?? '-1');
}
late StreamSubscription subscription;
Future? asyncWrite;
var closed = false;
Future _closeAndDelete() async {
if (!closed) {
closed = true;
await asyncWrite;
await raf.close();
await file.delete();
}
}
subscription = stream.listen(
(data) {
subscription.pause();
// Write file asynchronously
asyncWrite = raf.writeFrom(data).then((_raf) {
// Notify progress
received += data.length;
onProgress?.call(received, total);
raf = _raf;
if (cancelToken == null || !cancelToken.isCancelled) {
subscription.resume();
}
}).catchError((err) async {
try {
await subscription.cancel();
} finally {
completer.completeError(DioError(
requestOptions: resp.requestOptions,
error: err,
));
}
});
},
onDone: () async {
try {
await asyncWrite;
closed = true;
await raf.close();
completer.complete(resp);
} catch (err) {
completer.completeError(DioError(
requestOptions: resp.requestOptions,
error: err,
));
}
},
onError: (e) async {
try {
await _closeAndDelete();
} finally {
completer.completeError(DioError(
requestOptions: resp.requestOptions,
error: e,
));
}
},
cancelOnError: true,
);
// ignore: unawaited_futures
cancelToken?.whenCancel.then((_) async {
await subscription.cancel();
await _closeAndDelete();
});
if (resp.requestOptions.receiveTimeout != null &&
resp.requestOptions.receiveTimeout!
.compareTo(Duration(milliseconds: 0)) >
0) {
future = future
.timeout(resp.requestOptions.receiveTimeout!)
.catchError((Object err) async {
await subscription.cancel();
await _closeAndDelete();
if (err is TimeoutException) {
throw DioError(
requestOptions: resp.requestOptions,
error:
'Receiving data timeout[${resp.requestOptions.receiveTimeout}ms]',
type: DioErrorType.receiveTimeout,
);
} else {
throw err;
}
});
}
await DioMixin.listenCancelForAsyncTask(cancelToken, future);
}