getResponseAsync method

@override
Stream<String> getResponseAsync({
  Message? message,
})
override

Implementation

/// Streams the partial results of an asynchronous generation for this session.
///
/// Nothing happens until the returned stream is listened to. On listen, a
/// non-null [message] is first passed to [addQueryChunk]; then the generation
/// mutex is acquired, the event channel is subscribed to, and the platform
/// service is asked to start generating for this session.
///
/// Only map events whose `sessionId` matches this session are consumed:
///  * `code == 'ERROR'` is forwarded as a stream error and ends the stream;
///  * a non-empty `partialResult` is emitted as a data event;
///  * `done == true` ends the stream.
///
/// The generation mutex is released when the stream completes, fails, or is
/// cancelled by the consumer — including a cancel that arrives while the
/// mutex is still being acquired.
@override
Stream<String> getResponseAsync({Message? message}) {
  _assertNotClosed();

  // StreamController (not async*/yield*) so the mutex release is tied to the
  // controller lifecycle — it fires on done, error, AND consumer cancel /
  // abandon. With async*+yield* an abandoned stream would never run the
  // finally and would hold the generation mutex forever, deadlocking every
  // other session.
  final controller = StreamController<String>();
  StreamSubscription? subscription;
  var mutexHeld = false;
  var finished = false;

  // Idempotent teardown: stops listening to the event channel and gives the
  // mutex back if this call owns it.
  Future<void> cleanup() async {
    if (finished) return;
    finished = true;
    try {
      await subscription?.cancel();
    } finally {
      // Release even when cancelling the subscription throws; otherwise the
      // mutex would stay held forever.
      if (mutexHeld) {
        mutexHeld = false;
        generationMutex.release();
      }
    }
  }

  // Terminates the stream from a synchronous callback: optionally reports
  // [error], starts cleanup, and closes the controller.
  void finish([Object? error, StackTrace? stackTrace]) {
    if (error != null && !controller.isClosed) {
      controller.addError(error, stackTrace);
    }
    unawaited(cleanup());
    if (!controller.isClosed) unawaited(controller.close());
  }

  controller.onListen = () async {
    try {
      if (message != null) await addQueryChunk(message);
      // The consumer may have cancelled while the chunk was being added;
      // cleanup() has already run, so do not start a generation at all.
      if (finished) return;

      await generationMutex.acquire();
      // Same race while waiting for the mutex: cleanup() ran with
      // mutexHeld == false and can never run again, so hand the mutex
      // straight back instead of holding it forever.
      if (finished) {
        generationMutex.release();
        return;
      }
      mutexHeld = true;

      // Subscribe BEFORE asking native to generate so no event is missed.
      subscription = eventChannel.receiveBroadcastStream().listen(
        (event) {
          if (event is! Map) return;
          // Only consume events tagged for THIS session.
          if (event['sessionId'] != sessionId) return;
          if (controller.isClosed) return;
          // Native emits generation errors as a TAGGED DATA event
          // {code: ERROR, sessionId, message} (not an EventChannel error,
          // which would be broadcast to every session and lose the id).
          if (event['code'] == 'ERROR') {
            finish(Exception(
                event['message'] ?? 'Unknown async error occurred'));
            return;
          }
          final partial = event['partialResult'] as String? ?? '';
          if (partial.isNotEmpty) controller.add(partial);
          // Tagged completion (native sends done:true with our sessionId
          // rather than closing the whole channel via endOfStream).
          if (event['done'] == true) finish();
        },
        onError: (Object error, StackTrace st) => finish(error, st),
      );
      unawaited(_platformService
          .generateResponseAsyncForSession(sessionId)
          .catchError((Object e, StackTrace st) {
        // A synchronous native failure (before any event) must surface and
        // release the mutex, not hang the controller.
        finish(e, st);
      }));
    } catch (e, st) {
      if (!controller.isClosed) controller.addError(e, st);
      await cleanup();
      if (!controller.isClosed) await controller.close();
    }
  };

  // Consumer cancelled / abandoned the stream — release the mutex.
  controller.onCancel = cleanup;

  return controller.stream;
}