onData method

void onData(dynamic byteData)

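OnData listener callback. Appends the received bytes to the message stream and fires an event on the client event bus for each complete message that can be decoded from it.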

Implementation

/// Handles a chunk of data delivered by the connection's data listener.
///
/// [byteData] is viewed as bytes through [Uint8List.view]; a [JSArrayBuffer]
/// is converted to its Dart counterpart first. The bytes are appended to
/// [messageStream], and each message decoded from the stream is published on
/// [clientEventBus] as a [ConnectAckMessageAvailable] (when its header type
/// is [MqttMessageType.connectAck]) or as a [MessageAvailable] event.
///
/// If decoding throws an [Exception], the stream is reset and processing
/// stops until the next call. If the event bus is already closed, the
/// message is still consumed from the stream but no event is fired.
void onData(dynamic /*String|List<int>*/ byteData) {
  MqttLogger.log('MqttBrowserConnection::_onData');

  // On SKWasm / WASM the payload arrives as a JSArrayBuffer rather than a
  // Dart ByteBuffer, so convert it before creating the byte view below.
  // ignore: invalid_runtime_check_with_js_interop_types
  if (byteData is JSArrayBuffer) {
    byteData = byteData.toDart;
  }

  // An empty payload is not expected; log it and ignore it.
  final bytes = Uint8List.view(byteData);
  if (bytes.isEmpty) {
    MqttLogger.log('MqttBrowserConnection::_ondata - Error - 0 byte message');
    return;
  }

  messageStream.addAll(bytes);

  while (messageStream.isMessageAvailable()) {
    MqttMessage? message;
    try {
      message = MqttMessage.createFrom(messageStream);
    } on Exception {
      // Decoding failed: reset the stream and stop until the next call.
      MqttLogger.log(
          'MqttBrowserConnection::_ondata - message is not yet valid, '
          'waiting for more data ...');
      messageStream.reset();
      return;
    }

    // A message was decoded; shrink the stream before publishing it.
    messageStream.shrink();
    MqttLogger.log(
        'MqttBrowserConnection::_onData - message received ', message);

    if (clientEventBus!.streamController.isClosed) {
      MqttLogger.log(
          'MqttBrowserConnection::_onData - WARN - message available event not fired, event bus is closed');
      continue;
    }

    // Connect acknowledgements get their own event type.
    if (message!.header!.messageType == MqttMessageType.connectAck) {
      clientEventBus!.fire(ConnectAckMessageAvailable(message));
    } else {
      clientEventBus!.fire(MessageAvailable(message));
    }
    MqttLogger.log(
        'MqttBrowserConnection::_onData - message available event fired');
  }
}