openRead method

  1. @override
FileReadSession<Uint8List> openRead(
  1. String path, {
  2. int chunkSize = 64 * 1024,
  3. int start = 0,
  4. int? end,
})
override

Implementation

@override
FileReadSession<Uint8List> openRead(
  String path, {
  int chunkSize = 64 * 1024,
  int start = 0,
  int? end,
}) {
  if (path.isEmpty) {
    throw ArgumentError.value(path, 'path', 'path must not be empty');
  }
  if (chunkSize <= 0) {
    throw ArgumentError.value(
      chunkSize,
      'chunkSize',
      'chunkSize must be greater than zero',
    );
  }
  if (start < 0) {
    throw ArgumentError.value(start, 'start', 'start must not be negative');
  }
  if (end != null && end < start) {
    throw ArgumentError.value(
      end,
      'end',
      'end must be greater than or equal to start',
    );
  }

  if (!_forceNativeRead &&
      (Platform.isMacOS || Platform.isWindows || Platform.isLinux)) {
    return _openDesktopRead(
      path,
      chunkSize: chunkSize,
      start: start,
      end: end,
    );
  }

  StreamSubscription<dynamic>? subscription;
  String? streamId;
  Future<String?>? startReadFuture;
  bool cancelled = false;
  bool closed = false;
  Future<void>? cancelFuture;
  Future<void>? nativeCancelFuture;

  late final StreamController<Uint8List> controller;
  late final Future<void> Function() cancelOnce;
  Future<void> cancelStartedRead() async {
    final activeStreamId = streamId;
    if (activeStreamId != null && activeStreamId.isNotEmpty) {
      nativeCancelFuture ??= _cancelRead(activeStreamId);
      await nativeCancelFuture;
      return;
    }

    final pendingStartRead = startReadFuture;
    if (pendingStartRead == null) {
      return;
    }

    try {
      final pendingStreamId = await pendingStartRead;
      if (pendingStreamId != null && pendingStreamId.isNotEmpty) {
        nativeCancelFuture ??= _cancelRead(pendingStreamId);
        await nativeCancelFuture;
      }
    } on Object {
      // If startRead itself failed, there is no native stream to cancel.
    }
  }

  controller = StreamController<Uint8List>(
    onListen: () async {
      try {
        startReadFuture = methodChannel.invokeMethod<String>('startRead', {
          'path': path,
          'chunkSize': chunkSize,
          'start': start,
          'end': end,
        });
        streamId = await startReadFuture;

        if (streamId == null || streamId!.isEmpty) {
          throw PlatformException(
            code: FilegateErrorCode.missingStreamId,
            message: 'Native reader did not return a stream identifier.',
          );
        }

        if (cancelled) {
          await cancelStartedRead();
          return;
        }

        subscription = EventChannel('$_readChannelPrefix/$streamId')
            .receiveBroadcastStream()
            .listen(
              (event) {
                if (event is Uint8List) {
                  controller.add(event);
                  return;
                }
                if (event is ByteData) {
                  controller.add(event.buffer.asUint8List());
                  return;
                }
                if (event is List && event.every((item) => item is int)) {
                  controller.add(Uint8List.fromList(event.cast<int>()));
                  return;
                }

                controller.addError(
                  PlatformException(
                    code: FilegateErrorCode.invalidChunk,
                    message:
                        'Unexpected native chunk type: ${event.runtimeType}.',
                  ),
                );
                unawaited(cancelOnce());
              },
              onError: (Object error, StackTrace stackTrace) {
                controller.addError(error, stackTrace);
                unawaited(cancelOnce());
              },
              onDone: () async {
                await _closeController(
                  controller,
                  alreadyClosed: () => closed,
                  onClose: () => closed = true,
                );
              },
            );
      } catch (error, stackTrace) {
        if (!cancelled) {
          controller.addError(error, stackTrace);
        }
        await _closeController(
          controller,
          alreadyClosed: () => closed,
          onClose: () => closed = true,
        );
      }
    },
    onPause: () => subscription?.pause(),
    onResume: () => subscription?.resume(),
    onCancel: () => cancelOnce(),
  );

  cancelOnce = () {
    return cancelFuture ??= () async {
      cancelled = true;
      final hasStartedStream = subscription != null || streamId != null;
      await subscription?.cancel();
      await cancelStartedRead();
      final closeFuture = _closeController(
        controller,
        alreadyClosed: () => closed,
        onClose: () => closed = true,
      );
      if (hasStartedStream) {
        await closeFuture;
      } else {
        unawaited(closeFuture);
      }
    }();
  };

  return FileReadSession<Uint8List>(
    stream: controller.stream,
    onCancel: cancelOnce,
  );
}