onMessage method
Handles incoming messages.
This method is called when a new message is received. It performs the following actions:
- Decrypts the message payload.
- Checks if the message is a confirmation; if so, completes the pending acknowledgement for its id and stops further processing by throwing StopProcessing.
- Sends a confirmation if the message is confirmable.
- Forwards the message to the subscriber if it has a non-empty payload and a listener is attached.
Implementation
/// Processes an inbound [packet] once the superclass has handled it.
///
/// The steps run in this order:
///
/// 1. Awaits `super.onMessage(packet)`.
/// 2. Unseals `packet.datagram` through `crypto` and stores the result in
///    `packet.payload`.
/// 3. For a [PacketType.confirmation] packet, completes the completer
///    registered in `_ackCompleters` under the header id (if one exists)
///    and throws [StopProcessing].
/// 4. For a [PacketType.confirmable] packet, seals a confirmation [Message]
///    addressed to the sender and passes it to `sendDatagram` without
///    awaiting the outcome.
/// 5. Adds a [Message] built from the packet to `_messageController` when
///    the payload is non-empty and the controller has a listener.
///
/// Returns the same [packet] instance, with its payload set in step 2.
///
/// Throws [StopProcessing] when the packet is a confirmation.
@override
Future<Packet> onMessage(Packet packet) async {
  // The superclass runs first; per the original comment it performs basic
  // validation and routing.
  await super.onMessage(packet);

  // Decrypt the datagram before inspecting the message type.
  packet.payload = await crypto.unseal(packet.datagram);

  final header = packet.header;
  final kind = header.messageType;

  // A confirmation only resolves a pending acknowledgement; it is never
  // forwarded to subscribers.
  if (kind == PacketType.confirmation) {
    _ackCompleters.remove(header.id)?.complete();
    throw const StopProcessing();
  }

  // A confirmable message is answered with a confirmation that reuses the
  // incoming header, with only the message type changed.
  if (kind == PacketType.confirmable) {
    final confirmation = Message(
      header: header.copyWith(messageType: PacketType.confirmation),
      srcPeerId: selfId,
      dstPeerId: packet.srcPeerId,
    );

    // Fire-and-forget: message handling does not wait for the reply.
    // NOTE(review): no error handler is attached to this future here —
    // confirm that seal/send failures are meant to go unhandled.
    unawaited(
      crypto.seal(confirmation.toBytes()).then(
            (sealed) => sendDatagram(
              addresses: [packet.srcFullAddress],
              datagram: sealed,
            ),
          ),
    );
  }

  // Nothing to deliver, or nobody subscribed: hand the packet back as is.
  if (packet.payload.isEmpty || !_messageController.hasListener) {
    return packet;
  }

  final delivered = Message(
    header: header,
    srcPeerId: packet.srcPeerId,
    dstPeerId: packet.dstPeerId,
    payload: packet.payload,
  );
  _messageController.add(delivered);

  return packet;
}