GraphQLWebSocketServer constructor

GraphQLWebSocketServer(
  RemoteClient client, {
  Duration? keepAliveInterval,
  Duration? connectionInitWaitTimeout,
})

Creates the server logic for receiving and responding to messages through the client. The client.protocol should be one of the supportedProtocols, and it determines which logic is executed.

Implementation

/// Creates the server logic that receives messages from [client] and
/// responds through its sink.
///
/// The `client.protocol` should be one of the `supportedProtocols`;
/// [isTransportWsProtocol] selects which of the two message flavours
/// handled below is used.
///
/// When [connectionInitWaitTimeout] is non-null, a timer is started that
/// invokes `_connectionInitTimeout` unless a connection-init message
/// arrives first. When [keepAliveInterval] is non-null, a keep-alive
/// message is sent right after the connection is acknowledged and then
/// periodically with that interval.
GraphQLWebSocketServer(
  this.client, {
  this.keepAliveInterval,
  this.connectionInitWaitTimeout,
}) {
  // Errors completing [done] close the socket with a reason (only for the
  // transport-ws flavour) and then run the shared teardown.
  done.onError((Object e, StackTrace s) async {
    if (isTransportWsProtocol) {
      await client.closeWithReason(
        e is FormatException
            ? ErrorReason.invalidMessage
            : ErrorReason.internalError,
        e.toString(),
      );
    }
    return _onDone();
  });

  // Copy to a local so the null check promotes without a `!` assertion.
  final initWaitTimeout = connectionInitWaitTimeout;
  _connectionInitTimer = initWaitTimeout == null
      ? null
      : Timer(initWaitTimeout, _connectionInitTimeout);

  _clientMessageSubscription = client.stream.listen(
    (msg) async {
      if (msg.type == OperationMessage.gqlConnectionInit) {
        _connectionInitTimer?.cancel();
        if (_receivedInit && isTransportWsProtocol) {
          return client.closeWithReason(
            ErrorReason.tooManyInitialisations,
            'Too many initialisation requests',
          );
        }
        _receivedInit = true;

        try {
          Map<String, Object?>? connectionParams;
          final Object? initPayload = msg.payload;
          if (initPayload is Map) {
            connectionParams = initPayload.cast<String, Object?>();
          } else if (initPayload != null) {
            throw FormatException(
                '${msg.type} payload must be a map (object).');
          }

          final connect = await onConnect(client, connectionParams);
          // `false` is thrown as a sentinel so the catch clause below can
          // tell a rejected connection apart from an unexpected failure.
          if (!connect) throw false;
          _init = true;
          client.sink.add(
            const OperationMessage(OperationMessage.gqlConnectionAck),
          );

          final interval = keepAliveInterval;
          if (interval != null) {
            final keepAliveMessage = OperationMessage(
              isTransportWsProtocol
                  ? OperationMessage.pong
                  : OperationMessage.gqlConnectionKeepAlive,
            );
            // One keep-alive immediately, then one per interval. `??=`
            // keeps an already running timer instead of starting another.
            client.sink.add(keepAliveMessage);
            _timer ??= Timer.periodic(interval, (timer) {
              client.sink.add(keepAliveMessage);
            });
          }
        } catch (e) {
          final rejected = e == false;
          final message =
              rejected ? 'The connection was rejected.' : e.toString();

          if (isTransportWsProtocol) {
            return client.closeWithReason(
              rejected
                  ? ErrorReason.unauthorized
                  : e is FormatException
                      ? ErrorReason.invalidMessage
                      : ErrorReason.internalError,
              message,
            );
          } else {
            client.sink.add(
              OperationMessage(
                OperationMessage.gqlConnectionError,
                payload: {'message': message},
              ),
            );
          }
        }
      } else if (msg.type == OperationMessage.ping) {
        // Pings are answered whether or not the connection is initialised.
        client.sink.add(const OperationMessage(OperationMessage.pong));
      } else if (_init) {
        if (msg.type == OperationMessage.subscribe ||
            msg.type == OperationMessage.gqlStart) {
          final id = msg.id;
          if (id == null) {
            throw FormatException('${msg.type} id is required.');
          }
          if (isTransportWsProtocol && _currentOperationIds.containsKey(id)) {
            return client.closeWithReason(
              ErrorReason.duplicateSubscriptionId,
              'Subscriber for $id already exists',
            );
          }

          final Object? payload = msg.payload;
          if (payload == null) {
            throw FormatException('${msg.type} payload is required.');
          }
          if (payload is! Map) {
            throw FormatException(
                '${msg.type} payload must be a map (object).');
          }
          final Object? query = payload['query'];
          final Object? variables = payload['variables'];
          final Object? operationName = payload['operationName'];
          final Object? extensions = payload['extensions'];
          if (query is! String) {
            throw FormatException(
                '${msg.type} payload must contain a string named "query".');
          }
          if (variables is! Map?) {
            throw FormatException('${msg.type} payload\'s "variables" field'
                ' must be a map (object).');
          }
          if (operationName is! String?) {
            throw FormatException(
                '${msg.type} payload\'s "operationName" field'
                ' must be a string.');
          }
          if (extensions is! Map?) {
            throw FormatException('${msg.type} payload\'s "extensions" field'
                ' must be a map (object).');
          }

          // Register the id only once the message is known to be valid, so
          // a malformed request does not leave a stale entry that would
          // make a later request with the same id look like a duplicate.
          // This still happens before the first `await`, so concurrent
          // messages observe the id exactly as before.
          _currentOperationIds[id] = null;

          final result = await onOperation(
            id,
            query,
            variables?.cast<String, dynamic>(),
            operationName,
            extensions?.cast<String, dynamic>(),
          );
          if (!result.didExecute) {
            client.sink.add(OperationMessage(
              OperationMessage.gqlError,
              id: id,
              payload: result.errors,
            ));
            if (isTransportWsProtocol) {
              _currentOperationIds.remove(id);
              return;
            }
          } else {
            await _sendData(id, result);
          }
          // Don't send complete if the client completed the subscription
          if (!isTransportWsProtocol ||
              _currentOperationIds.containsKey(id)) {
            client.sink
                .add(OperationMessage(OperationMessage.gqlComplete, id: id));
          }
          _currentOperationIds.remove(id);
        } else if (msg.type == OperationMessage.gqlComplete ||
            msg.type == OperationMessage.gqlStop) {
          final id = msg.id;
          if (id == null) {
            throw FormatException('${msg.type} id is required.');
          }
          // Forget the operation and cancel its subscription, if one was
          // stored for this id.
          final subs = _currentOperationIds.remove(id);
          if (subs != null) {
            await subs.cancel();
          }
        } else if (msg.type == OperationMessage.gqlConnectionTerminate) {
          await _clientMessageSubscription.cancel();
          await _onDone();
        }
      } else if (msg.type == OperationMessage.subscribe) {
        // A subscribe received before the connection was acknowledged
        // closes the socket as unauthorized.
        return client.closeWithReason(
          ErrorReason.unauthorized,
          'Unauthorized',
        );
      }
    },
    onError: _done.completeError,
    onDone: _onDone,
    cancelOnError: true,
  );
}