open method

@override
Future open(String? correlationId)

Opens the component.

  • correlationId: (optional) transaction id used to trace execution through the call chain.

Implementation

/// Opens the component by connecting to the MQTT broker.
///
/// Returns immediately when `connection_` is already set. Otherwise the
/// connection options are resolved, an MQTT 3.1.1 client is configured from
/// them, and a connect attempt is made that is bounded by `connectTimeout_`
/// milliseconds.
///
/// - [correlationId] (optional) transaction id to trace execution through
///   call chain.
///
/// Throws a [ConnectionException] with code `CONNECT_FAILED` when the connect
/// attempt fails or times out; the original failure is attached as its cause.
@override
Future open(String? correlationId) async {
  if (connection_ != null) {
    return;
  }

  var options = await connectionResolver_.resolve(correlationId);
  var uri = options.getAsString('uri');

  // todo
  // options['reconnectPeriod'] = reconnectTimeout_.toString();

  var client = mqtt.MqttServerClient.withPort(
      options.getAsString('host'), clientId_, options.getAsInteger('port'));
  // keepAliveTimeout_ is divided by 1000 before being handed to the client.
  client.keepAlivePeriod = keepAliveTimeout_ ~/ 1000;
  client.autoReconnect = retryConnect_;
  client.resubscribeOnAutoReconnect = retryConnect_;
  client.setProtocolV311();

  // The connection is published only once the client reports it is connected.
  client.onConnected = () {
    connection_ = client;
    logger_.debug(correlationId, 'Connected to MQTT broker at $uri');
  };

  var username = options['username'];
  var password = options['password'];

  try {
    await client
        .connect(username, password)
        .timeout(Duration(milliseconds: connectTimeout_));
  } catch (ex) {
    // The caught object may be an Error rather than an Exception; a hard cast
    // here would throw a TypeError and hide the real failure.
    var error = ex is Exception ? ex : Exception('$ex');
    logger_.error(
        correlationId, error, 'Failed to connect to MQTT broker at $uri');

    // Best-effort release of the half-open client so that a connect attempt
    // still in flight (e.g. after a timeout) cannot complete later and set
    // connection_ behind the caller's back.
    try {
      client.disconnect();
    } catch (cleanupError) {
      logger_.debug(correlationId,
          'Failed to release MQTT client after connect error: $cleanupError');
    }

    throw ConnectionException(
            correlationId, 'CONNECT_FAILED', 'Connection to MQTT broker failed')
        .withCause(ex);
  }
}