internalHandleSocketEvent method
void internalHandleSocketEvent(dynamic event)
Handles an incoming socket event (datagram).
Implementation
/// Handles an incoming socket event (datagram).
///
/// [event] is a map-like payload carrying:
/// - `'data'`: the raw datagram bytes as a [Uint8List],
/// - `'address'`: the sender's host as a [String],
/// - `'port'`: the sender's port as an [int].
///
/// The datagram is validated (stream must know its remote endpoint, the
/// source must match it, and the packet's stream IDs must match this
/// stream's local/remote IDs), then its frames are processed in two
/// phases: ACK and RESET frames first (they are independent of packet
/// ordering), then the payload frames, which are delivered in order,
/// buffered if they arrive early, or re-acknowledged if duplicated.
void internalHandleSocketEvent(dynamic event) {
  final Uint8List rawData = event['data'] as Uint8List;
  final String sourceHost = event['address'] as String;
  final int sourcePort = event['port'] as int;

  // Ignore all traffic until the stream knows its remote endpoint.
  if (remoteHost == null || remotePort == null) {
    return;
  }
  // Drop datagrams that did not come from the expected remote peer.
  if (sourceHost != remoteHost || sourcePort != remotePort) {
    return;
  }

  UDXPacket packet;
  try {
    packet = UDXPacket.fromBytes(rawData);
  } catch (e) {
    // Malformed datagram: drop it rather than tearing down the stream.
    return;
  }

  // Demultiplexing: the packet must be addressed to this stream...
  if (packet.destinationStreamId != id) {
    return;
  }
  if (remoteId == null) {
    return;
  }
  // ...and must originate from the stream we are paired with.
  if (packet.sourceStreamId != remoteId) {
    return;
  }

  bool needsAck = false;
  bool packetIsSequential = (packet.sequence == _nextExpectedSeq);
  // StreamFrame and PingFrame are the ack-eliciting frame types here.
  bool containsAckElicitingFrames = packet.frames.any((f) => f is StreamFrame || f is PingFrame);

  // Phase 1: process ACK and RESET frames first, as they are not sequence-dependent.
  for (final frame in packet.frames) {
    if (frame is ResetStreamFrame) {
      // A reset aborts the stream immediately; no further frames matter.
      addError(StreamResetError(frame.errorCode));
      _close(isReset: true);
      return; // Stop all further processing.
    }
    if (frame is AckFrame) {
      // Let PacketManager update its internal state (_sentPackets,
      // _retransmitTimers). As a side effect it also advances
      // _currentHighestCumulativeAckFromRemote based on the frame's
      // largestAcked and ackRanges, so the comparison below sees the
      // post-frame value.
      final List<int> newlySackedSequences = packetManager.handleAckFrame(frame);

      // Did this frame move the congestion controller's cumulative ACK
      // point forward? The stream keeps a mirror
      // (_ccMirrorOfHighestProcessedCumulativeAck) of the highest
      // cumulative ACK the CC has processed so far.
      bool advancesCcCumulativePoint = _currentHighestCumulativeAckFromRemote > _ccMirrorOfHighestProcessedCumulativeAck;

      // Always pass this frame's own largestAcked as the 5th argument to
      // onPacketAcked below (rather than a derived cumulative value).
      final int largestAckedInThisFrame = frame.largestAcked;

      if (newlySackedSequences.isNotEmpty) {
        for (final ackedSeq in newlySackedSequences) {
          final sentPacketInfo = _sentPackets.remove(ackedSeq);
          if (sentPacketInfo != null) {
            final (sentTime, ackedSize) = sentPacketInfo;
            // A packet counts as "newly cumulative" for the CC only when:
            // 1. the CC's cumulative point is advancing in this frame, and
            // 2. this sequence lies inside the advanced range
            //    (old CC mirror, new true cumulative ack].
            bool isNewCumulativeForCCArg = advancesCcCumulativePoint &&
                ackedSeq > _ccMirrorOfHighestProcessedCumulativeAck &&
                ackedSeq <= _currentHighestCumulativeAckFromRemote;
            _congestionController.onPacketAcked(
              ackedSize,
              sentTime,
              Duration(milliseconds: frame.ackDelay),
              isNewCumulativeForCCArg,
              largestAckedInThisFrame
            );
            // Return the acked bytes to the connection-level send window.
            if (_socket != null) {
              _socket!.decrementConnectionBytesSent(ackedSize);
            }
            // Packet is acknowledged; stop tracking its retransmissions.
            _retransmissionAttempts.remove(ackedSeq);
            emit('ack', ackedSeq);
            // Wake a pending writer if all three limits (congestion
            // window, remote receive window, connection window) now
            // have room.
            if (_drain != null && !_drain!.isCompleted) {
              final connWindowAvailable = _socket?.getAvailableConnectionSendWindow() ?? 0;
              if (inflight < cwnd && inflight < _remoteReceiveWindow && connWindowAvailable > 0) {
                _drain!.complete();
              }
            }
          }
        }
      }

      if (advancesCcCumulativePoint) {
        // Sync the CC mirror up to the stream's true cumulative ack.
        _ccMirrorOfHighestProcessedCumulativeAck = _currentHighestCumulativeAckFromRemote;
      } else {
        // The CC's cumulative point did not advance with this frame, so
        // from the CC's perspective of its own cumulative progress this
        // frame acts as a duplicate ACK. Pass the CC's current highest
        // processed cumulative point for context.
        _congestionController.processDuplicateAck(_ccMirrorOfHighestProcessedCumulativeAck);
      }
      // TODO: Handle ECN counts from ACK frame if ECN is implemented.
    }
  }

  // Phase 2: sequence-dependent delivery of the packet's payload frames.
  if (packetIsSequential) {
    // This packet is the next expected one. Process its frames.
    _receivedPacketSequences.add(packet.sequence); // Track for ACK generation.
    _largestAckedPacketArrivalTime = DateTime.now(); // Arrival time for potential largest_acked.
    for (final frame in packet.frames) {
      if (frame is StreamFrame) {
        needsAck = true;
        if (frame.data.isNotEmpty) {
          _dataController.add(frame.data);
          // Tell the socket the bytes were consumed so connection-level
          // flow control can be replenished.
          if (_socket != null && remoteHost != null && remotePort != null) {
            _socket!.onStreamDataProcessed(frame.data.length);
          }
        }
        if (frame.isFin) {
          // FIN: close the inbound data stream.
          _dataController.close();
        }
      } else if (frame is PingFrame) {
        // A ping only elicits an acknowledgement.
        needsAck = true;
      } else if (frame is WindowUpdateFrame) {
        // Peer granted a new stream-level receive window.
        _remoteReceiveWindow = frame.windowSize;
        if (_drain != null && !_drain!.isCompleted) {
          final connWindowAvailable = _socket?.getAvailableConnectionSendWindow() ?? 0;
          if (inflight < cwnd && inflight < _remoteReceiveWindow && connWindowAvailable > 0) {
            _drain!.complete();
          }
        }
        emit('drain');
      } else if (frame is MaxDataFrame) {
        // Connection-level window update (unusual in a stream context but
        // handled): just re-check whether a blocked writer can resume.
        if (_drain != null && !_drain!.isCompleted) {
          final connWindowAvailable = _socket?.getAvailableConnectionSendWindow() ?? 0;
          if (inflight < cwnd && inflight < _remoteReceiveWindow && connWindowAvailable > 0) {
            _drain!.complete();
          }
        }
      }
      // AckFrames and ResetStreamFrames were handled above and don't
      // affect sequential processing here.
    }
    _nextExpectedSeq++; // Advance expected sequence *after* processing all frames in this sequential packet.
    // Drain any buffered future packets that are now in order.
    _processReceiveBuffer(sourceHost, sourcePort);
  } else if (packet.sequence > _nextExpectedSeq) {
    // Packet is for the future. Buffer it only if it carries stream data.
    bool hasStreamDataContent = packet.frames.any((f) => f is StreamFrame && (f.data.isNotEmpty || f.isFin));
    if (hasStreamDataContent) {
      _receiveBuffer[packet.sequence] = packet; // Buffer the whole packet.
    } else {
      // No stream payload (e.g. just an ACK or a future PING): nothing to buffer.
    }
    // Always ACK out-of-order packets that might contain data or PINGs to
    // help the sender with loss detection/recovery. Pure AckFrames don't
    // need this type of ACK.
    if (containsAckElicitingFrames) {
      _receivedPacketSequences.add(packet.sequence); // Add to sequences for SACKing.
      // _largestAckedPacketArrivalTime will be updated in _sendAck based
      // on the full _receivedPacketSequences.
      needsAck = true;
    }
  } else { // packet.sequence < _nextExpectedSeq
    // Packet is older than expected, likely a duplicate. Still ACK if it
    // contained frames that would normally be ACKed (Stream, Ping): this
    // ensures SACKs are sent even for duplicates, helping the sender
    // confirm receipt.
    if (containsAckElicitingFrames) {
      _receivedPacketSequences.add(packet.sequence); // Add to sequences for SACKing.
      needsAck = true;
    }
  }

  if (needsAck) {
    _sendAck();
  }
}