openWithParams method
Future
openWithParams(
- String? correlationId,
- ConnectionParams? connection,
- CredentialParams? credential
Opens the component with the given connection and credential parameters.
- correlationId: (optional) transaction id to trace execution through the call chain.
- connection: connection parameters.
- credential: credential parameters.
Returns a Future that receives null if all is OK. Throws an error otherwise.
Implementation
/// Opens the component with the given connection and credential parameters.
///
/// - [correlationId] (optional) transaction id to trace execution through
///   the call chain.
/// - [connection] connection parameters.
/// - [credential] credential parameters.
///
/// Connects the AMQP client, opens a channel and, when auto-create is
/// enabled, declares the configured exchange and queue and binds them.
///
/// Returns a Future that completes with null on success.
/// Throws a [ConfigException] when neither a queue nor an exchange name is
/// configured. Errors raised by the options resolver or the AMQP client are
/// not caught here and propagate to the caller.
@override
Future openWithParams(String? correlationId, ConnectionParams? connection,
    CredentialParams? credential) async {
  var options =
      await _optionsResolver.compose(correlationId, connection, credential);

  if (_queueName == null && _exchangeName == null) {
    throw ConfigException(correlationId, 'NO_QUEUE',
        'Queue or exchange are not defined in connection parameters');
  }

  var settings = amqp.ConnectionSettings();
  var uri = Uri().resolve(options.get('uri')!);
  settings.host = uri.host;
  // Uri.port yields 0 when the URI carries no explicit port and the scheme
  // has no well-known default, so only override the settings' own default
  // when a port was actually specified.
  if (uri.hasPort) {
    settings.port = uri.port;
  }

  if (uri.userInfo != '') {
    settings.authProvider = amqp.PlainAuthenticator(
        options.get('username')!, options.get('password')!);
  }

  final client = amqp.Client(settings: settings);
  _connection = client;
  await client.connect();

  final channel = await client.channel();
  _mqChanel = channel;

  // Automatically create queue, exchange and binding
  if (_autoCreate) {
    if (_exchangeName != null) {
      _exchange = await channel.exchange(_exchangeName!, _exchangeType,
          durable: _persistent);
    }

    if (!_noQueue) {
      if (_queueName == null) {
        // No name configured: declare a private queue with an empty name
        // and remember the name reported back on the declared queue.
        _queue = await channel.queue('',
            durable: _persistent,
            autoDelete: true,
            exclusive: true,
            noWait: false);
        _queueName = _queue?.name;
      } else {
        _queue = await channel.queue(_queueName!,
            durable: _persistent,
            exclusive: _exclusive,
            autoDelete: _autoDelete,
            noWait: false);
      }

      // Bind only when an exchange is available. A queue-only configuration
      // leaves the exchange unset, and binding through a null exchange
      // would fail the whole open call.
      final exchange = _exchange;
      if (exchange != null) {
        _queue = await _queue!.bind(exchange, _routingKey ?? '', noWait: false);
      }
    }
  }

  return null;
}