streamAsync method

Stream<ParsedRowBuffer> streamAsync(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
  Duration pollInterval = const Duration(milliseconds: 10),
  int? maxBufferBytes,
})

Runs sql using the native async stream lifecycle: stream_start_async -> stream_poll_async -> stream_fetch -> stream_close.

This is a poll-based non-blocking stream path. It yields full protocol messages as ParsedRowBuffer values.

Implementation

/// Runs [sql] on [connectionId] through the native async stream lifecycle and
/// yields every complete protocol message as a [ParsedRowBuffer].
///
/// The stream is started with `_streamStartAsync`, polled with
/// `_streamPollAsync`, drained with `_streamFetch`, and always released with
/// `_streamClose` in a `finally` block, so the native stream is closed when
/// the stream completes, fails, or the listener cancels at a `yield`.
///
/// [fetchSize] and [chunkSize] are forwarded unchanged to the start call.
/// While the poll status is pending, the generator waits [pollInterval]
/// before polling again.
///
/// If [maxBufferBytes] is non-null, the buffered byte count is checked right
/// after each fetched chunk is appended and *before* complete messages are
/// extracted from the buffer; exceeding it aborts the stream.
///
/// Throws an [AsyncError] with [AsyncErrorCode.queryFailed] when the stream
/// cannot be started, when polling reports an error, cancelled, or unknown
/// status, when a fetch fails, or when [maxBufferBytes] is exceeded.
///
/// Throws a [FormatException] when a header reports a non-positive message
/// length, or when the stream finishes with bytes that do not form a
/// complete message.
Stream<ParsedRowBuffer> streamAsync(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
  Duration pollInterval = const Duration(milliseconds: 10),
  int? maxBufferBytes,
}) async* {
  final streamId = await _streamStartAsync(
    connectionId,
    sql,
    fetchSize: fetchSize,
    chunkSize: chunkSize,
  );
  if (streamId == 0) {
    final workerError = await _safeGetWorkerError();
    throw AsyncError(
      code: AsyncErrorCode.queryFailed,
      message: workerError ?? 'Failed to start async stream',
    );
  }

  // Holds bytes that have been fetched but not yet emitted as a message.
  final pending = BytesBuilder(copy: false);
  final limit = maxBufferBytes;

  // Total length of the message at the front of `pending`, once its header
  // has been read and the message was found to be incomplete. Lets later
  // fetches skip flattening the buffer until enough bytes have arrived.
  int? awaitedLength;

  try {
    while (true) {
      final status = await _streamPollAsync(streamId);
      if (status == _streamAsyncStatusPending) {
        await Future<void>.delayed(pollInterval);
        continue;
      }
      if (status == _streamAsyncStatusDone) {
        break;
      }
      if (status == _streamAsyncStatusError ||
          status == _streamAsyncStatusCancelled) {
        final workerError = await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Async stream failed with status $status',
        );
      }
      if (status != _streamAsyncStatusReady) {
        final workerError = await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Unexpected async stream status: $status',
        );
      }

      final fetched = await _streamFetch(streamId);
      if (!fetched.success) {
        final workerError = fetched.error ?? await _safeGetWorkerError();
        throw AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: workerError ?? 'Async stream fetch failed',
        );
      }

      final data = fetched.data;
      if (data == null || data.isEmpty) {
        continue;
      }

      pending.add(data);
      if (limit != null && pending.length > limit) {
        throw const AsyncError(
          code: AsyncErrorCode.queryFailed,
          message: 'Streaming buffer exceeded maxBufferBytes',
        );
      }

      // Not even a full header yet: nothing can be extracted.
      if (pending.length < BinaryProtocolParser.headerSize) {
        continue;
      }
      // The front message is already known to be longer than what is
      // buffered; avoid re-concatenating all chunks on every fetch.
      final awaited = awaitedLength;
      if (awaited != null && pending.length < awaited) {
        continue;
      }

      // Flatten once per fetch and walk the buffer by offset instead of
      // copying the remainder after every extracted message.
      final all = pending.takeBytes();
      awaitedLength = null;
      var offset = 0;
      while (all.length - offset >= BinaryProtocolParser.headerSize) {
        // A copy of just the header, so the length parser sees the header
        // starting at index 0 exactly as it would for a fresh buffer.
        final msgLen = BinaryProtocolParser.messageLengthFromHeader(
          all.sublist(offset, offset + BinaryProtocolParser.headerSize),
        );
        if (msgLen <= 0) {
          // Without this guard the loop could never advance.
          throw FormatException(
            'Invalid protocol message length in stream header: $msgLen',
          );
        }
        if (all.length - offset < msgLen) {
          awaitedLength = msgLen;
          break;
        }

        final end = offset + msgLen;
        final msg = all.sublist(offset, end);
        offset = end;
        yield BinaryProtocolParser.parse(msg);
      }

      // Keep whatever was not consumed for the next fetch.
      if (offset < all.length) {
        pending.add(offset == 0 ? all : all.sublist(offset));
      }
    }

    if (pending.isNotEmpty) {
      throw const FormatException(
        'Leftover bytes after stream; expected complete protocol messages',
      );
    }
  } finally {
    await _streamClose(streamId);
  }
}