streamQueryBatched method

Stream<ParsedRowBuffer> streamQueryBatched(
  1. int connectionId,
  2. String sql, {
  3. int fetchSize = 1000,
  4. int chunkSize = 64 * 1024,
})

Executes a SQL query and returns results as a batched stream.

Uses cursor-based batching; each batch is a complete protocol message. fetchSize rows per batch, chunkSize buffer size in bytes.

Implementation

Stream<ParsedRowBuffer> streamQueryBatched(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
}) async* {
  final streamId = _native.streamStartBatched(
    connectionId,
    sql,
    fetchSize: fetchSize,
    chunkSize: chunkSize,
  );

  if (streamId == 0) {
    throw Exception('Failed to start batched stream: ${_native.getError()}');
  }

  var pending = BytesBuilder(copy: false);
  try {
    while (true) {
      final result = _native.streamFetch(streamId);

      if (!result.success) {
        throw Exception('Stream fetch failed: ${_native.getError()}');
      }

      final data = result.data;
      if (data == null || data.isEmpty) {
        break;
      }
      pending.add(data);

      while (pending.length >= BinaryProtocolParser.headerSize) {
        final all = pending.toBytes();
        final msgLen = BinaryProtocolParser.messageLengthFromHeader(all);
        if (all.length < msgLen) break;

        final msg = all.sublist(0, msgLen);
        yield BinaryProtocolParser.parse(msg);

        final remainder = all.sublist(msgLen);
        pending = BytesBuilder(copy: false);
        if (remainder.isNotEmpty) pending.add(remainder);
      }

      if (!result.hasMore) break;
    }

    if (pending.length > 0) {
      throw const FormatException(
        'Leftover bytes after stream; expected complete protocol messages',
      );
    }
  } finally {
    _native.streamClose(streamId);
  }
}