openRead method
Implementation
@override
FileReadSession<Uint8List> openRead(
String path, {
int chunkSize = 64 * 1024,
int start = 0,
int? end,
}) {
if (path.isEmpty) {
throw ArgumentError.value(path, 'path', 'path must not be empty');
}
if (chunkSize <= 0) {
throw ArgumentError.value(
chunkSize,
'chunkSize',
'chunkSize must be greater than zero',
);
}
if (start < 0) {
throw ArgumentError.value(start, 'start', 'start must not be negative');
}
if (end != null && end < start) {
throw ArgumentError.value(
end,
'end',
'end must be greater than or equal to start',
);
}
if (!_forceNativeRead &&
(Platform.isMacOS || Platform.isWindows || Platform.isLinux)) {
return _openDesktopRead(
path,
chunkSize: chunkSize,
start: start,
end: end,
);
}
StreamSubscription<dynamic>? subscription;
String? streamId;
Future<String?>? startReadFuture;
bool cancelled = false;
bool closed = false;
Future<void>? cancelFuture;
Future<void>? nativeCancelFuture;
late final StreamController<Uint8List> controller;
late final Future<void> Function() cancelOnce;
Future<void> cancelStartedRead() async {
final activeStreamId = streamId;
if (activeStreamId != null && activeStreamId.isNotEmpty) {
nativeCancelFuture ??= _cancelRead(activeStreamId);
await nativeCancelFuture;
return;
}
final pendingStartRead = startReadFuture;
if (pendingStartRead == null) {
return;
}
try {
final pendingStreamId = await pendingStartRead;
if (pendingStreamId != null && pendingStreamId.isNotEmpty) {
nativeCancelFuture ??= _cancelRead(pendingStreamId);
await nativeCancelFuture;
}
} on Object {
// If startRead itself failed, there is no native stream to cancel.
}
}
controller = StreamController<Uint8List>(
onListen: () async {
try {
startReadFuture = methodChannel.invokeMethod<String>('startRead', {
'path': path,
'chunkSize': chunkSize,
'start': start,
'end': end,
});
streamId = await startReadFuture;
if (streamId == null || streamId!.isEmpty) {
throw PlatformException(
code: FilegateErrorCode.missingStreamId,
message: 'Native reader did not return a stream identifier.',
);
}
if (cancelled) {
await cancelStartedRead();
return;
}
subscription = EventChannel('$_readChannelPrefix/$streamId')
.receiveBroadcastStream()
.listen(
(event) {
if (event is Uint8List) {
controller.add(event);
return;
}
if (event is ByteData) {
controller.add(event.buffer.asUint8List());
return;
}
if (event is List && event.every((item) => item is int)) {
controller.add(Uint8List.fromList(event.cast<int>()));
return;
}
controller.addError(
PlatformException(
code: FilegateErrorCode.invalidChunk,
message:
'Unexpected native chunk type: ${event.runtimeType}.',
),
);
unawaited(cancelOnce());
},
onError: (Object error, StackTrace stackTrace) {
controller.addError(error, stackTrace);
unawaited(cancelOnce());
},
onDone: () async {
await _closeController(
controller,
alreadyClosed: () => closed,
onClose: () => closed = true,
);
},
);
} catch (error, stackTrace) {
if (!cancelled) {
controller.addError(error, stackTrace);
}
await _closeController(
controller,
alreadyClosed: () => closed,
onClose: () => closed = true,
);
}
},
onPause: () => subscription?.pause(),
onResume: () => subscription?.resume(),
onCancel: () => cancelOnce(),
);
cancelOnce = () {
return cancelFuture ??= () async {
cancelled = true;
final hasStartedStream = subscription != null || streamId != null;
await subscription?.cancel();
await cancelStartedRead();
final closeFuture = _closeController(
controller,
alreadyClosed: () => closed,
onClose: () => closed = true,
);
if (hasStartedStream) {
await closeFuture;
} else {
unawaited(closeFuture);
}
}();
};
return FileReadSession<Uint8List>(
stream: controller.stream,
onCancel: cancelOnce,
);
}