getResponseAsync method

  1. @override
Stream<String> getResponseAsync()
override

Implementation

/// Streams the model's reply to the query buffered on this session.
///
/// The buffered query text, pending images and pending audio are snapshotted
/// and cleared at call time. The returned single-subscription stream starts
/// generating only once it is listened to: the model-wide `generationMutex`
/// is acquired first, then the JS async iterator returned by
/// `conversation.sendMessageStreaming` is drained one chunk at a time. Every
/// chunk is stringified, run through `SdkTextExtractor`, and any non-empty
/// text is emitted as a stream event.
///
/// For `ModelType.gemma4` the stringified chunks are additionally
/// concatenated and stored in `_lastRawResponse` when the iterator reports
/// completion; it is reset to `null` at the start of the call.
///
/// Failures while acquiring the mutex, starting generation, awaiting a chunk
/// or processing a chunk are delivered as a stream error, after which the
/// stream is closed and the mutex released.
@override
Stream<String> getResponseAsync() {
  _assertNotClosed();
  final text = _queryBuffer.toString();
  _queryBuffer.clear();
  // `images` is either null or a non-empty copy, never an empty list.
  final images =
      _pendingImages.isNotEmpty ? List<Uint8List>.from(_pendingImages) : null;
  final audio = _pendingAudio;
  _pendingImages.clear();
  _pendingAudio = null;
  _isCancelled = false;

  final controller = StreamController<String>();
  final genSw = Stopwatch()..start();
  int? firstChunkMs;
  var chunkCount = 0;

  // Per-stream cancellation. `_isCancelled` is session-wide and is reset to
  // false by the next getResponseAsync() call, so on its own it cannot tell
  // an older, already-cancelled stream that it must not start generating.
  var streamCancelled = false;
  bool shouldStop() => controller.isClosed || streamCancelled || _isCancelled;

  // Serialize generation across all sessions of this model. The mutex is
  // acquired in onListen and released in every terminal path (done / error /
  // consumer cancel). Idempotence is keyed on `mutexHeld` alone: a cancel
  // that arrives while acquire() is still pending must not disable the
  // release that has to happen once the acquire finally completes.
  var mutexHeld = false;
  void releaseMutex() {
    if (!mutexHeld) return;
    mutexHeld = false;
    generationMutex.release();
  }

  // Normal / cancelled termination: free the lock and close the stream.
  void finish() {
    releaseMutex();
    if (!controller.isClosed) controller.close();
  }

  // Error termination: free the lock, surface the error, close the stream.
  void fail(Object error, StackTrace st) {
    releaseMutex();
    if (!controller.isClosed) {
      controller.addError(error, st);
      controller.close();
    }
  }

  // Build the request payload:
  //  * pure text → pass a plain string (legacy fast path).
  //  * any image/audio → build a Message object with a `content` array of
  //    MessageContentItem (one text + N image/audio items), shaped per the
  //    upstream TS declarations.
  final JSAny messageArg;
  if (images == null && audio == null) {
    messageArg = text.toJS;
  } else {
    final contentItems = <Map<String, Object>>[
      if (text.isNotEmpty) <String, Object>{'type': 'text', 'text': text},
      if (images != null)
        for (final img in images)
          <String, Object>{
            'type': 'image',
            // Data URL is the broadest-compatible shape; upstream `path` can
            // also be a Blob URL or a file URL once we wire those.
            'image_url': <String, Object>{
              // Reuse the magic-number detector from ImagePromptPart that
              // already lives in this library (PNG / JPEG / WebP).
              'url':
                  'data:${ImagePromptPart._detectImageFormat(img)};base64,${base64Encode(img)}',
            },
          },
      if (audio != null)
        <String, Object>{
          'type': 'audio',
          // Audio payload (PCM 16kHz mono) per the native FFI contract.
          'input_audio': <String, Object>{
            'data': base64Encode(audio),
            'format': 'wav',
          },
        },
    ];
    messageArg = <String, Object>{
      'role': 'user',
      'content': contentItems,
    }.jsify() as JSAny;
  }

  // Gemma 4 path mirrors FfiInferenceModelSession.getResponseAsync — every
  // raw chunk is stringified and appended to rawBuffer so chat.dart can
  // run SdkResponseParser.extractToolCalls on the assembled JSON. Other
  // model types skip accumulation and `_lastRawResponse` is left untouched.
  final accumulateRaw = modelType == ModelType.gemma4;
  final rawBuffer = accumulateRaw ? StringBuffer() : null;
  if (accumulateRaw) {
    _lastRawResponse = null;
  }

  void startPump() {
    final raw = conversation.sendMessageStreaming(messageArg);
    final asyncIterSym = globalContext
        .getProperty<JSObject>('Symbol'.toJS)
        .getProperty<JSAny>('asyncIterator'.toJS);
    final factory = raw.getProperty<JSFunction?>(asyncIterSym);
    final iter = factory != null
        ? raw.callMethod<JSObject>(asyncIterSym)
        : raw; // assume it's already an iterator

    void pump() {
      if (shouldStop()) {
        finish();
        return;
      }
      iter.next().toDart.then((JSObject step) {
        // `onError` below only covers a rejected next() promise. Anything
        // thrown while handling the step must be caught here, otherwise the
        // stream would never close and the mutex would stay held.
        try {
          if (shouldStop()) {
            finish();
            return;
          }
          // The iterator protocol allows `done` to be absent (== false).
          final done =
              step.getProperty<JSBoolean?>('done'.toJS)?.toDart ?? false;
          if (done) {
            if (accumulateRaw) {
              _lastRawResponse = rawBuffer!.toString();
            }
            if (kDebugMode) {
              final total = genSw.elapsedMilliseconds;
              debugPrint(
                  '[LiteRtLmWebSession/perf] generation total: ${total}ms '
                  '(prefill ${firstChunkMs ?? 0}ms, $chunkCount chunks)');
            }
            finish();
            return;
          }
          if (firstChunkMs == null) {
            firstChunkMs = genSw.elapsedMilliseconds;
            if (kDebugMode) {
              debugPrint(
                  '[LiteRtLmWebSession/perf] time-to-first-chunk: ${firstChunkMs}ms');
            }
          }
          chunkCount++;
          final value = step.getProperty<JSObject?>('value'.toJS);
          if (value != null) {
            // Stringify the JS chunk into the same JSON shape liblitert_lm
            // streams via the native FFI callback. Both engines then dump it
            // into the shared SdkTextExtractor — single source of truth for
            // text-vs-thinking extraction, identical to
            // ffi_inference_model.dart.
            final jsonStr = _stringifyChunk(value);
            if (accumulateRaw) {
              rawBuffer!.write(jsonStr);
            }
            final chunkText = SdkTextExtractor.extractTextFromResponse(jsonStr);
            if (chunkText.isNotEmpty) controller.add(chunkText);
          }
          pump();
        } catch (error, st) {
          fail(error, st);
        }
      }, onError: fail);
    }

    pump();
  }

  // Acquire the model-wide generation mutex before kicking off generation,
  // so concurrent sessions take turns (serialized inference).
  controller.onListen = () async {
    try {
      await generationMutex.acquire();
      mutexHeld = true;
      // The consumer may have cancelled while we were queued on the mutex.
      if (shouldStop()) {
        finish();
        return;
      }
      startPump();
    } catch (e, st) {
      fail(e, st);
    }
  };
  controller.onCancel = () {
    streamCancelled = true;
    _isCancelled = true;
    // NOTE(review): the lock is released immediately even if a next() call
    // is still in flight on the JS side — confirm the engine tolerates a new
    // generation starting before the previous iterator settles.
    releaseMutex();
  };
  return controller.stream;
}