sliceStream function

Stream<Uint8List> sliceStream(
  1. Stream<List<int>> stream,
  2. int sliceLength, {
  3. int? maxLength,
  4. bool copy = false,
})

Read stream and slice the content into chunks with target/max length of sliceLength.

When maxLength is specified and the total number of bytes read exceeds it, the returned Stream is closed with an error.

copy controls whether the bytes the stream provides need to be copied (e.g. because the underlying list may get modified).

Implementation

/// Reads [stream] and re-slices its content into chunks of [sliceLength] bytes.
///
/// Incoming chunks are buffered until at least [sliceLength] bytes are
/// available; the buffer is then trimmed to exactly [sliceLength] bytes and
/// emitted, and any surplus is carried over to the next slice. When [stream]
/// closes, whatever is still buffered is emitted as a final, possibly shorter,
/// slice.
///
/// When [maxLength] is specified and the total number of bytes read exceeds
/// it, the returned stream emits a [StateError] and closes.
///
/// [copy] is forwarded to `castBytes` for every incoming chunk; set it when
/// the lists provided by [stream] may be modified after they were emitted.
///
/// The returned stream emits an [ArgumentError] when [sliceLength] is not
/// positive, as no slice of that size can ever be completed.
Stream<Uint8List> sliceStream(
  Stream<List<int>> stream,
  int sliceLength, {
  int? maxLength,
  bool copy = false,
}) async* {
  // A zero slice length would make the slicing loop below spin forever
  // (the buffer can never drop below zero bytes), and a negative one would
  // surface as an unrelated RangeError while allocating the slice.
  if (sliceLength <= 0) {
    throw ArgumentError.value(
      sliceLength,
      'sliceLength',
      'Must be a positive number of bytes.',
    );
  }

  // Total number of bytes received from [stream], checked against maxLength.
  var total = 0;
  // Number of bytes currently held in [buffer]; tracked incrementally so the
  // buffer does not need to be re-summed on every loop iteration.
  var buffered = 0;
  final buffer = <Uint8List>[];

  await for (final bytes in stream) {
    final next = castBytes(bytes, copy: copy);

    total += next.length;
    if (maxLength != null && maxLength < total) {
      throw StateError('Max length reached: $maxLength bytes.');
    }

    buffer.add(next);
    buffered += next.length;

    // A single incoming chunk may complete several slices, hence the loop.
    while (buffered >= sliceLength) {
      Uint8List? overflow;
      if (buffered > sliceLength) {
        // Only the last buffered chunk can cross the slice boundary, because
        // the buffer held fewer than `sliceLength` bytes before it was added.
        final last = buffer.removeLast();
        // Number of bytes of `last` that still belong to the current slice.
        final index = sliceLength - buffered + last.length;
        // Both parts are copies, so the emitted slice and the carried-over
        // remainder do not share storage with `last`.
        buffer.add(last.sublist(0, index));
        overflow = last.sublist(index);
      }

      // Snapshot the chunk list before clearing, so the pending slice does not
      // observe the buffer being reused.
      final bb = BytesBuffer._fromChunks(List.from(buffer));
      buffer.clear();
      buffered = 0;
      if (overflow != null) {
        buffer.add(overflow);
        buffered = overflow.length;
      }
      yield bb.toBytes();
    }
  }

  // Flush the remainder. Chunks are buffered even when they are empty, so this
  // may emit an empty slice if only empty chunks are left.
  if (buffer.isNotEmpty) {
    final bb = BytesBuffer._fromChunks(buffer);
    yield bb.toBytes();
  }
}