subscribe method
Implementation
/// Sends a subscribe request for [topic] and returns a future that tracks it.
///
/// By default the returned future completes when the completion callback
/// registered for [topic] in `_subComplate` is invoked (presumably by the
/// SUBACK handler elsewhere in this class — confirm against that handler).
/// When [futureWaitData] is true, the future instead completes the first time
/// a message is delivered to the data callback registered for [topic].
///
/// * [force]: when false and [topic] is already in `_finishedSubCache`, no
///   request is sent. The data callback for [topic] is still replaced.
/// * [onMessage]: invoked for every message delivered to the data callback
///   registered for [topic].
/// * [retain], [dup]: copied into the fixed header of the outgoing message.
/// * [qos]: the QoS passed to the subscribe message.
/// * [timeout]: when non-null, the returned future completes with the error
///   `"subcribe timeout"` if it has not completed within this duration.
Future<void> subscribe(
  String topic, {
  bool force = true,
  EvtMqttPublishArrived? onMessage,
  bool futureWaitData = false,
  // mqtt head
  bool retain = false,
  MqttQos qos = MqttQos.qos0,
  bool dup = false,
  Duration? timeout,
}) {
  final com = Completer<void>();

  // One-shot timer: fires at most once, so no manual cancel inside the
  // callback is needed.
  Timer? timeoutTimer;
  if (timeout != null) {
    timeoutTimer = Timer(timeout, () {
      if (!com.isCompleted) {
        // Error value (including its spelling) is kept as-is because callers
        // may be matching on it.
        com.completeError("subcribe timeout");
      }
    });
  }

  // Route incoming messages to the caller and, when requested, resolve the
  // future on the first one.
  _dataArriveCallBack[topic] = (msg) {
    onMessage?.call(msg);
    if (futureWaitData && !com.isCompleted) {
      timeoutTimer?.cancel();
      com.complete();
    }
  };

  if (!force && _finishedSubCache.contains(topic)) {
    if (log) print("mqtt can't resub $topic when force=false");
    if (futureWaitData) {
      // Still waiting for data; the timeout (if any) stays armed.
      return com.future;
    }
    // Nobody will ever listen to `com.future` on this path, so a pending
    // timer would later raise an unhandled async error. Disarm it.
    timeoutTimer?.cancel();
    return Future.value();
  }

  final id = MessageIdentifierDispenser().getNextMessageIdentifier();
  _idTopic[id] = [topic];

  final msg = MqttMessageSubscribe.withTopic(id, topic, qos);
  msg.fixedHead.retain = retain;
  msg.fixedHead.dup = dup;
  _subList[topic] = msg;

  // Resolve the future once the subscription completion callback is invoked.
  if (!futureWaitData) {
    _subComplate[topic] = () {
      // Always drop the entry once invoked, even if the future has already
      // timed out, so no stale callback is left behind.
      _subComplate.remove(topic);
      if (!com.isCompleted) {
        timeoutTimer?.cancel();
        com.complete();
      }
    };
  }

  _send(msg);
  return com.future;
}