UDXP2PStreamAdapter constructor

UDXP2PStreamAdapter({
  required UDXStream udxStream,
  required UDXSessionConn parentConn,
  required Direction direction,
})

Implementation

/// Creates an adapter around [udxStream] that belongs to [parentConn] and
/// flows in the given [direction].
///
/// The constructor immediately subscribes to the UDX stream's `data` and
/// `closeEvents` streams:
///
/// * Each data event completes the pending read if one is waiting; otherwise
///   the bytes are appended to the read buffer (and also forwarded to the
///   incoming-data controller while that controller is still open). The
///   parent connection is notified of activity after every data event.
/// * A data error is classified through [UDXExceptionHandler], delivered to
///   the pending read and the incoming-data controller, and then the adapter
///   is closed with that classified error.
/// * Completion of the data stream, or a close event, closes the adapter; an
///   error on the close-event stream closes it with that error.
UDXP2PStreamAdapter({
  required UDXStream udxStream,
  required UDXSessionConn parentConn,
  required Direction direction,
})  : _udxStream = udxStream,
      _parentConn = parentConn,
      _direction = direction {
  _logger.fine('[UDXP2PStreamAdapter ${id()}] Constructor: udxStreamId=${udxStream.id}, direction=$_direction');

  _udxStreamDataSubscription = _udxStream.data.listen(
    (data) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] _udxStream.data listener received ${data.length} bytes.');

      // Copy the nullable field to a local so it promotes to non-nullable.
      final waitingRead = _pendingReadCompleter;
      if (waitingRead != null && !waitingRead.isCompleted) {
        // A reader is already waiting: hand the bytes over directly.
        _logger.fine('[UDXP2PStreamAdapter ${id()}] Completing pending read with ${data.length} bytes.');
        waitingRead.complete(data);
        _pendingReadCompleter = null; // The completer is single-use.
      } else if (_incomingDataController.isClosed) {
        _logger.fine('[UDXP2PStreamAdapter ${id()}] No pending read and _incomingDataController is closed. Data might be lost or handled by buffer.');
        // The controller is gone; only keep the bytes while the adapter
        // itself has not been closed.
        if (!_isClosed) {
          _readBuffer.add(data);
        }
      } else {
        // Nobody is waiting: buffer for the next read and mirror the bytes
        // onto the controller as well.
        _logger.fine('[UDXP2PStreamAdapter ${id()}] No pending read, adding ${data.length} bytes to _readBuffer (via _incomingDataController).');
        _readBuffer.add(data);
        if (!_incomingDataController.isClosed) {
          _incomingDataController.add(data);
        }
      }

      _parentConn.notifyActivity();
    },
    onError: (err, s) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] Error on UDXStream data: $err');

      // Map the raw error onto the UDX exception hierarchy before it is
      // propagated any further.
      final classifiedException = UDXExceptionHandler.classifyUDXException(
        err,
        'UDXP2PStreamAdapter(${id()}).data.onError',
        s,
      );

      // Packet loss is logged more loudly than other stream-level errors.
      if (classifiedException is UDXPacketLossException) {
        _logger.warning('[UDXP2PStreamAdapter ${id()}] Packet permanently lost on stream. Stream will be closed but connection should remain stable.');
      } else if (classifiedException is UDXStreamException) {
        _logger.info('[UDXP2PStreamAdapter ${id()}] Stream-specific error: ${classifiedException.message}');
      }

      // Fail the waiting reader (if any) with the classified error.
      final waitingRead = _pendingReadCompleter;
      if (waitingRead != null && !waitingRead.isCompleted) {
        waitingRead.completeError(classifiedException, s);
        _pendingReadCompleter = null;
      }

      // Surface the error on the controller too, then close the adapter.
      if (!_incomingDataController.isClosed) {
        _incomingDataController.addError(classifiedException, s);
      }
      _closeWithError(classifiedException, s);
    },
    onDone: () {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] UDXStream data stream done.');
      _close();
    },
  );

  _udxStreamCloseSubscription = _udxStream.closeEvents.listen(
    (_) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] UDXStream close event received.');
      _close();
    },
    onError: (err, s) {
      // Unlike data errors, the error is forwarded as-is here.
      _logger.fine('[UDXP2PStreamAdapter ${id()}] UDXStream close event error: $err');
      _closeWithError(err, s);
    },
  );
}