waitForVmEventAfterSignal function
Future<bool> waitForVmEventAfterSignal({
  required List<String> streamIds,
  required VmEventMatcher matches,
  required void Function() signal,
  required Duration timeout,
})
Subscribes to VM service streams, sends signal, then waits for a matching
event or timeout.
Implementation
/// Subscribes to VM service streams, fires [signal], and waits for a match.
///
/// Opens a WebSocket to the VM service URI returned by `readVmUri()`, sends a
/// `streamListen` request for every entry in [streamIds], and calls [signal]
/// only after all of those requests have been acknowledged. Incoming
/// `streamNotify` messages are handed to `waitForVmServiceEvent` together
/// with [matches] and [timeout]; its result is returned unchanged.
///
/// Throws a [StateError] when no VM service URI is available or when the VM
/// service rejects a `streamListen` request, and a [TimeoutException] when
/// the subscriptions are not acknowledged within three seconds. The socket
/// and HTTP client are released on every exit path.
Future<bool> waitForVmEventAfterSignal({
  required List<String> streamIds,
  required VmEventMatcher matches,
  required void Function() signal,
  required Duration timeout,
}) async {
  final uri = readVmUri();
  if (uri == null || uri.isEmpty) {
    throw StateError('VM service URI not found. Is the app running?');
  }
  final wsUri = uri
      .replaceFirst('http://', 'ws://')
      .replaceFirst('https://', 'wss://');
  final client = HttpClient()..maxConnectionsPerHost = 1;

  final WebSocket ws;
  try {
    ws = await WebSocket.connect(wsUri, customClient: client);
  } catch (_) {
    // The connection never opened, so the client is the only resource to
    // release before the failure is propagated.
    client.close(force: true);
    rethrow;
  }

  final events = StreamController<Map<String, dynamic>>();
  // Pending `streamListen` requests, keyed by JSON-RPC request id.
  final listenCompleters = <String, Completer<void>>{};
  // Request id -> stream id, used only to build readable error messages.
  final requestStreamIds = <String, String>{};
  var nextId = 0;

  final wsSubscription = ws.listen(
    (data) {
      // Only text frames carry JSON-RPC messages; skip anything else rather
      // than failing the cast inside the listener.
      if (data is! String) {
        return;
      }
      final message = jsonDecode(data) as Map<String, dynamic>;
      final id = message['id'] as String?;
      final listenCompleter = id != null ? listenCompleters[id] : null;
      if (listenCompleter != null) {
        // This is the response to one of our `streamListen` requests.
        listenCompleters.remove(id);
        final streamId = requestStreamIds.remove(id);
        if (!listenCompleter.isCompleted) {
          final error = message['error'] as Map<String, dynamic>?;
          if (error != null) {
            final messageText = error['message'] as String? ??
                'Unknown VM service streamListen error';
            final targetStream = streamId ?? 'unknown';
            listenCompleter.completeError(
              StateError(
                'Failed to subscribe to VM stream $targetStream: $messageText',
              ),
            );
          } else {
            listenCompleter.complete();
          }
        }
        return;
      }
      if (message['method'] == 'streamNotify') {
        events.add(message);
      }
    },
    onError: events.addError,
    onDone: () {
      if (!events.isClosed) {
        events.close();
      }
    },
  );

  try {
    for (final streamId in streamIds) {
      final requestId = 'listen_${++nextId}_$streamId';
      final completer = Completer<void>();
      listenCompleters[requestId] = completer;
      requestStreamIds[requestId] = streamId;
      ws.add(
        jsonEncode({
          'jsonrpc': '2.0',
          'id': requestId,
          'method': 'streamListen',
          'params': {'streamId': streamId},
        }),
      );
    }
    // `eagerError` surfaces a rejected subscription immediately instead of
    // letting a slower response turn it into a generic timeout.
    await Future.wait(
      listenCompleters.values.map((completer) => completer.future),
      eagerError: true,
    ).timeout(const Duration(seconds: 3));
    signal();
    return await waitForVmServiceEvent(
      events: events.stream,
      matches: matches,
      timeout: timeout,
    );
  } finally {
    await wsSubscription.cancel();
    if (!events.isClosed) {
      // Deliberately not awaited: the future returned by `close()` only
      // completes once a listener has seen the done event or cancelled. When
      // we get here before anything listened (failed subscription, timeout,
      // or a throwing `signal`), awaiting it would hang forever and hide the
      // original error.
      unawaited(events.close());
    }
    await ws.close();
    client.close(force: true);
  }
}