streamQueryBuffer method
Stream<ParsedRowBuffer> streamQueryBuffer(
  int connectionId,
  String sql, {
  int chunkSize = 1000,
  int? maxBufferBytes,
  bool lazyStrings = false,
})
inherited
Legacy buffer-mode streaming via odbc_stream_start. Materialises the
full result in the worker before yielding a single parsed chunk.
Implementation
/// Streams the result of [sql] on [connectionId] as at most one
/// [ParsedRowBuffer].
///
/// Starts a worker stream with [chunkSize], fetches until the worker reports
/// that no more data is available, accumulates every fetched chunk in memory,
/// and then decodes the accumulated bytes with [decodeBatchedStreamFrame],
/// forwarding [lazyStrings]. Nothing is yielded when no bytes were received.
///
/// Throws an [AsyncError] with [AsyncErrorCode.queryFailed] when the stream
/// cannot be started, when a fetch fails, or when [maxBufferBytes] is
/// non-null and the accumulated size exceeds it.
///
/// The worker stream is always closed on exit. If the generator exits before
/// running to its end (an error, or the listener cancelling while the
/// generator is suspended), the stream is cancelled first.
Stream<ParsedRowBuffer> streamQueryBuffer(
  int connectionId,
  String sql, {
  int chunkSize = 1000,
  int? maxBufferBytes,
  bool lazyStrings = false,
}) async* {
  final streamId = await _streamStart(
    connectionId,
    sql,
    chunkSize: chunkSize,
  );
  // A stream id of 0 is treated as "the stream could not be started".
  if (streamId == 0) {
    final workerError = await _safeGetWorkerError();
    throw AsyncError(
      code: AsyncErrorCode.queryFailed,
      message: workerError ?? 'Failed to start stream',
    );
  }

  // copy: false keeps references to the fetched chunks instead of copying
  // each one into the builder.
  final buffer = BytesBuilder(copy: false);
  final limit = maxBufferBytes;
  var completed = false;
  try {
    while (true) {
      final fetched = await _streamFetch(streamId);
      if (!fetched.success) {
        // Prefer the error carried by the fetch result; only ask the worker
        // when the result has none.
        final workerError = fetched.error ?? await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Stream fetch failed',
        );
      }

      final data = fetched.data;
      if (data != null && data.isNotEmpty) {
        buffer.add(data);
        if (limit != null && buffer.length > limit) {
          throw const AsyncError(
            code: AsyncErrorCode.queryFailed,
            message: 'Streaming buffer exceeded maxBufferBytes',
          );
        }
      }

      if (!fetched.hasMore) {
        break;
      }
    }

    if (buffer.isNotEmpty) {
      // takeBytes() hands the accumulated bytes over and clears the builder,
      // so the individual chunks are not kept referenced while the generator
      // is suspended at the yield.
      yield decodeBatchedStreamFrame(
        buffer.takeBytes(),
        lazyStrings: lazyStrings,
      );
    }
    completed = true;
  } finally {
    try {
      if (!completed) {
        await streamCancel(streamId);
      }
    } finally {
      // Close even when cancellation throws, so the worker stream handle is
      // never leaked.
      await _streamClose(streamId);
    }
  }
}