getResponseAsync method
Implementation
/// Sends the buffered query (text plus any pending images/audio) and
/// streams the extracted response text chunk by chunk.
///
/// The query buffer, pending images, and pending audio are drained as soon
/// as this method is called, not when the stream is listened to. Generation
/// itself starts only once the returned stream gets a listener. At that
/// point the model-wide `generationMutex` is acquired so that concurrent
/// sessions of this model take turns.
///
/// The mutex is released exactly once on every terminal path:
/// - normal completion;
/// - an error from the JS side or from chunk processing (forwarded to the
///   stream as an error event before it closes);
/// - consumer cancellation.
///
/// For [ModelType.gemma4], the stringified raw chunks are concatenated into
/// `_lastRawResponse` when the stream completes normally. Other model types
/// leave `_lastRawResponse` untouched.
@override
Stream<String> getResponseAsync() {
  _assertNotClosed();
  final text = _queryBuffer.toString();
  _queryBuffer.clear();
  // Snapshot and drain pending attachments so the next query starts clean.
  final images = List<Uint8List>.of(_pendingImages);
  final audio = _pendingAudio;
  _pendingImages.clear();
  _pendingAudio = null;
  _isCancelled = false;

  final controller = StreamController<String>();
  final genSw = Stopwatch()..start();
  int? firstChunkMs;
  var chunkCount = 0;

  // Serialize generation across all sessions of this model. Release is keyed
  // purely on ownership: a consumer cancel that arrives while we are still
  // waiting in acquire() must not suppress the release that has to happen
  // once acquire() completes, otherwise the mutex would leak forever.
  var mutexHeld = false;
  void releaseMutex() {
    if (!mutexHeld) return;
    mutexHeld = false;
    generationMutex.release();
  }

  // Build the request payload:
  //  * pure text → pass a plain string (legacy fast path).
  //  * any image/audio → build a Message object with a `content` array of
  //    MessageContentItem (one text + N image/audio items), shaped per the
  //    upstream TS declarations.
  final JSAny messageArg;
  if (images.isEmpty && audio == null) {
    messageArg = text.toJS;
  } else {
    final contentItems = <Map<String, Object>>[
      if (text.isNotEmpty) <String, Object>{'type': 'text', 'text': text},
      for (final img in images)
        <String, Object>{
          'type': 'image',
          // Data URL is the broadest-compatible shape; upstream `path` can
          // also be a Blob URL or a file URL once we wire those.
          'image_url': <String, Object>{
            // Reuse the magic-number detector from ImagePromptPart that
            // already lives in this library.
            'url': 'data:${ImagePromptPart._detectImageFormat(img)};base64,'
                '${base64Encode(img)}',
          },
        },
      if (audio != null)
        <String, Object>{
          'type': 'audio',
          // Audio payload (PCM 16kHz mono) per the native FFI contract.
          // NOTE(review): declared format is 'wav' — confirm it matches the
          // bytes actually stored in _pendingAudio.
          'input_audio': <String, Object>{
            'data': base64Encode(audio),
            'format': 'wav',
          },
        },
    ];
    messageArg = <String, Object>{
      'role': 'user',
      'content': contentItems,
    }.jsify() as JSAny;
  }

  // Gemma 4 path mirrors FfiInferenceModelSession.getResponseAsync — every
  // raw chunk is stringified and appended to rawBuffer so chat.dart can
  // run SdkResponseParser.extractToolCalls on the assembled JSON. For other
  // model types rawBuffer is null and nothing is accumulated.
  final rawBuffer = modelType == ModelType.gemma4 ? StringBuffer() : null;
  if (rawBuffer != null) {
    _lastRawResponse = null;
  }

  // Acquire the model-wide generation mutex before kicking off generation,
  // then pump the JS async iterator until it is done, errors, or the
  // consumer cancels. The `finally` clause is the single place that releases
  // the mutex and closes the stream, so no terminal path can leak either.
  controller.onListen = () async {
    try {
      await generationMutex.acquire();
      mutexHeld = true;
      if (_isCancelled || controller.isClosed) return;

      final raw = conversation.sendMessageStreaming(messageArg);
      final asyncIterSym = globalContext
          .getProperty<JSObject>('Symbol'.toJS)
          .getProperty<JSAny>('asyncIterator'.toJS);
      final factory = raw.getProperty<JSFunction?>(asyncIterSym);
      final iter = factory != null
          ? raw.callMethod<JSObject>(asyncIterSym)
          : raw; // assume it's already an iterator

      while (!controller.isClosed && !_isCancelled) {
        final step = await iter.next().toDart;
        if (controller.isClosed || _isCancelled) break;

        // Per the JS iterator protocol an absent `done` means false.
        final done =
            step.getProperty<JSBoolean?>('done'.toJS)?.toDart ?? false;
        if (done) {
          if (rawBuffer != null) {
            _lastRawResponse = rawBuffer.toString();
          }
          if (kDebugMode) {
            final total = genSw.elapsedMilliseconds;
            debugPrint(
                '[LiteRtLmWebSession/perf] generation total: ${total}ms '
                '(prefill ${firstChunkMs ?? 0}ms, $chunkCount chunks)');
          }
          break;
        }

        if (firstChunkMs == null) {
          firstChunkMs = genSw.elapsedMilliseconds;
          if (kDebugMode) {
            debugPrint(
                '[LiteRtLmWebSession/perf] time-to-first-chunk: ${firstChunkMs}ms');
          }
        }
        chunkCount++;

        final value = step.getProperty<JSObject?>('value'.toJS);
        if (value == null) continue;
        // Stringify the JS chunk into the same JSON shape liblitert_lm
        // streams via the native FFI callback, then run it through the
        // shared SdkTextExtractor used by ffi_inference_model.dart.
        final jsonStr = _stringifyChunk(value);
        rawBuffer?.write(jsonStr);
        final chunkText = SdkTextExtractor.extractTextFromResponse(jsonStr);
        if (chunkText.isNotEmpty) controller.add(chunkText);
      }
    } catch (e, st) {
      // Covers acquire() failures, JS promise rejections, and exceptions
      // thrown while processing a chunk.
      if (!controller.isClosed) controller.addError(e, st);
    } finally {
      releaseMutex();
      if (!controller.isClosed) await controller.close();
    }
  };

  controller.onCancel = () {
    _isCancelled = true;
    // Release immediately so an abandoned stream can't hold the lock
    // forever. If we do not hold it yet (still waiting in acquire()), this
    // is a no-op and the onListen handler releases right after acquiring.
    // NOTE(review): the in-flight iter.next() is not cancelled here, so the
    // next generation may start while that JS step is still pending.
    releaseMutex();
  };
  return controller.stream;
}