openWithParams method

  1. @override
Future openWithParams(
  1. String? correlationId,
  2. ConnectionParams? connection,
  3. CredentialParams? credential
)

Opens the component with the given connection and credential parameters.

  • correlationId (optional) transaction id to trace execution through call chain.
  • connection connection parameters
  • credential credential parameters. Returns a Future that receives null if everything is OK. Throws an error otherwise.

Implementation

/// Opens the component with the given connection and credential parameters.
///
/// - [correlationId] (optional) transaction id used to trace execution
///   through the call chain.
/// - [connection] connection parameters handed to the options resolver.
/// - [credential] credential parameters handed to the options resolver.
///
/// Returns a [Future] that completes with `null` once the AMQP client is
/// connected, the channel is open and (when auto-create is enabled) the
/// exchange, queue and binding have been declared.
///
/// Throws a [ConfigException] when neither a queue name nor an exchange name
/// is configured, or when the composed options contain no `uri`.
@override
Future openWithParams(String? correlationId, ConnectionParams? connection,
    CredentialParams? credential) async {
  var options =
      await _optionsResolver.compose(correlationId, connection, credential);

  if (_queueName == null && _exchangeName == null) {
    throw ConfigException(correlationId, 'NO_QUEUE',
        'Queue or exchange are not defined in connection parameters');
  }

  // Fail with a configuration error instead of an opaque null-check failure
  // when the resolver produced no uri.
  var url = options.get('uri');
  if (url == null) {
    throw ConfigException(correlationId, 'NO_URI',
        'Connection uri is not defined in connection parameters');
  }

  var uri = Uri().resolve(url);
  var settings = amqp.ConnectionSettings();
  settings.host = uri.host;
  settings.port = uri.port;
  // Credentials are only attached when the uri itself carries user info.
  if (uri.userInfo != '') {
    var auth = amqp.PlainAuthenticator(
        options.get('username')!, options.get('password')!);
    settings.authProvider = auth;
  }

  _connection = amqp.Client(settings: settings);
  await _connection!.connect();

  _mqChanel = await _connection!.channel();

  // Automatically create queue, exchange and binding
  if (_autoCreate) {
    if (_exchangeName != null) {
      _exchange = await _mqChanel!
          .exchange(_exchangeName!, _exchangeType, durable: _persistent);
    }
    if (!_noQueue) {
      if (_queueName == null) {
        // No name configured: declare a queue with an empty name and adopt
        // whatever name the declared queue reports back.
        _queue = await _mqChanel!.queue('',
            durable: _persistent,
            autoDelete: true,
            exclusive: true,
            noWait: false);

        _queueName = _queue?.name;
      } else {
        _queue = await _mqChanel!.queue(_queueName!,
            durable: _persistent,
            exclusive: _exclusive,
            autoDelete: _autoDelete,
            noWait: false);
      }

      // Bind only when an exchange is available. With just a queue name
      // configured no exchange is declared above, and force-unwrapping
      // `_exchange` here used to crash with a null-check error.
      final exchange = _exchange;
      if (exchange != null) {
        _queue =
            await _queue!.bind(exchange, _routingKey ?? '', noWait: false);
      }
    }
  }
  return null;
}