handleIncomingDatagram method
Processes an incoming datagram from the multiplexer.
Implementation
void handleIncomingDatagram(Uint8List data, InternetAddress fromAddress, int fromPort) {
try {
final packet = UDXPacket.fromBytes(data);
// Always update the remote CID from the packet's source CID.
// This ensures that even during retransmissions or path migrations,
// we are targeting the correct peer identifier.
cids.remoteCid = packet.sourceCid;
if (!_handshakeCompleted) {
// The handshake is considered complete on the first valid packet received.
_handshakeCompleted = true;
if (!_handshakeCompleter.isCompleted) {
_handshakeCompleter.complete();
}
emit('connect');
}
} catch (e) {
// Ignore if it's not a valid packet.
return;
}
try {
final udxPacket = UDXPacket.fromBytes(data);
// --- Path Migration Logic ---
final pathHasChanged = remoteAddress.address != fromAddress.address || remotePort != fromPort;
// A packet arrived from a new path. If we aren't already validating a
// path, start a new validation.
if (pathHasChanged && _pathChallengeData == null) {
_initiatePathValidation(fromAddress, fromPort);
}
final remotePeerKey = '${fromAddress.address}:$fromPort';
// --- PMTUD: Handle ACKs for Probes & Trigger New Probes ---
for (final frame in udxPacket.frames.whereType<AckFrame>()) {
_handleAckFrameForPmtud(frame);
}
_sendMtuProbeIfNeeded();
// Process connection-level frames first
for (final frame in udxPacket.frames) {
if (frame is MaxDataFrame) {
_handleMaxDataFrame(frame);
} else if (frame is MaxStreamsFrame) {
_handleMaxStreamsFrame(frame);
} else if (frame is PathChallengeFrame) {
_handlePathChallenge(frame, fromAddress, fromPort);
} else if (frame is PathResponseFrame) {
_handlePathResponse(frame, fromAddress, fromPort);
} else if (frame is StreamFrame) {
_connectionBytesReceived += frame.data.length;
_checkAndSendLocalMaxDataUpdate();
}
}
final targetStreamId = udxPacket.destinationStreamId;
if (_registeredStreams.containsKey(targetStreamId)) {
final targetStream = _registeredStreams[targetStreamId]!;
targetStream.internalHandleSocketEvent({
'data': data,
'address': fromAddress.address,
'port': fromPort,
});
} else {
// Check for a SYN flag to create a new stream
final synFrame = udxPacket.frames.whereType<StreamFrame>().firstWhereOrNull((f) => f.isSyn);
if (synFrame != null) {
// print( '[SOCK ${cids.localCid}] SYN frame detected for stream ${udxPacket.destinationStreamId}. Creating new UDXStream.');
// Enforce incoming stream limit
final currentIncomingStreams = _registeredStreams.values.where((s) => !s.isInitiator).length;
if (currentIncomingStreams >= _localMaxStreams) {
// Reject the stream
final resetFrame = ResetStreamFrame(errorCode: 2); // STREAM_LIMIT_ERROR
final responsePacket = UDXPacket(
destinationCid: udxPacket.sourceCid,
sourceCid: udxPacket.destinationCid,
destinationStreamId: udxPacket.sourceStreamId,
sourceStreamId: targetStreamId,
sequence: 0,
frames: [resetFrame],
);
multiplexer.send(responsePacket.toBytes(), fromAddress, fromPort);
return;
}
// Buffer the initial packet *before* creating the stream,
// so the stream's constructor can pop it.
_initialPacketBuffer[targetStreamId] = data;
final newStream = UDXStream.createIncoming(
udx,
this,
targetStreamId,
udxPacket.sourceStreamId,
fromAddress.address,
fromPort,
initialSeq: udxPacket.sequence,
destinationCid: udxPacket.destinationCid,
sourceCid: udxPacket.sourceCid,
);
emit('stream', newStream);
_streamBuffer.add(newStream);
} else if (targetStreamId != 0) {
emit('unmatchedUDXPacket', {
'packet': udxPacket,
'remoteAddress': fromAddress,
'remotePort': fromPort,
'rawData': data,
});
}
}
} catch (e) {
// //print('UDPSocket: Error processing incoming datagram: $e. From: ${fromAddress.address}:$fromPort');
}
}