decompressFrameStream method

Stream<Uint8List> decompressFrameStream(
  1. Stream<Uint8List> stream
)

Decompresses data from an LZ4 frame using the streaming API. See the frame format specification: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md

Implementation

/// Decompresses an LZ4 frame delivered as a [stream] of byte chunks.
///
/// Yields a freshly copied [Uint8List] after every `_decompressFrame` call
/// that produced output. The returned stream completes when the input
/// [stream] ends or when `_decompressFrame` returns a non-positive value.
///
/// Every native allocation made here is released in a `finally` block, so it
/// is freed however the generator exits.
///
/// Frame format: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md
Stream<Uint8List> decompressFrameStream(Stream<Uint8List> stream) async* {
  // `Allocator.allocate` takes a size in BYTES, so a single `Uint64` slot
  // needs `sizeOf<Uint64>()` bytes (a literal `1` would under-allocate).
  final contextPtr = malloc.allocate<Uint64>(sizeOf<Uint64>());
  try {
    _createDecompressionContext(contextPtr);
  } catch (_) {
    // No context exists yet, so only the out-parameter slot has to be freed.
    malloc.free(contextPtr);
    rethrow;
  }
  final context = Pointer.fromAddress(contextPtr[0]);

  // In/out size slots handed to `_decompressFrame` on every call.
  final dstSizePtr = malloc.allocate<Uint64>(sizeOf<Uint64>());
  final srcSizePtr = malloc.allocate<Uint64>(sizeOf<Uint64>());

  var estimateDstBufferSize = 0;

  Pointer<Uint8>? dstBuffer;
  // Value returned by the previous `_decompressFrame` call; presumably the
  // decoder's hint for how many source bytes it wants next — TODO confirm.
  int nextSrcSize = 0;
  NativeBytesBuilder? sourceBufferBuilder;
  // Builders replaced through `shift`; they are only freed at the very end.
  // NOTE(review): presumably the shifted builder still points into the old
  // allocation, which is why the old one must outlive it — confirm.
  List<NativeBytesBuilder> danglePointers = [];
  try {
    // Input bytes not currently held by the native source buffer.
    var remainder = BytesBuilder(copy: true);
    await for (final chunk in stream) {
      if (sourceBufferBuilder == null) {
        // First chunk: size the source buffer after it, then validate the
        // frame and size the destination buffer from the returned estimate.
        sourceBufferBuilder = NativeBytesBuilder(chunk.length);
        sourceBufferBuilder.add(chunk);
        estimateDstBufferSize = _validateFrameAndGetEstimatedDecodeBufferSize(
            chunk, sourceBufferBuilder.ptr);
        dstBuffer = malloc.allocate<Uint8>(estimateDstBufferSize);
      } else {
        // `add` may hand back bytes (presumably the part that did not fit);
        // keep them for a later refill.
        var r = sourceBufferBuilder.add(chunk);
        if (r != null && r.isNotEmpty) {
          remainder.add(r);
        }
      }

      // Decode for as long as the buffered input reaches the last hint.
      while (sourceBufferBuilder!.length >= nextSrcSize) {
        srcSizePtr.value = sourceBufferBuilder.length;
        dstSizePtr.value = estimateDstBufferSize;
        nextSrcSize = _decompressFrame(context, dstBuffer!, dstSizePtr,
            sourceBufferBuilder.ptr, srcSizePtr);
        final dstSize = dstSizePtr.value;
        if (dstSize > 0) {
          // Copy out of native memory: `dstBuffer` is reused by the next call.
          yield Uint8List.fromList(dstBuffer.asTypedList(dstSize));
        }
        if (nextSrcSize <= 0) {
          // The decoder asked for no further input: stop the stream.
          return;
        }

        final consumedSrcSize = srcSizePtr.value;
        if (consumedSrcSize < sourceBufferBuilder.length) {
          if (consumedSrcSize + nextSrcSize < sourceBufferBuilder.capacity) {
            // Enough room left: continue after the consumed prefix.
            danglePointers.add(sourceBufferBuilder);
            sourceBufferBuilder = sourceBufferBuilder.shift(consumedSrcSize);
          } else {
            // Not enough room: save the unconsumed tail and start a new
            // buffer sized by the hint.
            remainder.add(
                sourceBufferBuilder.asTypedList().sublist(consumedSrcSize));
            sourceBufferBuilder.free();
            sourceBufferBuilder = NativeBytesBuilder(nextSrcSize);
          }
        } else {
          // Everything was consumed: start a new buffer sized by the hint.
          sourceBufferBuilder.free();
          sourceBufferBuilder = NativeBytesBuilder(nextSrcSize);
        }
        if (remainder.length > 0) {
          // `takeBytes` already empties `remainder`, so whatever `add` hands
          // back can be stored straight away.
          final r = sourceBufferBuilder.add(remainder.takeBytes());
          if (r != null && r.isNotEmpty) {
            remainder.add(r);
          }
        }
      }
    }
  } finally {
    _freeDecompressionContext(context);
    malloc.free(contextPtr);
    malloc.free(dstSizePtr);
    malloc.free(srcSizePtr);
    if (dstBuffer != null) {
      malloc.free(dstBuffer);
    }
    if (sourceBufferBuilder != null) {
      sourceBufferBuilder.free();
    }
    for (final p in danglePointers) {
      p.free();
    }
  }
}