processImageStream static method
Stream image processing with real-time tokens
Matches Swift: RunAnywhere.processImageStream(_:prompt:maxTokens:temperature:topP:)
final result = await RunAnywhere.processImageStream(
VLMImage.filePath('/path/to/image.jpg'),
prompt: 'Describe this image',
);
// Listen to tokens as they arrive
result.stream.listen((token) {
print(token); // "A ", "white ", "dog ", ...
});
// Wait for final metrics
final metrics = await result.metrics;
print('Total: ${metrics.completionTokens} tokens');
// Or cancel early
result.cancel();
Implementation
static Future<VLMStreamingResult> processImageStream(
VLMImage image, {
required String prompt,
int maxTokens = 2048,
double temperature = 0.7,
double topP = 0.9,
}) async {
if (!_isInitialized) throw SDKError.notInitialized();
if (!DartBridge.vlm.isLoaded) throw SDKError.vlmNotInitialized();
final logger = SDKLogger('RunAnywhere.VLM.ProcessImageStream');
final modelId = DartBridge.vlm.currentModelId ?? 'unknown';
final startTime = DateTime.now();
DateTime? firstTokenTime;
// Create a broadcast stream controller for the tokens
final controller = StreamController<String>.broadcast();
final allTokens = <String>[];
try {
// Start streaming via the bridge
final tokenStream = _processImageStreamViaBridge(
image,
prompt,
maxTokens,
temperature,
topP,
useGpu: true,
);
// Forward tokens and collect them
final subscription = tokenStream.listen(
(token) {
// Track first token time
firstTokenTime ??= DateTime.now();
allTokens.add(token);
if (!controller.isClosed) {
controller.add(token);
}
},
onError: (Object error) {
logger.error('VLM streaming error: $error');
// Track streaming error
TelemetryService.shared.trackError(
errorCode: 'vlm_streaming_failed',
errorMessage: error.toString(),
context: {'model_id': modelId},
);
if (!controller.isClosed) {
controller.addError(error);
}
},
onDone: () {
if (!controller.isClosed) {
unawaited(controller.close());
}
},
);
// Build result future that completes when stream is done
final metricsFuture = controller.stream.toList().then((_) {
final endTime = DateTime.now();
final totalTimeMs = endTime.difference(startTime).inMicroseconds / 1000.0;
final tokensPerSecond =
totalTimeMs > 0 ? allTokens.length / (totalTimeMs / 1000) : 0.0;
// Calculate time to first token
int? timeToFirstTokenMs;
if (firstTokenTime != null) {
timeToFirstTokenMs = firstTokenTime!.difference(startTime).inMilliseconds;
}
logger.info(
'VLM streaming complete: ${allTokens.length} tokens, '
'${tokensPerSecond.toStringAsFixed(1)} tok/s',
);
// Track VLM streaming success
TelemetryService.shared.trackGeneration(
modelId: modelId,
modelName: DartBridge.vlm.currentModelId,
promptTokens: 0, // Image tokens not exposed yet
completionTokens: allTokens.length,
latencyMs: totalTimeMs.round(),
temperature: temperature,
maxTokens: maxTokens,
tokensPerSecond: tokensPerSecond,
timeToFirstTokenMs: timeToFirstTokenMs,
isStreaming: true,
);
return VLMResult(
text: allTokens.join(),
promptTokens: 0, // Not provided by streaming API
completionTokens: allTokens.length,
totalTimeMs: totalTimeMs,
tokensPerSecond: tokensPerSecond,
);
});
return VLMStreamingResult(
stream: controller.stream,
metrics: metricsFuture,
cancel: () {
logger.debug('Cancelling VLM streaming');
DartBridge.vlm.cancel();
subscription.cancel();
if (!controller.isClosed) {
controller.close();
}
},
);
} catch (e) {
logger.error('Failed to start VLM streaming: $e');
// Track streaming start failure
TelemetryService.shared.trackError(
errorCode: 'vlm_streaming_start_failed',
errorMessage: e.toString(),
context: {'model_id': modelId},
);
rethrow;
}
}