request method

@override
Stream<Response> request(
  Request request, [
  NextLink? forward
])

A function called when a request reaches this Link

Implementation

/// Sends [request] over the Phoenix [channel] and streams back its responses.
///
/// The serialized request is pushed as a `doc` event. If the reply carries a
/// `subscriptionId`, every message the socket delivers for that topic is
/// parsed and emitted until the listener cancels or the topic stream closes;
/// an `unsubscribe` event is then pushed if the channel is still joined.
/// Otherwise a successful reply is emitted as a single [Response].
///
/// Anything thrown while handling the reply (including the parsed error of
/// an error reply) is logged and not rethrown, so the stream then completes
/// without emitting. Errors raised by the topic stream are added to the
/// returned stream as error events.
///
/// [forward] is not used by this implementation.
@override
Stream<Response> request(Request request, [NextLink? forward]) async* {
  final payload = _serializer.serializeRequest(request);
  String? phoenixSubscriptionId;
  StreamSubscription<Response>? websocketSubscription;

  StreamController<Response>? streamController;
  final push = channel.push('doc', payload);
  try {
    final pushResponse = await push.future;
    // Remember the subscription id so the subscription can be cancelled in
    // the `finally` block below.
    phoenixSubscriptionId =
        pushResponse.response['subscriptionId'] as String?;

    if (phoenixSubscriptionId != null) {
      // Relay every message for this subscription through a controller.
      final controller = StreamController<Response>();
      streamController = controller;

      websocketSubscription = channel.socket
          .streamForTopic(phoenixSubscriptionId)
          .map(
            (event) => _parser.parseResponse(
              event.payload!['result'] as Map<String, dynamic>,
            ),
          )
          .listen(
            controller.add,
            onError: controller.addError,
            // Without this the controller stays open after the topic stream
            // ends, so `yield*` below would never return and the cleanup in
            // `finally` would never run.
            onDone: controller.close,
          );
      yield* controller.stream;
    } else if (pushResponse.isOk) {
      yield _parser
          .parseResponse(pushResponse.response as Map<String, dynamic>);
    } else if (pushResponse.isError) {
      // ignore: only_throw_errors
      throw _parser.parseError(pushResponse.response as Map<String, dynamic>);
    }
  } catch (e, stackTrace) {
    // Failures are logged only; they are not propagated to the listener.
    log(e.toString(), error: e, stackTrace: stackTrace);
  } finally {
    await websocketSubscription?.cancel();
    await streamController?.close();
    // Reached once the caller stops listening to the stream (`yield*` stops
    // when there is no listener) or once the topic stream has closed.
    if (channel.state == PhoenixChannelState.joined &&
        phoenixSubscriptionId != null) {
      // The result of the unsubscribe push is intentionally not awaited.
      channel.push('unsubscribe', {'subscriptionId': phoenixSubscriptionId});
    }
  }
}