subscribe method

Stream<Response> subscribe(
  1. Request payload,
  2. bool waitForConnection
)

Sends a query, mutation or subscription request to the server and returns a stream of the responses to that request.

If the request is a query or mutation, a timeout will be applied to the request as specified by SocketClientConfig's queryAndMutationTimeout field.

If the request is a subscription, no timeout is applied.

In case of socket disconnection, the returned stream will be closed.

Implementation

/// Sends [payload] to the server and returns a stream of its responses.
///
/// Nothing is sent until the returned stream is listened to. Once it is, the
/// request is written as soon as the connection state is
/// [SocketConnectionState.connected]. When [waitForConnection] is `false` the
/// connected state is assumed immediately instead of being awaited.
///
/// If [payload] is not a subscription and `config.queryAndMutationTimeout` is
/// set, that timeout applies both to waiting for the connection and to
/// waiting for the operation's completion message. On expiry a
/// [TimeoutException] is added to the returned stream and the stream is
/// closed. Subscriptions never time out.
///
/// Server-side errors for this operation are forwarded as stream errors. The
/// stream is closed when the completion message for this operation arrives,
/// or, when `config.autoReconnect` is disabled, when the underlying message
/// stream ends or fails.
///
/// Cancelling the returned stream releases every internal listener and, on
/// the `graphqlWs` protocol, sends a stop message while still connected.
Stream<Response> subscribe(
  final Request payload,
  final bool waitForConnection,
) {
  final String id = _uuid.v4(
    options: {
      'random': randomBytesForUuid,
    },
  ).toString();
  final StreamController<Response> response = StreamController<Response>();
  StreamSubscription<SocketConnectionState>? sub;
  // Every listener attached to the message stream on behalf of this request,
  // kept so they can be released when the consumer cancels.
  final messageSubscriptions = <StreamSubscription<void>>[];
  final bool addTimeout =
      !payload.isSubscription && config.queryAndMutationTimeout != null;

  // Reports a timeout to the consumer and ends the stream. Guarded because
  // adding to an already closed controller throws a StateError.
  void failWithTimeout(String message) {
    if (response.isClosed) {
      return;
    }
    response.addError(TimeoutException(message));
    response.close();
  }

  // Whether [message] belongs to the operation started by this call.
  bool isForThisOperation(GraphQLSocketMessage message) =>
      (message is SubscriptionData && message.id == id) ||
      (message is SubscriptionNext && message.id == id) ||
      (message is SubscriptionError && message.id == id) ||
      (message is SubscriptionComplete && message.id == id);

  void onListen() {
    final Stream<SocketConnectionState> connectionStates = waitForConnection
        ? _connectionStateController
        : _connectionStateController
            .startWith(SocketConnectionState.connected);

    final Stream<SocketConnectionState> waitForConnectedStateWithoutTimeout =
        connectionStates
            .where((SocketConnectionState state) =>
                state == SocketConnectionState.connected)
            .take(1);

    final Stream<SocketConnectionState> waitForConnectedState = addTimeout
        ? waitForConnectedStateWithoutTimeout.timeout(
            config.queryAndMutationTimeout!,
            onTimeout: (EventSink<SocketConnectionState> event) {
              failWithTimeout('Connection timed out.');
              event.close();
            },
          )
        : waitForConnectedStateWithoutTimeout;

    sub = waitForConnectedState.listen((_) {
      // Messages for this operation only; stops once the response stream is
      // closed or the client has been disposed.
      final Stream<GraphQLSocketMessage> dataErrorComplete = _messages
          .where(isForThisOperation)
          .takeWhile((_) => !response.isClosed && !_wasDisposed);

      final Stream<GraphQLSocketMessage> firstComplete = dataErrorComplete
          .where((message) => message is SubscriptionComplete)
          .take(1);

      final Stream<GraphQLSocketMessage> subscriptionComplete = addTimeout
          ? firstComplete.timeout(
              config.queryAndMutationTimeout!,
              onTimeout: (EventSink<GraphQLSocketMessage> event) {
                failWithTimeout('Request timed out.');
                event.close();
              },
            )
          : firstComplete;

      messageSubscriptions
        ..add(subscriptionComplete.listen(
          (_) => response.close(),
          onDone: () {
            // With auto-reconnect the stream stays open so the operation can
            // be resumed on the next connection.
            if (!config.autoReconnect) {
              response.close();
            }
          },
          onError: (_) {
            if (!config.autoReconnect) {
              response.close();
            }
          },
        ))
        ..add(dataErrorComplete
            .whereType<SubscriptionData>()
            .listen((message) => response.add(parse(message.toJson()))))
        ..add(dataErrorComplete
            .whereType<SubscriptionNext>()
            .listen((message) => response.add(parse(message.toJson()))))
        ..add(dataErrorComplete
            .whereType<SubscriptionError>()
            .listen((message) => response.addError(message)));

      // Send the operation only once per initializer, even if this callback
      // is invoked again.
      final initializer = _subscriptionInitializers[id];
      if (initializer != null && !initializer.hasBeenTriggered) {
        final serializedPayload = serialize(payload);
        final GraphQLSocketMessage operation =
            protocol == GraphQLProtocol.graphqlTransportWs
                ? SubscribeOperation(id, serializedPayload)
                : StartOperation(id, serializedPayload);
        _write(operation);
        initializer.hasBeenTriggered = true;
      }
    });
  }

  response.onListen = onListen;

  response.onCancel = () {
    _subscriptionInitializers.remove(id);

    sub?.cancel();
    // Release the message listeners and any pending request timeout so they
    // do not outlive the consumer.
    for (final subscription in messageSubscriptions) {
      subscription.cancel();
    }
    messageSubscriptions.clear();

    if (protocol == GraphQLProtocol.graphqlWs &&
        _connectionStateController.value == SocketConnectionState.connected &&
        socketChannel != null) {
      _write(StopOperation(id));
    }
  };

  _subscriptionInitializers[id] = SubscriptionListener(onListen, false);

  return response.stream;
}