handleIncomingDatagram method

Future<void> handleIncomingDatagram(
  1. Uint8List data,
  2. InternetAddress fromAddress,
  3. int fromPort
)

Processes an incoming datagram from the multiplexer.

Implementation

Future<void> handleIncomingDatagram(Uint8List data, InternetAddress fromAddress, int fromPort) async {
  // Track received bytes for anti-amplification
  if (!_addressValidated) {
    _bytesReceivedBeforeValidation += data.length;
    // Address is validated after receiving a certain amount of data
    // or on successful handshake completion
    if (_bytesReceivedBeforeValidation >= 1000 || _handshakeCompleted) {
      _onAddressValidated();
    }
  }

  try {
    final packet = UDXPacket.fromBytes(data);

    // Check if version is supported
    if (!UdxVersion.isSupported(packet.version) && !_handshakeCompleted) {
      // Send VERSION_NEGOTIATION packet
      final versionNegPacket = VersionNegotiationPacket(
        destinationCid: packet.sourceCid,
        sourceCid: packet.destinationCid,
        supportedVersions: UdxVersion.supportedVersions,
      );
      multiplexer.send(versionNegPacket.toBytes(), fromAddress, fromPort);
      emit('versionNegotiation', {'clientVersion': packet.version, 'supportedVersions': UdxVersion.supportedVersions});
      return;
    }

    // Always update the remote CID from the packet's source CID.
    // This ensures that even during retransmissions or path migrations,
    // we are targeting the correct peer identifier.
    cids.remoteCid = packet.sourceCid;

    if (!_handshakeCompleted) {
      // The handshake is considered complete on the first valid packet received.
      _handshakeCompleted = true;
      if (!_handshakeCompleter.isCompleted) {
        _handshakeCompleter.complete();
      }

      // Notify observer of successful handshake
      if (_handshakeStartTime != null) {
        final duration = DateTime.now().difference(_handshakeStartTime!);
        metricsObserver?.onHandshakeComplete(cids.localCid, duration, true, null);
      }

      emit('connect');
    }
  } catch (e) {
    // Ignore if it's not a valid packet.
    return;
  }

  try {
    final udxPacket = UDXPacket.fromBytes(data);

    // --- Path Migration Logic ---
    final pathHasChanged = remoteAddress.address != fromAddress.address || remotePort != fromPort;
    if (pathHasChanged && _pathChallengeData == null) {
      _initiatePathValidation(fromAddress, fromPort);
    }

    // --- PMTUD: Handle ACKs for Probes & Trigger New Probes ---
    for (final frame in udxPacket.frames.whereType<AckFrame>()) {
      _handleAckFrameForPmtud(frame);
    }
    _sendMtuProbeIfNeeded();

    // --- Process connection-level frames (not sequence-dependent) ---
    for (final frame in udxPacket.frames) {
      if (frame is ConnectionCloseFrame) {
        emit('connectionClose', {
          'errorCode': frame.errorCode,
          'frameType': frame.frameType,
          'reason': frame.reasonPhrase
        });
        await close();
        return;
      } else if (frame is MaxDataFrame) {
        _handleMaxDataFrame(frame);
      } else if (frame is MaxStreamsFrame) {
        _handleMaxStreamsFrame(frame);
      } else if (frame is PathChallengeFrame) {
        _handlePathChallenge(frame, fromAddress, fromPort);
      } else if (frame is PathResponseFrame) {
        _handlePathResponse(frame, fromAddress, fromPort);
      } else if (frame is DataBlockedFrame) {
        emit('dataBlocked', {'maxData': frame.maxData});
      }
    }

    // --- Process ACK frames at connection level ---
    for (final frame in udxPacket.frames.whereType<AckFrame>()) {
      _handleConnectionAckFrame(frame);
    }

    // --- Process RESET, STOP_SENDING, WINDOW_UPDATE immediately (not sequence-dependent) ---
    for (final frame in udxPacket.frames) {
      if (frame is ResetStreamFrame) {
        final targetStreamId = udxPacket.destinationStreamId;
        final stream = _registeredStreams[targetStreamId];
        if (stream != null) {
          stream.deliverReset(frame.errorCode);
        }
      } else if (frame is StopSendingFrame) {
        final targetStreamId = udxPacket.destinationStreamId;
        final stream = _registeredStreams[targetStreamId];
        if (stream != null) {
          stream.deliverStopSending(frame.errorCode);
        }
      } else if (frame is WindowUpdateFrame) {
        final targetStreamId = udxPacket.destinationStreamId;
        final stream = _registeredStreams[targetStreamId];
        if (stream != null) {
          stream.deliverWindowUpdate(frame.windowSize);
        }
      }
    }

    // --- Connection-level receive ordering ---
    bool needsAck = false;
    bool containsAckElicitingFrames = udxPacket.frames.any((f) => f is StreamFrame || f is PingFrame);
    bool hasStreamData = udxPacket.frames.any((f) => f is StreamFrame && (f.data.isNotEmpty || f.isFin || f.isSyn));

    // Control-only packets (ACKs, window updates) are processed immediately
    // without advancing _nextExpectedSeq. They don't carry ordered stream
    // data, so they must not consume sequence numbers. The Go UDX sends
    // control packets with seq=0 to avoid creating sequence gaps, but even
    // if a peer uses non-zero sequences for controls, we handle it safely.
    if (!hasStreamData) {
      _processPacketFrames(udxPacket, fromAddress, fromPort);
      if (containsAckElicitingFrames) {
        _receivedPacketSequences.add(udxPacket.sequence);
        needsAck = true;
      }
    } else {
      bool packetIsSequential = (udxPacket.sequence == _nextExpectedSeq);

      if (packetIsSequential) {
        _receivedPacketSequences.add(udxPacket.sequence);
        _largestAckedPacketArrivalTime = DateTime.now();

        _processPacketFrames(udxPacket, fromAddress, fromPort);
        _nextExpectedSeq++;
        _processConnectionReceiveBuffer(fromAddress, fromPort);

        if (containsAckElicitingFrames) needsAck = true;
      } else if (udxPacket.sequence > _nextExpectedSeq) {
        // Future data packet — buffer for later
        _connectionReceiveBuffer[udxPacket.sequence] = udxPacket;
        if (containsAckElicitingFrames) {
          _receivedPacketSequences.add(udxPacket.sequence);
          needsAck = true;
        }
      } else {
        // Old/duplicate data packet — still ACK
        if (containsAckElicitingFrames) {
          _receivedPacketSequences.add(udxPacket.sequence);
          needsAck = true;
        }
      }
    }

    if (needsAck) {
      _sendConnectionAck();
    }
  } catch (e) {
    // Ignore invalid packets
  }
}