streamAsync method

Stream<ParsedRowBuffer> streamAsync(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
  Duration pollInterval = const Duration(milliseconds: 10),
  ResultEncoding resultEncoding = ResultEncoding.rowMajor,
  bool lazyStrings = false,
})
inherited

Poll-based async batched streaming via odbc_stream_start_async.

When resultEncoding is not ResultEncoding.rowMajor and the native library exports odbc_stream_start_async_options, batches use columnar v2 wire layout.

Implementation

/// Streams the result of [sql] on [connectionId] by polling a native async
/// stream and yielding one decoded [ParsedRowBuffer] per complete frame.
///
/// [fetchSize], [chunkSize] and the wire code of [resultEncoding] are passed
/// through to the native start call. [lazyStrings] is forwarded to the frame
/// decoder.
///
/// While the native side reports "pending", the generator sleeps between
/// polls. The sleep starts at one tenth of [pollInterval] (500 microseconds
/// when that tenth is zero), doubles after each pending poll while it is
/// still below [pollInterval], and is capped at [pollInterval]. Any
/// non-pending status resets the sleep to its starting value.
///
/// Throws an [Exception] when the stream cannot be started, when polling
/// returns no status, when the status is error, cancelled or unrecognised,
/// or when a fetch reports failure. Throws a [FormatException] when the
/// stream finishes with bytes that do not form a complete frame.
///
/// The native stream is closed in a `finally` block once it has been
/// started, regardless of how the generator body exits.
Stream<ParsedRowBuffer> streamAsync(
  int connectionId,
  String sql, {
  int fetchSize = 1000,
  int chunkSize = 64 * 1024,
  Duration pollInterval = const Duration(milliseconds: 10),
  ResultEncoding resultEncoding = ResultEncoding.rowMajor,
  bool lazyStrings = false,
}) async* {
  final handle = _native.streamStartAsync(
    connectionId,
    sql,
    fetchSize: fetchSize,
    chunkSize: chunkSize,
    resultEncodingWire: resultEncoding.wireCode,
  );
  if (handle == null || handle == 0) {
    throw Exception('Failed to start async stream: ${_native.getError()}');
  }

  final frames = BinaryFrameAccumulator();

  // Starting point of the back-off; also the value it is reset to whenever
  // the native side reports anything other than "pending".
  final tenthOfInterval = pollInterval ~/ 10;
  final initialBackoff = tenthOfInterval == Duration.zero
      ? const Duration(microseconds: 500)
      : tenthOfInterval;
  final backoffCapMicros = pollInterval.inMicroseconds;
  var backoff = initialBackoff;

  try {
    while (true) {
      final status = _native.streamPollAsync(handle);
      if (status == null) {
        throw Exception(
          'Async stream poll unavailable: ${_native.getError()}',
        );
      }

      if (status == _nativeStreamAsyncStatusPending) {
        // Nothing to read yet: wait, then lengthen the next wait.
        await Future<void>.delayed(backoff);
        if (backoff < pollInterval) {
          final doubledMicros = backoff.inMicroseconds * 2;
          backoff = Duration(
            microseconds: doubledMicros.clamp(0, backoffCapMicros),
          );
        }
        continue;
      }

      // Progress was made, so the next pending phase starts from the
      // shortest wait again.
      backoff = initialBackoff;

      if (status == _nativeStreamAsyncStatusDone) {
        break;
      }
      final failed = status == _nativeStreamAsyncStatusError ||
          status == _nativeStreamAsyncStatusCancelled;
      if (failed) {
        throw Exception('Async stream failed with status $status');
      }
      if (status != _nativeStreamAsyncStatusReady) {
        throw Exception('Unexpected async stream status: $status');
      }

      final fetched = _native.streamFetch(handle);
      if (!fetched.success) {
        throw Exception('Async stream fetch failed: ${_native.getError()}');
      }

      final bytes = fetched.data;
      if (bytes == null || bytes.isEmpty) {
        // An empty fetch carries no frame data; poll again.
        continue;
      }

      frames.add(bytes);
      for (final frame in frames.drainFrames()) {
        yield decodeBatchedStreamFrame(
          frame,
          lazyStrings: lazyStrings,
        );
      }
    }

    // The stream reported done; any bytes still buffered are a partial frame.
    if (frames.length > 0) {
      throw const FormatException(
        'Leftover bytes after stream; expected complete protocol messages',
      );
    }
  } finally {
    _native.streamClose(handle);
  }
}