request method

  1. @override
Stream<Response> request(
  1. Request request, [
  2. NextLink? forward
])

A function called when a request reaches this Link

Implementation

@override
Stream<Response> request(Request request, [forward]) async* {
  final String id = uuid.v4();
  final requestWithContext = request.withContextEntry<RequestId>(
    RequestId(id),
  );
  _requests.add(requestWithContext);

  if (_connectionStateController.value == ConnectionState.closed) {
    await _connect();
  }
  final StreamController<Response> response = StreamController();
  StreamSubscription<GraphQLSocketMessage>? messagesSubscription;

  response.onListen = () {
    final Stream<ConnectionState> waitForConnectedState =
        _connectionStateController
            .where((state) => state == ConnectionState.open)
            .take(1);

    waitForConnectedState.listen((_) {
      // listen for response messages
      messagesSubscription = _messagesController.stream
          .where((message) =>
              (message is SubscriptionData && message.id == id) ||
              (message is SubscriptionError && message.id == id) ||
              (message is SubscriptionComplete && message.id == id))
          .takeWhile((_) => !response.isClosed)
          .listen(
        (message) {
          if (message is SubscriptionData || message is SubscriptionError) {
            try {
              final parsed = _parseMessage(message);
              if (parsed.data == null && parsed.errors == null) {
                throw WebSocketLinkServerException(
                  originalException: null,
                  parsedResponse: parsed,
                  requestMessage: null,
                );
              }
              response.add(parsed);
            } catch (e) {
              response.addError(e);
            }
          } else if (message is SubscriptionComplete) {
            response.close();
          }
        },
        onError: response.addError,
        onDone: response.close,
      );
      // Send the request.
      _write(
        StartOperation(
          id,
          serializer.serializeRequest(requestWithContext),
        ),
      ).catchError(response.addError);
    });
  };

  response.onCancel = () {
    if (isDisabled) {
      return;
    }
    messagesSubscription?.cancel();
    _write(StopOperation(id)).catchError(response.addError);
    _requests.removeWhere((e) => e.context.entry<RequestId>()!.id == id);
  };

  yield* response.stream;
}