transcribeStream method

Future<CactusStreamedTranscriptionResult> transcribeStream({
  String? audioFilePath,
  Stream<Uint8List>? audioStream,
  String prompt = whisperPrompt,
  CactusTranscriptionParams? params,
})

Implementation

/// Transcribes audio and exposes the recognized tokens as a stream.
///
/// Exactly one of [audioFilePath] or [audioStream] must be supplied:
///
/// * With [audioFilePath], the call is forwarded to
///   `CactusContext.transcribeStream` using the handle returned by
///   `_getValidatedHandle` for the last initialized model (falling back to
///   the default model), and that streamed result is returned as-is.
/// * With [audioStream], every chunk is buffered until the stream is done.
///   The buffered data is then checked with `PCMUtils.validatePCMBuffer`
///   (and trimmed with `PCMUtils.trimToValidSamples` if needed) before being
///   passed to `_transcribePCMStreamInternal`; its tokens and final result
///   are relayed through the returned object.
///
/// [prompt] and [params] are passed through to the underlying transcription
/// call; in file mode a null [params] is replaced by
/// `defaultTranscriptionParams`.
///
/// Throws an [ArgumentError] when neither or both audio sources are given,
/// and an [Exception] in file mode when no handle is available for the model.
/// In stream mode, failures are not thrown: they are delivered as an error
/// on the returned token stream and on the returned result future. An empty
/// (or entirely invalid) audio buffer completes the result with
/// `success: false` instead of an error.
Future<CactusStreamedTranscriptionResult> transcribeStream({
  String? audioFilePath,
  Stream<Uint8List>? audioStream,
  String prompt = whisperPrompt,
  CactusTranscriptionParams? params,
}) async {
  if (audioFilePath == null && audioStream == null) {
    throw ArgumentError('Must provide either audioFilePath or audioStream');
  }

  if (audioFilePath != null && audioStream != null) {
    throw ArgumentError('Cannot provide both audioFilePath and audioStream');
  }

  // File transcription mode
  if (audioFilePath != null) {
    final transcriptionParams = params ?? defaultTranscriptionParams;
    final model = _lastInitializedModel ?? defaultInitParams.model;
    final currentHandle = await _getValidatedHandle(model: model);

    if (currentHandle == null) {
      // Report the model that was actually looked up; _lastInitializedModel
      // may be null when the default model was used.
      throw Exception(
        'Model $model is not downloaded. Please download it before transcribing.',
      );
    }

    try {
      final streamedResult = CactusContext.transcribeStream(
        currentHandle,
        prompt,
        audioFilePath: audioFilePath,
        params: transcriptionParams,
      );
      // Telemetry is recorded once the final result settles; the caller
      // still receives the untouched result future.
      streamedResult.result.then((result) {
        _logTranscriptionTelemetry(
          result,
          model,
          success: result.success,
          message: result.errorMessage,
        );
      }).catchError((error) {
        _logTranscriptionTelemetry(
          null,
          model,
          success: false,
          message: error.toString(),
        );
      });

      return streamedResult;
    } catch (e) {
      debugPrint('Streaming transcription failed: $e');
      _logTranscriptionTelemetry(
        null,
        model,
        success: false,
        message: e.toString(),
      );
      rethrow;
    }
  }

  // Stream transcription mode: buffer all audio, transcribe on completion.
  final tokenController = StreamController<String>();
  final resultCompleter = Completer<CactusTranscriptionResult>();
  final buffer = <int>[];

  // Ends the session with an unsuccessful result without running the model.
  void finishWithoutAudio(String logMessage, String errorMessage) {
    debugPrint(logMessage);
    if (!tokenController.isClosed) {
      tokenController.close();
    }
    if (!resultCompleter.isCompleted) {
      resultCompleter.complete(CactusTranscriptionResult(
        success: false,
        text: '',
        errorMessage: errorMessage,
      ));
    }
  }

  final subscription = audioStream!.listen(
    (pcmChunk) {
      buffer.addAll(pcmChunk);
    },
    onError: (Object error) {
      debugPrint('Audio stream error: $error');
      if (!tokenController.isClosed) {
        tokenController.addError(error);
        // The subscription is cancelled once the result fails, so onDone
        // will not run; close here so token listeners are not left hanging.
        tokenController.close();
      }
      if (!resultCompleter.isCompleted) {
        resultCompleter.completeError(error);
      }
    },
    onDone: () async {
      // A prior stream error already settled the result; do not transcribe
      // partial audio if onDone arrives before the cancel takes effect.
      if (resultCompleter.isCompleted) {
        return;
      }

      if (buffer.isEmpty) {
        finishWithoutAudio(
          'No audio data received',
          'No audio data received',
        );
        return;
      }

      final pcmData = PCMUtils.validatePCMBuffer(buffer)
          ? buffer
          : PCMUtils.trimToValidSamples(buffer);

      if (pcmData.isEmpty) {
        finishWithoutAudio(
          'No valid audio data after trimming',
          'No valid audio data received',
        );
        return;
      }

      try {
        final streamedResult =
            await _transcribePCMStreamInternal(pcmData, prompt, params);

        // Relay tokens, errors and completion to the controller handed out
        // to the caller before transcription started.
        streamedResult.stream.listen(
          (token) {
            if (!tokenController.isClosed) {
              tokenController.add(token);
            }
          },
          onError: (Object error) {
            if (!tokenController.isClosed) {
              tokenController.addError(error);
            }
          },
          onDone: () {
            if (!tokenController.isClosed) {
              tokenController.close();
            }
          },
        );

        final result = await streamedResult.result;
        if (!resultCompleter.isCompleted) {
          resultCompleter.complete(result);
        }
      } catch (e) {
        debugPrint('Failed to transcribe audio: $e');
        if (!tokenController.isClosed) {
          tokenController.addError(e);
          tokenController.close();
        }
        if (!resultCompleter.isCompleted) {
          resultCompleter.completeError(e);
        }
      }
    },
  );

  // Clean up the subscription when the result settles. Both outcomes are
  // handled explicitly: whenComplete() would return a future carrying the
  // same error, which nobody listens to and which would therefore surface
  // as an unhandled asynchronous error.
  resultCompleter.future.then<void>(
    (_) => subscription.cancel(),
    onError: (Object _) => subscription.cancel(),
  );

  return CactusStreamedTranscriptionResult(
    stream: tokenController.stream,
    result: resultCompleter.future,
  );
}