handleIncomingDatagram method

void handleIncomingDatagram(
  1. Uint8List data,
  2. InternetAddress fromAddress,
  3. int fromPort
)

Processes an incoming datagram from the multiplexer.

Implementation

void handleIncomingDatagram(Uint8List data, InternetAddress fromAddress, int fromPort) {
  try {
    final packet = UDXPacket.fromBytes(data);
    // Always update the remote CID from the packet's source CID.
    // This ensures that even during retransmissions or path migrations,
    // we are targeting the correct peer identifier.
    cids.remoteCid = packet.sourceCid;

    if (!_handshakeCompleted) {
      // The handshake is considered complete on the first valid packet received.
      _handshakeCompleted = true;
      if (!_handshakeCompleter.isCompleted) {
        _handshakeCompleter.complete();
      }
      emit('connect');
    }
  } catch (e) {
    // Ignore if it's not a valid packet.
    return;
  }

  try {
    final udxPacket = UDXPacket.fromBytes(data);

    // --- Path Migration Logic ---
    final pathHasChanged = remoteAddress.address != fromAddress.address || remotePort != fromPort;
    // A packet arrived from a new path. If we aren't already validating a
    // path, start a new validation.
    if (pathHasChanged && _pathChallengeData == null) {
      _initiatePathValidation(fromAddress, fromPort);
    }
    final remotePeerKey = '${fromAddress.address}:$fromPort';

    // --- PMTUD: Handle ACKs for Probes & Trigger New Probes ---
    for (final frame in udxPacket.frames.whereType<AckFrame>()) {
      _handleAckFrameForPmtud(frame);
    }
    _sendMtuProbeIfNeeded();

    // Process connection-level frames first
    for (final frame in udxPacket.frames) {
      if (frame is MaxDataFrame) {
        _handleMaxDataFrame(frame);
      } else if (frame is MaxStreamsFrame) {
        _handleMaxStreamsFrame(frame);
      } else if (frame is PathChallengeFrame) {
        _handlePathChallenge(frame, fromAddress, fromPort);
      } else if (frame is PathResponseFrame) {
        _handlePathResponse(frame, fromAddress, fromPort);
      } else if (frame is StreamFrame) {
        _connectionBytesReceived += frame.data.length;
        _checkAndSendLocalMaxDataUpdate();
      }
    }

    final targetStreamId = udxPacket.destinationStreamId;
    if (_registeredStreams.containsKey(targetStreamId)) {
      final targetStream = _registeredStreams[targetStreamId]!;
      targetStream.internalHandleSocketEvent({
        'data': data,
        'address': fromAddress.address,
        'port': fromPort,
      });
    } else {
      // Check for a SYN flag to create a new stream
      final synFrame = udxPacket.frames.whereType<StreamFrame>().firstWhereOrNull((f) => f.isSyn);
      if (synFrame != null) {
        // print( '[SOCK ${cids.localCid}] SYN frame detected for stream ${udxPacket.destinationStreamId}. Creating new UDXStream.');
        // Enforce incoming stream limit
        final currentIncomingStreams = _registeredStreams.values.where((s) => !s.isInitiator).length;

        if (currentIncomingStreams >= _localMaxStreams) {
          // Reject the stream
          final resetFrame = ResetStreamFrame(errorCode: 2); // STREAM_LIMIT_ERROR
          final responsePacket = UDXPacket(
            destinationCid: udxPacket.sourceCid,
            sourceCid: udxPacket.destinationCid,
            destinationStreamId: udxPacket.sourceStreamId,
            sourceStreamId: targetStreamId,
            sequence: 0,
            frames: [resetFrame],
          );
          multiplexer.send(responsePacket.toBytes(), fromAddress, fromPort);
          return;
        }

        // Buffer the initial packet *before* creating the stream,
        // so the stream's constructor can pop it.
        _initialPacketBuffer[targetStreamId] = data;

        final newStream = UDXStream.createIncoming(
          udx,
          this,
          targetStreamId,
          udxPacket.sourceStreamId,
          fromAddress.address,
          fromPort,
          initialSeq: udxPacket.sequence,
          destinationCid: udxPacket.destinationCid,
          sourceCid: udxPacket.sourceCid,
        );

        emit('stream', newStream);
        _streamBuffer.add(newStream);
      } else if (targetStreamId != 0) {
        emit('unmatchedUDXPacket', {
          'packet': udxPacket,
          'remoteAddress': fromAddress,
          'remotePort': fromPort,
          'rawData': data,
        });
      }
    }
  } catch (e) {
    // //print('UDPSocket: Error processing incoming datagram: $e. From: ${fromAddress.address}:$fromPort');
  }
}