processImageStream static method

Future<VLMStreamingResult> processImageStream(
  VLMImage image, {
  required String prompt,
  int maxTokens = 2048,
  double temperature = 0.7,
  double topP = 0.9,
})

Streams image processing results, emitting tokens in real time as they are generated.

Matches Swift: RunAnywhere.processImageStream(_:prompt:maxTokens:temperature:topP:)

final result = await RunAnywhere.processImageStream(
  VLMImage.filePath('/path/to/image.jpg'),
  prompt: 'Describe this image',
);

// Listen to tokens as they arrive
result.stream.listen((token) {
  print(token); // "A ", "white ", "dog ", ...
});

// Wait for final metrics
final metrics = await result.metrics;
print('Total: ${metrics.completionTokens} tokens');

// Or cancel early
result.cancel();

Implementation

/// Streams image processing with real-time tokens.
///
/// Matches Swift:
/// `RunAnywhere.processImageStream(_:prompt:maxTokens:temperature:topP:)`.
///
/// Starts VLM generation for [image] guided by [prompt] and returns a
/// [VLMStreamingResult] with:
///  * a broadcast token stream that emits each token as it arrives,
///  * a metrics future that completes with the final [VLMResult] once the
///    stream finishes (or completes with the error if streaming fails), and
///  * a cancel callback that aborts generation and closes the stream.
///
/// Throws [SDKError.notInitialized] when the SDK is not initialized and
/// [SDKError.vlmNotInitialized] when no VLM model is loaded.
static Future<VLMStreamingResult> processImageStream(
  VLMImage image, {
  required String prompt,
  int maxTokens = 2048,
  double temperature = 0.7,
  double topP = 0.9,
}) async {
  if (!_isInitialized) throw SDKError.notInitialized();
  if (!DartBridge.vlm.isLoaded) throw SDKError.vlmNotInitialized();

  final logger = SDKLogger('RunAnywhere.VLM.ProcessImageStream');
  final modelId = DartBridge.vlm.currentModelId ?? 'unknown';
  final startTime = DateTime.now();
  // Set when the first token arrives; used for time-to-first-token below.
  DateTime? firstTokenTime;

  // Broadcast so both the caller and the internal metrics collector can
  // subscribe to the same token stream.
  final controller = StreamController<String>.broadcast();
  final allTokens = <String>[];

  try {
    // Start streaming via the bridge.
    final tokenStream = _processImageStreamViaBridge(
      image,
      prompt,
      maxTokens,
      temperature,
      topP,
      useGpu: true,
    );

    // Forward tokens to the controller while collecting them for metrics.
    final subscription = tokenStream.listen(
      (token) {
        // Track first token time.
        firstTokenTime ??= DateTime.now();
        allTokens.add(token);
        if (!controller.isClosed) {
          controller.add(token);
        }
      },
      onError: (Object error) {
        logger.error('VLM streaming error: $error');

        // Track streaming error.
        TelemetryService.shared.trackError(
          errorCode: 'vlm_streaming_failed',
          errorMessage: error.toString(),
          context: {'model_id': modelId},
        );

        // Propagates to both the caller's stream and the metrics future.
        if (!controller.isClosed) {
          controller.addError(error);
        }
      },
      onDone: () {
        if (!controller.isClosed) {
          unawaited(controller.close());
        }
      },
    );

    // Metrics future: completes once the controller's stream ends (normal
    // completion, error, or cancellation). The listed events are discarded;
    // the final text is assembled from [allTokens] instead.
    final metricsFuture = controller.stream.toList().then((_) {
      final endTime = DateTime.now();
      final totalTimeMs = endTime.difference(startTime).inMicroseconds / 1000.0;
      final tokensPerSecond =
          totalTimeMs > 0 ? allTokens.length / (totalTimeMs / 1000) : 0.0;

      // Calculate time to first token.
      int? timeToFirstTokenMs;
      if (firstTokenTime != null) {
        timeToFirstTokenMs = firstTokenTime!.difference(startTime).inMilliseconds;
      }

      logger.info(
        'VLM streaming complete: ${allTokens.length} tokens, '
        '${tokensPerSecond.toStringAsFixed(1)} tok/s',
      );

      // Track VLM streaming success.
      TelemetryService.shared.trackGeneration(
        modelId: modelId,
        modelName: DartBridge.vlm.currentModelId,
        promptTokens: 0, // Image tokens not exposed yet
        completionTokens: allTokens.length,
        latencyMs: totalTimeMs.round(),
        temperature: temperature,
        maxTokens: maxTokens,
        tokensPerSecond: tokensPerSecond,
        timeToFirstTokenMs: timeToFirstTokenMs,
        isStreaming: true,
      );

      return VLMResult(
        text: allTokens.join(),
        promptTokens: 0, // Not provided by streaming API
        completionTokens: allTokens.length,
        totalTimeMs: totalTimeMs,
        tokensPerSecond: tokensPerSecond,
      );
    });

    return VLMStreamingResult(
      stream: controller.stream,
      metrics: metricsFuture,
      cancel: () {
        logger.debug('Cancelling VLM streaming');
        DartBridge.vlm.cancel();
        // Fire-and-forget: cancellation must not block the caller, but the
        // dropped futures are marked explicitly (unawaited_futures lint).
        unawaited(subscription.cancel());
        if (!controller.isClosed) {
          unawaited(controller.close());
        }
      },
    );
  } catch (e) {
    logger.error('Failed to start VLM streaming: $e');

    // Track streaming start failure.
    TelemetryService.shared.trackError(
      errorCode: 'vlm_streaming_start_failed',
      errorMessage: e.toString(),
      context: {'model_id': modelId},
    );

    rethrow;
  }
}