MqttMessageQueue class Null safety
Message queue that sends and receives messages via MQTT message broker.
MQTT is a popular light-weight protocol to communicate IoT devices.
Configuration parameters
topic
: name of MQTT topic to subscribeqos
: QoS from 0 to 2. Default 0connection(s)
:discovery_key
: (optional) a key to retrieve the connection from IDiscoveryhost
: host name or IP addressport
: port numberuri
: resource URI or connection string with all parameters in it
credential(s)
:store_key
: (optional) a key to retrieve the credentials from ICredentialStoreusername
: user namepassword
: user password
options
:serialize_envelope
: (optional) true to serialize entire message as JSON, false to send only message payload (default: true)autosubscribe
: (optional) true to automatically subscribe on option (default: false)qos
: (optional) quality of service level aka QOS (default: 0)retain
: (optional) retention flag for published messages (default: false)retry_connect
: (optional) turns on/off automated reconnect when connection is log (default: true)connect_timeout
: (optional) number of milliseconds to wait for connection (default: 30000)reconnect_timeout
: (optional) number of milliseconds to wait on each reconnection attempt (default: 1000)keepalive_timeout
: (optional) number of milliseconds to ping broker while inactive (default: 3000)
References
- :logger::*:1.0 (optional) ILogger components to pass log messages
- :counters::*:1.0 (optional) ICounters components to pass collected measurements
- :discovery::*:1.0 (optional) IDiscovery services to resolve connections
- :credential-store::*:1.0 (optional) Credential stores to resolve credentials
- :connection:mqtt::1.0 (optional) Shared connection to MQTT service
See MessageQueue See MessagingCapabilities
Example
var queue = MqttMessageQueue('myqueue');
queue.configure(ConfigParams.fromTuples([
'topic', 'mytopic',
'connection.protocol', 'mqtt'
'connection.host', 'localhost'
'connection.port', 1883
]));
await queue.open('123');
...
await queue.send('123', MessageEnvelope(null, 'mymessage', 'ABC'));
var message await = queue.receive('123')
if (message != null) {
...
await queue.complete('123', message);
}
- Inheritance
-
- Object
- MessageQueue
- MqttMessageQueue
- Implemented types
Constructors
- MqttMessageQueue([String? name])
- Creates a new instance of the message queue.
Properties
- capabilities ↔ MessagingCapabilities
-
read / writeinherited
- connectionResolver ↔ ConnectionResolver
-
read / writeinherited
- counters ↔ CompositeCounters
-
read / writeinherited
- credentialResolver ↔ CredentialResolver
-
read / writeinherited
- hashCode → int
-
The hash code for this object.
read-onlyinherited
- logger ↔ CompositeLogger
-
read / writeinherited
- name ↔ String
-
read / writeinherited
- runtimeType → Type
-
A representation of the runtime type of the object.
read-onlyinherited
Methods
-
abandon(
MessageEnvelope message) → Future -
Returnes message into the queue and makes it available for all subscribers to receive it again.
This method is usually used to return a message which could not be processed at the moment
to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently
or/and send to dead letter queue.
override
-
beginListen(
String? correlationId, IMessageReceiver receiver) → void -
Listens for incoming messages without blocking the current thread.
inherited
-
checkOpen(
String? correlationId) → void -
Checks if the queue has been opened and throws an exception is it's not.
inherited
-
clear(
String? correlationId) → Future -
Clears component state.
override
-
close(
String? correlationId) → Future -
Closes component and frees used resources.
override
-
complete(
MessageEnvelope message) → Future -
Permanently removes a message from the queue.
This method is usually used to remove the message after successful processing.
override
-
configure(
ConfigParams config) → void -
Configures component by passing configuration parameters.
override
-
endListen(
String? correlationId) → void -
Ends listening for incoming messages.
When this method is call listen unblocks the thread and execution continues.
override
-
fromMessage(
MessageEnvelope? message) → Map< String, dynamic> ? -
getCapabilities(
) → MessagingCapabilities -
Gets the queue capabilities
inherited
-
getName(
) → String -
Gets the queue name
inherited
-
getTopic(
) → String -
isOpen(
) → bool -
Checks if the component is opened.
override
-
listen(
String? correlationId, IMessageReceiver receiver) → void -
Listens for incoming messages and blocks the current thread until queue is closed.
override
-
moveToDeadLetter(
MessageEnvelope message) → Future -
Permanently removes a message from the queue and sends it to dead letter queue.
override
-
noSuchMethod(
Invocation invocation) → dynamic -
Invoked when a non-existent method or property is accessed.
inherited
-
onMessage(
String topic, String data, dynamic packet) → void -
override
-
open(
String? correlationId) → Future -
Opens the component.
override
-
openWithParams(
String? correlationId, ConnectionParams? connection, CredentialParams? credential) → Future -
Opens the component with given connection and credential parameters.
override
-
peek(
String? correlationId) → Future< MessageEnvelope?> -
Peeks a single incoming message from the queue without removing it.
If there are no messages available in the queue it returns null.
override
-
peekBatch(
String? correlationId, int messageCount) → Future< List< MessageEnvelope> > -
Peeks multiple incoming messages from the queue without removing them.
If there are no messages available in the queue it returns an empty list.
override
-
readMessageCount(
) → Future< int> -
Reads the current number of messages in the queue to be delivered.
override
-
receive(
String? correlationId, int waitTimeout) → Future< MessageEnvelope?> -
Receives an incoming message and removes it from the queue.
override
-
renewLock(
MessageEnvelope message, int lockTimeout) → Future -
Renews a lock on a message that makes it invisible from other receivers in the queue.
This method is usually used to extend the message processing time.
override
-
send(
String? correlationId, MessageEnvelope message) → Future -
Sends a message into the queue.
override
-
sendAsObject(
String? correlationId, String messageType, dynamic message) → Future -
Sends an object into the queue.
Before sending the object is converted into JSON string and wrapped in a MessageEnvelope.
inherited
-
sendMessageToReceiver(
IMessageReceiver? receiver, MessageEnvelope? message) → void -
setReferences(
IReferences references) → void -
Sets references to dependent components.
override
-
subscribe(
String? correlationId) → Future -
toString(
) → String -
Gets a string representation of the object.
inherited
-
unsetReferences(
) → void -
Unsets (clears) previously set references to dependent components.
override
Operators
-
operator ==(
Object other) → bool -
The equality operator.
inherited