start method
void
start()
start connect
Implementation
void start() {
if (_stoped) return;
if (_started) return; //once function
_started = true;
//register events
transport.onConnect(() {
_buf.clear();
lasthead = null;
_idTopic.clear();
_finishedSubCache.clear();
MessageIdentifierDispenser().reset();
_resetTimePeriodic();
_send(_connectPacket);
});
transport.onClose(() {
_onConnectClose();
});
transport.onMessage((msg) {
_buf.addAll(msg.message);
while(_buf.bytes.length > 0){
late MqttMessage pack;
if (lasthead != null) {
if (_buf.availableBytes < lasthead!.remainingLength) {
// continue wait
return;
}
try {
_buf.shrink();
pack = MqttMessageFactory.readMessage(lasthead!,
MqttBuffer.fromList(_buf.read(lasthead!.remainingLength)));
lasthead = null;
} on Exception catch (e) {
lasthead = null;
loger.log("\u001b[31m$e\u001b[0m");
close(e);
return;
}
} else {
var head = MqttMessageFactory.readHead(_buf);
if (_buf.availableBytes < head.remainingLength) {
lasthead = head;
return;
}
try {
_buf.shrink();
pack = MqttMessageFactory.readMessage(
head, MqttBuffer.fromList(_buf.read(head.remainingLength)));
_buf.shrink();
} on Exception catch (e) {
loger.log("\u001b[31m$e\u001b[0m");
close(e);
return;
}
}
if (log) {
loger.log("\u001b[31m↓\u001b[0m ${pack.toString()}", name: "mqtt");
}
switch (pack.fixedHead.messageType) {
case MqttMessageType.connack:
_onMqttConack?.call(pack as MqttMessageConnack);
if (_onMqttConack == null &&
(pack as MqttMessageConnack).returnCode !=
MqttConnectReturnCode.connectionAccepted) {
// _conn.close();
close((pack).returnCode);
}
break;
case MqttMessageType.suback:
var obj = pack as MqttMessageSuback;
if (_idTopic.containsKey(obj.msgid)) {
var _topics = _idTopic[obj.msgid]!;
// print("this topic: $_topics");
_idTopic.remove(obj.msgid);
for (var _topic in _topics) {
_finishedSubCache.add(_topic);
_subComplate[_topic]?.call();
}
}
break;
case MqttMessageType.publish:
var obj = pack as MqttMessagePublish;
final wildcardKeys = _dataArriveCallBack.keys.where((key) =>
key.split('#').length == 2,
).map((key) =>
key.split('#').first,
).where((key) =>
obj.topicName.startsWith(key),
).firstOrNull;
if (wildcardKeys != null) {
_dataArriveCallBack['${wildcardKeys}#']?.call(obj);
} else {
_dataArriveCallBack[obj.topicName]?.call(obj);
}
break;
default:
}
}
});
transport.connect(deadline: keepAlive);
}