connect method

@override
Future<MqttClientConnectionStatus> connect(
  String server,
  int port
)
override

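Connects to the broker at the `wss` URI given by `server`, using `port`, and completes with the resulting connection status.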

Implementation

/// Connects to a broker over a WebSocket layered on a [SecureSocket].
///
/// [server] must parse as a URI with the `wss` scheme; [port] replaces
/// whatever port that URI carries. A secure socket is opened to the URI's
/// host and port, the WebSocket handshake is performed on it, and the
/// upgraded socket becomes [client].
///
/// Returns a future that completes with a status whose state is
/// [MqttConnectionState.connected] once listening has started. If opening
/// the socket or performing the handshake fails, [onError] is invoked and
/// the returned future completes with that error.
///
/// Throws a [NoConnectionException] synchronously when [server] is not a
/// valid URI or does not use the `wss` scheme.
@override
Future<MqttClientConnectionStatus> connect(String server, int port) {
  final completer = Completer<MqttClientConnectionStatus>();
  MqttLogger.log('MqttWs2Connection::connect - entered');
  Uri uri;
  try {
    uri = Uri.parse(server);
  } on Exception catch (_, stack) {
    final message =
        'MqttWsConnection::connect - The URI supplied for the WS2 connection '
        'is not valid - $server';
    Error.throwWithStackTrace(NoConnectionException(message), stack);
  }
  if (uri.scheme != 'wss') {
    final message =
        'MqttWsConnection::connect - The URI supplied for the WS2 has an '
        'incorrect scheme - $server';
    throw NoConnectionException(message);
  }
  uri = uri.replace(port: port);

  final uriString = uri.toString();
  MqttLogger.log(
    'MqttWs2Connection::connect - WS URL is $uriString, protocols are $protocols',
  );

  // Single sink for every asynchronous failure: notify the connection's
  // error handler, then fail the returned future. The isCompleted guard
  // keeps a late error from throwing a StateError on a settled completer.
  void fail(dynamic error, StackTrace stack) {
    onError(error);
    if (!completer.isCompleted) {
      completer.completeError(error, stack);
    }
  }

  try {
    SecureSocket.connect(
      uri.host,
      uri.port,
      context: context,
      onBadCertificate: onBadCertificate,
    ).then((socket) {
      MqttLogger.log('MqttWs2Connection::connect - securing socket');
      _performWSHandshake(socket, uri)
          .then((_) {
            client = WebSocket.fromUpgradedSocket(
              _DetachedSocket(
                socket,
                _subscription as StreamSubscription<Uint8List>?,
              ),
              serverSide: false,
            );
            readWrapper = ReadWrapper();
            messageStream = MqttByteBuffer(typed.Uint8Buffer());
            MqttLogger.log('MqttWs2Connection::connect - start listening');
            _startListening();
            completer.complete(
              MqttClientConnectionStatus()
                ..state = MqttConnectionState.connected,
            );
          })
          .catchError(fail);
    })
    // Socket and TLS failures surface here, on the future, not in the
    // surrounding try/catch. Without this handler they would be uncaught
    // and the returned future would never complete. It also covers a
    // synchronous throw inside the callback above.
    .catchError(fail);
  } on SocketException catch (e, stack) {
    // Only reachable if SecureSocket.connect throws synchronously. The
    // completer is deliberately left alone: its future is never returned on
    // this path, so completing it with an error would only raise a second,
    // unhandled asynchronous error alongside the one thrown here.
    final message =
        'MqttWs2Connection::connect - The connection to the message broker '
        '{$server}:{$port} could not be made. Error is $e';
    Error.throwWithStackTrace(NoConnectionException(message), stack);
  } on HandshakeException catch (e, stack) {
    final message =
        'MqttWs2Connection::connect - Handshake exception to the message broker '
        '{$server}:{$port}. Error is $e';
    Error.throwWithStackTrace(NoConnectionException(message), stack);
  } on TlsException catch (e, stack) {
    final message =
        'MqttWs2Connection::connect - TLS exception raised on secure connection. '
        'Error is $e';
    Error.throwWithStackTrace(NoConnectionException(message), stack);
  }
  return completer.future;
}