streamQueryColumnarBatched method
Stream<TypedColumnarResult>
streamQueryColumnarBatched(
- int connectionId,
- String sql, {
- int fetchSize = 1000,
- int chunkSize = 64 * 1024,
- bool lazyStrings = false,
- ResultEncoding resultEncoding = ResultEncoding.columnar,
inherited
Like streamQueryBatched but decodes columnar v2 frames directly to
TypedColumnarResult without a row-major intermediate.
Implementation
Stream<TypedColumnarResult> streamQueryColumnarBatched(
int connectionId,
String sql, {
int fetchSize = 1000,
int chunkSize = 64 * 1024,
bool lazyStrings = false,
ResultEncoding resultEncoding = ResultEncoding.columnar,
}) async* {
final streamId = _native.streamStartBatched(
connectionId,
sql,
fetchSize: fetchSize,
chunkSize: chunkSize,
resultEncodingWire: resultEncoding.wireCode,
);
if (streamId == 0) {
throw Exception('Failed to start batched stream: ${_native.getError()}');
}
final pending = BinaryFrameAccumulator();
var completed = false;
try {
while (true) {
final result = _native.streamFetch(streamId);
if (!result.success) {
throw Exception('Stream fetch failed: ${_native.getError()}');
}
final data = result.data;
if (data == null || data.isEmpty) {
break;
}
pending.add(data);
for (final msg in pending.drainFrames()) {
yield BinaryProtocolParser.parseColumnarToTyped(
msg,
lazyStrings: lazyStrings,
);
}
if (!result.hasMore) break;
}
if (pending.length > 0) {
throw const FormatException(
'Leftover bytes after stream; expected complete protocol messages',
);
}
completed = true;
} finally {
if (!completed) {
_native.streamCancel(streamId);
}
_native.streamClose(streamId);
}
}