UDXP2PStreamAdapter constructor
UDXP2PStreamAdapter({
  required UDXStream udxStream,
  required UDXSessionConn parentConn,
  required Direction direction,
})
Implementation
/// Creates an adapter around [udxStream] that belongs to [parentConn].
///
/// The constructor stores its arguments and immediately subscribes to two
/// event sources on the wrapped stream:
///
/// * `data` — each chunk is handed to the pending read completer when one
///   is outstanding; otherwise it is appended to `_readBuffer` and, while
///   `_incomingDataController` is still open, forwarded to it as well.
///   Every chunk also triggers `notifyActivity()` on the parent connection.
///   Errors are classified through `UDXExceptionHandler`, propagated to the
///   pending read and the controller, and then close the adapter with that
///   error. Completion of the data stream closes the adapter.
/// * `closeEvents` — an event closes the adapter; an error closes it with
///   the (unclassified) error as received.
UDXP2PStreamAdapter({
  required UDXStream udxStream,
  required UDXSessionConn parentConn,
  required Direction direction,
})  : _udxStream = udxStream,
      _parentConn = parentConn,
      _direction = direction {
  _logger.fine('[UDXP2PStreamAdapter ${id()}] Constructor: udxStreamId=${udxStream.id}, direction=$_direction');

  _udxStreamDataSubscription = _udxStream.data.listen(
    (chunk) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] _udxStream.data listener received ${chunk.length} bytes.');

      // Copy the field to a local so the null check promotes it.
      final waitingRead = _pendingReadCompleter;
      if (waitingRead != null && !waitingRead.isCompleted) {
        // A reader is already waiting: give it the chunk directly and
        // consume the completer so it is not reused.
        _logger.fine('[UDXP2PStreamAdapter ${id()}] Completing pending read with ${chunk.length} bytes.');
        waitingRead.complete(chunk);
        _pendingReadCompleter = null;
      } else if (_incomingDataController.isClosed) {
        _logger.fine('[UDXP2PStreamAdapter ${id()}] No pending read and _incomingDataController is closed. Data might be lost or handled by buffer.');
        // The controller is gone, but while the adapter itself is not
        // closed the chunk is still kept in the read buffer.
        if (!_isClosed) {
          _readBuffer.add(chunk);
        }
      } else {
        // Nobody is waiting: buffer the chunk and also forward it to the
        // controller for as long as the controller remains open.
        _logger.fine('[UDXP2PStreamAdapter ${id()}] No pending read, adding ${chunk.length} bytes to _readBuffer (via _incomingDataController).');
        _readBuffer.add(chunk);
        if (!_incomingDataController.isClosed) {
          _incomingDataController.add(chunk);
        }
      }

      _parentConn.notifyActivity();
    },
    onError: (error, stackTrace) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] Error on UDXStream data: $error');

      // Classify the raw UDX error before it is propagated anywhere.
      final classified = UDXExceptionHandler.classifyUDXException(
        error,
        'UDXP2PStreamAdapter(${id()}).data.onError',
        stackTrace,
      );

      // Packet loss gets its own, louder log line.
      if (classified is UDXPacketLossException) {
        _logger.warning('[UDXP2PStreamAdapter ${id()}] Packet permanently lost on stream. Stream will be closed but connection should remain stable.');
      } else if (classified is UDXStreamException) {
        _logger.info('[UDXP2PStreamAdapter ${id()}] Stream-specific error: ${classified.message}');
      }

      // Fail the outstanding read, if any, then the controller, then close.
      final waitingRead = _pendingReadCompleter;
      if (waitingRead != null && !waitingRead.isCompleted) {
        waitingRead.completeError(classified, stackTrace);
        _pendingReadCompleter = null;
      }
      if (!_incomingDataController.isClosed) {
        _incomingDataController.addError(classified, stackTrace);
      }
      _closeWithError(classified, stackTrace);
    },
    onDone: () {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] UDXStream data stream done.');
      _close();
    },
  );

  _udxStreamCloseSubscription = _udxStream.closeEvents.listen(
    (_) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] UDXStream close event received.');
      _close();
    },
    onError: (error, stackTrace) {
      _logger.fine('[UDXP2PStreamAdapter ${id()}] UDXStream close event error: $error');
      // Forwarded as received; unlike the data path, no classification here.
      _closeWithError(error, stackTrace);
    },
  );
}