onMessage method

  1. @override
Future<Packet> onMessage(
  1. Packet packet
)
override

Handles incoming messages.

This method is called when a new message is received. It performs the following actions:

  1. Decrypts the message payload.
  2. Checks if the message is a confirmation and handles it accordingly.
  3. Sends a confirmation if the message is confirmable.
  4. Forwards the message to the subscriber if it has a payload.

Implementation

@override
Future<Packet> onMessage(Packet packet) async {
  // Call the superclass's onMessage method to perform basic message
  // validation and routing.
  await super.onMessage(packet);

  // Decrypt the message payload using the configured cryptography instance.
  packet.payload = await crypto.unseal(packet.datagram);

  // If the message is a confirmation, complete the corresponding
  // acknowledgement and stop processing the message.
  if (packet.header.messageType == PacketType.confirmation) {
    _ackCompleters.remove(packet.header.id)?.complete();
    // Stop processing the message since it's a confirmation.
    throw const StopProcessing();
  }

  // If the message is confirmable, send a confirmation message back to
  // the sender.
  if (packet.header.messageType == PacketType.confirmable) {
    // Send a confirmation message asynchronously without awaiting its
    // completion.
    unawaited(
      crypto
          .seal(
            Message(
              header: packet.header.copyWith(
                messageType: PacketType.confirmation,
              ),
              srcPeerId: selfId,
              dstPeerId: packet.srcPeerId,
            ).toBytes(),
          )
          .then(
            (datagram) => sendDatagram(
              addresses: [packet.srcFullAddress],
              datagram: datagram,
            ),
          ),
    );
  }

  // If the message has a payload and there is a listener, forward the
  // message to the subscriber.
  if (packet.payload.isNotEmpty && _messageController.hasListener) {
    _messageController.add(
      Message(
        header: packet.header,
        srcPeerId: packet.srcPeerId,
        dstPeerId: packet.dstPeerId,
        payload: packet.payload,
      ),
    );
  }

  return packet;
}