start method

void start()

start connect

Implementation

void start() {
  if (_stoped) return;
  if (_started) return; //once function
  _started = true;
  //register events
  transport.onConnect(() {
    _buf.clear();
    lasthead = null;
    _idTopic.clear();
    _finishedSubCache.clear();
    MessageIdentifierDispenser().reset();
    _resetTimePeriodic();
    _send(_connectPacket);
  });
  transport.onClose(() {
    _onConnectClose();
  });
  transport.onMessage((msg) {
    _buf.addAll(msg.message);
    while(_buf.bytes.length > 0){
      late MqttMessage pack;
      if (lasthead != null) {
        if (_buf.availableBytes < lasthead!.remainingLength) {
          // continue wait
          return;
        }
        try {
          _buf.shrink();
          pack = MqttMessageFactory.readMessage(lasthead!,
              MqttBuffer.fromList(_buf.read(lasthead!.remainingLength)));
          lasthead = null;
        } on Exception catch (e) {
          lasthead = null;
          loger.log("\u001b[31m$e\u001b[0m");
          close(e);
          return;
        }
      } else {
        var head = MqttMessageFactory.readHead(_buf);
        if (_buf.availableBytes < head.remainingLength) {
          lasthead = head;
          return;
        }
        try {
          _buf.shrink();
          pack = MqttMessageFactory.readMessage(
              head, MqttBuffer.fromList(_buf.read(head.remainingLength)));
          _buf.shrink();
        } on Exception catch (e) {
          loger.log("\u001b[31m$e\u001b[0m");
          close(e);
          return;
        }
      }

      if (log) {
        loger.log("\u001b[31m↓\u001b[0m ${pack.toString()}", name: "mqtt");
      }
      switch (pack.fixedHead.messageType) {
        case MqttMessageType.connack:
          _onMqttConack?.call(pack as MqttMessageConnack);
          if (_onMqttConack == null &&
              (pack as MqttMessageConnack).returnCode !=
                  MqttConnectReturnCode.connectionAccepted) {
            // _conn.close();
            close((pack).returnCode);
          }
          break;
        case MqttMessageType.suback:
          var obj = pack as MqttMessageSuback;
          if (_idTopic.containsKey(obj.msgid)) {
            var _topics = _idTopic[obj.msgid]!;
            // print("this topic: $_topics");
            _idTopic.remove(obj.msgid);
            for (var _topic in _topics) {
              _finishedSubCache.add(_topic);
              _subComplate[_topic]?.call();
            }
          }
          break;
        case MqttMessageType.publish:
          var obj = pack as MqttMessagePublish;
          final wildcardKeys = _dataArriveCallBack.keys.where((key) =>
            key.split('#').length == 2,
          ).map((key) =>
            key.split('#').first,
          ).where((key) =>
            obj.topicName.startsWith(key),
          ).firstOrNull;
          if (wildcardKeys != null) {
            _dataArriveCallBack['${wildcardKeys}#']?.call(obj);
          } else {
            _dataArriveCallBack[obj.topicName]?.call(obj);
          }
          break;
        default:
      }
    }
  });
  transport.connect(deadline: keepAlive);
}