shellStreamLines function
Basic line streaming. Assuming system encoding
Implementation
Stream<String> shellStreamLines(Stream<List<int>> stream,
{Encoding? encoding}) {
encoding ??= shellContext.encoding;
StreamSubscription? subscription;
List<int>? currentLine;
const lineFeed = 10; // \n LF
const carriageReturn = 13; // \r CR
late StreamController<String> ctlr;
// devPrint('listen (paused: $paused)');
void addCurrentLine() {
if (subscription?.isPaused ?? false) {
// Do nothing, current line is discarded
} else {
if (currentLine != null) {
try {
ctlr.add(encoding!.decode(currentLine!));
} catch (_) {
// Ignore bad encoding
// ignore: avoid_print
print('ignoring: $currentLine');
}
}
}
currentLine = null;
}
ctlr = StreamController<String>(
onPause: () {
if (shellDebug) {
// ignore: avoid_print
print('onPause (paused: ${subscription?.isPaused})');
}
// Last one
addCurrentLine();
subscription?.pause();
},
onResume: () {
// devPrint('onResume (paused: $paused)');
if (subscription?.isPaused ?? false) {
subscription?.resume();
}
},
onListen: () {
void addToCurrentLine(List<int> data) {
if (currentLine == null) {
currentLine = data;
} else {
var newCurrentLine = Uint8List(currentLine!.length + data.length);
newCurrentLine.setAll(0, currentLine!);
newCurrentLine.setAll(currentLine!.length, data);
currentLine = newCurrentLine;
}
}
var lastWasCR = false;
subscription = stream.listen((data) {
var paused = subscription?.isPaused ?? false;
// devPrint('read $data (paused: $paused)');
if (paused) {
return;
}
// look for \n (10)
var start = 0;
for (var i = 0; i < data.length; i++) {
var byte = data[i];
if (byte == lineFeed || byte == carriageReturn) {
if (byte == lineFeed) {
// Ignore CRLF
if (lastWasCR) {
lastWasCR = false;
start = i + 1;
continue;
}
} else {
lastWasCR = true;
}
addToCurrentLine(data.sublist(start, i));
addCurrentLine();
// Skip it
start = i + 1;
} else {
lastWasCR = false;
}
}
// Store last current line
if (data.length > start) {
addToCurrentLine(data.sublist(start, data.length));
}
}, onDone: () {
// devPrint('onDone');
// Last one
addCurrentLine();
ctlr.close();
}, onError: (Object e, StackTrace st) {
ctlr.addError(e, st);
});
},
onCancel: () {
// devPrint('onCancel');
subscription?.cancel();
},
sync: true);
return ctlr.stream;
}