read method

@override
Future<Uint8List> read([int? maxLength])
override

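Reads the next chunk of data from the stream, returning at most maxLength bytes when maxLength is given. An empty list signals that the stream is closed and no buffered data remains (EOF). If no data arrives within 30 seconds, the read fails with a TimeoutException.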

Implementation

/// Reads the next chunk of data from the stream.
///
/// Buffered chunks are served first, in arrival order. If the buffer is
/// empty and the stream is still open, the call waits up to 30 seconds for
/// the stream listener to complete [_pendingReadCompleter] with new data.
///
/// When [maxLength] is given, at most that many bytes are returned and the
/// rest of the chunk is put back at the front of the buffer so byte order is
/// preserved. Note that a [maxLength] of `0` yields an empty result, which
/// callers cannot distinguish from EOF.
///
/// Returns an empty [Uint8List] (EOF) when the stream is closed and no
/// buffered data remains.
///
/// Throws a [RangeError] if [maxLength] is negative, and a
/// [TimeoutException] if no data arrives in time while the stream is open.
@override
Future<Uint8List> read([int? maxLength]) async {
  _logger.fine('[UDXP2PStreamAdapter ${id()}] read called. maxLength: $maxLength, isClosed: $_isClosed, buffer: ${_readBuffer.length}, pendingRead: ${_pendingReadCompleter != null}');

  // Reject a negative limit up front. Otherwise `sublist` would throw the
  // same RangeError only after a chunk had been taken out of the buffer or
  // received from the completer, and that chunk would be lost.
  if (maxLength != null) {
    RangeError.checkNotNegative(maxLength, 'maxLength');
  }

  if (_isClosed && _readBuffer.isEmpty) {
    _logger.fine('[UDXP2PStreamAdapter ${id()}] Stream closed and buffer empty. Returning EOF.');
    return Uint8List(0);
  }

  if (_readBuffer.isNotEmpty) {
    final currentChunk = _readBuffer.removeAt(0);
    if (maxLength == null || currentChunk.length <= maxLength) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] Consumed ${currentChunk.length} bytes from buffer.');
      return currentChunk;
    }
    final toReturn = currentChunk.sublist(0, maxLength);
    final remainder = currentChunk.sublist(maxLength);
    // Put the unread tail back at the front so the next read continues
    // exactly where this one stopped.
    _readBuffer.insert(0, remainder);
    _logger.fine('[UDXP2PStreamAdapter ${id()}] Consumed $maxLength bytes from buffer, remainder ${remainder.length} put back.');
    return toReturn;
  }

  // Buffer is empty and the stream is not closed: wait for new data.
  if (_pendingReadCompleter != null) {
    // A read was issued while another one is still waiting. The field is
    // replaced below, so the earlier read is no longer reachable through it
    // and will only finish through its own timeout. Queuing read requests
    // would be the proper fix; this keeps the existing semantics.
    _logger.fine('[UDXP2PStreamAdapter ${id()}] Warning: Another read was already pending. This new read will also wait.');
  }

  _logger.fine('[UDXP2PStreamAdapter ${id()}] Buffer empty, creating _pendingReadCompleter.');
  const readTimeout = Duration(seconds: 30);
  // Keep a local reference: the field may be replaced by a concurrent read
  // or cleared by the listener while this call is suspended, and this call
  // must only ever touch its own completer.
  final completer = Completer<Uint8List>();
  _pendingReadCompleter = completer;

  try {
    final newData = await completer.future.timeout(
      readTimeout,
      onTimeout: () {
        _logger.fine('[UDXP2PStreamAdapter ${id()}] Read timeout on _pendingReadCompleter.');
        final timeout = TimeoutException('Read timeout on UDXP2PStreamAdapter', readTimeout);
        // Mark the completer as finished so late data is not delivered into
        // a read that has already given up.
        if (!completer.isCompleted) {
          completer.completeError(timeout);
        }
        throw timeout;
      },
    );

    _logger.fine('[UDXP2PStreamAdapter ${id()}] Received ${newData.length} bytes via _pendingReadCompleter.');

    if (maxLength == null || newData.length <= maxLength) {
      return newData;
    }
    final toReturn = newData.sublist(0, maxLength);
    final remainder = newData.sublist(maxLength);
    // Insert at the front, not the back: the listener may already have
    // buffered newer chunks between completing the completer and this call
    // resuming, and the remainder precedes all of them in stream order.
    _readBuffer.insert(0, remainder);
    _logger.fine('[UDXP2PStreamAdapter ${id()}] Consumed $maxLength bytes from completer, remainder ${remainder.length} buffered.');
    return toReturn;
  } catch (e) {
    _logger.fine('[UDXP2PStreamAdapter ${id()}] Error awaiting _pendingReadCompleter: $e');
    if (_isClosed && _readBuffer.isEmpty) return Uint8List(0); // EOF if closed
    rethrow; // Propagate the error (e.g., TimeoutException)
  } finally {
    // Clear the field only if it still refers to this call's completer, so a
    // concurrent read's completer is never discarded by mistake.
    if (identical(_pendingReadCompleter, completer)) {
      _pendingReadCompleter = null;
    }
  }
}