handleIncomingDatagram method
Processes an incoming datagram from the multiplexer.
Implementation
/// Processes an incoming datagram from the multiplexer.
///
/// [data] is the raw datagram payload; [fromAddress] and [fromPort] are the
/// address and port the datagram was received from.
///
/// Processing happens in this order:
///
/// 1. While the address is not yet validated, `data.length` is added to the
///    pre-validation byte counter and the address is marked validated once
///    the threshold is reached or the handshake has already completed.
/// 2. The datagram is decoded once with `UDXPacket.fromBytes`. If its version
///    is unsupported and the handshake has not completed, a version
///    negotiation packet is sent back and processing stops.
/// 3. The remote CID is refreshed from the packet and, on the first decoded
///    packet, the handshake is completed and `'connect'` is emitted.
/// 4. Path-change detection, PMTUD ACK handling, connection-level frames and
///    stream control frames (reset / stop-sending / window update) are
///    handled immediately.
/// 5. Packets carrying stream data are delivered in sequence order (buffered
///    when they arrive early); an ACK is sent when the packet contained
///    ACK-eliciting frames.
///
/// Exceptions thrown while decoding or processing the packet are caught and
/// discarded, so the returned future completes normally in those cases.
Future<void> handleIncomingDatagram(Uint8List data, InternetAddress fromAddress, int fromPort) async {
  // Number of bytes that must be received from an unvalidated address
  // before it is treated as validated.
  const addressValidationByteThreshold = 1000;

  // Track received bytes for anti-amplification.
  if (!_addressValidated) {
    _bytesReceivedBeforeValidation += data.length;
    // Address is validated after receiving a certain amount of data
    // or on successful handshake completion.
    // NOTE(review): the handshake flag is only set further down in this
    // method, so when this datagram is the one that completes the handshake
    // the validation callback fires on the *next* datagram — confirm that
    // this one-datagram lag is intended.
    if (_bytesReceivedBeforeValidation >= addressValidationByteThreshold || _handshakeCompleted) {
      _onAddressValidated();
    }
  }

  // Decode the datagram exactly once; both stages below share the result.
  final UDXPacket packet;
  try {
    packet = UDXPacket.fromBytes(data);

    // Check if version is supported.
    if (!UdxVersion.isSupported(packet.version) && !_handshakeCompleted) {
      // Reply with a VERSION_NEGOTIATION packet, swapping the CIDs so the
      // reply is addressed back to the sender.
      final versionNegPacket = VersionNegotiationPacket(
        destinationCid: packet.sourceCid,
        sourceCid: packet.destinationCid,
        supportedVersions: UdxVersion.supportedVersions,
      );
      multiplexer.send(versionNegPacket.toBytes(), fromAddress, fromPort);
      emit('versionNegotiation', {'clientVersion': packet.version, 'supportedVersions': UdxVersion.supportedVersions});
      return;
    }

    // Always update the remote CID from the packet's source CID.
    // This ensures that even during retransmissions or path migrations,
    // we are targeting the correct peer identifier.
    cids.remoteCid = packet.sourceCid;

    if (!_handshakeCompleted) {
      // The handshake is considered complete on the first valid packet
      // received.
      _handshakeCompleted = true;
      if (!_handshakeCompleter.isCompleted) {
        _handshakeCompleter.complete();
      }
      // Notify observer of successful handshake.
      if (_handshakeStartTime != null) {
        final duration = DateTime.now().difference(_handshakeStartTime!);
        metricsObserver?.onHandshakeComplete(cids.localCid, duration, true, null);
      }
      emit('connect');
    }
  } catch (_) {
    // Ignore if it's not a valid packet. Any failure in the negotiation or
    // handshake steps above also ends processing of this datagram here.
    return;
  }

  try {
    // --- Path Migration Logic ---
    final pathHasChanged = remoteAddress.address != fromAddress.address || remotePort != fromPort;
    if (pathHasChanged && _pathChallengeData == null) {
      _initiatePathValidation(fromAddress, fromPort);
    }

    // --- PMTUD: Handle ACKs for Probes & Trigger New Probes ---
    for (final frame in packet.frames.whereType<AckFrame>()) {
      _handleAckFrameForPmtud(frame);
    }
    _sendMtuProbeIfNeeded();

    // --- Process connection-level frames (not sequence-dependent) ---
    for (final frame in packet.frames) {
      if (frame is ConnectionCloseFrame) {
        emit('connectionClose', {
          'errorCode': frame.errorCode,
          'frameType': frame.frameType,
          'reason': frame.reasonPhrase
        });
        // Remaining frames in this packet are not processed once the
        // connection has been closed.
        await close();
        return;
      } else if (frame is MaxDataFrame) {
        _handleMaxDataFrame(frame);
      } else if (frame is MaxStreamsFrame) {
        _handleMaxStreamsFrame(frame);
      } else if (frame is PathChallengeFrame) {
        _handlePathChallenge(frame, fromAddress, fromPort);
      } else if (frame is PathResponseFrame) {
        _handlePathResponse(frame, fromAddress, fromPort);
      } else if (frame is DataBlockedFrame) {
        emit('dataBlocked', {'maxData': frame.maxData});
      }
    }

    // --- Process ACK frames at connection level ---
    for (final frame in packet.frames.whereType<AckFrame>()) {
      _handleConnectionAckFrame(frame);
    }

    // --- Process RESET, STOP_SENDING, WINDOW_UPDATE immediately (not sequence-dependent) ---
    // The stream is looked up again for every frame so that a stream which
    // is unregistered while handling one frame is not handed a later frame
    // from the same packet. Frames for unknown streams are dropped.
    for (final frame in packet.frames) {
      if (frame is ResetStreamFrame) {
        _registeredStreams[packet.destinationStreamId]?.deliverReset(frame.errorCode);
      } else if (frame is StopSendingFrame) {
        _registeredStreams[packet.destinationStreamId]?.deliverStopSending(frame.errorCode);
      } else if (frame is WindowUpdateFrame) {
        _registeredStreams[packet.destinationStreamId]?.deliverWindowUpdate(frame.windowSize);
      }
    }

    // --- Connection-level receive ordering ---
    var needsAck = false;
    final containsAckElicitingFrames = packet.frames.any((f) => f is StreamFrame || f is PingFrame);
    final hasStreamData = packet.frames.any((f) => f is StreamFrame && (f.data.isNotEmpty || f.isFin || f.isSyn));

    // Control-only packets (ACKs, window updates) are processed immediately
    // without advancing _nextExpectedSeq. They don't carry ordered stream
    // data, so they must not consume sequence numbers. The Go UDX sends
    // control packets with seq=0 to avoid creating sequence gaps, but even
    // if a peer uses non-zero sequences for controls, we handle it safely.
    if (!hasStreamData) {
      _processPacketFrames(packet, fromAddress, fromPort);
      if (containsAckElicitingFrames) {
        _receivedPacketSequences.add(packet.sequence);
        needsAck = true;
      }
    } else if (packet.sequence == _nextExpectedSeq) {
      // In-order data packet: deliver it, advance the expected sequence,
      // then drain whatever buffered packets have become deliverable.
      _receivedPacketSequences.add(packet.sequence);
      _largestAckedPacketArrivalTime = DateTime.now();
      _processPacketFrames(packet, fromAddress, fromPort);
      _nextExpectedSeq++;
      _processConnectionReceiveBuffer(fromAddress, fromPort);
      if (containsAckElicitingFrames) needsAck = true;
    } else if (packet.sequence > _nextExpectedSeq) {
      // Future data packet — buffer for later.
      _connectionReceiveBuffer[packet.sequence] = packet;
      if (containsAckElicitingFrames) {
        _receivedPacketSequences.add(packet.sequence);
        needsAck = true;
      }
    } else {
      // Old/duplicate data packet — not delivered again, but still ACKed.
      if (containsAckElicitingFrames) {
        _receivedPacketSequences.add(packet.sequence);
        needsAck = true;
      }
    }

    if (needsAck) {
      _sendConnectionAck();
    }
  } catch (_) {
    // Ignore invalid packets.
    // NOTE(review): this also hides any exception raised by the frame
    // handlers above (including close()); consider reporting such errors
    // instead of discarding them.
  }
}