handlePublish method

bool handlePublish(
  1. MqttMessage? msg
)

Handles the receipt of publish messages from a message broker.

Implementation

bool handlePublish(MqttMessage? msg) {
  final pubMsg = msg as MqttPublishMessage;
  var publishSuccess = true;
  try {
    final topic = PublicationTopic(pubMsg.variableHeader!.topicName);
    MqttLogger.log(
        'PublishingManager::handlePublish - publish received from broker with topic $topic');
    if (pubMsg.header!.qos == MqttQos.atMostOnce) {
      // QOS AtMostOnce 0 require no response.
      // Send the message for processing to whoever is waiting.
      _fireMessageReceived(topic, msg);
      _notifyPublish(msg);
    } else if (pubMsg.header!.qos == MqttQos.atLeastOnce) {
      // QOS AtLeastOnce 1 requires an acknowledgement
      // Send the message for processing to whoever is waiting.
      _fireMessageReceived(topic, msg);
      _notifyPublish(msg);
      // If configured the client will send the acknowledgement, else the user must.
      final messageIdentifier = pubMsg.variableHeader!.messageIdentifier;
      if (!manuallyAcknowledgeQos1) {
        final ackMsg =
            MqttPublishAckMessage().withMessageIdentifier(messageIdentifier);
        connectionHandler!.sendMessage(ackMsg);
      } else {
        // Add to the awaiting manual acknowledge list
        awaitingManualAcknowledge[messageIdentifier!] = pubMsg;
      }
    } else if (pubMsg.header!.qos == MqttQos.exactlyOnce) {
      // QOS ExactlyOnce means we can't give it away yet, we need to do a handshake
      // to make sure the broker knows we got it, and we know he knows we got it.
      // If we've already got it thats ok, it just means its being republished because
      // of a handshake breakdown, overwrite our existing one for the sake of it
      if (!receivedMessages
          .containsKey(pubMsg.variableHeader!.messageIdentifier)) {
        receivedMessages[pubMsg.variableHeader!.messageIdentifier] = pubMsg;
      }
      final pubRecv = MqttPublishReceivedMessage()
          .withMessageIdentifier(pubMsg.variableHeader!.messageIdentifier);
      connectionHandler!.sendMessage(pubRecv);
    }
  } on Exception {
    publishSuccess = false;
  }
  return publishSuccess;
}