subscribe method

Future<void> subscribe(
  1. String topic, {
  2. bool force = true,
  3. EvtMqttPublishArrived? onMessage,
  4. bool futureWaitData = false,
  5. bool retain = false,
  6. MqttQos qos = MqttQos.qos0,
  7. bool dup = false,
  8. Duration? timeout,
})

Implementation

Future<void> subscribe(
  String topic, {
  bool force = true,
  EvtMqttPublishArrived? onMessage,
  bool futureWaitData = false,
  // mqtt head
  bool retain = false,
  MqttQos qos = MqttQos.qos0,
  bool dup = false,
  Duration? timeout,
}) {
  var com = Completer<void>();
  Timer? _timeout;
  if (timeout != null) {
    _timeout = Timer.periodic(timeout, (timer) {
      timer.cancel();
      if (!com.isCompleted) {
        com.completeError("subcribe timeout");
      }
    });
  }
  if (onMessage != null) {
    _dataArriveCallBack[topic] = onMessage;
  }
  // handle the events
  _dataArriveCallBack[topic] = (msg) {
    onMessage?.call(msg);
    //futureWaitData
    if (futureWaitData && !com.isCompleted) {
      _timeout?.cancel();
      com.complete();
    }
  };

  if (!force && _finishedSubCache.contains(topic)) {
    if (log) print("mqtt can't resub $topic when force=false");

    return futureWaitData ? com.future : Future.value();
  }
  var id = MessageIdentifierDispenser().getNextMessageIdentifier();
  // print("id $id");
  _idTopic[id] = [topic];

  var msg = MqttMessageSubscribe.withTopic(id, topic, qos);

  msg.fixedHead.retain = retain;
  msg.fixedHead.dup = dup;
  _subList[topic] = msg;
  // handle future when suback
  if (!futureWaitData) {
    _subComplate[topic] = () {
      if (!com.isCompleted) {
        _timeout?.cancel();
        com.complete();
        _subComplate.remove(topic);
      }
    };
  }
  _send(msg);
  return com.future;
}