consume method
Consumes a stream from a RemotePeer. The RemotePeer is identified by its peerId, and the stream's producerId is looked up from the label passed in data. appData is application-level custom data that can be added to the consumer for the LocalPeer; this data will be available in the consumer object and can be used only by the LocalPeer.
Implementation
/// Requests consumption of a remote peer's stream.
///
/// [data] is read for three keys:
///  * `'peerId'` - key used to look up the remote peer in `_remotePeers`.
///  * `'label'` - label passed to the remote peer's `getLabelData`.
///  * `'appData'` - custom data, converted with `convertToProtobufMap` and
///    sent along with the consume request.
///
/// Returns without doing anything when the `canConsume` permission check
/// fails. If no receive transport exists yet, one is created on the server
/// before the request is published.
///
/// The returned future completes once the consume request has been
/// published; it does not wait for the stream to become playable. The
/// [Completer] for the resulting consumer is stored in
/// `_pendingConsumerTasks` under the producer id and is completed when the
/// remote peer emits `'stream-playable'` for the same label.
///
/// Throws an [Exception] when the remote peer is unknown, when it has no
/// label data for the requested label, or when that label data carries no
/// producer id. Any error raised inside the request path is logged and
/// rethrown.
Future<void> consume(Map<String, dynamic> data) async {
  if (!checkPermission(permissionTypeCheck: PermissionType.canConsume)) {
    return;
  }

  if (_recvTransport == null) {
    logger.i('🔔 Recv Transport Not Initialized, Creating RecvTransport');
    await _createTransportOnServer(transportType: TransportType.recv);
  }

  try {
    final peerId = data['peerId'];
    final label = data['label'];

    final parseAppDataMap = convertToProtobufMap(data['appData']);
    final parseAppData = AppData(appData: parseAppDataMap);

    final remotePeer = _remotePeers[peerId];
    if (remotePeer == null) {
      throw Exception('Remote Peer Not Found with PeerId $peerId');
    }

    final labelData = remotePeer.getLabelData(label);
    if (labelData == null) {
      throw Exception('Remote Peer is not producing with Label $label');
    }

    // Validate the producer id up front, before any listener is registered
    // or any request is sent, so a missing id fails cleanly instead of
    // publishing a request with a null producerId.
    final producerId = labelData['producerId'];
    if (producerId == null) {
      throw Exception('Producer Id Not Found for Label $label');
    }

    logger.i('🔔 Consuming Stream with label $label');

    final consumerFuture = Completer<Consumer>();

    // One-shot listener: detaches itself as soon as the matching label
    // becomes playable, then hands the consumer to the completer.
    void handleStreamPlayable(Map<String, dynamic> streamData) {
      if (streamData['label'] == label) {
        remotePeer.remove('stream-playable', handleStreamPlayable);
        consumerFuture.complete(streamData['consumer']);
      }
    }

    remotePeer.on('stream-playable', handleStreamPlayable);

    try {
      socket.publish(Request_Request.consume, {
        'producerId': producerId,
        'producerPeerId': peerId,
        'appData': parseAppData,
      });
      _pendingConsumerTasks[producerId] = consumerFuture;
    } catch (_) {
      // The request never went out properly; do not leave the listener
      // attached to the remote peer.
      remotePeer.remove('stream-playable', handleStreamPlayable);
      rethrow;
    }

    // NOTE(review): the consumer is intentionally not awaited here; the
    // pending entry is presumably resolved and removed elsewhere - confirm.
  } catch (error) {
    logger.e('❌ Error Consuming Stream | error: $error');
    rethrow;
  }
}