streamAsync method
Runs sql using the native async stream lifecycle:
stream_start_async -> stream_poll_async -> stream_fetch -> stream_close.
This is a poll-based non-blocking stream path. It yields full protocol messages as ParsedRowBuffer values.
Implementation
/// Runs [sql] on [connectionId] through the poll-based async stream path and
/// yields each complete protocol message as a [ParsedRowBuffer].
///
/// The lifecycle is: start the stream, poll its status, fetch a chunk when
/// the status is "ready", and always close the stream when the generator
/// finishes, fails, or is cancelled by the listener.
///
/// [fetchSize] and [chunkSize] are forwarded unchanged to the stream-start
/// call. [pollInterval] is the delay between polls while the status is
/// "pending". When [maxBufferBytes] is non-null, an [AsyncError] is thrown as
/// soon as the bytes buffered after appending a fetched chunk exceed it.
///
/// Throws [AsyncError] with [AsyncErrorCode.queryFailed] when the stream
/// cannot be started, reports an error/cancelled/unexpected status, a fetch
/// fails, or the buffer limit is exceeded. Throws [FormatException] when a
/// message header decodes to a non-positive length or when bytes that do not
/// form a complete message remain after the stream reports completion.
Stream<ParsedRowBuffer> streamAsync(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
  Duration pollInterval = const Duration(milliseconds: 10),
  int? maxBufferBytes,
}) async* {
  final streamId = await _streamStartAsync(
    connectionId,
    sql,
    fetchSize: fetchSize,
    chunkSize: chunkSize,
  );
  // A zero id means the stream was never created, so there is nothing to
  // close; fail before entering the try/finally below.
  if (streamId == 0) {
    final workerError = await _safeGetWorkerError();
    throw AsyncError(
      code: AsyncErrorCode.queryFailed,
      message: workerError ?? 'Failed to start async stream',
    );
  }

  // Bytes received so far that do not yet form a complete message.
  var pending = BytesBuilder(copy: false);
  final limit = maxBufferBytes;
  try {
    while (true) {
      final status = await _streamPollAsync(streamId);
      if (status == _streamAsyncStatusPending) {
        await Future<void>.delayed(pollInterval);
        continue;
      }
      if (status == _streamAsyncStatusDone) {
        break;
      }
      if (status == _streamAsyncStatusError ||
          status == _streamAsyncStatusCancelled) {
        final workerError = await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Async stream failed with status $status',
        );
      }
      if (status != _streamAsyncStatusReady) {
        final workerError = await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Unexpected async stream status: $status',
        );
      }

      final fetched = await _streamFetch(streamId);
      if (!fetched.success) {
        final workerError = fetched.error ?? await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Async stream fetch failed',
        );
      }

      final data = fetched.data;
      if (data == null || data.isEmpty) {
        continue;
      }
      pending.add(data);
      if (limit != null && pending.length > limit) {
        throw const AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: 'Streaming buffer exceeded maxBufferBytes',
        );
      }
      if (pending.length < BinaryProtocolParser.headerSize) {
        continue;
      }

      // Flatten the buffer once per fetch and walk it with an offset, rather
      // than re-flattening and re-copying the remainder for every message.
      final all = pending.toBytes();
      var offset = 0;
      while (all.length - offset >= BinaryProtocolParser.headerSize) {
        // Pass a fresh copy of just the header so the length helper sees the
        // message start at index 0 of its own buffer.
        final header = all.sublist(
          offset,
          offset + BinaryProtocolParser.headerSize,
        );
        final msgLen = BinaryProtocolParser.messageLengthFromHeader(header);
        if (msgLen <= 0) {
          // A non-positive length would never advance the offset.
          throw FormatException(
            'Invalid protocol message length in stream header: $msgLen',
          );
        }
        if (all.length - offset < msgLen) {
          // Message is incomplete; wait for more data.
          break;
        }
        yield BinaryProtocolParser.parse(
          all.sublist(offset, offset + msgLen),
        );
        offset += msgLen;
      }

      // Keep only the unconsumed tail. If nothing was consumed the existing
      // builder already holds exactly the right bytes.
      if (offset > 0) {
        pending = BytesBuilder(copy: false);
        if (offset < all.length) {
          pending.add(all.sublist(offset));
        }
      }
    }
    if (pending.length > 0) {
      throw const FormatException(
        'Leftover bytes after stream; expected complete protocol messages',
      );
    }
  } finally {
    await _streamClose(streamId);
  }
}