onMessage method
Handles incoming messages.
Parameter packet: the incoming Packet to be processed.
This method is called by the transports when a new message is received. It is responsible for validating the message, updating routing information and potentially forwarding the message to other peers.
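Returns the packet when it is addressed to this peer. Throws an ExceptionInvalidTimestamp if the message timestamp falls outside the allowed deltaT window. Throws a StopProcessing exception if the message is malformed, was sent by this peer, has already been processed, or fails signature verification, and also after a message addressed to another peer has been forwarded or could not be forwarded.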
Implementation
/// Validates an incoming [packet], records its sender in [routes], and then
/// either returns the packet or relays it to its destination.
///
/// Returns [packet] when its destination peer id equals this router's id.
///
/// Throws [ExceptionInvalidTimestamp] when the header's `issuedAt` lies
/// outside the window `_now - deltaT` .. `_now + deltaT`.
///
/// Throws [StopProcessing] when the datagram fails the length check, was sent
/// by this peer, repeats a stored header, fails signature verification, or
/// has reached [maxForwardsLimit]. It is also thrown at the end of every
/// forwarding attempt, so a packet addressed to another peer is never
/// returned to the caller.
@override
Future<Packet> onMessage(Packet packet) async {
  // A datagram that fails the length check cannot be parsed any further.
  if (!Message.hasCorrectLength(packet.datagram)) {
    throw const StopProcessing();
  }

  // Only accept messages issued within deltaT of the current time, in either
  // direction.
  final issuedAt = packet.header.issuedAt;
  if (issuedAt < _now - deltaT || issuedAt > _now + deltaT) {
    throw const ExceptionInvalidTimestamp();
  }

  // Record the sender on the packet and ignore echoes of our own messages.
  final srcPeerId = Message.getSrcPeerId(packet.datagram);
  packet.srcPeerId = srcPeerId;
  if (srcPeerId == _selfId) {
    throw const StopProcessing();
  }

  // A header already stored for this peer marks the packet as a duplicate.
  // The check is skipped entirely when header storage is disabled.
  final knownRoute = routes[srcPeerId];
  final isDuplicate = knownRoute != null &&
      Route.maxStoredHeaders > 0 &&
      knownRoute.lastHeaders.contains(packet.header);
  if (isDuplicate) {
    throw const StopProcessing();
  }

  // The forwards counter in the datagram is zeroed before the signature is
  // checked. `packet.header` is deliberately not cached in a local across
  // this call, because SOURCE does not show whether it is re-read from the
  // datagram.
  PacketHeader.setForwardsCount(0, packet.datagram);

  final srcAddress = packet.srcFullAddress;
  final knownAddress = knownRoute?.addresses[srcAddress];
  if (knownRoute == null || knownAddress == null) {
    // First packet from this peer/address pair: the signature must verify
    // before the address is trusted.
    try {
      await crypto.verify(packet.datagram);
    } on ExceptionInvalidSignature {
      throw const StopProcessing();
    }
    // NOTE(review): when the peer already has a route but this address is
    // new, the assignment below replaces the existing Route object instead
    // of extending it — confirm that dropping the previous entry is intended.
    routes[srcPeerId] = Route(
      header: packet.header,
      peerId: srcPeerId,
      address: (ip: srcAddress, properties: AddressProperties()),
    );
    _log('Keep $srcAddress for $srcPeerId');
  } else {
    // Known peer and address: refresh the last-seen mark and remember the
    // header for later duplicate detection.
    knownAddress.updateLastSeen();
    knownRoute.addHeader(packet.header);
    _log('Update lastseen of $srcAddress for $srcPeerId');
  }

  // Packets addressed to this peer are handed back to the caller.
  final dstPeerId = Message.getDstPeerId(packet.datagram);
  packet.dstPeerId = dstPeerId;
  if (dstPeerId == _selfId) {
    return packet;
  }

  // Refuse to relay a packet that has already used up its forwards budget.
  if (packet.header.forwardsCount >= maxForwardsLimit) {
    throw const StopProcessing();
  }

  // Candidate next hops, never including the address the packet came from.
  // The iterable is intentionally left lazy so the log output below keeps
  // the same textual form as before.
  final nextHops = resolvePeerId(dstPeerId).where(
    (candidate) => candidate != srcAddress,
  );
  if (nextHops.isEmpty) {
    _log('Unknown route to $dstPeerId. '
        'Failed forwarding from $srcAddress');
    throw const StopProcessing();
  }

  // Write the incremented forwards counter into the datagram and relay it.
  final relayed = PacketHeader.setForwardsCount(
    packet.header.forwardsCount + 1,
    packet.datagram,
  );
  sendDatagram(addresses: nextHops, datagram: relayed);
  _log('forwarded from $srcAddress '
      'to $nextHops ${packet.datagram.length} bytes');

  // A relayed packet is finished here; it is not returned to the caller.
  throw const StopProcessing();
}