streamAsync method
Stream<ParsedRowBuffer>
streamAsync(
- int connectionId,
- String sql, {
- int fetchSize = 1000,
- int chunkSize = 64 * 1024,
- Duration pollInterval = const Duration(milliseconds: 10),
- ResultEncoding resultEncoding = ResultEncoding.rowMajor,
- bool lazyStrings = false,
inherited
Poll-based async batched streaming via odbc_stream_start_async.
When resultEncoding is not ResultEncoding.rowMajor and the native
library exports odbc_stream_start_async_options, batches use the columnar
v2 wire layout.
Implementation
/// Runs [sql] on [connectionId] as a native async stream and yields each
/// decoded batch as it becomes available.
///
/// The native stream is polled in a loop. While it reports "pending" the
/// generator sleeps, starting at one tenth of [pollInterval] (or 500
/// microseconds when that tenth is zero) and doubling the wait after each
/// pending poll, capped at [pollInterval]. Any non-pending status resets the
/// wait to its starting value.
///
/// [fetchSize], [chunkSize] and the wire code of [resultEncoding] are handed
/// to the native start call unchanged; [lazyStrings] is forwarded to the
/// frame decoder.
///
/// Throws an [Exception] when the stream cannot be started, when polling or
/// fetching fails, or when the native side reports an error, cancelled, or
/// unrecognised status. Throws a [FormatException] when undecoded bytes are
/// still buffered after the native side reports completion.
///
/// The native stream is always closed on exit, and is cancelled first
/// whenever the generator stops before finishing normally.
Stream<ParsedRowBuffer> streamAsync(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
  Duration pollInterval = const Duration(milliseconds: 10),
  ResultEncoding resultEncoding = ResultEncoding.rowMajor,
  bool lazyStrings = false,
}) async* {
  final streamId = _native.streamStartAsync(
    connectionId,
    sql,
    fetchSize: fetchSize,
    chunkSize: chunkSize,
    resultEncodingWire: resultEncoding.wireCode,
  );
  if (streamId == null || streamId == 0) {
    throw Exception('Failed to start async stream: ${_native.getError()}');
  }

  final frameBuffer = BinaryFrameAccumulator();

  // Starting wait between polls; never zero so a pending stream is not
  // polled in a tight loop.
  final tenthOfInterval = pollInterval ~/ 10;
  final initialBackoff = tenthOfInterval == Duration.zero
      ? const Duration(microseconds: 500)
      : tenthOfInterval;

  var backoff = initialBackoff;
  var finishedCleanly = false;
  try {
    while (true) {
      final status = _native.streamPollAsync(streamId);
      if (status == null) {
        throw Exception(
          'Async stream poll unavailable: ${_native.getError()}',
        );
      }

      if (status == _nativeStreamAsyncStatusPending) {
        await Future<void>.delayed(backoff);
        // Double the wait for the next pending poll, up to pollInterval.
        if (backoff < pollInterval) {
          final doubledMicros = backoff.inMicroseconds * 2;
          backoff = Duration(
            microseconds: doubledMicros.clamp(0, pollInterval.inMicroseconds),
          );
        }
        continue;
      }

      // Anything other than "pending" restarts the backoff sequence.
      backoff = initialBackoff;

      if (status == _nativeStreamAsyncStatusDone) {
        break;
      }
      if (status != _nativeStreamAsyncStatusReady) {
        final failed = status == _nativeStreamAsyncStatusError ||
            status == _nativeStreamAsyncStatusCancelled;
        if (failed) {
          throw Exception('Async stream failed with status $status');
        }
        throw Exception('Unexpected async stream status: $status');
      }

      final fetched = _native.streamFetch(streamId);
      if (!fetched.success) {
        throw Exception('Async stream fetch failed: ${_native.getError()}');
      }

      final bytes = fetched.data;
      if (bytes == null || bytes.isEmpty) {
        continue;
      }

      // Buffer the chunk and emit every frame the accumulator drains.
      frameBuffer.add(bytes);
      for (final frame in frameBuffer.drainFrames()) {
        yield decodeBatchedStreamFrame(
          frame,
          lazyStrings: lazyStrings,
        );
      }
    }

    if (frameBuffer.length > 0) {
      throw const FormatException(
        'Leftover bytes after stream; expected complete protocol messages',
      );
    }
    finishedCleanly = true;
  } finally {
    // Cancel only when the loop did not run to a clean finish; always close.
    if (!finishedCleanly) {
      _native.streamCancel(streamId);
    }
    _native.streamClose(streamId);
  }
}