internalConnect method
Future<MqttConnectionStatus>
internalConnect(
- String? hostname,
- int? port,
- MqttConnectMessage? connectMessage
override
Synchronously connects to the specified MQTT connection.
Implementation
/// Connects to the broker at [hostname] and [port], waiting for the
/// connection to be acknowledged before returning.
///
/// Each attempt sets the state to [MqttConnectionState.connecting], opens
/// the browser websocket connection (a fresh [MqttBrowserWsConnection] unless
/// an auto reconnect is in progress, in which case the existing connection is
/// reused via `connectAuto`), sends [connectMessage] and then waits on
/// `connectTimer` before inspecting the connection state. Attempts repeat
/// until the state is [MqttConnectionState.connected] or
/// `maxConnectionAttempts` attempts have been made.
///
/// After every failed attempt outside of an auto reconnect,
/// `onFailedConnectionAttempt` is invoked (when set) with the number of
/// attempts made so far.
///
/// Returns the current `connectionStatus`. When all attempts fail outside of
/// an auto reconnect and `onFailedConnectionAttempt` is set, the state is
/// marked [MqttConnectionState.faulted] instead of throwing.
///
/// Throws [MqttNoConnectionException] when all attempts fail outside of an
/// auto reconnect and no `onFailedConnectionAttempt` callback is set.
/// Exceptions raised while connecting are rethrown unless an auto reconnect
/// is in progress.
@override
Future<MqttConnectionStatus> internalConnect(
    String? hostname, int? port, MqttConnectMessage? connectMessage) async {
  var connectionAttempts = 0;
  MqttLogger.log(
      'MqttSynchronousMqttBrowserConnectionHandler::internalConnect entered');
  do {
    // Initiate the connection
    MqttLogger.log(
        'MqttSynchronousMqttBrowserConnectionHandler::internalConnect - '
        'initiating connection try $connectionAttempts, auto reconnect in progress $autoReconnectInProgress');
    connectionStatus.state = MqttConnectionState.connecting;
    // Don't reallocate the connection if this is an auto reconnect
    if (!autoReconnectInProgress!) {
      connection = MqttBrowserWsConnection(clientEventBus);
      if (websocketProtocols != null) {
        connection.protocols = websocketProtocols;
      }
      connection.onDisconnected = onDisconnected;
    }
    // Connect
    try {
      if (!autoReconnectInProgress!) {
        MqttLogger.log(
            'MqttSynchronousMqttBrowserConnectionHandler::internalConnect - calling connect');
        await connection.connect(hostname, port);
      } else {
        MqttLogger.log(
            'MqttSynchronousMqttBrowserConnectionHandler::internalConnect - calling connectAuto');
        await connection.connectAuto(hostname, port);
      }
    } on Exception {
      // Ignore exceptions in an auto reconnect sequence
      // NOTE(review): after an ignored exception execution still falls
      // through to sendMessage below - confirm this is intended.
      if (autoReconnectInProgress!) {
        MqttLogger.log(
            'MqttSynchronousMqttBrowserConnectionHandler::internalConnect'
            ' exception thrown during auto reconnect - ignoring');
      } else {
        rethrow;
      }
    }
    MqttLogger.log(
        'MqttSynchronousMqttBrowserConnectionHandler::internalConnect - '
        'connection complete');
    // Transmit the required connection message to the broker.
    MqttLogger.log(
        'MqttSynchronousMqttBrowserConnectionHandler::internalConnect '
        'sending connect message');
    sendMessage(connectMessage!);
    MqttLogger.log(
        'MqttSynchronousMqttBrowserConnectionHandler::internalConnect - '
        'pre sleep, state = $connectionStatus');
    // We're the sync connection handler so we need to wait for the
    // broker's acknowledgement of the connection.
    await connectTimer.sleep();
    connectionAttempts++;
    MqttLogger.log(
        'MqttSynchronousMqttBrowserConnectionHandler::internalConnect - '
        'post sleep, state = $connectionStatus');
    if (connectionStatus.state != MqttConnectionState.connected &&
        !autoReconnectInProgress!) {
      MqttLogger.log(
          'MqttSynchronousMqttBrowserConnectionHandler::internalConnect failed, attempt $connectionAttempts');
      // Copy the nullable callback to a local so it can be invoked without
      // a null assertion.
      final failedAttemptCallback = onFailedConnectionAttempt;
      if (failedAttemptCallback != null) {
        MqttLogger.log(
            'MqttSynchronousMqttBrowserConnectionHandler::calling onFailedConnectionAttempt');
        failedAttemptCallback(connectionAttempts);
      }
    }
  } while (connectionStatus.state != MqttConnectionState.connected &&
      connectionAttempts < maxConnectionAttempts!);
  // If we've failed to handshake with the broker, throw an exception
  // unless the user is handling failures through the callback.
  if (connectionStatus.state != MqttConnectionState.connected &&
      !autoReconnectInProgress!) {
    MqttLogger.log(
        'MqttSynchronousMqttBrowserConnectionHandler::internalConnect failed');
    if (onFailedConnectionAttempt == null) {
      if (connectionStatus.reasonCode == MqttConnectReasonCode.notSet) {
        // No reason code was recorded, i.e. no acknowledgement was processed.
        throw MqttNoConnectionException(
            'The maximum allowed connection attempts '
            '($maxConnectionAttempts) were exceeded. '
            'The broker is not responding to the connection request message '
            '(Missing Connection Acknowledgement?)');
      } else {
        throw MqttNoConnectionException(
            'The maximum allowed connection attempts '
            '($maxConnectionAttempts) were exceeded. '
            'The broker is not responding to the connection request message correctly. '
            'The reason code is ${connectionStatus.reasonCode}');
      }
    } else {
      connectionStatus.state = MqttConnectionState.faulted;
    }
  }
  MqttLogger.log(
      'MqttSynchronousMqttBrowserConnectionHandler::internalConnect '
      'exited with state $connectionStatus');
  initialConnectionComplete = true;
  return connectionStatus;
}