internalHandleSocketEvent method

void internalHandleSocketEvent(dynamic event)

Handles an incoming socket event (datagram).

Implementation

/// Handles an incoming socket event (datagram).
///
/// [event] is indexed with the keys `'data'`, `'address'` and `'port'`; the
/// values are cast to [Uint8List], [String] and [int] respectively, so a
/// value of any other type makes the cast throw.
///
/// The datagram is dropped without any side effect when:
///  * `remoteHost` or `remotePort` is null, or the sender's address/port
///    differs from them;
///  * the payload cannot be parsed by `UDXPacket.fromBytes`;
///  * the packet's destination stream id is not `id`, `remoteId` is null, or
///    the packet's source stream id is not `remoteId`.
///
/// For an accepted packet, reset and ACK frames are handled first, whatever
/// the packet's sequence number is. The remaining frames are handled only
/// when the packet's sequence equals `_nextExpectedSeq`. A packet with a
/// higher sequence is stored in `_receiveBuffer` if it carries a stream frame
/// with data or FIN. `_sendAck` is called at the end when the packet held a
/// stream or ping frame.
void internalHandleSocketEvent(dynamic event) {
  final Uint8List payload = event['data'] as Uint8List;
  final String senderHost = event['address'] as String;
  final int senderPort = event['port'] as int;

  // Only accept datagrams coming from the configured remote endpoint.
  final bool remoteUnset = remoteHost == null || remotePort == null;
  if (remoteUnset || senderHost != remoteHost || senderPort != remotePort) {
    return;
  }

  UDXPacket packet;
  try {
    packet = UDXPacket.fromBytes(payload);
  } catch (_) {
    // Unparseable payloads are discarded.
    return;
  }

  // The packet must be addressed to this stream and come from the known
  // remote stream.
  if (packet.destinationStreamId != id) {
    return;
  }
  if (remoteId == null || packet.sourceStreamId != remoteId) {
    return;
  }

  // Completes the pending drain completer, if there is one, once the
  // congestion window, the remote receive window and the connection-level
  // send window all leave room.
  void completeDrainIfSendable() {
    final pendingDrain = _drain;
    if (pendingDrain == null || pendingDrain.isCompleted) {
      return;
    }
    final connWindow = _socket?.getAvailableConnectionSendWindow() ?? 0;
    final bool streamHasRoom =
        inflight < cwnd && inflight < _remoteReceiveWindow;
    if (streamHasRoom && connWindow > 0) {
      pendingDrain.complete();
    }
  }

  // Both values are captured before any frame is processed.
  var ackRequired = false;
  final bool isNextExpected = packet.sequence == _nextExpectedSeq;
  final bool elicitsAck =
      packet.frames.any((f) => f is StreamFrame || f is PingFrame);

  // Pass 1: reset and ACK frames, independent of the packet's sequence.
  for (final frame in packet.frames) {
    if (frame is ResetStreamFrame) {
      addError(StreamResetError(frame.errorCode));
      _close(isReset: true);
      return; // Nothing else in this packet is processed after a reset.
    }
    if (frame is! AckFrame) {
      continue;
    }

    // handleAckFrame returns the sequences that this frame newly
    // acknowledged.
    final List<int> newlyAcked = packetManager.handleAckFrame(frame);

    // Whether the stream's cumulative ACK is now ahead of the value last
    // mirrored for the congestion controller.
    final bool cumulativeAdvanced = _currentHighestCumulativeAckFromRemote >
        _ccMirrorOfHighestProcessedCumulativeAck;
    final int frameLargestAcked = frame.largestAcked;

    for (final seq in newlyAcked) {
      final sentInfo = _sentPackets.remove(seq);
      if (sentInfo == null) {
        continue;
      }
      final (sentTime, ackedSize) = sentInfo;

      // True only for sequences inside the newly advanced cumulative range:
      // above the old mirror value and at or below the new cumulative ACK.
      final bool inAdvancedRange = cumulativeAdvanced &&
          seq > _ccMirrorOfHighestProcessedCumulativeAck &&
          seq <= _currentHighestCumulativeAckFromRemote;

      _congestionController.onPacketAcked(
        ackedSize,
        sentTime,
        Duration(milliseconds: frame.ackDelay),
        inAdvancedRange,
        frameLargestAcked,
      );

      // Connection-level accounting of bytes that were sent.
      _socket?.decrementConnectionBytesSent(ackedSize);

      // Retransmission attempts are no longer tracked for an acked packet.
      _retransmissionAttempts.remove(seq);

      emit('ack', seq);
      completeDrainIfSendable();
    }

    if (cumulativeAdvanced) {
      _ccMirrorOfHighestProcessedCumulativeAck =
          _currentHighestCumulativeAckFromRemote;
    } else {
      // The cumulative point did not move, so the frame is reported to the
      // congestion controller as a duplicate ACK for the current mirror.
      _congestionController
          .processDuplicateAck(_ccMirrorOfHighestProcessedCumulativeAck);
    }
    // TODO: Handle ECN counts from ACK frame if ECN is implemented.
  }

  // Pass 2: sequence-dependent handling.
  if (isNextExpected) {
    _receivedPacketSequences.add(packet.sequence); // Used when building ACKs.
    _largestAckedPacketArrivalTime = DateTime.now();

    for (final frame in packet.frames) {
      if (frame is StreamFrame) {
        ackRequired = true;
        if (frame.data.isNotEmpty) {
          _dataController.add(frame.data);
          if (_socket != null && remoteHost != null && remotePort != null) {
            _socket!.onStreamDataProcessed(frame.data.length);
          }
        }
        if (frame.isFin) {
          _dataController.close();
        }
      } else if (frame is PingFrame) {
        ackRequired = true;
      } else if (frame is WindowUpdateFrame) {
        _remoteReceiveWindow = frame.windowSize;
        completeDrainIfSendable();
        emit('drain');
      } else if (frame is MaxDataFrame) {
        completeDrainIfSendable();
      }
      // Ack and reset frames were already handled in pass 1.
    }

    // Advance only after every frame of this packet has been processed.
    _nextExpectedSeq++;
    _processReceiveBuffer(senderHost, senderPort);
  } else {
    if (packet.sequence > _nextExpectedSeq) {
      // Ahead of the expected sequence: keep the whole packet when it carries
      // stream data or a FIN.
      final bool carriesStreamContent = packet.frames
          .any((f) => f is StreamFrame && (f.data.isNotEmpty || f.isFin));
      if (carriesStreamContent) {
        _receiveBuffer[packet.sequence] = packet;
      }
    }
    // Whether ahead of or behind the expected sequence, a packet holding a
    // stream or ping frame is recorded and acknowledged.
    if (elicitsAck) {
      _receivedPacketSequences.add(packet.sequence);
      ackRequired = true;
    }
  }

  if (ackRequired) {
    _sendAck();
  }
}