sliceStream function
Reads `stream` and slices its content into chunks with a target/maximum length of `sliceLength`.
When `maxLength` is specified and the total number of bytes read exceeds it, the returned Stream is closed with an error.
`copy` controls whether the bytes the stream provides need to be copied (e.g. because the underlying list may get modified).
Implementation
/// Re-chunks [stream] into slices of [sliceLength] bytes.
///
/// Incoming chunks are accumulated until at least [sliceLength] bytes are
/// buffered. A slice of exactly [sliceLength] bytes is then emitted and any
/// surplus is carried over to the next slice. Whatever is still buffered when
/// [stream] ends is emitted as a final slice, which may be shorter.
///
/// When [maxLength] is non-null and the total number of bytes read from
/// [stream] exceeds it, a [StateError] is raised. [sliceLength] must be at
/// least 1, otherwise a [RangeError] is raised. Because this is an `async*`
/// generator, both errors are delivered through the returned stream.
///
/// [copy] is forwarded to `castBytes` for every incoming chunk; set it when
/// the lists provided by [stream] may be modified after being delivered.
Stream<Uint8List> sliceStream(
  Stream<List<int>> stream,
  int sliceLength, {
  int? maxLength,
  bool copy = false,
}) async* {
  // A zero slice length would make the slicing loop below spin forever
  // emitting empty slices; a negative one would fail later with a less
  // helpful RangeError from the typed-data constructor.
  if (sliceLength < 1) {
    throw RangeError.range(sliceLength, 1, null, 'sliceLength');
  }

  var total = 0;
  // Number of bytes currently held in `buffer`, maintained incrementally so
  // the chunk list does not need to be re-summed on every loop test.
  var buffered = 0;
  final buffer = <Uint8List>[];

  await for (final bytes in stream) {
    final next = castBytes(bytes, copy: copy);
    total += next.length;
    if (maxLength != null && maxLength < total) {
      throw StateError('Max length reached: $maxLength bytes.');
    }
    buffer.add(next);
    buffered += next.length;

    // A single large incoming chunk may contain several full slices.
    while (buffered >= sliceLength) {
      Uint8List? overflow;
      if (buffered > sliceLength) {
        // Only the most recently added chunk can cross the slice boundary:
        // split it into the part completing this slice and the surplus.
        final last = buffer.removeLast();
        final index = sliceLength - (buffered - last.length);
        buffer.add(last.sublist(0, index));
        overflow = last.sublist(index);
      }
      // Hand over a snapshot of the chunk list, since `buffer` is reused.
      final bb = BytesBuffer._fromChunks(List.from(buffer));
      buffer.clear();
      buffered = 0;
      if (overflow != null) {
        buffer.add(overflow);
        buffered = overflow.length;
      }
      yield bb.toBytes();
    }
  }

  // Flush the remainder that never reached a full slice.
  if (buffer.isNotEmpty) {
    final bb = BytesBuffer._fromChunks(buffer);
    yield bb.toBytes();
  }
}