streamQuery method

Stream<ParsedRowBuffer> streamQuery(
  1. int connectionId,
  2. String sql, {
  3. int chunkSize = 1000,
  4. int? maxBufferBytes,
})

Runs sql in the worker using native streaming.

This path uses odbc_stream_start + odbc_stream_fetch. Data is accumulated and parsed at the end, matching sync streamQuery behavior. maxBufferBytes caps total accumulated bytes.

Implementation

Stream<ParsedRowBuffer> streamQuery(
  int connectionId,
  String sql, {
  int chunkSize = 1000,
  int? maxBufferBytes,
}) async* {
  final streamId = await _streamStart(
    connectionId,
    sql,
    chunkSize: chunkSize,
  );
  if (streamId == 0) {
    final workerError = await _safeGetWorkerError();
    throw AsyncError(
      code: AsyncErrorCode.queryFailed,
      message: workerError ?? 'Failed to start stream',
    );
  }

  final buffer = BytesBuilder(copy: false);
  final limit = maxBufferBytes;
  try {
    while (true) {
      final fetched = await _streamFetch(streamId);
      if (!fetched.success) {
        final workerError = fetched.error ?? await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Stream fetch failed',
        );
      }

      final data = fetched.data;
      if (data != null && data.isNotEmpty) {
        buffer.add(data);
        if (limit != null && buffer.length > limit) {
          throw const AsyncError(
            code: AsyncErrorCode.queryFailed,
            message: 'Streaming buffer exceeded maxBufferBytes',
          );
        }
      }

      if (!fetched.hasMore) {
        break;
      }
    }

    if (buffer.length > 0) {
      yield BinaryProtocolParser.parse(buffer.toBytes());
    }
  } finally {
    await _streamClose(streamId);
  }
}