streamQueryColumnarBatched method

Stream<TypedColumnarResult> streamQueryColumnarBatched(
  1. int connectionId,
  2. String sql, {
  3. int fetchSize = 1000,
  4. int chunkSize = 64 * 1024,
  5. bool lazyStrings = false,
  6. ResultEncoding resultEncoding = ResultEncoding.columnar,
})
inherited

Like streamQueryBatched but decodes columnar v2 frames directly to TypedColumnarResult without a row-major intermediate.

Implementation

Stream<TypedColumnarResult> streamQueryColumnarBatched(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
  bool lazyStrings = false,
  ResultEncoding resultEncoding = ResultEncoding.columnar,
}) async* {
  final streamId = _native.streamStartBatched(
    connectionId,
    sql,
    fetchSize: fetchSize,
    chunkSize: chunkSize,
    resultEncodingWire: resultEncoding.wireCode,
  );

  if (streamId == 0) {
    throw Exception('Failed to start batched stream: ${_native.getError()}');
  }

  final pending = BinaryFrameAccumulator();
  try {
    while (true) {
      final result = _native.streamFetch(streamId);

      if (!result.success) {
        throw Exception('Stream fetch failed: ${_native.getError()}');
      }

      final data = result.data;
      if (data == null || data.isEmpty) {
        break;
      }
      pending.add(data);

      for (final msg in pending.drainFrames()) {
        yield BinaryProtocolParser.parseColumnarToTyped(
          msg,
          lazyStrings: lazyStrings,
        );
      }

      if (!result.hasMore) break;
    }

    if (pending.length > 0) {
      throw const FormatException(
        'Leftover bytes after stream; expected complete protocol messages',
      );
    }
  } finally {
    _native.streamClose(streamId);
  }
}