request method
A function called when a request reaches this Link
Implementation
@override
Stream<Response> request(Request request, [NextLink? forward]) async* {
final payload = _serializer.serializeRequest(request);
String? phoenixSubscriptionId;
StreamSubscription<Response>? websocketSubscription;
StreamController<Response>? streamController;
final push = channel.push('doc', payload);
try {
final pushResponse = await push.future;
//set the subscription id in order to cancel the subscription later
phoenixSubscriptionId =
pushResponse.response['subscriptionId'] as String?;
if (phoenixSubscriptionId != null) {
//yield all messages for this subscription
streamController = StreamController();
websocketSubscription = channel.socket
.streamForTopic(phoenixSubscriptionId)
.map(
(event) => _parser.parseResponse(
event.payload!['result'] as Map<String, dynamic>,
),
)
.listen(streamController.add, onError: streamController.addError);
yield* streamController.stream;
} else if (pushResponse.isOk) {
yield _parser
.parseResponse(pushResponse.response as Map<String, dynamic>);
} else if (pushResponse.isError) {
// ignore: only_throw_errors
throw _parser.parseError(pushResponse.response as Map<String, dynamic>);
}
} catch (e, stackTrace) {
log(e.toString(), error: e, stackTrace: stackTrace);
} finally {
await websocketSubscription?.cancel();
await streamController?.close();
//this will be called once the caller stops listening to the stream
// (yield* stops if there is no one listening)
if (channel.state == PhoenixChannelState.joined &&
phoenixSubscriptionId != null) {
channel.push('unsubscribe', {'subscriptionId': phoenixSubscriptionId});
}
}
}