open method
Opens the component.
correlationId
(optional) transaction id to trace execution through call chain.
Implementation
@override
Future open(String? correlationId) async {
if (connection_ != null) {
return;
}
var options = await connectionResolver_.resolve(correlationId);
// todo
// options['reconnectPeriod'] = reconnectTimeout_.toString();
var client = mqtt.MqttServerClient.withPort(
options.getAsString('host'), clientId_, options.getAsInteger('port'));
client.keepAlivePeriod = keepAliveTimeout_ ~/ 1000;
client.autoReconnect = retryConnect_;
client.resubscribeOnAutoReconnect = retryConnect_;
client.setProtocolV311();
client.onConnected = () {
connection_ = client;
logger_.debug(correlationId,
'Connected to MQTT broker at ' + options.getAsString('uri'));
};
var username = options['username'];
var password = options['password'];
try {
await client
.connect(username, password)
.timeout(Duration(milliseconds: connectTimeout_));
} catch (ex) {
logger_.error(correlationId, ex as Exception,
'Failed to connect to MQTT broker at ' + options.getAsString('uri'));
var err = ConnectionException(correlationId, 'CONNECT_FAILED',
'Connection to MQTT broker failed')
.withCause(ex);
throw err;
}
}