connect method

Future<void> connect(
  List<String> servers
)

Implementation

/// Opens a WebSocket connection to the first entry of [servers] and installs
/// the stream listener that drives the handshake and message parsing.
///
/// The server list is stored in `_servers` rotated by one position (the
/// server being connected to moves to the end). The rotation is done on a
/// copy, so the list passed by the caller is left untouched.
///
/// Any existing stream subscription is cancelled and `_outbound` is cleared
/// before the new listener is attached.
///
/// The listener handles incoming data in three modes:
///  * while `_openSubscription` is set, bytes are appended to its buffer
///    until `totalBytes + 2` bytes have arrived, then the subscription
///    callback is invoked with the first `totalBytes` bytes;
///  * once `_peeked` is true, data is forwarded to `parse`;
///  * otherwise the first frame is matched against `info`, its JSON payload
///    is validated with `checkOptions`, and `CONNECT` followed by `PING` is
///    sent. Failures in this step are reported through `_pongCompleter`.
///
/// When the stream is done, a reconnect is attempted unless
/// `_maxReconnectAttempts` is non-zero and has been reached, in which case
/// `close` is called.
///
/// Throws an [ArgumentError] if [servers] is empty, and a
/// `WebSocketChannelException` if the non-web socket cannot be opened.
Future<void> connect(List<String> servers) async {
  if (servers.isEmpty) {
    throw ArgumentError.value(
        servers, 'servers', 'must contain at least one server URL');
  }
  final currentServer = servers.first;
  // Rotate on a copy: works for unmodifiable lists and does not surprise the
  // caller by reordering their list.
  _servers = [...servers.skip(1), currentServer];

  _subscription?.cancel();
  final serverURI = Uri.parse(currentServer);

  HttpClient? httpClient;
  // Any non-null 'tls' option enables TLS; a BaseTLS instance is used as-is,
  // anything else (e.g. `true`) falls back to a default BaseTLS.
  final tlsOption = _opts['tls'];
  if (tlsOption != null) {
    final BaseTLS tls = tlsOption is BaseTLS ? tlsOption : BaseTLS();
    if (kIsWeb) {
      // NOTE(review): HttpOverrides is a dart:io API; confirm that setting it
      // in the web branch is intended.
      HttpOverrides.global = tls;
    } else {
      httpClient = tls.createHttpClient(SecurityContext());
    }
  }

  final WebSocketChannel channel;
  if (kIsWeb) {
    channel = WebSocketChannel.connect(serverURI);
  } else {
    final WebSocket socket;
    try {
      socket = await WebSocket.connect(
        serverURI.toString(),
        customClient: httpClient,
      );
      socket.pingInterval = _pingInterval;
    } catch (error) {
      // Surface every connection failure as a channel exception, matching
      // what the channel-based connect path reports.
      throw WebSocketChannelException.from(error);
    }
    channel = IOWebSocketChannel(socket);
  }
  _socket = channel;

  _outbound.clear();
  _subscription = channel.stream.listen((data) async {
    _closed = false;

    // A message payload is still being collected across frames.
    final open = _openSubscription;
    if (open != null) {
      open.buffer.addAll(data);
      // The payload is terminated by [_CR, _LF], hence the extra two bytes.
      if (open.buffer.length >= open.totalBytes + 2) {
        open.subscription.callback!(SubscriptionResult(
            open.buffer.sublist(0, open.totalBytes),
            open.subscription,
            open.subject));
        _openSubscription = null;
      }
      return;
    }

    // Handshake already done: hand the data to the protocol parser.
    if (_peeked) {
      await parse(data);
      return;
    }

    // First frame: expect exactly one INFO message from the server.
    final len = protoLen(data);
    if (len <= 0) {
      return;
    }
    final Uint8List out = data.sublist(0, len);
    final pm = utf8.decode(out);
    debug(pm, '>>>');
    if (pm.isEmpty) {
      return;
    }

    final matches = info.allMatches(pm).toList();
    try {
      // Kept inside the try so a malformed INFO frame is reported through
      // _pongCompleter like every other handshake failure, instead of
      // escaping this async callback as an unhandled error.
      if (matches.length != 1 || matches.first.groupCount != 1) {
        throw NatsError.errorForCode(ErrorCode.BAD_PAYLOAD);
      }
      final jsonString = matches.first.group(1).toString();
      final Map serverInfo = jsonDecode(jsonString);
      checkOptions(serverInfo, _opts);
      _peeked = true;

      final conn = _auth.getConnect(serverInfo['nonce'], serverURI, _opts);
      final cs = json.encode(conn);
      send(utf8.encode('CONNECT $cs$_CR_LF'));
      send(cmdMap[_Command.PING]);
      _statusCallback(Status.CONNECT, null);
    } catch (err) {
      debug(err, 'Socket connection error: ');
      _pongCompleter.completeError(err);
    }
  }, onError: (Object error, StackTrace stackTrace) {
    debug(error, '<<< socketOnError: ');
    _statusCallback(Status.STALE_CONNECTION, error);
  }, onDone: () {
    debug('<<< socketOnDone, reconnectAttempts=$_reconnectAttempts, maxReconnectAttempts=$_maxReconnectAttempts');
    // A limit of 0 means "retry without limit".
    if (_maxReconnectAttempts == 0 || _reconnectAttempts < _maxReconnectAttempts) {
      _reconnectAttempts++;
      if (!_closed && _socket!.closeCode != null) {
        _closed = true;
        _socket!.sink.close(status.goingAway);
      }
      reconnect();
    } else {
      close();
    }
  });
}