read method

Stream<Uint8List> read({
  1. int? length,
  2. int offset = 0,
  3. void onProgress(
    1. int bytesRead
    )?,
})

Reads at most count bytes from the file starting at offset. If length is null, reads until end of file. Returns a Stream of chunks. onProgress is called with the total number of bytes already read. Use readBytes if you want a single Uint8List.

Implementation

Stream<Uint8List> read({
  int? length,
  int offset = 0,
  void Function(int bytesRead)? onProgress,
}) async* {
  const chunkSize = 16 * 1024;
  const maxBytesOnTheWire = chunkSize * 64;

  // Get the file size if not specified.
  if (length == null) {
    final fileStat = await stat();
    final fileSize = fileStat.size;

    if (fileSize == null) {
      throw SftpError('Can not get file size');
    }

    length = fileSize - offset;
  }

  if (length == 0) return;

  if (length < 0) {
    throw SftpError('Length must be positive: $length');
  }

  final streamController = StreamController<Uint8List>();

  var bytessRecieved = 0;
  var bytessRequested = 0;

  Future<void> readChunk(int chunkStart) async {
    final chunkEnd = min(chunkStart + chunkSize, offset + length!);
    final chunkLength = chunkEnd - chunkStart;

    bytessRequested += chunkLength;

    late final Uint8List? chunk;

    try {
      chunk = await _readChunk(chunkLength, chunkStart);
    } catch (e, st) {
      if (!streamController.isClosed) {
        streamController.addError(e, st);
        streamController.close();
      }
      return;
    }

    if (chunk == null) {
      streamController.close();
      return;
    }

    streamController.add(chunk);
    bytessRecieved += chunkLength;

    if (onProgress != null) onProgress(bytessRecieved);

    if (bytessRecieved >= length) {
      streamController.close();
      return;
    }
  }

  void scheduleRead() {
    if (streamController.isPaused || streamController.isClosed) {
      return;
    }

    while (bytessRequested < length!) {
      final bytesOnTheWire = bytessRequested - bytessRecieved;
      if (bytesOnTheWire >= maxBytesOnTheWire) return;
      readChunk(bytessRequested + offset).then((_) => scheduleRead());
    }
  }

  streamController.onListen = scheduleRead;
  streamController.onResume = scheduleRead;

  yield* streamController.stream;
}