open method

  1. @override
Future open(
  1. IContext? context
)

Opens the component.

  • context (optional) a context to trace execution through call chain.

Implementation

@override
Future open(IContext? context) async {
  if (connection_ != null) {
    return;
  }

  var options = await connectionResolver_.resolve(context);

  // todo
  // options['reconnectPeriod'] = reconnectTimeout_.toString();

  var client = mqtt.MqttServerClient.withPort(
      options.getAsString('host'), clientId_, options.getAsInteger('port'));
  client.keepAlivePeriod = keepAliveTimeout_ ~/ 1000;
  client.autoReconnect = retryConnect_;
  client.resubscribeOnAutoReconnect = retryConnect_;
  client.setProtocolV311();

  client.onConnected = () {
    connection_ = client;
    logger_.debug(
        context, 'Connected to MQTT broker at ${options.getAsString('uri')}');
  };

  var username = options['username'];
  var password = options['password'];

  try {
    await client
        .connect(username, password)
        .timeout(Duration(milliseconds: connectTimeout_));
  } catch (ex) {
    logger_.error(context, ex as Exception,
        'Failed to connect to MQTT broker at ${options.getAsString('uri')}');
    var err = ConnectionException(
            context != null ? ContextResolver.getTraceId(context) : null,
            'CONNECT_FAILED',
            'Connection to MQTT broker failed')
        .withCause(ex);
    throw err;
  }
}