createIncoming static method

UDXStream createIncoming(
  UDX udx,
  UDPSocket socket,
  int localId,
  int remoteId,
  String host,
  int port, {
  required ConnectionId destinationCid,
  required ConnectionId sourceCid,
  bool framed = false,
  int initialSeq = 0,
  int? initialCwnd,
  bool firewall(UDPSocket socket, int port, String host)?,
})

Implementation

/// Builds the accepting (non-initiator) side of a stream opened by a peer.
///
/// The new [UDXStream] is created with [localId], bound to [socket] and to the
/// remote endpoint ([remoteId], [host], [port]), registered with the socket,
/// subscribed to the socket's `remoteConnectionWindowUpdate` events and marked
/// as connected.
///
/// If [socket] holds an initial packet for [localId], that packet is decoded
/// and answered with a SYN-ACK: a packet carrying an empty SYN [StreamFrame]
/// plus an [AckFrame] for the received sequence. The reply is addressed with
/// the connection ids swapped ([sourceCid] becomes the destination and
/// [destinationCid] the source). Any exception raised while doing so is added
/// to the stream as a [StateError] and the stream is closed.
///
/// Afterwards the retransmit/probe callbacks of the packet manager are wired
/// to the socket, a MaxData frame advertising
/// `UDPSocket.defaultInitialConnectionWindow` is sent (unless the socket is
/// closing), and `'accepted'` is emitted on the stream.
///
/// [framed], [initialSeq], [initialCwnd] and [firewall] are forwarded
/// unchanged to the [UDXStream] constructor.
///
/// Throws a [StateError] if [socket] is already closing.
static UDXStream createIncoming(
  UDX udx,
  UDPSocket socket,
  int localId,
  int remoteId,
  String host,
  int port, {
  required ConnectionId destinationCid,
  required ConnectionId sourceCid,
  bool framed = false,
  int initialSeq = 0,
  int? initialCwnd,
  bool Function(UDPSocket socket, int port, String host)? firewall,
}) {
  if (socket.closing) {
    throw StateError('UDXStream.createIncoming: Socket is closing');
  }

  // Create the stream and attach the transport and remote endpoint details.
  final incoming = UDXStream(
    udx,
    localId,
    isInitiator: false,
    initialCwnd: initialCwnd,
    framed: framed,
    initialSeq: initialSeq,
    firewall: firewall,
  )
    .._socket = socket
    ..remoteId = remoteId
    ..remoteHost = host
    ..remotePort = port
    ..remoteFamily = UDX.getAddressFamily(host);

  socket.registerStream(incoming);

  // Drop any existing window-update subscription before installing a new one.
  incoming._remoteConnectionWindowUpdateSubscription?.cancel();
  incoming._remoteConnectionWindowUpdateSubscription = incoming._socket!
      .on('remoteConnectionWindowUpdate')
      .listen(incoming._handleRemoteConnectionWindowUpdate);
  incoming._connected = true;

  // Answer the peer's initial SYN (if the socket has one stored for this id)
  // with a SYN-ACK.
  final bufferedSyn = socket.popInitialPacket(localId);
  if (bufferedSyn != null) {
    try {
      final syn = UDXPacket.fromBytes(bufferedSyn);
      final peerSeq = syn.sequence;

      // Record the SYN as received; the next packet expected from the peer
      // is the one right after it.
      incoming
        .._nextExpectedSeq = peerSeq + 1
        .._receivedPacketSequences.add(peerSeq)
        .._largestAckedPacketArrivalTime = DateTime.now();

      final reply = UDXPacket(
        // Connection ids are mirrored: the peer's source is our destination.
        destinationCid: sourceCid,
        sourceCid: destinationCid,
        destinationStreamId: remoteId,
        sourceStreamId: localId,
        sequence: incoming.packetManager.nextSequence,
        frames: [
          // SYN half of the SYN-ACK.
          StreamFrame(data: Uint8List(0), isSyn: true),
          // ACK half of the SYN-ACK.
          AckFrame(
            largestAcked: peerSeq,
            ackDelay: 0,
            firstAckRangeLength: 1,
            ackRanges: [],
          ),
        ],
      );

      // Track the reply (zero payload bytes), put it on the wire, then hand
      // it to the packet manager and congestion controller.
      incoming._sentPackets[reply.sequence] = (DateTime.now(), 0);
      socket.send(reply.toBytes());
      incoming.packetManager.sendPacket(reply);
      incoming._congestionController.onPacketSent(0);
    } catch (e) {
      // NOTE(review): this also catches failures from building or sending the
      // reply, not only parse errors, and the setup below (including the
      // 'accepted' event) still runs afterwards — confirm this is intended.
      incoming.addError(StateError('Failed to parse initial SYN packet: $e'));
      incoming.close();
    }
  }

  // Whether the stream is connected, fully addressed, and its socket is not
  // closing. Shared by the retransmit and probe callbacks.
  bool readyToSend() {
    final current = incoming._socket;
    return incoming.connected &&
        current != null &&
        incoming.remotePort != null &&
        incoming.remoteHost != null &&
        !current.closing;
  }

  incoming.packetManager.onRetransmit = (packet) {
    if (readyToSend()) {
      incoming._socket!.send(packet.toBytes());
    }
  };
  incoming.packetManager.onSendProbe = (packet) {
    if (readyToSend()) {
      incoming._socket!.send(packet.toBytes());
    }
  };

  // Advertise the initial connection window, unless the socket is closing.
  final boundSocket = incoming._socket;
  if (boundSocket != null &&
      incoming.remoteHost != null &&
      incoming.remotePort != null &&
      !boundSocket.closing) {
    boundSocket.sendMaxDataFrame(UDPSocket.defaultInitialConnectionWindow);
  }

  incoming.emit('accepted');
  return incoming;
}