streamQueryBatched method
Executes a SQL query and returns results as a batched stream.
Uses cursor-based batching; each batch is a complete protocol message.
fetchSize rows per batch, chunkSize buffer size in bytes.
Implementation
Stream<ParsedRowBuffer> streamQueryBatched(
int connectionId,
String sql, {
int fetchSize = 1000,
int chunkSize = 64 * 1024,
}) async* {
final streamId = _native.streamStartBatched(
connectionId,
sql,
fetchSize: fetchSize,
chunkSize: chunkSize,
);
if (streamId == 0) {
throw Exception('Failed to start batched stream: ${_native.getError()}');
}
var pending = BytesBuilder(copy: false);
try {
while (true) {
final result = _native.streamFetch(streamId);
if (!result.success) {
throw Exception('Stream fetch failed: ${_native.getError()}');
}
final data = result.data;
if (data == null || data.isEmpty) {
break;
}
pending.add(data);
while (pending.length >= BinaryProtocolParser.headerSize) {
final all = pending.toBytes();
final msgLen = BinaryProtocolParser.messageLengthFromHeader(all);
if (all.length < msgLen) break;
final msg = all.sublist(0, msgLen);
yield BinaryProtocolParser.parse(msg);
final remainder = all.sublist(msgLen);
pending = BytesBuilder(copy: false);
if (remainder.isNotEmpty) pending.add(remainder);
}
if (!result.hasMore) break;
}
if (pending.length > 0) {
throw const FormatException(
'Leftover bytes after stream; expected complete protocol messages',
);
}
} finally {
_native.streamClose(streamId);
}
}