onData method

@protected
void onData(dynamic data)

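The `onData` listener callback: appends the received data to the message stream and fires an event on the client event bus for each complete message that can be decoded from it.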

Implementation

/// Handles a chunk of [data] delivered by the underlying connection.
///
/// The data is appended to `messageStream`. Then, for as long as
/// `messageStream.isMessageAvailable()` reports true, one message at a time
/// is built with `MqttMessage.createFrom` and published on `clientEventBus`:
/// a `ConnectAckMessageAvailable` event when the message type is
/// `MqttMessageType.connectAck`, otherwise a `MessageAvailable` event.
///
/// If building a message throws an [Exception], `messageStream.reset()` is
/// called and the method returns without processing anything further.
/// Zero-length [data] is logged and ignored.
@protected
void onData(dynamic /*String|List<int>*/ data) {
  MqttLogger.log('MqttConnection::onData');

  // Guard against an empty payload; not expected to occur in practice.
  if (data.length == 0) {
    MqttLogger.log('MqttServerConnection::onData - Error - 0 byte message');
    return;
  }

  messageStream.addAll(data);

  while (messageStream.isMessageAvailable()) {
    MqttMessage? received;
    try {
      received = MqttMessage.createFrom(messageStream);
    } on Exception {
      MqttLogger.log(
          'MqttServerConnection::_ondata - message is not yet valid, '
          'waiting for more data ...');
      // Per the log text above the message is treated as incomplete.
      // NOTE(review): presumably reset() rewinds the stream so the partial
      // message is retried on the next call - confirm against the stream
      // implementation.
      messageStream.reset();
      return;
    }

    // A message was built successfully; let the stream drop what it no
    // longer needs before notifying listeners.
    messageStream.shrink();
    MqttLogger.log(
        'MqttServerConnection::_onData - message received ', received);

    if (clientEventBus!.streamController.isClosed) {
      MqttLogger.log(
          'MqttServerConnection::_onData - WARN - message available event not fired, event bus is closed');
      continue;
    }

    // NOTE(review): the null assertions assume createFrom never yields null
    // and that every message carries a header - confirm.
    final isConnectAck =
        received!.header!.messageType == MqttMessageType.connectAck;
    if (isConnectAck) {
      clientEventBus!.fire(ConnectAckMessageAvailable(received));
    } else {
      clientEventBus!.fire(MessageAvailable(received));
    }
    MqttLogger.log(
        'MqttServerConnection::_onData - message available event fired');
  }
}