wrapStream<T> method

Stream<T> wrapStream<T>(
  String operation,
  Stream<T> factory()
)

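Wraps a stream-returning ODBC call with telemetry events: an open event when the returned stream is first listened to, then a close event on normal completion or an error event on failure. On failure the original error is rethrown to the listener.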

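Per-chunk events are intentionally omitted so that a large result set does not produce one telemetry event per chunk. The chunk count and elapsed time are instead reported once, in the close or error event.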

Implementation

Stream<T> wrapStream<T>(
  String operation,
  Stream<T> Function() factory,
) async* {
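  // Captured when the stream is first listened to; used for durationMs below.
  final start = DateTime.now();
  // Counted locally and reported once, in the close or error event.
  var chunkCount = 0;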
  await _telemetry.recordEvent(
    name: '$operation.open',
    severity: TelemetrySeverity.info,
    message: 'stream subscription opened',
    context: {'operation': operation},
  );
  try {
    await for (final chunk in factory()) {
      chunkCount++;
      yield chunk;
    }
    await _telemetry.recordEvent(
      name: '$operation.close',
      severity: TelemetrySeverity.info,
      message: 'stream completed normally',
      context: {
        'operation': operation,
        'chunkCount': chunkCount,
        'durationMs': DateTime.now().difference(start).inMilliseconds,
      },
    );
  } on Object catch (e) {
    await _telemetry.recordEvent(
      name: '$operation.error',
      severity: TelemetrySeverity.error,
      message: 'stream failed: $e',
      context: {
        'operation': operation,
        'chunkCount': chunkCount,
        'durationMs': DateTime.now().difference(start).inMilliseconds,
      },
    );
    rethrow;
  }
}
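
The following is a minimal sketch of how the wrapper behaves on the normal and failure paths. It is not part of the library: RecordingTelemetry, Instrumented, failingRows, and the local TelemetrySeverity enum are hypothetical stand-ins, since the real telemetry types are not shown on this page. The wrapStream body is a condensed copy of the implementation above, with the shared close/error context factored into a local helper.

```dart
import 'dart:async';

// Hypothetical stand-in for the library's severity type.
enum TelemetrySeverity { info, error }

// Hypothetical in-memory recorder standing in for the real _telemetry object.
class RecordingTelemetry {
  final names = <String>[];
  final contexts = <Map<String, Object?>>[];

  Future<void> recordEvent({
    required String name,
    required TelemetrySeverity severity,
    required String message,
    Map<String, Object?> context = const {},
  }) async {
    names.add(name);
    contexts.add(context);
  }
}

// Hypothetical host class; only wrapStream mirrors the documented method.
class Instrumented {
  Instrumented(this._telemetry);
  final RecordingTelemetry _telemetry;

  Stream<T> wrapStream<T>(
    String operation,
    Stream<T> Function() factory,
  ) async* {
    final start = DateTime.now();
    var chunkCount = 0;
    // Context shared by the close and error events.
    Map<String, Object?> summary() => {
          'operation': operation,
          'chunkCount': chunkCount,
          'durationMs': DateTime.now().difference(start).inMilliseconds,
        };
    await _telemetry.recordEvent(
      name: '$operation.open',
      severity: TelemetrySeverity.info,
      message: 'stream subscription opened',
      context: {'operation': operation},
    );
    try {
      await for (final chunk in factory()) {
        chunkCount++;
        yield chunk;
      }
      await _telemetry.recordEvent(
        name: '$operation.close',
        severity: TelemetrySeverity.info,
        message: 'stream completed normally',
        context: summary(),
      );
    } on Object catch (e) {
      await _telemetry.recordEvent(
        name: '$operation.error',
        severity: TelemetrySeverity.error,
        message: 'stream failed: $e',
        context: summary(),
      );
      rethrow;
    }
  }
}

// Hypothetical source that yields one chunk and then fails.
Stream<int> failingRows() async* {
  yield 1;
  throw StateError('connection lost');
}

Future<void> main() async {
  final telemetry = RecordingTelemetry();
  final client = Instrumented(telemetry);

  // Normal completion: one open event and one close event, no per-chunk events.
  final rows = await client
      .wrapStream('query', () => Stream.fromIterable([1, 2, 3]))
      .toList();
  print(rows); // [1, 2, 3]
  print(telemetry.names); // [query.open, query.close]
  print(telemetry.contexts.last['chunkCount']); // 3

  // Failure: an error event is recorded, then the error reaches the caller.
  try {
    await client.wrapStream('fetch', failingRows).toList();
  } on StateError {
    // wrapStream rethrows, so the caller still sees the original error.
  }
  print(telemetry.names.sublist(2)); // [fetch.open, fetch.error]
  print(telemetry.contexts.last['chunkCount']); // 1
}
```

Because the factory is passed as a function rather than as a stream, the underlying call is not started until the wrapped stream is listened to, so the open event precedes any work done by the source.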