createIncoming static method
UDXStream
createIncoming(
- UDX udx,
- UDPSocket socket,
- int localId,
- int remoteId,
- String host,
- int port, {
- required ConnectionId destinationCid,
- required ConnectionId sourceCid,
- bool framed = false,
- int initialSeq = 0,
- int? initialCwnd,
- bool firewall(
- UDPSocket socket,
- int port,
- String host
)?,
})
Implementation
/// Creates the accepting side of a stream for a connection opened by a peer.
///
/// Builds a non-initiator [UDXStream] identified by [localId], binds it to
/// [socket] and to the remote endpoint ([remoteId], [host], [port]),
/// registers it on the socket, and subscribes to the socket's
/// `remoteConnectionWindowUpdate` events. The stream is then marked connected.
///
/// If the socket holds an initial packet for [localId], that packet is decoded
/// as the peer's SYN and answered immediately with a SYN-ACK: a packet
/// addressed to [sourceCid] (with [destinationCid] as its source) carrying an
/// empty SYN stream frame plus an ACK frame for the peer's sequence number.
/// Any exception raised while decoding or answering is reported on the stream
/// as a [StateError] and the stream is closed; the remaining setup below still
/// runs and the stream is still returned.
///
/// Afterwards the retransmit and probe callbacks of the packet manager are
/// installed, `sendMaxDataFrame` is called on the socket with
/// [UDPSocket.defaultInitialConnectionWindow] (skipped when the socket is
/// closing), and `accepted` is emitted on the stream.
///
/// [framed], [initialSeq], [initialCwnd] and [firewall] are forwarded to the
/// [UDXStream] constructor unchanged.
///
/// Throws a [StateError] if [socket] is already closing.
static UDXStream createIncoming(
  UDX udx,
  UDPSocket socket,
  int localId,
  int remoteId,
  String host,
  int port, {
  required ConnectionId destinationCid,
  required ConnectionId sourceCid,
  bool framed = false,
  int initialSeq = 0,
  int? initialCwnd,
  bool Function(UDPSocket socket, int port, String host)? firewall,
}) {
  if (socket.closing) {
    throw StateError('UDXStream.createIncoming: Socket is closing');
  }

  // Build the stream and bind it to the socket and remote endpoint.
  final stream = UDXStream(
    udx,
    localId,
    isInitiator: false,
    initialCwnd: initialCwnd,
    framed: framed,
    initialSeq: initialSeq,
    firewall: firewall,
  )
    .._socket = socket
    ..remoteId = remoteId
    ..remoteHost = host
    ..remotePort = port
    ..remoteFamily = UDX.getAddressFamily(host);
  socket.registerStream(stream);

  // Replace any existing connection-window subscription with a fresh one.
  stream._remoteConnectionWindowUpdateSubscription?.cancel();
  stream._remoteConnectionWindowUpdateSubscription = stream._socket!
      .on('remoteConnectionWindowUpdate')
      .listen(stream._handleRemoteConnectionWindowUpdate);
  stream._connected = true;

  // The SYN that triggered this stream is handled here by hand so that the
  // SYN-ACK goes out immediately.
  final synBytes = socket.popInitialPacket(localId);
  if (synBytes != null) {
    try {
      final peerSyn = UDXPacket.fromBytes(synBytes);
      final peerSeq = peerSyn.sequence;

      // Record the SYN as received; the next packet expected from the peer
      // is the one right after it.
      stream._nextExpectedSeq = peerSeq + 1;
      stream._receivedPacketSequences.add(peerSeq);
      stream._largestAckedPacketArrivalTime = DateTime.now();

      // Draw the outgoing sequence number before building the frames, which
      // matches the order in which the pieces were originally evaluated.
      final replySeq = stream.packetManager.nextSequence;
      final synFrame = StreamFrame(data: Uint8List(0), isSyn: true);
      final ackFrame = AckFrame(
        largestAcked: peerSeq,
        ackDelay: 0,
        firstAckRangeLength: 1,
        ackRanges: [],
      );
      // Connection ids are swapped relative to the incoming packet: its
      // source becomes our destination and vice versa.
      final synAck = UDXPacket(
        destinationCid: sourceCid,
        sourceCid: destinationCid,
        destinationStreamId: remoteId,
        sourceStreamId: localId,
        sequence: replySeq,
        frames: [synFrame, ackFrame],
      );

      // Track the SYN-ACK (zero payload bytes) so it can be retransmitted,
      // then put it on the wire.
      stream._sentPackets[synAck.sequence] = (DateTime.now(), 0);
      socket.send(synAck.toBytes());
      stream.packetManager.sendPacket(synAck);
      stream._congestionController.onPacketSent(0);
    } catch (e) {
      // Any failure in the handshake above surfaces as a stream error and
      // closes the stream.
      stream.addError(StateError('Failed to parse initial SYN packet: $e'));
      stream.close();
    }
  }

  // Shared guard for the two packet-manager callbacks: only transmit while
  // the stream is connected, fully addressed, and its socket is not closing.
  bool canTransmit() {
    if (!stream.connected) return false;
    final liveSocket = stream._socket;
    if (liveSocket == null ||
        stream.remotePort == null ||
        stream.remoteHost == null) {
      return false;
    }
    return !liveSocket.closing;
  }

  stream.packetManager.onRetransmit = (packet) {
    if (canTransmit()) stream._socket!.send(packet.toBytes());
  };
  stream.packetManager.onSendProbe = (packet) {
    if (canTransmit()) stream._socket!.send(packet.toBytes());
  };

  // Advertise the initial connection window unless the socket is going away.
  final boundSocket = stream._socket;
  if (boundSocket != null &&
      stream.remoteHost != null &&
      stream.remotePort != null &&
      !boundSocket.closing) {
    boundSocket.sendMaxDataFrame(UDPSocket.defaultInitialConnectionWindow);
  }

  stream.emit('accepted');
  return stream;
}