onData method
- @protected
- dynamic data
OnData listener callback
Implementation
/// Listener callback for incoming connection [data].
///
/// Appends [data] to [messageStream] and then, for as long as the stream
/// reports that a message is available, builds an [MqttMessage] from it via
/// [MqttMessage.createFrom] and fires it on [clientEventBus]:
/// a [ConnectAckMessageAvailable] event when the header's message type is
/// [MqttMessageType.connectAck], otherwise a [MessageAvailable] event.
///
/// If building a message throws an [Exception], the message is treated as
/// not yet complete: the stream is reset and the method returns so that a
/// later call can retry. A zero-length [data] is logged and ignored.
@protected
void onData(dynamic /*String|List<int>*/ data) {
  MqttLogger.log('MqttConnection::onData');
  // Zero-length payloads are not expected, but guard against them anyway.
  if (data.length == 0) {
    MqttLogger.log('MqttServerConnection::onData - Error - 0 byte message');
    return;
  }
  messageStream.addAll(data);
  while (messageStream.isMessageAvailable()) {
    MqttMessage? received;
    try {
      received = MqttMessage.createFrom(messageStream);
    } on Exception {
      // Treated as "message incomplete": reset the stream and wait for the
      // next call to deliver more data.
      MqttLogger.log(
          'MqttServerConnection::_ondata - message is not yet valid, '
          'waiting for more data ...');
      messageStream.reset();
      return;
    }
    // A message was built successfully.
    messageStream.shrink();
    MqttLogger.log(
        'MqttServerConnection::_onData - message received ', received);
    final eventBus = clientEventBus!;
    if (eventBus.streamController.isClosed) {
      // Nothing can be delivered on a closed bus; move on to the next message.
      MqttLogger.log(
          'MqttServerConnection::_onData - WARN - message available event not fired, event bus is closed');
      continue;
    }
    // Connect acknowledgements get their own event type.
    if (received!.header!.messageType == MqttMessageType.connectAck) {
      eventBus.fire(ConnectAckMessageAvailable(received));
    } else {
      eventBus.fire(MessageAvailable(received));
    }
    MqttLogger.log(
        'MqttServerConnection::_onData - message available event fired');
  }
}