connect method
Implementation
/// Opens a WebSocket connection to the first entry of [servers] and starts
/// listening for incoming frames.
///
/// The server list is stored in `_servers` rotated by one position (the
/// server being connected to moves to the end). The list passed in by the
/// caller is not modified.
///
/// Once the first frame has been matched against the `info` pattern and its
/// JSON payload has passed `checkOptions`, a `CONNECT` command followed by a
/// `PING` is sent and `Status.CONNECT` is reported through `_statusCallback`.
///
/// Throws a [RangeError] if [servers] is empty. On non-web platforms, throws
/// a `WebSocketChannelException` if the socket cannot be opened.
Future<void> connect(List<String> servers) async {
  // Validate before indexing so that an empty list fails with a named
  // argument instead of an anonymous index error.
  RangeError.checkValidIndex(0, servers, 'servers');
  final currentServer = servers.first;

  // Rotate into a fresh list: the caller's list may be unmodifiable (for
  // example a const literal) and should not be changed as a side effect.
  _servers = [...servers.skip(1), currentServer];

  // Drop the listener of any previous connection. The returned future is
  // deliberately not awaited, matching the previous behaviour.
  // NOTE(review): consider awaiting this once it is confirmed that
  // cancellation cannot stall a reconnect.
  _subscription?.cancel();

  final serverURI = Uri.parse(currentServer);
  HttpClient? httpClient;

  // Any non-null `tls` option enables TLS handling. A `BaseTLS` instance is
  // used as supplied; any other value falls back to a default `BaseTLS()`.
  final tlsOption = _opts['tls'];
  if (tlsOption != null) {
    final BaseTLS tls = tlsOption is BaseTLS ? tlsOption : BaseTLS();
    // NOTE(review): the web branch installs global HTTP overrides while the
    // non-web branch builds a dedicated client - confirm this is not inverted.
    if (kIsWeb) {
      HttpOverrides.global = tls;
    } else {
      httpClient = tls.createHttpClient(SecurityContext());
    }
  }

  if (kIsWeb) {
    _socket = WebSocketChannel.connect(serverURI);
  } else {
    final WebSocket socket;
    try {
      socket = await WebSocket.connect(
        serverURI.toString(),
        customClient: httpClient,
      );
      socket.pingInterval = _pingInterval;
    } catch (error) {
      // Surface every connection failure as a channel exception.
      throw WebSocketChannelException.from(error);
    }
    _socket = IOWebSocketChannel(socket);
  }

  _outbound.clear();
  _subscription = _socket!.stream.listen(
    (data) async {
      _closed = false;

      // A subscription payload is still being assembled: keep buffering
      // until the declared byte count plus the trailing [_CR, _LF] arrived.
      final pending = _openSubscription;
      if (pending != null) {
        pending.buffer.addAll(data);
        if (pending.buffer.length >= pending.totalBytes + 2) {
          pending.subscription.callback!(
            SubscriptionResult(
              pending.buffer.sublist(0, pending.totalBytes),
              pending.subscription,
              pending.subject,
            ),
          );
          _openSubscription = null;
        }
        return;
      }

      // After the initial handshake every frame goes to the regular parser.
      if (_peeked) {
        await parse(data);
        return;
      }

      // First frame of the connection.
      final len = protoLen(data);
      if (len <= 0) {
        return;
      }
      Uint8List out = data.sublist(0, len);
      final pm = utf8.decode(out);
      debug(pm, '>>>');
      if (pm.isEmpty) {
        return;
      }

      final matches = info.allMatches(pm);
      if (matches.length != 1 || matches.elementAt(0).groupCount != 1) {
        // NOTE(review): this is thrown from inside an async listener, so it
        // is not delivered to whoever awaits connect() - confirm that an
        // uncaught asynchronous error is the intended outcome.
        throw NatsError.errorForCode(ErrorCode.BAD_PAYLOAD);
      }
      final jsonString = matches.elementAt(0).group(1).toString();
      try {
        Map serverInfo = jsonDecode(jsonString);
        checkOptions(serverInfo, _opts);
        _peeked = true;
        final conn = _auth.getConnect(serverInfo['nonce'], serverURI, _opts);
        final cs = json.encode(conn);
        send(utf8.encode('CONNECT $cs$_CR_LF'));
        send(cmdMap[_Command.PING]);
        _statusCallback(Status.CONNECT, null);
      } catch (err) {
        debug(err, 'Socket connection error: ');
        _pongCompleter.completeError(err);
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      debug(error, '<<< socketOnError: ');
      _statusCallback(Status.STALE_CONNECTION, error);
    },
    onDone: () {
      debug('<<< socketOnDone, reconnectAttempts=$_reconnectAttempts, maxReconnectAttempts=$_maxReconnectAttempts');
      // A limit of 0 is treated as "no limit" by this condition.
      final mayRetry = _maxReconnectAttempts == 0 ||
          _reconnectAttempts < _maxReconnectAttempts;
      if (!mayRetry) {
        close();
        return;
      }
      _reconnectAttempts++;
      if (!_closed && _socket!.closeCode != null) {
        _closed = true;
        _socket!.sink.close(status.goingAway);
      }
      reconnect();
    },
  );
}