onData method
void onData(dynamic byteData)
OnData listener callback
Implementation
/// Listener callback invoked with a chunk of incoming data.
///
/// The chunk is viewed as bytes and appended to [messageStream]. Every
/// complete message the stream then holds is parsed and published on
/// [clientEventBus]: connect acknowledgements as
/// [MqttConnectAckMessageAvailable], everything else as
/// [MqttMessageAvailable].
///
/// If parsing throws an [Exception] the stream is reset and the method
/// returns, leaving the buffered bytes in place for the next call.
///
/// [byteData] is handed straight to [Uint8List.view], so it is presumably a
/// `ByteBuffer` - TODO confirm against the code that registers this listener.
void onData(dynamic byteData) {
  MqttLogger.log(
      'MqttBrowserConnection::_onData - Message Received Started <<< ');
  final Uint8List data = Uint8List.view(byteData);
  // An empty chunk is not expected, but there is nothing to do with one.
  if (data.isEmpty) {
    MqttLogger.log('MqttBrowserConnection::_ondata - Error - 0 byte message');
    return;
  }
  MqttLogger.log(
      'MqttBrowserConnection::_ondata - adding incoming data, data length is ${data.length}, '
      'message stream length is ${messageStream.length}, '
      'message stream position is ${messageStream.position}');
  messageStream.addAll(data);

  while (messageStream.isMessageAvailable()) {
    MqttMessage? received;
    try {
      received = MqttMessage.createFrom(messageStream);
    } on Exception {
      // Treated as a partial message: reset the stream and wait for the
      // next chunk before trying again.
      MqttLogger.log(
          'MqttBrowserConnection::_ondata - message is not yet valid, '
          'waiting for more data ...');
      messageStream.reset();
      return;
    }
    if (received == null) {
      return;
    }

    MqttLogger.log(
        'MqttBrowserConnection::_onData - MESSAGE RECEIVED -> ', received);
    // A message was parsed successfully, so the stream must be shrunk.
    messageStream.shrink();

    final bus = clientEventBus;
    if (bus == null) {
      MqttLogger.log(
          'MqttBrowserConnection::_onData - message not processed, event bus is null');
      continue;
    }
    if (bus.streamController.isClosed) {
      MqttLogger.log(
          'MqttBrowserConnection::_onData - message not processed, event bus is closed');
      continue;
    }

    // Connect acknowledgements are published as their own event type.
    if (received.header!.messageType == MqttMessageType.connectAck) {
      bus.fire(MqttConnectAckMessageAvailable(received));
    } else {
      bus.fire(MqttMessageAvailable(received));
    }
    MqttLogger.log(
        'MqttBrowserConnection::_onData - message available event fired');
  }

  MqttLogger.log(
      'MqttBrowserConnection::_onData - Message Received Ended <<< ');
}