read method
Reads data from the stream.
Implementation
@override
Future<Uint8List> read([int? maxLength]) async {
_logger.fine('[UDXP2PStreamAdapter ${id()}] read called. maxLength: $maxLength, isClosed: $_isClosed, buffer: ${_readBuffer.length}, pendingRead: ${_pendingReadCompleter != null}');
if (_isClosed && _readBuffer.isEmpty) {
_logger.fine('[UDXP2PStreamAdapter ${id()}] Stream closed and buffer empty. Returning EOF.');
return Uint8List(0);
}
if (_readBuffer.isNotEmpty) {
final currentChunk = _readBuffer.removeAt(0);
if (maxLength == null || currentChunk.length <= maxLength) {
_logger.fine('[UDXP2PStreamAdapter ${id()}] Consumed ${currentChunk.length} bytes from buffer.');
return currentChunk;
} else {
final toReturn = currentChunk.sublist(0, maxLength);
final remainder = currentChunk.sublist(maxLength);
_readBuffer.insert(0, remainder);
_logger.fine('[UDXP2PStreamAdapter ${id()}] Consumed $maxLength bytes from buffer, remainder ${remainder.length} put back.');
return toReturn;
}
}
// Buffer is empty, and stream is not closed (or if closed, there might still be a pending completer from data that arrived just before close)
if (_pendingReadCompleter != null) {
// This should ideally not happen if completers are managed strictly (i.e., only one pending read).
// However, if it does, it means a read was called while another was already pending.
_logger.fine('[UDXP2PStreamAdapter ${id()}] Warning: Another read was already pending. This new read will also wait.');
// Fall through to create a new completer, the old one will be orphaned or error.
// A better approach might be to queue read requests, but for now, let's keep it simpler.
}
_logger.fine('[UDXP2PStreamAdapter ${id()}] Buffer empty, creating _pendingReadCompleter.');
_pendingReadCompleter = Completer<Uint8List>();
try {
final newData = await _pendingReadCompleter!.future.timeout(
const Duration(seconds: 30), // Slightly increased timeout
onTimeout: () {
_logger.fine('[UDXP2PStreamAdapter ${id()}] Read timeout on _pendingReadCompleter.');
if (_pendingReadCompleter?.isCompleted == false) {
_pendingReadCompleter!.completeError(TimeoutException('Read timeout on UDXP2PStreamAdapter', const Duration(seconds: 25)));
}
throw TimeoutException('Read timeout on UDXP2PStreamAdapter', const Duration(seconds: 30));
}
);
// _pendingReadCompleter is set to null by the listener when it completes it.
// Or it should be nulled here if completed by timeout error path, though throw happens first.
_logger.fine('[UDXP2PStreamAdapter ${id()}] Received ${newData.length} bytes via _pendingReadCompleter.');
if (maxLength == null || newData.length <= maxLength) {
return newData;
} else {
final toReturn = newData.sublist(0, maxLength);
final remainder = newData.sublist(maxLength);
_readBuffer.add(remainder); // Buffer the remainder
_logger.fine('[UDXP2PStreamAdapter ${id()}] Consumed $maxLength bytes from completer, remainder ${remainder.length} buffered.');
return toReturn;
}
} catch (e) {
_logger.fine('[UDXP2PStreamAdapter ${id()}] Error awaiting _pendingReadCompleter: $e');
// Ensure completer is nulled if it was this read's completer that errored
if (_pendingReadCompleter?.isCompleted == false) {
// If error is not from the completer itself (e.g. future cancelled), complete it with error.
// _pendingReadCompleter!.completeError(e); // This might cause issues if already completing.
}
_pendingReadCompleter = null; // Nullify on error too
if (_isClosed && _readBuffer.isEmpty) return Uint8List(0); // EOF if closed
rethrow; // Rethrow the error (e.g., TimeoutException)
}
}