decodeFramesIntoPooled method

Future<void> decodeFramesIntoPooled(
  1. BorrowedBytesCodec codec,
  2. void onFrame(
    1. PooledBytes frame
    ), {
  3. BufferPool? pool,
  4. int capacity = 64 * 1024,
  5. bool allowTrailingBytesAtEof = false,
  6. bool cancelOnError = false,
})

Decode byte frames into pooled owned buffers.

This API copies each frame into a buffer rented from pool and delivers a PooledBytes lease.

Safety model:

This reduces per-frame allocations when frames are processed and released quickly, while keeping retention safe by default.

Implementation

Future<void> decodeFramesIntoPooled(
  BorrowedBytesCodec codec,
  void Function(PooledBytes frame) onFrame, {
  BufferPool? pool,
  int capacity = 64 * 1024,
  bool allowTrailingBytesAtEof = false,
  bool cancelOnError = false,
}) {
  final completer = Completer<void>();
  final framer = FramerBorrowedBytes(
    codec,
    capacity: capacity,
  );
  final BufferPool p = pool ?? SimpleBufferPool();

  StreamSubscription<List<int>>? sub;

  void finishError(Object error, [StackTrace? st]) {
    if (completer.isCompleted) return;
    completer.completeError(error, st);
    sub?.cancel();
  }

  sub = listen(
    (chunk) {
      if (completer.isCompleted) return;
      try {
        framer.add(chunk, (FrameBorrowedView view) {
          final n = view.length;
          final buf = p.rent(n);
          var off = 0;
          view.forEachSegment((b, o, len) {
            if (len == 0) return;
            buf.setRange(off, off + len, b, o);
            off += len;
          });
          onFrame(pooledBytesFrom(p, buf, n));
        });
      } catch (e, st) {
        finishError(e, st);
      }
    },
    onError: (Object e, StackTrace st) {
      finishError(e, st);
    },
    onDone: () {
      if (completer.isCompleted) return;
      try {
        framer.close(
          (FrameBorrowedView view) {
            final n = view.length;
            final buf = p.rent(n);
            var off = 0;
            view.forEachSegment((b, o, len) {
              if (len == 0) return;
              buf.setRange(off, off + len, b, o);
              off += len;
            });
            onFrame(pooledBytesFrom(p, buf, n));
          },
          allowTrailingBytesAtEof: allowTrailingBytesAtEof,
        );
        completer.complete();
      } catch (e, st) {
        finishError(e, st);
      }
    },
    cancelOnError: cancelOnError,
  );

  return completer.future;
}