onData method

void onData(dynamic byteData)

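Listener callback invoked with incoming data: appends the received bytes to the message stream and fires an event on the client event bus for each complete message decoded.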

Implementation

/// Handles a chunk of incoming data handed to the connection listener.
///
/// [byteData] is viewed as a [Uint8List] (a `JSArrayBuffer` is first converted
/// to a Dart [ByteBuffer]) and appended to `messageStream`. Each message that
/// the stream then reports as available is decoded and the stream is shrunk;
/// the decoded message is fired on `clientEventBus` as a
/// `MqttConnectAckMessageAvailable` for connect acknowledgements or as a
/// `MqttMessageAvailable` otherwise.
///
/// Processing stops early when the payload is empty, when decoding yields
/// `null`, or when decoding throws an [Exception] (the stream is reset so the
/// message can be retried once more data has arrived).
void onData(dynamic byteData) {
  MqttLogger.log(
      'MqttBrowserConnection::_onData - Message Received Started <<< ');

  // The payload is usually a ByteBuffer already. Under SKWasm / WASM it
  // arrives as a JSArrayBuffer, which has to be converted to a Dart
  // ByteBuffer before a Uint8List view can be taken over it.
  dynamic payload = byteData;
  // ignore: invalid_runtime_check_with_js_interop_types
  if (payload is JSArrayBuffer) {
    payload = payload.toDart;
  }

  final incoming = Uint8List.view(payload);
  // An empty payload should never happen, but guard against it anyway.
  if (incoming.isEmpty) {
    MqttLogger.log('MqttBrowserConnection::_ondata - Error - 0 byte message');
    return;
  }

  MqttLogger.log(
      'MqttBrowserConnection::_ondata - adding incoming data, data length is ${incoming.length}, '
      'message stream length is ${messageStream.length}, '
      'message stream position is ${messageStream.position}');
  messageStream.addAll(incoming);

  while (messageStream.isMessageAvailable()) {
    MqttMessage? decoded;
    try {
      decoded = MqttMessage.createFrom(messageStream);
    } on Exception {
      // Decoding failed: rewind the stream and wait for the next chunk.
      MqttLogger.log(
          'MqttBrowserConnection::_ondata - message is not yet valid, '
          'waiting for more data ...');
      messageStream.reset();
      return;
    }
    if (decoded == null) {
      return;
    }

    MqttLogger.log(
        'MqttBrowserConnection::_onData - MESSAGE RECEIVED -> ', decoded);
    // A valid message has been consumed, so the stream must be shrunk.
    messageStream.shrink();

    final bus = clientEventBus;
    if (bus == null) {
      MqttLogger.log(
          'MqttBrowserConnection::_onData - message not processed, event bus is null');
      continue;
    }
    if (bus.streamController.isClosed) {
      MqttLogger.log(
          'MqttBrowserConnection::_onData - message not processed, event bus is closed');
      continue;
    }

    // Connect acknowledgements get their own event type.
    if (decoded.header!.messageType == MqttMessageType.connectAck) {
      bus.fire(MqttConnectAckMessageAvailable(decoded));
    } else {
      bus.fire(MqttMessageAvailable(decoded));
    }
    MqttLogger.log(
        'MqttBrowserConnection::_onData - message available event fired');
  }

  MqttLogger.log(
      'MqttBrowserConnection::_onData - Message Received Ended <<< ');
}