getResponseAsync method
Implementation
/// Streams the partial results of an asynchronous generation for this session.
///
/// Nothing is started until the returned stream is listened to. On listen, a
/// non-null [message] is first passed to [addQueryChunk], then the generation
/// mutex is acquired and generation is requested from the platform service.
/// Event-channel events tagged with this session's id are forwarded as
/// follows: a non-empty `partialResult` is emitted as data, `code == 'ERROR'`
/// is emitted as a stream error and ends the stream, and `done == true` closes
/// the stream.
///
/// The generation mutex is released when the stream completes, fails, or is
/// cancelled by the consumer, including a cancel that arrives while the mutex
/// is still being acquired.
@override
Stream<String> getResponseAsync({Message? message}) {
  _assertNotClosed();
  // StreamController (not async*/yield*) so the mutex release is tied to the
  // controller lifecycle: it fires on done, error, AND consumer cancel /
  // abandon. With async*+yield* an abandoned stream would never run the
  // finally and would hold the generation mutex forever, deadlocking every
  // other session.
  final controller = StreamController<String>();
  StreamSubscription? subscription;
  var mutexHeld = false;
  var finished = false;

  // Idempotent teardown: cancels the event subscription and releases the
  // mutex if this call currently holds it.
  Future<void> cleanup() async {
    if (finished) return;
    finished = true;
    try {
      await subscription?.cancel();
    } finally {
      // Release even when cancel() throws; otherwise the mutex would stay
      // locked for every other session.
      if (mutexHeld) {
        mutexHeld = false;
        generationMutex.release();
      }
    }
  }

  // Surfaces [error] to the consumer, tears down, and closes the stream.
  void failAndClose(Object error, StackTrace stackTrace) {
    if (!controller.isClosed) controller.addError(error, stackTrace);
    unawaited(cleanup());
    if (!controller.isClosed) unawaited(controller.close());
  }

  controller.onListen = () async {
    try {
      if (message != null) await addQueryChunk(message);
      // The consumer may have cancelled while the chunk was being added;
      // cleanup() has already run, so do not start a generation at all.
      if (finished) return;

      await generationMutex.acquire();
      if (finished) {
        // Cancelled while waiting for the mutex. cleanup() ran with
        // mutexHeld == false and will not run again, so the mutex acquired
        // just now must be handed back here or it is leaked forever.
        generationMutex.release();
        return;
      }
      mutexHeld = true;

      subscription = eventChannel.receiveBroadcastStream().listen(
        (event) {
          if (event is! Map) return;
          // Only consume events tagged for THIS session.
          if (event['sessionId'] != sessionId) return;
          if (controller.isClosed) return;
          // Native emits generation errors as a TAGGED DATA event
          // {code: ERROR, sessionId, message} (not an EventChannel error,
          // which would be broadcast to every session and lose the id).
          if (event['code'] == 'ERROR') {
            controller.addError(Exception(
                event['message'] ?? 'Unknown async error occurred'));
            unawaited(cleanup());
            unawaited(controller.close());
            return;
          }
          final partial = event['partialResult'] as String? ?? '';
          if (partial.isNotEmpty) controller.add(partial);
          // Tagged completion (native sends done:true with our sessionId
          // rather than closing the whole channel via endOfStream).
          if (event['done'] == true) {
            unawaited(cleanup());
            unawaited(controller.close());
          }
        },
        onError: failAndClose,
      );
      // A synchronous native failure (before any event) must surface and
      // release the mutex, not hang the controller.
      unawaited(_platformService
          .generateResponseAsyncForSession(sessionId)
          .catchError(failAndClose));
    } catch (e, st) {
      if (!controller.isClosed) controller.addError(e, st);
      await cleanup();
      if (!controller.isClosed) await controller.close();
    }
  };
  // Consumer cancelled / abandoned the stream: release the mutex.
  controller.onCancel = () async {
    await cleanup();
  };
  return controller.stream;
}