asStream method
Adapt this buffered reader to a Stream<List
This creates a pull-based stream that only reads from P2PStream when the returned stream is actively being listened to.
IMPORTANT: This method should NOT be used if you need to access remainingBuffer afterward, as there's no way to track what DelimitedReader consumed from the stream. For the relay use case, use readExact and readByte directly instead.
Implementation
Stream<List<int>> asStream() {
late StreamController<List<int>> controller;
bool cancelled = false;
Future<void> pumpData() async {
try {
// First, yield any data already in buffer
if (_buffer.remainingLength > 0 && !cancelled) {
final buffered = _buffer.read(_buffer.remainingLength);
if (!cancelled && !controller.isClosed) {
controller.add(buffered);
}
}
// Then, pull data on-demand from P2PStream
while (!_eof && !_closed && !cancelled && !controller.isClosed) {
final chunk = await _stream.read();
if (chunk.isEmpty) {
// EOF reached
_eof = true;
break;
}
// Just pass through directly without buffering
if (!cancelled && !controller.isClosed) {
controller.add(chunk);
}
}
} catch (e, s) {
if (!cancelled && !controller.isClosed) {
controller.addError(e, s);
}
} finally {
if (!cancelled && !controller.isClosed) {
await controller.close();
}
}
}
controller = StreamController<List<int>>(
onListen: pumpData,
onCancel: () {
cancelled = true;
},
);
return controller.stream;
}